CCPEM-Pipeliner API

The pipeliner API provides access to all of the main functions of the pipeliner.

PipelinerProject

To interact with a pipeliner project, it must first be represented as a PipelinerProject object.

class pipeliner.api.manage_project.PipelinerProject(pipeline_name: str = 'default', project_name: str | None = None, description: str | None = None, make_new_project: bool = False)

Bases: object

This class forms the basis for a project.

pipeline_name

The name of the pipeline. Defaults to 'default' if not set. There is really no good reason to give the pipeline any other name.

Type:

str
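
For example, a project might be created or opened like this (a minimal sketch; it assumes the interpreter is running in the project directory):

from pipeliner.api.manage_project import PipelinerProject

# Start a brand-new project in the current working directory
project = PipelinerProject(make_new_project=True)

# Attach to an existing project (the default behaviour)
project = PipelinerProject()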

abort_job(job_name: str) None

Abort a running job.

This function signals to the job that it should abort, but does not wait for the job to respond.

Parameters:

job_name – The job name. This must be exact; use parse_procname to resolve the job from a partial name, number or alias.

Raises:
  • ValueError – If there is no job with the given name

  • RuntimeError – If the job is in any state except Running

cleanup_all(harsh: bool = False)

Runs cleanup on all jobs in a project

Parameters:

harsh (bool) – Should harsh cleaning be performed?

compare_job_parameters(jobs_list: List[str]) dict

Compare the running parameters of multiple jobs

Parameters:

jobs_list (list) – The jobs to compare

Returns:

{parameter: [value, value, value]}

Return type:

dict

Raises:
  • ValueError – If any of the jobs is not found

  • ValueError – If the jobs being compared are not of the same type
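
For example (a minimal sketch; the job names are hypothetical, and both jobs must be of the same type):

diffs = project.compare_job_parameters(["Import/job001/", "Import/job004/"])
for param, values in diffs.items():
    print(param, values)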

continue_job(job_to_continue: str, comment: str | None = None, run_in_foreground=False) PipelinerJob

Continue a job that has already been run

To change the parameters in a continuation, the user needs to edit the continue_job.star file in the job’s directory

Parameters:
  • job_to_continue (str) – The name of the job to continue

  • comment (str) – Comments for the job’s jobinfo file

  • run_in_foreground (bool) – Run the job in the main process, blocking anything else from happening until it completes

Returns:

The PipelinerJob object for the created job

Return type:

PipelinerJob

Raises:
  • ValueError – If the continue_job.star file is not found and there is no job.star file in the job’s directory to use as a backup

  • ValueError – If the job is of a type that needs an optimizer file to continue and this file is not found

  • ValueError – If the job has iterations but the specified parameters would result in no additional iterations being run
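
A minimal sketch of a continuation (the job name and comment are hypothetical):

# Edit continue_job.star in the job's directory first to change parameters
job = project.continue_job("Class3D/job010/", comment="extra iterations")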

create_archive(job: str, full: bool = False, tar: bool = True) str

Creates an archive

Archives can be full or simple. Simple archives contain the directory structure of the project, the parameter files for each job, and a script to rerun the project through the terminal job. The full archive contains the complete job directories for the terminal job and all of its parents.

Parameters:
  • job (str) – The name of the terminal job in the workflow

  • full (bool) – If True, a full archive is written; otherwise a simple archive is written

  • tar (bool) – Should the newly written archive be compressed?

Returns:

A message giving the type of archive created and its name

Return type:

str
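
A sketch of archiving the workflow that ends at a given job (the job name is hypothetical):

# Write a simple archive and compress it
message = project.create_archive("Refine3D/job020/", full=False, tar=True)
print(message)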

create_reference_report(terminal_job: str) Tuple[str, int]

Create a report on all the references used in the project

Parameters:

terminal_job (str) – use this job and all its parents

Returns:

(The name of the report created, number of jobs in report)

Return type:

tuple

delete_job(job: str) bool

Delete a job

Removes the job from the main project and moves it and its children to the Trash

Parameters:

job (str) – The name of the job to be deleted

Returns:

True if a job was deleted, False if no jobs were deleted

Return type:

bool

edit_comment(job_name: str, comment: str | None = None, overwrite: bool = False, new_rank: int | None = None)

Edit the comment of a job

Parameters:
  • job_name (str) – The name of the job to edit the comment for

  • comment (str) – The comment to add/append

  • overwrite (bool) – If True, overwrites the original comment; otherwise the new comment is appended to the current one

  • new_rank (int) – New rank to assign to job, use -1 to revert the rank to None

Raises:

ValueError – If the new rank is neither None nor an integer

static empty_trash()

Deletes all the files and dirs in the Trash directory

Returns:

True if any files were deleted, False if no files were deleted

Return type:

bool

find_job_by_comment(contains: List[str] | None = None, not_contains: List[str] | None = None, job_type: str | None = None, command: bool = False) List[str]

Find jobs by their comments or command history

Parameters:
  • contains (list) – Find jobs that contain all of the strings in this list

  • not_contains (list) – Find jobs that do not contain any of these strings

  • job_type (str) – Only consider jobs whose type contains this string

  • command (bool) – If True, searches the job’s command history rather than its comments

Returns:

Names of all the jobs found

Return type:

list

Raises:

ValueError – If neither contains nor not_contains is specified
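
For example (a sketch; the search strings and job type substring are hypothetical):

# Jobs whose comments contain "good" but not "redo", restricted by job type
matches = project.find_job_by_comment(
    contains=["good"],
    not_contains=["redo"],
    job_type="motioncorr",
)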

find_job_by_rank(equals: int | None = None, less_than: int | None = None, greater_than: int | None = None, job_type: str | None = None) List[str]

Find jobs by their rank

Ignores jobs that are unranked

Parameters:
  • equals (int) – Find jobs with this exact rank

  • less_than (int) – Find jobs with ranks less than this number

  • greater_than (int) – Find jobs with ranks higher than this number

  • job_type (str) – Only consider jobs that contain this string in their job type

Returns:

Names of the matching jobs

Return type:

list

Raises:
  • ValueError – If nothing is specified to search for

  • ValueError – If both equals and less_than/greater_than are specified

get_job(job_name: str) PipelinerJob

Get an existing job from the project.

Parameters:

job_name (str) – The name of the job to get

Returns:

The PipelinerJob object for the named job

Return type:

PipelinerJob

Raises:

ValueError – if the named job cannot be found

parse_proclist(list_o_procs: list, search_trash: bool = False) list

Finds full process names for multiple processes

Returns full process names, e.g. Import/job001/ from job001 or 1

Parameters:
  • list_o_procs (list) – A list of string process names

  • search_trash (bool) – Should the trash also be searched?

Returns:

All of the full process names

Return type:

list

parse_procname(in_proc: str, search_trash: bool = False) str

Find a process name, with the ability to parse ambiguous input.

Returns the full process name, e.g. Import/job001/ from job001 or 1. Can look in both active processes and the Trash, and accepts inputs containing only a job number, or a process type and alias, e.g. Import/my_alias

Parameters:
  • in_proc (str) – The text that is being checked against the list of processes

  • search_trash (bool) – Should it return the process name if the process is in the trash?

Returns:

the process name

Return type:

str

Raises:
  • ValueError – If the process is in the trash but search_trash is False

  • ValueError – If the process name is not in the pipeliner format, jobxxx, or a number (i.e. an unrelated string)

  • ValueError – If the process name is not found
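
For example (a sketch; the job names and alias are hypothetical):

# Each of these could resolve to the same full name, e.g. "Import/job001/"
for partial in ("1", "job001", "Import/my_alias"):
    print(project.parse_procname(partial))

# Jobs in the Trash are only matched when search_trash=True
trashed = project.parse_procname("job003", search_trash=True)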

static prepare_deposition(terminal_job: str, depo_type: Literal['onedep', 'empiar'], depo_id: str | None = None, jobstar_file: str | None = None, empiar_do_mov: bool = True, empiar_do_mics: bool = True, empiar_do_parts: bool = True, empiar_do_rparts: bool = True) str

Prepare a deposition for EMPIAR, EMDB, or PDB databases

Parameters:
  • terminal_job (str) – This job and all its parents will be included in the deposition

  • depo_type (Literal["onedep", "empiar"]) – ‘onedep’ is used for PDB and EMDB, ‘empiar’ for EMPIAR

  • depo_id (Optional[str]) – A name for the deposition

  • jobstar_file (Optional[str]) – For EMPIAR; a job.star file that contains additional required information that cannot be gathered from the jobs themselves. See pipeliner.jobs.other.empiar_deposition_job.EmpiarDepositionJob for the specifics

  • empiar_do_mov (bool) – For EMPIAR; should raw movies be included?

  • empiar_do_mics (bool) – For EMPIAR; Should corrected micrographs be included?

  • empiar_do_parts (bool) – For EMPIAR; Should particles be included?

  • empiar_do_rparts (bool) – For EMPIAR; should polished particles be included?

Returns:

The path of the created archive

Return type:

str

prepare_metadata_report(jobname: str) Tuple[str, int]

Returns a full metadata trace for a job and all upstream jobs

Parameters:

jobname (str) – The name of the job to report on

Returns:

(The name of the report file written, the number of jobs in the report)

Return type:

tuple

run_cleanup(jobs: list, harsh: bool = False) bool

Run the cleanup function for multiple jobs

Each job defines its own method for cleanup and harsh cleanup

Parameters:
  • jobs (list) – List of string job names to operate on

  • harsh (bool) – Should harsh cleaning be performed

Returns:

True if cleanup is successful, otherwise False

Return type:

bool

run_job(jobinput: str | dict | PipelinerJob, overwrite: str | None = None, alias: str | None = None, ignore_invalid_joboptions=False, run_in_foreground=False) PipelinerJob

Run a new job in the project

If a file is specified, the job will be created from the parameters in that file. If a dict is input, the job will be created with defaults for all options except those specified in the dict.

If a dict is used for input it MUST contain at minimum {“_rlnJobTypeLabel”: <the jobtype>}

Parameters:
  • jobinput (str, dict, PipelinerJob) – The path to a run.job or job.star file that defines the parameters for the job or a dict specifying job parameters or a PipelinerJob object

  • overwrite (str) – The name of a job to overwrite, if None a new job will be created. A job can only be overwritten by a job of the same type

  • alias (str) – Alias to assign to the new job

  • ignore_invalid_joboptions (bool) – Run the job anyway even if the job options appear to be invalid

  • run_in_foreground (bool) – Run the job in the main process, blocking anything else from happening until it completes

Returns:

The PipelinerJob object for the job that was run

Return type:

PipelinerJob

Raises:
  • ValueError – If this method is used to continue a job

  • ValueError – If the job options are invalid and ignore_invalid_joboptions is not set
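
For example, a minimal sketch of running a job from a dict (the job type label and job option name below are hypothetical; only "_rlnJobTypeLabel" is required):

params = {
    "_rlnJobTypeLabel": "relion.import.movies",  # hypothetical job type
    "fn_in_raw": "movies/*.tiff",                # hypothetical job option
}
job = project.run_job(params, run_in_foreground=True)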

run_schedule(fn_sched: str, job_ids: List[str], nr_repeat: int = 1, minutes_wait: int = 0, minutes_wait_before: int = 0, seconds_wait_after: int = 5) str

Runs a list of scheduled jobs

Parameters:
  • fn_sched (str) – A name to assign to the schedule

  • job_ids (list) – A list of string job names to run

  • nr_repeat (int) – Number of times to repeat the entire schedule

  • minutes_wait (int) – Minimum number of minutes to wait between running each subsequent job

  • minutes_wait_before (int) – Initial number of minutes to wait before starting to run the schedule

  • seconds_wait_after (int) – Time to wait after running each job

Returns:

The name of the schedule that is run

Return type:

str

Raises:

ValueError – If the schedule name is already in use

run_scheduled_job(job: PipelinerJob, run_in_foreground: bool = False)

Run a job that has been scheduled

Parameters:
  • job (PipelinerJob) – The job to run

  • run_in_foreground (bool) – Should the job be run in the foreground instead of being spun off as a separate process? Mainly for testing

schedule_continue_job(job_to_continue: str, params_dict: dict | None = None, comments: str | None = None) PipelinerJob

Schedule the continuation of a job that has already been run

Adds the continuation to the pipeline with scheduled status; does not run it

Parameters:
  • job_to_continue (str) – the name of the job to continue

  • params_dict (dict) – Parameters to change in the continuation job.star file. {param name: value}

  • comments (str) – comments to add to the job’s jobinfo file

Returns:

The PipelinerJob object for the newly scheduled job

Return type:

PipelinerJob

schedule_job(job_input: str | Dict[str, str | float | int | bool], comment: str | None = None, alias: str | None = None) PipelinerJob

Schedule a job to run

Adds the job to the pipeline with scheduled status; does not run it

Parameters:
  • job_input (str, dict) – The path to a run.job or job.star file that defines the parameters for the job, or a dictionary containing job parameters

  • comment (str) – Comments to put in the job’s jobinfo file

  • alias (str) – Alias to give to the job

Returns:

The PipelinerJob object for the scheduled job
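
A sketch of scheduling a job and then running it as a schedule (the file, alias, schedule and job names are all hypothetical):

scheduled = project.schedule_job("import_job.star", alias="first_import")
project.run_schedule(fn_sched="tonight", job_ids=["Import/job001/"])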

set_alias(job: str, new_alias: str | None)

Set the alias for a job

Parameters:
  • job (str) – The name of the job to set the alias for

  • new_alias (str) – The new alias

Raises:

ValueError – If the alias could not be set for any reason.

stop_schedule(schedule_name: str) bool

Stops a currently running schedule

Kills the process running the schedule and marks the currently running job as aborted. Works to stop schedules that were started using the RELION GUI or pipeliner.

Parameters:

schedule_name (str) – The name of the schedule to stop

Returns:

True if the schedule was stopped, False if the schedule could not be found to stop

Return type:

bool

undelete_job(job: str) bool

Restores a job from the Trash back into the project

Also restores the job’s alias if one existed

Parameters:

job (str) – The job to undelete

Returns:

True if a job was restored, otherwise False

Return type:

bool

update_job_status(job: str, new_status: str)

Mark a job as running, scheduled, succeeded, failed or aborted

Parameters:
  • job (str) – The name of the job to update

  • new_status (str) – The new status for the job; choose from “Running”, “Scheduled”, “Succeeded”, “Failed” or “Aborted”. Status names are not case sensitive

Raises:

ValueError – If the new status is not one of the options

pipeliner.api.manage_project.convert_pipeline(pipeline_name: str = 'default') bool

Converts a pipeline file from the RELION 2.0-3.1 format

This format has integer node, process, and status IDs. The pipeliner format uses string IDs

Parameters:

pipeline_name (str) – The name of the pipeline to be converted

Returns:

True if the pipeline was converted, False if the pipeline was already in pipeliner format

Return type:

bool
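
For example (a minimal sketch, run from inside the project directory):

from pipeliner.api.manage_project import convert_pipeline

# True if a conversion was performed; False if already in pipeliner format
converted = convert_pipeline("default")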

pipeliner.api.manage_project.delete_summary_data_archive(filename: str)

Remove an archive from the summary data

Parameters:

filename (str) – The name of the archive zip file or dir

pipeliner.api.manage_project.delete_summary_data_metadata_report(filename: str)

Remove a metadata report from the summary data

Parameters:

filename (str) – The name of the report file

pipeliner.api.manage_project.delete_summary_data_reference_report(filename: str)

Remove a reference report from the summary data

Parameters:

filename (str) – The name of the report file

pipeliner.api.manage_project.get_archives_list_from_summary_file() Tuple[List[str], List[List[str]]]

Get a list of the archives in the summary file

Returns:

([column headers], [[line 1 data], [line 2 data]])

Return type:

tuple

pipeliner.api.manage_project.get_commands_and_nodes(job_file: str) tuple

Report the commands a job file would run and the nodes that would be created

Parameters:

job_file (str) – The path to a run.job or job.star file

Returns:

  • A list of commands. Each item in the commands list is a list of command arguments, e.g. [[com1-arg1, com1-arg2], [com2-arg1]]

  • A list of input nodes that would be created. Each item in the list is a tuple: [(name, type), (name, type)]

  • A list of output nodes that would be created. Each item in the list is a tuple: [(name, type), (name, type)]

  • A list of any PipelinerWarnings raised by job option validation

  • A list of the ExternalProgram objects used by the job

Return type:

tuple
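
For example (a sketch; the job file name is hypothetical, and the file is inspected without being run):

from pipeliner.api.manage_project import get_commands_and_nodes

commands, in_nodes, out_nodes, warnings, programs = get_commands_and_nodes(
    "import_job.star"
)
for command in commands:
    print(command)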

pipeliner.api.manage_project.get_metadata_reports_from_summary_file() Tuple[List[str], List[List[str]]]

Get a list of the metadata reports in the summary file

Returns:

([column headers], [[line 1 data], [line 2 data]])

Return type:

tuple

pipeliner.api.manage_project.get_ref_reports_from_summary_file() Tuple[List[str], List[List[str]]]

Get a list of the reference reports in the summary file

Returns:

([column headers], [[line 1 data], [line 2 data]])

Return type:

tuple

pipeliner.api.manage_project.look_for_project(pipeline_name: str = 'default') dict | None

See if a pipeliner project exists in the current directory

Parameters:

pipeline_name (str) – The name of the pipeline to look for. This is the same as the pipeline file name with “_pipeline.star” removed.

Returns:

Info about the project, as a dict, or None if there is no existing project.

api_utilities

Utility functions that do not require an existing project

pipeliner.api.api_utils.edit_jobstar(fn_template: str, params_to_change: dict, out_fn: str) str

Modify one or more parameters in a job.star file

Parameters:
  • fn_template (str) – The name of the job.star file to use as a template

  • params_to_change (dict) – The parameters to change in the format {param_name: new_value}

  • out_fn (str) – Name for the new file to be written

Returns:

The name of the output file written

Return type:

str
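
For example (a sketch; the file names and parameter name are hypothetical):

from pipeliner.api.api_utils import edit_jobstar

new_file = edit_jobstar(
    fn_template="import_job.star",
    params_to_change={"fn_in_raw": "movies/*.tiff"},
    out_fn="import_edited_job.star",
)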

pipeliner.api.api_utils.get_job_info(job_type: str) JobInfo | None

Get information about a job

Parameters:

job_type (str) – The type of job to return info on

Returns:

JobInfo object with info about the job and its references

Return type:

JobInfo

Raises:

ValueError – If the job type is not found

pipeliner.api.api_utils.job_default_parameters_dict(jobtype: str) dict

Get dictionary of a job’s parameters

Parameters:

jobtype (str) – The type of job to get the dict for

Returns:

The parameters dict, suitable for running a job via run_job()

Return type:

dict
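
For example, a sketch of the defaults-then-run pattern (the job type and job option name are hypothetical; project is an existing PipelinerProject):

from pipeliner.api.api_utils import job_default_parameters_dict

params = job_default_parameters_dict("relion.import.movies")
params["fn_in_raw"] = "movies/*.tiff"  # hypothetical job option
job = project.run_job(params)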

pipeliner.api.api_utils.validate_starfile(fn_in: str)

Checks for inappropriate use of reserved words in starfiles

Writes a corrected version with proper quotation if possible. The original file is saved with a ‘.orig’ suffix added.

Parameters:

fn_in (str) – The name of the file to check

pipeliner.api.api_utils.write_default_jobstar(job_type: str, out_fn: str | None = None, relionstyle: bool = False)

Write a job.star file for the specified type of job

The default jobstar contains all the job options with their values set as the defaults

Parameters:
  • job_type (str) – The type of job

  • out_fn (str) – Name of the file to write the output to. If left blank defaults to <job_type>_job.star

  • relionstyle (bool) – Should the job.star file be written in the RELION format? RELION files are compatible with the pipeliner, but the pipeliner versions are not backwards compatible with RELION. If this option is selected, a RELION job type should be used for job_type

Returns:

The name of the output file written

Return type:

str
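
For example (a sketch; the job type is hypothetical):

from pipeliner.api.api_utils import write_default_jobstar

# With out_fn omitted, writes <job_type>_job.star in the current directory
fn = write_default_jobstar("relion.import.movies")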

pipeliner.api.api_utils.write_default_runjob(job_type: str, out_fn: str | None = None) str

Write a run.job file for the specified type of job

The default runjob contains all the job option labels with their values set as the defaults

Parameters:
  • job_type (str) – The type of job

  • out_fn (str) – Name of the file to write the output to. If left blank defaults to <job_type>_run.job

Returns:

The name of the output file written

Return type:

str

user_settings

User settings which can control the pipeliner’s behaviour, environment and default job option values.

user_settings.py

Settings for the CCP-EM pipeliner.

Settings can be provided by the user in a JSON-formatted settings file or in environment variables. When the pipeliner is first run, a new settings file is created in a platform-specific directory (typically ~/.config/ccpem/pipeliner/ on Linux or ~/Library/Application Support/ccpem/pipeliner/ on Mac). This contains default settings values. These settings can be edited in that file, or overridden by setting environment variables before running the pipeliner. Both RELION- and Pipeliner-type environment variables are supported. For example, the setting for the default qsub script template can be set by editing the value for "qsub_template" in the settings file or by setting either the PIPELINER_QSUB_TEMPLATE or RELION_QSUB_TEMPLATE environment variable. If more than one of these is set, PIPELINER_QSUB_TEMPLATE will be used by preference, followed by RELION_QSUB_TEMPLATE, followed by the value from the settings file.

For programmatic access to settings values, use the helper functions which are provided at the module level (e.g. user_settings.get_qsub_template()). These allow easy access to the correctly-typed value of each individual setting.
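
For example (a minimal sketch):

from pipeliner import user_settings

# Each helper returns the correctly-typed value for one setting
template = user_settings.get_qsub_template()
extra_paths = user_settings.get_additional_program_paths()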

This module also contains a Settings class and some internal functions, but there should be no need to access any of these directly from outside this module. To add a new setting, create a new get_<setting name>() function at the top of this module and add a new setting definition in Settings.add_all_settings(), making sure to use the correct types for the function calls and default value.

class pipeliner.user_settings.BoolSettingDefinition(name: str, env_vars: List[str], default: bool)

Bases: SettingDefinition

Class for defining a setting with a bool value.

default: bool
static get_value_from_string(value_str: str) bool
class pipeliner.user_settings.IntSettingDefinition(name: str, env_vars: List[str], default: int)

Bases: SettingDefinition

Class for defining a setting with an int value.

default: int
static get_value_from_string(value_str: str) int
class pipeliner.user_settings.OptionalIntSettingDefinition(name: str, env_vars: List[str], default: int | None)

Bases: SettingDefinition

Class for defining a setting with an optional int value (i.e. int or None).

default: int | None
static get_value_from_string(value_str: str) int | None
class pipeliner.user_settings.OptionalStringSettingDefinition(name: str, env_vars: List[str], default: str | None)

Bases: SettingDefinition

Class for defining a setting with an optional string value (i.e. str or None).

default: str | None
static get_value_from_string(value_str: str | None) str
class pipeliner.user_settings.PathListSettingDefinition(name: str, env_vars: List[str], default: List[str])

Bases: SettingDefinition

Class for defining a setting with a value which is a list of file paths.

In JSON format, the value should appear as a list of strings. When stored in an environment variable, the value should take the form of a single string with the individual paths separated by os.pathsep (typically :).

default: List[str]
static get_value_from_string(value_str: str) List[str]
class pipeliner.user_settings.SettingDefinition(name: str, env_vars: List[str], default: Any)

Bases: object

Base class for setting definitions. Not intended to be used directly.

default: Any
env_vars: List[str]
static get_value_from_string(value_str: str) Any
name: str
class pipeliner.user_settings.Settings(settings_file_override=None)

Bases: object

Container class to hold settings definitions and the get_<type>() functions that fetch their values.

add_all_settings()
add_bool_setting(*, name: str, env_vars: List[str], default: bool)
add_int_setting(*, name: str, env_vars: List[str], default: int)
add_optional_int_setting(*, name: str, env_vars: List[str], default: int | None)
add_optional_string_setting(*, name: str, env_vars: List[str], default: str | None)
add_path_list_setting(*, name: str, env_vars: List[str], default: List[str])
add_string_setting(*, name: str, env_vars: List[str], default: str)
check_for_extra_keys()
get_bool(name: str) bool
get_int(name: str) int
get_list(name: str) List[str]
get_optional_int(name: str) int | None
get_optional_string(name: str) str
get_qsub_extras(number: int) Dict[str, str]

Add definitions for the requested qsub extra settings and get their values

get_setting_value(name: str)
get_string(name: str) str
class pipeliner.user_settings.StringSettingDefinition(name: str, env_vars: List[str], default: str)

Bases: SettingDefinition

Class for defining a setting with a string value.

default: str
static get_value_from_string(value_str: str) str
pipeliner.user_settings.get_additional_program_paths() List[str]

Paths to prepend to the PATH environment variable before searching for program executables. Use this setting to make third-party software available to the pipeliner. Note that the paths should be to the directory containing the executable, not to the executable itself.

In JSON format, this setting is a list of strings. It can also be set using the PIPELINER_ADDITIONAL_PROGRAM_PATHS environment variable, which should be a PATH-style string containing individual paths separated by path separators (typically ‘:’ on Linux and Mac).

pipeliner.user_settings.get_ccpem_share_dir() str

Path to the share dir in an installation of ccpem for rvapi to use

pipeliner.user_settings.get_ctffind_executable() str

The default CTFFIND-4.1+ executable.

pipeliner.user_settings.get_default_nrmpi() int

The default for ‘Number of MPI procs’.

pipeliner.user_settings.get_default_nrthreads() int

The default for ‘Number of threads’.

pipeliner.user_settings.get_gctf_executable() str

The default Gctf executable.

pipeliner.user_settings.get_minimum_dedicated() int

The default for ‘Minimum dedicated cores per node’.

pipeliner.user_settings.get_modelcraft_executable() str

The default Modelcraft executable.

pipeliner.user_settings.get_motioncor2_executable() str

The default MotionCor2 executable.

pipeliner.user_settings.get_mpi_max() int | None

The maximum number of MPI processes available from the GUI.

pipeliner.user_settings.get_mpirun_command() str

The default command prepended to MPI jobs, including ‘XXXmpinodesXXX’ which will be substituted with the number of MPI nodes to use.

pipeliner.user_settings.get_path_to_source_files() List[str]

Script files to set up the environment variables for the pipeliner. These files will be sourced when the pipeliner starts up and their environment read in and used to update the pipeliner’s environment variables. This is a clunky mechanism intended mainly for setting up CCP4 and the old version of CCP-EM. Avoid using it for new packages.

In JSON format, this setting is a list of strings. It can also be set using the PIPELINER_PATH_TO_SOURCE_FILES environment variable, which should be a PATH-style string containing individual paths separated by path separators (typically ‘:’ on Linux and Mac).

pipeliner.user_settings.get_qsub_command() str

The default for ‘Queue submit command’.

pipeliner.user_settings.get_qsub_extra_count() int

The number of extra qsub template substitution variables to use.

pipeliner.user_settings.get_qsub_extras(number: int) Dict[str, str]

Return a dictionary of values for a single numbered set of extra qsub settings.

For example:

get_qsub_extras(1) -> {
    "name": "Label for qsub_extra1",
    "default": "Default value for qsub_extra1",
    "help": "Help text for qsub_extra1",
}
pipeliner.user_settings.get_qsub_template() str

The default queue submission script template.

pipeliner.user_settings.get_queue_name() str

The default for ‘Queue Name’.

pipeliner.user_settings.get_queue_use() bool

The default for ‘Submit to queue?’.

pipeliner.user_settings.get_resmap_executable() str

The default ResMap executable.

pipeliner.user_settings.get_scratch_dir() str

The default scratch directory.

pipeliner.user_settings.get_thread_max() int | None

The maximum number of threads per MPI process available from the GUI.

pipeliner.user_settings.get_topaz_executable() str

The default Topaz executable.

pipeliner.user_settings.get_warning_local_mpi() int | None

Warn if users try to submit local jobs with more than this many MPI nodes.