# Copyright 2019 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
import json
import os
from pathlib import Path
import tempfile
import textwrap
from typing import Any, Optional, Sequence, Tuple, Union, cast
from pytest_wdl.data_types import DataFile
from pytest_wdl.utils import (
ensure_path, safe_string, find_executable_path, find_in_classpath
)
import WDL
from WDL import Document, Error, Tree
# Name of the environment variable that JavaExecutor reads to locate the Java
# installation directory when no explicit `java_bin` is given.
ENV_JAVA_HOME = "JAVA_HOME"
# Name of the environment variable that JavaExecutor reads for default Java
# arguments when no explicit `java_args` is given.
ENV_JAVA_ARGS = "JAVA_ARGS"
# Prefix applied to every line of captured stdout/stderr when an
# ExecutionFailedError is rendered as a string.
INDENT = " " * 16
class ExecutorError(Exception):
    """
    Base error for problems reported by a workflow executor.

    Args:
        executor: Name of the executor that raised the error; stored on the
            instance as `executor`.
        msg: Optional message, forwarded to `Exception` as its only argument.
    """

    def __init__(self, executor: str, msg: Optional[str] = None):
        # Record the executor first; the message becomes `args[0]`.
        self.executor = executor
        Exception.__init__(self, msg)
class ExecutionFailedError(ExecutorError):
    """
    Error raised when an executor fails while running a workflow or task.

    Args:
        executor: Name of the executor that failed.
        target: Name of the workflow/task that was being executed.
        status: Status reported for the failed execution.
        inputs: Inputs that were given to the execution, if any.
        executor_stdout: Captured stdout of the executor, if any.
        executor_stderr: Captured stderr of the executor, if any.
        failed_task: Name of the task that failed, if known.
        failed_task_exit_status: Exit status of the failed task, if known.
        failed_task_stdout: Captured stdout of the failed task, if any.
        failed_task_stderr: Captured stderr of the failed task, if any.
        msg: Error message; when None, one is generated from `executor`,
            `status`, `target` and (if different from `target`) `failed_task`.
    """

    def __init__(
        self,
        executor: str,
        target: str,
        status: str,
        inputs: Optional[dict] = None,
        executor_stdout: Optional[str] = None,
        executor_stderr: Optional[str] = None,
        failed_task: Optional[str] = None,
        failed_task_exit_status: Optional[int] = None,
        failed_task_stdout: Optional[str] = None,
        failed_task_stderr: Optional[str] = None,
        msg: Optional[str] = None
    ):
        if msg is None:
            # Only mention the task separately when it is not the target itself.
            if failed_task and failed_task != target:
                msg = f"{executor} failed with status {status} while running task " \
                      f"{failed_task} of {target}"
            else:
                msg = f"{executor} failed with status {status} while running {target}"
        super().__init__(executor, msg)
        self.target = target
        self.status = status
        self.inputs = inputs
        self.executor_stdout = executor_stdout
        self.executor_stderr = executor_stderr
        self.failed_task = failed_task
        self.failed_task_exit_status = failed_task_exit_status
        self.failed_task_stdout = failed_task_stdout
        self.failed_task_stderr = failed_task_stderr

    @property
    def exit_status_str(self) -> str:
        """
        The failed task's exit status as a string, or "Unknown" if it was not
        recorded.
        """
        # Compare against None rather than testing truthiness: an exit status
        # of 0 is a real value and must not be reported as "Unknown".
        if self.failed_task_exit_status is not None:
            return str(self.failed_task_exit_status)
        return "Unknown"

    def __str__(self) -> str:
        """
        Render the message followed by the inputs, the captured executor and
        task output, and the task exit status.
        """
        def wrap_std(std: Optional[str]) -> str:
            # Non-empty output starts on its own line with every line prefixed
            # by INDENT; missing/empty output is shown inline as " None".
            if std:
                return "\n" + textwrap.indent(std, INDENT)
            return " None"

        # dedent() runs on the already-interpolated text and strips the margin
        # shared by all of its lines.
        # NOTE(review): the nesting inside this template is whitespace-
        # significant; confirm it against the intended report layout.
        return textwrap.dedent(f"""
        {self.args[0]}:
            inputs:
                {self.inputs}
            executor_stdout:{wrap_std(self.executor_stdout)}
            executor_stderr:{wrap_std(self.executor_stderr)}
            failed_task_exit_status: {self.exit_status_str}
            failed_task_stdout:{wrap_std(self.failed_task_stdout)}
            failed_task_stderr:{wrap_std(self.failed_task_stderr)}
        """)
class Executor(metaclass=ABCMeta):
    """
    Base class for WDL workflow executors.

    Subclasses implement `run_workflow`; the class methods defined here compare
    actual workflow outputs against expected values.
    """

    @abstractmethod
    def run_workflow(
        self,
        wdl_path: Path,
        inputs: Optional[dict] = None,
        expected: Optional[dict] = None,
        **kwargs
    ) -> dict:
        """
        Run a WDL workflow on given inputs, and check that the output matches
        given expected values.

        Args:
            wdl_path: The WDL script to execute.
            inputs: Workflow inputs to provide to the executor.
            expected: Dict mapping output parameter names to expected values.
            kwargs: Additional executor-specific keyword arguments (mostly for
                debugging)

        Returns:
            Dict of outputs.

        Raises:
            ExecutionFailedError: if there was an error executing the workflow
            AssertionError: if the actual outputs don't match the expected outputs
        """

    @classmethod
    def _validate_outputs(cls, outputs: dict, expected: dict, target: str) -> None:
        """
        Validate expected and actual outputs are equal.

        Args:
            outputs: Actual outputs, keyed by "<target>.<output name>".
            expected: Expected outputs, keyed by output name.
            target: Execution target (i.e. workflow name)

        Raises:
            AssertionError: if an expected output is missing or differs.
        """
        for name, expected_value in expected.items():
            key = f"{target}.{name}"
            if key not in outputs:
                raise AssertionError(f"Workflow did not generate output {key}")
            cls._compare_output_values(expected_value, outputs[key], key)

    @classmethod
    def _compare_output_values(cls, expected_value, actual_value, name: str) -> None:
        """
        Compare two values and raise an error if they are not equal.

        Lists and dicts are compared recursively, element by element; a
        `DataFile` expected value delegates to its own content comparison;
        anything else is compared with `!=`.

        Args:
            expected_value: The expected value.
            actual_value: The value actually produced.
            name: Name of the output being compared (used in error messages).

        Raises:
            AssertionError: if the values differ in type, size or content.
        """
        if actual_value is None:
            if expected_value is None:
                return
            raise AssertionError(
                f"Expected and actual values differ for {name}: "
                f"{expected_value} != {actual_value}"
            )
        elif isinstance(expected_value, list):
            # Without this check a str would be compared character by character
            # (so "ab" would match ["a", "b"]) and unsized values would fail
            # with TypeError instead of AssertionError.
            if not isinstance(actual_value, (list, tuple)):
                raise AssertionError(
                    f"Expected and actual values differ in type for {name}: "
                    f"expected a list but got {type(actual_value).__name__}"
                )
            if len(expected_value) != len(actual_value):
                raise AssertionError(
                    f"Expected and actual values differ in length for {name}: "
                    f"{len(expected_value)} != {len(actual_value)}"
                )
            for i, (exp, act) in enumerate(zip(expected_value, actual_value)):
                cls._compare_output_values(exp, act, f"{name}[{i}]")
        elif isinstance(expected_value, dict):
            # Same reasoning as for lists: membership and indexing on a
            # non-dict would either misbehave or raise TypeError.
            if not isinstance(actual_value, dict):
                raise AssertionError(
                    f"Expected and actual values differ in type for {name}: "
                    f"expected a dict but got {type(actual_value).__name__}"
                )
            if len(expected_value) != len(actual_value):
                raise AssertionError(
                    f"Expected and actual values differ in length for {name}: "
                    f"{len(expected_value)} != {len(actual_value)}"
                )
            for key, exp in expected_value.items():
                if key not in actual_value:
                    raise AssertionError(
                        f"Key '{key}' is in the expected value but not in "
                        f"the actual value: {expected_value} != {actual_value}"
                    )
                cls._compare_output_values(exp, actual_value[key], f"{name}.{key}")
        elif isinstance(expected_value, DataFile):
            # TODO: pass name
            expected_value.assert_contents_equal(actual_value)
        elif expected_value != actual_value:
            raise AssertionError(
                f"Expected and actual values differ for {name}: "
                f"{expected_value} != {actual_value}"
            )
class JavaExecutor(Executor, metaclass=ABCMeta):
    """
    Base class for executors that run WDL workflows through a Java program.

    Args:
        java_bin: Path to the java executable. When not given, it is taken from
            `$JAVA_HOME/bin/java` if `JAVA_HOME` is set, otherwise looked up
            with `find_executable_path("java")`.
        java_args: Default Java arguments; when not given, the value of the
            `JAVA_ARGS` environment variable is used.
    """

    def __init__(
        self,
        java_bin: Optional[Union[str, Path]] = None,
        java_args: Optional[str] = None
    ):
        resolved = java_bin
        if not resolved:
            home = os.environ.get(ENV_JAVA_HOME)
            # Prefer the JDK pointed to by the environment over a path search.
            resolved = (
                Path(home) / "bin" / "java" if home else find_executable_path("java")
            )
        if not resolved:
            raise FileNotFoundError("Could not find java executable")

        self.java_bin = ensure_path(
            resolved, exists=True, is_file=True, executable=True
        )

        if java_args:
            self.java_args = java_args
        else:
            self.java_args = os.environ.get(ENV_JAVA_ARGS)

    @staticmethod
    def resolve_jar_file(
        file_name_pattern: str, jar_path: Optional[Path] = None,
        env_var: Optional[str] = None,
    ):
        """
        Locate a JAR file.

        The explicit `jar_path` wins; otherwise the path stored in the
        environment variable `env_var` (if set and non-empty) is used;
        otherwise the classpath is searched for `file_name_pattern`.

        Args:
            file_name_pattern: Pattern passed to `find_in_classpath`; also used
                in the error message.
            jar_path: Explicit path to the JAR file.
            env_var: Name of an environment variable that may hold the path.

        Returns:
            The result of `ensure_path` for the located file, which is required
            to exist and be a file.

        Raises:
            FileNotFoundError: if no candidate path could be determined.
        """
        candidate = jar_path
        if not candidate:
            from_env = os.environ.get(env_var) if env_var else None
            if from_env:
                candidate = ensure_path(from_env)
            else:
                candidate = find_in_classpath(file_name_pattern)
        if not candidate:
            raise FileNotFoundError(f"Could not find JAR file {file_name_pattern}")
        return ensure_path(candidate, is_file=True, exists=True)
def parse_wdl(
    wdl_path: Path,
    import_dirs: Optional[Sequence[Path]] = (),
    check_quant: bool = False,
    **_
) -> Document:
    """
    Parse a WDL file with `WDL.load`.

    Args:
        wdl_path: Path to the WDL file.
        import_dirs: Directories passed to `WDL.load` as `path`. `None` is
            treated the same as an empty sequence.
        check_quant: Passed through to `WDL.load`.
        **_: Additional keyword arguments are accepted and ignored, so callers
            can forward a shared kwargs dict.

    Returns:
        The parsed WDL document.
    """
    # `import_dirs` is declared Optional, so guard against None before iterating.
    return WDL.load(
        str(wdl_path),
        path=[str(import_dir) for import_dir in import_dirs or ()],
        check_quant=check_quant
    )
def get_target_name(
    wdl_path: Optional[Path] = None,
    wdl_doc: Optional["Document"] = None,
    task_name: Optional[str] = None,
    workflow_name: Optional[str] = None,
    **kwargs
) -> Tuple[str, bool]:
    """
    Get the execution target. The order of priority is:

    - task_name
    - workflow_name
    - wdl_doc.workflow.name
    - wdl_doc.tasks[0].name (only if the document has exactly one task)
    - wdl_path.stem

    If `wdl_doc` is not given but `wdl_path` is, the document is parsed from
    `wdl_path` first.

    Args:
        wdl_path: Path to a WDL file
        wdl_doc: A miniwdl-parsed WDL document
        task_name: The task name
        workflow_name: The workflow name
        **kwargs: Additional keyword arguments to pass to `parse_wdl`

    Returns:
        A tuple (target, is_task), where `is_task` is a boolean indicating whether
        the target is a task (True) or a workflow (False).

    Raises:
        ValueError: if 1) neither `task_name` nor `workflow_name` is specified
            and the WDL document contains no workflow and does not contain
            exactly one task; or 2) all of the parameters are None.
        RuntimeError: if the WDL document has to be parsed and contains a
            syntax error.
    """
    if task_name:
        return task_name, True

    if workflow_name:
        return workflow_name, False

    # Only parse when there is actually a path to parse; otherwise fall through
    # to the "all parameters are None" error below instead of trying to load a
    # file literally named "None".
    if not wdl_doc and wdl_path and Tree:
        try:
            wdl_doc = parse_wdl(wdl_path, **kwargs)
        except Error.SyntaxError as err:
            raise RuntimeError(
                "There was an error parsing the WDL document to extract the target "
                "workflow/task name. Please specify the 'workflow_name' or 'task_name' "
                "parameter to workflow_runner()."
            ) from err

    if wdl_doc:
        if wdl_doc.workflow:
            return wdl_doc.workflow.name, False
        elif wdl_doc.tasks and len(wdl_doc.tasks) == 1:
            return wdl_doc.tasks[0].name, True
        else:
            raise ValueError(
                "WDL document has no workflow and multiple tasks, and 'task_name' "
                "is not specified"
            )

    if wdl_path:
        # Last resort: derive the name from the file name.
        return safe_string(wdl_path.stem), False

    raise ValueError("At least one parameter must not be None")