Tools for Running Jobs

The JobRunner

class pipeliner.job_runner.JobRunner(project_name: str = 'default')

Bases: object

The JobRunner object handles running jobs (who would have thought?)

graph

The pipeline for the project

Type

ProjectGraph

add_job_to_pipeline(job: pipeliner.pipeliner_job.PipelinerJob, status: str, allow_overwrite: bool) pipeliner.data_structure.Process

Add the job to the pipeline and create its temp nodes files

Parameters
  • job (PipelinerJob) – The job that is being run

  • status (str) – The status of the new job, either Running or Scheduled

  • allow_overwrite (bool) – Is the job to be run overwriting a previous job?

Returns

The job that is being run

Return type

Process

get_commandline_job(thisjob: pipeliner.pipeliner_job.PipelinerJob, current_proc: Optional[pipeliner.data_structure.Process], is_main_continue: bool, is_scheduled: bool, do_makedir: bool, overwrite: bool = False, subsequent_scheduled: bool = False) list

Assemble all of the commands necessary to run a job

Parameters
  • thisjob (PipelinerJob) – The job that is being run

  • current_proc (Process) – The existing process object for the current job, if the job to be run is a continuation or overwrite job, otherwise None

  • is_main_continue (bool) – Is the job to be run a continuation?

  • is_scheduled (bool) – Has the job to be run already been scheduled?

  • do_makedir (bool) – Should a directory for the job be made if necessary?

  • overwrite (bool) – Is the job to be run overwriting a previous job?

  • subsequent_scheduled (bool) – Is the job to be run a subsequent iteration of a job that has already been run in a currently running schedule?

Returns

A list of lists of the form [[[Actual, command], [to, be, run]], [[the, Job, commands]]]. Each sublist holds the arguments for a single command.

If the job is being submitted to a queue, [0] will be the qsub command and [1] will be the actual job commands. For local jobs they will be identical.

Return type

list

Raises
  • ValueError – If an attempt is made to overwrite or continue a job that doesn’t exist

  • RuntimeError – If no commands are generated
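
To make the nested return structure concrete, the following minimal sketch builds a value of the documented shape by hand and renders each command as a shell-style string. The command names and the format_commands helper are hypothetical, introduced only for illustration; they are not part of the pipeliner API.

```python
import shlex

# Hand-built example of the documented shape (all command names are made up):
# commands[0] -> what is actually executed (here, a queue submission)
# commands[1] -> the job's own commands
commands = [
    [["qsub", "run_submit.script"]],
    [
        ["example_program", "--in", "input file.star"],
        ["example_cleanup", "--dir", "Job001/"],
    ],
]


def format_commands(command_lists):
    """Join each argument list into one shell-quoted string (hypothetical helper)."""
    return [shlex.join(args) for args in command_lists]


to_execute, job_commands = commands
print(format_commands(to_execute))
print(format_commands(job_commands))
```

Keeping each command as a list of arguments, rather than a single string, means arguments containing spaces stay intact until the moment they are quoted for display or passed to a process launcher.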

prepare_job_to_run(job: pipeliner.pipeliner_job.PipelinerJob, current_proc: Optional[pipeliner.data_structure.Process], is_main_continue: bool, is_scheduled: bool, overwrite_current: bool = False, subsequent_scheduled: bool = False) Tuple[List[list], bool]

Do the setup for running a job

This includes:
  • Removing the original files if overwriting

  • Removing any control files

Parameters
  • job (PipelinerJob) – The job that is being run

  • current_proc (Process) – The existing process object for the current job, if the job to be run is a continuation or overwrite job, otherwise None

  • is_main_continue (bool) – Is the job to be run a continuation?

  • is_scheduled (bool) – Has the job to be run already been scheduled?

  • do_makedir (bool) – Should a directory for the job be made if necessary?

  • overwrite_current (bool) – Is the job to be run overwriting a previous job?

  • subsequent_scheduled (bool) – Is the job to be run a subsequent iteration of a job that has already been run in a currently running schedule?

Returns

The commands (a list of lists) and whether the job is overwriting a previous job (bool)

Return type

tuple

run_job(job: pipeliner.pipeliner_job.PipelinerJob, current_proc: Optional[pipeliner.data_structure.Process], is_main_continue: bool, is_scheduled: bool, overwrite_current: bool = False, subsequent_scheduled: bool = False, wait_for_queued: bool = True, comment: Optional[str] = None) pipeliner.data_structure.Process

Run a job: add it to the pipeline with running status and execute its commands

Parameters
  • job (PipelinerJob) – The job that is being run

  • current_proc (Process) – The existing process object for the current job, if the job to be run is a continuation or overwrite job, otherwise None

  • is_main_continue (bool) – Is the job to be run a continuation?

  • is_scheduled (bool) – Has the job to be run already been scheduled?

  • overwrite_current (bool) – Is the job to be run overwriting a previous job?

  • subsequent_scheduled (bool) – Is the job to be run a subsequent iteration of a job that has already been run in a currently running schedule?

  • wait_for_queued (bool) – If this job is sent to the queue, should the pipeliner wait for it to finish before continuing?

  • comment (str) – Comments to add to the job’s jobinfo file

Returns

The job that is being run

Return type

Process

run_scheduled_jobs(fn_sched: str, job_ids: Optional[List[str]] = None, nr_repeat: int = 1, minutes_wait: int = 0, minutes_wait_before: int = 0, seconds_wait_after: int = 0)

Run the jobs in a schedule

Parameters
  • fn_sched (str) – The name to be assigned to the schedule

  • job_ids (list) – A list of str job names

  • nr_repeat (int) – Number of times to repeat the entire schedule

  • minutes_wait (int) – Minimum time to wait between jobs, in minutes. If this time has already elapsed while the job was running, the next job will start immediately

  • minutes_wait_before (int) – Wait this amount of time before initially starting to run the schedule

  • seconds_wait_after (int) – Wait this many seconds before starting each job. This wait always occurs, even if the minimum time between jobs has already been surpassed
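
The interaction between minutes_wait and seconds_wait_after can be sketched as a small calculation. This is one reading of the parameter descriptions above, not the pipeliner's own code; the remaining_wait_seconds helper and its job_runtime_seconds argument are hypothetical.

```python
def remaining_wait_seconds(job_runtime_seconds, minutes_wait, seconds_wait_after):
    """Seconds to pause before the next job starts (hypothetical helper)."""
    minimum_gap = minutes_wait * 60
    # Time the previous job spent running counts against the minimum gap,
    # so the amount still owed is clamped at zero.
    still_owed = max(0, minimum_gap - job_runtime_seconds)
    # seconds_wait_after is always added, even if the gap has already elapsed.
    return still_owed + seconds_wait_after


# A 30 s job with a 1 minute minimum gap still owes 30 s, plus the fixed 5 s.
print(remaining_wait_seconds(30, minutes_wait=1, seconds_wait_after=5))
# A 120 s job has already exceeded the gap, so only the fixed 5 s remains.
print(remaining_wait_seconds(120, minutes_wait=1, seconds_wait_after=5))
```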

Raises

schedule_fail(message: str, sl_name: str, sched_lock: str)

Run when a schedule fails

Write to the log and then delete the schedule lock file

Parameters
  • message (str) – The message to display and write to the log file

  • sl_name (str) – Name of the log file

  • sched_lock (str) – Name of the lock file

Returns

An error message determined by why the schedule failed

Return type

str
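
The log-then-unlock behaviour described for schedule_fail can be sketched as follows. This is a stand-alone illustration under stated assumptions, not the library's implementation: schedule_fail_sketch is a hypothetical function, the lock is assumed to be a regular file, and the message is returned unchanged.

```python
import os
import tempfile


def schedule_fail_sketch(message, sl_name, sched_lock):
    """Log the failure, remove the lock file, and return the message."""
    # Append so that earlier entries in the schedule log are preserved.
    with open(sl_name, "a") as log:
        log.write(message + "\n")
    # Assumption: the lock is a regular file; skip removal if it is already gone.
    if os.path.isfile(sched_lock):
        os.remove(sched_lock)
    return message


# Set up a throwaway directory with a lock file to exercise the sketch.
workdir = tempfile.mkdtemp()
log_path = os.path.join(workdir, "schedule.log")
lock_path = os.path.join(workdir, "schedule.lock")
open(lock_path, "w").close()

error = schedule_fail_sketch("Schedule stopped: job failed", log_path, lock_path)
print(error)
```

Writing the log entry before removing the lock means the reason for the failure is recorded even if removing the lock itself goes wrong.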

schedule_job(job: pipeliner.pipeliner_job.PipelinerJob, current_proc: Optional[pipeliner.data_structure.Process], is_main_continue: bool, overwrite_current: bool = False, subsequent_scheduled: bool = False, comment: Optional[str] = None) pipeliner.data_structure.Process

Schedule a job: add it to the pipeline with scheduled status

Parameters
  • job (PipelinerJob) – The job that is being run

  • current_proc (Process) – The existing process object for the current job, if the job to be run is a continuation or overwrite job, otherwise None

  • is_main_continue (bool) – Is the job to be run a continuation?

  • is_scheduled (bool) – Has the job to be run already been scheduled?

  • overwrite_current (bool) – Is the job to be run overwriting a previous job?

  • subsequent_scheduled (bool) – Is the job to be run a subsequent iteration of a job that has already been run in a currently running schedule?

  • comment (str) – Comments to add to the job’s jobinfo file

Returns

The job that is being run

Return type

Process

wait_for_queued_job_completion(outdir: str)

Wait for a job that has been sent to the queue to finish

Parameters

outdir (str) – The job’s output directory

write_to_sched_log(message: str, logfile: str)

For real time updating of the schedule log

Parameters
  • message (str) – The message to display and write to the log file

  • logfile (str) – Name of the log file

Check_completion

The job factory

The job factory functions identify the available job types and return the correct type of job from the job type specified in a parameter file

pipeliner.job_factory.active_job_from_proc(the_proc: pipeliner.data_structure.Process) pipeliner.pipeliner_job.PipelinerJob

Create an active job from an existing process

Used when the functions inside a job subclass need to be called on an existing job

Parameters

the_proc (Process) – The process to create a job from

Returns

The job subclass object for the process

Return type

PipelinerJob

pipeliner.job_factory.convert_relion4_jobtypes_to_pipeliner(type_name, joboptions, jobs_dict)

Convert an ambiguous RELION 4 style job type to the pipeliner type

Parameters
  • type_name (str) – The current RELION 4 type for the job

  • joboptions (dict) – The job’s joboptions

  • jobs_dict (dict) – The job_factory jobs and classes dict, output from gather_all_jobtypes()

Returns

The job object for the converted pipeliner version of the job

Return type

PipelinerJob

pipeliner.job_factory.gather_all_jobtypes() dict

Assemble a dict of all the available job types

Returns

The job classes dict

The dict keys are the job process names and the values are the classes for those specific job types

Return type

dict

Raises

ValueError – If a process name is being used by more than one job type
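
The shape of the returned dict, and the duplicate-name check, can be sketched with stand-in classes. Everything here is hypothetical: the example classes, the PROCESS_NAME attribute and gather_jobtypes_sketch are illustrative assumptions and do not reflect how the pipeliner discovers its job types.

```python
class ExampleJobA:
    PROCESS_NAME = "example.alpha"  # hypothetical process name


class ExampleJobB:
    PROCESS_NAME = "example.beta"  # hypothetical process name


def gather_jobtypes_sketch(job_classes):
    """Map each process name to its class, rejecting duplicate names."""
    jobs = {}
    for cls in job_classes:
        name = cls.PROCESS_NAME
        if name in jobs:
            raise ValueError(
                f"Process name '{name}' is used by both "
                f"{jobs[name].__name__} and {cls.__name__}"
            )
        jobs[name] = cls
    return jobs


jobs_dict = gather_jobtypes_sketch([ExampleJobA, ExampleJobB])
# Looking up a process name returns the class, which can then be instantiated.
job = jobs_dict["example.alpha"]()
print(type(job).__name__)
```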

pipeliner.job_factory.job_from_dict(job_input: dict) pipeliner.pipeliner_job.PipelinerJob

Create a job from a dictionary

The dict must define the job type with a ‘_rlnJobTypeLabel’ key. Any other keys will override the default options for that parameter

Parameters

job_input (dict) – The dict containing the params. At minimum it must contain {‘_rlnJobTypeLabel’: <jobtype>}

Returns

The job subclass

Return type

PipelinerJob

Raises
  • ValueError – If the ‘_rlnJobTypeLabel’ key is missing from the dict

  • ValueError – If ‘_rlnJobIsContinue’ is in the dict - this function is not for creating continuations of jobs

  • ValueError – If the specified jobtype is not found

  • ValueError – If any of the parameters in the dict are not in the jobtype returned
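
The four documented failure cases can be sketched as a stand-alone check. This is a hedged illustration rather than the job_factory code: check_job_input, the "example.jobtype" name and the option names are all hypothetical; only the '_rlnJobTypeLabel' and '_rlnJobIsContinue' keys come from the description above.

```python
def check_job_input(job_input, default_options_by_type):
    """Apply the documented checks and return the merged options (hypothetical)."""
    if "_rlnJobTypeLabel" not in job_input:
        raise ValueError("'_rlnJobTypeLabel' is required")
    if "_rlnJobIsContinue" in job_input:
        raise ValueError("This function is not for creating continuations of jobs")
    jobtype = job_input["_rlnJobTypeLabel"]
    if jobtype not in default_options_by_type:
        raise ValueError(f"Job type '{jobtype}' not found")
    # Start from the defaults, then override with the values supplied.
    options = dict(default_options_by_type[jobtype])
    for key, value in job_input.items():
        if key == "_rlnJobTypeLabel":
            continue
        if key not in options:
            raise ValueError(f"'{key}' is not a parameter of job type '{jobtype}'")
        options[key] = value
    return options


# Hypothetical job type with two made-up options.
defaults = {"example.jobtype": {"nr_threads": "1", "do_queue": "No"}}
merged = check_job_input(
    {"_rlnJobTypeLabel": "example.jobtype", "nr_threads": "4"}, defaults
)
print(merged)
```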

pipeliner.job_factory.new_job_of_type(type_name: str, joboptions: Optional[dict] = None) pipeliner.pipeliner_job.PipelinerJob

Creates a new object of the correct PipelinerJob sub-type

Parameters
  • type_name (str) – The job process name

  • joboptions (dict) – Dict of the job’s joboptions, only necessary if converting from a RELION 4.0 style job name

Returns

The job subclass

Return type

PipelinerJob

Raises

RuntimeError – If the job type is not found

pipeliner.job_factory.read_job(filename: str) pipeliner.pipeliner_job.PipelinerJob

Reads run.job and job.star files and returns the correct Pipeliner job class

Parameters

filename (str) – The run.job or job.star file

Returns

The job subclass

Return type

PipelinerJob

Raises
  • ValueError – If the file name entered does not end with run.job or job.star

  • RuntimeError – If the input file is in the RELION 3.1 format and conversion fails

  • RuntimeError – If the job type specified in the file cannot be found
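
The file name rule behind the ValueError can be sketched as a simple suffix check. The check_job_filename helper is hypothetical and only mirrors the documented condition; it does not read or parse the file.

```python
def check_job_filename(filename):
    """Return the file name if it ends with run.job or job.star, else raise."""
    # str.endswith accepts a tuple of suffixes and matches any of them.
    if not filename.endswith(("run.job", "job.star")):
        raise ValueError(f"'{filename}' does not end with run.job or job.star")
    return filename


print(check_job_filename("Job001/run.job"))
print(check_job_filename("Job001/job.star"))
```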