The documentation below was generated by Sphinx, with some light editing to improve readability.
- plato_wp36.connect_db module
- plato_wp36.constants module
- plato_wp36.lc_reader_lcsg module
- plato_wp36.lightcurve module
- plato_wp36.lightcurve_resample module
- plato_wp36.logging_database module
- plato_wp36.quality_control module
- plato_wp36.settings module
- plato_wp36.task_database module
- plato_wp36.task_execution module
- plato_wp36.task_expression_evaluation module
- plato_wp36.task_heartbeat module
- plato_wp36.task_objects module
- plato_wp36.task_queues module
- plato_wp36.task_timer module
- plato_wp36.task_types module
- plato_wp36.temporary_directory module
Module for connecting to MySQL database for storage of results.
class plato_wp36.connect_db.DatabaseConnector(db_engine: Optional[str] = None, db_user: Optional[str] = None, db_passwd: Optional[str] = None, db_host: Optional[str] = None, db_port: Optional[int] = None, db_database: Optional[str] = None)
Factory class for creating connections to SQL databases.
Return a new connection to the database as a DatabaseInterface object.
-
Parameters
connect – Flag indicating whether we immediately open connection to database, or whether the user will subsequently open a database connection explicitly.
-
Returns
Instance of DatabaseInterface
Class defining a unified interface for interacting with SQL databases.
Close connection to the database.
Commit changes to the database.
Open a connection to the SQL database.
Create an empty SQL database.
-
Parameters
initialise_schema – Boolean flag indicating whether we initialise the schema of the database.
Create a gzipped database dump to a file.
-
Parameters
output_filename – The filename for the database dump
Fetch all results.
Fetch a single result.
Fetch the ID of the last inserted row.
Create SQL configuration file with default username and password, which means we can log into database without supplying these when we try to connect to the database in the future.
-
Returns
None
Execute a database query with a single set of input parameters.
Execute a database query with multiple sets of input parameters.
Restore from a database dump.
-
Parameters
input_filename – The filename from which to read the database dump
Fetch the path to SQL configuration file with username and password that we use for future connections.
-
Returns
List[str]
Test whether the task database has already been set up.
-
Returns
Boolean indicating whether the database has been set up
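As an illustration only, the single-parameter and multi-parameter query methods, the fetch methods, and the last-inserted-row lookup described for this interface correspond closely to the standard Python DB-API. The sketch below uses the standard-library sqlite3 module directly rather than the plato_wp36 classes; the table name and column names are hypothetical.

```python
import sqlite3

# In-memory database, purely for illustration
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("CREATE TABLE results (id INTEGER PRIMARY KEY, name TEXT, value REAL)")

# A query with a single set of input parameters
cursor.execute("INSERT INTO results (name, value) VALUES (?, ?)", ("period", 3.52))
last_id = cursor.lastrowid  # ID of the last inserted row

# The same query with multiple sets of input parameters
cursor.executemany("INSERT INTO results (name, value) VALUES (?, ?)",
                   [("depth", 0.01), ("duration", 0.12)])
connection.commit()

# Fetch a single result, then all remaining results
cursor.execute("SELECT name, value FROM results ORDER BY id")
first_row = cursor.fetchone()
remaining_rows = cursor.fetchall()
connection.close()
```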
class plato_wp36.connect_db.DatabaseInterfaceMySql(db_user: Optional[str] = None, db_passwd: Optional[str] = None, db_host: Optional[str] = None, db_port: Optional[int] = None, db_database: Optional[str] = None, connect: bool = True)
Bases: DatabaseInterface
Class defining an interface for interacting with MySQL databases.
Close connection to the database.
Commit changes to the database.
Open a connection to the SQL database.
Create an empty SQL database.
-
Parameters
initialise_schema – Boolean flag indicating whether we initialise the schema of the database.
Create a gzipped database dump to a file.
-
Parameters
output_filename – The filename for the database dump
Create SQL configuration file with username and password, which means we can log into database without supplying these on the command line.
-
Returns
None
Execute a database query with a single set of input parameters.
Execute a database query with multiple sets of input parameters.
Restore from a database dump.
-
Parameters
input_filename – The filename from which to read the database dump
Test whether the task database has already been set up.
-
Returns
Boolean indicating whether the database has been set up
class plato_wp36.connect_db.DatabaseInterfaceSqlite(db_database: Optional[str] = None, connect: bool = True)
Bases: DatabaseInterface
Class defining an interface for interacting with sqlite3 databases.
Close connection to the database.
Commit changes to the database.
Open a connection to the SQL database.
Create an empty SQL database.
-
Parameters
initialise_schema – Boolean flag indicating whether we initialise the schema of the database.
Create a gzipped database dump to a file.
-
Parameters
output_filename – The filename for the database dump
Create SQL configuration file with username and password, which means we can log into database without supplying these on the command line.
-
Returns
None
Execute a database query with a single set of input parameters.
Execute a database query with multiple sets of input parameters.
Restore from a database dump.
-
Parameters
input_filename – The filename from which to read the database dump
Test whether the task database has already been set up.
-
Returns
Boolean indicating whether the database has been set up
Some constants which can be used in task descriptions
Read the ASCII lightcurves generated by the lightcurve stitching group, and turn them into Lightcurve objects.
plato_wp36.lc_reader_lcsg.read_lcsg_lightcurve(file_path: str, gzipped: Optional[bool] = None, cut_off_time: Optional[float] = None)
Read a lightcurve from an ASCII data file.
ASCII file should have three columns: time [days] ; flux ; flag
-
Parameters
-
file_path (str) – The full path to the input data file.
-
gzipped (bool) – Boolean flag indicating whether the input datafiles have been gzipped. If this is not set, a guess is made from the filename suffix.
-
cut_off_time (float) – Only read lightcurve up to some cut off time
-
-
Returns
A Lightcurve object.
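The reading logic described above can be sketched roughly as follows. This is not the plato_wp36 implementation: the function name is hypothetical, it returns plain lists rather than a Lightcurve object, and it assumes whitespace-separated columns and "#" comment lines, neither of which is confirmed by the documentation.

```python
import gzip
from typing import List, Optional, Tuple


def read_three_column_lightcurve(
        file_path: str,
        gzipped: Optional[bool] = None,
        cut_off_time: Optional[float] = None
) -> Tuple[List[float], List[float], List[int]]:
    """Hypothetical reader for 'time [days]  flux  flag' ASCII files."""
    # If the caller did not say whether the file is gzipped, guess from the suffix
    if gzipped is None:
        gzipped = file_path.endswith(".gz")
    opener = gzip.open if gzipped else open

    times, fluxes, flags = [], [], []
    with opener(file_path, "rt") as handle:
        for line in handle:
            line = line.strip()
            # Skip blank lines and comment lines (the comment marker is an assumption)
            if not line or line.startswith("#"):
                continue
            columns = line.split()
            time = float(columns[0])
            # Ignore samples after the cut-off time, if one was given
            if cut_off_time is not None and time > cut_off_time:
                continue
            times.append(time)
            fluxes.append(float(columns[1]))
            flags.append(int(float(columns[2])))
    return times, fluxes, flags
```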
Classes for representing light curves, either on arbitrary time rasters, or on rasters with fixed step.
A class representing a lightcurve.
Check that this light curve is sampled at a fixed time interval. Return the number of errors.
-
Parameters
-
verbose – Should we output a logging message about every missing time point?
-
max_errors – The maximum number of errors we should show
-
-
Returns
int
Return the duration of the lightcurve [days]
Estimate the time step on which this light curve is sampled, with robustness against missing points [days]
-
Returns
Time step
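One common way to make a time-step estimate robust against missing points is to take the median of the gaps between consecutive samples, since a few double-length gaps do not shift the median. The sketch below assumes that approach; the documentation does not say which estimator the library actually uses, and the function name is hypothetical.

```python
from statistics import median
from typing import Sequence


def estimate_time_step(times: Sequence[float]) -> float:
    """Estimate the sampling interval as the median gap between samples."""
    if len(times) < 2:
        raise ValueError("Need at least two samples to estimate a time step")
    # Gaps between each pair of consecutive time stamps
    gaps = [later - earlier for earlier, later in zip(times, times[1:])]
    return median(gaps)
```

For example, a lightcurve sampled once per day with one missing point still yields a step of one day, because only a single gap is lengthened.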
Return an array of the flags of the lightcurve samples
Return an array of the fluxes of the lightcurve samples
Return the time associated with a particular data point in the lightcurve [days]
Return an array of the times of the lightcurve samples [days]
Return an array of the uncertainties of the lightcurve samples
Write a lightcurve out to a text data file. The time axis is multiplied by a factor 86400 to convert from days into seconds.
-
Parameters
-
target_path – The target file path where we should create a file representing this lightcurve.
-
binary – Boolean specifying whether we store lightcurve on disk in binary format or plain text.
-
gzipped – Boolean specifying whether we gzip plain-text lightcurves.
-
-
Returns
Dict of metadata associated with the written file.
Truncate a lightcurve to only contain data points within a certain time range [days]
-
Parameters
-
minimum_time – The lowest time value for which flux points should be included [days]
-
maximum_time – The highest time value for which flux points should be included [days]
-
-
Returns
A new Lightcurve object.
class plato_wp36.lightcurve.LightcurveArbitraryRaster(times: ndarray, fluxes: ndarray, uncertainties: Optional[ndarray] = None, flags: Optional[ndarray] = None, metadata: Optional[Dict] = None)
Bases: Lightcurve
A class representing a lightcurve which is sampled on an arbitrary raster of times.
Check that this light curve is sampled at a fixed time interval. Return the number of errors.
-
Parameters
-
verbose – Should we output a logging message about every missing time point?
-
max_errors – The maximum number of errors we should show
-
-
Returns
int
Estimate the time step on which this light curve is sampled, with robustness against missing points [days]
-
Returns
Time step
Read a lightcurve from a data file in our lightcurve archive.
-
Parameters
-
file_path – The path to the input data file.
-
file_metadata – A dictionary of metadata associated with the input file.
-
cut_off_time – Only read lightcurve up to some cut-off time.
-
-
Returns
A Lightcurve object.
Return an array of the flags of the lightcurve samples
Return an array of the fluxes of the lightcurve samples
Return the time associated with a particular data point in the lightcurve [days]
Return an array of the times of the lightcurve samples [days]
Return an array of the uncertainties of the lightcurve samples
Convert this lightcurve to a fixed time stride.
-
Parameters
-
verbose – Should we output a logging message about every missing time point?
-
max_errors – The maximum number of errors we should show
-
-
Returns
[LightcurveFixedStep]
Truncate a lightcurve to only contain data points within a certain time range [days]
-
Parameters
-
minimum_time – The lowest time value for which flux points should be included [days]
-
maximum_time – The highest time value for which flux points should be included [days]
-
-
Returns
A new Lightcurve object.
class plato_wp36.lightcurve.LightcurveFixedStep(time_start: float, time_step: float, fluxes: ndarray, uncertainties: Optional[ndarray] = None, flags: Optional[ndarray] = None, metadata: Optional[Dict] = None)
Bases: Lightcurve
A class representing a lightcurve which is sampled on a fixed time step.
Check that this light curve is sampled at a fixed time interval. Return the number of errors.
-
Parameters
-
verbose – Should we output a logging message about every missing time point?
-
max_errors – The maximum number of errors we should show
-
-
Returns
int
Estimate the time step on which this light curve is sampled, with robustness against missing points [days]
-
Returns
Time step
Return an array of the flags of the lightcurve samples
Return an array of the fluxes of the lightcurve samples
Return the time value associated with a particular index in this lightcurve [days]
-
Parameters
index – The index of the time point within the lightcurve
-
Returns
The time value, in days
Return an array of the times of the lightcurve samples [days]
Return an array of the uncertainties of the lightcurve samples
Truncate a lightcurve to only contain data points within a certain time range [days]
-
Parameters
-
minimum_time – The lowest time value for which flux points should be included [days]
-
maximum_time – The highest time value for which flux points should be included [days]
-
-
Returns
A new Lightcurve object.
A class containing utility functions for resampling LCs onto different time rasters
Resample this lightcurve onto the same time raster of another LightcurveArbitraryRaster object.
-
Parameters
-
other – The other LightcurveArbitraryRaster object whose raster we should resample this LightcurveArbitraryRaster onto.
-
resample_flags – Should we resample the lightcurve’s flags as well as the data itself? If not, the flags will be cleared, but the function will return about 30% quicker.
-
-
Returns
New LightcurveArbitraryRaster object.
Resample this lightcurve onto a user-specified time raster.
-
Parameters
-
output_raster – The raster we should resample this lightcurve onto.
-
resample_flags – Should we resample the lightcurve’s flags as well as the data itself? If not, the flags will be cleared, but the function will return about 30% quicker.
-
-
Returns
New LightcurveArbitraryRaster object.
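To illustrate what resampling onto a user-specified raster involves, here is a minimal sketch using linear interpolation on plain Python lists. The interpolation scheme, the clamping at the ends of the time range, and the function name are all assumptions; the documentation does not describe the method the library uses, and flags are not handled here.

```python
from bisect import bisect_left
from typing import List, Sequence


def resample_linear(times: Sequence[float], fluxes: Sequence[float],
                    output_raster: Sequence[float]) -> List[float]:
    """Linearly interpolate fluxes onto output_raster (times must be ascending)."""
    resampled = []
    for t in output_raster:
        # Clamp to the end values outside the sampled time range
        if t <= times[0]:
            resampled.append(fluxes[0])
            continue
        if t >= times[-1]:
            resampled.append(fluxes[-1])
            continue
        # Index of the first input sample at or after t
        upper = bisect_left(times, t)
        lower = upper - 1
        weight = (t - times[lower]) / (times[upper] - times[lower])
        resampled.append(fluxes[lower] + weight * (fluxes[upper] - fluxes[lower]))
    return resampled
```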
A custom log handler class, which sends all log messages to the EAS status database.
Bases: StreamHandler
A custom log handler, which sends all log messages to the EAS status database.
Record a logging message.
-
Parameters
record – The logging message to record to the database
-
Returns
None
Set the ID of the execution attempt we are currently running, so that this is recorded in any log messages.
-
Parameters
attempt_id – The ID of the execution attempt.
-
Returns
None
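The general pattern of a log handler that writes records to a database, tagged with the current execution attempt, might look like the sketch below. It is not the EAS handler itself: the class name, method name, table name and columns are hypothetical, and an in-memory SQLite database stands in for the EAS status database.

```python
import logging
import sqlite3
from typing import Optional


class SqliteLogHandler(logging.StreamHandler):
    """Hypothetical handler that stores each log record in an SQLite table."""

    def __init__(self, connection: sqlite3.Connection):
        super().__init__()
        self.connection = connection
        self.attempt_id: Optional[int] = None
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS log_messages "
            "(attempt_id INTEGER, timestamp REAL, level TEXT, message TEXT)")

    def set_attempt_id(self, attempt_id: Optional[int]) -> None:
        # Subsequent log messages are tagged with this execution attempt
        self.attempt_id = attempt_id

    def emit(self, record: logging.LogRecord) -> None:
        # Store the formatted message instead of writing it to a stream
        self.connection.execute(
            "INSERT INTO log_messages VALUES (?, ?, ?, ?)",
            (self.attempt_id, record.created, record.levelname, record.getMessage()))
        self.connection.commit()
```

Attaching such a handler to a logger with `logger.addHandler(...)` means that ordinary `logger.info(...)` calls end up as database rows without the calling code needing to know about the database.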
Functions for computing quality control metrics. Many of these are rather arbitrarily defined currently.
plato_wp36.quality_control.transit_detection_quality_control(lc: LightcurveArbitraryRaster, metadata: dict)
Determine whether the metadata returned by a transit-detection algorithm is a successful detection, or a failure. Currently the PLATO success criteria for transit detection are not tightly defined, so for the moment we use an arbitrary statistic - detecting the correct period to within 3%.
-
Parameters
-
lc (LightcurveArbitraryRaster) – The lightcurve object containing the input lightcurve.
-
metadata (Dict) – The metadata dictionary returned by the transit-detection code.
-
-
Returns
Updated metadata dictionary, with QC data added.
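The 3% period criterion described above amounts to a fractional-offset test. The sketch below shows one way to express it; the function name and the metadata keys ("true_period", "detected_period", "passed_qc") are hypothetical and are not taken from the plato_wp36 code.

```python
from typing import Any, Dict


def period_within_tolerance(metadata: Dict[str, Any],
                            tolerance: float = 0.03) -> Dict[str, Any]:
    """Return a copy of metadata with a pass/fail flag added."""
    true_period = metadata["true_period"]          # hypothetical key
    detected_period = metadata["detected_period"]  # hypothetical key
    # Fractional difference between the detected and the true period
    fractional_offset = abs(detected_period - true_period) / true_period
    updated = dict(metadata)
    updated["period_fractional_offset"] = fractional_offset
    updated["passed_qc"] = fractional_offset <= tolerance
    return updated
```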
Compile the settings to be used for this installation of the EAS pipeline code. We merge default values for each setting below with local overrides which can be placed in the YAML file <configuration/installation_settings.conf>.
Class containing EAS pipeline settings.
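Merging default settings with local overrides can be sketched as a simple dictionary update. The setting names and values below are hypothetical, and reading the overrides from the YAML file is omitted; in practice they could be loaded with a YAML parser such as PyYAML's `yaml.safe_load`.

```python
from typing import Any, Dict

# Hypothetical default values; the real setting names are not listed in this document
DEFAULT_SETTINGS: Dict[str, Any] = {
    "db_engine": "mysql",
    "db_host": "localhost",
    "db_port": 3306,
}


def merge_settings(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return defaults updated with any local overrides; inputs are not modified."""
    merged = dict(defaults)
    merged.update(overrides)
    return merged
```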
Module for reading and writing task objects to the database.
Class for reading and writing task objects to the database.
Close database connection.
Commit changes to the database.
Fetch from the database the system resources currently assigned to a particular type of worker container.
-
Parameters
container_name – The name of the worker container.
-
Returns
dict
Update the database with the system resources currently assigned to a particular type of worker container.
-
Parameters
-
container_name – The name of the worker container.
-
cpu – The number of CPU cores assigned to each instance of the worker container.
-
gpu – The number of GPU units assigned to each instance of the worker container.
-
memory_gb – The number of GB of RAM assigned to each instance of the worker container.
-
-
Returns
None
Delete a scheduling attempt.
-
Parameters
attempt_id (int) – The execution attempt ID
-
Returns
None
Check for the presence of the given task execution attempt.
-
Parameters
attempt_id (int) – The attempt ID
-
Returns
True if we have a record with this ID, False otherwise
Retrieve a dictionary of all the files generated by a task execution attempt, indexed by their semantic types.
-
Parameters
attempt_id – The ID of the task execution attempt.
-
Returns
Dictionary of FileProductVersion objects, indexed by semantic type string.
Retrieve a TaskExecutionAttempt object representing a task execution attempt in the database
-
Parameters
-
attempt_id – The execution attempt ID
-
embed_task_object – Boolean flag indicating whether the returned TaskExecutionAttempt object should include a descriptor for its parent task.
-
-
Returns
A TaskExecutionAttempt instance, or None if not found
execution_attempt_register(task_id: Optional[int] = None, queued_time: Optional[float] = None, metadata: Optional[Dict[str, Any]] = None)
Register an attempt to execute a task in the database
-
Parameters
-
task_id – The integer ID of the task which is being run in the <eas_tasks> table
-
queued_time – The unix timestamp when this execution attempt was put into the job queue
-
-
Returns
Integer ID for this execution attempt
execution_attempt_register_output(execution_attempt: TaskExecutionAttempt, output_name: str, file_path: str, file_metadata: dict, preserve: bool = False)
Register an output file product from a task execution attempt into the task database.
-
Parameters
-
execution_attempt – The TaskExecutionAttempt describing the task execution which generated this file.
-
output_name – The unique semantic type which identifies this output file from this task.
-
file_path – The path to where a copy of the output file can be found.
-
file_metadata – A dictionary of metadata to associate with this file product.
-
preserve – A boolean indicating whether we should preserve the original file at location <file_path> after importing it into the database, or whether it should be deleted once it is in the database.
-
-
Returns
None
execution_attempt_update(attempt_id: Optional[int] = None, queued_time: Optional[float] = None, start_time: Optional[float] = None, latest_heartbeat_time: Optional[float] = None, end_time: Optional[float] = None, all_products_passed_qc: Optional[bool] = None, error_fail: Optional[bool] = None, error_text: Optional[str] = None, run_time_wall_clock: Optional[float] = None, run_time_cpu: Optional[float] = None, run_time_cpu_inc_children: Optional[float] = None, is_queued: Optional[bool] = None, is_running: Optional[bool] = None, is_finished: Optional[bool] = None, metadata: Optional[Dict[str, Any]] = None)
Update information about a task execution attempt in the database
-
Parameters
-
attempt_id – The integer ID of this task execution attempt.
-
queued_time – The unix timestamp when this execution attempt was put into the job queue
-
start_time – The unix timestamp when this execution attempt began to be executed
-
latest_heartbeat_time – The unix timestamp when this execution attempt last sent a heartbeat message
-
end_time – The unix timestamp when this execution attempt signaled that it had completed
-
all_products_passed_qc – A boolean flag indicating whether all the file products from this execution attempt passed their QC inspection.
-
error_fail – A boolean flag indicating whether this execution attempt signalled that an error occurred
-
error_text – The error message text associated with any failure of this job
-
run_time_wall_clock – The number of seconds the job took to execute, in wall clock time
-
run_time_cpu – The number of seconds the job took to execute, in CPU time
-
run_time_cpu_inc_children – The number of seconds the job took to execute, in CPU time, including child threads
-
is_queued – A boolean flag indicating whether this execution attempt is queued for execution
-
is_running – A boolean flag indicating whether this execution attempt is currently running
-
is_finished – A boolean flag indicating whether this execution attempt is finished
-
metadata – A dictionary of metadata associated with this execution attempt.
-
-
Returns
None
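Because every field in the signature above is optional, a natural reading is that only the fields actually supplied are written to the database. The sketch below illustrates that pattern with sqlite3; it is an assumption about the behaviour, not the library's code, and the function name, table name and column whitelist are hypothetical.

```python
import sqlite3
from typing import Any

# Hypothetical whitelist of columns that may be updated
UPDATABLE_COLUMNS = ("start_time", "end_time", "is_running", "is_finished", "error_text")


def update_attempt(connection: sqlite3.Connection, attempt_id: int, **fields: Any) -> None:
    """Update only those columns whose new value is not None."""
    for column, value in fields.items():
        if column not in UPDATABLE_COLUMNS:
            raise KeyError(f"Unknown column: {column}")
        if value is None:
            continue  # None means 'leave this column unchanged'
        # Column names come from the whitelist above; values are bound as parameters
        connection.execute(
            f"UPDATE attempts SET {column} = ? WHERE attempt_id = ?", (value, attempt_id))
    connection.commit()
```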
Search for a file product by directory and filename. Return all matches.
-
Parameters
-
directory – The directory that the file product is stored in.
-
filename – The filename of the file product.
-
-
Returns
A list of integer IDs
Delete an intermediate file product.
-
Parameters
product_id (int) – The file ID
-
Returns
None
Check for the presence of the given intermediate file product.
-
Parameters
product_id (int) – The file ID
-
Returns
True if we have a record with this ID, False otherwise
Check for the presence of the given file_id.
-
Parameters
product_id (int) – The file ID
-
Returns
True if we have a file in the file system with this ID, False otherwise
Check for the presence of the given file_id.
-
Parameters
product_id (int) – The file ID
-
Returns
True if we have a file in the file system with this ID, False otherwise
Retrieve a FileProduct object representing a file product in the database
-
Parameters
product_id (int) – The file ID
-
Returns
A FileProduct instance, or None if not found
file_product_register(generator_task: int, directory: str, filename: str, semantic_type: str, planned_time: Optional[float] = None, mime_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None)
Register a file product in the database, and move it into our file archive
-
Parameters
-
generator_task – The ID integer of the pipeline task which generates this file product.
-
directory – The directory in the file repository to place this file product into
-
filename – The filename of this file product
-
semantic_type – The name used to identify the type of data in this file, e.g. “lightcurve” or “periodogram”. Each file output from any given pipeline step must have a unique semantic type.
-
planned_time – The time when the pipeline scheduler created a database entry for this file product
-
mime_type – The mime type of this file, used so a web interface can serve it if needed
-
metadata – Dictionary of metadata associated with this file product
-
-
Returns
Integer ID for this file product
file_product_update(product_id: int, planned_time: Optional[float] = None, mime_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None)
Update information about a file product in the database
-
Parameters
-
product_id – The ID integer of the file product to update.
-
planned_time – The time when the pipeline scheduler created a database entry for this file product
-
mime_type – The mime type of this file, used so a web interface can serve it if needed
-
metadata – Dictionary of metadata associated with this file product
-
-
Returns
None
file_version_by_product(product_id: int, attempt_id: Optional[int] = None, must_have_passed_qc: bool = False)
Look up the ID (or IDs) of the file product versions associated with a particular integer file product ID.
-
Parameters
-
product_id – The file product ID in the <eas_product> table.
-
attempt_id – The integer ID of the task execution attempt whose output files we should return. If None, then we return all versions of this file product.
-
must_have_passed_qc – If true, then we only return files which have passed QC checks. If false, we also return file products which have not passed QC (useful, for example, if the user is trying to do a QC check!)
-
-
Returns
List[int]
Delete an intermediate file product version.
-
Parameters
product_version_id (int) – ID of a file
-
Returns
None
Check for the presence of the given file_id.
-
Parameters
product_version_id (int) – ID of a file
-
Returns
True if we have a record with this ID, False otherwise
Check for the presence of the given file_id.
-
Parameters
product_version_id (int) – ID of a file
-
Returns
True if we have a file in the file system with this ID, False otherwise
Get a random hexadecimal hash for a file product, used when storing it on disk to avoid clashes with other files with the same filename.
-
Parameters
-
timestamp – Unix time associated with the file.
-
filename – Filename of the file (used to determine the file type suffix)
-
file_info_fields – An arbitrary list of string fields which are used to generate a unique hash for this file
-
-
Returns
Hash string
Calculate the MD5 checksum for a file on disk.
-
Parameters
file_path (string) – Path to the file
-
Returns
MD5 checksum
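A typical way to compute an MD5 checksum for a file of arbitrary size is to read it in chunks, so the whole file never has to sit in memory. The sketch below uses the standard-library hashlib module; the function name and chunk size are illustrative and not taken from the plato_wp36 code.

```python
import hashlib


def md5_of_file(file_path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hexadecimal MD5 digest of the file at file_path."""
    digest = hashlib.md5()
    with open(file_path, "rb") as handle:
        # Read fixed-size chunks until read() returns an empty bytes object
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```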
Retrieve a FileProductVersion object representing a file product in the database
-
Parameters
product_version_id (int) – ID of a file
-
Returns
A FileProductVersion instance, or None if not found
Get the file system path for a given file product version ID.
-
Parameters
-
product_version_id (int) – ID of a file (which may or may not exist, this method doesn’t check)
-
full_path – If true, we return the full path of file products. Otherwise, the path relative to the file repository root.
-
must_exist – If true, we only return a file path if the file actually exists
-
-
Returns
System file path for the file, or None if there is no match
file_version_register(product_id: int, generated_by_task_execution: int, file_path_input: str, preserve: bool = False, created_time: Optional[float] = None, modified_time: Optional[float] = None, passed_qc: Optional[bool] = None, metadata: Optional[Dict[str, Any]] = None)
Register a version of a file product in the database, and move it into our file archive
-
Parameters
-
product_id – The integer ID of the intermediate file product of which this file is a version.
-
generated_by_task_execution – The integer ID of the task execution attempt which generated this file.
-
file_path_input – The path to where a copy of this file can be found (usually in a temporary file system)
-
preserve – Boolean flag indicating whether the input file should be left in situ (copied into the archive), or removed (moved into the archive).
-
created_time – The time when this file product was created by the pipeline
-
modified_time – The time when this file product was modified by the pipeline
-
passed_qc – Boolean indicating whether QC checks have taken place on this file, and whether they passed
-
metadata – Dictionary of metadata associated with this file product
-
-
Returns
Integer ID for this file product version
file_version_update(product_version_id: int, file_path_input: Optional[str] = None, preserve: bool = False, modified_time: Optional[float] = None, passed_qc: Optional[bool] = None, metadata: Optional[Dict[str, Any]] = None)
Update information about a file product version in the database
-
Parameters
-
product_version_id – The ID integer of the file product version to update.
-
file_path_input – The path to where a copy of this file can be found (usually in a temporary file system)
-
preserve – Boolean flag indicating whether the input file should be left in situ (copied into the archive), or removed (moved into the archive).
-
modified_time – The time when this file product was modified by the pipeline
-
passed_qc – Boolean indicating whether QC checks have taken place on this file, and whether they passed
-
metadata – Dictionary of metadata associated with this file product version
-
-
Returns
None
Fetch the numerical ID associated with a particular worker node’s hostname.
-
Parameters
name – String hostname of worker node
-
Returns
Integer ID
metadata_fetch_all(task_id: Optional[int] = None, scheduling_attempt_id: Optional[int] = None, product_id: Optional[int] = None, product_version_id: Optional[int] = None)
Fetch dictionary of metadata objects associated with an entity in the database. Specify either a task, or an execution attempt, or an intermediate file product, or a file version.
-
Parameters
-
task_id – Fetch metadata associated with a particular task.
-
scheduling_attempt_id – Fetch metadata associated with a particular task execution attempt.
-
product_id – Fetch metadata associated with a particular intermediate file product.
-
product_version_id – Fetch metadata associated with a particular version of an intermediate file product.
-
-
Returns
Dictionary of MetadataItem objects
metadata_fetch_item(keyword: str, task_id: Optional[int] = None, scheduling_attempt_id: Optional[int] = None, product_id: Optional[int] = None, product_version_id: Optional[int] = None)
Fetch the metadata associated with a particular keyword for an entity in the database. Specify either a task, or an execution attempt, or an intermediate file product, or a file version.
-
Parameters
-
keyword – String metadata keyword
-
task_id – Fetch metadata associated with a particular task.
-
scheduling_attempt_id – Fetch metadata associated with a particular task execution attempt.
-
product_id – Fetch metadata associated with a particular intermediate file product.
-
product_version_id – Fetch metadata associated with a particular version of an intermediate file product.
-
-
Returns
Dictionary of MetadataItem objects
Fetch the numerical ID associated with a metadata keyword.
-
Parameters
keyword – String metadata keyword
-
Returns
Integer ID
metadata_register(metadata: Dict[str, Any], task_id: Optional[int] = None, scheduling_attempt_id: Optional[int] = None, product_id: Optional[int] = None, product_version_id: Optional[int] = None)
Write a dictionary of metadata objects associated with an entity in the database. Specify either a task, or an execution attempt, or an intermediate file product, or a file version.
-
Parameters
-
task_id – Register metadata associated with a particular task.
-
scheduling_attempt_id – Register metadata associated with a particular task execution attempt.
-
product_id – Register metadata associated with a particular intermediate file product.
-
product_version_id – Register metadata associated with a particular version of an intermediate file product.
-
metadata – Dictionary of metadata items to register.
-
-
Returns
None
Fetch the numerical ID associated with a semantic type name.
-
Parameters
name – String semantic type
-
Returns
Integer ID
Delete a task.
-
Parameters
task_id (int) – The task ID
-
Returns
None
Check for the presence of the given task.
-
Parameters
task_id (int) – The task ID
-
Returns
True if we have a record with this ID, False otherwise
Retrieve a dictionary of all the attempts to execute a task, indexed by their integer UIDs
-
Parameters
-
task_id – The ID of the task.
-
successful – Boolean indicating whether we’re searching for successful executions, incomplete executions, or both
-
-
Returns
Dictionary of TaskExecutionAttempt objects, indexed by unique ID.
Retrieve a dictionary of all the file inputs to a task, indexed by their semantic types.
-
Parameters
task_id – The ID of the task.
-
Returns
Dictionary of FileProduct objects, indexed by semantic type string.
Retrieve a dictionary of all the file products generated by a task, indexed by their semantic types.
-
Parameters
task_id – The ID of the task.
-
Returns
Dictionary of FileProduct objects, indexed by semantic type string.
Retrieve a dictionary of metadata inputs to a task from previous tasks, indexed by their names.
-
Parameters
task_id – The ID of the task.
-
Returns
Dictionary of dictionaries, indexed by the names of the tasks the metadata is coming from.
Retrieve a Task object representing a task in the database
-
Parameters
task_id (int) – The task ID
-
Returns
A Task instance, or None if not found
task_open_file_input(task: Task, input_name: str, tmp_dir: TemporaryDirectory, execution_id: Optional[int] = None, must_have_passed_qc: bool = True)
Make a copy of an input file to a task in a temporary directory.
-
Parameters
-
task – The task which is requesting to open one of its input files.
-
input_name – The semantic type which identifies the particular input file to this task.
-
tmp_dir – The temporary directory in which we should serve up a copy of the requested file.
-
execution_id – Optional - if set, only open file products generated by a particular execution attempt.
-
must_have_passed_qc – If true, only open file products which have passed QC.
-
-
Returns
List [ file path to copy of file, file metadata dictionary ]
task_register(parent_id: Optional[int] = None, created_time: Optional[float] = None, fully_configured: bool = True, task_type: Optional[str] = None, job_name: Optional[str] = None, task_name: Optional[str] = None, working_directory: Optional[str] = None, input_files: Optional[Dict[str, FileProduct]] = None, metadata: Optional[Dict[str, Any]] = None)
Register a new task in the database
-
Parameters
-
parent_id – The integer ID of the parent task which may have spawned this sub-task (e.g. a task execution chain)
-
created_time – The unix timestamp when this task was created
-
fully_configured – Boolean flag indicating whether this task should be marked in the database as fully configured and ready to be scheduled.
-
task_type – The string name of this type of task, as defined in <task_type_registry.xml>
-
job_name – The human-readable name of the top-level job (requested by a user) that this task is part of
-
task_name – The human-readable name of a task; used by its siblings to identify metadata coming from a task
-
working_directory – The directory in the file store where this job should write its output file products
-
input_files – The list of file products that this task uses as inputs. This task cannot be executed until all of these file products exist and have passed QC.
-
metadata – A dictionary of metadata associated with this task
-
-
Returns
Integer ID for this task
Fetch the integer ID associated with a type of pipeline task.
-
Parameters
task_type_name – The name of the pipeline task type.
-
Returns
Integer ID
Return a list of all known task type names, from the database.
-
Returns
List of all known task type names.
Write a list of all known task type names to the database.
-
Returns
None
Update information about a task in the database
-
Parameters
-
task_id – The integer ID of this task.
-
metadata – A dictionary of metadata associated with this task.
-
-
Returns
None
Wrapper for the functions that perform EAS pipeline tasks.
This performs housekeeping operations that are common to all tasks, such as recording log messages in the EasControl task database, recording the execution times of tasks, and sending heartbeat messages to indicate that a task is still running.
plato_wp36.task_execution.call_subprocess_and_catch_stdout(arguments: Iterable, shell: Optional[bool] = None)
Execute a shell command, and capture any error messages sent to stderr, storing them in the logging database.
-
Parameters
-
arguments – A list of the command-line arguments to run in the shell.
-
shell – Boolean indicating whether subprocess runs in a shell.
-
-
Returns
Boolean indicating whether the process exited with no error reported
plato_wp36.task_execution.call_subprocess_and_log_output(arguments: Iterable, shell: Optional[bool] = None)
Execute a shell command, and capture any error messages sent to stderr, storing them in the logging database.
-
Parameters
-
arguments – A list of the command-line arguments to run in the shell.
-
shell – Boolean indicating whether subprocess runs in a shell.
-
-
Returns
Boolean indicating whether the process exited with no error reported
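As a rough illustration of the behaviour described above, the following sketch runs a command with the standard-library subprocess module, forwards anything written to stderr to a logger, and returns a boolean success flag. It is not the plato_wp36 implementation: the function name run_and_log_stderr is hypothetical, the shell option is omitted, and Python's logging module stands in for the EasControl logging database.

```python
import logging
import subprocess
import sys
from typing import Iterable


def run_and_log_stderr(arguments: Iterable[str]) -> bool:
    """Hypothetical stand-in: run a command and log whatever it writes to stderr."""
    result = subprocess.run(list(arguments), capture_output=True, text=True)
    # Forward each non-empty line of stderr to the logger
    for line in result.stderr.splitlines():
        if line.strip():
            logging.error(line)
    # A zero exit status is taken to mean that no error was reported
    return result.returncode == 0


# Usage: run a child Python interpreter which exits cleanly
ok = run_and_log_stderr([sys.executable, "-c", "print('hello')"])
```

Returning a boolean rather than raising an exception lets the caller decide how to record the failure.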
plato_wp36.task_execution.do_pipeline_task(job_id: int, is_qc_task: bool, task_handler: Callable[[TaskExecutionAttempt], None], eas_logger: EasLoggingHandler)
Perform the EAS pipeline task.
-
Parameters
-
job_id – The integer ID of the job in the <eas_scheduling_attempt> table. This allows us to fetch all the metadata associated with the job we are to perform.
-
is_qc_task – Boolean flag indicating whether this is a QC task.
-
task_handler – The function we should call to perform the pipeline task. It is passed three arguments: task_handler(execution_attempt: task_database.TaskExecutionAttempt, task_info: task_database.Task, task_description: Dict)
-
eas_logger – The logging handler which is connected to this task’s logging stream, and which will direct log messages to the EasControl database.
-
-
Returns
None
Perform an EAS pipeline task, reading the task configuration from the process’s getargs.
-
Parameters
task_handler – The function we should call to perform the pipeline task. It is passed three arguments: task_handler(execution_attempt: task_database.TaskExecutionAttempt, task_info: task_database.Task, task_description: Dict)
-
Returns
Callable[None, None]
A class which takes the data structure containing the job description for a task, and evaluates any expressions within that data structure. Where parameter values are specified as strings starting with one of the characters ', " or (, these are taken as mathematical expressions to be evaluated in an environment where constants and metadata values are available.
class plato_wp36.task_expression_evaluation.TaskExpressionEvaluation(metadata: Dict[str, MetadataItem], requested_metadata: Dict[str, Dict[str, MetadataItem]])
A class which takes the data structure containing the job description for a task, and evaluates any expressions within that data structure. Where parameter values are specified as strings starting with one of the characters ', " or (, these are taken as mathematical expressions to be evaluated in an environment where constants and metadata values are available.
Evaluate an expression in the current context.
-
Parameters
expression – The expression to evaluate
-
Returns
The expression value
Cycle through all the items in a hierarchy of lists and dictionaries, evaluating all the items.
-
Parameters
structure – The structure of lists and dictionaries to cycle through.
-
Returns
A copy of the structure in which all expressions have been evaluated.
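The recursive evaluation described above might be sketched as follows. This is an illustrative approximation, not the package's code: the function names are hypothetical, only strings starting with "(" are treated as expressions, and Python's eval() with builtins disabled stands in for whatever evaluation environment the real class provides.

```python
from typing import Any, Dict


def evaluate_expression(expression: str, names: Dict[str, Any]) -> Any:
    """Evaluate an expression with only the supplied names in scope."""
    # Assumption: eval() with builtins disabled is adequate for a sketch;
    # it is not a security boundary for untrusted input.
    return eval(expression, {"__builtins__": {}}, dict(names))


def evaluate_structure(structure: Any, names: Dict[str, Any]) -> Any:
    """Return a copy of a nested list/dict structure with expressions evaluated."""
    if isinstance(structure, dict):
        return {key: evaluate_structure(value, names) for key, value in structure.items()}
    if isinstance(structure, list):
        return [evaluate_structure(item, names) for item in structure]
    # Assumption: only strings starting with "(" are treated as expressions here
    if isinstance(structure, str) and structure.startswith("("):
        return evaluate_expression(structure, names)
    return structure


job = {"period": "(2 * base_period)", "labels": ["raw", "(n_points + 1)"]}
result = evaluate_structure(job, {"base_period": 1.5, "n_points": 99})
```

Building a new structure, rather than modifying the input in place, matches the documented behaviour of returning a copy.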
A class which can be used to wrap segments of code, and sends regular heartbeat updates to the task database to indicate that a child process is still alive and working on the task. To use, wrap your code as follows:
with TaskHeartbeat():
    <code_segment>
class plato_wp36.task_heartbeat.TaskHeartbeat(task_attempt_id: Optional[int] = None, heartbeat_cadence: float = 60)
A class which can be used to wrap segments of code, and sends regular heartbeat updates to the task database to indicate that a child process is still alive and working on the task.
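One common way to build such a wrapper is a context manager that runs a background thread. The sketch below assumes that approach. The class name HeartbeatSketch and its beat callback are hypothetical, and the callback stands in for the database update that the real class performs.

```python
import threading
import time
from typing import Callable


class HeartbeatSketch:
    """Hypothetical context manager which calls `beat` every `cadence` seconds."""

    def __init__(self, beat: Callable[[], None], cadence: float = 60):
        self.beat = beat
        self.cadence = cadence
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait() returns False on timeout, so we beat once per cadence
        # until __exit__ sets the stop flag
        while not self._stop.wait(self.cadence):
            self.beat()

    def __enter__(self):
        self.beat()  # Send an initial heartbeat straight away
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._stop.set()
        self._thread.join()
        return False  # Do not suppress exceptions from the wrapped code


beats = []
with HeartbeatSketch(beat=lambda: beats.append(time.time()), cadence=0.01):
    time.sleep(0.05)  # Stand-in for the real work
```

Using an Event for the delay, rather than time.sleep(), means the thread stops promptly when the wrapped code finishes.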
Pythonic objects to represent pipeline task entries in the database
Python class to represent an intermediate file product, each associated with the task which generated / will generate them. Note that multiple versions of the same file product may exist on disk, if a task runs multiple times.
Turn a class instance into a Python dictionary, allowing it to be transmitted in JSON format.
-
Returns
Dict
configure(product_id: Optional[int] = None, generator_task: Optional[int] = None, planned_time: Optional[float] = None, directory: Optional[str] = None, filename: Optional[str] = None, semantic_type: Optional[str] = None, mime_type: Optional[str] = None, metadata: Optional[Dict[str, MetadataItem]] = None)
-
Parameters
-
product_id – The ID integer for this file product in the database.
-
generator_task – The ID integer of the pipeline task which generates this file product.
-
directory – The directory in the file repository to place this file product into
-
filename – The filename of this file product
-
semantic_type – The name used to identify the type of data in this file, e.g. “lightcurve” or “periodogram”. Each file output from any given pipeline step must have a unique semantic type.
-
planned_time – The time when the pipeline scheduler created a database entry for this file product
-
mime_type – The mime type of this file, used so a web interface can serve it if needed
-
metadata – Dictionary of metadata associated with this file product
-
-
Returns
None
Recreate a class instance from a Python dictionary, allowing it to be transmitted in JSON format.
-
Parameters
d – Dictionary representation of class instance
-
Returns
Class instance
Set an item of metadata associated with this object.
-
Parameters
-
keyword – Metadata keyword
-
value – Metadata value
-
-
Returns
None
Python class to represent a version of an intermediate file product. Multiple versions of the same file product may exist on disk, if a task runs multiple times.
Turn a class instance into a Python dictionary, allowing it to be transmitted in JSON format.
-
Returns
Dict
configure(product_version_id: Optional[int] = None, product_id: Optional[int] = None, generated_by_task_execution: Optional[int] = None, repository_id: Optional[str] = None, created_time: Optional[float] = None, modified_time: Optional[float] = None, file_md5: Optional[str] = None, file_size: Optional[int] = None, passed_qc: Optional[bool] = None, metadata: Optional[Dict[str, MetadataItem]] = None)
-
Parameters
-
product_version_id – The integer ID of this version of a file product
-
product_id – The integer ID of the intermediate file product of which this file is a version.
-
generated_by_task_execution – The integer ID of the task execution attempt which generated this file.
-
created_time – The time when this file product was created by the pipeline
-
modified_time – The time when this file product was modified by the pipeline
-
passed_qc – Boolean indicating whether QC checks have taken place on this file, and whether they passed
-
metadata – Dictionary of metadata associated with this file product
-
repository_id – The string filename used to store this file in the file store.
-
file_md5 – The MD5 hash of the file
-
file_size – The number of bytes in the file
-
-
Returns
None
Recreate a class instance from a Python dictionary, allowing it to be transmitted in JSON format.
-
Parameters
d – Dictionary representation of class instance
-
Returns
Class instance
Set an item of metadata associated with this object.
-
Parameters
-
keyword – Metadata keyword
-
value – Metadata value
-
-
Returns
None
A class representing a metadata value to associate with an item.
Turn a class instance into a Python dictionary, allowing it to be transmitted in JSON format.
-
Returns
Dict
Recreate a class instance from a Python dictionary, allowing it to be transmitted in JSON format.
-
Parameters
d – Dictionary representation of class instance
-
Returns
Class instance
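The dictionary round trip described above can be illustrated with a small stand-in class. The class MetadataSketch, its field names, and the method names as_dict and from_dict are assumptions made for this example, and may not match the real plato_wp36 class.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class MetadataSketch:
    """Hypothetical stand-in for a metadata value; the field names are assumptions."""
    keyword: str
    value: Any

    def as_dict(self) -> Dict[str, Any]:
        return {"keyword": self.keyword, "value": self.value}

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "MetadataSketch":
        return cls(keyword=d["keyword"], value=d["value"])


original = MetadataSketch(keyword="period_days", value=3.52)
wire_format = json.dumps(original.as_dict())  # Serialise for transmission
restored = MetadataSketch.from_dict(json.loads(wire_format))
```

Because the intermediate form is a plain dictionary of JSON-compatible values, the object can pass through a message queue or HTTP request and be rebuilt on the other side.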
Python class to represent a task in the <eas_task> database table.
Turn a class instance into a Python dictionary, allowing it to be transmitted in JSON format.
-
Returns
Dict
configure(task_id: Optional[int] = None, parent_id: Optional[int] = None, created_time: Optional[float] = None, task_type: Optional[str] = None, job_name: Optional[str] = None, task_name: Optional[str] = None, working_directory: Optional[str] = None, input_files: Optional[Dict[str, FileProduct]] = None, input_metadata: Optional[Dict[str, Dict[str, MetadataItem]]] = None, execution_attempts_passed: Optional[List[TaskExecutionAttempt]] = None, execution_attempts_incomplete: Optional[List[TaskExecutionAttempt]] = None, metadata: Optional[Dict[str, MetadataItem]] = None, output_files: Optional[Dict[str, FileProduct]] = None)
-
Parameters
-
task_id – The integer ID of this task in the <eas_task> table
-
parent_id – The integer ID of the parent task which may have spawned this sub-task (e.g. a task execution chain)
-
created_time – The unix timestamp when this task was created
-
task_type – The string name of this type of task, as defined in <task_type_registry.xml>
-
job_name – The human-readable name of the top-level job (requested by a user) that this task is part of
-
task_name – The human-readable name of a task; used by its siblings to identify metadata coming from a task
-
working_directory – The directory in the file store where this job should write its output file products
-
input_files – The list of file products that this task uses as inputs. This task cannot be executed until all of these file products exist and have passed QC.
-
input_metadata – A dictionary of metadata produced by previous tasks which this task has requested access to, indexed by the name of the tasks which produced the metadata.
-
execution_attempts_passed – A list of all the successfully completed attempts to execute this task
-
execution_attempts_incomplete – A list of all the failed, or incomplete, attempts to execute this task
-
metadata – A dictionary of metadata associated with this task
-
output_files – A list of all the file products that this task will create, or has created
-
-
Returns
None
Recreate a class instance from a Python dictionary, allowing it to be transmitted in JSON format.
-
Parameters
d – Dictionary representation of class instance
-
Returns
Class instance
Set an item of metadata associated with this object.
-
Parameters
-
keyword – Metadata keyword
-
value – Metadata value
-
-
Returns
None
Python class to represent an attempt to execute a pipeline task.
Turn a class instance into a Python dictionary, allowing it to be transmitted in JSON format.
-
Returns
Dict
configure(attempt_id: Optional[int] = None, task_id: Optional[int] = None, queued_time: Optional[float] = None, start_time: Optional[float] = None, latest_heartbeat_time: Optional[float] = None, end_time: Optional[float] = None, all_products_passed_qc: Optional[bool] = None, error_fail: Optional[bool] = None, error_text: Optional[str] = None, run_time_wall_clock: Optional[float] = None, run_time_cpu: Optional[float] = None, run_time_cpu_inc_children: Optional[float] = None, metadata: Optional[Dict[str, MetadataItem]] = None, output_files: Optional[Dict[str, FileProductVersion]] = None, task_object=None)
-
Parameters
-
attempt_id – The integer ID of this task execution attempt.
-
task_id – The integer ID, in the <eas_task> table, of the task which is being run
-
queued_time – The unix timestamp when this execution attempt was put into the job queue
-
start_time – The unix timestamp when this execution attempt began to be executed
-
latest_heartbeat_time – The unix timestamp when this execution attempt last sent a heartbeat message
-
end_time – The unix timestamp when this execution attempt signalled that it had completed
-
all_products_passed_qc – A boolean flag indicating whether all the file products from this execution attempt passed their QC inspection.
-
error_fail – A boolean flag indicating whether this execution attempt signalled that an error occurred
-
error_text – The error message text associated with any failure of this job
-
run_time_wall_clock – The number of seconds the job took to execute, in wall clock time
-
run_time_cpu – The number of seconds the job took to execute, in CPU time
-
run_time_cpu_inc_children – The number of seconds the job took to execute, in CPU time, including child threads
-
metadata – A dictionary of metadata associated with this execution attempt.
-
output_files – A dictionary of the output file products from this execution attempt, represented by FileProductVersion objects.
-
task_object – The object describing the task that this execution attempt is to run.
-
-
Returns
None
Recreate a class instance from a Python dictionary, allowing it to be transmitted in JSON format.
-
Parameters
d – Dictionary representation of class instance
-
Returns
Class instance
Set an item of metadata associated with this object.
-
Parameters
-
keyword – Metadata keyword
-
value – Metadata value
-
-
Returns
None
Python class for interacting with a message queue in RabbitMQ or SQL.
Python class for describing an abstract job queue.
Close our connection to the task queue.
-
Returns
None
Declare a task queue.
-
Parameters
queue_name – The name of the queue to declare
-
Returns
None
Fetch a message from a queue, without blocking.
-
Parameters
-
queue_name – The name of the queue to query
-
acknowledge – Flag indicating whether we acknowledge receipt of this item from the queue, preventing it from being delivered again.
-
set_running – Flag indicating whether we mark this task as running.
-
-
Returns
Scheduling attempt ID, or None if the queue is empty
Fetch a list of all the items in a queue, without blocking.
-
Parameters
queue_name – The name of the queue to query
-
Returns
List of contents
Return the number of messages waiting in a task queue.
-
Parameters
queue_name – The name of the queue to query
-
Returns
The number of messages waiting
Publish a message to a task queue.
-
Parameters
-
queue_name – The name of the queue to send the message to.
-
item_id – The integer ID of the scheduling attempt to put into the task queue
-
-
Returns
None
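To make the abstract queue operations above concrete, here is a minimal in-memory sketch. The class and method names are hypothetical, since the real method names are not shown here, and the set_running flag is omitted. It illustrates only non-blocking fetches and the effect of acknowledging an item.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


class InMemoryQueueSketch:
    """Hypothetical in-memory queue with the operations described above."""

    def __init__(self) -> None:
        self._queues: Dict[str, Deque[int]] = {}

    def declare(self, queue_name: str) -> None:
        # Declaring an existing queue leaves its contents untouched
        self._queues.setdefault(queue_name, deque())

    def publish(self, queue_name: str, item_id: int) -> None:
        self._queues[queue_name].append(item_id)

    def fetch(self, queue_name: str, acknowledge: bool = True) -> Optional[int]:
        queue = self._queues[queue_name]
        if not queue:
            return None  # Empty queue: return immediately rather than block
        # An acknowledged item is removed; otherwise it stays to be delivered again
        return queue.popleft() if acknowledge else queue[0]

    def fetch_list(self, queue_name: str) -> List[int]:
        return list(self._queues[queue_name])

    def length(self, queue_name: str) -> int:
        return len(self._queues[queue_name])


queue = InMemoryQueueSketch()
queue.declare("tasks")
queue.publish("tasks", 101)
queue.publish("tasks", 102)
first = queue.fetch("tasks")
```

Returning None for an empty queue, instead of waiting, lets a worker poll several queues in turn without stalling on any one of them.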
Bases: TaskQueue
Python class for interacting with a message queue in RabbitMQ.
Close our connection to the message bus.
-
Returns
None
Declare a message queue on the AMQP server.
-
Parameters
queue_name – The name of the queue to declare
-
Returns
None
Fetch a message from a queue, without blocking.
-
Parameters
-
queue_name – The name of the queue to query
-
acknowledge – Flag indicating whether we acknowledge receipt of this item from the queue, preventing it from being delivered again.
-
set_running – Flag indicating whether we mark this task as running.
-
-
Returns
Scheduling attempt ID, or None if the queue is empty
Fetch a list of all the items in a queue, without blocking.
-
Parameters
queue_name – The name of the queue to query
-
Returns
List of contents
Return the number of messages waiting in a message queue on the AMQP server.
-
Parameters
queue_name – The name of the queue to query
-
Returns
The number of messages waiting
Publish a message to a task queue.
-
Parameters
-
queue_name – The name of the queue to send the message to.
-
item_id – The integer ID of the scheduling attempt to put into the task queue
-
-
Returns
None
class plato_wp36.task_queues.TaskQueueConnector(queue_engine: Optional[str] = None, debugging: bool = False)
Factory class for creating connections to task queues.
Return a new connection to the task queue as a TaskQueue object.
-
Returns
Instance of TaskQueue
static make_task_queue_config(queue_implementation: str, mq_user: str, mq_passwd: str, mq_host: str, mq_port: int)
Create a configuration file containing the message bus username and password, which means we can connect in the future without supplying these on the command line.
-
Parameters
-
queue_implementation – The name of the task queue implementation we are using. Either or .
-
mq_user – The username to use when connecting to an AMQP-based task queue.
-
mq_passwd – The password to use when connecting to an AMQP-based task queue.
-
mq_host – The host to use when connecting to an AMQP-based task queue.
-
mq_port – The port number to use when connecting to an AMQP-based task queue.
-
-
Returns
None
Path to task queue configuration file with task queue connection details.
-
Returns
str
Bases: TaskQueue
Python class for interacting with a message queue that is embedded into the task database.
Close our connection to the message bus.
-
Returns
None
Declare a message queue on the AMQP server.
-
Parameters
queue_name – The name of the queue to declare
-
Returns
None
Fetch a message from a queue, without blocking.
-
Parameters
-
queue_name – The name of the queue to query
-
acknowledge – Flag indicating whether we acknowledge receipt of this item from the queue, preventing it from being delivered again.
-
set_running – Flag indicating whether we mark this task as running.
-
-
Returns
Scheduling attempt ID, or None if the queue is empty
Fetch a list of all the items in a queue, without blocking.
-
Parameters
queue_name – The name of the queue to query
-
Returns
List of contents
Return the number of messages waiting in a message queue on the AMQP server.
-
Parameters
queue_name – The name of the queue to query
-
Returns
The number of messages waiting
Publish a message to a task queue.
-
Parameters
-
queue_name – The name of the queue to send the message to.
-
item_id – The integer ID of the scheduling attempt to put into the task queue
-
-
Returns
None
Python class for adding tasks that are waiting to run to the job queue
(Re)schedule all unfinished tasks to be (re)-run.
-
Returns
None
Schedule a single task to run, by adding it to the job queue.
-
Parameters
task_id – The integer ID of the scheduling attempt to add to the job queue.
Schedule all tasks in the database which have not yet been queued.
-
Returns
None
Schedule all tasks in the database which don’t have any unfulfilled dependencies, and which don’t have any previous run attempts which meet the database search criterion.
-
Parameters
task_selection_criteria – SQL criteria used to decide whether a previous run of a task means it shouldn’t be run again.
-
Returns
None
A class which can be used to wrap segments of code, and time how long they take to run, in both wall-clock time and also CPU time. To use, wrap your code as follows:
with TaskTimer():
    <code_segment>
A class which can be used to wrap segments of code, and time how long they take to run, in both wall-clock time and also CPU time.
Implementation of the function(s) we use to measure how long this process has been running.
-
Returns
A dictionary of time measurements
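A timing wrapper of this kind can be sketched with the standard library alone. The class TimerSketch and the keys of its measurement dictionary are hypothetical. The real class may record different quantities, and will also store them in the task database, which this sketch does not attempt.

```python
import os
import time
from typing import Dict


class TimerSketch:
    """Hypothetical context manager recording wall-clock and CPU time."""

    @staticmethod
    def measure() -> Dict[str, float]:
        t = os.times()
        return {
            "wall_clock": time.perf_counter(),
            "cpu": time.process_time(),
            # User + system time of this process and its waited-for children
            "cpu_inc_children": t.user + t.system + t.children_user + t.children_system,
        }

    def __enter__(self):
        self.start = self.measure()
        self.elapsed: Dict[str, float] = {}
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        end = self.measure()
        self.elapsed = {key: end[key] - self.start[key] for key in end}
        return False  # Do not suppress exceptions from the wrapped code


with TimerSketch() as timer:
    total = sum(i * i for i in range(100000))  # Stand-in for the real work
```

Taking all the measurements in one function keeps the start and end readings consistent, so the elapsed values are simple differences.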
Module for reading the list of all known pipeline tasks, and the list of which Docker containers are capable of running each type of task.
Class for reading and representing the list of all known pipeline tasks, and the list of which Docker containers are capable of running each type of task.
Return a list of the names of the Docker containers which are capable of running a particular task.
-
Parameters
task_type_name (str) – The name of the task to be run
-
Returns
List of string names of Docker containers
Read the contents of an XML file specifying the list of all known pipeline tasks.
-
Parameters
input_xml_filename (str) – The filename of the XML file specifying the list of all known pipeline tasks.
-
Returns
TaskTypeList instance
Return a list of all known task type names.
-
Returns
List of all known task type names.
Return a list of the names of the task types that a named type of Docker container can run.
-
Parameters
container_name (str) – The name of the Docker container
-
Returns
List of string names of tasks the Docker container can run
Return a list of all worker container names.
-
Returns
List of all known worker container names.
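The two-way lookup between task types and containers described above can be sketched as follows. The XML layout used here is invented purely for illustration and is not the schema of the real task type registry. The function names are likewise hypothetical.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List, Set

# Hypothetical registry layout; the real XML schema may differ
EXAMPLE_XML = """
<registry>
  <container name="worker_a"><task name="ingest"/><task name="detrend"/></container>
  <container name="worker_b"><task name="detrend"/></container>
</registry>
"""


def read_registry(xml_text: str) -> Dict[str, Set[str]]:
    """Map each container name to the set of task type names it can run."""
    root = ET.fromstring(xml_text.strip())
    return {
        container.get("name"): {task.get("name") for task in container.findall("task")}
        for container in root.findall("container")
    }


def containers_for_task(registry: Dict[str, Set[str]], task_type_name: str) -> List[str]:
    """List the containers capable of running a particular task type."""
    return sorted(name for name, tasks in registry.items() if task_type_name in tasks)


registry = read_registry(EXAMPLE_XML)
detrend_workers = containers_for_task(registry, "detrend")
all_task_types = sorted(set().union(*registry.values()))
```

Storing the mapping once, keyed by container, is enough to answer both questions: which tasks a container can run, and which containers can run a task.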
Class to create a temporary working directory, and clean up its contents afterwards
Class to create a temporary working directory, and clean up its contents afterwards
Clean up temporary directory
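A class of this kind is typically a thin context manager around the standard tempfile and shutil modules. The sketch below assumes that pattern. The class name, the tmp_dir attribute and the directory prefix are hypothetical. Python's built-in tempfile.TemporaryDirectory offers similar behaviour.

```python
import os
import shutil
import tempfile


class TemporaryDirectorySketch:
    """Hypothetical context manager: create a working directory, remove it on exit."""

    def __enter__(self):
        self.tmp_dir = tempfile.mkdtemp(prefix="eas_sketch_")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.clean_up()
        return False  # Do not suppress exceptions from the wrapped code

    def clean_up(self) -> None:
        # ignore_errors makes repeated clean-up calls harmless
        shutil.rmtree(self.tmp_dir, ignore_errors=True)


with TemporaryDirectorySketch() as tmp:
    path = tmp.tmp_dir
    existed_inside = os.path.isdir(path)
    with open(os.path.join(path, "scratch.txt"), "w") as f:
        f.write("intermediate data")
```

Cleaning up in __exit__ means the directory is removed even if the wrapped code raises an exception.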
This code is developed and maintained by Dominic Ford, at the Institute of Astronomy, Cambridge.