draft version 2021/02/14 Matt Iadanza

Writing pipeliner plugins

Getting started

Required imports

from pipeliner.pipeliner_job import PipelinerJob, Ref
from pipeliner.job_options import JobOption  # plus any JobOption subclasses the plugin uses, e.g. FilenameJobOption, BooleanJobOption
from pipeliner.data_structure import Node

First define the plugin object

class MyPlugIn(PipelinerJob):
        PROCESS_NAME = "myplugin.dosomething"
        OUT_DIR = "MyPlugIn"

PROCESS_NAME is what the pipeliner will use internally to identify the job type. The convention is:

program_name.function.additional_keywords

The above example plugin would be called from a star file with myplugin.dosomething in the jobtype field.

OUT_DIR is where the plugin will write its jobs when they are run. A directory name already in use by the pipeliner can be used or a new one can be defined.

Some examples from the pipeliner are:

class name          PROCESS_NAME          OUT_DIR
RelionAutoPickLog   relion.autopick.log   AutoPick
CrYOLOAutoPick      cryolo.autopick       AutoPick
RelionPolishTrain   relion.polish.train   Polish

Inside the MyPlugIn class there need to be some specific functions; each is explained in more detail in the sections below.


required functions:

__init__(self)

Defines basic info about the plugin and the job options

get_commands(self)

Returns the list of command-line commands the plugin will make the pipeliner run.

optional functions:

These functions may not be necessary for the plugin to run, but they will make it work more effectively with the pipeliner.

gather_metadata(self)

Returns a dictionary of metadata about a job that has been run. It only needs to return information about the results of the job; all the info about the parameters used to run the job is generated automatically.

prepare_archive_data(self)

Returns information that can be used to create an archive of the job.

prepare_cleanup_lists(self, harsh=False)

Returns lists of files and directories to move to the trash when this type of job is cleaned up.

post_run_actions(self)

Any actions that should be performed after the job has completed running, such as deleting temp files.
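
Putting this together, the overall shape of a plugin is sketched below. This is a minimal outline only; the method bodies are placeholders, and the required and optional functions are covered in detail in the numbered sections that follow.

class MyPlugIn(PipelinerJob):
        PROCESS_NAME = "myplugin.dosomething"
        OUT_DIR = "MyPlugIn"

        def __init__(self):
                # required: define self.jobinfo attributes and self.joboptions
                ...

        def get_commands(self):
                # required: define input/output nodes and return the commands list
                return []

        def gather_metadata(self):
                # optional: return a dict of metadata about the job's results
                return {}

        def prepare_archive_data(self):
                # optional: return a list of extra files needed for archiving
                return []

        def prepare_cleanup_lists(self, harsh=False):
                # optional: return a list of files and a list of directories
                # to move to the trash on cleanup
                return [], []

        def post_run_actions(self):
                # optional: actions to perform after the job completes,
                # e.g. deleting temp files
                pass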


1) The __init__ function

First define the plugin info

The first thing the __init__ function should do is define some info about the plugin. Here is how it is done for the buccaneer job type:

self.jobinfo.version = "0.1"
self.jobinfo.job_author = "Matt Iadanza"
self.jobinfo.short_desc = "Automated model building. Requires CCP4"
self.jobinfo.display_name = "Buccaneer"
self.jobinfo.long_desc = (
    "Buccaneer performs statistical chain"
    " tracing by identifying connected alpha-carbon positions using a "
    "likelihood-based density target. The target distributions are generated"
    "by a simulation calculation using a known reference structure for which"
    " calculated phases are available. The success of the method is dependent"
    " on the features of the reference structure matching those of the "
    "unsolved, work structure. For almost all cases, a single reference "
    "structure can be used, with modifications automatically applied to "
    "the reference structure to match its features to the work structure."
    " N.B. requires CCP4."
)
self.jobinfo.documentation = "https://www.ccp4.ac.uk/html/cbuccaneer.html"
self.jobinfo.programs = ["buccaneer"]
self.jobinfo.references = [
    Ref(
        authors=["Cowtan K"],
        title="The Buccaneer software for automated model building."
        "1. Tracing protein chains",
        journal="Acta Crystallogr D Biol Crystallogr.",
        year="2006",
        volume="62",
        issue="Pt 9",
        pages="1002-11",
        doi="10.1107/S0907444906022116",
    ),
    Ref(
        authors=["Hoh SW", "Burnley T", "Cowtan K"],
        title="Current approaches for automated model building"
        " into cryo-EM maps using Buccaneer with CCP-EM.",
        journal="Acta Crystallogr D Struct Biol",
        year="2020",
        volume="76",
        issue="Pt 6",
        pages="531-541",
        doi="10.1107/S2059798320005513",
    ),
]

See the documentation for the JobInfo object for more info on the parameters that are available.

References are entered as Ref objects.

Next define the JobOptions

JobOption objects define the parameters used to run a job and how they appear in the automatically generated GUI.

There are different types of JobOption which accept different types of data and appear differently in the GUI:

JobOption Class           Type of data   Appears in GUI as
StringJobOption           str            A basic text box
BooleanJobOption          bool           A switchable Yes/No toggle
MultipleChoiceJobOption   str            A pulldown selection menu
FloatJobOption            float          A text box with a spinner dial
IntJobOption              int            A text box with a spinner dial
FilenameJobOption         str            A button that opens a file selection dialog
InputNodeJobOption        str            A button that opens a file selection dialog

All JobOptions have some common attributes, and specific JobOption types have additional attributes that need to be defined. See the pipeliner documentation for full descriptions of each JobOption type.

Below are some examples of JobOptions used in the relion.refine3d job type:

FloatJobOption

self.joboptions["helical_tube_inner_diameter"] = FloatJobOption(
    label="Tube diameter - inner (A):",
    default_value=-1,
    hard_min=-1,
    required_if=[True]
    help_text=(
        "Inner and outer diameter (in Angstroms) of the reconstructed helix"
        " spanning across Z axis. Set the..."
    ),
)

IntJobOption

self.joboptions["helical_nr_asu"] = IntJobOption(
    label="Number of unique asymmetrical units:",
    default_value=1,
    suggested_min=1,
    suggested_max=100,
    step_value=1,
    hard_min=1,
    help_text=(
        "Number of unique helical asymmetrical units in each segment box. If"
        " the inter-box distance (set in segment...."
    ),
)

Note

The suggested_min and suggested_max values for FloatJobOption and IntJobOption only define what appears on the slider in the GUI; they do not set the minimum or maximum values actually allowed for the parameter. Those limits are defined by the hard_min and hard_max attributes.

BooleanJobOption

self.joboptions["ref_correct_greyscale"] = BooleanJobOption(
     label="Ref. map is on absolute greyscale?",
     default_value=False,
     help_text=(
         "Probabilities are calculated based on a Gaussian noise model, which"
         " contains a squared difference..."
     ),
 )

InputNodeJobOption

self.joboptions["fn_img"] = InputNodeJobOption(
    label="Input images STAR file:",
    node_type="ImagesData.star.relion",
    default_value="",
    directory="",
    pattern=files_exts("STAR files", [".star"]),
    help_text=(
        "A STAR file with all images (and their metadata). \n \n Alternatively,"
        " you may give a Spider/MRC stack of 2D..."
    ),
)

Note

In the InputNodeJobOption example above, the pattern argument is filled by the helper function files_exts(), which generates formatted lists of file descriptions.

FilenameJobOption
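
No FilenameJobOption example appears in relion.refine3d; the sketch below shows a plausible definition, assuming FilenameJobOption takes the same keywords as InputNodeJobOption minus the node type (the "fn_mask" option name and its values are purely illustrative):

self.joboptions["fn_mask"] = FilenameJobOption(
    label="Mask file:",
    default_value="",
    pattern=files_exts("MRC files", [".mrc"]),
    directory=".",
    help_text="An input mask in MRC format.",  # illustrative help text
)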

Preset joboptions:

There are a set of JobOptions that are used in most jobs to define the various running parameters, including queue submission, MPI, and threads. The function

self.get_runtab_options()

automatically adds these options.

If the job uses MPI and/or threads, they are enabled with arguments to get_runtab_options():

self.get_runtab_options(mpi=True, threads=True)

For a list of the joboptions that are added by this function see Appendix I.

A job’s parameters are read from a job.star or run.job file

The values for a job’s job options will be read from the run.job or job.star file that is used to create the job. run.job files use the label, while job.star files use the dictionary key.

The ‘ref_correct_greyscale’ job option above looks like this in a run.job file:

Ref. map is on absolute greyscale? == Yes

and looks like this in a job.star file:

ref_correct_greyscale        Yes

The job.star file to run the example plugin would be as follows:

data_job                                              # this block defines the job

_rlnJobTypeLabel             myplugin.dosomething     # PROCESS_NAME
_rlnJobIsContinue            0                        # is this a continuation? 0 = False 1 = True

data_joboptions_values                                # this block has the joboption values

loop_
_rlnJobOptionVariable #1
_rlnJobOptionValue #2
parameter1         No                                 # joboption
parameter2         10                                 # joboption
parameter3         1000                               # joboption
parameter4         -1                                 # joboption

2) The get_commands function

The get_commands(self) function needs to do three things:

  • Return a list of the commands that need to be run by the job, in order

  • Define the job’s input nodes

  • Define the job’s output nodes

Returning the commands list

get_commands(self) needs to return a list of commands to be run in order. Each command in the list should itself be a list, with an item for each argument, e.g.:

[["myprogram", "argument1", "argument2"], ["another_program", "argument"]]

This list can be assembled in any manner that is desired. A few variables are available that contain information about the job and can be used in this process:

self.output_name

string; This variable contains the output directory of the job, with a trailing slash, e.g. MyPlugIn/job004/

self.is_continue

bool; Whether the job is a continuation or a new run.

self.joboptions

dict; A dictionary of pipeliner.job_options.JobOption objects generated from the job.

There are some methods available for accessing the values in a JobOption:

get_string(required=False, errormsg="")

Returns the value of a pipeliner.job_options.JobOption as a string. If required is True and the joboption is blank, it raises an error with the message in errormsg.

get_number(required=False, errormsg="")

Returns the value of a pipeliner.job_options.JobOption as a float. If required is True and the joboption value is blank, it raises an error with the message in errormsg. If the joboption value cannot be converted into a float, it raises an error.

get_boolean()

Returns True if the pipeliner.job_options.JobOption value is "yes", "true", "y", or "1" (case insensitive); otherwise returns False.

See the full examples below for how the self.joboptions dict is used in practice.
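
As a quick illustration of these accessors inside get_commands (the joboption names here are hypothetical):

fn_in = self.joboptions["input_file"].get_string(
    required=True, errormsg="An input file must be provided"
)
box_size = self.joboptions["box_size"].get_number()
use_gpu = self.joboptions["use_gpu"].get_boolean()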

Defining input and output nodes

get_commands(self) also needs to define the input and output nodes for the job. These files will appear as nodes in the pipeline, and the output nodes will be used to determine if the job has been successful once it has been executed.

To define an input node add a Node object to self.input_nodes:

self.input_nodes.append(Node("File_Name", "Node_Type"))

The same is done for output nodes:

self.output_nodes.append(Node("File_Name", "Node_Type"))

The node type is used when the pipeliner presents recommended inputs for future jobs, but inputs are never restricted to specific node types. The naming of node types follows this convention:

NodeType.extension.program.additional_keywords

Only the first two parts are required, but the more descriptors added, the more useful the node will be for other jobs. For example, a RELION Refine3D job (job type relion.refine3d) generates the following output nodes:

Node type                                File
DensityMap.mrc.relion.refine3d           The output map
DensityMap.mrc.relion.refine3d.halfmap   One of the halfmaps
ParticleData.star.relion.refine3d        The star file containing all the particle data
ProcessData.star.relion.optimiser        The optimiser file which RELION uses for continuing a job

The node type is not limited to any set of specific types but using an existing type (see Appendix II) will make the outputs of the plugin more accessible to other jobs.

Example - A very simple plugin

from pipeliner.pipeliner_job import PipelinerJob, Ref
from pipeliner.job_options import JobOption, FilenameJobOption, BooleanJobOption
from pipeliner.data_structure import Node

class MyPlugIn(PipelinerJob):
        PROCESS_NAME = "myplugin.dosomething"
        OUT_DIR = "MyPlugIn"

        def __init__(self):

                self.jobinfo.version = "0.1"
                self.jobinfo.job_author = "Matt Iadanza"
                self.jobinfo.programs = ["myprogram"]
                self.jobinfo.references = [
                        Ref(
                                authors=["Iadanza MG"],
                                title="Myprogram is the best software ever!",
                                journal="NCS",
                                year="2021",
                                volume="1",
                                issue="2",
                                pages="420-421",
                                doi="10.1107/S0907444906022116"
                        )
                ]

                self.joboptions["input_file"] = FilenameJobOption(
                        "The input file",
                        "",
                        "mrc file (*.mrc)",
                        "."
                        "This file should be a .mrc file"
                )
                self.joboptions["a_choice"] = BooleanJobOption(
                        "Add a number?",
                        True,
                        "Helptext for choice",
                )
                self.joboptions["a_number"] = JobOption.as_float_slider(
                        "This is a number",
                        "0",
                        "0",
                        "10",
                        "0.5",
                        "It's a number from 1 to 10",
                )

        def get_commands(self):
                command = ["myprogram"]
                input_file = self.joboptions["input_file"].get_string(True, "Input file missing")
                command.append(input_file)

                output_file = self.output_name + "output.txt"
                command += ["--out", output_file]

                do_alternative = self.joboptions["a_choice"].get_boolean()
                if do_alternative:
                        alt_arg = self.joboptions["a_number"].get_number()
                        command += ["--number", alt_arg]

                self.input_nodes.append(Node(input_file, "DensityMap.mrc"))
                self.output_nodes.append(Node(output_file, "LogFile.txt.myprogram"))

                final_commands_list = [command]
                return final_commands_list

This plugin takes an mrc file as input and returns the following:

[['myprogram', 'input_file', '--out', 'MyPlugIn/job001/output.txt']]

If the joboption “a_choice” is True and “a_number” is 3, it returns:

[['myprogram', 'input_file', '--out', 'MyPlugIn/job001/output.txt', '--number', '3']]

Note that even though there is only a single command to be run, it must still be a list inside the main commands list.

When the job is run all of the commands will be executed in order. The STDOUT and STDERR from each command will be appended to the <output_name>/run.out and <output_name>/run.err files.

After this job is run, the pipeliner will look for all the files specified in the output nodes (“MyPlugIn/job001/output.txt” in this case). If all the files are found it will mark the job as successful; otherwise it will mark the job as failed.

Example - A more complex plugin

This example builds on the previous simple plugin but adds more features:

import os

from pipeliner.pipeliner_job import PipelinerJob, Ref
from pipeliner.job_options import JobOption, FilenameJobOption, BooleanJobOption
from pipeliner.data_structure import Node

class MyPlugIn(PipelinerJob):
        PROCESS_NAME = "myplugin.dosomething"
        OUT_DIR = "MyPlugIn"

        def __init__(self):

                self.jobinfo.version = "0.1"
                self.jobinfo.job_author = "Matt Iadanza"
                self.jobinfo.programs = ["myprogram"]
                self.jobinfo.references = [
                        Ref(
                                authors=["Iadanza MG"],
                                title="Myprogram is the best software ever!",
                                journal="NCS",
                                year="2021",
                                volume="1",
                                issue="2",
                                pages="420-421",
                                doi="10.1107/S0907444906022116"
                        )
                ]


                self.joboptions["input_file"] = FilenameJobOption(
                        "The input file",
                        "",
                        "mrc file (*.mrc)",
                        "."
                        "Helptext for input file"
                )
                self.joboptions["a_choice"] = BooleanJobOption(
                        "",
                        True,
                        "Helptext for choice",
                )
                self.joboptions["a_number"] = JobOption.as_float_slider(
                        "This is a number",
                        "0",
                        "0",
                        "10",
                        "0.5",
                        "Help text for alternative: it's a number from 1 to 10",
                )
                self.get_runtab_options(mpi=True)                   # added the runtab options with MPI enabled

        def get_commands(self):
                nr_mpi = int(self.joboptions["nr_mpi"].get_number())   # change the command
                if nr_mpi > 1:                                         # if mpi is being used
                        command = ["mpirun", "-n", str(nr_mpi), "myprogram_mpi"]
                else:
                        command = ["myprogram"]

                input_file = self.joboptions["input_file"].get_string(True, "Input file missing")
                command.append(input_file)

                output_file = self.output_name + "output.txt"
                command += ["--out", output_file]

                do_alternative = self.joboptions["a_choice"].get_boolean()
                if do_alternative:
                        alt_arg = self.joboptions["a_number"].get_number()
                        command += ["--number", alt_arg]

                if self.is_continue:                                    # changes the command if the
                        command.append("--continue")                        # job is a continuation

                additional_args = self.joboptions["other_args"].get_string()    # another field from runtab options
                if additional_args != "":                                       # not mandatory in this job
                        command += ["--extra", *additional_args.split()]        # NOTE: additional_args comes from
                                                                                # the joboption as a single string,
                                                                                # so it is split into individual
                                                                                # arguments
                self.input_nodes.append(Node(input_file, "DensityMap.mrc"))
                self.output_nodes.append(Node(output_file, "LogFile.txt.myprogram"))

                command2 = ["my_cleanupscript.py", "--do_cleanup"]                         # Run a clean up script second
                cleanup_log = os.path.join(self.output_name, "cleanup.log")                # It produces a log file
                self.output_nodes.append(Node(cleanup_log, "ProcessData.log.cleanup"))     # which is added as an output node

                final_commands_list = [command, command2]                     # the two commands are put in the final list
                return final_commands_list

This plugin might return a set of commands like:

[
    ['mpirun', '-n', '12', 'myprogram_mpi', 'input_file', '--out', 'MyPlugIn/job001/output.txt', '--number', '3', '--continue', '--extra', '100'],
    ['my_cleanupscript.py', '--do_cleanup'],
]

which would be run in order. After all commands have been executed the pipeliner will look for the files “MyPlugIn/job001/output.txt” and “MyPlugIn/job001/cleanup.log”. If both files are present the job will be marked successful.

Optional functions

These functions are not required for a job to function but help to better integrate the plugin into the pipeliner.

If any of these functions is called but does not exist, the pipeliner will print a warning but will not raise an error.

3) The gather_metadata function (optional)

gather_metadata(self) should return a dict of important metadata. Exactly what should be returned is still being decided, but it should be the type of information that would be useful for an EMDB/PDB deposition.
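
A minimal sketch, assuming the job wrote its results to output.txt and that os is imported (the key name is purely illustrative):

def gather_metadata(self):
        # run parameters are recorded automatically; only report results
        metadata = {}
        results_file = os.path.join(self.output_name, "output.txt")
        if os.path.isfile(results_file):
                with open(results_file) as f:
                        # parse whatever results the program reports
                        metadata["result_summary"] = f.readline().strip()
        return metadata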

4) The prepare_archive_data function (optional)

prepare_archive_data should return a list of files that need to be included if the job is archived.

The input and output nodes of the job will automatically be added to the list so these do not need to be included.

This should include data that are not output nodes but are essential to the job, such as the coordinates generated by picking jobs or extracted particle images. A good rule of thumb: if a file is referred to by a star file that is an input or output node, it should be archived.
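
A minimal sketch, assuming the job writes a hypothetical picked_coords.star file that one of its output star files refers to:

def prepare_archive_data(self):
        # input and output nodes are added automatically, so only list
        # the extra files that are essential to the job
        return [os.path.join(self.output_name, "picked_coords.star")]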

5) The prepare_cleanup_lists function (optional)

prepare_cleanup_lists(self, harsh=False) should return two lists of things to delete when the job is cleaned up: one of files and one of directories.

Normal cleaning should delete intermediate files that are not needed for the final result.

Harsh cleaning should delete things that can be easily reproduced by re-running the job as well as the intermediate files.

Use your discretion as to what data can be removed.

If a file slated for deletion is in the pipeline as an input or output node, it and the directory containing it will not be deleted, even if they appear in the lists of files/dirs to be removed.
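
The expected shape is sketched below (the file and directory names are hypothetical):

def prepare_cleanup_lists(self, harsh=False):
        # normal cleaning: intermediate files not needed for the final result
        del_files = [os.path.join(self.output_name, "intermediate.tmp")]
        del_dirs = []
        if harsh:
                # harsh cleaning also removes data that can be regenerated
                # by simply re-running the job
                del_dirs.append(os.path.join(self.output_name, "scratch"))
        return del_files, del_dirs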

Using plugins

Once the plugin has been written, put the file in relion-pipeline/pipeliner/jobs/other/

Testing the plugin

  1. Generate default run.job and job.star files

Run the command CL_pipeline --default_runjob <process_name>

This should generate the run.job file for the plugin with default values in all fields. Check it for correctness.

Run the command CL_pipeline --default_jobstar <process_name>. This should generate the job.star file for the plugin with default values in all fields. Again, check it for correctness.

Either a run.job or job.star file can be used to run a plugin.

  2. Test the plugin returns the expected commands and nodes

Run the command CL_pipeline --check_command <job.star or run.job file>

This will output the commands generated by the plugin and lists of the expected input and output nodes. Check them for correctness. Manually change the options in the run.job or job.star file and make sure the plugin returns the expected commands and nodes with the new parameters.

  3. Test that the info about your plugin displays correctly

Run the command CL_pipeline --job_info <job_type>

This should output info about your job; make sure it is correct.

Appendix I - get_runtab_options() joboptions

If self.get_runtab_options() is included in the job's __init__, the following joboptions will be added:

self.joboptions["other_args"] = JobOption.as_textbox(
        "Additional arguments: ",
        "",
        "",
        True,
)

self.joboptions["do_queue"] = BooleanJobOption(
        "Submit to queue?",
        do_queue_default,
        "If set to Yes, the job will be submit to a queue,"
        " otherwise the job will be executed locally. Note "
        "that only MPI jobs may be sent to a queue. The default "
        "can be set through the environment variable PIPELINER_QUEUE_USE.",
        True,
)

self.joboptions["queuename"] = JobOption.as_textbox(
        "Queue name: ",
        queue_name_default,
        "Name of the queue to which to submit the job. The"
        " default name can be set through the environment"
        " variable PIPELINER_QUEUE_NAME.",
        True,
)

self.joboptions["qsub"] = JobOption.as_textbox(
        "Queue submit command: ",
        qsub_default,
        "Name of the command used to submit scripts to the queue,"
        " e.g. qsub or bsub. Note that the person who installed the pipeliner"
        " should have made a custom script for your cluster/queue setup."
        " Check this is the case (or create your own script."
        " if you have trouble submitting jobs. The default "
        "command can be set through the environment variable PIPELINER_QSUB_COMMAND.",
        True,
)

self.joboptions["min_dedicated"] = JobOption.as_int_slider(
        "Minimum dedicated cores per node: ",
        min_cores_default,
        1,
        64,
        1,
        "Minimum number of dedicated cores that need to be requested "
        "on each node. This is useful to force the queue to fill up entire "
        "nodes of a given size. The default can be set through the environment"
        " variable PIPELINER_MINIMUM_DEDICATED.",
        True,
)

self.joboptions["qsubscript"] = FilenameJobOption(
        "Standard submission script: ",
        qsub_template_default,
        "*",
        ".",
        "The template for your standard queue job submission script."
        " Its default location may be changed by setting the environment"
        " variable PIPELINER_QSUB_TEMPLATE. In the template script a number"
        " of variables will be replaced: \nXXXcommandXXX = relion command"
        " + arguments; \nXXXqueueXXX = The queue name;"
        "\nXXXmpinodesXXX = The number of MPI nodes;"
        "\nXXXthreadsXXX = The number of threads;"
        "\nXXXcoresXXX = XXXmpinodesXXX * XXXthreadsXXX;"
        "\nXXXdedicatedXXX = The minimum number of dedicated cores on each node;"
        "\nXXXnodesXXX = The number of requested nodes"
        " = CEIL(XXXcoresXXX / XXXdedicatedXXX);"
        "\nIf these options are not enough for your standard jobs,"
        " you may define a user-specified number of extra variables"
        ": XXXextra1XXX, XXXextra2XXX, etc. The number of extra variables"
        " is controlled through the environment variable PIPELINER_QSUB_EXTRA_COUNT."
        " Their help text is set by the environment variables "
        "PIPELINER_QSUB_EXTRA1, PIPELINER_QSUB_EXTRA2, etc For example, "
        "setenv PIPELINER_QSUB_EXTRA_COUNT 1, together with "
        "setenv PIPELINER_QSUB_EXTRA1 'Max number of hours in queue' "
        "will result in an additional (text) ein the GUI Any "
        "variables XXXextra1XXX in the template script will be "
        "replaced by the corresponding value.Likewise, default values"
        " for the extra entries can be set through environment variables "
        "PIPELINER_QSUB_EXTRA1_DEFAULT, PIPELINER_QSUB_EXTRA2_DEFAULT, etc. "
        "But note that (unlike all other entries in the GUI) the extra "
        "values are not remembered from one run to the other.",
        True,
)

If additional qsub options have been added using the PIPELINER_QSUB_EXTRA1, PIPELINER_QSUB_EXTRA1_DEFAULT, and PIPELINER_QSUB_EXTRA1_HELP environment variables (see the pipeliner documentation for more info), these extra variables are added as job options as well:

self.joboptions["qsub_extra_1"].as_textbox(
        PIPELINER_QSUB_EXTRA1,             # label
        PIPELINER_QSUB_EXTRA1_DEFAULT,     # default value
        PIPELINER_QSUB_EXTRA1_HELP,        # help text
)

Up to four additional qsub variables can be added in this way.


If self.get_runtab_options(mpi=True) is used, then this job option will also be added:

self.joboptions["nr_mpi"] = JobOption.as_int_slider(
        "Number of MPI procs:",
        1,
        1,
        64,
        1,
        "Number of MPI nodes to use in parallel. When set to 1"
        ", MPI will not be used. The maximum can be set through the environment"
        " variable PIPELINER_MPI_MAX.",
        True,
)

If self.get_runtab_options(threads=True) is used, then this job option will also be added:

self.joboptions["nr_threads"] = JobOption.as_int_slider(
        "Number of threads:",
        1,
        16,
        1,
        "Number of shared-memory (POSIX) threads to use in"
        " parallel. When set to 1, no multi-threading will "
        "be used. The maximum can be set through the environment"
        " variable PIPELINER_THREAD_MAX.",
        True,
)

Appendix II - Node types already in use in the pipeliner

Type Name              Description
AtomCoords             An atomic model file
DensityMap             A 3D cryoEM density map (could be half map, full map, sharpened etc.)
Image2D                A single 2D image (not actually used yet, could be dropped?)
Image2DStack           A single file containing a stack of 2D images
Image3D                Any 3D image that is not a density map or mask, for example a local
                       resolution map, 3D FSC or cryoEF 3D transfer function
Images2DData           Information about groups of 2D images, for example 2D references or
                       reprojections (similar to ParticlesData, but ParticlesData is
                       specifically for real extracted particles)
Images3DData           Information about groups of 3D images, for example 3D references
LigandDescription      The stereo-chemical description of a ligand molecule
LogFile                A log file from a process; could be PDF, text or other formats
Mask2D                 A mask for use with 2D images
Mask3D                 A mask for use with 3D volumes
MicrographsCoords      A file containing coordinate info, e.g. a star file with a list of
                       coordinate files as created by a Relion picking job
MicrographsData        A set of micrographs and metadata about them, for example a Relion
                       corrected_micrographs.star file
MicrographMoviesData   Metadata about multiple micrograph movies, e.g. movies.star
ParticlesData          A set of particles and their metadata, e.g. particles.star from Relion
ProcessData            Other data resulting from a process that might be of use for other
                       processes, for example an optimiser star file, postprocess star file
                       or polishing params
Restraints             Distance and angle restraints for atomic model refinement
RigidBodies            A description of rigid bodies in an atomic model
Sequence               A protein or nucleic acid sequence file
StructureFactors       A set of structure factors in reciprocal space, usually corresponding
                       to a real-space density map