Tools for Running Jobs
The job factory, job manager and job runner are used to create, schedule and run jobs. These tools and the functions inside them generally do not need to be accessed directly; the desired action can usually be done more easily through the functions in the pipeliner API.
The job factory
The job factory functions identify the available job types and return the correct type of job for the job type specified in a parameter file.
- pipeliner.job_factory.active_job_from_proc(the_proc: Process) → PipelinerJob
Create an active job from an existing process.
Used when the functions inside a job subclass need to be called on an existing job.
- Parameters:
the_proc (Process) – The process to create a job from
- Returns:
The job subclass object for the process
- Return type:
PipelinerJob
- pipeliner.job_factory.get_job_from_entrypoint(type_name: str) → PipelinerJob | None
- pipeliner.job_factory.get_job_types(search_term: str = '') → List[PipelinerJob]
Returns all the job types and information about them.
Note that the returned list contains job instances, not classes.
- Parameters:
search_term (str) – Only return jobs with this string in their job type name
- Returns:
A list of the PipelinerJob-based job objects for each job found
- Return type:
List[PipelinerJob]
- pipeliner.job_factory.job_can_run(job_type: str) → bool
Check that the programs needed to run a specific job type are available.
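One plausible way to perform a check like job_can_run is to look up each executable a job type needs on the PATH. The sketch below assumes a hypothetical mapping, REQUIRED_PROGRAMS, from job type names to program names, and a hypothetical function job_can_run_sketch; it illustrates the idea only and is not the pipeliner's implementation, which may check availability differently.

```python
import shutil
import sys
from typing import Dict, List

# Hypothetical registry: job type name -> executables the job needs.
# These entries are illustrative only; they are not taken from the pipeliner.
REQUIRED_PROGRAMS: Dict[str, List[str]] = {
    "example.python_job": [sys.executable],
    "example.missing_job": ["no_such_program_xyz"],
}


def job_can_run_sketch(job_type: str) -> bool:
    """Return True if every program required by job_type can be found."""
    if job_type not in REQUIRED_PROGRAMS:
        # Raising for unknown job types is a choice made for this sketch
        raise ValueError(f"Unknown job type: {job_type}")
    # shutil.which returns None when a program is missing or not executable
    return all(shutil.which(prog) is not None for prog in REQUIRED_PROGRAMS[job_type])
```

Keeping the program list as data, separate from the check, means a new job type only needs a registry entry rather than new checking code.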
- pipeliner.job_factory.job_from_dict(job_input: Mapping[str, str | int | float | bool]) → PipelinerJob
Create a job from a dictionary.
The dict must define the job type with a ‘_rlnJobTypeLabel’ key. Any other keys override the default values of the corresponding job parameters.
- Parameters:
job_input (dict) – The dict containing the parameters. At minimum, it must contain {‘_rlnJobTypeLabel’: <jobtype>}
- Returns:
The job subclass object
- Return type:
PipelinerJob
- Raises:
ValueError – If the ‘_rlnJobTypeLabel’ key is missing from the dict or is not a string
ValueError – If ‘_rlnJobIsContinue’ is in the dict; this function is not for creating continuations of jobs
ValueError – If the specified job type is not found
ValueError – If any of the parameters in the dict are not parameters of the specified job type
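The four ValueError conditions listed above can be read as a sequence of checks on the input dict. The sketch below applies them in that order against a hypothetical table, KNOWN_JOB_TYPES, of job types and their parameter names; check_job_dict is also hypothetical and does not build a job, it only shows the validation the documentation describes.

```python
from typing import Dict, Mapping, Set, Union

JobOptionValue = Union[str, int, float, bool]

# Hypothetical table of job types and the parameter names each one accepts
KNOWN_JOB_TYPES: Dict[str, Set[str]] = {
    "example.import": {"fn_in", "angpix"},
}


def check_job_dict(job_input: Mapping[str, JobOptionValue]) -> str:
    """Apply the checks documented for job_from_dict and return the job type."""
    job_type = job_input.get("_rlnJobTypeLabel")
    if not isinstance(job_type, str):
        raise ValueError("'_rlnJobTypeLabel' is missing or is not a string")
    if "_rlnJobIsContinue" in job_input:
        raise ValueError("This function cannot create job continuations")
    if job_type not in KNOWN_JOB_TYPES:
        raise ValueError(f"Job type not found: {job_type}")
    # Every key other than the type label must be a parameter of the job type
    unknown = set(job_input) - {"_rlnJobTypeLabel"} - KNOWN_JOB_TYPES[job_type]
    if unknown:
        raise ValueError(f"Invalid parameters for {job_type}: {sorted(unknown)}")
    return job_type
```

For example, {"_rlnJobTypeLabel": "example.import", "angpix": 1.0} passes, while adding an "_rlnJobIsContinue" key is rejected.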
- pipeliner.job_factory.new_job_of_type(type_name: str, joboptions: Mapping[str, str | int | float | bool] | None = None) → PipelinerJob
Creates a new object of the correct PipelinerJob subtype.
- Parameters:
type_name (str) – The job type to create
joboptions – Optional job option values for the new job
- Returns:
The job subclass object
- Return type:
PipelinerJob
- Raises:
ValueError – If the job type is not found
- pipeliner.job_factory.read_job(filename: str) → PipelinerJob
Reads a run.job or job.star file and returns an object of the correct PipelinerJob subclass.
- Parameters:
filename (str) – The run.job or job.star file
- Returns:
The job subclass object
- Return type:
PipelinerJob
- Raises:
ValueError – If the file name does not end with .job or .star
RuntimeError – If the input file is in the RELION 3.1 format and conversion fails
RuntimeError – If the job type specified in the file cannot be found
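Before any parsing, read_job rejects file names that do not end with .job or .star. A minimal sketch of that first check only; the helper parameter_file_format is hypothetical, and the parsing and RELION 3.1 conversion that read_job also performs are not shown.

```python
from pathlib import Path


def parameter_file_format(filename: str) -> str:
    """Return the parameter file format ('run.job' or 'job.star') for filename.

    Raises ValueError for any other extension, mirroring read_job's documented check.
    """
    suffix = Path(filename).suffix
    if suffix == ".job":
        return "run.job"
    if suffix == ".star":
        return "job.star"
    raise ValueError(f"{filename!r} does not end with .job or .star")
```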
The job manager
- pipeliner.job_manager.abort_job(pipeline: ProjectGraph, job_name: str) → None
Abort a running job.
This function writes a file to signal that the job should abort, but does not wait for the job to respond.
- Parameters:
pipeline – A writable (i.e. non-read-only) ProjectGraph object
job_name – The job name to abort (including a trailing slash “/”)
- Raises:
ValueError – If there is no job with the given name
RuntimeError – If the job is in any state except Running
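abort_job does not stop the job itself: it writes a file that signals the running job to abort. A minimal sketch of that signalling pattern follows. The function request_abort and the ABORT_REQUESTED file name are hypothetical (this documentation does not name the real file), and the sketch takes the job directory and current status as plain arguments instead of looking them up in a ProjectGraph.

```python
import tempfile
from pathlib import Path

# Hypothetical signal file name: the real file written by abort_job is not
# named in this documentation.
ABORT_SIGNAL_FILE = "ABORT_REQUESTED"


def request_abort(job_dir: Path, status: str) -> Path:
    """Signal a running job to abort and return the path of the signal file.

    As with abort_job, this does not wait for the job to respond; the running
    job is expected to notice the file and stop itself.
    """
    if status != "Running":
        raise RuntimeError(f"Only running jobs can be aborted (status: {status})")
    signal_file = job_dir / ABORT_SIGNAL_FILE
    signal_file.touch()
    return signal_file


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(request_abort(Path(tmp), "Running").exists())  # True
```

Signalling through a file keeps the aborting process decoupled from the running job, which may be in a detached process or on a queue.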
- pipeliner.job_manager.run_job(pipeline: ProjectGraph, job: PipelinerJob, alias: str | None = None, overwrite: Process | None = None, ignore_invalid_joboptions=False, run_in_foreground=False) → Process
Run a job.
The job will first be scheduled by passing it to schedule_job and then run by calling run_scheduled_job. See those functions for more details.
- Parameters:
pipeline – A writable (i.e. non-read-only) ProjectGraph object to add the new job to.
job – The job to run.
alias – An optional alias for the job, which can be used to refer to the job instead of its number.
overwrite – An optional Process object representing an old job (of the same type) to be overwritten and replaced by the new job.
ignore_invalid_joboptions (bool) – Run the job anyway, even if the job options appear to be invalid.
run_in_foreground – If True, run the job in the foreground and do not return until it has finished. If False, the job will be launched in the background and this function will return immediately. This option is mainly intended for testing of the pipeliner. Note that if run_in_foreground is True, the project will be kept locked by this process for the entire duration of the job, preventing any other actions happening in the project in the meantime.
- Returns:
The Process object representing the job.
- Raises:
ValueError – If overwrite is given, but the old job is a different type or other processes in the pipeline have used its outputs.
ValueError – If job.is_continue is True but the job does not already have an output directory assigned.
ValueError – If ignore_invalid_joboptions is False and the job options appear to be invalid.
ValueError – If run_in_foreground is True and the job is set to be submitted to a queue.
RuntimeError – If pipeline.read_only is True.
- pipeliner.job_manager.run_schedule(fn_sched: str, job_ids: List[str] | None = None, nr_repeat: int = 1, minutes_wait: int = 0, minutes_wait_before: int = 0, seconds_wait_after: int = 0, pipeline_name: str = 'default') → None
Run the jobs in a schedule.
- Parameters:
fn_sched (str) – The name to be assigned to the schedule
nr_repeat (int) – Number of times to repeat the entire schedule
minutes_wait (int) – Minimum time to wait between jobs, in minutes. If this time has already passed while the previous job was running, the next job will start immediately
minutes_wait_before (int) – Time to wait, in minutes, before starting to run the schedule
seconds_wait_after (int) – Time to wait, in seconds, before starting each job. This wait always occurs, even if the minimum time between jobs has already been exceeded
pipeline_name (str) – The name of the pipeline to use
- Raises:
ValueError – If a schedule lock file exists with the selected schedule name, suggesting that another schedule with the same name is already running
ValueError – If the job directory for a scheduled job cannot be found
ValueError – If a job.star file is not found in the directory of a job to be run
ValueError – If an input node for a job cannot be found
ValueError – If a job in the schedule fails
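Read together, minutes_wait and seconds_wait_after determine the delay before each job after the first. The sketch below assumes minutes_wait is measured from the start of the previous job, which is one reading of the description above; delay_before_next_job is a hypothetical helper, not part of the pipeliner.

```python
def delay_before_next_job(
    elapsed_seconds: float, minutes_wait: int = 0, seconds_wait_after: int = 0
) -> float:
    """Seconds to wait before starting the next job in a schedule.

    elapsed_seconds is the time since the previous job started.
    """
    # Minimum gap between jobs: time already spent while the previous job
    # was running counts towards it, so it is never waited for twice.
    remaining = max(0.0, minutes_wait * 60 - elapsed_seconds)
    # seconds_wait_after is always added, even if the minimum gap has passed
    return remaining + seconds_wait_after
```

With minutes_wait=1 and seconds_wait_after=5, a previous job that ran for 30 seconds gives a 35-second delay, while one that ran for two minutes gives only the fixed 5 seconds.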
- pipeliner.job_manager.run_scheduled_job(pipeline: ProjectGraph, job: PipelinerJob, process: Process, run_in_foreground=False) → Process
Run a job that has already been scheduled.
By default, the job will be launched in the background in a detached process. If necessary (mainly for testing) the job can instead be run in the foreground by passing run_in_foreground=True. Note that this option overrides the job’s queue submission settings.
- Parameters:
pipeline – A writable (i.e. non-read-only) ProjectGraph pipeline containing the scheduled job.
job – The job to run.
process – The Process object representing the job’s entry in the pipeline.
run_in_foreground – If True, run the job in the foreground and do not return until it has finished. If False, the job will be launched in the background and this function will return immediately. This option is mainly intended for testing of the pipeliner. Note that if run_in_foreground is True, the project will be kept locked by this process for the entire duration of the job, preventing any other actions happening in the project in the meantime.
- Returns:
The Process object representing the job. Note that this might be a new and different object from the one passed in to the process argument.
- Raises:
ValueError – If the job is not already present in the pipeline in the Scheduled state.
FileNotFoundError – If the job’s “job.star” file cannot be found in the job directory.
RuntimeError – If pipeline.read_only is True.
- pipeliner.job_manager.schedule_job(pipeline: ProjectGraph, job: PipelinerJob, alias: str | None = None, overwrite: Process | None = None, ignore_invalid_joboptions=False) → Process
Schedule a job to run later.
Note that jobs must be scheduled before they can be run.
The job will be added to the pipeline in the Scheduled state, and the job’s options will be written to “run.job” and “job.star” files in the job directory.
If the job is new, it will be assigned a job number and a new output directory will be created for it. If the job is a continuation (i.e. job.is_continue is True) the previous job directory and number will be used. If overwrite is given, the job in overwrite.name will be deleted and replaced with the new job (as long as the old and new jobs are of the same type).
If any of the job’s input files are in the project “DoppioUploads” directory, they will be moved into an “InputFiles” directory inside the job directory, and the relevant job options will be updated to point to the new file locations.
- Parameters:
pipeline – A writable (i.e. non-read-only) ProjectGraph object to add the new job to.
job – The job to schedule and add to the pipeline.
alias – An optional alias for the job, which can be used to refer to the job instead of its number.
overwrite – An optional Process object representing an old job (of the same type) to be overwritten and replaced by the new job.
ignore_invalid_joboptions (bool) – Schedule the job anyway, even if the job options appear to be invalid.
- Returns:
The Process object representing the newly-scheduled job.
- Raises:
ValueError – If overwrite is given, but the old job is a different type or other processes in the pipeline have used its outputs.
ValueError – If job.is_continue is True but the job does not already have an output directory assigned.
ValueError – If ignore_invalid_joboptions is False and the job options appear to be invalid.
RuntimeError – If pipeline.read_only is True.
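The DoppioUploads handling described for schedule_job can be sketched as follows. This is not the pipeliner's code: the function relocate_uploads is hypothetical, it assumes that file-valued job options are stored as paths relative to the project directory, and it only handles files placed directly in DoppioUploads.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Dict

UPLOADS_DIR = "DoppioUploads"
INPUTS_DIR = "InputFiles"


def relocate_uploads(
    joboptions: Dict[str, str], project_dir: Path, job_dir: Path
) -> Dict[str, str]:
    """Move uploaded input files into the job's InputFiles directory.

    Returns a copy of joboptions in which every option that named a file
    directly inside <project_dir>/DoppioUploads points to its new location,
    relative to the project directory. Other options are left unchanged.
    """
    project_dir = project_dir.resolve()
    uploads = project_dir / UPLOADS_DIR
    dest_dir = job_dir.resolve() / INPUTS_DIR
    updated = dict(joboptions)
    for name, value in joboptions.items():
        # Assumption: file options are stored relative to the project directory
        path = (project_dir / value).resolve()
        if path.parent != uploads or not path.is_file():
            continue
        dest_dir.mkdir(parents=True, exist_ok=True)
        new_path = dest_dir / path.name
        shutil.move(str(path), str(new_path))
        updated[name] = str(new_path.relative_to(project_dir))
    return updated


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        project = Path(tmp)
        (project / UPLOADS_DIR).mkdir()
        (project / UPLOADS_DIR / "map.mrc").write_text("example data")
        job_dir = project / "Import" / "job001"
        job_dir.mkdir(parents=True)
        print(relocate_uploads({"fn_in": "DoppioUploads/map.mrc"}, project, job_dir))
```

Returning a new dict rather than editing joboptions in place leaves the caller's original options untouched if a later step fails.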
- pipeliner.job_manager.wait_for_job_to_finish(job: PipelinerJob, ping: float = 1.0, timeout: float = 86400.0, error_on_fail: bool = False, error_on_abort: bool = False) → str
Wait for the job to finish, whatever its final status.
- Parameters:
job (PipelinerJob) – The job to wait for
ping (float) – How long to wait, in seconds, before checking the job's status again
timeout (float) – Raise an error if the job has not finished after this much time has elapsed, in seconds. The default is 24 hours.
error_on_fail (bool) – Raise an error if the job fails
error_on_abort (bool) – Raise an error if the job is aborted
- Returns:
The final status of the job: Failed, Succeeded or Aborted
- Return type:
str
- Raises:
RuntimeError – If the job fails and error_on_fail is True, or if the job is aborted and error_on_abort is True
RuntimeError – If the job still hasn’t finished by the timeout time
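The behaviour of wait_for_job_to_finish amounts to a polling loop with a deadline. The sketch below shows that loop under stated assumptions: a hypothetical get_status callable stands in for reading a PipelinerJob's status, and the status names are the ones listed under Returns. It is an illustration, not the pipeliner's implementation.

```python
import time
from typing import Callable

FINAL_STATUSES = ("Succeeded", "Failed", "Aborted")


def wait_for_status(
    get_status: Callable[[], str],
    ping: float = 1.0,
    timeout: float = 86400.0,
    error_on_fail: bool = False,
    error_on_abort: bool = False,
) -> str:
    """Poll get_status() every ping seconds until it reports a final status."""
    # time.monotonic is unaffected by changes to the system clock
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "Failed" and error_on_fail:
            raise RuntimeError("Job failed")
        if status == "Aborted" and error_on_abort:
            raise RuntimeError("Job was aborted")
        if status in FINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise RuntimeError(f"Job still not finished after {timeout} seconds")
        time.sleep(ping)
```

For example, a status source that reports "Running" twice and then "Succeeded" makes the function return "Succeeded" after two pings.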
The job runner
job_runner.py
This script is responsible for executing pipeliner jobs.
It is intended only for internal use by the pipeliner and should not normally be called by any other code.
Users should instead run jobs using the “--run_job” argument to the standard “pipeliner” command.