CCPEM-Pipeliner API
The pipeliner API provides access to all of the main functions of the pipeliner.
PipelinerProject
To interact with a pipeliner project it must be created as a PipelinerProject object.
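A minimal sketch of constructing a project object, assuming the ccpem-pipeliner package is installed and the interpreter is started inside a project directory. The live pipeliner calls are commented out so the sketch is safe to paste anywhere; the description text is illustrative.

```python
# Keyword arguments accepted by PipelinerProject, per the signature above.
project_kwargs = {
    "pipeline_name": "default",   # rarely any reason to change this
    "make_new_project": True,     # create a new project rather than open one
    "description": "Demo project",  # hypothetical description
}

# Assumes ccpem-pipeliner is installed; run inside a project directory:
# from pipeliner.api.manage_project import PipelinerProject
# project = PipelinerProject(**project_kwargs)

# To open an existing project instead, omit make_new_project:
# project = PipelinerProject()
```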
- class pipeliner.api.manage_project.PipelinerProject(pipeline_name: str = 'default', project_name: str | None = None, description: str | None = None, make_new_project: bool = False)
Bases:
object
This class forms the basis for a project.
- pipeline_name
The name of the pipeline. Defaults to 'default' if not set. There is really no good reason to give the pipeline any other name.
- Type:
str
- abort_job(job_name: str) None
Abort a running job.
This function signals to the job that it should abort, but does not wait for the job to respond.
- Parameters:
job_name – The job name. This must be exact; use parse_procname to find the job if you need to find it from a partial name, number or alias.
- Raises:
ValueError – If there is no job with the given name
RuntimeError – If the job is in any state except Running
- cleanup_all(harsh: bool = False)
Runs cleanup on all jobs in a project
- Parameters:
harsh (bool) – Should harsh cleaning be performed?
- compare_job_parameters(jobs_list: List[str]) dict
Compare the running parameters of multiple jobs
- Parameters:
jobs_list (list) – The jobs to compare
- Returns:
{parameter: [value, value, value]}
- Return type:
- Raises:
ValueError – If any of the jobs is not found
ValueError – If the jobs being compared are not of the same type
- continue_job(job_to_continue: str, comment: str | None = None, run_in_foreground=False) PipelinerJob
Continue a job that has already been run
To change the parameters in a continuation the user needs to edit the continue_job.star file in the job’s directory
- Parameters:
- Returns:
The PipelinerJob object for the created job
- Return type:
- Raises:
ValueError – If the continue_job.star file is not found and there is no job.star file in the job’s directory to use as a backup
ValueError – If the job is of a type that needs an optimizer file to continue and this file is not found
ValueError – The job has iterations but the parameters specified would result in no additional iterations being run
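A sketch of continuing a job, assuming an open PipelinerProject named `project`; the job name is hypothetical and the live call is commented out. Per the docstring above, parameters for the continuation are changed by editing continue_job.star in the job's directory before the call.

```python
# Hypothetical job to continue; the name must be exact (see parse_procname).
job_to_continue = "Class3D/job012/"

# Assumes 'project' is an open PipelinerProject:
# job = project.continue_job(job_to_continue, comment="resumed after editing continue_job.star")
```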
- create_archive(job: str, full: bool = False, tar: bool = True) str
Creates an archive
Archives can be full or simple. Simple archives contain the directory structure of the project, the parameter files for each job, and a script to rerun the project through the terminal job. The full archive contains the full job dirs for the terminal job and all of its children.
- create_reference_report(terminal_job: str) Tuple[str, int]
Create a report on all the references used in the project
- delete_job(job: str) bool
Delete a job
Removes the job from the main project and moves it and its children to the Trash
- edit_comment(job_name: str, comment: str | None = None, overwrite: bool = False, new_rank: int | None = None)
Edit the comment of a job
- Parameters:
- Raises:
ValueError – If the new rank is neither None nor an integer
- static empty_trash()
Deletes all the files and dirs in the Trash directory
- Returns:
True if any files were deleted, False if no files were deleted
- Return type:
bool
- find_job_by_comment(contains: List[str] | None = None, not_contains: List[str] | None = None, job_type: str | None = None, command: bool = False) List[str]
Find Jobs by their comments or command
- Parameters:
- Returns:
Names of all the jobs found
- Return type:
- Raises:
ValueError – If nothing is specified for contains and not_contains
- find_job_by_rank(equals: int | None = None, less_than: int | None = None, greater_than: int | None = None, job_type: str | None = None) List[str]
Find jobs by their rank
Ignores jobs that are unranked
- Parameters:
- Returns:
Names of the matching jobs
- Return type:
- Raises:
ValueError – If nothing is specified to search for
ValueError – If both equals and less_than/greater_than are specified
- get_job(job_name: str) PipelinerJob
Get an existing job from the project.
- Parameters:
job_name (str) – The name of the job to get
- Raises:
ValueError – if the named job cannot be found
- parse_proclist(list_o_procs: list, search_trash: bool = False) list
Finds full process names for multiple processes.
Returns full process names, e.g. Import/job001/ from job001 or 1.
- parse_procname(in_proc: str, search_trash: bool = False) str
Find a process name, with the ability to parse ambiguous input.
Returns the full process name, e.g. Import/job001/ from job001 or 1. Can look in both active processes and the Trash, and accepts inputs containing only a job number, or a process type and alias, e.g. Import/my_alias.
- Parameters:
- Returns:
the process name
- Return type:
- Raises:
ValueError – if the process is in the Trash but search_trash is False
ValueError – if the process name is not in the pipeliner format, jobxxx, or a number, e.g. an unrelated string
ValueError – if the process name is not found
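Per the docstring above, parse_procname() accepts several equivalent forms of the same job identifier. A sketch (the alias "my_alias" is hypothetical, and the live calls are commented out because they need an open project):

```python
# All of these would resolve to the full name "Import/job001/":
equivalent_inputs = ["Import/job001/", "job001", "1", "Import/my_alias"]

# Assumes 'project' is an open PipelinerProject:
# full_names = [project.parse_procname(p) for p in equivalent_inputs]
# parse_proclist() does the same for a whole list in one call:
# full_names = project.parse_proclist(equivalent_inputs)
```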
- static prepare_deposition(terminal_job: str, depo_type: Literal['onedep', 'empiar'], depo_id: str | None = None, jobstar_file: str | None = None, empiar_do_mov: bool = True, empiar_do_mics: bool = True, empiar_do_parts: bool = True, empiar_do_rparts: bool = True) str
Prepare a deposition for EMPIAR, EMDB, or PDB databases
- Parameters:
terminal_job (str) – This job and all its parents will be included in the deposition
depo_type (Literal["onedep", "empiar"]) – 'onedep' is used for PDB and EMDB, 'empiar' for EMPIAR
depo_id (Optional[str]) – A name for the deposition
jobstar_file (Optional[str]) – For EMPIAR; a job.star file that contains additional required information that cannot be gathered from the jobs themselves. See pipeliner.jobs.other.empiar_deposition_job.EmpiarDepositionJob for the specifics.
empiar_do_mov (bool) – For EMPIAR; should raw movies be included?
empiar_do_mics (bool) – For EMPIAR; Should corrected micrographs be included?
empiar_do_parts (bool) – For EMPIAR; Should particles be included?
empiar_do_rparts (bool) – For EMPIAR; Should polished particles be included?
- Returns:
The path of the created archive
- Return type:
- prepare_metadata_report(jobname: str) Tuple[str, int]
Returns a full metadata trace for a job and all upstream jobs
- Parameters:
jobname – The name of the job to run on
- Returns:
(str: file written, int: number of jobs in the report)
- Return type:
Tuple[str, int]
- run_cleanup(jobs: list, harsh: bool = False) bool
Run the cleanup function for multiple jobs
Each job defines its own method for cleanup and harsh cleanup
- run_job(jobinput: str | dict | PipelinerJob, overwrite: str | None = None, alias: str | None = None, ignore_invalid_joboptions=False, run_in_foreground=False) PipelinerJob
Run a new job in the project
If a file is specified the job will be created from the parameters in that file. If a dict is input the job will be created with defaults for all options except those specified in the dict.
If a dict is used for input it MUST contain at minimum {“_rlnJobTypeLabel”: <the jobtype>}
- Parameters:
jobinput (str, dict, PipelinerJob) – The path to a run.job or job.star file that defines the parameters for the job, a dict specifying job parameters, or a PipelinerJob object
overwrite (str) – The name of a job to overwrite; if None a new job will be created. A job can only be overwritten by a job of the same type
alias (str) – Alias to assign to the new job
ignore_invalid_joboptions (bool) – Run the job anyway even if the job options appear to be invalid
run_in_foreground (bool) – Run the job in the main process, blocking anything else from happening until it completes
- Returns:
The PipelinerJob object for the job that was run
- Return type:
- Raises:
ValueError – If this method is used to continue a job
ValueError – If the job options are invalid and ignore_invalid_joboptions is not set
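A sketch of running a job from a parameter dict, per the rule above that the dict must contain "_rlnJobTypeLabel". The job type label and option names here are illustrative, not guaranteed to exist; the live call is commented out.

```python
# The dict MUST contain "_rlnJobTypeLabel"; all other options fall back to
# their defaults. Job type and option names below are hypothetical examples.
job_params = {
    "_rlnJobTypeLabel": "relion.import.movies",  # required key
    "fn_in_raw": "Movies/*.tiff",                # an illustrative job option
}

# Assumes 'project' is an open PipelinerProject:
# job = project.run_job(job_params, alias="import_movies")
```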
- run_schedule(fn_sched: str, job_ids: List[str], nr_repeat: int = 1, minutes_wait: int = 0, minutes_wait_before: int = 0, seconds_wait_after: int = 5) str
Runs a list of scheduled jobs
- Parameters:
fn_sched (str) – A name to assign to the schedule
job_ids (list) – A list of string job names to run
nr_repeat (int) – Number of times to repeat the entire schedule
minutes_wait (int) – Minimum number of minutes to wait between running each subsequent job
minutes_wait_before (int) – Initial number of minutes to wait before starting to run the schedules.
seconds_wait_after (int) – Time to wait after running each job
- Returns:
The name of the schedule that is run
- Return type:
- Raises:
ValueError – If the schedule name is already in use
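A sketch of running a schedule over two previously scheduled jobs, following the signature above. The job names and schedule name are hypothetical; the live call is commented out because it needs an open project with those jobs scheduled.

```python
# Hypothetical jobs that have already been added with scheduled status:
schedule_jobs = ["Import/job001/", "MotionCorr/job002/"]

# Assumes 'project' is an open PipelinerProject:
# schedule_name = project.run_schedule(
#     fn_sched="preprocess",     # must not already be in use
#     job_ids=schedule_jobs,
#     nr_repeat=1,               # run the whole schedule once
# )
```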
- run_scheduled_job(job: PipelinerJob, run_in_foreground: bool = False)
Run a job that has been scheduled
- Parameters:
job (PipelinerJob) – The job to run
run_in_foreground (bool) – Should the job be run in the foreground instead of being spun off as a separate process, for testing
- schedule_continue_job(job_to_continue: str, params_dict: dict | None = None, comments: str | None = None) PipelinerJob
Schedule a job to run
Adds the job to the pipeline with scheduled status, does not run it
- Parameters:
- Returns:
The PipelinerJob object for the newly scheduled job
- Return type:
- schedule_job(job_input: str | Dict[str, str | float | int | bool], comment: str | None = None, alias: str | None = None) PipelinerJob
Schedule a job to run
Adds the job to the pipeline with scheduled status, does not run it
- Parameters:
- Returns:
The PipelinerJob object for the scheduled job
- set_alias(job: str, new_alias: str | None)
Set the alias for a job
- Parameters:
- Raises:
ValueError – If the alias could not be set for any reason.
- stop_schedule(schedule_name: str) bool
Stops a currently running schedule
Kills the process running the schedule and marks the currently running job as aborted. Works to stop schedules that were started using the RELION GUI or pipeliner.
- undelete_job(job: str) bool
Restores a job from the Trash back into the project
Also restores the job’s alias if one existed
- update_job_status(job: str, new_status: str)
Mark a job as finished, failed or aborted
If is_failed and is_aborted are both False, the job is marked as finished.
- Parameters:
- Raises:
ValueError – If the new status is not one of the options
- pipeliner.api.manage_project.convert_pipeline(pipeline_name: str = 'default') bool
Converts a pipeline file from the RELION 2.0-3.1 format
This format has integer node, process, and status IDs. The pipeliner format uses string IDs
- pipeliner.api.manage_project.delete_summary_data_archive(filename: str)
Remove an archive from the summary data
- Parameters:
filename (str) – The name of the archive zip file or dir
- pipeliner.api.manage_project.delete_summary_data_metadata_report(filename: str)
Remove a metadata report from the summary data
- Parameters:
filename (str) – The name of the report file
- pipeliner.api.manage_project.delete_summary_data_reference_report(filename: str)
Remove a reference report from the summary data
- Parameters:
filename (str) – The name of the report file
- pipeliner.api.manage_project.get_archives_list_from_summary_file() Tuple[List[str], List[List[str]]]
Get a list of the archives in the summary file
- Returns:
([column, headers], [[line, 1, data], [line, 2, data]])
- Return type:
- pipeliner.api.manage_project.get_commands_and_nodes(job_file: str) tuple
Tell what commands a job file would generate and what nodes would be created
- Parameters:
job_file (str) – The path to a run.job or job.star file
- Returns:
A list of commands. Each item in the commands list is a list of command arguments, e.g.:
[[com1-arg1, com1-arg2], [com2-arg1]]
A list of input nodes that would be created. Each item in the list is a tuple:
[(name, type), (name, type)]
A list of output nodes that would be created. Each item in the list is a tuple:
[(name, type), (name, type)]
A list of any PipelinerWarning raised by joboption validation
A list of the ExternalProgram objects used by the job
- Return type:
- pipeliner.api.manage_project.get_metadata_reports_from_summary_file() Tuple[List[str], List[List[str]]]
Get a list of the metadata reports in the summary file
- Returns:
([column, headers], [[line, 1, data], [line, 2, data]])
- Return type:
- pipeliner.api.manage_project.get_ref_reports_from_summary_file() Tuple[List[str], List[List[str]]]
Get a list of the reference reports in the summary file
- Returns:
([column, headers], [[line, 1, data], [line, 2, data]])
- Return type:
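The three get_*_from_summary_file() functions above all return the same shape: a tuple of column headers and data rows. A sketch of unpacking that shape, using a hypothetical return value (the column names here are invented for illustration):

```python
# Hypothetical return value with the documented (headers, rows) shape:
headers, rows = (["file", "date"], [["report1.json", "2024-01-01"]])

# Pair each row's values with the column headers:
records = [dict(zip(headers, row)) for row in rows]
# records[0] -> {"file": "report1.json", "date": "2024-01-01"}
```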
api_utilities
Utility functions that do not require an existing project.
- pipeliner.api.api_utils.edit_jobstar(fn_template: str, params_to_change: dict, out_fn: str) str
Modify one or more parameters in a job.star file
- pipeliner.api.api_utils.get_job_info(job_type: str) JobInfo | None
Get information about a job
- Parameters:
job_type (str) – The type of job to return info on
- Returns:
JobInfo object with info about the job and its references
- Return type:
- Raises:
ValueError – If the job type is not found
- pipeliner.api.api_utils.job_default_parameters_dict(jobtype: str) dict
Get dictionary of a job’s parameters
- pipeliner.api.api_utils.validate_starfile(fn_in: str)
Checks for inappropriate use of reserved words in starfiles
Writes a corrected version with proper quotation if possible. The original file is saved with a ‘.orig’ suffix added.
- Parameters:
fn_in (str) – The name of the file to check
- pipeliner.api.api_utils.write_default_jobstar(job_type: str, out_fn: str | None = None, relionstyle: bool = False)
Write a job.star file for the specified type of job
The default jobstar contains all the job options with their values set as the defaults
- Parameters:
job_type (str) – The type of job
out_fn (str) – Name of the file to write the output to. If left blank defaults to <job_type>_job.star
relionstyle (bool) – Should the job.star files be written in the relion format? Relion files are compatible with the pipeliner, but the pipeliner versions are not back compatible with Relion. If this option is selected a Relion job type should be used for job_type
- Returns:
The name of the output file written
- Return type:
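A sketch combining write_default_jobstar() and edit_jobstar(), per the signatures above: write a default job.star for a job type, then modify parameters in it. The job type label and parameter names are hypothetical, and the live calls are commented out because they require ccpem-pipeliner to be installed.

```python
# Hypothetical parameters to change in the default job.star:
params_to_change = {"fn_in_raw": "Movies/*.tiff", "angpix": "0.885"}

# Assumes ccpem-pipeliner is installed; job type label is illustrative:
# from pipeliner.api.api_utils import write_default_jobstar, edit_jobstar
# template = write_default_jobstar("relion.import.movies")
# edited = edit_jobstar(template, params_to_change, out_fn="import_job.star")
```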
user_settings
User settings which can control the pipeliner’s behaviour, environment and default job option values.
user_settings.py
Settings for the CCP-EM pipeliner.
Settings can be provided by the user in a JSON-formatted settings file or in environment
variables. When the pipeliner is first run, a new settings file is created in a
platform-specific directory (typically ~/.config/ccpem/pipeliner/
on Linux or
~/Library/Application Support/ccpem/pipeliner/
on Mac). This contains default
settings values. These settings can be edited in that file, or overridden by setting
environment variables before running the pipeliner. Both RELION- and Pipeliner-type
environment variables are supported. For example, the setting for the default qsub
script template can be set by editing the value for "qsub_template"
in the settings
file or by setting either the PIPELINER_QSUB_TEMPLATE
or RELION_QSUB_TEMPLATE
environment variable. If more than one of these is set, PIPELINER_QSUB_TEMPLATE
will
be used by preference, followed by RELION_QSUB_TEMPLATE, followed by the value from
the settings file.
For programmatic access to settings values, use the helper functions which are provided
at the module level (e.g. user_settings.get_qsub_template()). These allow easy access
to the correctly-typed value of each individual setting.
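A sketch of the precedence described above: setting a PIPELINER_* environment variable before the pipeliner starts overrides both the corresponding RELION_* variable and the settings file. The template path is hypothetical, and the helper call is commented out because it requires ccpem-pipeliner to be installed.

```python
import os

# Per the documented precedence, PIPELINER_QSUB_TEMPLATE beats
# RELION_QSUB_TEMPLATE, which beats the value in the settings file.
os.environ["PIPELINER_QSUB_TEMPLATE"] = "/path/to/my_qsub_template.sh"  # hypothetical path

# Assumes ccpem-pipeliner is installed:
# from pipeliner import user_settings
# user_settings.get_qsub_template()  # would now return the value set above
```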
This module also contains a Settings class and some internal functions, but there should
be no need to access any of these directly from outside this module. To add a new
setting, create a new get_<setting name>()
function at the top of this module and
add a new setting definition in Settings.add_all_settings()
, making sure to use the
correct types for the function calls and default value.
- class pipeliner.user_settings.BoolSettingDefinition(name: str, env_vars: List[str], default: bool)
Bases:
SettingDefinition
Class for defining a setting with a bool value.
- class pipeliner.user_settings.IntSettingDefinition(name: str, env_vars: List[str], default: int)
Bases:
SettingDefinition
Class for defining a setting with an int value.
- class pipeliner.user_settings.OptionalIntSettingDefinition(name: str, env_vars: List[str], default: int | None)
Bases:
SettingDefinition
Class for defining a setting with an optional int value (i.e. int or None).
- class pipeliner.user_settings.OptionalStringSettingDefinition(name: str, env_vars: List[str], default: str | None)
Bases:
SettingDefinition
Class for defining a setting with an optional string value (i.e. str or None).
- class pipeliner.user_settings.PathListSettingDefinition(name: str, env_vars: List[str], default: List[str])
Bases:
SettingDefinition
Class for defining a setting with a value which is a list of file paths.
In JSON format, the value should appear as a list of strings. When stored in an environment variable, the value should take the form of a single string with the individual paths separated by os.pathsep (typically ':' on Linux and Mac).
- class pipeliner.user_settings.SettingDefinition(name: str, env_vars: List[str], default: Any)
Bases:
object
Base class for setting definitions. Not intended to be used directly.
- class pipeliner.user_settings.Settings(settings_file_override=None)
Bases:
object
Container class to hold settings definitions and the get_<type>() functions that fetch their values.
- add_all_settings()
- check_for_extra_keys()
- class pipeliner.user_settings.StringSettingDefinition(name: str, env_vars: List[str], default: str)
Bases:
SettingDefinition
Class for defining a setting with a string value.
- pipeliner.user_settings.get_additional_program_paths() List[str]
Paths to prepend to the PATH environment variable before searching for program executables. Use this setting to make third-party software available to the pipeliner. Note that the paths should be to the directory containing the executable, not to the executable itself.
In JSON format, this setting is a list of strings. It can also be set using the
PIPELINER_ADDITIONAL_PROGRAM_PATHS
environment variable, which should be a PATH-style string containing individual paths separated by path separators (typically ‘:’ on Linux and Mac).
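A sketch of building the environment-variable form described above: a single PATH-style string of directories (not executables) joined with os.pathsep. The directory paths are hypothetical, and the helper call is commented out because it requires ccpem-pipeliner to be installed.

```python
import os

# Hypothetical directories containing third-party executables:
program_dirs = ["/opt/relion/bin", "/opt/ctffind/bin"]

# Join into a single PATH-style string (':' on Linux and Mac):
os.environ["PIPELINER_ADDITIONAL_PROGRAM_PATHS"] = os.pathsep.join(program_dirs)

# Assumes ccpem-pipeliner is installed:
# from pipeliner import user_settings
# user_settings.get_additional_program_paths()  # would return the list form
```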
- pipeliner.user_settings.get_minimum_dedicated() int
The default for ‘Minimum dedicated cores per node’.
- pipeliner.user_settings.get_mpi_max() int | None
The maximum number of MPI processes available from the GUI.
- pipeliner.user_settings.get_mpirun_command() str
The default command prepended to MPI jobs, including ‘XXXmpinodesXXX’ which will be substituted with the number of MPI nodes to use.
- pipeliner.user_settings.get_path_to_source_files() List[str]
Script files to set up the environment variables for the pipeliner. These files will be sourced when the pipeliner starts up and their environment read in and used to update the pipeliner’s environment variables. This is a clunky mechanism intended mainly for setting up CCP4 and the old version of CCP-EM. Avoid using it for new packages.
In JSON format, this setting is a list of strings. It can also be set using the
PIPELINER_PATH_TO_SOURCE_FILES
environment variable, which should be a PATH-style string containing individual paths separated by path separators (typically ‘:’ on Linux and Mac).
- pipeliner.user_settings.get_qsub_extra_count() int
The number of extra qsub template substitution variables to use.
- pipeliner.user_settings.get_qsub_extras(number: int) Dict[str, str]
Return a dictionary of values for a single numbered set of extra qsub settings.
For example:
get_qsub_extras(1) -> {
    "name": "Label for qsub_extra1",
    "default": "Default value for qsub_extra1",
    "help": "Help text for qsub_extra1",
}