Source code for pytest_wdl.executors.miniwdl

#    Copyright 2019 Eli Lilly and Company
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
import logging
from pathlib import Path
import pkg_resources
from typing import Optional

from pytest_wdl.executors import Executor, get_workflow_inputs, validate_outputs

from WDL import CLI, Error, Tree, runtime

[docs]class MiniwdlExecutor(Executor): """ Manages the running of WDL workflows using Cromwell. """
[docs] def run_workflow( self, wdl_path: Path, inputs: Optional[dict] = None, expected: Optional[dict] = None, **kwargs ) -> dict: """ Run a WDL workflow on given inputs, and check that the output matches given expected values. Args: wdl_path: The WDL script to execute. inputs: Object that will be serialized to JSON and provided to Cromwell as the workflow inputs. expected: Dict mapping output parameter names to expected values. kwargs: Additional keyword arguments, mostly for debugging: * workflow_name: Name of the workflow to run. * task_name: Name of the task to run if a workflow isn't defined. * inputs_file: Path to the Cromwell inputs file to use. Inputs are written to this file only if it doesn't exist. Returns: Dict of outputs. Raises: Exception: if there was an error executing Cromwell AssertionError: if the actual outputs don't match the expected outputs """ doc = CLI.load( str(wdl_path), path=[str(path) for path in self.import_dirs], check_quant=kwargs.get("check_quant", True), read_source=CLI.read_source ) task = kwargs.get("task_name") namespace = None if not task: if "workflow_name" in kwargs: namespace = kwargs["workflow_name"] else: namespace = inputs_dict, inputs_file = get_workflow_inputs( inputs, kwargs.get("inputs_file"), namespace=namespace ) target, input_env, input_json = CLI.runner_input( doc=doc, inputs=[], input_file=inputs_file, empty=[], task=task ) logger = logging.getLogger("miniwdl-run") logger.setLevel(CLI.NOTICE_LEVEL) CLI.install_coloredlogs(logger) try: logger.debug(pkg_resources.get_distribution("miniwdl")) except pkg_resources.DistributionNotFound as exc: logger.debug( "miniwdl version unknown ({}: {})".format(type(exc).__name__, exc) ) for pkg in ["docker", "lark-parser", "argcomplete", "pygtail"]: logger.debug(pkg_resources.get_distribution(pkg)) try: if isinstance(target, Tree.Task): entrypoint = runtime.run_local_task else: entrypoint = runtime.run_local_workflow rundir, output_env = entrypoint(target, input_env) except Error.EvalError as exn: log_source(logger, exn) raise except runtime.task.TaskFailure as exn: exn = exn.__cause__ or exn if isinstance(exn, runtime.task.CommandFailure): logger.error( "command's standard error in %s", getattr(exn, "stderr_file") ) if isinstance(getattr(exn, "pos", None), Error.SourcePosition): log_source(logger, exn) else: logger.error(f"{exn.__class__.__name__}, {str(exn)}") raise outputs = CLI.values_to_json(output_env, if expected: validate_outputs(outputs, expected, return outputs
[docs]def log_source(logger: logging.Logger, exn): logger.error( "({} Ln {} Col {}) {}{}".format( exn.pos.uri, exn.pos.line, exn.pos.column, exn.__class__.__name__, (", " + str(exn) if str(exn) else ""), ) )