#! /usr/bin/env python
#
# Copyright 2019 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# TODO: some of the code here can be replaced by functions in xphyle.{paths,utils}
import contextlib
import fnmatch
import hashlib
import logging
import os
from pathlib import Path
import re
import shutil
import stat
import tempfile
import time
from typing import Callable, Optional, Sequence, Union, cast
from py._path.local import LocalPath
# Package-wide logger. Its level comes from the LOGLEVEL environment variable
# (upper-cased before use) and defaults to WARNING when the variable is unset.
LOG = logging.getLogger("pytest-wdl")
LOG.setLevel(os.environ.get("LOGLEVEL", "WARNING").upper())
# Name of the environment variable read for the executable search path.
ENV_PATH = "PATH"
# Name of the environment variable read for the Java classpath.
ENV_CLASSPATH = "CLASSPATH"
# Classpath used when the CLASSPATH environment variable is unset.
DEFAULT_CLASSPATH = "."
# Matches any single character that is not a word character, period, or hyphen.
UNSAFE_RE = re.compile(r"[^\w.-]")
def safe_string(s: str, replacement: str = "_") -> str:
    """
    Sanitizes a string by substituting every character matched by `UNSAFE_RE`.

    Args:
        s: The string to sanitize.
        replacement: The text substituted for each matched character.

    Returns:
        The sanitized string.
    """
    sanitized = UNSAFE_RE.sub(replacement, s)
    return sanitized
# def deprecated(f: Callable):
# """
# Decorator for deprecated functions/methods. Deprecated functionality will be
# removed before each major release.
# """
# def decorator(*args, **kwargs):
# LOG.warning(f"Function/method {f.__name__} is deprecated and will be removed")
# f(*args, **kwargs)
# return decorator
@contextlib.contextmanager
def chdir(todir: Path):
    """
    Context manager that switches the working directory for the duration of the
    `with` block and switches back afterwards.

    Args:
        todir: The directory to make current while inside the context.

    Yields:
        `todir`, unchanged.
    """
    # Remember where we started so the `finally` clause can always go back,
    # even when the body of the `with` block raises.
    previous = Path.cwd()
    try:
        os.chdir(todir)
        yield todir
    finally:
        os.chdir(previous)
@contextlib.contextmanager
def tempdir(
    change_dir: bool = False,
    tmproot: Optional[Path] = None,
    cleanup: Optional[bool] = True,
) -> Path:
    """
    Context manager that makes a fresh temporary directory, hands it to the
    caller, and (optionally) removes it once the context exits.

    Args:
        change_dir: Whether to make the temporary directory the working directory
            while inside the context.
        tmproot: Directory under which the temporary directory is created.
        cleanup: Whether to delete the temporary directory on exit.

    Yields:
        Path to the temporary directory.
    """
    temp = ensure_path(tempfile.mkdtemp(dir=tmproot))
    try:
        # The stack is empty unless a directory change was requested, so a single
        # `yield` serves both modes. The working directory is restored when the
        # stack unwinds, i.e. before the directory is removed below.
        with contextlib.ExitStack() as stack:
            if change_dir:
                stack.enter_context(chdir(temp))
            yield temp
    finally:
        if cleanup:
            shutil.rmtree(temp)
@contextlib.contextmanager
def context_dir(
    path: Optional[Path] = None,
    change_dir: bool = False,
    cleanup: Optional[bool] = None,
) -> Path:
    """
    Context manager that yields a working directory. When `path` is given it is
    used (and created, including parents, if it does not exist yet); otherwise a
    temporary directory is created.

    Args:
        path: The directory to use. If not given, a temporary directory is created.
        change_dir: Whether to make the directory the working directory while
            inside the context.
        cleanup: Whether to delete the directory when exiting the context. If None,
            the directory is deleted only when `path` was not given.

    Yields:
        A directory path.
    """
    # Decide on cleanup before `path` is rebound to a temporary directory.
    remove_on_exit = (path is None) if cleanup is None else cleanup

    if not path:
        path = Path(tempfile.mkdtemp())
    elif not path.exists():
        path.mkdir(parents=True)

    try:
        # The working directory (if changed) is restored when the stack unwinds,
        # which happens before the directory is removed.
        with contextlib.ExitStack() as stack:
            if change_dir:
                stack.enter_context(chdir(path))
            yield path
    finally:
        if remove_on_exit and path.exists():
            shutil.rmtree(path, ignore_errors=True)
def ensure_path(
    path: Union[str, LocalPath, Path],
    search_paths: Optional[Sequence[Path]] = None,
    canonicalize: bool = True,
    exists: Optional[bool] = None,
    is_file: Optional[bool] = None,
    executable: Optional[bool] = None,
    create: bool = False,
) -> Path:
    """
    Turns a string path or :class:`py.path.local.LocalPath` into a
    :class:`pathlib.Path`, optionally canonicalizing and validating it.

    Args:
        path: The path to convert. Environment variable references in it are
            expanded.
        search_paths: Directories to search for `path` if it is not already absolute.
            If `exists` is True, the first search path that contains the file is
            used, otherwise the first search path is used.
        canonicalize: Whether to return the canonicalized version of the path -
            expand home directory shortcut (~), make absolute, and resolve symlinks.
        exists: If True, raise an exception if the path does not exist; if False,
            raise an exception if the path does exist.
        is_file: If True, raise an exception if the path is a directory; if False,
            raise an exception if the path is not a directory. Only checked when
            the path exists.
        executable: If True and `is_file` is True and the file exists, raise an
            exception if it is not executable.
        create: Create the directory (or its parent, if `is_file` is truthy) if
            the path does not exist. Ignored if `exists` is True.

    Returns:
        A `pathlib.Path` object.

    Raises:
        FileExistsError: If the path exists and `exists` is False.
        FileNotFoundError: If the path does not exist and `exists` is True.
        IsADirectoryError: If the path is a directory and `is_file` is True.
        NotADirectoryError: If the path is not a directory and `is_file` is False.
        OSError: If the file is required to be executable but is not.
    """
    p = cast(Path, path) if isinstance(path, Path) else Path(str(path))
    p = Path(os.path.expandvars(p))

    # NOTE(review): the search-path step and `resolve()` are taken to be part of
    # canonicalization (nesting reconstructed from the documented contract) -
    # confirm against the upstream source.
    if canonicalize:
        p = p.expanduser()
        if search_paths and not p.is_absolute():
            if exists:
                candidates = (search_path / p for search_path in search_paths)
                hit = next((c for c in candidates if c.exists()), None)
                if hit is not None:
                    p = hit.absolute()
            else:
                p = (search_paths[0] / p).absolute()
        p = p.resolve()

    if not p.exists():
        if exists is True:
            raise FileNotFoundError(f"Path {p} does not exist")
        if create:
            # For a file only the containing directory is created.
            directory = p.parent if is_file else p
            directory.mkdir(parents=True, exist_ok=True)
        return p

    if exists is False:
        raise FileExistsError(f"Path {p} already exists")

    if is_file is True:
        if p.is_dir():
            raise IsADirectoryError(f"Path {p} is not a file")
        if executable and not is_executable(p):
            raise OSError(f"File {p} is not executable")
    elif is_file is False and not p.is_dir():
        raise NotADirectoryError(f"Path {p} is not a directory")

    return p
def resolve_file(
    filename: Union[str, Path], project_root: Path, assert_exists: bool = True
) -> Optional[Path]:
    """
    Locates `filename`, looking under `project_root` and in the directories
    above the working directory and above `project_root`.

    Args:
        filename: The filename, relative path, or absolute path to resolve.
        project_root: The project root dir.
        assert_exists: Whether to raise an error if the file cannot be found.

    Returns:
        A `pathlib.Path` object, or None if the file cannot be found and
        `assert_exists` is False.

    Raises:
        FileNotFoundError if the file cannot be found and `assert_exists` is True.
    """
    path = ensure_path(filename, canonicalize=False)

    if path.is_absolute():
        if path.exists():
            return path
    else:
        # Lookups are tried lazily, in order, and the first existing hit wins:
        # directly under the project root, upward from the working directory,
        # then upward from the project root.
        lookups = (
            lambda: ensure_path(project_root / path),
            lambda: find_project_path(path),
            lambda: find_project_path(path, start=project_root),
        )
        for lookup in lookups:
            candidate = lookup()
            if candidate and candidate.exists():
                return candidate

    if not assert_exists:
        return None
    raise FileNotFoundError(f"Could not resolve file: {filename}")
def find_project_path(
    *filenames: Union[str, Path],
    start: Optional[Path] = None,
    return_parent: bool = False,
    assert_exists: bool = False,
) -> Optional[Path]:
    """
    Starting from the `start` folder and moving upwards, search for any of
    `filenames` and return the first path containing any one of them. The
    top-most folder (where a folder is its own parent) is searched as well.

    Args:
        *filenames: Filenames to search. A string is treated as a glob pattern
            relative to each folder; a `Path` is joined to each folder and tested
            for existence.
        start: Starting folder. Defaults to the current working directory.
        return_parent: Whether to return the containing folder or the discovered file.
        assert_exists: Whether to raise an exception if a file cannot be found.

    Returns:
        A `Path`, or `None` if no folder is found that contains any of `filenames`.
        If `return_parent` is `False` and a pattern matches more than one file, the
        first match produced by the glob is returned.

    Raises:
        FileNotFoundError if the file cannot be found and `assert_exists` is True.
    """
    origin = start or Path.cwd()
    folder = origin
    while True:
        for filename in filenames:
            if isinstance(filename, str):
                # Only the first match is needed, so stop globbing as soon as
                # one is produced.
                found = next(iter(folder.glob(filename)), None)
            else:
                candidate = folder / filename
                found = candidate if candidate.exists() else None
            if found:
                LOG.debug("Found %s in %s", filename, folder)
                return folder if return_parent else found
        parent = folder.parent
        if parent == folder:
            # The top-most folder has just been searched; nothing is left above it.
            break
        folder = parent
    if assert_exists:
        raise FileNotFoundError(
            f"Could not find any of {','.join(str(f) for f in filenames)} "
            f"starting from {origin}"
        )
    return None
def find_executable_path(
    executable: str, search_path: Optional[Sequence[Path]] = None
) -> Optional[Path]:
    """Looks for `executable` in each directory of `search_path`.

    Args:
        executable: The name of the executable to find.
        search_path: The list of directories to search. If None, the directories
            listed in the $PATH environment variable are used.

    Returns:
        The first `directory / executable` path that exists and is executable, or
        None if there is no such path (or if `search_path` is None and $PATH is
        not set).
    """
    if search_path is None:
        env_value = os.environ.get(ENV_PATH)
        if env_value is None:
            return None
        search_path = [Path(entry) for entry in env_value.split(os.pathsep)]

    candidates = (directory / executable for directory in search_path)
    return next(
        (exe for exe in candidates if exe.exists() and is_executable(exe)), None
    )
def is_executable(path: Path) -> bool:
    """
    Checks if a path is executable.

    Args:
        path: The path to check

    Returns:
        True if `path` exists and has the owner-execute permission bit set,
        otherwise False.
    """
    if not path.exists():
        return False
    # The bitwise AND yields an int (0 or the flag value); convert it so that the
    # function really returns a bool, as annotated.
    return bool(os.stat(path).st_mode & stat.S_IXUSR)
def find_in_classpath(glob: str) -> Optional[Path]:
    """
    Searches the entries of the Java classpath for a file matching a glob
    pattern. A directory entry is globbed; a file entry is matched by name.

    Args:
        glob: JAR filename pattern

    Returns:
        Path to the first matching file, or None if a matching file is not found.
    """
    entries = os.environ.get(ENV_CLASSPATH, DEFAULT_CLASSPATH).split(os.pathsep)
    for entry in entries:
        path = ensure_path(entry)
        if not path.exists():
            continue

        if not path.is_dir():
            # A classpath entry that is itself a file: compare its name.
            if fnmatch.fnmatch(path.name, glob):
                return path
            continue

        matches = list(path.glob(glob))
        if not matches:
            continue
        if len(matches) > 1:
            LOG.warning(
                "Found multiple jar files matching pattern %s: %s;"
                "returning the first one.",
                glob,
                matches,
            )
        return matches[0]

    return None
def env_map(d: dict) -> dict:
    """
    Resolves every value descriptor in a mapping and returns a new mapping from
    the same keys to the resolved values. Keys whose descriptor resolves to a
    falsy value are left out.

    Args:
        d: Mapping of keys to value descriptors.

    Returns:
        Mapping of keys to resolved values.
    """
    resolved = (
        (name, resolve_value_descriptor(descriptor)) for name, descriptor in d.items()
    )
    return {name: value for name, value in resolved if value}
def resolve_value_descriptor(value_descriptor: Union[str, dict]) -> Optional:
    """
    Resolves a value descriptor. A descriptor is either the name of an
    environment variable, or a map with keys `env` (the environment variable
    name) and `value` (the value to use if `env` is not specified or if the
    environment variable is unset).

    Args:
        value_descriptor: The environment variable name, or the descriptor map.

    Returns:
        The value of the environment variable if it is set; otherwise the
        descriptor's `value` entry, or None if there is none.
    """
    if isinstance(value_descriptor, str):
        return os.environ.get(value_descriptor)

    fallback = value_descriptor.get("value")
    if "env" not in value_descriptor:
        return fallback
    return os.environ.get(value_descriptor["env"], fallback)
class DigestsNotEqualError(AssertionError):
    """Error signalling that a file digest differs from the digest it was compared to."""
def compare_files_with_hash(file1: Path, file2: Path, hash_name: str = "md5"):
    """
    Checks that two files have the same digest.

    Args:
        file1: The first file.
        file2: The second file.
        hash_name: Name of the hash algorithm used to compute the digests.

    Raises:
        DigestsNotEqualError: If the digests of the two files differ.
    """
    first_digest, second_digest = (
        hash_file(each, hash_name) for each in (file1, file2)
    )
    if first_digest == second_digest:
        return
    raise DigestsNotEqualError(
        f"{hash_name} digests differ between expected identical files "
        f"{file1}, {file2}"
    )
def hash_file(path: Path, hash_name: str = "md5") -> str:
    """
    Computes the hex digest of a file's contents.

    Args:
        path: The file to hash.
        hash_name: Name of the hash algorithm; must be one of
            `hashlib.algorithms_guaranteed`.

    Returns:
        The hexadecimal digest string.

    Raises:
        AssertionError: If `hash_name` is not a guaranteed algorithm.
    """
    # Raised explicitly (rather than via an `assert` statement) so that the check
    # is still performed when Python runs with assertions disabled (-O).
    if hash_name not in hashlib.algorithms_guaranteed:
        raise AssertionError(f"Unsupported hash algorithm: {hash_name}")
    hashobj = hashlib.new(hash_name)
    with open(path, "rb") as inp:
        # Feed the file in fixed-size chunks so large files are never held in
        # memory all at once.
        for chunk in iter(lambda: inp.read(1 << 16), b""):
            hashobj.update(chunk)
    return hashobj.hexdigest()
def verify_digests(path: Path, digests: dict):
    """
    Verifies a file against a set of expected digests.

    Args:
        path: The file to verify.
        digests: Mapping of hash algorithm name to the expected hex digest.
            Algorithms for which hashing raises `AssertionError` are logged
            and skipped.

    Raises:
        DigestsNotEqualError: If a computed digest differs from the expected one.
    """
    for hash_name, expected_digest in digests.items():
        try:
            actual_digest = hash_file(path, hash_name)
        except AssertionError:  # TODO: test this
            LOG.warning(
                "Hash algorithm %s is not supported; cannot verify file %s",
                hash_name,
                path,
            )
            continue
        if actual_digest != expected_digest:
            # The message previously read "does match", the opposite of the
            # condition being reported.
            raise DigestsNotEqualError(
                f"{hash_name} digest {actual_digest} of file "
                f"{path} does not match expected value {expected_digest}"
            )
class PollingException(Exception):
    """
    Base class for polling failures; remembers the last result seen.

    Attributes:
        last: The value passed as `last` at construction time (None by default).
    """

    def __init__(self, last=None):
        self.last = last
class TimeoutException(PollingException):
    """Raised when a polling function times out."""
class MaxCallException(PollingException):
    """Raised when the maximum number of polling iterations is exceeded."""
def poll(
    target: Callable,
    step: int = 1,
    args: Optional[Sequence] = None,
    kwargs: Optional[dict] = None,
    timeout: Optional[int] = None,
    max_tries: Optional[int] = None,
    check_success: Callable = bool,
    step_function: Optional[Callable[[int, int], int]] = None,
    ignore_exceptions: Sequence = (),
):
    """
    Poll by calling a target function until a certain condition is met. You must
    specify at least a target function to be called; `step` is the base wait time
    between each function call.

    Vendored from the [polling](https://github.com/justiniso/polling) package.

    Args:
        target: The target callable
        step: Step defines the amount of time to wait (in seconds)
        args: Arguments to be passed to the target function
        kwargs: Keyword arguments to be passed to the target function
        timeout: The target function will be called until the time elapsed is greater
            than the maximum timeout (in seconds). NOTE that the actual execution
            time of the function *can* exceed the time specified in the timeout. For
            instance, if the target function takes 10 seconds to execute and the
            timeout is 21 seconds, the polling function will take a total of 30
            seconds (two iterations of the target --20s which is less than the
            timeout--21s, and a final iteration). A falsy value disables the timeout.
        max_tries: Maximum number of times the target function will be called before
            failing. A falsy value disables the limit.
        check_success: A callback function that accepts the return value of the
            target function. It must return true if you want the polling function to
            stop and return this value. It must return false if you want to continue
            polling. You may also use this function to collect non-success values.
            The default is a callback that tests for truthiness (anything not False,
            0, or empty collection).
        step_function: A callback function that accepts two arguments: current_step,
            num_tries; and returns the next step value. By default, the step is
            constant, but you can also pass a function that will increase or decrease
            the step. As an example, you can increase the wait time between calling
            the target function by 10 seconds every iteration until the step is 100
            seconds--at which point it should remain constant at 100 seconds

            >>> def my_step_function(current_step: int, num_tries: int) -> int:
            >>>     return min(current_step + 10, 100)

        ignore_exceptions: An exception class, or a sequence of exception classes,
            that should be caught and ignored on every iteration. If the target
            function raises one of these exceptions, it is caught and the exception
            instance is remembered as the last result. Any other exceptions raised
            will be raised as normal.

    Returns:
        The first value from the target function that meets the conditions of the
        check_success callback. By default, this will be the first value that is not
        None, 0, False, '', or an empty collection.

    Raises:
        TimeoutException: If the timeout elapses without a successful result.
        MaxCallException: If `max_tries` calls are made without a successful result.
        Both carry the last result (or ignored exception) in their `last` attribute.
    """
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + timeout if timeout else None
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    # An `except` clause only accepts a class or a tuple of classes, so other
    # sequences (e.g. a list) are converted up front.
    if isinstance(ignore_exceptions, type):
        ignored = (ignore_exceptions,)
    else:
        ignored = tuple(ignore_exceptions)

    tries = 0
    last_item = None
    while not max_tries or tries < max_tries:
        try:
            val = target(*args, **kwargs)
        except ignored as e:
            last_item = e
        else:
            last_item = val
            # Condition passes, this is the only "successful" exit from the
            # polling function
            if check_success(val):
                return val
        tries += 1
        # Check the time after to make sure the poll function is called at least once
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutException(last_item)
        # Stop right after the final permitted try instead of sleeping first.
        if max_tries and tries >= max_tries:
            break
        time.sleep(step)
        if step_function:
            step = step_function(step, tries)
    raise MaxCallException(last_item)