# Copyright 2019 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from pathlib import Path
import tempfile
from typing import Optional, Sequence, Union
import subby
from pytest_wdl.executors import (
ExecutionFailedError,
JavaExecutor,
get_target_name,
read_write_inputs,
)
from pytest_wdl.executors._cromwell import (
ENV_CROMWELL_ARGS, ENV_CROMWELL_JAR, ENV_CROMWELL_CONFIG, CromwellHelperMixin
)
from pytest_wdl.utils import LOG, ensure_path
class CromwellLocalExecutor(JavaExecutor, CromwellHelperMixin):
    """
    Manages the running of WDL workflows using Cromwell.

    Args:
        import_dirs: Relative or absolute paths to directories containing WDL
            scripts that should be available as imports.
        java_bin: Path to the java executable.
        java_args: Default Java arguments to use; can be overridden by passing
            `java_args=...` to `run_workflow`.
        cromwell_jar_file: Path to the Cromwell JAR file.
        cromwell_configuration: Either the path to an existing Cromwell
            configuration file, or a dict that is serialized as JSON to a
            temporary file. If not specified, the path is looked up in the
            environment variable named by `ENV_CROMWELL_CONFIG`. The
            configuration is ignored (with a warning) when Java arguments are
            already set.
        cromwell_args: Default Cromwell arguments to use; can be overridden by
            passing `cromwell_args=...` to `run_workflow`. If not specified,
            the environment variable named by `ENV_CROMWELL_ARGS` is used.
        cromwell_config_file: Deprecated; use `cromwell_configuration` instead.
            Only used when `cromwell_configuration` is not specified.
    """

    def __init__(
        self,
        import_dirs: Optional[Sequence[Path]] = None,
        java_bin: Optional[Union[str, Path]] = None,
        java_args: Optional[str] = None,
        cromwell_jar_file: Optional[Union[str, Path]] = None,
        cromwell_configuration: Optional[Union[str, Path, dict]] = None,
        cromwell_args: Optional[str] = None,
        # deprecated
        cromwell_config_file: Optional[Union[str, Path]] = None,
    ):
        super().__init__(java_bin, java_args)

        self._import_dirs = import_dirs
        self._cromwell_jar_file = self.resolve_jar_file(
            "cromwell*.jar", cromwell_jar_file, ENV_CROMWELL_JAR
        )

        if cromwell_config_file:
            LOG.warning(
                "The 'cromwell_config_file' parameter is deprecated; please use "
                "'cromwell_configuration' instead."
            )
            # The deprecated parameter never overrides the new one.
            if not cromwell_configuration:
                cromwell_configuration = cromwell_config_file

        if not cromwell_configuration:
            # Fall back to the path given in the environment, if any.
            config_file = os.environ.get(ENV_CROMWELL_CONFIG)
            if config_file:
                cromwell_configuration = ensure_path(config_file)

        if cromwell_configuration:
            if self.java_args:
                LOG.warning(
                    "'cromwell_configuration' is ignored when 'java_args' are set"
                )
            else:
                if isinstance(cromwell_configuration, dict):
                    # mkstemp returns an already-open file descriptor; wrap it
                    # so that it is closed once the configuration is written,
                    # rather than leaking it and re-opening the path.
                    config_fd, config_name = tempfile.mkstemp(suffix=".json")
                    with os.fdopen(config_fd, "wt") as out:
                        json.dump(cromwell_configuration, out)
                    cromwell_config_file = Path(config_name)
                else:
                    cromwell_config_file = ensure_path(
                        cromwell_configuration, is_file=True, exists=True
                    )
                self.java_args = f"-Dconfig.file={cromwell_config_file}"

        self._cromwell_args = cromwell_args or os.environ.get(ENV_CROMWELL_ARGS)

    def run_workflow(
        self,
        wdl_path: Path,
        inputs: Optional[dict] = None,
        expected: Optional[dict] = None,
        **kwargs,
    ) -> dict:
        """
        Run a WDL workflow on given inputs, and check that the output matches
        given expected values.

        Args:
            wdl_path: The WDL script to execute.
            inputs: Object that will be serialized to JSON and provided to Cromwell
                as the workflow inputs.
            expected: Dict mapping output parameter names to expected values.
            kwargs: Additional keyword arguments, mostly for debugging:
                * workflow_name: The name of the workflow in the WDL script. If None,
                    the name of the WDL script is used (without the .wdl extension).
                * inputs_file: Path to the Cromwell inputs file to use. Inputs are
                    written to this file only if it doesn't exist.
                * imports_file: Path to the WDL imports file to use. Imports are
                    written to this file only if it doesn't exist.
                * java_args: Additional arguments to pass to Java runtime.
                * cromwell_args: Additional arguments to pass to `cromwell run`.

        Returns:
            Dict of outputs.

        Raises:
            ValueError: if the resolved target is a task rather than a workflow
            ExecutionFailedError: if there was an error executing Cromwell
            AssertionError: if the actual outputs don't match the expected outputs
        """
        target, is_task = get_target_name(
            wdl_path=wdl_path, import_dirs=self._import_dirs, **kwargs
        )
        if is_task:
            raise ValueError(
                "Cromwell cannot execute tasks independently of a workflow"
            )

        inputs_dict, inputs_file = read_write_inputs(
            inputs_file=kwargs.get("inputs_file"), inputs_dict=inputs, namespace=target
        )
        imports_file = self._get_workflow_imports(
            self._import_dirs, kwargs.get("imports_file")
        )

        inputs_arg = f"-i {inputs_file}" if inputs_file else ""
        imports_zip_arg = f"-p {imports_file}" if imports_file else ""
        # Per-call arguments take precedence over the instance-level defaults.
        java_args = kwargs.get("java_args", self.java_args) or ""
        cromwell_args = kwargs.get("cromwell_args", self._cromwell_args) or ""
        metadata_file = Path.cwd() / "metadata.json"

        cmd = (
            f"{self.java_bin} {java_args} -jar {self._cromwell_jar_file} run "
            f"-m {metadata_file} {cromwell_args} {inputs_arg} {imports_zip_arg} "
            f"{wdl_path}"
        )
        LOG.info(
            f"Executing cromwell command '{cmd}' with inputs "
            f"{json.dumps(inputs_dict, default=str)}"
        )

        # Errors are handled below so that they can be enriched with metadata.
        exe = subby.run(cmd, raise_on_error=False)

        metadata = None
        if metadata_file.exists():
            with open(metadata_file, "rt") as inp:
                metadata = json.load(inp)

        if exe.ok:
            if metadata:
                assert metadata["status"] == "Succeeded", (
                    f"Unexpected workflow status {metadata['status']}"
                )
                outputs = metadata["outputs"]
            else:
                LOG.warning(
                    f"Cromwell command completed successfully but did not generate "
                    f"a metadata file at {metadata_file}"
                )
                # Without metadata, fall back to parsing the command output.
                outputs = self._get_cromwell_outputs(exe.output)
        else:
            # The executor's stdout/stderr are always reported, whether or not
            # a metadata file is available to add further details.
            error_kwargs = {
                "executor": "cromwell",
                "target": target,
                "status": "Failed",
                "inputs": inputs_dict,
                "executor_stdout": exe.output,
                "executor_stderr": exe.error,
            }
            if metadata:
                self._parse_metadata_errors(
                    metadata, target=target, error_kwargs=error_kwargs
                )
            else:
                error_kwargs["msg"] = (
                    f"Cromwell command failed but did not generate a metadata "
                    f"file at {metadata_file}"
                )
            raise ExecutionFailedError(**error_kwargs)

        if expected:
            self._validate_outputs(outputs, expected, target)

        return outputs