# Source code for pytest_wdl.executors.miniwdl

#    Copyright 2019 Eli Lilly and Company
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
import logging
from pathlib import Path
from typing import Optional, Sequence, cast

from pytest_wdl.executors import (
    Executor, ExecutionFailedError, get_target_name, read_write_inputs
)

from WDL import CLI, Error, Tree, runtime


class MiniwdlExecutor(Executor):
    """
    Manages the running of WDL workflows using miniwdl.

    Args:
        import_dirs: Optional directories that are passed (as strings) to
            `WDL.CLI.load` as the import search path when the WDL document
            is loaded.
    """

    def __init__(self, import_dirs: Optional[Sequence[Path]] = None):
        self._import_dirs = import_dirs

    def run_workflow(
        self,
        wdl_path: Path,
        inputs: Optional[dict] = None,
        expected: Optional[dict] = None,
        **kwargs
    ) -> dict:
        """
        Run a WDL workflow (or task) with miniwdl on the given inputs, and
        check that the outputs match the given expected values.

        Args:
            wdl_path: The WDL script to execute.
            inputs: Workflow inputs; handed to `read_write_inputs`, whose
                resulting inputs file (if any) is given to miniwdl.
            expected: Dict mapping output parameter names to expected values.
                Outputs are only validated when this is non-empty.
            kwargs: Additional keyword arguments. All of them are forwarded to
                `get_target_name` (e.g. `workflow_name`/`task_name` to select
                the target). This method itself additionally reads:

                * check_quant: Passed to `WDL.CLI.load` (default `True`).
                * copy_input_files: Value for the miniwdl configuration
                  override of the same name (default `False`).

                Note that an `inputs_file` keyword is *not* passed on to
                `read_write_inputs` by this method.

        Returns:
            Dict of outputs, as produced by `WDL.CLI.values_to_json`.

        Raises:
            ExecutionFailedError: if miniwdl reports a failed run
                (`WDL.runtime.error.RunFailed`).
            WDL.Error.EvalError, WDL.Error.RuntimeError: any other miniwdl
                error is logged (when it has a source position) and re-raised.
            AssertionError: presumably raised by `_validate_outputs` when the
                actual outputs don't match the expected outputs (that method
                is defined outside this class).
        """
        logger = logging.getLogger("miniwdl-run")
        logger.setLevel(CLI.NOTICE_LEVEL)
        CLI.install_coloredlogs(logger)

        wdl_doc = CLI.load(
            str(wdl_path),
            # `import_dirs` defaults to None; treat that as "no extra search
            # paths" instead of failing with a TypeError on iteration.
            path=[str(path) for path in self._import_dirs or ()],
            check_quant=kwargs.get("check_quant", True),
            read_source=CLI.read_source
        )

        namespace, is_task = get_target_name(wdl_doc=wdl_doc, **kwargs)

        # Only the inputs file path is needed; miniwdl re-reads the inputs
        # from that file.
        _, inputs_file = read_write_inputs(
            inputs_dict=inputs, namespace=namespace if not is_task else None,
        )

        target, input_env, _ = CLI.runner_input(
            doc=wdl_doc,
            inputs=[],
            input_file=str(inputs_file) if inputs_file else None,
            empty=[],
            task=namespace if is_task else None
        )

        # Create config
        # NOTE(review): miniwdl documents `Loader.override` as taking a nested
        # {section: {key: value}} mapping -- confirm this flat key is honored.
        cfg = runtime.config.Loader(logger)
        cfg.override({
            "copy_input_files": kwargs.get("copy_input_files", False)
        })
        cfg.log_all()

        # initialize Docker
        runtime.task.LocalSwarmContainer.global_init(cfg, logger)

        try:
            _, output_env = runtime.run(cfg, target, input_env, run_dir=None)
        except Error.EvalError as err:  # TODO: test errors
            MiniwdlExecutor.log_source(logger, err)
            raise
        except Error.RuntimeError as err:
            MiniwdlExecutor.log_source(logger, err)

            if not isinstance(err, runtime.error.RunFailed):
                raise

            # This will be a workflow- or a task-level failure, depending on
            # whether a workflow or task was executed. If it is workflow-level,
            # follow the cause chain down to the failure that triggered it.
            # Stop as soon as the cause is not itself a RunFailed so that we
            # never dereference `.exe` on an unrelated exception (or None).
            task_err = err
            while (
                isinstance(task_err.exe, Tree.Workflow) and
                isinstance(task_err.__cause__, runtime.error.RunFailed)
            ):
                task_err = task_err.__cause__

            failed_task_exit_status = None
            failed_task_stderr = None
            cause = task_err.__cause__
            if isinstance(cause, runtime.error.CommandFailed):
                # If the task failed due to an error in the command, populate
                # the command exit status and stderr.
                failed_task_exit_status = cause.exit_status
                failed_task_stderr = MiniwdlExecutor.read_miniwdl_command_std(
                    cause.stderr_file
                )

            raise ExecutionFailedError(
                "miniwdl",
                namespace,
                status="Failed",
                inputs=task_err.exe.inputs,
                failed_task=task_err.exe.name,
                failed_task_exit_status=failed_task_exit_status,
                failed_task_stderr=failed_task_stderr
            ) from task_err

        outputs = CLI.values_to_json(output_env, namespace=target.name)

        if expected:
            self._validate_outputs(outputs, expected, target.name)

        return outputs

    @staticmethod
    def read_miniwdl_command_std(path: Optional[str] = None) -> Optional[str]:
        """
        Read the contents of a command's captured stdout/stderr file.

        Args:
            path: Path to the file; may be `None` or empty.

        Returns:
            The file contents as text, or `None` if no path was given or the
            path does not exist.
        """
        if not path:
            return None

        std_file = Path(path)
        if not std_file.exists():
            return None

        return std_file.read_text()

    @staticmethod
    def log_source(logger: logging.Logger, exn: Exception):
        """
        Log the WDL source position associated with an exception, if any.

        For a `RunFailed` error the position of the failed executable
        (`exn.exe.pos`) is used; otherwise the exception's own `pos`
        attribute, when present. Nothing is logged if no position is found.

        Args:
            logger: Logger to write the error message to.
            exn: The exception to describe.
        """
        if isinstance(exn, runtime.error.RunFailed):
            pos = exn.exe.pos
        else:
            pos = getattr(exn, "pos", None)

        if not pos:
            return

        message = str(exn)
        logger.error(
            "({} Ln {} Col {}) {}{}".format(
                pos.uri,
                pos.line,
                pos.column,
                exn.__class__.__name__,
                (", " + message if message else ""),
            )
        )