Tools for Running Jobs

The job factory, job manager and job runner are used to create, schedule and run jobs. These objects and the functions inside them generally do not need to be accessed directly. The desired action can usually be done more easily through the functions in the pipeliner API.

The job factory

The job factory functions identify the available job types and return the correct type of job for the job type specified in a parameter file.

pipeliner.job_factory.active_job_from_proc(the_proc: Process) PipelinerJob

Create an active job from an existing process

Used when the functions inside a job subclass need to be called on an existing job

Parameters:

the_proc (Process) – The process to create a job from

Returns:

The job subclass object for the process

Return type:

PipelinerJob

pipeliner.job_factory.get_job_from_entrypoint(type_name: str) PipelinerJob | None
pipeliner.job_factory.get_job_types(search_term: str = '') List[PipelinerJob]

Returns all the job types and info about them

Note that the returned list actually contains job instances, not classes.

Parameters:

search_term (str) – Only return jobs with this string in their job type name

Returns:

A list of the PipelinerJob-based job objects for each job found

Return type:

list

pipeliner.job_factory.job_can_run(job_type: str) bool

Check that the programs are available to run a specific job type

Parameters:

job_type (str) – The job type, e.g. relion.class3d.helical

Returns:

True if the programs needed to run that job are available on this system

Return type:

bool

pipeliner.job_factory.job_from_dict(job_input: Mapping[str, str | int | float | bool]) PipelinerJob

Create a job from a dictionary

The dict must define the job type with a ‘_rlnJobTypeLabel’ key. Any other keys will override the default options for that parameter

Parameters:

job_input (dict) – The dict containing the params. At minimum, it must contain {‘_rlnJobTypeLabel’: <jobtype>}

Returns:

The job subclass

Return type:

PipelinerJob

Raises:
  • ValueError – If the ‘_rlnJobTypeLabel’ key is missing from the dict or is not a string

  • ValueError – If ‘_rlnJobIsContinue’ is in the dict - this function is not for creating continuations of jobs

  • ValueError – If the specified jobtype is not found

  • ValueError – If any of the parameters in the dict are not in the jobtype returned
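The input rules above can be sketched without the pipeliner installed. This is only an illustration under stated assumptions: the helper name build_job_input and the option name some_option are hypothetical and are not part of the pipeliner. The helper applies just the two documented checks that need no knowledge of the job types, and the call to job_from_dict is left as a comment.

```python
from typing import Dict, Union

JobValue = Union[str, int, float, bool]

TYPE_KEY = "_rlnJobTypeLabel"
CONTINUE_KEY = "_rlnJobIsContinue"


def build_job_input(job_type: str, **overrides: JobValue) -> Dict[str, JobValue]:
    """Hypothetical helper: assemble a dict in the shape job_from_dict expects."""
    if not isinstance(job_type, str):
        raise ValueError(f"{TYPE_KEY} must be a string")
    if CONTINUE_KEY in overrides:
        raise ValueError(f"{CONTINUE_KEY} is not allowed when creating a new job")
    job_input: Dict[str, JobValue] = {TYPE_KEY: job_type}
    job_input.update(overrides)
    return job_input


# "some_option" is a placeholder; a real call needs option names
# that exist for the chosen job type, otherwise job_from_dict raises ValueError
job_input = build_job_input("relion.class3d.helical", some_option=25)

# With the pipeliner installed, the dict would then be passed on:
#     from pipeliner.job_factory import job_from_dict
#     job = job_from_dict(job_input)
```

Checking the dict before the call only catches the simplest mistakes early. Unknown job types and unknown parameters can only be detected by job_from_dict itself.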

pipeliner.job_factory.new_job_of_type(type_name: str, joboptions: Mapping[str, str | int | float | bool] | None = None) PipelinerJob

Creates a new object of the correct PipelinerJob sub-type

Parameters:
  • type_name (str) – The job process name

  • joboptions (dict) – Dict of the job’s joboptions, only necessary if converting from a RELION 4.0 style jobname

Returns:

The job subclass

Return type:

PipelinerJob

Raises:

ValueError – If the job type is not found

pipeliner.job_factory.read_job(filename: str) PipelinerJob

Reads run.job and job.star files and returns the correct Pipeliner job class

Parameters:

filename (str) – The run.job or job.star file

Returns:

The job subclass

Return type:

PipelinerJob

Raises:
  • ValueError – If the file name entered does not end with .job or .star

  • RuntimeError – If the input file is in the RELION 3.1 format and conversion fails

  • RuntimeError – If the job type specified in the file cannot be found
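The file name rule above can be checked before calling read_job. The sketch below is a hypothetical pre-check (the name has_job_file_suffix is not part of the pipeliner) that mirrors only the documented file name condition. It does not inspect the file contents.

```python
ACCEPTED_SUFFIXES = (".job", ".star")


def has_job_file_suffix(filename: str) -> bool:
    """Hypothetical pre-check: True if the name ends with .job or .star."""
    return filename.endswith(ACCEPTED_SUFFIXES)


# Example file names; "notes.txt" would be rejected by the documented rule
results = {name: has_job_file_suffix(name) for name in ("run.job", "job.star", "notes.txt")}

# With the pipeliner installed, an accepted name would then be read with:
#     from pipeliner.job_factory import read_job
#     job = read_job("job.star")
```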

The job manager

pipeliner.job_manager.abort_job(pipeline: ProjectGraph, job_name: str) None

Abort a running job.

This function writes a file to signal that the job should abort, but does not wait for the job to respond.

Parameters:
  • pipeline – A writable (i.e. non-read-only) ProjectGraph object

  • job_name – The job name to abort (including a trailing slash “/”)

Raises:
  • ValueError – If there is no job with the given name

  • RuntimeError – If the job is in any state except Running
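Because job_name must include the trailing slash, a small normalising step before calling abort_job can avoid a failed lookup. The helper below is a hypothetical convenience, not a pipeliner function, and the job name "Class3D/job010" is only an assumed example of the naming pattern.

```python
def with_trailing_slash(job_name: str) -> str:
    """Hypothetical helper: return job_name ending in exactly one "/"."""
    return job_name.rstrip("/") + "/"


# Assumed example name; both spellings normalise to the same string
name_a = with_trailing_slash("Class3D/job010")
name_b = with_trailing_slash("Class3D/job010/")

# With the pipeliner installed and a writable ProjectGraph called pipeline:
#     from pipeliner.job_manager import abort_job
#     abort_job(pipeline, name_a)
```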

pipeliner.job_manager.run_job(pipeline: ProjectGraph, job: PipelinerJob, alias: str | None = None, overwrite: Process | None = None, ignore_invalid_joboptions=False, run_in_foreground=False) Process

Run a job.

The job will first be scheduled by passing it to schedule_job and then run by calling run_scheduled_job. See those functions for more details.

Parameters:
  • pipeline – A writable (i.e. non-read-only) ProjectGraph object to add the new job to.

  • job – The job to run.

  • alias – An optional alias for the job, which can be used to refer to the job instead of its number.

  • overwrite – An optional Process object representing an old job (of the same type) to be overwritten and replaced by the new job.

  • ignore_invalid_joboptions (bool) – Run the job anyway even if the job options appear to be invalid.

  • run_in_foreground – If True, run the job in the foreground and do not return until it has finished. If False, the job will be launched in the background and this function will return immediately. This option is mainly intended for testing of the pipeliner. Note that if run_in_foreground is True, the project will be kept locked by this process for the entire duration of the job, preventing any other actions happening in the project in the meantime.

Returns:

The Process object representing the job.

Raises:
  • ValueError – If overwrite is given, but the old job is a different type or other processes in the pipeline have used its outputs.

  • ValueError – If job.is_continue is True but the job does not already have an output directory assigned.

  • ValueError – If ignore_invalid_joboptions is False and the job options appear to be invalid.

  • ValueError – If run_in_foreground is True and the job is set to be submitted to a queue.

  • RuntimeError – If pipeline.read_only is True.
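The two-step behaviour described above (schedule first, then run the scheduled job) can be sketched as a simple composition. This is not the pipeliner's implementation: run_job_sketch and the two stand-in functions are hypothetical, and only the argument order is taken from the documented signatures of schedule_job and run_scheduled_job.

```python
from typing import Any, Callable, List, Optional


def run_job_sketch(
    pipeline: Any,
    job: Any,
    schedule: Callable[..., Any],
    run_scheduled: Callable[..., Any],
    alias: Optional[str] = None,
    run_in_foreground: bool = False,
) -> Any:
    """Schedule the job, then run the scheduled entry (illustrative only)."""
    process = schedule(pipeline, job, alias=alias)
    return run_scheduled(pipeline, job, process, run_in_foreground=run_in_foreground)


# Stand-ins that record the call order; they are not pipeliner functions
calls: List[str] = []


def fake_schedule(pipeline, job, alias=None):
    calls.append("schedule")
    return {"job": job, "status": "Scheduled"}


def fake_run_scheduled(pipeline, job, process, run_in_foreground=False):
    calls.append("run")
    return {**process, "status": "Running"}


result = run_job_sketch("pipeline", "job", fake_schedule, fake_run_scheduled)
```

In real use the two callables would be pipeliner.job_manager.schedule_job and pipeliner.job_manager.run_scheduled_job, although calling run_job directly is simpler.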

pipeliner.job_manager.run_schedule(fn_sched: str, job_ids: List[str] | None = None, nr_repeat: int = 1, minutes_wait: int = 0, minutes_wait_before: int = 0, seconds_wait_after: int = 0, pipeline_name: str = 'default') None

Run the jobs in a schedule.

Parameters:
  • fn_sched (str) – The name to be assigned to the schedule

  • job_ids (list) – A list of str job names

  • nr_repeat (int) – Number of times to repeat the entire schedule

  • minutes_wait (int) – Minimum time to wait between jobs in minutes. If this time has already elapsed while the previous job was running, the next job will start immediately

  • minutes_wait_before (int) – Wait this amount of time before initially starting to run the schedule

  • seconds_wait_after (int) – Wait this many seconds before starting each job. This wait always occurs, even if the minimum time between jobs has already been surpassed

  • pipeline_name (str) – The name of the pipeline to use

Raises:
  • ValueError – If a schedule lock file exists with the selected schedule name, suggesting another schedule with the same name is already running

  • ValueError – If the job directory for a scheduled job could not be found

  • ValueError – If a job.star file is not found in one of the directories of a job to be run

  • ValueError – If an input node for a job cannot be found

  • ValueError – If a job in the schedule fails
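The interaction between minutes_wait and seconds_wait_after can be made concrete with a little arithmetic. The function below is one reading of the parameter descriptions above, not the pipeliner's own code: it assumes the minimum gap is measured from the start of the previous job and that the fixed wait is added afterwards. The name delay_before_next_job is hypothetical.

```python
def delay_before_next_job(
    minutes_wait: int, seconds_wait_after: int, elapsed_seconds: float
) -> float:
    """Seconds to wait before the next job, given how long the last one ran."""
    min_gap = minutes_wait * 60
    # The minimum gap only matters if the previous job finished early
    remaining = max(0.0, min_gap - elapsed_seconds)
    # The fixed wait is always applied
    return float(remaining + seconds_wait_after)


# Previous job ran for 90 s, longer than the 60 s minimum gap
long_job_delay = delay_before_next_job(1, 5, 90)

# Previous job ran for 30 s, so 30 s of the minimum gap remain
short_job_delay = delay_before_next_job(1, 5, 30)
```

Under these assumptions the first case waits only the fixed 5 seconds, and the second waits the remaining 30 seconds plus the fixed 5 seconds.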

pipeliner.job_manager.run_scheduled_job(pipeline: ProjectGraph, job: PipelinerJob, process: Process, run_in_foreground=False) Process

Run a job that has already been scheduled.

By default, the job will be launched in the background in a detached process. If necessary (mainly for testing) the job can instead be run in the foreground by passing run_in_foreground=True. Note that this option overrides the job’s queue submission settings.

Parameters:
  • pipeline – A writable (i.e. non-read-only) ProjectGraph pipeline containing the scheduled job.

  • job – The job to run.

  • process – The Process object representing the job’s entry in the pipeline.

  • run_in_foreground – If True, run the job in the foreground and do not return until it has finished. If False, the job will be launched in the background and this function will return immediately. This option is mainly intended for testing of the pipeliner. Note that if run_in_foreground is True, the project will be kept locked by this process for the entire duration of the job, preventing any other actions happening in the project in the meantime.

Returns:

The Process object representing the job. Note that this might be a new and different object from the one passed in to the process argument.

Raises:
  • ValueError – If the job is not already present in the pipeline in the Scheduled state.

  • FileNotFoundError – If the job’s “job.star” file cannot be found in the job directory.

  • RuntimeError – If pipeline.read_only is True.

pipeliner.job_manager.schedule_job(pipeline: ProjectGraph, job: PipelinerJob, alias: str | None = None, overwrite: Process | None = None, ignore_invalid_joboptions=False) Process

Schedule a job to run later.

Note that jobs must be scheduled before they can be run.

The job will be added to the pipeline in the Scheduled state, and the job’s options will be written to “run.job” and “job.star” files in the job directory.

If the job is new, it will be assigned a job number and a new output directory will be created for it. If the job is a continuation (i.e. job.is_continue is True) the previous job directory and number will be used. If overwrite is given, the job in overwrite.name will be deleted and replaced with the new job (as long as the old and new jobs are of the same type).

If any of the job’s input files are in the project “DoppioUploads” directory, they will be moved into an “InputFiles” directory inside the job directory, and the relevant job options will be updated to point to the new file locations.

Parameters:
  • pipeline – A writable (i.e. non-read-only) ProjectGraph object to add the new job to.

  • job – The job to schedule and add to the pipeline.

  • alias – An optional alias for the job, which can be used to refer to the job instead of its number.

  • overwrite – An optional Process object representing an old job (of the same type) to be overwritten and replaced by the new job.

  • ignore_invalid_joboptions (bool) – Schedule the job anyway even if the job options appear to be invalid.

Returns:

The Process object representing the newly-scheduled job.

Raises:
  • ValueError – If overwrite is given, but the old job is a different type or other processes in the pipeline have used its outputs.

  • ValueError – If job.is_continue is True but the job does not already have an output directory assigned.

  • ValueError – If ignore_invalid_joboptions is False and the job options appear to be invalid.

  • RuntimeError – If pipeline.read_only is True.

pipeliner.job_manager.wait_for_job_to_finish(job: PipelinerJob, ping: float = 1.0, timeout: float = 86400.0, error_on_fail: bool = False, error_on_abort: bool = False) str

Wait for the job to finish, with any status

Parameters:
  • job (PipelinerJob) – The job to wait for

  • ping (float) – How long to wait before checking for the file again (in seconds)

  • timeout (float) – Raise an error after this much time has elapsed, even if the job hasn’t finished (in seconds). The default is 24 hours.

  • error_on_fail (bool) – Raise an error if the job fails

  • error_on_abort (bool) – Raise an error if the job is aborted

Returns:

The final status of the job: Failed, Succeeded, or Aborted

Return type:

str

Raises:
  • RuntimeError – If the job fails and error_on_fail is True, or if the job is aborted and error_on_abort is True

  • RuntimeError – If the job still hasn’t finished by the timeout time
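The polling behaviour described by ping, timeout and the two error flags can be sketched as a plain loop. This is a minimal illustration, not the pipeliner's implementation: wait_for_status is a hypothetical name, and the status is supplied by a caller-provided function rather than read from the files in the job directory.

```python
import time
from typing import Callable, Optional

FINAL_STATES = ("Succeeded", "Failed", "Aborted")


def wait_for_status(
    get_status: Callable[[], Optional[str]],
    ping: float = 1.0,
    timeout: float = 86400.0,
    error_on_fail: bool = False,
    error_on_abort: bool = False,
) -> str:
    """Poll get_status until it reports a final state or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in FINAL_STATES:
            if status == "Failed" and error_on_fail:
                raise RuntimeError("job failed")
            if status == "Aborted" and error_on_abort:
                raise RuntimeError("job was aborted")
            return status
        if time.monotonic() >= deadline:
            raise RuntimeError("job did not finish before the timeout")
        time.sleep(ping)


# Stand-in status source: unfinished twice, then Succeeded
_states = iter([None, None, "Succeeded"])
final = wait_for_status(lambda: next(_states), ping=0.0)
```

With both error flags left at False the function returns the final status for the caller to inspect. Setting a flag turns that outcome into an exception instead.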

The job runner

job_runner.py

This script is responsible for executing pipeliner jobs.

It is intended only for internal use by the pipeliner and should not normally be called by any other code.

Users should instead run jobs using the “--run_job” argument to the standard “pipeliner” command.