Pipeline Tools

Nodes and Processes

class pipeliner.data_structure.Node(name: str, n_type: str)

Bases: object

Nodes store info about input and output files of jobs that have been run

name

The name of the file the node represents

Type

str

type

The node type

Type

str

kwds

Keywords associated with the node

Type

list

ext

The node file’s extension

Type

str

output_from_process

The Process object for the process that created the file

Type

Process

input_for_processes_list

A list of Process objects for processes that use the node as an input

Type

list

clear() None

Clear the input and output processes from a Node

class pipeliner.data_structure.Process(name: str, p_type: str, status: str, alias: Optional[str] = None)

Bases: object

A Process represents a job that has been run by the Pipeliner

name

The name of the process. It should be in the format “<jobtype>/jobxxx/”. The trailing slash is required

Type

str

alias

An alternate name for the process to make it easier to identify

Type

str

outdir

The directory the process was written into

Type

str

p_type

The process’ type

Type

str

status

The process’s status: ‘running’, ‘scheduled’, ‘successful’, ‘failed’, or ‘aborted’

Type

str

input_nodes

Node objects for files the process uses as inputs

Type

list

output_nodes

Node objects for files the process produces

Type

list

clear()
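The linkage between the two classes can be illustrated with a standalone sketch. The dataclasses below only mirror the attributes documented above (they are not the real pipeliner classes), the node and file names are hypothetical, and the three-digit job numbering in the regex is an assumption based on the usual jobxxx convention:

```python
import re
from dataclasses import dataclass, field

# Process names must look like "<jobtype>/jobxxx/" with a trailing slash
PROCESS_NAME_RE = re.compile(r"^[^/]+/job\d{3,}/$")

@dataclass
class Node:
    """Stand-in mirroring the documented Node attributes."""
    name: str
    type: str
    output_from_process: object = None
    input_for_processes_list: list = field(default_factory=list)

@dataclass
class Process:
    """Stand-in mirroring the documented Process attributes."""
    name: str
    p_type: str
    status: str
    input_nodes: list = field(default_factory=list)
    output_nodes: list = field(default_factory=list)

# Wire one output edge: the process produced the node
proc = Process("Import/job001/", "relion.import", "successful")
assert PROCESS_NAME_RE.match(proc.name)

node = Node("Import/job001/movies.star", "MicrographMoviesData")
proc.output_nodes.append(node)
node.output_from_process = proc
```

A process that later consumes the file would append itself to the node’s input_for_processes_list and add the node to its own input_nodes, which is what the ProjectGraph edge methods below do for the real classes.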

ProjectGraph

class pipeliner.project_graph.ProjectGraph(name: str = 'default', do_read_only: bool = False)

Bases: object

The main ProjectGraph object is used for manipulating the pipeline

name

The name of the pipeline. _pipeline.star is added to the name to generate the pipeline file name

Type

str

node_list

A Node object for every file that is an input or output for a job in the project

Type

list

process_list

A Process object for each job in the project

Type

list

job_counter

The number of the next job in the project, e.g. if there are 10 jobs in a project, job_counter is 11

Type

int

do_read_only

Whether this instantiation of the object is allowed to make changes to the pipeline

Type

bool

add_job(job: pipeliner.pipeliner_job.PipelinerJob, as_status: str, do_overwrite: bool) pipeliner.data_structure.Process

Add a job to the pipeline

Adds the Process for the job, a Node for each of its input and output files, and writes a mini-pipeline containing just that job

Parameters
  • job (PipelinerJob) – The job to add.

  • as_status (str) – The status of the job in the pipeline

  • do_overwrite (bool) – If the job already exists, should it be overwritten?

Returns

The Process for the new job

Return type

Process

add_new_input_edge(node: pipeliner.data_structure.Node, process: pipeliner.data_structure.Process)

Add a Node to a Process as an input

Parameters
  • node (Node) – The node to add

  • process (Process) – The Process to add the Node to

add_new_output_edge(process: pipeliner.data_structure.Process, node: pipeliner.data_structure.Node, mini: bool = False)

Add a Node to a Process as an output

Parameters
  • node (Node) – The node to add

  • process (Process) – The Process to add the Node to

add_new_process(process: pipeliner.data_structure.Process, do_overwrite: bool) Optional[pipeliner.data_structure.Process]

Add a Process to the pipeline

Parameters
  • process (Process) – The Process to add

  • do_overwrite (bool) – If the process already exists, should it be overwritten?

Returns

The Process that was added or the existing Process if it already existed

Return type

(Process)

Raises

RuntimeError – If the Process already exists and overwrite is False

add_node(node: pipeliner.data_structure.Node, touch_if_not_exists: bool = False) Optional[pipeliner.data_structure.Node]

Add a Node to the pipeline

A node is only added if it doesn’t already exist in the pipeline. As a result, if a node is used as an input, the keywords from the process that wrote the node overrule any keywords added in the node’s definition by the process that uses it as an input

Parameters
  • node (Node) – The node to add

  • touch_if_not_exists (bool) – If the file for the node does not exist, should it be created?

Returns

The Node that was added. If this node already existed, returns the existing copy

Return type

pipeliner.data_structure.Node

Raises

RuntimeError – If the node name is empty

check_lock(lock_expected: bool, wait_time: int = 2)

Checks to see if a lock file exists for the pipeline

Parameters
  • lock_expected (bool) – Is a lock file expected to exist?

  • wait_time (int) – The number of minutes to continue trying while waiting for a lock file to appear or disappear, depending on whether one is expected

Raises
  • RuntimeError – If no lock is expected, but a lock file is still found after waiting <wait_time> minutes for it to disappear

  • RuntimeError – If a lock file is expected but not found after waiting <wait_time> minutes for it to appear

check_process_completion() bool

Check whether any processes have finished running and update their statuses

Returns

True if any statuses have been updated, else False

Return type

bool

clean_up_job(process: pipeliner.data_structure.Process, do_harsh: bool) bool

Cleans up a job by deleting intermediate files

Gets a list of files to delete from the specific job’s cleanup function. First checks that none of the files that are slated for deletion are on the Node list or are of a few specific types that RELION needs. Then moves all the intermediate files to the trash

There are two tiers of cleanup: ‘harsh’ and normal. Each job defines which files are removed by each cleanup type

Parameters
  • process (Process) – The Process of the job to clean up

  • do_harsh (bool) – Should harsh cleaning be performed?

Returns

True if cleaning was performed; False if the specified job had no cleanup method or was protected from harsh cleaning

Return type

bool

cleanup_all_jobs(do_harsh: bool) int

Clean up all jobs in the project

Parameters

do_harsh (bool) – Should harsh cleaning be performed?

Returns

The number of jobs that were successfully cleaned

Return type

int

clear()

Clear the node and process lists and set the job counter to 1

create_lock(wait_time: int = 2)

Lock the pipeline

Creates a directory called .relion_lock and the file .relion_lock/lock_<pipeline_name>_pipeline.star. The pipeline cannot be edited when this file is present.

Parameters

wait_time (int) – The number of minutes to continue trying to make the lock directory/file if a problem is encountered.

Raises
  • RuntimeError – If a permission error is encountered trying to create or read the .relion_lock directory

  • RuntimeError – If the lock file has not appeared after <wait_time> minutes
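The lock file location described above follows mechanically from the pipeline name. A minimal sketch (this helper is not part of the pipeliner API):

```python
def lock_file_path(pipeline_name: str = "default") -> str:
    """Build the lock file path described above:
    .relion_lock/lock_<pipeline_name>_pipeline.star
    """
    return f".relion_lock/lock_{pipeline_name}_pipeline.star"
```

For the default pipeline this gives ".relion_lock/lock_default_pipeline.star".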

create_process_display_objs(proc)

Create the ResultsDisplay objects for a process and save them

Parameters

proc (Process) – The process to operate on

Returns

The DisplayObjects for that process

Return type

list

delete_job(this_job: pipeliner.data_structure.Process)

Remove a job from the pipeline

Parameters

this_job (Process) – The job to remove

delete_node(node: pipeliner.data_structure.Node)

Remove a node from the pipeline

Also removes any edges that contain this node

Parameters

node (Node) – The node to remove

delete_temp_node_file(node: pipeliner.data_structure.Node) bool

Remove files associated with a Node

Also removes the directory if it is empty

Parameters

node (Node) – The node to remove the file for

Returns

True if files were deleted, otherwise False

Return type

bool

delete_temp_node_files(process: pipeliner.data_structure.Process) bool

Delete all the files for the nodes in a specific Process

Parameters

process (Process) – The Process to delete the node files for

Returns

True if files were removed, otherwise False

Return type

bool

export_all_scheduled_jobs(mydir: str) bool

Exports all scheduled jobs in the pipeline

This function is part of the RELION import/export system which is not used by the pipeliner and will probably be removed

Parameters

mydir (str) – The name of the export directory to be created. It will be written as ExportJobs/<mydir>

Returns

Were any jobs exported?

Return type

bool

find_immediate_child_processes(process: pipeliner.data_structure.Process) list

Find just the immediate child processes of a process

Parameters

process (Process) – The process to find children for

Returns

The Process object for each job connected to the input Process

Return type

list

find_node(name: str) Optional[pipeliner.data_structure.Node]

Retrieve the Node object for a file

Parameters

name (str) – The name of the file to get the node for

Returns

The file’s Node object, or None if the file is not found.

Return type

Node

find_process(name_or_alias: str) Optional[pipeliner.data_structure.Process]

Retrieve the Process object for a job in the pipeline

Parameters

name_or_alias (str) – The job name or its alias

Returns

The job’s Process

Return type

pipeliner.data_structure.Process

Raises

RuntimeError – If multiple processes with the same name are found

get_downstream_network(process: pipeliner.data_structure.Process) List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]

Gets data for drawing a network downstream from process

Parameters

process (Process) – The process to trace

Returns

Contains a tuple for each edge (node, parent process, child process): [Node, Process, Process]. Each edge describes one file in the network

Return type

list
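The returned edge tuples can be produced by a simple breadth-first walk. The sketch below uses a toy adjacency mapping with hypothetical job and file names; the real method walks Node and Process objects rather than strings:

```python
from collections import deque

# Toy project: each process maps to (output file, child process) pairs.
# All names here are hypothetical.
children = {
    "Import/job001/": [("movies.star", "MotionCorr/job002/")],
    "MotionCorr/job002/": [("micrographs.star", "CtfFind/job003/")],
    "CtfFind/job003/": [],
}

def downstream_edges(start):
    """Collect (node, parent process, child process) tuples below start."""
    edges, queue, seen = [], deque([start]), {start}
    while queue:
        parent = queue.popleft()
        for node, child in children.get(parent, []):
            edges.append((node, parent, child))
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return edges
```

The upstream case is symmetric: walk from each process back to the processes that produced its input nodes.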

get_node_name(node: pipeliner.data_structure.Node) str

Get the relative path of a node file with its alias if it exists

This returns the relative path (which is the same as the file name) unless the job that created the node has an alias, in which case it returns the file path with the alias instead of <jobtype>/jobxxx/

Parameters

node (Node) – The node to get the name for

Returns

The relative path of the file the node points to, with the job’s alias if applicable

Return type

str

get_output_nodes_from_starfile(process: pipeliner.data_structure.Process)

Get nodes from an exported job

Reads a RELION_OUTPUT_NODES.star file created when a job is exported and adds these nodes to a Process

This function is RELION-specific and will probably be deprecated

Parameters

process (Process) – The process to add the nodes to. It will look in the process’s directory for the RELION_OUTPUT_NODES.star file

get_pipeline_edges(delete_nodes: Optional[List[pipeliner.data_structure.Node]] = None) Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]

Find the connections between jobs

Get the connections between jobs and nodes for the pipeline. If any nodes have been deleted, their corresponding edges need to be removed

Parameters

delete_nodes (list) – List of Node objects to be deleted from the pipeline

Returns

([input edges], [output edges]): input edges is a list of (process name, input file) tuples; output edges is a list of (process name, output file) tuples

Return type

tuple
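The shape of the return value can be illustrated with plain tuples. The processes and file names below are hypothetical; this is not the pipeliner implementation:

```python
# Hypothetical processes with their input and output files
processes = {
    "Import/job001/": {
        "inputs": [],
        "outputs": ["Import/job001/movies.star"],
    },
    "MotionCorr/job002/": {
        "inputs": ["Import/job001/movies.star"],
        "outputs": ["MotionCorr/job002/corrected_micrographs.star"],
    },
}

# Build the ([input edges], [output edges]) pair described above:
# each edge is a (process name, file name) tuple
input_edges = [(p, f) for p, d in processes.items() for f in d["inputs"]]
output_edges = [(p, f) for p, d in processes.items() for f in d["outputs"]]
edges = (input_edges, output_edges)
```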

get_pipeline_filename() str

Get the name of the pipeline file

Returns

The name of the pipeline file, usually ‘default_pipeline.star’

Return type

str

get_process_results_display(proc, forceupdate=False)

Get the ResultsDisplay objects for a process

Attempts to be as efficient as possible; uses already existing files if they are found

Parameters
  • proc (Process) – The process to get the ResultsDisplay objects for

  • forceupdate (bool) – Force an update even if one does not appear to be necessary

get_upstream_network(process: pipeliner.data_structure.Process) List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]

Gets data for drawing a network upstream from process

Parameters

process (Process) – The process to trace

Returns

Contains a tuple for each edge (node, parent process, child process): [Node, Process, Process]. Each edge describes one file in the network

Return type

list

get_whole_project_network() List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]

Get the edges and nodes for the entire project

Returns

Edges [node type, parent_job, child_job, extra info]; set: the names of all nodes

Return type

list

import_jobs(fn_export: str)

Import a previously exported job

This function is part of the RELION import/export system which is not used by the pipeliner and will probably be removed

Parameters

fn_export (str) – The name of the file created by an export job

is_empty() bool

Tests if the pipeline is empty

Returns

True if there are no jobs in the pipeline

Return type

bool

prepare_archive(process: pipeliner.data_structure.Process, do_full: bool = False, tar: bool = True) str

Create an archive for a job

There are two levels of archive:

  • A full archive copies the full job directories for the terminal job and all of its parents

  • A simple archive just recreates the directory structure, saves the parameter files for each job, and writes a script to re-run the project

Parameters
  • process (Process) – The Process object for the terminal job in the project

  • do_full (bool) – Should a full archive be created?

  • tar (bool) – Should the archive be compressed after creation?

Returns

A completion message

It states that the archive was created successfully and gives the name of the file created, or describes any errors encountered

Return type

str

read(do_lock: bool = False, lock_wait: int = 2) pipeliner.jobstar_reader.StarfileCheck

Read the pipeline

Parameters
  • do_lock (bool) – Should the pipeline be locked upon reading?

  • lock_wait (int) – If the pipeline cannot be locked, continue trying for this many minutes

Returns

A StarfileCheck for the pipeline file

Return type

pipeliner.jobstar_reader.StarfileCheck

Raises

AttributeError – If the pipeline file is not found

remake_node_directory()

Erase and rewrite RELION’s .Nodes directory

remove_lock()

Remove the lock file for the pipeline

replace_files_for_import_export_of_sched_jobs(fn_in_dir: str, fn_out_dir: str, find_pattern: str, replace_pattern: str)

Updates the content of files in jobs that are to be exported

This function is part of the RELION import/export system which is not used by the pipeliner and will probably be removed

Parameters
  • fn_in_dir (str) – The input directory

  • fn_out_dir (str) – The output directory

  • find_pattern (str) – The text to replace

  • replace_pattern (str) – The text to replace it with

set_job_alias(process: pipeliner.data_structure.Process, new_alias: Optional[str]) bool

Set a job’s alias

Sets the alias in the pipeline and creates the alias directory, which is a symlink to the original directory

Parameters
  • process (Process) – The Process to make an alias for

  • new_alias (str) – The new alias for the job

Returns

True if the new alias was successfully set

Return type

bool

Raises
  • ValueError – If the Process is not found

  • ValueError – If the new alias is ‘None’, which is not allowed

  • ValueError – If the new alias is shorter than 2 characters

  • ValueError – If the new alias begins with ‘job’, which would cause problems

  • ValueError – If the new alias is not unique
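The rules above can be summarised as a small validation function. This is an illustrative re-implementation of the documented checks, not the pipeliner code itself:

```python
def validate_alias(new_alias, existing_aliases):
    """Raise ValueError if new_alias breaks any of the documented rules."""
    if new_alias is None or new_alias == "None":
        raise ValueError("The alias 'None' is not allowed")
    if len(new_alias) < 2:
        raise ValueError("Aliases must be at least 2 characters long")
    if new_alias.startswith("job"):
        raise ValueError("Aliases must not begin with 'job'")
    if new_alias in existing_aliases:
        raise ValueError("Aliases must be unique")
    return True
```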

set_name(name: str, new_lock: bool = True, overwrite: bool = False)

Change the name of the pipeline file

Unlocks the old pipeline when the new one is created. This should generally not be used, as there is no good reason to change the name of the pipeline

Parameters
  • name (str) – The new name

  • new_lock (bool) – Should the new pipeline be locked?

  • overwrite (bool) – Should the old pipeline be removed?

Raises

ValueError – If an attempt is made to change the name to one that already exists

touch_temp_node_file(node: pipeliner.data_structure.Node, touch_if_not_exists: bool) bool

Create a placeholder file for a node that will be created later

Parameters
  • node (Node) – The node to create the file for

  • touch_if_not_exists (bool) – Should the file be created if it does not already exist?

Returns

True if a file was created or already existed; False if no file was created.

Return type

bool

touch_temp_node_files(process: pipeliner.data_structure.Process) bool

Create placeholder files for all nodes in a Process

Parameters

process (Process) – The Process to create the files for

Returns

True if files were created, otherwise False

Return type

bool

undelete_job(del_job: str)

Get job out of the trash and restore it to the pipeline

Also restores any alias the job may have had, as long as it does not conflict with the current aliases

Parameters

del_job (str) – The name of the deleted job

update_lock_message(lock_message: str)

Updates the contents of the lockfile for the pipeline

This enables the user to see which process has locked the pipeline

update_status(the_proc: pipeliner.data_structure.Process, new_status: str)

Mark a job finished

The job can be marked as “Succeeded”, “Failed”, “Aborted”, “Running”, or “Scheduled”

Parameters
  • the_proc (Process) – The Process to update the status for

  • new_status (str) – The new status for the job

Returns

Was the status changed?

Return type

bool

Raises
  • ValueError – If the new_status is not in the approved list

  • ValueError – If a job with any other status than ‘Running’ is marked ‘Aborted’

  • ValueError – If a job’s updated status is the same as its current status
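The checks above can be sketched as a standalone function; it mirrors only the ValueErrors documented here and is not the pipeliner implementation:

```python
APPROVED_STATUSES = {"Succeeded", "Failed", "Aborted", "Running", "Scheduled"}

def check_status_change(current: str, new: str) -> None:
    """Raise ValueError if the status change breaks the documented rules."""
    if new not in APPROVED_STATUSES:
        raise ValueError(f"{new!r} is not an approved status")
    if new == "Aborted" and current != "Running":
        raise ValueError("Only 'Running' jobs can be marked 'Aborted'")
    if new == current:
        raise ValueError("The new status is the same as the current status")
```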

write(edges: Optional[Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]] = None, lockfile_expected: bool = True, lock_wait: int = 2)

Write the pipeline file from the ProjectGraph object

If the ProjectGraph is locked, it will be unlocked after writing is finished

Parameters
  • edges (tuple) –

    The pipeline edges as

    [0] A list of tuples (process name, input file)

    [1] A list of tuples (process name, output file)

    If this is left as None the edges of the current pipeline are used

  • lockfile_expected (bool) – Does the pipeline expect to be locked?

  • lock_wait (int) – If the locking status of the pipeline is not as expected wait this many minutes for it to resolve

pipeliner.project_graph.update_jobinfo_file(current_proc: pipeliner.data_structure.Process, action: str, comment: Optional[str] = None, command_list: Optional[list] = None)

Update the file in the jobdir that stores info about the job

Parameters
  • current_proc (Process) – Process object for the job being operated on

  • action (str) – The action performed on the job, e.g. Ran, Scheduled, Cleaned up

  • comment (str) – Comment to append to the job’s comments list

  • command_list (list) – Commands that were run. Generally None if the action was anything other than Run or Schedule

Metadata Tools

pipeliner.metadata_tools.format_for_metadata(joboption: pipeliner.job_options.JobOption) Optional[Union[str, float, int, bool]]

Format data from relion starfiles for JSON.

Changes ‘Yes’/‘No’ to True/False, converts blank values to None, and removes any quotation marks

Parameters

joboption (pipeliner.job_options.JobOption) – The joboption to format

Returns

A properly formatted str, float, int, or bool

Return type

str, float, int, or bool

Raises

ValueError – If a boolean job option doesn’t have a value compatible with a bool
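The described conversions can be sketched as follows; this is an illustration, not the real function, and the exact case and quote handling in pipeliner may differ:

```python
def format_value(raw):
    """Convert a raw STAR-file value as described above:
    'Yes'/'No' become True/False, blanks become None, and
    surrounding quotation marks are stripped.
    """
    if raw in ("Yes", "yes"):
        return True
    if raw in ("No", "no"):
        return False
    if raw is None or raw == "":
        return None
    if isinstance(raw, str):
        return raw.strip("\"'")
    return raw
```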

pipeliner.metadata_tools.get_job_metadata(this_job: pipeliner.data_structure.Process) dict

Run a job’s metadata gathering method

Parameters

this_job (pipeliner.data_structure.Process) – The Process object for the job to gather metadata from

Returns

Metadata dict for the job, or None if the job has no metadata gathering function

Return type

dict

pipeliner.metadata_tools.get_metadata_chain(pipeline, this_job: pipeliner.data_structure.Process, output_file: Optional[str] = None, full: bool = False) dict

Get the metadata for a job and all its upstream jobs

Metadata is gathered by each individual job’s gather_metadata function

Parameters
  • pipeline (ProjectGraph) – The current pipeline

  • this_job (Process) – The Process object for the terminal job

  • output_file (str) – The name of a file to write the results to; if None no file will be written

  • full (bool) – Should the metadata report also contain information about continuations and multiple runs of the jobs, or just the current run?

Returns

The metadata dictionary

Return type

dict

pipeliner.metadata_tools.get_reference_list(pipeline, terminal_job: pipeliner.data_structure.Process) dict

Prepares a list of every piece of software used to generate a terminal job

Parameters
  • pipeline (ProjectGraph) – The current pipeline

  • terminal_job (Process) – The Process object for the terminal job

Returns

The reference list: {job_name: ([programs used], [references])}

Return type

dict

pipeliner.metadata_tools.make_default_results_schema(job_type: str) dict

Write the json schema for the results of a job if one was not provided

The metadata schema has two parts: the running parameters part is generated automatically and the results part is written by the user. If no schema was provided for the results, this function is used to write a blank placeholder one

Parameters

job_type (str) – The job type to write the schema for

Returns

The json schema ready to be dumped to json file

Return type

dict
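A blank placeholder of this kind might look like the following sketch; the keys shown (title, type, properties) are assumptions, since the actual schema layout is defined by the pipeliner:

```python
import json

def blank_results_schema(job_type: str) -> dict:
    """A plausible minimal JSON-schema placeholder for a job's results.
    The real layout is defined by the pipeliner; this is only a sketch.
    """
    return {
        "title": f"{job_type} results",
        "type": "object",
        "properties": {},
    }

# Ready to be dumped to a json file, as described above
schema_json = json.dumps(blank_results_schema("relion.import"), indent=2)
```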

pipeliner.metadata_tools.make_job_parameters_schema(job_type: str) dict

Write the json schema for the running parameters of a job

The metadata schema has two parts: the running parameters part is generated automatically and the results part is written by the user

Parameters

job_type (str) – The job type to write the schema for

Returns

The json schema ready to be dumped to json file

Return type

dict

Pipeline Visualisation

Tools for visualising projects. An installation of pygraphviz, which depends on Graphviz, is necessary. See the README for information about installing Graphviz

class pipeliner.flowchart_illustration.ProcessFlowchart(pipeline: Optional[pipeliner.project_graph.ProjectGraph] = None, process: Optional[pipeliner.data_structure.Process] = None, do_upstream: bool = False, do_downstream: bool = False, do_full: bool = False, save: bool = False, show: bool = False)

Bases: object

A class for drawing pretty flowcharts for pipeliner projects

pipeline

The pipeline to use in drawing the flowchart

Type

ProjectGraph

process

The process to start drawing from for upstream and/or downstream flowcharts

Type

Process

do_upstream

Should an upstream flowchart be drawn?

Type

bool

do_downstream

Should a downstream flowchart be drawn?

Type

bool

do_full

Should a full project flowchart be drawn?

Type

bool

save

Should the flowchart be saved as a .png file?

Type

bool

show

Should an interactive flowchart be displayed on screen?

Type

bool

drew_up

The name of the upstream flowchart saved, if any

Type

str

drew_down

The name of the downstream flowchart saved, if any

Type

str

drew_full

The name of the full flowchart saved, if any

Type

str

downstream_process_graph() None

Make a flowchart for a job and all downstream processes

format_edges_list(il: List[Tuple[Optional[pipeliner.data_structure.Node], Optional[pipeliner.data_structure.Process], pipeliner.data_structure.Process]]) List[Tuple[str, str, str, Optional[int]]]

Convert the output of network finding functions to format for graph drawing

Converts (node, parent, child) to (node name, parent name, child name, count); in types, (Node, Process, Process) to (str, str, str, int)

Parameters

il (list) – A list of tuples in the format (node, parent, child): (Node, Process, Process)

Returns

A list of tuples (node name, parent name, child name, count): (str, str, str, int)

Return type

list

full_process_graph() None

Make a flowchart for an entire project

make_process_flowchart(edges_list: list, procname: Optional[str], ntype: str) Optional[str]

Draw a flowchart and save and/or display it

Parameters
  • edges_list (list) – A list of pipeliner.project_graph.EDGE objects

  • procname (str) – The name of the process to draw a flowchart for

  • ntype (str) – The type of flowchart being drawn; “upstream” or “downstream”

Returns

The name of the file created if one was saved, otherwise None

Return type

str

upstream_process_graph() None

Make a flowchart for a job and all upstream processes