Pipeline Tools
Nodes and Processes
ProjectGraph
The ProjectGraph manages the pipeline, keeping track of which jobs have been run, their statuses, and which files are inputs and outputs of the various jobs
- class pipeliner.project_graph.ProjectGraph(name: str = 'default', pipeline_dir: str | os.PathLike[str] = '.', read_only: bool = True, create_new: bool = False)
Bases: object
The main ProjectGraph object is used for manipulating the pipeline
- job_counter
The number of the next job in the project, i.e. if there are 10 jobs in a project, job_counter is 11
- Type:
int
- add_job(job: PipelinerJob, as_status: str, do_overwrite: bool, alias: str | None = None) Process
Add a job to the pipeline
Adds the Process for the job, a Node for each of its input and output files, and writes a mini-pipeline containing just that job
- Parameters:
job (PipelinerJob) – The job to add
as_status (str) – The status of the job in the pipeline
do_overwrite (bool) – If the job already exists, should it be overwritten?
alias (str) – Alias to assign to job
- Returns:
The Process for the new job
- Return type:
Process
- add_node(node: Node, touch_if_not_exists: bool = False) Node | None
Add a Node to the pipeline
A node is only added if it doesn’t already exist in the pipeline, so if a node is used as an input, the keywords from the process that wrote the node will overrule any added in the node’s definition from the process that used it as input
- Parameters:
node (Node) – The node to add
touch_if_not_exists (bool) – Create a placeholder file for the node if its file does not exist
- Returns:
The Node that was added. If this node already existed, returns the existing copy
- Return type:
Node or None
- Raises:
RuntimeError – If the node name is empty
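The add-or-return-existing semantics of add_node can be modelled with a plain dictionary. This is a minimal illustrative sketch, not the pipeliner implementation; the dict-based node representation is an assumption made for the example.

```python
# Illustrative model of add_node's semantics: a node is only added if it
# doesn't already exist, and the existing copy is returned otherwise.
def add_node(nodes, name, keywords):
    if not name:
        raise RuntimeError("Node name is empty")
    if name in nodes:
        # The node already exists: keep the definition from the process
        # that wrote it, ignoring keywords from the consuming process
        return nodes[name]
    nodes[name] = {"name": name, "keywords": keywords}
    return nodes[name]
```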
- add_process(process: Process, do_overwrite: bool) Process
Add a Process to the pipeline
- Parameters:
process (Process) – The process to add
do_overwrite (bool) – If the process already exists, should it be overwritten?
- Returns:
The Process that was added or the existing Process if it already existed
- Return type:
Process
- Raises:
RuntimeError – If the Process already exists and do_overwrite is False
- alias_checks(process, new_alias) str
A set of checks for an alias to make sure it fits the requirements
- check_process_completion() None
Check to see if any processes have finished running, update their status.
- static check_process_status(proc: Process, finished: List[Process], failed: List[Process], aborted: List[Process]) Tuple[List[Process], List[Process], List[Process]]
Check if a job has a completion status file and assign to the right list
- Parameters:
proc (Process) – The process to check
finished (List[Process]) – Processes that have finished
failed (List[Process]) – Processes that have failed
aborted (List[Process]) – Processes that have been aborted
- Returns:
The updated input lists
- Return type:
Tuple[List[Process], List[Process], List[Process]]
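The sorting of processes into the three lists can be sketched in pure Python. This is an illustrative model only; the "completion_status" key and the marker strings are hypothetical stand-ins for whatever the real completion status file contains.

```python
# Illustrative model: assign each process to the finished, failed, or
# aborted list based on a completion marker, returning the updated lists.
def check_process_status(proc, finished, failed, aborted):
    marker = proc.get("completion_status")  # stand-in for a status file
    if marker == "SUCCESS":
        finished.append(proc)
    elif marker == "FAILURE":
        failed.append(proc)
    elif marker == "ABORTED":
        aborted.append(proc)
    return finished, failed, aborted
```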
- clean_up_job(process: Process, do_harsh: bool) bool
Cleans up a job by deleting intermediate files
Gets a list of files to delete from the specific job’s cleanup function. First checks that none of the files slated for deletion are on the node list or are of a few specific types that RELION needs, then moves all the intermediate files to the trash
There are two tiers of cleanup, ‘harsh’ and normal. Each job defines which files are cleaned up by each cleanup type
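The filtering step described above can be sketched as a pure-Python model. The protected suffix list here is a hypothetical example, not the actual set of file types RELION needs.

```python
# Illustrative model of clean-up filtering: candidate files from the job's
# cleanup list are kept for deletion only if they are not registered nodes
# and not of a protected type (suffixes here are hypothetical).
def files_to_trash(candidates, node_list, protected_suffixes=(".star",)):
    return [
        f for f in candidates
        if f not in node_list and not f.endswith(protected_suffixes)
    ]
```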
- close() None
Close the pipeline by releasing the lock.
After closing, this object will be read-only.
Note that this does not write the pipeline to disk. If you have made changes that should be saved, call _write first or (preferably) open the pipeline in a context manager.
- delete_job(process: Process) None
Remove a job from the pipeline
- Parameters:
process (Process) – The job to remove
- delete_temp_node_file(node: Node) None
Remove files associated with a Node
Also removes the directory if it is empty
- Parameters:
node (Node) – The node to remove the file for
- delete_temp_node_files(process: Process) None
Delete all the files for the nodes in a specific Process
- Parameters:
process (Process) – The Process to delete the files for
- find_immediate_child_processes(process: Process) List[Process]
Find just the immediate child processes of a process
- find_process(name_or_alias: str) Process | None
Retrieve the Process object for a job in the pipeline
- Parameters:
name_or_alias (str) – The job name or its alias
- Returns:
The job’s Process, or None if the job was not found.
- Return type:
Process or None
- Raises:
RuntimeError – If multiple processes with the same name are found
- get_downstream_network(process: Process) List[Tuple[Node | None, Process | None, Process]]
Gets data for drawing a network downstream from process
- static get_node_name(node: Node) str
Get the relative path of a node file with its alias if it exists
This returns the relative path (which is the same as the file name) unless the job that created the node has an alias, in which case it returns the file path with the alias instead of <jobtype>/jobxxx/
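The alias substitution can be modelled with simple string handling. This is an illustrative sketch under the assumption that an alias simply replaces the `<jobtype>/jobxxx/` prefix; the function name and the example paths are hypothetical.

```python
# Illustrative model: replace the "<jobtype>/jobxxx/" prefix of a node's
# relative path with the job's alias, if one is set.
def node_name_with_alias(rel_path, job_dir, alias=None):
    if alias and rel_path.startswith(job_dir):
        return alias.rstrip("/") + "/" + rel_path[len(job_dir):]
    return rel_path
```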
- get_pipeline_edges() Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]
Find the edges that track connections between processes
- Returns:
([input edges], [output edges])
input edges is a list of tuples (input file, process name); output edges is a list of tuples (process name, output file). Note that the order in the tuples is different in the input and output edges! This is done to match the order of the pipeline STAR file columns.
- Return type:
Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]
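The reversed tuple ordering is easy to get wrong, so here is a minimal pure-Python model of the two edge lists. The dict-based process representation and the example job names are illustrative, not the pipeliner data model.

```python
# Illustrative model of the two edge lists: note the reversed tuple order,
# matching the pipeline STAR file columns.
def pipeline_edges(processes):
    input_edges, output_edges = [], []
    for proc in processes:
        for f in proc["inputs"]:
            input_edges.append((f, proc["name"]))    # (input file, process)
        for f in proc["outputs"]:
            output_edges.append((proc["name"], f))   # (process, output file)
    return input_edges, output_edges
```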
- get_project_procs_list(terminal_job: str, reverse: bool = True) List[Process]
Make a list of all the Process objects for a job and its parents
- Parameters:
terminal_job (str) – The name of the terminal job
reverse (bool) – Should the list be in reverse order by job number?
- Returns:
The Process objects for the terminal job and its parents, in reverse order by job number
- Return type:
List[Process]
- Raises:
ValueError – If the terminal job was not found
- get_upstream_network(process: Process) List[Tuple[Node | None, Process | None, Process]]
Gets data for drawing a network upstream from process
- get_whole_project_network() List[Tuple[Node | None, Process | None, Process]]
Get the edges and nodes for the entire project
- Returns:
Edges [node type, parent_job, child_job, extra info]; set: The names of all nodes
- Return type:
- property name
The name of the pipeline, usually “default”.
_pipeline.star is added to the name to generate the pipeline file name.
- property pipeline_dir
The directory containing the pipeline file.
- property read_only
If this object should be able to make changes to the pipeline.
- set_job_alias(process: Process | None, new_alias: str | None) None
Set a job’s alias
Sets the alias in the pipeline and creates the alias symlink that points to the job directory.
- Parameters:
process (Process) – The Process of the job to set the alias for
new_alias (str) – The new alias
- Raises:
ValueError – If process is None
ValueError – If the Process is not found
ValueError – If the new alias is None, which is not allowed
ValueError – If the new alias is shorter than 2 characters
ValueError – If the new alias begins with “job”, which would cause problems
ValueError – If the new alias is not unique
- property star_file
The name of the pipeline STAR file, usually “default_pipeline.star”.
- touch_temp_node_file(node: Node, touch_if_not_exists: bool) None
Create a placeholder file for a node that will be created later
- touch_temp_node_files(process: Process) None
Create placeholder files for all nodes in a Process
- Parameters:
process (Process) – The Process to create the files for
- undelete_job(del_job: str) None
Get a job out of the trash and restore it to the pipeline
Also restores any alias the job may have had, as long as it does not conflict with the current aliases
- Parameters:
del_job (str) – The name of the deleted job
- update_lock_message(lock_message: str) None
Updates the contents of the lockfile for the pipeline
This enables the user to see which process has locked the pipeline
- update_status(the_proc: Process, new_status: str) None
Change the status of a job
The job can be marked as “Succeeded”, “Failed”, “Aborted”, “Running” or “Scheduled”
- Parameters:
the_proc (Process) – The Process of the job to update
new_status (str) – The new status
- Returns:
Was the status changed?
- Return type:
- Raises:
ValueError – If the new_status is not in the approved list
ValueError – If a job with any status other than ‘Running’ is marked ‘Aborted’
ValueError – If a job’s updated status is the same as its current status
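The validation rules above can be sketched as a small pure-Python check. This is an illustrative model of the documented constraints, not the pipeliner implementation; the function name is hypothetical.

```python
# Illustrative model of update_status's validation rules.
APPROVED = {"Succeeded", "Failed", "Aborted", "Running", "Scheduled"}

def validate_status_change(current, new):
    if new not in APPROVED:
        raise ValueError(f"{new!r} is not an approved status")
    if new == "Aborted" and current != "Running":
        raise ValueError("Only 'Running' jobs can be marked 'Aborted'")
    if new == current:
        raise ValueError("New status is the same as the current status")
```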
Metadata Tools
- pipeliner.metadata_tools.add_md_to_summary_data(filename: str, terminal_job: str, njobs: int)
Add a metadata report to the summary file
- Parameters:
filename (str) – The name of the report file
terminal_job (str) – The job the report was launched from
njobs (int) – The number of jobs contained in the report
- pipeliner.metadata_tools.format_for_metadata(joboption: JobOption) str | float | int | bool | None
Format data from RELION star files for JSON.
Changes ‘Yes’/‘No’ to True/False, None for blank values, and removes any quotation marks
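The conversions described can be modelled in a few lines of pure Python. This is an illustrative sketch of the documented behaviour, not the pipeliner implementation, and it only handles string inputs.

```python
# Illustrative model of format_for_metadata's conversions:
# 'Yes'/'No' become booleans, blanks become None, and surrounding
# quotation marks are stripped.
def format_value(val):
    stripped = val.strip().strip("'\"")
    if stripped == "Yes":
        return True
    if stripped == "No":
        return False
    if stripped == "":
        return None
    return stripped
```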
- pipeliner.metadata_tools.get_job_metadata(this_job: Process) dict
Run a job’s metadata gathering method
- Parameters:
this_job (pipeliner.process.Process) – The Process object for the job to gather metadata from
- Returns:
Metadata dict for the job. Returns None if the job has no metadata gathering function
- Return type:
dict or None
- pipeliner.metadata_tools.get_metadata_chain(pipeline, this_job: Process, full: bool = False) Tuple[dict, int]
Get the metadata for a job and all its upstream jobs
Metadata is gathered by each individual job’s gather_metadata function
- Parameters:
pipeline (ProjectGraph) – The project’s ProjectGraph
this_job (Process) – The Process object for the terminal job
full (bool) – Should the metadata report also contain information about continuations and multiple runs of the jobs, or just the current one?
- Returns:
(the metadata dict, number of jobs in it)
- Return type:
Tuple[dict, int]
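The upstream walk can be sketched as a pure-Python model. This is a linear-chain simplification (real jobs may have several parents); the parent-map and metadata-map representations are assumptions made for the example.

```python
# Illustrative model: walk from the terminal job through its parents,
# gathering each job's metadata into one dict and counting the jobs.
def metadata_chain(parents, metadata, terminal_job):
    chain, job = {}, terminal_job
    while job is not None:
        chain[job] = metadata.get(job)
        job = parents.get(job)  # None terminates the walk
    return chain, len(chain)
```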