Is there a way to easily iterate through WF jobs?
Ideally I’d like to list the set of variables used in all WF jobs.
I tried to use the python API, but so far i’ve seen no function to list all jobs of a WF.
Is there a way to easily iterate through WF jobs?
Ideally I’d like to list the set of variables used in all WF jobs.
I tried to use the python API, but so far i’ve seen no function to list all jobs of a WF.
Hi Jonas,
you can use the OD API with the following endpoint to extract a list of job IDs for a specific workflow:
https://od-api.pages.intranet.onelogic.de/#workflows__workflowid__jobs_get
These IDs can then be used in the following to extract the variable information (and some more):
# Get deep workflow job by ID
from onedata.workflows.jobs.types import WorkflowJob
wf_job_id = "00000000-0000-0000-0000-000000000003"
wf_job: WorkflowJob = onedata_api.workflows.jobs.get(id=wf_job_id)
Hi Jonas,
as Katharina already mentioned, you would have to use the OD API, because the ONE DATA Python SDK does not officially cover retrieving a list of Jobs for a workflow.
However… the SDK uses the mentioned endpoint of the OD API internally and you can utilize this in a hacky way to get the response into your hands.
I highly recommend to place a feature request for this, so we can provide an official and tested solution for this hacky workaround.
But here is the code:
from onedata.api import OneDataApi
from onedata.common.types import Paginated
from onedata.workflows.jobs.commands import JobListCommand
from onedata.workflows.jobs.types import WorkflowJob, JobExecutionState
wf_id = "" # FIXME
onedata_api = OneDataApi() # FIXME
job_ids = list()
for possibleExecutionState in [JobExecutionState.SUCCESS]: # add more states here to fit your needs
# This is hacky and untested. Use with caution. Please create a feature request when you need this feature in production.
paginatedJobsInCurrentState: Paginated = onedata_api.execute_command(
JobListCommand(wf_id, possibleExecutionState.value, 10))
job_ids.extend(list(map(lambda job: job.id, paginatedJobsInCurrentState.items())))
# prevent duplicates
job_ids = set(job_ids)
for job_id in job_ids:
deepJob: WorkflowJob = onedata_api.workflows.jobs.get(job_id)
print(f"Variables used in job `{deepJob.id}`: {deepJob.variable_configurations}")
Cheers,
Tristan