#! /usr/bin/env python
#
# Copyright 2019 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# TODO: some of the code here can be replaced by functions in xphyle.{paths,utils}
import contextlib
import fnmatch
import hashlib
import logging
import os
from pathlib import Path
import re
import shutil
import stat
import tempfile
from typing import Optional, Sequence, Union, cast
from py._path.local import LocalPath
# Package logger. Its level comes from the LOGLEVEL environment variable
# (upper-cased before use) and defaults to WARNING when the variable is unset.
LOG = logging.getLogger("pytest-wdl")
LOG.setLevel(os.getenv("LOGLEVEL", "WARNING").upper())

# Environment variable names used when searching for executables and JAR files,
# and the classpath assumed when CLASSPATH is not defined.
ENV_PATH, ENV_CLASSPATH = "PATH", "CLASSPATH"
DEFAULT_CLASSPATH = "."

# Matches any single character that is not a word character, "." or "-".
UNSAFE_RE = re.compile(r"[^\w.-]")
def safe_string(s: str, replacement: str = "_") -> str:
    """
    Sanitizes a string by substituting every character matched by `UNSAFE_RE`
    (anything other than a word character, a period, or a hyphen).

    Args:
        s: The string to sanitize.
        replacement: The text substituted for each unsafe character.

    Returns:
        The sanitized string.
    """
    sanitized = re.sub(UNSAFE_RE, replacement, s)
    return sanitized
# def deprecated(f: Callable):
# """
# Decorator for deprecated functions/methods. Deprecated functionality will be
# removed before each major release.
# """
# def decorator(*args, **kwargs):
# LOG.warning(f"Function/method {f.__name__} is deprecated and will be removed")
# f(*args, **kwargs)
# return decorator
@contextlib.contextmanager
def chdir(todir: Path):
    """
    Context manager that makes `todir` the working directory for the duration
    of the `with` block and restores the previous working directory on exit.

    Args:
        todir: The directory to switch to.

    Yields:
        `todir`, unchanged.
    """
    previous = Path.cwd()
    try:
        os.chdir(todir)
        yield todir
    finally:
        # Restore the old directory on normal exit and when an error is raised.
        os.chdir(previous)
@contextlib.contextmanager
def tempdir(
    change_dir: bool = False, tmproot: Optional[Path] = None,
    cleanup: Optional[bool] = True
) -> Path:
    """
    Context manager that makes a new temporary directory, yields its path, and
    removes the directory once the `with` block is left.

    Args:
        change_dir: Whether to make the temporary directory the working directory
            while inside the context.
        tmproot: Directory in which the temporary directory is created; passed
            to `tempfile.mkdtemp` as `dir`.
        cleanup: Whether to delete the temporary directory on exit.

    Yields:
        The temporary directory, as returned by `ensure_path`.
    """
    workdir = ensure_path(tempfile.mkdtemp(dir=tmproot))
    try:
        with contextlib.ExitStack() as stack:
            if change_dir:
                # Leaving the stack restores the previous working directory
                # before the directory is removed below.
                stack.enter_context(chdir(workdir))
            yield workdir
    finally:
        if cleanup:
            shutil.rmtree(workdir)
@contextlib.contextmanager
def context_dir(
    path: Optional[Path] = None, change_dir: bool = False,
    cleanup: Optional[bool] = None
) -> Path:
    """
    Context manager that yields a working directory. When `path` is given it is
    used (and created, including parents, if it does not exist yet); otherwise a
    fresh temporary directory is created.

    Args:
        path: The directory to use. If not given, a temporary directory is
            created with `tempfile.mkdtemp`.
        change_dir: Whether to make the directory the working directory while
            inside the context.
        cleanup: Whether to delete the directory when exiting the context. If
            None, the directory is deleted only when `path` was not given, i.e.
            when a temporary directory was created.

    Yields:
        A directory path.
    """
    # Decide on cleanup before `path` is possibly replaced by a temp dir.
    remove_on_exit = (path is None) if cleanup is None else cleanup

    if not path:
        path = Path(tempfile.mkdtemp())
    elif not path.exists():
        path.mkdir(parents=True)

    try:
        with contextlib.ExitStack() as stack:
            if change_dir:
                stack.enter_context(chdir(path))
            yield path
    finally:
        # Best-effort removal: errors raised while deleting are ignored.
        if remove_on_exit and path.exists():
            shutil.rmtree(path, ignore_errors=True)
def ensure_path(
    path: Union[str, LocalPath, Path],
    search_paths: Optional[Sequence[Path]] = None,
    canonicalize: bool = True,
    exists: Optional[bool] = None,
    is_file: Optional[bool] = None,
    executable: Optional[bool] = None,
    create: bool = False
) -> Path:
    """
    Converts a string path or :class:`py.path.local.LocalPath` to a
    :class:`pathlib.Path`, optionally canonicalizing and validating it.

    Environment variable references in the path are always expanded.

    Args:
        path: The path to convert.
        search_paths: Directories to search for `path` if it is not already
            absolute. If `exists` is True, the first search path that contains
            the file is used; otherwise the first search path is used. Only
            consulted when `canonicalize` is True.
        canonicalize: Whether to return the canonicalized version of the path -
            expand home directory shortcut (~), make absolute, and resolve
            symlinks.
        exists: If True, raise an exception if the path does not exist; if False,
            raise an exception if the path does exist.
        is_file: If True, raise an exception if the path exists and is a
            directory; if False, raise an exception if the path exists and is
            not a directory.
        executable: If True and `is_file` is True and the file exists, raise an
            exception if it is not executable.
        create: Create the directory (or its parent, if `is_file` is truthy) if
            the path does not exist. Ignored if `exists` is True.

    Returns:
        A `pathlib.Path` object.

    Raises:
        FileExistsError: If the path exists and `exists` is False.
        FileNotFoundError: If the path does not exist and `exists` is True.
        IsADirectoryError: If `is_file` is True and the path is a directory.
        NotADirectoryError: If `is_file` is False and the path exists but is not
            a directory.
        OSError: If `executable` is set and the existing file is not executable.
    """
    p = cast(Path, path) if isinstance(path, Path) else Path(str(path))
    p = Path(os.path.expandvars(p))

    if canonicalize:
        p = p.expanduser()
        if search_paths and not p.is_absolute():
            if not exists:
                p = (search_paths[0] / p).absolute()
            else:
                # Use the first search path that actually contains the target;
                # when none does, `p` is left relative and resolved below.
                for search_path in search_paths:
                    candidate = search_path / p
                    if candidate.exists():
                        p = candidate.absolute()
                        break
        p = p.resolve()

    if not p.exists():
        if exists is True:
            raise FileNotFoundError(f"Path {p} does not exist")
        if create:
            directory = p.parent if is_file else p
            directory.mkdir(parents=True, exist_ok=True)
        return p

    if exists is False:
        raise FileExistsError(f"Path {p} already exists")
    if is_file is True:
        if p.is_dir():
            raise IsADirectoryError(f"Path {p} is not a file")
        if executable and not is_executable(p):
            raise OSError(f"File {p} is not executable")
    elif is_file is False and not p.is_dir():
        raise NotADirectoryError(f"Path {p} is not a directory")
    return p
def resolve_file(
    filename: Union[str, Path], project_root: Path, assert_exists: bool = True
) -> Optional[Path]:
    """
    Locates `filename`, looking under `project_root` and then upward from the
    current directory and from `project_root`.

    An absolute `filename` is returned as-is when it exists. A relative one is
    tried, in order: directly under `project_root`; via `find_project_path`
    starting at the current working directory; via `find_project_path` starting
    at `project_root`.

    Args:
        filename: The filename, relative path, or absolute path to resolve.
        project_root: The project root dir.
        assert_exists: Whether to raise an error if the file cannot be found.

    Returns:
        A `pathlib.Path` object, or None if the file cannot be found and
        `assert_exists` is False.

    Raises:
        FileNotFoundError if the file cannot be found and `assert_exists` is True.
    """
    path = ensure_path(filename, canonicalize=False)

    if path.is_absolute():
        if path.exists():
            return path
    else:
        under_root = ensure_path(project_root / path)
        if under_root.exists():
            return under_root

        # First search upward from cwd (start=None), then from the project root.
        for start in (None, project_root):
            hit = find_project_path(path, start=start)
            if hit and hit.exists():
                return hit

    if not assert_exists:
        return None
    raise FileNotFoundError(f"Could not resolve file: {filename}")
def find_project_path(
    *filenames: Union[str, Path],
    start: Optional[Path] = None,
    return_parent: bool = False,
    assert_exists: bool = False
) -> Optional[Path]:
    """
    Starting from the `start` folder and moving upwards, search for any of
    `filenames` and return the first match.

    The search stops when a directory that is its own parent (e.g. the
    filesystem root) is reached; that directory itself is not searched.

    Args:
        *filenames: Names to search for. A string is treated as a glob pattern
            relative to each visited folder; a `Path` is joined to each visited
            folder and tested for existence.
        start: Starting folder. Defaults to the current working directory.
        return_parent: Whether to return the containing folder rather than the
            discovered file.
        assert_exists: Whether to raise an exception if a file cannot be found.

    Returns:
        A `Path`, or `None` if no folder is found that contains any of
        `filenames`. If `return_parent` is `False` and a glob pattern matches more
        than one file, whichever match the glob yields first is returned.

    Raises:
        FileNotFoundError if the file cannot be found and `assert_exists` is True.
    """
    # Remember where the search began so the error message can report it even
    # when `start` was not given.
    origin = start or Path.cwd()
    path = origin

    while path != path.parent:
        for filename in filenames:
            if isinstance(filename, str):
                # Only the first match is needed; do not build the whole list.
                found = next(iter(path.glob(filename)), None)
            else:
                found = path / filename
                if not found.exists():
                    found = None

            if found:
                LOG.debug("Found %s in %s", filename, path)
                return path if return_parent else found

        path = path.parent

    if assert_exists:
        raise FileNotFoundError(
            f"Could not find any of {','.join(str(f) for f in filenames)} "
            f"starting from {origin}"
        )

    return None
def find_executable_path(
    executable: str, search_path: Optional[Sequence[Path]] = None
) -> Optional[Path]:
    """Looks for `executable` in the directories of `search_path`.

    Args:
        executable: The name of the executable to find.
        search_path: The list of directories to search. If None, the directories
            listed in the $PATH environment variable are used.

    Returns:
        The first `directory / executable` that exists and is executable, or None
        if there is no such path (or if `search_path` is None and $PATH is not
        set).
    """
    if search_path is None:
        try:
            raw_path = os.environ[ENV_PATH]
        except KeyError:
            return None
        search_path = [Path(entry) for entry in raw_path.split(os.pathsep)]

    candidates = (directory / executable for directory in search_path)
    return next(
        (exe for exe in candidates if exe.exists() and is_executable(exe)),
        None
    )
def is_executable(path: Path) -> bool:
    """
    Checks if a path is executable.

    Only the owner-execute permission bit (`stat.S_IXUSR`) of the path's mode is
    inspected.

    Args:
        path: The path to check

    Returns:
        True if `path` exists and has the owner-execute bit set, otherwise False.
    """
    # bool() so callers get a real boolean rather than the raw mode bitmask.
    return path.exists() and bool(os.stat(path).st_mode & stat.S_IXUSR)
def find_in_classpath(glob: str) -> Optional[Path]:
    """
    Attempts to find a .jar file matching the specified glob pattern in the
    Java classpath.

    The classpath is read from the CLASSPATH environment variable (defaulting to
    the current directory) and its entries are examined in order. A directory
    entry is searched with `glob`; a non-directory entry is matched by its file
    name. Entries that do not exist are skipped.

    Args:
        glob: JAR filename pattern

    Returns:
        Path to the JAR file, or None if a matching file is not found.
    """
    classpath = os.environ.get(ENV_CLASSPATH, DEFAULT_CLASSPATH)

    for path_str in classpath.split(os.pathsep):
        path = ensure_path(path_str)
        if not path.exists():
            continue

        if path.is_dir():
            matches = list(path.glob(glob))
            if matches:
                if len(matches) > 1:
                    LOG.warning(
                        "Found multiple jar files matching pattern %s: %s; "
                        "returning the first one.", glob, matches
                    )
                return matches[0]
        elif fnmatch.fnmatch(path.name, glob):
            return path

    return None
def env_map(d: dict) -> dict:
    """
    Builds a mapping from each key of `d` to the value described by its value
    descriptor, as resolved by `resolve_value_descriptor`.

    Keys whose resolved value is falsy (for example None or an empty string) are
    left out of the result.
    """
    resolved = (
        (name, resolve_value_descriptor(value_descriptor))
        for name, value_descriptor in d.items()
    )
    return {name: value for name, value in resolved if value}
def resolve_value_descriptor(value_descriptor: Union[str, dict]) -> Optional:
    """
    Resolves the value of a value descriptor, which may be an environment variable
    name, or a map with keys `env` (the environment variable name) and `value`
    (the value to use if `env` is not specified or if the environment variable is
    unset).

    Args:
        value_descriptor: An environment variable name, or a dict with optional
            `env` and `value` keys.

    Returns:
        For a string, the value of that environment variable, or None if it is
        unset. For a dict, the value of the `env` variable if it is set, otherwise
        the `value` entry (None if absent).
    """
    if isinstance(value_descriptor, str):
        return os.environ.get(value_descriptor)

    if "env" not in value_descriptor:
        return value_descriptor.get("value")

    variable, fallback = value_descriptor["env"], value_descriptor.get("value")
    return os.environ.get(variable, fallback)
class DigestsNotEqualError(AssertionError):
    """Raised when a file's digest differs from the digest it is compared to."""
def compare_files_with_hash(file1: Path, file2: Path, hash_name: str = "md5"):
    """
    Checks that two files have the same digest.

    Args:
        file1: The first file to hash.
        file2: The second file to hash.
        hash_name: Name of the hash algorithm, passed to `hash_file`.

    Raises:
        DigestsNotEqualError: If the two digests differ.
    """
    first_digest, second_digest = (
        hash_file(candidate, hash_name) for candidate in (file1, file2)
    )
    if first_digest == second_digest:
        return
    raise DigestsNotEqualError(
        f"{hash_name} digests differ between expected identical files "
        f"{file1}, {file2}"
    )
def hash_file(path: Path, hash_name: str = "md5") -> str:
    """
    Computes the hex digest of a file's contents.

    Args:
        path: The file to hash.
        hash_name: Name of the hash algorithm; must be one of
            `hashlib.algorithms_guaranteed`.

    Returns:
        The hexadecimal digest string.

    Raises:
        AssertionError: If `hash_name` is not a guaranteed algorithm.
    """
    # Raised explicitly (instead of via an `assert` statement) so the check is
    # still performed when Python runs with -O; the exception type is kept
    # because callers catch AssertionError.
    if hash_name not in hashlib.algorithms_guaranteed:
        raise AssertionError(f"Unsupported hash algorithm: {hash_name}")

    with open(path, "rb") as inp:
        hashobj = hashlib.new(hash_name)
        # Feed the file in 1 MiB chunks so large files are not loaded whole.
        for chunk in iter(lambda: inp.read(1 << 20), b""):
            hashobj.update(chunk)

    return hashobj.hexdigest()
def verify_digests(path: Path, digests: dict):
    """
    Checks a file against one or more expected digests.

    Args:
        path: The file to verify.
        digests: Mapping of hash algorithm name to the expected hex digest.
            Algorithms that `hash_file` rejects are skipped with a warning.

    Raises:
        DigestsNotEqualError: If a computed digest differs from the expected one.
    """
    for hash_name, expected_digest in digests.items():
        try:
            actual_digest = hash_file(path, hash_name)
        except AssertionError:  # TODO: test this
            LOG.warning(
                "Hash algorithm %s is not supported; cannot verify file %s",
                hash_name, path
            )
            continue

        if actual_digest != expected_digest:
            raise DigestsNotEqualError(
                f"{hash_name} digest {actual_digest} of file "
                f"{path} does not match expected value {expected_digest}"
            )