Writing new jobs
New software can be added to the ccpem-pipeliner by writing a PipelinerJob. A PipelinerJob is completely self-contained and has everything needed to run a new piece of software in the pipeliner framework. It must accomplish the following tasks:
- Define the job's input parameters and how they are displayed in the GUI
- Define how the parameters will be validated
- Generate the commands that need to be run
- Define the job's output nodes (i.e. the output files that are displayed in the GUI and made available as inputs to other jobs in the pipeline)
- Perform any final tasks that need to happen after the commands have been executed
- Make objects that allow the GUI to display the job's results graphically
- Gather metadata about the job
- Define how to clean up the job
- Create the objects necessary to create a PDB/EMDB/EMPIAR deposition
Note
A minimal PipelinerJob only needs to define input parameters, define output nodes, and generate commands. Adding the additional methods will add functionality to the job and integrate it more fully into Doppio, the GUI built on top of the pipeliner.
Making a new PipelinerJob
Required imports:
from pipeliner.pipeliner_job import PipelinerJob, Ref, ExternalProgram
from pipeliner.job_options import StringJobOption # and other types as necessary
from pipeliner.node_factory import create_node
from pipeliner.display_tools import create_results_display_object
Make a class for the new job:
class MyJob(PipelinerJob):
    PROCESS_NAME = "software.function.keywords"
    OUT_DIR = "MyJob"

    def __init__(self):
        super().__init__()  # don't forget to initialize the PipelinerJob superclass first
PROCESS_NAME is the name the pipeliner will use to identify the job, and should match the job type name that is used when the job is added to the setup.cfg file (see "Adding the new job to the pipeliner" below).
OUT_DIR is both the directory in which the job's output directory (jobNNN/) will be written and the name used to group jobs together in the GUI.
Some examples:
class RelionLogAutopick(PipelinerJob):
    PROCESS_NAME = "relion.autopick.log"
    OUT_DIR = "AutoPick"

class RelionTrainTopaz(PipelinerJob):
    PROCESS_NAME = "relion.autopick.topaz.train"
    OUT_DIR = "AutoPick"

class CryoloAutopick(PipelinerJob):
    PROCESS_NAME = "cryolo.autopick"
    OUT_DIR = "AutoPick"
__init__ method
The job's __init__ needs to do the following things:
- define information about the job
- define the parameters for the job
Define information about the job
Information about the job and the programs it needs to run are stored in its JobInfo object. For example:
self.jobinfo.display_name = "RELION initial model generation"
self.jobinfo.version = "0.1"
self.jobinfo.programs = [ExternalProgram(command="relion_refine")]
self.jobinfo.short_desc = "Create a de novo 3D initial model"
self.jobinfo.long_desc = "Relion 4.0 uses a gradient-driven algorithm to generate a de novo 3D initial model"
self.jobinfo.references = [
    Ref(
        authors=["Scheres SHW"],
        title="RELION: implementation of a Bayesian approach to cryo-EM structure determination",
        journal="J Struct Biol.",
        year="2012",
        volume="180",
        issue="3",
        pages="519-30",
        doi="10.1016/j.jsb.2012.09.006",
    )
]
Note
Each program in programs should be entered with an ExternalProgram object. These programs will be checked for availability; if they are not found in the system PATH the GUI will mark the job as unavailable.
Define the job’s input parameters
Input parameters are defined by adding JobOption objects to the job's joboptions attribute. There are 10 types of JobOption:

| Type | Data type | Appearance in GUI | Usage notes |
|---|---|---|---|
| StringJobOption | str | Text entry field | |
| IntJobOption | int | Number entry field | |
| FloatJobOption | float | Number entry field | |
| BooleanJobOption | bool | Checkbox | |
| MultipleChoiceJobOption | str | Dropdown list | |
| InputNodeJobOption | str | Text entry field (with options) and a file browser | Files that are part of the project |
| MultiInputNodeJobOption | str | Text entry field (with options) and a file browser for entering multiple files | Files that are part of the project |
| ExternalFileJobOption | str | Text entry field | Files that are outside the project |
| DirPathJobOption | str | Text entry field | A path to a directory |
| SearchStringJobOption | str | Text entry field | A search string for multiple files |
Note
The job option types and GUI behaviour for input nodes and files are currently under review and will probably change in the near future. See also File paths in pipeliner jobs below.
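For illustration, a job option might be defined in the job's __init__ like this (a minimal sketch: the keyword argument names shown are assumptions and should be checked against the real JobOption signatures in pipeliner.job_options):

# A sketch of defining an input parameter in __init__; the keyword
# argument names (label, default_value, help_text) are assumptions
self.joboptions["input_file"] = StringJobOption(
    label="Input file",
    default_value="",
    help_text="The file the job will operate on",
)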
Special predefined job options
There are two methods in PipelinerJob that create special predefined JobOptions.
The first adds a special StringJobOption called other_args. Its value can be retrieved by calling parse_additional_args() in the job's get_commands() method, which returns a list of the additional args with the quotation structure preserved. For example, if the value of self.joboptions["other_args"] is 'arg1 arg2 "arg3 arg4"', it would return: ['arg1', 'arg2', '"arg3 arg4"']
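In get_commands() this might be used like so (a sketch; "my_program" and the input file handling are hypothetical):

# Append any user-supplied extra arguments to a (hypothetical) command
command = ["my_program", "--input", input_file]
command += self.parse_additional_args()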
The second adds the job options found on RELION's 'Run' tab:

| Name | JobOption type | Description |
|---|---|---|
| nr_mpi | IntJobOption | Number of MPI processes to use; only added if the job supports MPI |
| nr_threads | IntJobOption | Number of threads to use; only added if the job supports threading |
| do_queue | BooleanJobOption | Should the job be submitted to the queue? |
| queuename | StringJobOption | Name of the queue |
| qsub | StringJobOption | Queue submission command to use |
| qsubscript | ExternalFileJobOption | Path to the template for the submission script |
| min_dedicated | IntJobOption | Minimum number of dedicated cores per node |
create_output_nodes method
The create_output_nodes() method is used to define the job's outputs, in the form of Node objects added to the job's output_nodes list.
The simplest way to do this is to use the add_output_node() helper method, which creates a node for a file in the job's output directory and adds it to output_nodes. For example, for a job that makes a cryo-EM density map called map.mrc as output:

from pipeliner.nodes import NODE_DENSITYMAP

self.add_output_node("map.mrc", NODE_DENSITYMAP, ["keyword1", "keyword2"])

Information about the different node types can be found below.
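For illustration, wrapping this in a minimal create_output_nodes() method (a sketch; the node keywords are arbitrary):

def create_output_nodes(self):
    # The job is expected to write map.mrc into its output directory
    self.add_output_node("map.mrc", NODE_DENSITYMAP, ["keyword1", "keyword2"])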
Note that create_output_nodes() is called before the job is run, so it defines the outputs that the job is expected to produce. It's helpful to define the outputs early because then other jobs can be scheduled to run afterwards using this job's outputs as their inputs. However, in some cases it's not possible to know all of the job's outputs until the commands have actually been run, in which case additional output nodes can be added at the end of the job using the create_post_run_output_nodes() method.
get_commands method
The get_commands() method defines what the job will actually do, by creating a list of commands that will be run. It is called by the job runner when the job is started.
The commands must be returned as a list of PipelinerCommand objects, each of which contains a list of strings (i.e. a command and arguments, suitable for passing to subprocess.run()):
[
    PipelinerCommand(["first", "command", "to", "run"]),
    PipelinerCommand(["next", "command", "to", "run"]),
    PipelinerCommand(["final", "command", "to", "run"]),
]
The commands can be assembled in any manner you see fit. A variety of attributes of the PipelinerJob are available to get pieces of data during this process:

| Attribute | Description | Data type |
|---|---|---|
| output_dir | The directory where the job should put its output files (this is also used as the job's name in the pipeline) | str |
| joboptions | A dict containing all of the JobOptions defined in the job's __init__ | dict |
| is_continue | Is the job being continued? | bool |
| input_nodes | List of the job's input nodes | list |
| output_nodes | List of the job's output nodes | list |
Note
The get_commands() method is run in a different process from the one where the job was created and submitted, and for jobs sent to a queue, it will also be run on a different computer. Any state that was saved in attributes of the PipelinerJob before it was scheduled to run will be lost. You should make sure that your job is written such that it can be reliably recreated from its job option values alone.
Also note that get_commands() should run quickly and not produce any output files itself. Any actions that write files or take some time to run should be moved into scripts or executables that can be run as one of the job's commands.
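Putting this together, a complete get_commands() might look like this (a minimal sketch: "my_program" and the "input_file" job option are hypothetical, and it assumes PipelinerCommand is imported from pipeliner.pipeliner_job):

import os

def get_commands(self):
    # Read the input path from the job options
    input_file = self.joboptions["input_file"].get_string()
    # Direct the output into the job's own directory
    output_file = os.path.join(self.output_dir, "result.mrc")
    command = ["my_program", "--input", input_file, "--output", output_file]
    return [PipelinerCommand(command)]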
File paths in pipeliner jobs
The project directory is the root for all the pipeliner’s file handling: input and output files are stored as relative paths from the project directory, and jobs are run with the project directory as their working directory.
Jobs should be self-contained, i.e. they should write files only into their own job directory, output_dir (which will have the form JobType/jobNNN/, for example Import/job001/). The job's commands should take care to direct their output files only to this directory (or subdirectories within it) and not elsewhere in the project.
For example:
commands = [
    ["touch", os.path.join(self.output_dir, "output_file_1")],  # this is fine
    ["cp", input_file_1, input_file_2, self.output_dir],  # this is also fine
    # The next command is NOT fine - it would write the file directly in the project directory
    ["touch", "output_file_2"],
]
The only exception to the rule that all paths should be relative and within the project is where a job needs to access some centrally-installed file or disk using an absolute path, for example a program executable, queue submission script template or local scratch disk. In these cases, absolute paths can be used but the files should be treated as external to the project and not added as input or output nodes.
Some uncooperative programs do not have the ability to specify where their results are written. In these cases the location from which the commands are executed can be changed by setting the working_dir attribute. The most common use case is programs that write their outputs directly into the working directory, in which case working_dir should be set to output_dir. The best place to do this is usually at the start of the get_commands() method.
If working_dir has been set then it is important that any file paths passed as arguments to the job's commands are given relative to the working directory rather than the project directory. The easiest way to accomplish this is by using os.path.relpath(). For example, in a pipeliner job the path to an input file is typically found using something like input_file = self.joboptions["input_file"].get_string(). Normally input_file can then be used directly, but in a job where working_dir is set, os.path.relpath(input_file, self.working_dir) should be used instead.
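A minimal sketch of this pattern (again assuming a hypothetical "my_program" and an "input_file" job option):

def get_commands(self):
    # The (hypothetical) program writes its results into the working
    # directory, so run the commands from the job's output directory
    self.working_dir = self.output_dir
    input_file = self.joboptions["input_file"].get_string()
    # Paths passed to the commands must now be relative to working_dir
    rel_input = os.path.relpath(input_file, self.working_dir)
    return [PipelinerCommand(["my_program", "--input", rel_input])]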
Note
Only the job's commands themselves will be run in the job's working_dir. The pipeliner code (including all PipelinerJob methods) is still run in the project directory.
additional_joboption_validation method
The additional_joboption_validation() method serves to do advanced validation of the JobOptions before a job is run.
Simple validation is done automatically:
- parameters that are required have non-empty values
- parameter values are of the right type
- parameter values are within their specified ranges
additional_joboption_validation() is used for more specific validation tasks that take into account more than one job option, such as:
- parameter A must be > parameter B
- parameters C, D, and E cannot all be equal
The method should return a list of JobOptionValidationResult objects.
Some examples:

def additional_joboption_validation(self):
    errors = []
    jobop_a = self.joboptions["param_a"]
    jobop_b = self.joboptions["param_b"]
    # raise an error unless A > B
    if jobop_a.get_number() <= jobop_b.get_number():
        errors.append(
            JobOptionValidationResult(
                type="error",
                raised_by=[jobop_a, jobop_b],
                message="A must be greater than B",
            )
        )
    jobop_c = self.joboptions["param_c"]
    jobop_d = self.joboptions["param_d"]
    jobop_e = self.joboptions["param_e"]
    if jobop_c.get_string() == jobop_d.get_string() == jobop_e.get_string():
        errors.append(
            JobOptionValidationResult(
                type="error",
                raised_by=[jobop_c, jobop_d, jobop_e],
                message="C, D, and E cannot all be the same!",
            )
        )
    return errors
create_post_run_output_nodes method
This method creates output nodes after the job has run. It is only necessary when the names of the output nodes are not known beforehand.
Example of a create_post_run_output_nodes() method: in this case the job doesn't know the number of outputs it will produce until after it has executed the commands, so the nodes must be created after the fact by create_post_run_output_nodes():

from glob import glob

def create_post_run_output_nodes(self):
    outputs = glob(self.output_name + "result_*.mrc")
    for f in outputs:
        self.output_nodes.append(create_node(f, NODE_DENSITYMAP, ["node", "keywords"]))
create_results_display method
The create_results_display() method generates result display objects that allow the Doppio GUI to display results from the pipeliner.
There are currently 16 types of ResultsDisplayObject, covering displays such as:
- A single line of text
- An array of thumbnail images
- A plot with points, lines, or both
- A 2D image
- A histogram
- A table of text-containing cells with a header row
- An integrated 3D viewer for mrc, map, cif, and pdb files
- A general object that displays a webpage
- A class that allows a Plotly object to be displayed, for creation of more complex displays
- A class that allows a Plotly object to be passed directly to the display, for creation of more complex displays
- More complex histograms made with the Plotly Express histogram function
- More complex scatter plots made with the Plotly Express scatter function
- The contents of a text file
- The contents of a pdf file
- HTML formatted results
- JSON formatted results
They should all be created with the pipeliner.display_tools.create_results_display_object() function. This function safely creates the object, returning a ResultsDisplayPending object (a special class of ResultsDisplayObject) with an explanation if any errors are encountered.
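For illustration, a create_results_display() might look like this (a sketch: the "text" display type name and its keyword arguments are assumptions to be checked against create_results_display_object()):

def create_results_display(self):
    # A single-line text display object; the keyword arguments here are
    # assumptions about the fields of the "text" display type
    return [
        create_results_display_object(
            "text",
            title="MyJob results",
            display_data="The job finished successfully",
        )
    ]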
gather_metadata method
The gather_metadata() method returns a dict of metadata about the results of the job. It doesn't need to gather any information about the parameters used; this will be done automatically.
Do this in any way you see fit, just return a dict.
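For example, a gather_metadata() might look like this (a sketch, assuming the job's commands wrote a hypothetical results.json file into the output directory):

import json
import os

def gather_metadata(self):
    # Read metadata written by the job's commands and return it as a dict
    metadata_file = os.path.join(self.output_dir, "results.json")
    with open(metadata_file) as f:
        return json.load(f)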
prepare_clean_up_lists method
The prepare_clean_up_lists() method returns a list with two items: a list of files to delete and a list of directories that should be removed when the job is cleaned up. The purpose of cleanup is to free up disk space by removing unneeded files.
Warning
Don't delete anything yet! This method should prepare lists of files for deletion, but they might not actually be deleted (e.g. if the user decides to cancel the clean up), so don't do anything irreversible here.
There are two levels of cleanup:
- Standard: delete files that are not necessary, such as intermediate iterations or tmp files
- Harsh: delete more, such as output files that can be reproduced easily
Make sure prepare_clean_up_lists() doesn't delete anything important or used by Doppio for results display.
Example prepare_clean_up_lists():
from glob import glob

def prepare_clean_up_lists(self, do_harsh):
    files_to_remove, dirs_to_remove = [], []
    tmp_files = glob(self.output_name + "*.tmp")
    tmp_dir = self.output_name + "tmpfiles"
    files_to_remove.extend(tmp_files)
    dirs_to_remove.append(tmp_dir)  # append, not extend: tmp_dir is a single path
    if do_harsh:
        extra_files = glob(self.output_name + "*.extra")
        extra_dir = self.output_name + "extrafiles"
        files_to_remove.extend(extra_files)
        dirs_to_remove.append(extra_dir)
    return [files_to_remove, dirs_to_remove]
prepare_deposition_data method
The prepare_deposition_data() method returns a list of deposition objects which are used to prepare data for deposition into the PDB, EMDB, and EMPIAR. This method must be implemented in any PipelinerJob that produces data included in a database deposition. For the PDB and EMDB these are defined by the published schema.
This feature is currently in development…
def prepare_deposition_data(self):
    # the functions getting the symmetry and resolution are placeholders
    sym = this_function_gets_symmetry()
    reso = this_function_gets_the_resolution()
    sp_filter = spatial_filtering_type_entry(
        high_frequency_cutoff=reso,
        software_list=("relion_refine",),
    )
    rec_filter = reconstruction_filtering_type_entry(spatial_filtering=sp_filter)
    recdep = final_reconstruction_type_entry(
        applied_symmetry=sym,
        algorithm="FOURIER SPACE",
        resolution=reso,
        resolution_method="FSC 0.143 CUT-OFF",
        reconstruction_filtering=rec_filter,
    )
    return [recdep]
Adding the new job to the pipeliner
Now you have a file my_new_job.py. It contains a class MyJob that describes a job of the type mysoftware.function. Well done!
1. Put your job file into ccpem-pipeliner/pipeliner/jobs/other
2. Add an entry point definition for your new job in ccpem-pipeliner/setup.cfg: go to the ccpem_pipeliner.jobs section and add your job in the format jobtype = package_name.module_name:ClassName, e.g. mysoftware.function = pipeliner.jobs.other.my_new_job:MyJob
3. Update the pipeliner installation with pip install -e .
4. Check for your job by running pipeliner --job_info mysoftware.function from the command line
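For reference, the entry point section of setup.cfg would then look something like this (a sketch using the standard setuptools entry-point syntax):

[options.entry_points]
ccpem_pipeliner.jobs =
    mysoftware.function = pipeliner.jobs.other.my_new_job:MyJob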
Node type names
These top-level node types are already in use in the pipeliner. It is good practice to use the constants found in pipeliner.nodes rather than typing node type names directly, to maintain consistency.
| Node type | Description | Constant |
|---|---|---|
| AtomCoords | An atomic model file | NODE_ATOMCOORDS |
| AtomCoordsGroupMetadata | Metadata about a set of atomic coordinates files, for example a list of atomic models | NODE_ATOMCOORDSGROUPMETADATA |
| DensityMap | A 3D cryo-EM density map (could be half map, full map, sharpened etc) | NODE_DENSITYMAP |
| DensityMapMetadata | Metadata about a single density map | NODE_DENSITYMAPMETADATA |
| DensityMapGroupMetadata | Metadata about multiple density maps | NODE_DENSITYMAPGROUPMETADATA |
| EulerAngles | Data about Euler angles | NODE_EULERANGLES |
| EvaluationMetric | A file containing evaluation metrics | NODE_EVALUATIONMETRIC |
| Image2D | A single 2D image | NODE_IMAGE2D |
| Image2DStack | A single file containing a stack of 2D images | NODE_IMAGE2DSTACK |
| Image2DMetadata | Metadata about a single 2D image or stack | NODE_IMAGE2DMETADATA |
| Image2DGroupMetadata | Metadata about a group of 2D images or stacks | NODE_IMAGE2DGROUPMETADATA |
| Image3D | Any 3D image that is not a density map or mask, for example a local resolution map, 3D FSC or cryoEF 3D transfer function | NODE_IMAGE3D |
| Image3DMetadata | Metadata about a single 3D image, except density maps, which have their own specific node type (DensityMap) | NODE_IMAGE3DMETADATA |
| Image3DGroupMetadata | Metadata about a group of 3D images (but not masks or density maps, which have their own specific types) | NODE_IMAGE3DGROUPMETADATA |
| LigandDescription | The stereochemical description of a ligand molecule | NODE_LIGANDDESCRIPTION |
| LogFile | A log file from a process; could be PDF, text or another format | NODE_LOGFILE |
| Mask2D | A mask for use with 2D images | NODE_MASK2D |
| Mask3D | A mask for use with 3D volumes | NODE_MASK3D |
| MicrographCoords | A file containing coordinate info for a single micrograph, e.g. a .star or .box file produced from picking | NODE_MICROGRAPHCOORDS |
| MicrographCoordsGroup | A file containing coordinate info for multiple micrographs, e.g. a STAR file with a list of coordinate files as created by a RELION picking job | NODE_MICROGRAPHCOORDSGROUP |
| Micrograph | A single micrograph | NODE_MICROGRAPH |
| MicrographMetadata | Metadata about a single micrograph | NODE_MICROGRAPHMETADATA |
| MicrographGroupMetadata | Metadata about a set of micrographs, for example a RELION corrected_micrographs.star file | NODE_MICROGRAPHGROUPMETADATA |
| MicrographMovie | A single multi-frame micrograph movie | NODE_MICROGRAPHMOVIE |
| MicrographMovieMetadata | Metadata about a single multi-frame micrograph movie | NODE_MICROGRAPHMOVIEMETADATA |
| MicrographMovieGroupMetadata | Metadata about multiple micrograph movies, e.g. movies.star | NODE_MICROGRAPHMOVIEGROUPMETADATA |
| MicroscopeData | Data about the microscope, such as collection parameters, defect files or MTF curves | NODE_MICROSCOPEDATA |
| MlModel | A machine learning model | NODE_MLMODEL |
| OptimiserData | Specific type for RELION optimiser data from a refinement or classification job | NODE_OPTIMISERDATA |
| ParamsData | Contains parameters for running an external program | NODE_PARAMSDATA |
| ParticleGroupMetadata | Metadata for a set of particles, e.g. particles.star from RELION | NODE_PARTICLEGROUPMETADATA |
| ProcessData | Other data resulting from a process that might be of use for other processes, for example an optimiser STAR file, postprocess STAR file or particle polishing parameters | NODE_PROCESSDATA |
| Restraints | Distance and angle restraints for atomic model refinement | NODE_RESTRAINTS |
| RigidBodies | A description of rigid bodies in an atomic model | NODE_RIGIDBODIES |
| Sequence | A protein or nucleic acid sequence file | NODE_SEQUENCE |
| SequenceGroup | A group of protein or nucleic acid sequences | NODE_SEQUENCEGROUP |
| SequenceAlignment | Files that contain or are used to generate multiple sequence alignments | NODE_SEQUENCEALIGNMENT |
| StructureFactors | A set of structure factors in reciprocal space, usually corresponding to a real space density map | NODE_STRUCTUREFACTORS |
| TiltSeriesMetadata | Metadata about a single tomographic tilt series | NODE_TILTSERIESMETADATA |
| TiltSeriesGroupMetadata | Metadata about a group of multiple tomographic tilt series | NODE_TILTSERIESGROUPMETADATA |
| TomogramMetadata | Metadata about a single tomogram | NODE_TOMOGRAMMETADATA |
| TomogramGroupMetadata | Metadata about multiple tomograms | NODE_TOMOGRAMGROUPMETADATA |
| TomoOptimisationSet | Data about a RELION tomography optimisation set | NODE_TOMOOPTIMISATIONSET |
| TomoTrajectoryData | Data about a RELION tomography trajectory | NODE_TOMOTRAJECTORYDATA |
| TomoManifoldData | Data about a RELION tomography manifold | NODE_TOMOMANIFOLDDATA |