Writing new jobs

New software can be added to the ccpem-pipeliner by writing a PipelinerJob.

A PipelinerJob is completely self-contained and has everything needed to run a new piece of software in the pipeliner framework. It must accomplish the following tasks:

  1. Define the job’s input parameters and how they are displayed in the GUI

  2. Define how the parameters will be validated

  3. Generate the commands that need to be run

  4. Define the job’s output nodes (i.e. the output files that are displayed in the GUI and made available as inputs to other jobs in the pipeline)

  5. Perform any final tasks that need to happen after the commands have been executed

  6. Make objects that allow the GUI to display the job’s results graphically

  7. Gather metadata about the job

  8. Define how to clean up the job

  9. Create the objects necessary to create a PDB/EMDB/EMPIAR deposition

Note

A minimal PipelinerJob only needs to define input parameters, define output nodes, and generate commands. Adding the additional methods will add functionality to the job and integrate it more fully into Doppio, the GUI built on top of the Pipeliner.

Making a new PipelinerJob

Required imports:

from pipeliner.pipeliner_job import PipelinerJob, Ref, ExternalProgram
from pipeliner.job_options import StringJobOption # and other types as necessary
from pipeliner.node_factory import create_node
from pipeliner.display_tools import create_results_display_object

Make a class for the new job:

class MyJob(PipelinerJob):
    PROCESS_NAME = "software.function.keywords"
    OUT_DIR = "MyJob"

    def __init__(self):
        super().__init__()  # don't forget to initialize the PipelinerJob superclass first

PROCESS_NAME is the name the pipeliner will use to identify the job, and should match the job type name that is used when the job is added to the setup.cfg file (see “Adding the new job to the pipeliner” below).

OUT_DIR is the directory where the job's output directory (jobNNN/) will be written; it is also used to group jobs together in the GUI.

Some examples:

class RelionLogAutopick(PipelinerJob):
    PROCESS_NAME = "relion.autopick.log"
    OUT_DIR = "AutoPick"

class RelionTrainTopaz(PipelinerJob):
    PROCESS_NAME = "relion.autopick.topaz.train"
    OUT_DIR = "AutoPick"

class CryoloAutopick(PipelinerJob):
    PROCESS_NAME = "cryolo.autopick"
    OUT_DIR = "AutoPick"

__init__ method

The job’s __init__ needs to do the following things:
  • define information about the job

  • define the parameters for the job

Define information about the job

Information about the job and the programs it needs to run are stored in its JobInfo object. For example:

self.jobinfo.display_name = "RELION initial model generation"
self.jobinfo.version = "0.1"
self.jobinfo.programs = [ExternalProgram(command="relion_refine")]
self.jobinfo.short_desc = "Create a de novo 3D initial model"
self.jobinfo.long_desc = "Relion 4.0 uses a gradient-driven algorithm to generate a de novo 3D initial model"
self.jobinfo.references = [
    Ref(
        authors=["Scheres SHW"],
        title="RELION: implementation of a Bayesian approach to cryo-EM structure determination",
        journal="J Struct Biol.",
        year="2012",
        volume="180",
        issue="3",
        pages="519-30",
        doi="10.1016/j.jsb.2012.09.006",
    )
]

Note

Each program in programs should be entered with an ExternalProgram object. These programs will be checked for availability; if they are not found in the system PATH the GUI will mark the job as unavailable.

Define the job’s input parameters

Input parameters are defined by adding JobOption objects to the job's joboptions attribute. There are 10 types of JobOption:

JobOption Types

Type                     Data type  Appearance in GUI                                    Usage notes
-----------------------  ---------  ---------------------------------------------------  ----------------------------------
StringJobOption          str        Text entry field
FloatJobOption           float      Number entry field
IntJobOption             int        Number entry field
BooleanJobOption         bool       Checkbox
MultipleChoiceJobOption  str        Dropdown list
InputNodeJobOption       str        Text entry field (with options) and a file browser   Files that are part of the project
MultiInputNodeJobOption  str        Text entry field (with options) and a file browser   Files that are part of the project
                                    for entering multiple files
ExternalFileJobOption    str        Text entry field                                     Files that are outside the project
DirPathJobOption         str        Text entry field                                     A path to a directory
SearchStringJobOption    str        Text entry field                                     A search string for multiple files

Note

The job option types and GUI behaviour for input nodes and files are currently under review and will probably change in the near future. See also File paths in pipeliner jobs below.
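
For example, a job's parameters might be defined in __init__ like this. This is a sketch only: the option names are made up, and the keyword arguments shown are assumptions, so check pipeliner.job_options for the exact signature of each JobOption class.

from pipeliner.job_options import FloatJobOption, InputNodeJobOption
from pipeliner.nodes import NODE_DENSITYMAP

# in MyJob.__init__, after super().__init__():
self.joboptions["input_map"] = InputNodeJobOption(
    label="Input density map",  # keyword arguments here are illustrative
    node_type=NODE_DENSITYMAP,
    default_value="",
    help_text="The density map to operate on",
)
self.joboptions["threshold"] = FloatJobOption(
    label="Map threshold",
    default_value=0.01,
    help_text="Contour level used to threshold the map",
)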

Special predefined job options

There are two methods in PipelinerJob that create special predefined JobOptions.

make_additional_args():

This adds a special StringJobOption called other_args.

Its value can be parsed by calling parse_additional_args() in the job's get_commands() method, which returns a list of the additional args with the quotation structure preserved.

For example, if the value of self.joboptions["other_args"] is 'arg1 arg2 "arg3 arg4"', parse_additional_args() would return ['arg1', 'arg2', '"arg3 arg4"'].
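
In get_commands() the extra arguments can then be appended to the end of a command, for example (a sketch; my_program and the input_map option are illustrative):

# inside get_commands(); assumes make_additional_args() was called in __init__
command = ["my_program", "--input", self.joboptions["input_map"].get_string()]
command += self.parse_additional_args()  # e.g. ['arg1', 'arg2', '"arg3 arg4"']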

get_runtab_options():

This adds the job options in RELION’s ‘Run’ tab:

Run Tab JobOptions

Name           JobOption type         Description
-------------  ---------------------  -------------------------------------------------------
nr_mpi         IntJobOption           Number of MPI processes to use; only added if mpi=True
nr_threads     IntJobOption           Number of threads to use; only added if threads=True
do_queue       BooleanJobOption       Should the job be submitted to the queue?
queuename      StringJobOption        Name of the queue
qsub           StringJobOption        Queue submission command to use
qsubscript     ExternalFileJobOption  Path to a template for the submission script
min_dedicated  IntJobOption           Minimum number of dedicated cores per node
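
Both helpers are typically called at the end of __init__, for example (a sketch):

def __init__(self):
    super().__init__()
    # ... jobinfo and joboption definitions ...
    self.make_additional_args()
    self.get_runtab_options(mpi=True, threads=True)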

create_output_nodes method

The create_output_nodes() method is used to define the job’s outputs, in the form of Node objects added to the job’s output_nodes list.

The simplest way to do this is to use the add_output_node() helper method, which creates a node for a file in the job's output directory and adds it to output_nodes. For example, for a job that produces a cryo-EM density map called map.mrc:

from pipeliner.nodes import NODE_DENSITYMAP
self.add_output_node("map.mrc", NODE_DENSITYMAP, ["keyword1", "keyword2"])

Information about the different node types can be found below.
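
A complete create_output_nodes() implementation is often only a few lines, for example (a sketch with illustrative file names and keywords):

from pipeliner.nodes import NODE_DENSITYMAP, NODE_LOGFILE

def create_output_nodes(self):
    # one node per expected output file
    self.add_output_node("map.mrc", NODE_DENSITYMAP, ["mykeyword"])
    self.add_output_node("run.log", NODE_LOGFILE, ["log"])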

Note that create_output_nodes() is called before the job is run, so it defines the outputs that the job is expected to produce. It’s helpful to define the outputs early because then other jobs can be scheduled to run afterwards using this job’s outputs as their inputs. However, in some cases it’s not possible to know all of the job’s outputs until the commands have actually been run, in which case additional output nodes can be added at the end of the job using the create_post_run_output_nodes method.

get_commands method

The get_commands() method defines what the job will actually do, by creating a list of commands that will be run. It is called by the job runner when the job is started.

The commands must be returned as a list of PipelinerCommand objects, each of which contains a list of strings (i.e. a command and arguments, suitable for passing to subprocess.run()).

[
    PipelinerCommand(["first", "command", "to", "run"]),
    PipelinerCommand(["next", "command", "to", "run"]),
    PipelinerCommand(["final", "command", "to", "run"]),
]

The commands can be assembled in any manner you see fit. A variety of attributes of the PipelinerJob are available to get pieces of data during this process:

PipelinerJob Attributes

Attribute     Data type             Description
------------  --------------------  ---------------------------------------------------------------
output_dir    str                   The directory where the job should put its output files (this
                                    is also used as the job's name in the pipeline)
joboptions    dict{str: JobOption}  A dict containing all of the JobOptions defined in __init__
is_continue   bool                  Is the job being continued?
input_nodes   list[Node]            List of the job's input nodes
output_nodes  list[Node]            List of the job's output nodes

Note

The get_commands() method is run in a different process from the one where the job was created and submitted, and for jobs sent to a queue, it will also be on a different computer. Any state that was saved in attributes of the PipelinerJob before it was scheduled to run will be lost. You should make sure that your job is written such that it can be reliably recreated from its job option values alone.

Also note that get_commands() should run quickly and not produce any output files itself. Any actions that write files or take some time to run should be moved into scripts or executables that can be run as one of the job’s commands.
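
Putting this together, a simple get_commands() might look like the following sketch (my_program, the option names, and the output file name are illustrative, and PipelinerCommand is assumed to be importable from pipeliner.pipeliner_job):

import os
from pipeliner.pipeliner_job import PipelinerCommand

def get_commands(self):
    # read the parameter values out of the job options
    input_map = self.joboptions["input_map"].get_string()
    threshold = self.joboptions["threshold"].get_string()
    # direct the output into the job's own directory
    output_map = os.path.join(self.output_dir, "map.mrc")
    command = ["my_program", "--input", input_map,
               "--threshold", threshold, "--output", output_map]
    return [PipelinerCommand(command)]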

File paths in pipeliner jobs

The project directory is the root for all the pipeliner’s file handling: input and output files are stored as relative paths from the project directory, and jobs are run with the project directory as their working directory.

Jobs should be self-contained, i.e. they should write files only into their own job directory, output_dir (which will have the form JobType/jobNNN/, for example Import/job001/). The job’s commands should take care to direct their output files only to this directory (or subdirectories within it) and not elsewhere in the project.

For example:

commands = [
    # this is fine - the file is written into the job's own directory
    PipelinerCommand(["touch", os.path.join(self.output_dir, "output_file_1")]),
    # this is also fine
    PipelinerCommand(["cp", input_file_1, input_file_2, self.output_dir]),
    # the next command is NOT fine - it would write the file directly
    # into the project directory
    PipelinerCommand(["touch", "output_file_2"]),
]

The only exception to the rule that all paths should be relative and within the project is where a job needs to access some centrally-installed file or disk using an absolute path, for example a program executable, queue submission script template or local scratch disk. In these cases, absolute paths can be used but the files should be treated as external to the project and not added as input or output nodes.

Some uncooperative programs do not have the ability to specify where their results are written. In these cases the location from which the commands are executed can be changed by setting the working_dir attribute. The most common use case is programs that write their outputs directly into the working directory, in which case working_dir should be set to output_dir. The best place to do this is usually at the start of the get_commands() method.

If working_dir has been set then it is important that any file paths passed as arguments to the job’s commands are given relative to the working directory rather than the project directory. The easiest way to accomplish this is by using os.path.relpath(). For example, in a pipeliner job the path to an input file is typically found using something like this: input_file = self.joboptions["input_file"].get_string(). Normally input_file can then be used directly, but in a job where working_dir is set os.path.relpath(input_file, self.working_dir) should be used instead.

Note

Only the job’s commands themselves will be run in the job’s working_dir. The pipeliner code (including all PipelinerJob methods) is still run in the project directory.
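
For example, a sketch for a hypothetical program that always writes into its working directory:

def get_commands(self):
    # run the command from the job's output directory so the program's
    # files land in the right place
    self.working_dir = self.output_dir
    input_file = self.joboptions["input_file"].get_string()
    # paths passed to the command must now be relative to working_dir
    rel_input = os.path.relpath(input_file, self.working_dir)
    return [PipelinerCommand(["uncooperative_program", rel_input])]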

additional_joboption_validation method

The additional_joboption_validation() method performs advanced validation of the JobOptions before a job is run. Simple validation is done automatically:

  • parameters that are required have non-empty values

  • parameter values are of the right type

  • parameter values are within their specified ranges

additional_joboption_validation() is used for more specific validation tasks that take more than one job option into account, such as:

  • parameter A must be > parameter B

  • parameters C, D, and E cannot all be equal

The method should return a list of JobOptionValidationResult objects.

Some examples:

def additional_joboption_validation(self):
    errors = []

    jobop_a = self.joboptions["param_a"]
    jobop_b = self.joboptions["param_b"]
    if jobop_a.get_number() <= jobop_b.get_number():
        errors.append(
            JobOptionValidationResult(
                type="error",
                raised_by=[jobop_a, jobop_b],
                message="A must be greater than B",
            )
        )

    jobop_c = self.joboptions["param_c"]
    jobop_d = self.joboptions["param_d"]
    jobop_e = self.joboptions["param_e"]

    if jobop_c.get_string() == jobop_d.get_string() == jobop_e.get_string():
        errors.append(
            JobOptionValidationResult(
                type="error",
                raised_by=[jobop_c, jobop_d, jobop_e],
                message="C, D, and E cannot all be the same!",
            )
        )

    return errors

create_post_run_output_nodes method

This function creates output nodes after the job has run. This is only necessary when the names of the output nodes are not known beforehand.

Example of a create_post_run_output_nodes() method: in this case the job doesn't know how many outputs it will produce until after the commands have been executed, so the nodes must be created after the fact:

from glob import glob
from pipeliner.nodes import NODE_DENSITYMAP

def create_post_run_output_nodes(self):
    outputs = glob(os.path.join(self.output_dir, "result_*.mrc"))
    for f in outputs:
        self.output_nodes.append(create_node(f, NODE_DENSITYMAP, ["node", "keywords"]))

create_results_display method

The create_results_display() method generates result display objects that allow the Doppio GUI to display results from the pipeliner.

There are currently 16 types of ResultsDisplayObject:

ResultsDisplayObject Types

Type                           Description
-----------------------------  ------------------------------------------------------------------
ResultsDisplayText             A single line of text
ResultsDisplayMontage          An array of thumbnail images
ResultsDisplayGraph            A plot with points, lines, or both
ResultsDisplayImage            A 2D image
ResultsDisplayHistogram        A histogram
ResultsDisplayTable            Multiple text-containing cells with a header row
ResultsDisplayMapModel         Integrated 3D viewer for mrc, map, cif, and pdb files
ResultsDisplayRvapi            A general object that displays a webpage
ResultsDisplayPending          A special class of ResultsDisplayObject generated when there has
                               been an error; Doppio attempts to update any ResultsDisplayPending
                               objects when a job is viewed
ResultsDisplayPlotlyObj        Allows a Plotly object to be passed directly to the display, for
                               creation of more complex displays
ResultsDisplayPlotlyHistogram  Allows creation of more complex histograms with the Plotly Express
                               histogram
ResultsDisplayPlotlyScatter    Allows creation of more complex scatter plots with the Plotly
                               Express scatter
ResultsDisplayTextFile         Displays the contents of a text file
ResultsDisplayPdfFile          Displays the contents of a PDF file
ResultsDisplayHtml             Displays HTML-formatted results
ResultsDisplayJson             Displays JSON-formatted results

They should all be created with the pipeliner.display_tools.create_results_display_object() function. This function safely creates the object, returning a ResultsDisplayPending object with an explanation if any errors are encountered.
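
For example, a job that produces a density map might display it in the 3D viewer. This is only a sketch: the display object type string and the keyword arguments shown here are assumptions, so check pipeliner.display_tools for the options each display type actually accepts.

def create_results_display(self):
    # "mapmodel" and the keyword arguments below are illustrative assumptions
    return [
        create_results_display_object(
            "mapmodel",
            maps=[os.path.join(self.output_dir, "map.mrc")],
            title="Output density map",
        )
    ]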

gather_metadata method

The gather_metadata() method returns a dict of metadata about the results of the job. It doesn't need to gather any information about the parameters used; that is done automatically.

Do this in any way you see fit; just return a dict.
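
For example, a minimal sketch that pulls one value out of a hypothetical log file:

import os

def gather_metadata(self):
    metadata = {}
    logfile = os.path.join(self.output_dir, "run.log")  # hypothetical log file
    if os.path.isfile(logfile):
        with open(logfile) as f:
            for line in f:
                # "Final resolution:" is an illustrative log line
                if line.startswith("Final resolution:"):
                    metadata["final_resolution"] = float(line.split()[-1])
    return metadata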

prepare_clean_up_lists method

The prepare_clean_up_lists() method returns a list with two items: a list of files to delete and a list of directories to remove when the job is cleaned up. The purpose of cleanup is to free up disk space by removing unneeded files.

Warning

Don’t delete anything yet! This method should prepare lists of files for deletion, but they might not actually be deleted (e.g. if the user decides to cancel the clean up) so don’t do anything irreversible here.

There are two levels of cleanup:

  • Standard: delete files that are not necessary, such as intermediate iterations or tmp files

  • Harsh: delete more, such as output files that can be reproduced easily

Make sure the lists returned by prepare_clean_up_lists() don't include anything important, or anything used by Doppio for results display.

Example prepare_clean_up_lists():

def prepare_clean_up_lists(self, do_harsh):
    files_to_remove, dirs_to_remove = [], []

    tmp_files = glob(os.path.join(self.output_dir, "*.tmp"))
    tmp_dir = os.path.join(self.output_dir, "tmpfiles")

    files_to_remove.extend(tmp_files)
    dirs_to_remove.append(tmp_dir)

    if do_harsh:
        extra_files = glob(os.path.join(self.output_dir, "*.extra"))
        extra_dir = os.path.join(self.output_dir, "extrafiles")
        files_to_remove.extend(extra_files)
        dirs_to_remove.append(extra_dir)

    return [files_to_remove, dirs_to_remove]

prepare_deposition_data method

The prepare_deposition_data() method returns a list of deposition objects, which are used to prepare data for deposition into the PDB, EMDB, and EMPIAR.

This method must be implemented in any PipelinerJob that produces data included in a database deposition. For the PDB and EMDB these are defined by the published schema.

This feature is currently in development…

def prepare_deposition_data(self):
  sym = this_function_gets_symmetry()
  reso = this_function_gets_the_resolution()

  sp_filter = spatial_filtering_type_entry(
      high_frequency_cutoff=reso,
      software_list=("relion_refine",),
  )

  rec_filter = reconstruction_filtering_type_entry(spatial_filtering=sp_filter)

  recdep = final_reconstruction_type_entry(
      applied_symmetry=sym,
      algorithm="FOURIER SPACE",
      resolution=reso,
      resolution_method="FSC 0.143 CUT-OFF",
      reconstruction_filtering=rec_filter,
  )

  return [recdep]

Adding the new job to the pipeliner

Now you have a file my_new_job.py. It contains a class MyJob that describes a job of the type mysoftware.function.

Well done!

  1. Put your job file into ccpem-pipeliner/pipeliner/jobs/other

  2. Add an entry point definition for your new job in ccpem-pipeliner/setup.cfg: go to the ccpem_pipeliner.jobs section and add your job in the format below (a sketch of the resulting setup.cfg section is shown after this list):

    jobtype = package_name.module_name:ClassName

    e.g. mysoftware.function = pipeliner.jobs.other.my_new_job:MyJob

  3. Update the pipeliner installation with: pip install -e .

  4. Check for your job by running pipeliner --job_info mysoftware.function from the command line
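
After step 2, the relevant part of setup.cfg would look something like this sketch (the other entries in the real file are omitted):

[options.entry_points]
ccpem_pipeliner.jobs =
    mysoftware.function = pipeliner.jobs.other.my_new_job:MyJob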

Node type names

These top-level node types are already in use in the pipeliner. It is good practice to use the constants found in pipeliner.nodes rather than typing node type names directly, to maintain consistency.

Top-Level Node Types

Node type                     Constant                           Description
----------------------------  ---------------------------------  -----------------------------------------------------------
AtomCoords                    NODE_ATOMCOORDS                    An atomic model file
AtomCoordsGroupMetadata       NODE_ATOMCOORDSGROUPMETADATA       Metadata about a set of atomic coordinates files, a list of atomic models for example
DensityMap                    NODE_DENSITYMAP                    A 3D cryoEM density map (could be half map, full map, sharpened etc.)
DensityMapMetadata            NODE_DENSITYMAPMETADATA            Metadata about a single density map
DensityMapGroupMetadata       NODE_DENSITYMAPGROUPMETADATA       Metadata about multiple density maps
EulerAngles                   NODE_EULERANGLES                   Data about Euler angles
EvaluationMetric              NODE_EVALUATIONMETRIC              A file containing evaluation metrics
Image2D                       NODE_IMAGE2D                       A single 2D image
Image2DStack                  NODE_IMAGE2DSTACK                  A single file containing a stack of 2D images
Image2DMetadata               NODE_IMAGE2DMETADATA               Metadata about a single 2D image or stack
Image2DGroupMetadata          NODE_IMAGE2DGROUPMETADATA          Metadata about a group of 2D images or stacks
Image3D                       NODE_IMAGE3D                       Any 3D image that is not a density map or mask, for example a local resolution map, 3D FSC or cryoEF 3D transfer function
Image3DMetadata               NODE_IMAGE3DMETADATA               Metadata about a single 3D image, except density maps, which have their own specific node type (DensityMap)
Image3DGroupMetadata          NODE_IMAGE3DGROUPMETADATA          Metadata about a group of 3D images (but not masks or density maps, which have their own specific types)
LigandDescription             NODE_LIGANDDESCRIPTION             The stereochemical description of a ligand molecule
LogFile                       NODE_LOGFILE                       A log file from a process; could be PDF, text or another format
Mask2D                        NODE_MASK2D                        A mask for use with 2D images
Mask3D                        NODE_MASK3D                        A mask for use with 3D volumes
MicrographCoords              NODE_MICROGRAPHCOORDS              A file containing coordinate info for a single micrograph, e.g. a .star or .box file produced from picking
MicrographCoordsGroup         NODE_MICROGRAPHCOORDSGROUP         A file containing coordinate info for multiple micrographs, e.g. a STAR file with a list of coordinate files as created by a RELION picking job
Micrograph                    NODE_MICROGRAPH                    A single micrograph
MicrographMetadata            NODE_MICROGRAPHMETADATA            Metadata about a single micrograph
MicrographGroupMetadata       NODE_MICROGRAPHGROUPMETADATA       Metadata about a set of micrographs, for example a RELION corrected_micrographs.star file
MicrographMovie               NODE_MICROGRAPHMOVIE               A single multi-frame micrograph movie
MicrographMovieMetadata       NODE_MICROGRAPHMOVIEMETADATA       Metadata about a single multi-frame micrograph movie
MicrographMovieGroupMetadata  NODE_MICROGRAPHMOVIEGROUPMETADATA  Metadata about multiple micrograph movies, e.g. movies.star
MicroscopeData                NODE_MICROSCOPEDATA                Data about the microscope, such as collection parameters, defect files or MTF curves
MlModel                       NODE_MLMODEL                       A machine learning model
OptimiserData                 NODE_OPTIMISERDATA                 Specific type for RELION optimiser data from a refinement or classification job
ParamsData                    NODE_PARAMSDATA                    Contains parameters for running an external program
ParticleGroupMetadata         NODE_PARTICLEGROUPMETADATA         Metadata for a set of particles, e.g. particles.star from RELION
ProcessData                   NODE_PROCESSDATA                   Other data resulting from a process that might be of use for other processes, for example an optimiser STAR file, postprocess STAR file or particle polishing parameters
Restraints                    NODE_RESTRAINTS                    Distance and angle restraints for atomic model refinement
RigidBodies                   NODE_RIGIDBODIES                   A description of rigid bodies in an atomic model
Sequence                      NODE_SEQUENCE                      A protein or nucleic acid sequence file
SequenceGroup                 NODE_SEQUENCEGROUP                 A group of protein or nucleic acid sequences
SequenceAlignment             NODE_SEQUENCEALIGNMENT             Files that contain or are used to generate multiple sequence alignments
StructureFactors              NODE_STRUCTUREFACTORS              A set of structure factors in reciprocal space, usually corresponding to a real space density map
TiltSeriesMetadata            NODE_TILTSERIESMETADATA            Metadata about a single tomographic tilt series
TiltSeriesGroupMetadata       NODE_TILTSERIESGROUPMETADATA       Metadata about a group of multiple tomographic tilt series
TomogramMetadata              NODE_TOMOGRAMMETADATA              Metadata about a single tomogram
TomogramGroupMetadata         NODE_TOMOGRAMGROUPMETADATA         Metadata about multiple tomograms
TomoOptimisationSet           NODE_TOMOOPTIMISATIONSET           Data about a RELION tomography optimisation set
TomoTrajectoryData            NODE_TOMOTRAJECTORYDATA            Data about a RELION tomography trajectory
TomoManifoldData              NODE_TOMOMANIFOLDDATA              Data about a RELION tomography manifold