Pipeline Tools
Nodes and Processes
- class pipeliner.data_structure.Node(name: str, n_type: str)
Bases: object
Nodes store info about input and output files of jobs that have been run
- class pipeliner.data_structure.Process(name: str, p_type: str, status: str, alias: Optional[str] = None)
Bases: object
A Process represents a job that has been run by the Pipeliner
- name
The name of the process. It should be in the format “<jobtype>/jobxxx/”. The trailing slash is required
- Type
str
- clear()
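The documented “<jobtype>/jobxxx/” naming convention can be sketched as a small check. This helper is purely illustrative and is not part of the pipeliner API; the assumption that the job number is three zero-padded digits is inferred from “jobxxx”.

```python
import re

# Hypothetical helper (not in pipeliner): check the documented Process name
# format "<jobtype>/jobxxx/". Assumes a three-digit job number and requires
# the trailing slash, as the documentation states.
JOB_NAME_PATTERN = re.compile(r"^[A-Za-z0-9_.]+/job\d{3}/$")

def is_valid_process_name(name: str) -> bool:
    """Return True if name looks like '<jobtype>/jobxxx/' with a trailing slash."""
    return bool(JOB_NAME_PATTERN.match(name))

print(is_valid_process_name("Import/job001/"))  # True
print(is_valid_process_name("Import/job001"))   # False: missing trailing slash
```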
ProjectGraph
- class pipeliner.project_graph.ProjectGraph(name: str = 'default', do_read_only: bool = False)
Bases: object
The main ProjectGraph object is used for manipulating the pipeline
- name
The name of the pipeline. “_pipeline.star” is appended to this name to generate the pipeline file name
- Type
str
- job_counter
The number of the next job in the project, i.e. if there are 10 jobs in a project, job_counter is 11
- Type
int
- do_read_only
Whether this instantiation of the object should treat the pipeline as read-only
- Type
bool
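The two naming and counting conventions described above can be sketched in plain Python. These helpers are illustrative mock-ups, not the real ProjectGraph attributes.

```python
# Illustrative sketches of two documented ProjectGraph conventions
# (hypothetical helpers, not the pipeliner implementation):
# 1) the pipeline file name is the pipeline name plus "_pipeline.star"
# 2) job_counter is the number of the next job, i.e. number of jobs + 1

def pipeline_filename(name: str = "default") -> str:
    """Generate the pipeline file name from the pipeline name."""
    return name + "_pipeline.star"

def next_job_counter(n_jobs: int) -> int:
    """job_counter for a project that currently contains n_jobs jobs."""
    return n_jobs + 1

print(pipeline_filename())    # default_pipeline.star
print(next_job_counter(10))   # 11
```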
- add_job(job: pipeliner.pipeliner_job.PipelinerJob, as_status: str, do_overwrite: bool) pipeliner.data_structure.Process
Add a job to the pipeline
Adds the Process for the job, a Node for each of its input and output files, and writes a mini-pipeline containing just that job
- Parameters
job (PipelinerJob) – The job to add
as_status (str) – The status of the job in the pipeline
do_overwrite (bool) – If the job already exists, should it be overwritten?
- Returns
The Process for the new job
- Return type
Process
- add_new_input_edge(node: pipeliner.data_structure.Node, process: pipeliner.data_structure.Process)
- add_new_output_edge(process: pipeliner.data_structure.Process, node: pipeliner.data_structure.Node, mini: bool = False)
- add_new_process(process: pipeliner.data_structure.Process, do_overwrite: bool) Optional[pipeliner.data_structure.Process]
Add a Process to the pipeline
- Parameters
process (Process) – The Process to add
do_overwrite (bool) – If the Process already exists, should it be overwritten?
- Returns
The Process that was added, or the existing Process if it already existed
- Return type
Process
- Raises
RuntimeError – If the Process already exists and overwrite is False
- add_node(node: pipeliner.data_structure.Node, touch_if_not_exists: bool = False) Optional[pipeliner.data_structure.Node]
Add a Node to the pipeline
A node is only added if it doesn’t already exist in the pipeline, so if a node is used as an input, the keywords from the process that wrote the node will overrule any added in the node’s definition from the process that used it as input
- Parameters
node (Node) – The node to add
touch_if_not_exists (bool) – Create a placeholder file for the node if its file does not exist
- Returns
The Node that was added. If this node already existed, returns the existing copy
- Return type
pipeliner.data_structure.Node
- Raises
RuntimeError – If the node name is empty
- check_lock(lock_expected: bool, wait_time: int = 2)
Checks to see if a lock file exists for the pipeline
- Parameters
lock_expected (bool) – Whether a lock file is expected to exist
wait_time (int) – The number of minutes to wait for the expected lock status
- Raises
RuntimeError – If no lock is expected, but a lock file is still found after waiting <wait_time> minutes for it to disappear
RuntimeError – If a lock file is expected but not found after waiting <wait_time> minutes for it to appear
- check_process_completion() bool
Check to see if any processes have finished running, and update their status
- Returns
True if any statuses have been updated, else False
- Return type
bool
- clean_up_job(process: pipeliner.data_structure.Process, do_harsh: bool) bool
Cleans up a job by deleting intermediate files
Gets a list of files to delete from the specific job’s cleanup function. First checks that none of the files slated for deletion are on the Node list or are of a few specific types that RELION needs, then moves all the intermediate files to the trash
There are two tiers of cleanup, ‘harsh’ and normal. Each job defines which files are cleaned up by each cleanup type
- clear()
Clear the node and process lists and set the job counter to 1
- create_lock(wait_time: int = 2)
Lock the pipeline
Creates a directory called .relion_lock and the file .relion_lock/lock_<pipeline_name>_pipeline.star. The pipeline cannot be edited when this file is present.
- Parameters
wait_time (int) – The number of minutes to continue trying to make the lock directory/file if a problem is encountered.
- Raises
RuntimeError – If a permission error is encountered trying to create or read the .relion_lock directory
RuntimeError – If the lock file has not appeared after <wait_time> minutes
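The lock layout described above (a `.relion_lock` directory containing `lock_<pipeline_name>_pipeline.star`) can be sketched with a hypothetical path helper; this is not a pipeliner function, just an illustration of the documented naming.

```python
import os

# Hypothetical helper (not in pipeliner): build the documented lock file path
# ".relion_lock/lock_<pipeline_name>_pipeline.star" for a given pipeline name.
def lock_file_path(pipeline_name: str = "default") -> str:
    return os.path.join(".relion_lock", f"lock_{pipeline_name}_pipeline.star")

print(lock_file_path())  # .relion_lock/lock_default_pipeline.star (on POSIX)
```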
- create_process_display_objs(proc)
Create the ResultsDisplay objects for a process and save them
- delete_job(this_job: pipeliner.data_structure.Process)
Remove a job from the pipeline
- Parameters
this_job (Process) – The job to remove
- delete_node(node: pipeliner.data_structure.Node)
Remove a node from the pipeline
Also removes any edges that contain this node
- Parameters
node (Node) – The node to remove
- delete_temp_node_file(node: pipeliner.data_structure.Node) bool
Remove files associated with a Node
Also removes the directory if it is empty
- delete_temp_node_files(process: pipeliner.data_structure.Process) bool
Delete all the files for the nodes in a specific Process
- export_all_scheduled_jobs(mydir: str) bool
Exports all scheduled jobs in the pipeline
This function is part of the RELION import/export system which is not used by the pipeliner and will probably be removed
- find_immediate_child_processes(process: pipeliner.data_structure.Process) list
Find just the immediate child processes of a process
- find_node(name: str) Optional[pipeliner.data_structure.Node]
Retrieve the Node object for a file
- find_process(name_or_alias: str) Optional[pipeliner.data_structure.Process]
Retrieve the Process object for a job in the pipeline
- Parameters
name_or_alias (str) – The job name or its alias
- Returns
The job’s Process
- Return type
pipeliner.data_structure.Process
- Raises
RuntimeError – If multiple processes with the same name are found
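The documented lookup behaviour (match by name or alias, error on duplicates, None when absent) can be mocked in a few lines. This is an illustrative re-implementation over plain dicts, not the pipeliner code.

```python
# Mock of the documented find_process behaviour (not the real implementation):
# look a job up by name or alias, raise RuntimeError if multiple processes
# with the same name are found, and return None if there is no match.
def find_process(processes, name_or_alias):
    matches = [
        p for p in processes
        if p["name"] == name_or_alias or p.get("alias") == name_or_alias
    ]
    if len(matches) > 1:
        raise RuntimeError(f"Multiple processes found for {name_or_alias!r}")
    return matches[0] if matches else None

# Example job names and aliases below are invented for demonstration
procs = [
    {"name": "Import/job001/", "alias": "my_import/"},
    {"name": "Class2D/job002/", "alias": None},
]
print(find_process(procs, "my_import/")["name"])  # Import/job001/
print(find_process(procs, "Class3D/job003/"))     # None
```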
- get_downstream_network(process: pipeliner.data_structure.Process) List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]
Gets data for drawing a network downstream from process
- get_node_name(node: pipeliner.data_structure.Node) str
Get the relative path of a node file with its alias if it exists
This returns the relative path (which is the same as the file name) unless the job that created the node has an alias in which case it returns the file path with the alias instead of <jobtype>/jobxxx/
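The alias substitution described above can be sketched as a simple prefix replacement. This is a hypothetical helper illustrating the documented behaviour, not the real get_node_name.

```python
from typing import Optional

# Illustrative sketch (not the pipeliner implementation): return a node's
# relative path, substituting the creating job's alias for the
# "<jobtype>/jobxxx/" prefix when an alias exists.
def node_display_name(node_path: str, job_name: str, alias: Optional[str]) -> str:
    if alias and node_path.startswith(job_name):
        return alias + node_path[len(job_name):]
    return node_path

# Example names below are invented for demonstration
print(node_display_name("Import/job001/movies.star", "Import/job001/", "my_import/"))
# my_import/movies.star
print(node_display_name("Import/job001/movies.star", "Import/job001/", None))
# Import/job001/movies.star
```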
- get_output_nodes_from_starfile(process: pipeliner.data_structure.Process)
Get nodes from an exported job
Reads a RELION_OUTPUT_NODES.star file created when a job is exported and adds these nodes to a Process
This function is RELION-specific and will probably be deprecated
- Parameters
process (Process) – The process to add the nodes to. It will look in the process’s directory for the RELION_OUTPUT_NODES.star file
- get_pipeline_edges(delete_nodes: Optional[List[pipeliner.data_structure.Node]] = None) Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]
Find the connections between jobs
Get the connections between jobs and nodes for the pipeline. If any nodes have been deleted, their corresponding edges need to be removed
- get_pipeline_filename() str
Get the name of the pipeline file
- Returns
The name of the pipeline file, usually ‘default_pipeline.star’
- Return type
str
- get_process_results_display(proc, forceupdate=False)
Get the ResultsDisplay objects for a process
Attempts to be as efficient as possible by reusing existing files if they are found
- Parameters
proc (Process) – The process to get the ResultsDisplay objects for
forceupdate (bool) – Force an update even if one does not appear to be necessary
- get_upstream_network(process: pipeliner.data_structure.Process) List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]
Gets data for drawing a network upstream from process
- get_whole_project_network() List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]
Get the edges and nodes for the entire project
- Returns
Edges [node type, parent_job, child_job, extra info]; set: The names of all nodes
- Return type
- import_jobs(fn_export: str)
Import a previously exported job
This function is part of the RELION import/export system which is not used by the pipeliner and will probably be removed
- Parameters
fn_export (str) – The name of the file created by an export job
- is_empty() bool
Tests if the pipeline is empty
- Returns
True if there are no jobs in the pipeline
- Return type
bool
- prepare_archive(process: pipeliner.data_structure.Process, do_full: bool = False, tar: bool = True) str
Create an archive for a job
There are two levels of archive:
A full archive copies the full job directories for the terminal job and all of its parents
A simple archive just recreates the directory structure, saves the parameter files for each job, and writes a script to re-run the project
- Parameters
process (Process) – The terminal job of the archive
do_full (bool) – Should a full archive be created instead of a simple one?
tar (bool) – Should the archive be written as a tar archive?
- Returns
A completion message
It says the archive was created successfully and gives the name of the file created, or describes any errors encountered
- Return type
str
- read(do_lock: bool = False, lock_wait: int = 2) pipeliner.jobstar_reader.StarfileCheck
Read the pipeline
- Parameters
do_lock (bool) – Should the pipeline be locked?
lock_wait (int) – The number of minutes to wait for the lock
- Returns
A StarfileCheck for the pipeline file
- Return type
pipeliner.jobstar_reader.StarfileCheck
- Raises
AttributeError – If the pipeline file is not found
- remake_node_directory()
Erase and rewrite RELION’s .Nodes directory
- remove_lock()
Remove the lock file for the pipeline
- replace_files_for_import_export_of_sched_jobs(fn_in_dir: str, fn_out_dir: str, find_pattern: str, replace_pattern: str)
Updates the content of files in jobs that are to be exported
This function is part of the RELION import/export system which is not used by the pipeliner and will probably be removed
- set_job_alias(process: pipeliner.data_structure.Process, new_alias: Optional[str]) bool
Set a job’s alias
Sets the alias in the pipeline and creates the alias directory, which is a symlink to the original directory
- Parameters
process (Process) – The Process to set the alias for
new_alias (str) – The new alias
- Returns
True if the new alias was successfully set
- Return type
bool
- Raises
ValueError – If the Process is not found
ValueError – If the new alias is ‘None’, which is not allowed
ValueError – If the new alias is shorter than 2 characters
ValueError – If the new alias begins with ‘job’, which would cause problems
ValueError – If the new alias is not unique
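The alias validation rules listed above can be mocked as a standalone checker. This is an illustrative re-implementation of the documented error cases, not the real set_job_alias.

```python
# Mock of the documented set_job_alias validation rules (hypothetical code,
# not the pipeliner implementation).
def validate_alias(new_alias, existing_aliases):
    if new_alias is None:
        raise ValueError("Alias 'None' is not allowed")
    if len(new_alias) < 2:
        raise ValueError("Alias must be at least 2 characters")
    if new_alias.startswith("job"):
        raise ValueError("Alias must not begin with 'job'")
    if new_alias in existing_aliases:
        raise ValueError("Alias must be unique")
    return True

print(validate_alias("my_refine", ["my_import"]))  # True
```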
- set_name(name: str, new_lock: bool = True, overwrite: bool = False)
Change the name of the pipeline file
Unlocks the old pipeline when the new one is created. This should rarely be used, as there is generally no reason to change the name of the pipeline
- Parameters
name (str) – The new name for the pipeline
new_lock (bool) – Should a lock be created for the renamed pipeline?
overwrite (bool) – Overwrite the pipeline file if one with the new name already exists?
- Raises
ValueError – If an attempt is made to change the name to one that already exists
- touch_temp_node_file(node: pipeliner.data_structure.Node, touch_if_not_exists: bool) bool
Create a placeholder file for a node that will be created later
- touch_temp_node_files(process: pipeliner.data_structure.Process) bool
Create placeholder files for all nodes in a Process
- undelete_job(del_job: str)
Restore a job from the trash and return it to the pipeline
Also restores any alias the job may have had, as long as it does not conflict with the current aliases
- Parameters
del_job (str) – The name of the deleted job
- update_lock_message(lock_message: str)
Updates the contents of the lockfile for the pipeline
This enables the user to see which process has locked the pipeline
- update_status(the_proc: pipeliner.data_structure.Process, new_status: str)
Update the status of a job
The job can be marked as “Succeeded”, “Failed”, “Aborted”, “Running”, or “Scheduled”
- Parameters
the_proc (Process) – The Process to update
new_status (str) – The new status for the job
- Returns
Was the status changed?
- Return type
- Raises
ValueError – If the new_status is not in the approved list
ValueError – If a job with any other status than ‘Running’ is marked ‘Aborted’
ValueError – If a job’s updated status is the same as its current status
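The status rules documented above can be sketched as a standalone checker. This is an illustrative re-implementation of the documented error cases, not the real update_status.

```python
# Mock of the documented update_status rules (hypothetical code, not the
# pipeliner implementation): the new status must come from the approved list,
# only a 'Running' job may be marked 'Aborted', and a no-op update is an error.
APPROVED = {"Succeeded", "Failed", "Aborted", "Running", "Scheduled"}

def check_status_change(current: str, new: str) -> None:
    if new not in APPROVED:
        raise ValueError(f"Status {new!r} is not in the approved list")
    if new == "Aborted" and current != "Running":
        raise ValueError("Only a 'Running' job can be marked 'Aborted'")
    if new == current:
        raise ValueError("New status is the same as the current status")

check_status_change("Running", "Succeeded")  # OK: no exception raised
```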
- write(edges: Optional[Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]] = None, lockfile_expected: bool = True, lock_wait: int = 2)
Write the pipeline file from the ProjectGraph object
If the ‘ProjectGraph’ is locked it will be unlocked after writing is finished
- Parameters
edges (tuple) – The pipeline edges as:
[0] A list of tuples (process name, input file)
[1] A list of tuples (process name, output file)
If this is left as None the edges of the current pipeline are used
lockfile_expected (bool) – Does the pipeline expect to be locked?
lock_wait (int) – If the locking status of the pipeline is not as expected, wait this many minutes for it to resolve
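The shape of the `edges` argument described above can be shown with a small example. The process and file names here are invented for demonstration; only the tuple-of-two-lists structure comes from the documentation.

```python
# Illustrative construction of the documented 'edges' argument for write():
# a tuple whose first element is a list of (process name, input file) pairs
# and whose second element is a list of (process name, output file) pairs.
# All names below are made up.
input_edges = [("Class2D/job002/", "Import/job001/movies.star")]
output_edges = [
    ("Import/job001/", "Import/job001/movies.star"),
    ("Class2D/job002/", "Class2D/job002/classes.star"),
]
edges = (input_edges, output_edges)

print(len(edges[0]), len(edges[1]))  # 1 2
```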
- pipeliner.project_graph.update_jobinfo_file(current_proc: pipeliner.data_structure.Process, action: str, comment: Optional[str] = None, command_list: Optional[list] = None)
Update the file in the jobdir that stores info about the job
- Parameters
current_proc (Process) – Process object for the job being operated on
action (str) – The action that was performed on the job, e.g. Ran, Scheduled, Cleaned up
comment (str) – Comment to append to the job’s comments list
command_list (list) – Commands that were run. Generally None unless the action was Run or Schedule
Metadata Tools
- pipeliner.metadata_tools.format_for_metadata(joboption: pipeliner.job_options.JobOption) Optional[Union[str, float, int, bool]]
Format data from relion starfiles for JSON.
Changes ‘Yes’/‘No’ to True/False, returns None for blank values, and removes any quotation marks
- Parameters
joboption (pipeliner.job_options.JobOption) – The joboption to format
- Returns
The formatted value
- Return type
str, float, int, bool, or None
- Raises
ValueError – If a boolean job option doesn’t have a value compatible with a bool
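The conversions described above can be mocked on plain strings. This is an illustrative sketch of the documented behaviour, not the real format_for_metadata, which operates on JobOption objects.

```python
# Mock of the documented format_for_metadata conversions (hypothetical code,
# not the pipeliner implementation): 'Yes'/'No' become True/False, blank
# values become None, and surrounding quotation marks are stripped.
def format_value(value: str):
    value = value.strip().strip("'\"")
    if value == "":
        return None
    if value == "Yes":
        return True
    if value == "No":
        return False
    return value

print(format_value("Yes"))     # True
print(format_value('"0.05"'))  # 0.05 (still a string)
print(format_value(""))        # None
```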
- pipeliner.metadata_tools.get_job_metadata(this_job: pipeliner.data_structure.Process) dict
Run a job’s metadata gathering method
- Parameters
this_job (pipeliner.data_structure.Process) – The Process object for the job to gather metadata from
- Returns
Metadata dict for the job. Returns None if the job has no metadata gathering function
- Return type
dict
- pipeliner.metadata_tools.get_metadata_chain(pipeline, this_job: pipeliner.data_structure.Process, output_file: Optional[str] = None, full: bool = False) dict
Get the metadata for a job and all its upstream jobs
Metadata is gathered by each individual job’s gather_metadata function
- Parameters
pipeline (ProjectGraph) – The current pipeline
this_job (Process) – The Process object for the terminal job
output_file (str) – The name of a file to write the results to; if None no file will be written
full (bool) – Should the metadata report also contain information about continuations and multiple runs of the jobs, or just the current one
- Returns
The metadata dictionary
- Return type
dict
- pipeliner.metadata_tools.get_reference_list(pipeline, terminal_job: pipeliner.data_structure.Process) dict
Prepares a list of every piece of software used to generate a terminal job
- Parameters
pipeline (ProjectGraph) – The current pipeline
terminal_job (Process) – The Process object for the terminal job
- Returns
The reference list: {job_name: ([programs used], [references])}
- Return type
dict
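The documented return shape can be shown with a small example. The job names, program names, and references below are invented for demonstration; only the {job_name: ([programs used], [references])} structure comes from the documentation.

```python
# Illustrative example of the documented reference-list shape returned by
# get_reference_list. All values are made up for demonstration.
reference_list = {
    "Import/job001/": (["example_import_program"], ["Example et al. (2020)"]),
    "Class2D/job002/": (["example_refine_program"], ["Example et al. (2020)"]),
}

for job, (programs, refs) in reference_list.items():
    print(job, programs, refs)
```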
- pipeliner.metadata_tools.make_default_results_schema(job_type: str) dict
Write the json schema for the results of a job if one was not provided
The metadata schemas have two parts: the running parameters part is generated automatically and the results part is written by the user. If no schema was provided for the results, this function is used to write a blank placeholder one
Pipeline Visualisation
Tools for visualising projects. An installation of pygraphviz, which depends on GraphViz, is necessary. See the README for information about installing GraphViz
- class pipeliner.flowchart_illustration.ProcessFlowchart(pipeline: Optional[pipeliner.project_graph.ProjectGraph] = None, process: Optional[pipeliner.data_structure.Process] = None, do_upstream: bool = False, do_downstream: bool = False, do_full: bool = False, save: bool = False, show: bool = False)
Bases: object
A class for drawing pretty flowcharts for pipeliner projects
- pipeline
The pipeline to use in drawing the flowchart
- Type
ProjectGraph
- format_edges_list(il: List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]) List[Tuple[str, str, str, Optional[int]]]
Convert the output of the network finding functions to the format used for graph drawing
Converts (node, parent, child) to (node name, parent name, child name, count); i.e. (Node, Process, Process) to (str, str, str, int)