Source code for pytest_wdl.executors.miniwdl

#    Copyright 2019 Eli Lilly and Company
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
import json
import os
from pathlib import Path
from typing import Optional, Sequence

import subby
import WDL

from pytest_wdl.executors import (
    Executor, ExecutionFailedError, get_target_name, read_write_inputs
)


class MiniwdlExecutor(Executor):
    """
    Manages the running of WDL workflows using miniwdl.
    """

    def __init__(self, import_dirs: Optional[Sequence[Path]] = None):
        """
        Args:
            import_dirs: Directories passed to miniwdl as import search paths
                (`path=` for `WDL.load` and `-p` on the command line).
        """
        self._import_dirs = import_dirs or []

    def run_workflow(
        self,
        wdl_path: Path,
        inputs: Optional[dict] = None,
        expected: Optional[dict] = None,
        **kwargs
    ) -> dict:
        """
        Run a WDL workflow on given inputs, and check that the output matches
        given expected values.

        Args:
            wdl_path: The WDL script to execute.
            inputs: Object that will be serialized to JSON and provided to
                miniwdl as the workflow inputs.
            expected: Dict mapping output parameter names to expected values.
            kwargs: Additional keyword arguments, mostly for debugging:
                * workflow_name: Name of the workflow to run.
                * task_name: Name of the task to run if a workflow isn't
                    defined.
                * inputs_file: Path to the miniwdl inputs file to use. Inputs
                    are written to this file only if it doesn't exist.
                    NOTE(review): this method does not pass `inputs_file` on
                    to `read_write_inputs` - confirm whether it is honoured.
                * check_quant: Whether miniwdl should check quantifiers
                    (default True). When False, `--no-quant-check` is added
                    to the command line.

        Returns:
            Dict of outputs.

        Raises:
            ExecutionFailedError: if miniwdl reported a failed task/workflow
                (the error JSON has a "task", "workflow" or "exit_status").
            RuntimeError: if miniwdl reported any other kind of error.
            AssertionError: if the actual outputs don't match the expected
                outputs
        """
        check_quant = kwargs.get("check_quant", True)

        wdl_doc = WDL.load(
            str(wdl_path),
            path=[str(path) for path in self._import_dirs],
            check_quant=check_quant
        )

        namespace, is_task = get_target_name(wdl_doc=wdl_doc, **kwargs)

        # Only the path of the written inputs file is needed here.
        _, inputs_file = read_write_inputs(
            inputs_dict=inputs, namespace=namespace if not is_task else None,
        )

        input_arg = f"-i {inputs_file}" if inputs_file else ""
        task_arg = f"--task {namespace}" if is_task else ""
        quant_arg = "--no-quant-check" if not check_quant else ""
        path_arg = " ".join(f"-p {p}" for p in self._import_dirs)

        # TODO: we shouldn't need --copy-input-files, but without it sometimes
        #  the staged input files are not available in the container.
        #  Another fix is
        #  https://github.com/chanzuckerberg/miniwdl/issues/145#issuecomment-733435644
        #  but we will leave --copy-input-files, so the user doesn't have to
        #  muck with Docker settings, until the issue is addressed:
        #  https://github.com/chanzuckerberg/miniwdl/issues/461
        cmd = (
            f"miniwdl run --error-json --copy-input-files {input_arg} {task_arg} "
            f"{quant_arg} {path_arg} {wdl_path}"
        )
        exe = subby.run(cmd, raise_on_error=False)

        # miniwdl writes out either outputs or error in json format to stdout
        results = json.loads(exe.output)

        if exe.ok:
            outputs = results["outputs"]
            if expected:
                self._validate_outputs(outputs, expected, namespace)
            return outputs

        # On failure the parsed stdout *is* the error document.
        error = results
        print(error)

        failure_attrs = [error.get(x) for x in ("task", "workflow", "exit_status")]

        if not any(failure_attrs):
            # Not a run failure (e.g. a problem in the WDL source): report the
            # position of the error when miniwdl provides one.
            pos = error.get("pos")
            if pos:
                source = f"at {pos['line']}:{pos['column']} in {pos['source']}"
            else:
                source = f"in {wdl_path}"
            message = error.get("message", "unknown")
            raise RuntimeError(f"Error {source}: {message}")

        # RunFailed or CommandFailed
        target = failure_attrs[0] or failure_attrs[1]
        failure_dir = error.get("dir")
        failed_task = failure_attrs[0]
        failed_task_exit_status = None
        failed_task_stderr = None
        # Initialised up front so it is always bound when the error is raised,
        # even if the failure directory has no inputs.json.
        failed_inputs = None

        # A top-level exit status means the error itself describes the failed
        # command; otherwise the details are nested under "cause".
        cause = error if failure_attrs[2] else error.get("cause")

        if cause:
            if "dir" in cause:
                failure_dir = cause["dir"]
            # These keys are optional in the error document, so look them up
            # leniently rather than masking the real failure with a KeyError.
            failed_task_exit_status = cause.get("exit_status")
            failed_task_stderr_path = cause.get("stderr_file")
            if failed_task_stderr_path:
                p = Path(failed_task_stderr_path)
                # exists() must be *called*; the bare attribute is always truthy.
                if p.exists():
                    with open(p, "rt") as inp:
                        failed_task_stderr = inp.read()

        if failure_dir is not None:
            inputs_json = Path(os.path.join(failure_dir, "inputs.json"))
            if inputs_json.exists():
                with open(inputs_json, "r") as inp:
                    failed_inputs = json.load(inp)

            if failed_task is None:
                # Fall back to the task name recorded in the failure
                # directory's own error.json.
                cause_error_file = Path(os.path.join(failure_dir, "error.json"))
                if cause_error_file.exists():
                    with open(cause_error_file, "r") as inp:
                        cause_error_json = json.load(inp)
                    if "task" in cause_error_json:
                        failed_task = cause_error_json["task"]

        raise ExecutionFailedError(
            executor="miniwdl",
            target=target,
            status="Failed",
            inputs=failed_inputs,
            executor_stderr=exe.error,
            failed_task=failed_task,
            failed_task_exit_status=failed_task_exit_status,
            # failed_task_stdout=TODO,
            failed_task_stderr=failed_task_stderr,
            msg=error.get("message")
        )