Pipeline Tools

Nodes and Processes

ProjectGraph

The ProjectGraph handles all management of the pipeline, keeping track of which jobs have been run, their statuses, and which files are inputs and outputs of the various jobs.

class pipeliner.project_graph.ProjectGraph(name: str = 'default', pipeline_dir: str | os.PathLike[str] = '.', read_only: bool = True, create_new: bool = False)

Bases: object

The main ProjectGraph object is used for manipulating the pipeline

node_list

A Node object for every file that is an input or output for a job in the project

Type:

list

process_list

A Process object for each job in the project

Type:

list

job_counter

The number of the next job in the project, i.e. if there are 10 jobs in a project, job_counter is 11

Type:

int
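
A minimal sketch of opening a project read-only and inspecting these attributes. This assumes an existing pipeliner project in the current directory; the context-manager usage follows the recommendation under close() below:

    from pipeliner.project_graph import ProjectGraph

    # Open the default pipeline read-only; the lock is released on exit
    with ProjectGraph(name="default", pipeline_dir=".") as pipeline:
        print(len(pipeline.process_list), "jobs in the project")
        print(len(pipeline.node_list), "files tracked")
        print("next job number:", pipeline.job_counter)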

add_job(job: PipelinerJob, as_status: str, do_overwrite: bool, alias: str | None = None) Process

Add a job to the pipeline

Adds the Process for the job, a Node for each of its input and output files, and writes a mini-pipeline containing just that job

Parameters:
  • job (PipelinerJob) – The job to add.

  • as_status (str) – The status of the job in the pipeline

  • do_overwrite (bool) – If the job already exists, should it be overwritten?

  • alias (str) – Alias to assign to job

Returns:

The Process for the new job

Return type:

Process
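
A hedged sketch of adding a job; here job stands for an already-configured PipelinerJob (constructing one is outside the scope of this class), and the "Scheduled" status string is taken from the list under update_status() below:

    # Adding a job requires a writable pipeline
    with ProjectGraph(read_only=False) as pipeline:
        proc = pipeline.add_job(
            job,                    # a configured PipelinerJob (assumed)
            as_status="Scheduled",  # a status string update_status() accepts
            do_overwrite=False,
            alias="my_import",      # optional; illustrative alias
        )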

add_new_input_edge(node: Node, process: Process) None

Add a Node to a Process as an input

Parameters:
  • node (Node) – The node to add

  • process (Process) – The Process to add the Node to

add_new_output_edge(process: Process, node: Node, mini: bool = False) None

Add a Node to a Process as an output

Parameters:
  • node (Node) – The node to add

  • process (Process) – The Process to add the Node to

  • mini (bool) – Is the pipeline being operated on a mini-pipeline written inside a job directory?

add_node(node: Node, touch_if_not_exists: bool = False) Node | None

Add a Node to the pipeline

A node is only added if it doesn’t already exist in the pipeline. If a node is used as an input, the keywords from the process that wrote the node overrule any keywords added in the node’s definition by the process that uses it as an input

Parameters:
  • node (Node) – The node to add

  • touch_if_not_exists (bool) – If the file for the node does not exist, should it be created?

Returns:

The Node that was added. If this node already existed, returns the existing copy

Return type:

Node

Raises:

RuntimeError – If the node name is empty

add_process(process: Process, do_overwrite: bool) Process

Add a Process to the pipeline

Parameters:
  • process (Process) – The Process to add

  • do_overwrite (bool) – If the process already exists, should it be overwritten?

Returns:

The Process that was added or the existing Process if it already existed

Return type:

Process

Raises:

RuntimeError – If the Process already exists and overwrite is False

alias_checks(process: Process, new_alias: str) str

A set of checks to make sure a new alias fits the requirements

Parameters:
  • process (Process) – The Process object that is getting the new alias

  • new_alias (str) – The alias that will be applied to the process

check_process_completion() None

Check to see if any processes have finished running and update their statuses.

static check_process_status(proc: Process, finished: List[Process], failed: List[Process], aborted: List[Process]) Tuple[List[Process], List[Process], List[Process]]

Check if a job has a completion status file and assign it to the right list

Parameters:
  • proc (Process) – The process to check

  • finished (List[Process]) – Processes that finished successfully

  • failed (List[Process]) – Processes that have failed

  • aborted (List[Process]) – Processes that were killed by the user

Returns:

The updated input lists

Return type:

Tuple[List[Process], List[Process], List[Process]]

clean_up_job(process: Process, do_harsh: bool) bool

Cleans up a job by deleting intermediate files

Gets a list of files to delete from the specific job’s cleanup function. First checks that none of the files that are slated for deletion are on the Node list or are of a few specific types that RELION needs. Then moves all the intermediate files to the trash

There are two tiers of cleanup, ‘harsh’ and normal. Each job defines which files are cleaned up by each cleanup type

Parameters:
  • process (Process) – The Process of the job to clean up

  • do_harsh (bool) – Should harsh cleaning be performed?

Returns:

True if cleaning was performed; False if the specified job had no cleanup method or was protected from harsh cleaning

Return type:

bool

cleanup_all_jobs(do_harsh: bool) int

Clean up all jobs in the project

Parameters:

do_harsh (bool) – Should harsh cleaning be performed?

Returns:

The number of jobs that were successfully cleaned

Return type:

int
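
A sketch of harsh-cleaning every job in the project, with pipeline being a writable ProjectGraph as in the add_job() example above:

    # Returns the number of jobs that were successfully cleaned
    n_cleaned = pipeline.cleanup_all_jobs(do_harsh=True)
    print(n_cleaned, "jobs cleaned")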

close() None

Close the pipeline by releasing the lock.

After closing, this object will be read-only.

Note that this does not write the pipeline to disk. If you have made changes that should be saved, call _write() first or (preferably) open the pipeline in a context manager.

delete_job(process: Process) None

Remove a job from the pipeline

Parameters:

process (Process) – The job to remove

delete_temp_node_file(node: Node) None

Remove files associated with a Node

Also removes the directory if it is empty

Parameters:

node (Node) – The node to remove the file for

delete_temp_node_files(process: Process) None

Delete all the files for the nodes in a specific Process

Parameters:

process (Process) – The Process to delete the files for

find_immediate_child_processes(process: Process) List[Process]

Find just the immediate child processes of a process

Parameters:

process (Process) – The process to find children for

Returns:

The Process object for each immediate child of the input Process

Return type:

list

find_node(name: str) Node | None

Retrieve the Node object for a file

Parameters:

name (str) – The name of the file to get the node for

Returns:

The file’s Node object. None if the file is not found.

Return type:

Node

find_process(name_or_alias: str) Process | None

Retrieve the Process object for a job in the pipeline

Parameters:

name_or_alias (str) – The job name or its alias

Returns:

The job’s Process, or None if the job was not found.

Return type:

pipeliner.process.Process

Raises:

RuntimeError – If multiple processes with the same name are found
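
A sketch combining the two lookups; the job and file names are illustrative RELION-style names, not guaranteed to exist:

    proc = pipeline.find_process("Import/job001/")
    node = pipeline.find_node("Import/job001/movies.star")
    if proc is None or node is None:
        print("job or file not found")
    else:
        print(ProjectGraph.get_node_name(node))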

get_downstream_network(process: Process) List[Tuple[Node | None, Process | None, Process]]

Gets data for drawing a network downstream from process

Parameters:

process (Process) – The process to trace

Returns:

Contains a tuple for each edge: (Node, parent Process, child Process). Each edge describes one file in the network.

Return type:

list
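
A sketch of walking the returned edges, where proc is a Process obtained from find_process():

    for node, parent, child in pipeline.get_downstream_network(proc):
        # node and parent can be None for some edges
        print(node, parent, child)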

static get_node_name(node: Node) str

Get the relative path of a node file with its alias if it exists

This returns the relative path (which is the same as the file name) unless the job that created the node has an alias, in which case it returns the file path with the alias instead of <jobtype>/jobxxx/

Parameters:

node (Node) – The node to get the name for

Returns:

The relative path of the file the node points to, with the job’s alias if applicable

Return type:

str

get_pipeline_edges() Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]

Find the edges that track connections between processes

Returns:

([input edges], [output edges]). Input edges is a list of tuples (input file, process name); output edges is a list of tuples (process name, output file). Note that the tuple order differs between input and output edges! This is done to match the order of the pipeline STAR file columns.

Return type:

tuple
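
A sketch illustrating the differing tuple orders described above:

    in_edges, out_edges = pipeline.get_pipeline_edges()
    for input_file, process_name in in_edges:
        print(process_name, "reads", input_file)
    for process_name, output_file in out_edges:
        print(process_name, "writes", output_file)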

get_project_procs_list(terminal_job: str, reverse: bool = True) List[Process]

Make a list of all the Process objects for a job and its parents

Parameters:
  • terminal_job (str) – The name of the final job in the workflow

  • reverse (bool) – Sort in reverse order (high to low)

Returns:

The Process objects for the terminal job and its parents, in reverse order by job number

Return type:

list

Raises:

ValueError – If the terminal job was not found
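
For example, a sketch that lists a workflow from its terminal job back to its roots (the job name is illustrative):

    procs = pipeline.get_project_procs_list("Refine3D/job010/", reverse=True)
    for proc in procs:
        print(proc.name)  # assumes Process exposes a name attribute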

get_upstream_network(process: Process) List[Tuple[Node | None, Process | None, Process]]

Gets data for drawing a network upstream from process

Parameters:

process (Process) – The process to trace

Returns:

Contains a tuple for each edge: (Node, parent Process, child Process). Each edge describes one file in the network.

Return type:

list

get_whole_project_network() List[Tuple[Node | None, Process | None, Process]]

Get the edges and nodes for the entire project

Returns:

Contains a tuple for each edge: (Node, parent Process, child Process). Each edge describes one file in the project.

Return type:

list

property name

The name of the pipeline, usually “default”.

_pipeline.star is added to the name to generate the pipeline file name.

property pipeline_dir

The directory containing the pipeline file.

property read_only

Whether this object should be able to make changes to the pipeline.

remake_node_directory() None

Erase and rewrite RELION’s .Nodes directory

set_job_alias(process: Process | None, new_alias: str | None) None

Set a job’s alias

Sets the alias in the pipeline and creates the alias symlink that points to the job directory.

Parameters:
  • process (Process) – The Process to make an alias for

  • new_alias (str) – The new alias for the job

property star_file

The name of the pipeline STAR file, usually “default_pipeline.star”.

touch_temp_node_file(node: Node, touch_if_not_exists: bool) None

Create a placeholder file for a node that will be created later

Parameters:
  • node (Node) – The node to create the file for

  • touch_if_not_exists (bool) – Should the file be created if it does not already exist?

touch_temp_node_files(process: Process) None

Create placeholder files for all nodes in a Process

Parameters:

process (Process) – The Process to create the files for

undelete_job(del_job: str) None

Get job out of the trash and restore it to the pipeline

Also restores any alias the job may have had, as long as it does not conflict with the current aliases

Parameters:

del_job (str) – The name of the deleted job
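
A sketch of deleting a job and later restoring it; note that delete_job() takes a Process while undelete_job() takes the job’s name (the name here is illustrative):

    proc = pipeline.find_process("Import/job001/")
    if proc is not None:
        pipeline.delete_job(proc)
    # Later: restore the job (and its alias, if unconflicted) from the trash
    pipeline.undelete_job("Import/job001/")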

update_lock_message(lock_message: str) None

Updates the contents of the lockfile for the pipeline

This enables the user to see which process has locked the pipeline

update_status(the_proc: Process, new_status: str) None

Change the status of a job

The job can be marked as “Succeeded”, “Failed”, “Aborted”, “Running” or “Scheduled”

Parameters:
  • the_proc (Process) – The Process to update the status for

  • new_status (str) – The new status for the job

Raises:
  • ValueError – If the new_status is not in the approved list

  • ValueError – If a job with any status other than ‘Running’ is marked ‘Aborted’

  • ValueError – If a job’s updated status is the same as its current status
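
A sketch of a guarded status update; because disallowed transitions raise ValueError, it is worth catching:

    try:
        pipeline.update_status(proc, "Failed")
    except ValueError as err:
        print("status not changed:", err)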

validate_pipeline_file() Tuple[Document, str]

Check if the pipeline file is valid and what version it is

Returns:

(cif.Document containing the pipeline data, pipeline version string)

Return type:

Tuple
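
For example:

    doc, version = pipeline.validate_pipeline_file()
    print("pipeline format version:", version)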

Metadata Tools

pipeliner.metadata_tools.add_md_to_summary_data(filename: str, terminal_job: str, njobs: int)

Add a metadata report to the summary file

Parameters:
  • filename (str) – The name of the report file

  • terminal_job (str) – The job the report was launched from

  • njobs (int) – The number of jobs contained in the report

pipeliner.metadata_tools.format_for_metadata(joboption: JobOption) str | float | int | bool | None

Format data from RELION star files for JSON.

Changes ‘Yes’/‘No’ to True/False and blank values to None, and removes any quotation marks

Parameters:

joboption (JobOption) – The joboption to format

Returns:

The properly formatted value

Return type:

str | float | int | bool | None

Raises:

ValueError – If a boolean job option doesn’t have a value compatible with a bool

pipeliner.metadata_tools.get_job_metadata(this_job: Process) dict

Run a job’s metadata gathering method

Parameters:

this_job (pipeliner.process.Process) – The Process object for the job to gather metadata from

Returns:

Metadata dict for the job, or None if the job has no metadata gathering function

Return type:

dict

pipeliner.metadata_tools.get_metadata_chain(pipeline, this_job: Process, full: bool = False) Tuple[dict, int]

Get the metadata for a job and all its upstream jobs

Metadata is gathered by each individual job’s gather_metadata function

Parameters:
  • pipeline (ProjectGraph) – The ProjectGraph for the project

  • this_job (Process) – The Process object for the terminal job

  • full (bool) – Should the metadata report also contain information about continuations and multiple runs of the jobs, or just the current one?

Returns:

(the metadata dict, number of jobs in it)

Return type:

tuple
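
A sketch that gathers metadata for a terminal job and all its upstream jobs, then dumps it to JSON (the job and file names are illustrative):

    import json

    from pipeliner.metadata_tools import get_metadata_chain

    proc = pipeline.find_process("Refine3D/job010/")
    if proc is not None:
        metadata, njobs = get_metadata_chain(pipeline, proc, full=False)
        with open("metadata_report.json", "w") as f:
            json.dump(metadata, f, indent=2)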

pipeliner.metadata_tools.make_job_parameters_schema(job_type: str) dict

Write the json schema for the running parameters of a job

The metadata schema has two parts: the running parameters part is generated automatically and the results part is written by the user

Parameters:

job_type (str) – The job type to write the schema for

Returns:

The json schema ready to be dumped to a json file

Return type:

dict
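
A sketch of writing the generated schema to disk (the job type string and output filename are illustrative):

    import json

    from pipeliner.metadata_tools import make_job_parameters_schema

    schema = make_job_parameters_schema("relion.import.movies")
    with open("parameters_schema.json", "w") as f:
        json.dump(schema, f, indent=2)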

pipeliner.metadata_tools.make_job_results_schema(job_type: str) dict

pipeliner.metadata_tools.remove_md_from_summary_data(filename: str)

Remove a report from the summary file

Parameters:

filename (str) – Name of the report file to remove