CCPEM-Pipeliner API
The pipeliner API provides access to all of the main functions of the pipeliner.
PipelinerProject
To interact with a pipeliner project, it must be created as a PipelinerProject object.
- class pipeliner.api.manage_project.PipelinerProject(pipeline_name: str = 'default', project_name: str | None = None, description: str | None = None, make_new_project: bool = False)
Bases: object
This class forms the basis for a project.
- pipeline_name
The name of the pipeline. Defaults to 'default' if not set. There is really no good reason to give the pipeline any other name.
- Type:
str
- abort_job(job_name: str) None
Abort a running job.
This function signals to the job that it should abort, but does not wait for the job to respond.
- Parameters:
job_name – The job name. This must be exact; use parse_procname to find the job if you need to find it from a partial name, number or alias.
- Raises:
ValueError – If there is no job with the given name
RuntimeError – If the job is in any state except Running
- add_missing_nodes_from_all_jobs() List[str]
Add any missing nodes from all jobs in the pipeline.
One reason why this might be necessary is that the Pipeliner makes more nodes than Relion, so if a pipeline is converted the missing nodes must be added.
- cleanup_all(harsh: bool = False)
Runs cleanup on all jobs in a project
- Parameters:
harsh (bool) – Should harsh cleaning be performed?
- compare_job_parameters(jobs_list: List[str]) dict
Compare the running parameters of multiple jobs
- Parameters:
jobs_list (list) – The jobs to compare
- Returns:
{parameter: [value, value, value]}
- Return type:
dict
- Raises:
ValueError – If any of the jobs is not found
ValueError – If the jobs being compared are not of the same type
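As an illustration of the {parameter: [value, value, value]} return shape, the sketch below builds the same structure from plain dicts. It is not the pipeliner's implementation: compare_parameters is a hypothetical helper, and it assumes each job's parameters are a dict that records the job type under _rlnJobTypeLabel.

```python
from typing import Dict, List


def compare_parameters(jobs: Dict[str, Dict[str, str]]) -> Dict[str, List[str]]:
    """Return {parameter: [value, value, ...]} with one value per job.

    Hypothetical helper: ``jobs`` maps job names to parameter dicts, and each
    dict is assumed to store its job type under "_rlnJobTypeLabel".
    """
    if not jobs:
        raise ValueError("No jobs given to compare")
    # All jobs must share one type, mirroring the documented ValueError
    job_types = {params.get("_rlnJobTypeLabel") for params in jobs.values()}
    if len(job_types) != 1:
        raise ValueError("The jobs being compared are not of the same type")
    # Take the parameter names from the first job; missing values become ""
    first = next(iter(jobs.values()))
    return {
        key: [params.get(key, "") for params in jobs.values()]
        for key in first
    }
```

Values appear in the same order as the jobs were given, so each list reads across as one row of a comparison table.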
- continue_job(job_to_continue: str, comment: str | None = None, run_in_foreground=False) PipelinerJob
Continue a job that has already been run
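To change the parameters in a continuation, the user needs to edit the continue_job.star file in the job’s directory.
- Parameters:
job_to_continue (str) – The name of the job to continue
comment (str) – A comment to add to the job
run_in_foreground (bool) – Run the job in the main process, blocking anything else from happening until it completes
- Returns:
The PipelinerJob object for the created job
- Return type:
PipelinerJob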
- Raises:
ValueError – If the continue_job.star file is not found and there is no job.star file in the job’s directory to use as a backup
ValueError – If the job is of a type that needs an optimizer file to continue and this file is not found
ValueError – If the job has iterations but the parameters specified would result in no additional iterations being run
- create_archive(job: str, full: bool = False, tar: bool = True) str
Creates an archive
Archives can be full or simple. Simple archives contain the directory structure of the project, the parameter files for each job and a script to rerun the project through the terminal job. The full archive contains the full job dirs for the terminal job and all of its children
- create_reference_report(terminal_job: str) Tuple[str, int]
Create a report on all the references used in the project
- delete_job(job: str) bool
Delete a job
Removes the job from the main project and moves it and its children to the Trash
- edit_comment(job_name: str, comment: str | None = None, overwrite: bool = False)
Edit the comment of a job
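- Parameters:
job_name (str) – The name of the job
comment (str) – The new comment
overwrite (bool) – Whether to overwrite the existing comment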
- Raises:
ValueError – If the new rank is not None or an integer
- static empty_trash()
Deletes all the files and dirs in the Trash directory
- Returns:
True if any files were deleted, False if no files were deleted
- Return type:
bool
- find_job_by_comment(contains: List[str] | None = None, not_contains: List[str] | None = None, job_type: str | None = None, command: bool = False) List[str]
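Find jobs by their comments or command
- Parameters:
contains (list) – Strings the comment should contain
not_contains (list) – Strings the comment should not contain
job_type (str) – Only search jobs of this type
command (bool) – Search the job's command instead of its comment
- Returns:
Names of all the jobs found
- Return type:
List[str]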
- Raises:
ValueError – If nothing is specified for contains and not_contains
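The contains/not_contains filtering can be sketched with plain substring tests. This is a hypothetical stand-in, not the pipeliner's code: filter_by_comment takes a dict of job names to comment text, and it assumes that every string in contains must appear (the real method may instead match any of them) and that matching is case-sensitive.

```python
from typing import Dict, List, Optional


def filter_by_comment(
    comments: Dict[str, str],
    contains: Optional[List[str]] = None,
    not_contains: Optional[List[str]] = None,
) -> List[str]:
    """Return the names of jobs whose comment matches the search terms.

    Hypothetical helper: ``comments`` maps job names to their comment text.
    Every string in ``contains`` must appear (an assumption), and no string
    in ``not_contains`` may appear.
    """
    if not contains and not not_contains:
        # Mirrors the documented ValueError when no search terms are given
        raise ValueError("Specify at least one of contains or not_contains")
    found = []
    for job_name, comment in comments.items():
        if contains and not all(term in comment for term in contains):
            continue
        if not_contains and any(term in comment for term in not_contains):
            continue
        found.append(job_name)
    return found
```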
- get_continuation_job(job_name: str) PipelinerJob
Returns a PipelinerJob ready for continuation
All JobOption values are the ones from the parent job unless they have been updated by the job’s prepare_for_continuation() method
- Returns:
The new continuation job ready to use for generating a new job in Doppio
- Return type:
PipelinerJob
- get_job(job_name: str) PipelinerJob
Get an existing job from the project.
- Parameters:
job_name (str) – The name of the job to get
- Raises:
ValueError – if the named job cannot be found
- parse_proclist(list_o_procs: list, search_trash: bool = False) list
Finds full process names for multiple processes
Returns full process names, e.g. Import/job001/ from job001 or 1.
- parse_procname(in_proc: str, search_trash: bool = False) str
Find a process name, with the ability to parse ambiguous input.
Returns full process names, e.g. Import/job001/ from job001 or 1. Can look in both active processes and the Trash, and accepts inputs containing only the job number, or the process type and alias, e.g. Import/my_alias.
- Parameters:
in_proc (str) – The process name, job number or alias to look up
search_trash (bool) – Also search the Trash
- Returns:
the process name
- Return type:
str
- Raises:
ValueError – if the process was in the trash but search_trash is false
ValueError – if the process name is not in the pipeliner format, jobxxx, or a number, e.g. an unrelated string
ValueError – if the process name is not found
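The expansion rules described above (a bare number, jobxxx, a full name, or type/alias) can be sketched as follows. resolve_job_name and its processes/aliases arguments are hypothetical stand-ins for the pipeline's own records, and the sketch does not model the Trash search; it only shows the idea of normalising the input to a jobNNN/ directory and matching it.

```python
import re
from typing import Dict, List


def resolve_job_name(in_proc: str, processes: List[str], aliases: Dict[str, str]) -> str:
    """Expand '1', 'job001', 'Import/job001' or 'Import/my_alias' to a full name.

    Hypothetical helper: ``processes`` lists full names such as 'Import/job001/'
    and ``aliases`` maps alias paths such as 'Import/my_alias/' to full names.
    """
    name = in_proc.strip()
    key = name if name.endswith("/") else name + "/"
    if key in aliases:  # type/alias form
        return aliases[key]
    if key in processes:  # already a full process name
        return key
    # Otherwise accept a number or jobxxx, optionally preceded by a type
    match = re.fullmatch(r"(?:.*/)?(?:job)?(\d+)/?", name)
    if match is None:
        raise ValueError(f"{in_proc!r} is not a process name, jobxxx, or a number")
    job_dir = f"job{int(match.group(1)):03d}/"
    for process in processes:
        if process.endswith("/" + job_dir):
            return process
    raise ValueError(f"Process {in_proc!r} not found")
```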
- static prepare_deposition(terminal_job: str, depo_type: Literal['emdb', 'empiar'], depo_id: str | None = None, jobstar_file: str | None = None, empiar_do_mov: bool = True, empiar_do_mics: bool = True, empiar_do_parts: bool = True, empiar_do_rparts: bool = True) str
Prepare a deposition for EMPIAR, EMDB, or PDB databases
- Parameters:
terminal_job (str) – This job and all its parents will be included in the deposition
depo_type (Literal["emdb", "empiar"]) – The type of deposition
depo_id (Optional[str]) – A name for the deposition
jobstar_file (Optional[str]) – For EMPIAR; a job.star file that contains additional required information that cannot be gathered from the jobs themselves. See pipeliner.jobs.other.empiar_deposition_job.EmpiarDepositionJob for the specifics
empiar_do_mov (bool) – For EMPIAR; should raw movies be included?
empiar_do_mics (bool) – For EMPIAR; should corrected micrographs be included?
empiar_do_parts (bool) – For EMPIAR; should particles be included?
empiar_do_rparts (bool) – For EMPIAR; should polished particles be included?
- Returns:
The path of the created archive
- Return type:
str
- prepare_metadata_report(jobname: str) Tuple[str, int]
Returns a full metadata trace for a job and all upstream jobs
- Parameters:
jobname – The name of the job to run on
- Returns:
(str: the file written, int: the number of jobs in the report)
- Return type:
Tuple[str, int]
- restore_from_cleanup(job: str) None
Restore files from a job that were removed by the cleanup function
- Parameters:
job (str) – The name of the job to restore files from
- run_cleanup(jobs: list, harsh: bool = False) bool
Run the cleanup function for multiple jobs
Each job defines its own method for cleanup and harsh cleanup
- run_job(jobinput: str | dict | PipelinerJob, overwrite: str | None = None, alias: str | None = None, ignore_invalid_joboptions=False, run_in_foreground=False) PipelinerJob
Run a new job in the project
If a file is specified, the job will be created from the parameters in that file. If a dict is input, the job will be created with defaults for all options except those specified in the dict.
If a dict is used for input it MUST contain at minimum {"_rlnJobTypeLabel": <the jobtype>}
- Parameters:
jobinput (str, dict, PipelinerJob) – The path to a run.job or job.star file that defines the parameters for the job, a dict specifying job parameters, or a PipelinerJob object
overwrite (str) – The name of a job to overwrite; if None a new job will be created. A job can only be overwritten by a job of the same type
alias (str) – Alias to assign to the new job
ignore_invalid_joboptions (bool) – Run the job anyway even if the job options appear to be invalid
run_in_foreground (bool) – Run job in the main process blocking anything else from happening until it completes
- Returns:
The job that was run. Note that unless run_in_foreground was set, the job has been started and is probably still running when this function returns.
- Return type:
PipelinerJob
- Raises:
ValueError – If this method is used to continue a job
ValueError – If the job options are invalid and ignore_invalid_joboptions is not set
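The dict form of jobinput can be pictured as an overlay on the job type's defaults. A minimal sketch, not the pipeliner's own code: merge_with_defaults is hypothetical, and defaults_by_type stands in for the per-job-type default values (compare job_default_parameters_dict in api_utils).

```python
from typing import Any, Dict, Mapping


def merge_with_defaults(
    user_params: Mapping[str, Any],
    defaults_by_type: Mapping[str, Mapping[str, Any]],
) -> Dict[str, Any]:
    """Overlay user-supplied job options on the defaults for the job type."""
    job_type = user_params.get("_rlnJobTypeLabel")
    if job_type is None:
        # The job type is the one key that can never be defaulted
        raise ValueError('A job parameter dict must contain "_rlnJobTypeLabel"')
    if job_type not in defaults_by_type:
        raise ValueError(f"Unknown job type: {job_type}")
    merged = dict(defaults_by_type[job_type])  # start from every default
    merged.update(user_params)  # options given by the user take priority
    return merged
```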
- run_schedule(fn_sched: str, job_ids: List[str], nr_repeat: int = 1, minutes_wait: int = 0, minutes_wait_before: int = 0, seconds_wait_after: int = 5) str
Runs a list of scheduled jobs
- Parameters:
fn_sched (str) – A name to assign to the schedule
job_ids (list) – A list of string job names to run
nr_repeat (int) – Number of times to repeat the entire schedule
minutes_wait (int) – Minimum number of minutes to wait between running each subsequent job
minutes_wait_before (int) – Initial number of minutes to wait before starting to run the schedule
seconds_wait_after (int) – Number of seconds to wait after running each job
- Returns:
The name of the schedule that is run
- Return type:
str
- Raises:
ValueError – If the schedule name is already in use
- run_scheduled_job(job: PipelinerJob, run_in_foreground: bool = False)
Run a job that has been scheduled
- Parameters:
job (PipelinerJob) – The job to run
run_in_foreground (bool) – Should the job be run in the foreground instead of being spun off as a separate process, for testing
- schedule_continue_job(job_to_continue: str, params_dict: dict | None = None, comments: str | None = None) PipelinerJob
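Schedule a continuation of a job
Adds the continuation to the pipeline with scheduled status, but does not run it
- Parameters:
job_to_continue (str) – The name of the job to continue
params_dict (dict) – Parameters to change for the continuation
comments (str) – Comments to put in the job’s jobinfo file
- Returns:
The PipelinerJob object for the newly scheduled job
- Return type:
PipelinerJob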
- schedule_job(job_input: str | Dict[str, str | float | int | bool], comment: str | None = None, alias: str | None = None) PipelinerJob
Schedule a job to run
Adds the job to the pipeline with scheduled status, but does not run it
- Parameters:
job_input (str or dict) – The path to a run.job or job.star file that defines the parameters for the job, or a dictionary containing job parameters
comment (str) – Comments to put in the job’s jobinfo file
alias (str) – Alias to give to the job. Note that the alias is checked and might be modified when the job is added to the pipeline. The final alias used can be retrieved from the returned job’s alias attribute.
- Returns:
The PipelinerJob object for the scheduled job
- Return type:
PipelinerJob
- set_alias(job: str, new_alias: str | None)
Set the alias for a job
- Parameters:
job (str) – The name of the job
new_alias (str) – The new alias for the job
- Raises:
ValueError – If the alias could not be set for any reason.
- stop_schedule(schedule_name: str) bool
Stops a currently running schedule
Kills the process running the schedule and marks the currently running job as aborted. Works to stop schedules that were started using the RELION GUI or pipeliner.
- undelete_job(job: str) bool
Restores a job from the Trash back into the project
Also restores the job’s alias if one existed
- update_job_status(job: str, new_status: str)
Mark a job as finished, failed or aborted
- Parameters:
job (str) – The name of the job
new_status (str) – The new status for the job: finished, failed or aborted
- Raises:
ValueError – If the new status is not one of the options
- wait_for_job_to_finish(job: str | PipelinerJob, ping: float = 1.0, timeout: float = 86400.0, error_on_fail: bool = False, error_on_abort: bool = False) str
Wait for the job to finish, with any status
- Parameters:
job (str or PipelinerJob) – The job, or the name of the job, to wait for
ping (float) – How long to wait between status checks (in seconds)
timeout (float) – Raise an error after this much time has elapsed, even if the job hasn’t finished (in seconds). The default is 24 hours.
error_on_fail (bool) – Raise an error if the job fails
error_on_abort (bool) – Raise an error if the job is aborted
- Returns:
The final status of the job: Failed, Succeeded, or Aborted
- Return type:
str
- Raises:
RuntimeError – If the watched job fails and error_on_fail is True, or is aborted and error_on_abort is True
RuntimeError – If the job still hasn’t finished by the timeout time
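The ping/timeout behaviour amounts to a polling loop. The sketch below shows that pattern under stated assumptions: wait_for_status is hypothetical, and get_status is an assumed callable that returns None while the job is still running and a final status string ("Succeeded", "Failed" or "Aborted") once it has stopped. The real method reads the job's status from the project instead.

```python
import time
from typing import Callable, Optional


def wait_for_status(
    get_status: Callable[[], Optional[str]],
    ping: float = 1.0,
    timeout: float = 86400.0,
    error_on_fail: bool = False,
    error_on_abort: bool = False,
) -> str:
    """Poll ``get_status`` every ``ping`` seconds until it returns a final status."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status is not None:
            if status == "Failed" and error_on_fail:
                raise RuntimeError("The job failed")
            if status == "Aborted" and error_on_abort:
                raise RuntimeError("The job was aborted")
            return status
        if time.monotonic() >= deadline:
            raise RuntimeError(f"The job did not finish within {timeout} seconds")
        time.sleep(ping)
```

Using time.monotonic() for the deadline keeps the timeout correct even if the system clock is changed while waiting.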
- pipeliner.api.manage_project.convert_pipeline(pipeline_name: str = 'default') bool
Converts a pipeline file from the RELION 2.0-3.1 format
This format has integer node, process, and status IDs. The pipeliner format uses string IDs
- pipeliner.api.manage_project.delete_all_cleaned_job_files() None
Delete all files in the trash from jobs that have been cleaned up
- pipeliner.api.manage_project.delete_all_trashed_job_files() None
Delete all files from the jobs that have been moved to the trash
- pipeliner.api.manage_project.delete_cleaned_job_files(job) None
Delete files in the trash from a cleaned up job
If the job is not found, a message is printed and nothing else happens.
- Parameters:
job (str) – The job name; WITHOUT the cleanup dir prepended
- pipeliner.api.manage_project.delete_summary_data_archive(filename: str)
Remove an archive from the summary data
- Parameters:
filename (str) – The name of the archive zip file or dir
- pipeliner.api.manage_project.delete_summary_data_metadata_report(filename: str)
Remove a metadata report from the summary data
- Parameters:
filename (str) – The name of the report file
- pipeliner.api.manage_project.delete_summary_data_reference_report(filename: str)
Remove a reference report from the summary data
- Parameters:
filename (str) – The name of the report file
- pipeliner.api.manage_project.delete_trashed_job_files(job) None
Delete files in the trash for a specific job
If the job is not found, a message is printed and nothing else happens.
- Parameters:
job (str) – The job name; WITHOUT the trash dir prepended
- pipeliner.api.manage_project.get_archives_list_from_summary_file() Tuple[List[str], List[List[str]]]
Get a list of the archives in the summary file
- Returns:
([column, headers], [[line, 1, data], [line, 2, data]])
- Return type:
Tuple[List[str], List[List[str]]]
- pipeliner.api.manage_project.get_cleaned_up_jobs_and_sizes(show_deleted: bool = False) Dict[str, str]
Get names and sizes of all files that have been removed by cleanup
- pipeliner.api.manage_project.get_commands_and_nodes(job_file: str) tuple
Report the commands a job file would run and the nodes that would be created
- Parameters:
job_file (str) – The path to a run.job or job.star file
- Returns:
A tuple containing:
A list of commands. Each item in the commands list is a list of command arguments, e.g. [[com1-arg1, com1-arg2], [com2-arg1]]
A list of input nodes that would be created. Each item in the list is a tuple: [(name, type), (name, type)]
A list of output nodes that would be created. Each item in the list is a tuple: [(name, type), (name, type)]
A list of any PipelinerWarning raised by joboption validation
A list of the ExternalProgram objects used by the job
- Return type:
tuple
- pipeliner.api.manage_project.get_deleted_jobs_and_sizes() Dict[str, str]
Get all the jobs that have been moved to the trash and their sizes
- pipeliner.api.manage_project.get_image_metadata_from_node(node_name: str, image_name: str = '') Mapping[str, float | int | str | dict | list]
Return the metadata for an image using the method from its node.
- Parameters:
node_name (str) – The name of the node
image_name (str) – The name of the image
- Returns:
A dictionary of metadata about the image.
- pipeliner.api.manage_project.get_metadata_reports_from_summary_file() Tuple[List[str], List[List[str]]]
Get a list of the metadata reports in the summary file
- Returns:
([column, headers], [[line, 1, data], [line, 2, data]])
- Return type:
Tuple[List[str], List[List[str]]]
- pipeliner.api.manage_project.get_ref_reports_from_summary_file() Tuple[List[str], List[List[str]]]
Get a list of the reference reports in the summary file
- Returns:
([column, headers], [[line, 1, data], [line, 2, data]])
- Return type:
Tuple[List[str], List[List[str]]]
api_utilities
These utility functions do not require an existing project.
- pipeliner.api.api_utils.edit_jobstar(fn_template: str, params_to_change: dict, out_fn: str) str
Modify one or more parameters in a job.star file
- pipeliner.api.api_utils.get_job_info(job_type: str) JobInfo | None
Get information about a job
- Parameters:
job_type (str) – The type of job to return info on
- Returns:
JobInfo object with info about the job and its references
- Return type:
JobInfo | None
- Raises:
ValueError – If the job type is not found
- pipeliner.api.api_utils.job_default_parameters_dict(jobtype: str) dict
Get a dictionary of a job type’s default parameters
- pipeliner.api.api_utils.validate_starfile(fn_in: str)
Checks for inappropriate use of reserved words in STAR files
Writes a corrected version with proper quotation if possible. The original file is saved with a ‘.orig’ suffix added.
- Parameters:
fn_in (str) – The name of the file to check
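The STAR format reserves the words data_, loop_, global_, save_ and stop_, so an unquoted value that begins with one of them can be misread as syntax. The sketch below shows the kind of quoting fix validate_starfile describes; quote_reserved_value is hypothetical, it simply wraps the value in double quotes, and it is only an approximation of the rules (it does not handle values that already contain quote characters). The pipeliner's own correction may differ.

```python
from typing import Tuple

# STAR reserved words; an unquoted data value should not begin with these
RESERVED_PREFIXES: Tuple[str, ...] = ("data_", "loop_", "global_", "save_", "stop_")


def quote_reserved_value(value: str) -> str:
    """Wrap a value in double quotes if it would be read as a STAR keyword."""
    # Reserved words are case-insensitive, so compare in lower case
    if value.lower().startswith(RESERVED_PREFIXES) and not value.startswith('"'):
        return f'"{value}"'
    return value
```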
- pipeliner.api.api_utils.write_default_jobstar(job_type: str, out_fn: str | None = None, relionstyle: bool = False)
Write a job.star file for the specified type of job
The default job.star contains all the job options with their values set as the defaults
- Parameters:
job_type (str) – The type of job
out_fn (str) – Name of the file to write the output to. If left blank defaults to <job_type>_job.star
relionstyle (bool) – Should the job.star file be written in the RELION format? RELION files are compatible with the pipeliner, but the pipeliner versions are not backwards compatible with RELION. If this option is selected, a RELION job type should be used for job_type
- Returns:
The name of the output file written
- Return type:
str
user_settings
User settings which can control the pipeliner’s behaviour, environment and default job option values.
user_settings.py
Settings for the CCP-EM pipeliner.
Settings can be provided by the user in a JSON-formatted settings file or in environment
variables. When the pipeliner is first run, a new settings file is created in a
platform-specific directory (typically ~/.config/ccpem/pipeliner/ on Linux or
~/Library/Application Support/ccpem/pipeliner/ on Mac). This contains default
settings values. These settings can be edited in that file, or overridden by setting
environment variables before running the pipeliner. Both RELION- and Pipeliner-type
environment variables are supported. For example, the setting for the default qsub
script template can be set by editing the value for "qsub_template" in the settings
file or by setting either the PIPELINER_QSUB_TEMPLATE or RELION_QSUB_TEMPLATE
environment variable. If more than one of these is set, PIPELINER_QSUB_TEMPLATE will
be used by preference, followed by RELION_QSUB_TEMPLATE, followed by the value from
the settings file.
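The lookup order described above can be sketched as a small function. This is an illustration, not the module's internals: resolve_setting is hypothetical, file_values stands in for the parsed JSON settings file, and treating any set variable (even an empty one) as taking priority is an assumption. In practice, use the module-level getters such as user_settings.get_qsub_template().

```python
import os
from typing import Mapping, Optional, Sequence


def resolve_setting(
    key: str,
    env_vars: Sequence[str],
    file_values: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the first environment variable that is set, else the file value.

    ``env_vars`` is in priority order, e.g. the PIPELINER_ variable first and
    the RELION_ variable second.
    """
    env = os.environ if environ is None else environ
    for var in env_vars:
        if var in env:
            return env[var]
    return file_values.get(key)
```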
For programmatic access to settings values, use the helper functions which are provided
at the module level (e.g. user_settings.get_qsub_template()). These allow easy access
to the correctly-typed value of each individual setting.
This module also contains a Settings class and some internal functions, but there should
be no need to access any of these directly from outside this module. To add a new
setting, create a new get_<setting name>() function at the top of this module and
add a new setting definition in Settings.add_all_settings(), making sure to use the
correct types for the function calls and default value.
- class pipeliner.user_settings.BoolSettingDefinition(name: str, env_vars: List[str], default: bool)
Bases: SettingDefinition
Class for defining a setting with a bool value.
- class pipeliner.user_settings.IntSettingDefinition(name: str, env_vars: List[str], default: int)
Bases: SettingDefinition
Class for defining a setting with an int value.
- class pipeliner.user_settings.OptionalIntSettingDefinition(name: str, env_vars: List[str], default: int | None)
Bases: SettingDefinition
Class for defining a setting with an optional int value (i.e. int or None).
- class pipeliner.user_settings.OptionalStringSettingDefinition(name: str, env_vars: List[str], default: str | None)
Bases: SettingDefinition
Class for defining a setting with an optional string value (i.e. str or None).
- class pipeliner.user_settings.PathListSettingDefinition(name: str, env_vars: List[str], default: List[str])
Bases: SettingDefinition
Class for defining a setting with a value which is a list of file paths.
In JSON format, the value should appear as a list of strings. When stored in an environment variable, the value should take the form of a single string with the individual paths separated by os.pathsep (typically ':').
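Reading such a setting from either source can be sketched as below. path_list_value is hypothetical; dropping empty entries (left by leading, trailing or doubled separators) is a design choice of this sketch rather than documented pipeliner behaviour.

```python
import os
from typing import List, Optional, Sequence


def path_list_value(env_value: Optional[str], json_value: Sequence[str]) -> List[str]:
    """Return the path list from an environment string if set, else from JSON."""
    if env_value is not None:
        # Split on the platform separator (':' on Linux/Mac, ';' on Windows),
        # dropping empty entries
        return [path for path in env_value.split(os.pathsep) if path]
    return list(json_value)
```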
- class pipeliner.user_settings.SettingDefinition(name: str, env_vars: List[str], default: Any)
Bases: object
Base class for setting definitions. Not intended to be used directly.
- class pipeliner.user_settings.Settings(settings_file_override=None)
Bases: object
Container class to hold settings definitions and the get_<type>() functions that fetch their values.
- add_all_settings()
- add_qsub_extra_settings()
- check_for_extra_keys()
- check_qsub_extras()
- class pipeliner.user_settings.StringSettingDefinition(name: str, env_vars: List[str], default: str)
Bases: SettingDefinition
Class for defining a setting with a string value.
- pipeliner.user_settings.get_additional_program_paths() List[str]
Paths to prepend to the PATH environment variable before searching for program executables. Use this setting to make third-party software available to the pipeliner. Note that the paths should be to the directory containing the executable, not to the executable itself.
In JSON format, this setting is a list of strings. It can also be set using the
PIPELINER_ADDITIONAL_PROGRAM_PATHS environment variable, which should be a PATH-style string containing individual paths separated by path separators (typically ‘:’ on Linux and Mac).
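The effect of prepending directories before the search can be reproduced with the standard library's shutil.which. A minimal sketch: find_program is hypothetical and only mirrors the documented idea that the extra directories (each containing executables, not the executables themselves) are searched before the existing PATH.

```python
import os
import shutil
from typing import Optional, Sequence


def find_program(
    name: str, additional_paths: Sequence[str], path: Optional[str] = None
) -> Optional[str]:
    """Locate an executable, searching ``additional_paths`` before PATH.

    ``path`` defaults to the current PATH; pass a string to override it.
    Returns the full path to the executable, or None if it is not found.
    """
    base = os.environ.get("PATH", "") if path is None else path
    dirs = list(additional_paths) + ([base] if base else [])
    return shutil.which(name, path=os.pathsep.join(dirs))
```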
Path to the share dir in an installation of ccpem for rvapi to use
- pipeliner.user_settings.get_cryocare_executables_dir() str
The default cryocare executables directory.
- pipeliner.user_settings.get_minimum_dedicated() int | None
The default for ‘Minimum dedicated cores per node’.
- pipeliner.user_settings.get_mpi_max() int | None
The maximum number of MPI processes available from the GUI.
- pipeliner.user_settings.get_mpirun_command() str
The default command prepended to MPI jobs, including ‘XXXmpinodesXXX’ which will be substituted with the number of MPI nodes to use.
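The substitution can be sketched as a string replacement followed by shell-style splitting. build_mpi_command is hypothetical, and the template "mpirun -n XXXmpinodesXXX" used below is an example value, not necessarily the pipeliner's shipped default.

```python
import shlex
from typing import List


def build_mpi_command(mpirun_template: str, n_mpi: int, program_args: List[str]) -> List[str]:
    """Substitute the MPI count into the template and prepend it to a command."""
    prefix = mpirun_template.replace("XXXmpinodesXXX", str(n_mpi))
    # shlex.split respects quoting in the template, unlike str.split
    return shlex.split(prefix) + program_args
```

For example, build_mpi_command("mpirun -n XXXmpinodesXXX", 4, ["relion_refine_mpi", "--o", "Class2D/job010/run"]) gives ["mpirun", "-n", "4", "relion_refine_mpi", "--o", "Class2D/job010/run"].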
- pipeliner.user_settings.get_path_to_source_files() List[str]
Script files to set up the environment variables for the pipeliner. These files will be sourced when the pipeliner starts up and their environment read in and used to update the pipeliner’s environment variables. This is a clunky mechanism intended mainly for setting up CCP4 and the old version of CCP-EM. Avoid using it for new packages.
In JSON format, this setting is a list of strings. It can also be set using the
PIPELINER_PATH_TO_SOURCE_FILES environment variable, which should be a PATH-style string containing individual paths separated by path separators (typically ‘:’ on Linux and Mac).
- pipeliner.user_settings.get_qsub_extra_count() int
The number of extra qsub template substitution variables to use.
- pipeliner.user_settings.get_qsub_extras(number: int) Dict[str, str]
Return a dictionary of values for a single numbered set of extra qsub settings.
For example:
get_qsub_extras(1) -> {
    "name": "Label for qsub_extra1",
    "default": "Default value for qsub_extra1",
    "help": "Help text for qsub_extra1",
}
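Substitution variables in a RELION-style qsub template are written as XXX<name>XXX, like the XXXmpinodesXXX placeholder used by the MPI command. Assuming the extra qsub settings follow the RELION convention of XXXextra1XXX, XXXextra2XXX and so on, filling a template can be sketched as below. fill_qsub_template is hypothetical, and raising KeyError for an unknown variable is a choice of this sketch.

```python
import re
from typing import Mapping


def fill_qsub_template(template: str, values: Mapping[str, str]) -> str:
    """Replace every XXX<name>XXX placeholder with values[<name>]."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in values:
            raise KeyError(f"No value for qsub variable {key!r}")
        return values[key]

    # Lazy match so adjacent placeholders are not merged into one
    return re.sub(r"XXX(\w+?)XXX", substitute, template)
```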