Source code for pytest_wdl.utils

#! /usr/bin/env python
#
#    Copyright 2019 Eli Lilly and Company
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
# TODO: some of the code here can be replaced by functions in xphyle.{paths,utils}
import contextlib
import fnmatch
import hashlib
import logging
import os
from pathlib import Path
import re
import shutil
import stat
import tempfile
from typing import Optional, Sequence, Union, cast

from py._path.local import LocalPath


# Module logger, registered under the name "pytest-wdl". Its level is read from
# the LOGLEVEL environment variable (upper-cased); WARNING is used when the
# variable is not set.
LOG = logging.getLogger("pytest-wdl")
LOG.setLevel(os.environ.get("LOGLEVEL", "WARNING").upper())

# Name of the environment variable that `find_executable_path` reads when it is
# not given an explicit search path.
ENV_PATH = "PATH"
# Name of the environment variable that `find_in_classpath` reads.
ENV_CLASSPATH = "CLASSPATH"
# Classpath that `find_in_classpath` falls back to when ENV_CLASSPATH is unset.
DEFAULT_CLASSPATH = "."

# Matches any single character that is not a word character, "." or "-".
UNSAFE_RE = re.compile(r"[^\w.-]")


def safe_string(s: str, replacement: str = "_") -> str:
    """
    Sanitizes a string by substituting every character that is not a word
    character, a period, or a hyphen.

    Args:
        s: The string to sanitize.
        replacement: The text substituted for each unsafe character.

    Returns:
        The sanitized string.
    """
    return re.sub(UNSAFE_RE, replacement, s)
# def deprecated(f: Callable): # """ # Decorator for deprecated functions/methods. Deprecated functionality will be # removed before each major release. # """ # def decorator(*args, **kwargs): # LOG.warning(f"Function/method {f.__name__} is deprecated and will be removed") # f(*args, **kwargs) # return decorator
@contextlib.contextmanager
def chdir(todir: Path):
    """
    Context manager that makes `todir` the working directory for the duration
    of the `with` block and switches back afterwards, even if the block raises.

    Args:
        todir: The directory to change to.

    Yields:
        `todir`, unchanged.
    """
    # Remember where we started so the `finally` clause can always go back.
    previous = Path.cwd()
    try:
        os.chdir(todir)
        yield todir
    finally:
        os.chdir(previous)
@contextlib.contextmanager
def tempdir(
    change_dir: bool = False,
    tmproot: Optional[Path] = None,
    cleanup: Optional[bool] = True
) -> Path:
    """
    Context manager that creates a temporary directory, yields it, and
    (optionally) removes it once the `with` block is left.

    Args:
        change_dir: Whether to make the temporary directory the working
            directory while inside the context.
        tmproot: Directory in which the temporary directory is created. If
            None, the `tempfile` default location is used.
        cleanup: Whether to delete the temporary directory when leaving the
            context.

    Yields:
        The path of the temporary directory, as returned by `ensure_path`.
    """
    temp = ensure_path(tempfile.mkdtemp(dir=tmproot))
    try:
        with contextlib.ExitStack() as stack:
            if change_dir:
                # The working directory is restored when the stack unwinds,
                # i.e. before the directory is removed below.
                stack.enter_context(chdir(temp))
            yield temp
    finally:
        if cleanup:
            shutil.rmtree(temp)
@contextlib.contextmanager
def context_dir(
    path: Optional[Path] = None,
    change_dir: bool = False,
    cleanup: Optional[bool] = None
) -> Path:
    """
    Context manager that yields a working directory: either the given `path`
    (created, with parents, if it does not exist yet) or a freshly created
    temporary directory when no `path` is given.

    Args:
        path: The directory to use. If not given, a temporary directory is
            created.
        change_dir: Whether to change to the directory while inside the context.
        cleanup: Whether to delete the directory when exiting the context. If
            None, the directory is only deleted if `path` was None, i.e. a
            temporary directory was created.

    Yields:
        A directory path.
    """
    if cleanup is None:
        # Default policy: only remove directories this function created itself.
        cleanup = path is None

    if not path:
        path = Path(tempfile.mkdtemp())
    elif not path.exists():
        path.mkdir(parents=True)

    try:
        with contextlib.ExitStack() as stack:
            if change_dir:
                stack.enter_context(chdir(path))
            yield path
    finally:
        if cleanup and path.exists():
            shutil.rmtree(path, ignore_errors=True)
def ensure_path(
    path: Union[str, LocalPath, Path],
    search_paths: Optional[Sequence[Path]] = None,
    canonicalize: bool = True,
    exists: Optional[bool] = None,
    is_file: Optional[bool] = None,
    executable: Optional[bool] = None,
    create: bool = False
) -> Path:
    """
    Converts a string path or :class:`py.path.local.LocalPath` to a
    :class:`pathlib.Path`, optionally canonicalizing and validating it.

    Environment variables in the path are always expanded.

    Args:
        path: The path to convert.
        search_paths: Directories to search for `path` if it is not already
            absolute. If `exists` is True, looks for the first search path that
            contains the file, otherwise just uses the first search path.
        canonicalize: Whether to return the canonicalized version of the path -
            expand home directory shortcut (~), make absolute, and resolve
            symlinks.
        exists: If True, raise an exception if the path does not exist; if
            False, raise an exception if the path does exist.
        is_file: If True, raise an exception if the path is not a file; if
            False, raise an exception if the path is not a directory. Only
            checked when the path exists.
        executable: If True and `is_file` is True and the file exists, raise an
            exception if it is not executable.
        create: Create the directory (or parent, if `is_file` = True) if it
            does not exist. Ignored if `exists` is True.

    Returns:
        A `pathlib.Path` object.

    Raises:
        FileExistsError: If the path exists and `exists` is False.
        FileNotFoundError: If the path does not exist and `exists` is True.
        IsADirectoryError: If `is_file` is True and the path is a directory.
        NotADirectoryError: If `is_file` is False and the path is not a
            directory.
        OSError: If `executable` is set and the file is not executable.
    """
    p = path if isinstance(path, Path) else Path(str(path))
    p = Path(os.path.expandvars(p))

    # NOTE(review): the nesting below was reconstructed from a
    # whitespace-collapsed listing; confirm against the upstream file that the
    # search-path lookup and resolve() both belong under `canonicalize`.
    if canonicalize:
        p = p.expanduser()
        if search_paths and not p.is_absolute():
            if exists:
                # First search directory that actually contains the path; if
                # none does, the path is left as it was.
                hit = next(
                    (
                        root / p for root in search_paths
                        if (root / p).exists()
                    ),
                    None
                )
                if hit is not None:
                    p = hit.absolute()
            else:
                p = (search_paths[0] / p).absolute()
        p = p.resolve()

    if not p.exists():
        if exists is True:
            raise FileNotFoundError(f"Path {p} does not exist")
        if create:
            # For a file only the containing directory can be created here.
            to_create = p.parent if is_file else p
            to_create.mkdir(parents=True, exist_ok=True)
        return p

    if exists is False:
        raise FileExistsError(f"Path {p} already exists")

    if is_file is True:
        if p.is_dir():
            raise IsADirectoryError(f"Path {p} is not a file")
        if executable and not is_executable(p):
            raise OSError(f"File {p} is not executable")
    elif is_file is False and not p.is_dir():
        raise NotADirectoryError(f"Path {p} is not a directory")

    return p
def resolve_file(
    filename: Union[str, Path],
    project_root: Path,
    assert_exists: bool = True
) -> Optional[Path]:
    """
    Locates `filename`, trying in order: the path itself when it is absolute,
    the path relative to `project_root`, an upward search from the current
    working directory, and an upward search from `project_root`.

    Args:
        filename: The filename, relative path, or absolute path to resolve.
        project_root: The project root dir.
        assert_exists: Whether to raise an error if the file cannot be found.

    Returns:
        A `pathlib.Path` object, or None if the file cannot be found and
        `assert_exists` is False.

    Raises:
        FileNotFoundError if the file cannot be found and `assert_exists` is
        True.
    """
    # Not canonicalized, so that a relative filename is still relative here.
    path = ensure_path(filename, canonicalize=False)

    if path.is_absolute():
        if path.exists():
            return path
    else:
        candidate = ensure_path(project_root / path)
        if candidate.exists():
            return candidate

        # Search upward from the cwd (start=None), then from the project root.
        for start in (None, project_root):
            candidate = find_project_path(path, start=start)
            if candidate and candidate.exists():
                return candidate

    if not assert_exists:
        return None
    raise FileNotFoundError(f"Could not resolve file: {filename}")
def find_project_path(
    *filenames: Union[str, Path],
    start: Optional[Path] = None,
    return_parent: bool = False,
    assert_exists: bool = False
) -> Optional[Path]:
    """
    Starting from the `start` folder and moving upwards, search for any of
    `filenames` and return the first hit.

    Args:
        *filenames: Names to search for. A `str` is treated as a glob pattern
            relative to each folder; any other value (e.g. a `Path`) is joined
            to the folder and tested for existence.
        start: Starting folder. Defaults to the current working directory.
        return_parent: Whether to return the containing folder or the
            discovered file.
        assert_exists: Whether to raise an exception if a file cannot be found.

    Returns:
        A `Path`, or `None` if no folder is found that contains any of
        `filenames`. Within a folder, `filenames` are tried in the order given;
        for a glob pattern with several matches, the first match produced by
        `Path.glob` is used.

    Raises:
        FileNotFoundError if the file cannot be found and `assert_exists` is
        True.
    """
    origin = start or Path.cwd()

    # Visit `origin` itself and then every ancestor. `Path.parents` ends with
    # the top-most directory (the filesystem root, or "." for a relative
    # start), so that directory is searched too.
    for folder in (origin, *origin.parents):
        for filename in filenames:
            if isinstance(filename, str):
                # Only the first match is needed, so do not build the full list.
                found = next(iter(folder.glob(filename)), None)
            else:
                candidate = folder / filename
                found = candidate if candidate.exists() else None

            if found:
                LOG.debug("Found %s in %s", filename, folder)
                return folder if return_parent else found

    if assert_exists:
        raise FileNotFoundError(
            f"Could not find any of {','.join(str(f) for f in filenames)} "
            f"starting from {start}"
        )

    return None
def find_executable_path(
    executable: str, search_path: Optional[Sequence[Path]] = None
) -> Optional[Path]:
    """
    Finds `executable` in `search_path`.

    Args:
        executable: The name of the executable to find.
        search_path: The list of directories to search. If None, the system
            search path (defined by the $PATH environment variable) is used.

    Returns:
        The first search directory joined with `executable` for which the
        result is an executable regular file, or None if no matching
        executable was found (or `search_path` is None and $PATH is unset).
        The result is absolute only if the search directory is.
    """
    if search_path is None:
        env_value = os.environ.get(ENV_PATH)
        if env_value is None:
            return None
        search_path = [Path(p) for p in env_value.split(os.pathsep)]

    for folder in search_path:
        exe_path = folder / executable
        # Directories normally carry the execute bit as well, so insist on a
        # regular file; otherwise a directory that happens to share the
        # executable's name would be returned.
        if exe_path.is_file() and is_executable(exe_path):
            return exe_path

    return None
def is_executable(path: Path) -> bool:
    """
    Checks if a path is executable.

    The check looks only at the owner-execute permission bit (`stat.S_IXUSR`)
    of the path's mode.

    Args:
        path: The path to check

    Returns:
        True if `path` exists and has the owner-execute bit set, otherwise
        False.
    """
    if not path.exists():
        return False
    # The bitwise AND yields an int (0 or the bit value); convert it so the
    # function really returns the bool its signature promises.
    return bool(os.stat(path).st_mode & stat.S_IXUSR)
def find_in_classpath(glob: str) -> Optional[Path]:
    """
    Attempts to find a .jar file matching the specified glob pattern in the
    Java classpath, which is read from the environment variable named by
    `ENV_CLASSPATH` (falling back to `DEFAULT_CLASSPATH`).

    Each existing classpath entry is examined in order: a directory is searched
    with `glob`, while any other entry is matched against `glob` by its file
    name.

    Args:
        glob: JAR filename pattern

    Returns:
        Path to the JAR file, or None if a matching file is not found.
    """
    entries = os.environ.get(ENV_CLASSPATH, DEFAULT_CLASSPATH).split(os.pathsep)

    for entry in entries:
        path = ensure_path(entry)
        if not path.exists():
            continue

        if not path.is_dir():
            # The classpath entry is itself a file: compare its name.
            if fnmatch.fnmatch(path.name, glob):
                return path
            continue

        matches = list(path.glob(glob))
        if not matches:
            continue
        if len(matches) > 1:
            LOG.warning(
                "Found multiple jar files matching pattern %s: %s;"
                "returning the first one.", glob, matches
            )
        return matches[0]

    return None
def env_map(d: dict) -> dict:
    """
    Given a mapping of keys to value descriptors, creates a mapping of the keys
    to the described values.

    Each descriptor is resolved with `resolve_value_descriptor`; keys whose
    resolved value is falsy (e.g. None or an empty string) are left out.

    Args:
        d: Mapping of names to value descriptors.

    Returns:
        Mapping of names to resolved values.
    """
    resolved = (
        (name, resolve_value_descriptor(value_descriptor))
        for name, value_descriptor in d.items()
    )
    return {name: value for name, value in resolved if value}
def resolve_value_descriptor(value_descriptor: Union[str, dict]) -> Optional:
    """
    Resolves the value of a value descriptor, which may be an environment
    variable name, or a map with keys `env` (the environment variable name) and
    `value` (the value to use if `env` is not specified or if the environment
    variable is unset).

    Args:
        value_descriptor: Either the name of an environment variable, or a
            dict with optional keys `env` and `value`.

    Returns:
        The value of the environment variable if it is set; otherwise the
        descriptor's `value` entry when the descriptor is a dict; otherwise
        None.
    """
    if isinstance(value_descriptor, str):
        return os.environ.get(value_descriptor)

    if "env" not in value_descriptor:
        return value_descriptor.get("value")

    fallback = value_descriptor.get("value")
    return os.environ.get(value_descriptor["env"], fallback)
class DigestsNotEqualError(AssertionError):
    """Raised when a file's digest differs from the digest it is compared to."""


def compare_files_with_hash(file1: Path, file2: Path, hash_name: str = "md5"):
    """
    Asserts that two files have the same content by comparing their digests.

    Args:
        file1: The first file.
        file2: The second file.
        hash_name: Name of the hash algorithm to use (see `hash_file`).

    Raises:
        DigestsNotEqualError: If the digests of the two files differ.
    """
    file1_digest = hash_file(file1, hash_name)
    file2_digest = hash_file(file2, hash_name)
    if file1_digest != file2_digest:
        raise DigestsNotEqualError(
            f"{hash_name} digests differ between expected identical files "
            f"{file1}, {file2}"
        )


def hash_file(path: Path, hash_name: str = "md5") -> str:
    """
    Computes the hex digest of a file's content.

    Args:
        path: The file to hash.
        hash_name: Name of the hash algorithm; must be one of
            `hashlib.algorithms_guaranteed`.

    Returns:
        The hexadecimal digest string.

    Raises:
        AssertionError: If `hash_name` is not a guaranteed algorithm.
    """
    if hash_name not in hashlib.algorithms_guaranteed:
        # Raised explicitly instead of via `assert` so the check is not
        # stripped under `python -O`. The exception type is kept because
        # callers (e.g. `verify_digests`) catch AssertionError.
        raise AssertionError(f"Unsupported hash algorithm: {hash_name}")

    hashobj = hashlib.new(hash_name)
    chunk_size = 1024 * 1024
    with open(path, "rb") as inp:
        # Feed the file in chunks so large files are not held in memory whole.
        for chunk in iter(lambda: inp.read(chunk_size), b""):
            hashobj.update(chunk)
    return hashobj.hexdigest()


def verify_digests(path: Path, digests: dict):
    """
    Checks a file against a set of expected digests.

    Algorithms that `hash_file` rejects are skipped with a warning rather than
    treated as a failure.

    Args:
        path: The file to verify.
        digests: Mapping of hash algorithm name to expected hex digest.

    Raises:
        DigestsNotEqualError: If any computed digest differs from the expected
            one.
    """
    for hash_name, expected_digest in digests.items():
        try:
            actual_digest = hash_file(path, hash_name)
        except AssertionError:
            # TODO: test this
            LOG.warning(
                "Hash algorithm %s is not supported; cannot verify file %s",
                hash_name, path
            )
            continue
        if actual_digest != expected_digest:
            raise DigestsNotEqualError(
                f"{hash_name} digest {actual_digest} of file "
                f"{path} does not match expected value {expected_digest}"
            )