Source code for pytest_wdl.utils

#! /usr/bin/env python
#
#    Copyright 2019 Eli Lilly and Company
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
# TODO: some of the code here can be replaced by functions in xphyle.{paths,utils}
from collections import defaultdict
import contextlib
import fnmatch
import logging
import os
from pathlib import Path
import re
import shutil
import stat
import tempfile
from typing import (
    Dict, Generic, Iterable, Optional, Sequence, Type, TypeVar, Union, cast
)

from pkg_resources import EntryPoint, iter_entry_points
from py._path.local import LocalPath


LOG = logging.getLogger("pytest-wdl")
LOG.setLevel(os.environ.get("LOGLEVEL", "WARNING").upper())

ENV_PATH = "PATH"
ENV_CLASSPATH = "CLASSPATH"
DEFAULT_CLASSPATH = "."

UNSAFE_RE = re.compile(r"[^\w.-]")

T = TypeVar("T")


[docs]class PluginFactory(Generic[T]): """ Lazily loads a plugin class associated with a data type. """ def __init__(self, entry_point: EntryPoint, return_type: Type[T]): self.entry_point = entry_point self.return_type = return_type self.factory = None def __call__(self, *args, **kwargs) -> T: if self.factory is None: module = __import__( self.entry_point.module_name, fromlist=['__name__'], level=0 ) self.factory = getattr(module, self.entry_point.attrs[0]) plugin = self.factory(*args, **kwargs) if not isinstance(plugin, self.return_type): raise RuntimeError( f"Expected plugin {plugin} to be an instance of {self.return_type}" ) return cast(self.return_type, plugin)
[docs]def plugin_factory_map( return_type: Type[T], group: Optional[str] = None, entry_points: Optional[Iterable[EntryPoint]] = None ) -> Dict[str, PluginFactory[T]]: """ Creates a mapping of entry point name to `PluginFactory` for all discovered entry points in the specified group. Args: group: Entry point group name return_type: Expected return type entry_points: Returns: Dict mapping entry point name to `PluginFactory` instances """ if entry_points is None: entry_points = iter_entry_points(group=group) entry_point_map = defaultdict(list) for entry_point in entry_points: entry_point_map[entry_point.name].append(entry_point) factory_map = {} for name, points in entry_point_map.items(): if len(points) > 1: # Filter out built-ins points = list(filter( lambda point: not point.module_name.startswith("pytest_wdl"), points )) if len(points) > 1: raise RuntimeError( f"Multiple third-party plugins found in group {group} with the " f"same name: {name}" ) factory_map[name] = PluginFactory(points[0], return_type) return factory_map
[docs]def safe_string(s: str, replacement: str = "_") -> str: """ Makes a string safe by replacing non-word characters. Args: s: The string to make safe replacement: The replacement stringj Returns: The safe string """ return UNSAFE_RE.sub(replacement, s)
# def deprecated(f: Callable): # """ # Decorator for deprecated functions/methods. Deprecated functionality will be # removed before each major release. # """ # def decorator(*args, **kwargs): # LOG.warning(f"Function/method {f.__name__} is deprecated and will be removed") # f(*args, **kwargs) # return decorator
[docs]@contextlib.contextmanager def chdir(todir: Path): """ Context manager that temporarily changes directories. Args: todir: The directory to change to. """ curdir = Path.cwd() try: os.chdir(todir) yield todir finally: os.chdir(curdir)
[docs]@contextlib.contextmanager def tempdir(change_dir: bool = False) -> Path: """ Context manager that creates a temporary directory, yields it, and then deletes it after return from the yield. Args: change_dir: Whether to temporarily change to the temp dir. """ temp = ensure_path(tempfile.mkdtemp()) try: if change_dir: with chdir(temp): yield temp else: yield temp finally: shutil.rmtree(temp)
[docs]@contextlib.contextmanager def context_dir( path: Optional[Path] = None, change_dir: bool = False, cleanup: Optional[bool] = None ) -> Path: """ Context manager that looks for a specific environment variable to specify a directory. If the environment variable is not set, a temporary directory is created and cleaned up upon return from the yield. Args: path: The environment variable to look for. change_dir: Whether to change to the directory. cleanup: Whether to delete the directory when exiting the context. If None, the directory is only deleted if a temporary directory is created. Yields: A directory path. """ if cleanup is None: cleanup = path is None if not path: path = Path(tempfile.mkdtemp()) elif not path.exists(): path.mkdir(parents=True) try: if change_dir: with chdir(path): yield path else: yield path finally: if cleanup and path.exists(): shutil.rmtree(path)
[docs]def ensure_path( path: Union[str, LocalPath, Path], search_paths: Optional[Sequence[Path]] = None, canonicalize: bool = True, exists: Optional[bool] = None, is_file: Optional[bool] = None, executable: Optional[bool] = None, create: bool = False ) -> Path: """ Converts a string path or :class:`py.path.local.LocalPath` to a :class:`pathlib.Path`. Args: path: The path to convert. search_paths: Directories to search for `path` if it is not already absolute. If `exists` is True, looks for the first search path that contains the file, otherwise just uses the first search path. canonicalize: Whether to return the canonicalized version of the path - expand home directory shortcut (~), make absolute, and resolve symlinks. exists: If True, raise an exception if the path does not exist; if False, raise an exception if the path does exist. is_file: If True, raise an exception if the path is not a file; if False, raise an exception if the path is not a directory. executable: If True and `is_file` is True and the file exists, raise an exception if it is not executable. create: Create the directory (or parent, if `path` is an existing file) if it does not exist. Ignored if `exists` is True. Returns: A `pathlib.Path` object. """ if isinstance(path, Path): p = cast(Path, path) else: p = Path(str(path)) p = Path(os.path.expandvars(p)) if canonicalize: p = p.expanduser() if search_paths and not p.is_absolute(): if exists: for search_path in search_paths: p_tmp = search_path / p if p_tmp.exists(): p = p_tmp.absolute() break else: p = (search_paths[0] / p).absolute() p = p.resolve() if p.exists(): if exists is False: raise FileExistsError(f"Path {p} already exists") if is_file is True: if p.is_dir(): raise IsADirectoryError(f"Path {p} is not a file") elif executable and not is_executable(p): raise OSError(f"File {p} is not executable") elif is_file is False and not p.is_dir(): raise NotADirectoryError(f"Path {p} is not a directory") elif exists is True: raise FileNotFoundError(f"Path {p} does not exist") elif create: if is_file: p.parent.mkdir(parents=True, exist_ok=True) else: p.mkdir(parents=True, exist_ok=True) return p
[docs]def resolve_file( filename: Union[str, Path], project_root: Path, assert_exists: bool = True ) -> Optional[Path]: """ Finds `filename` under `project_root` or in the project path. Args: filename: The filename, relative path, or absolute path to resolve. project_root: The project root dir. assert_exists: Whether to raise an error if the file cannot be found. Returns: A `pathlib.Path` object, or None if the file cannot be found and `assert_exists` is False. Raises: FileNotFoundError if the file cannot be found and `assert_exists` is True. """ path = ensure_path(filename, canonicalize=False) is_abs = path.is_absolute() if is_abs and path.exists(): return path if not is_abs: check_path = ensure_path(project_root / path) if check_path.exists(): return check_path # Search in cwd check_path = find_project_path(path) if check_path and check_path.exists(): return check_path # Search upward from project root check_path = find_project_path(path, start=project_root) if check_path and check_path.exists(): return check_path if assert_exists: raise FileNotFoundError(f"Could not resolve file: {filename}") else: return None
[docs]def find_project_path( *filenames: Union[str, Path], start: Optional[Path] = None, return_parent: bool = False, assert_exists: bool = False ) -> Optional[Path]: """ Starting from `path` folder and moving upwards, search for any of `filenames` and return the first path containing any one of them. Args: *filenames: Filenames to search. Either a string filename, or a sequence of string path elements. start: Starting folder return_parent: Whether to return the containing folder or the discovered file. assert_exists: Whether to raise an exception if a file cannot be found. Returns: A `Path`, or `None` if no folder is found that contains any of `filenames`. If `return_parent` is `False` and more than one of the files is found one of the files is randomly selected for return. Raises: FileNotFoundError if the file cannot be found and `assert_exists` is True. """ path = start or Path.cwd() while path != path.parent: for filename in filenames: if isinstance(filename, str): found = list(path.glob(filename)) found = found[0] if found else None else: found = path / filename if not found.exists(): found = None if found: LOG.debug("Found %s in %s", filename, path) if return_parent: return path else: return found else: path = path.parent if assert_exists: raise FileNotFoundError( f"Could not find any of {','.join(str(f) for f in filenames)} " f"starting from {start}" ) return None
[docs]def find_executable_path( executable: str, search_path: Optional[Sequence[Path]] = None ) -> Optional[Path]: """Finds 'executable' in `search_path`. Args: executable: The name of the executable to find. search_path: The list of directories to search. If None, the system search path (defined by the $PATH environment variable) is used. Returns: Absolute path of the executable, or None if no matching executable was found. """ if search_path is None: if ENV_PATH in os.environ: search_path = [Path(p) for p in os.environ[ENV_PATH].split(os.pathsep)] else: return None for path in search_path: exe_path = path / executable if exe_path.exists() and is_executable(exe_path): return exe_path else: return None
[docs]def is_executable(path: Path) -> bool: """ Checks if a path is executable. Args: path: The path to check Returns: True if `path` exists and is executable by the user, otherwise False. """ return path.exists() and os.stat(path).st_mode & stat.S_IXUSR
[docs]def find_in_classpath(glob: str) -> Optional[Path]: """ Attempts to find a .jar file matching the specified glob pattern in the Java classpath. Args: glob: JAR filename pattern Returns: Path to the JAR file, or None if a matching file is not found. """ classpath = os.environ.get(ENV_CLASSPATH, DEFAULT_CLASSPATH) for path_str in classpath.split(os.pathsep): path = ensure_path(path_str) if path.exists(): if path.is_dir(): matches = list(path.glob(glob)) if matches: if len(matches) > 1: LOG.warning( "Found multiple jar files matching pattern %s: %s;" "returning the first one.", glob, matches ) return matches[0] elif path.exists() and fnmatch.fnmatch(path.name, glob): return path
[docs]def env_map(d: dict) -> dict: """ Given a mapping of keys to value descriptors, creates a mapping of the keys to the described values. """ envmap = {} for name, value_descriptor in d.items(): value = resolve_value_descriptor(value_descriptor) if value: envmap[name] = value return envmap
[docs]def resolve_value_descriptor(value_descriptor: Union[str, dict]) -> Optional: """ Resolves the value of a value descriptor, which may be an environment variable name, or a map with keys `env` (the environment variable name) and `value` (the value to use if `env` is not specified or if the environment variable is unset. Args: value_descriptor: Returns: """ if isinstance(value_descriptor, str): return os.environ.get(value_descriptor) elif "env" in value_descriptor: return os.environ.get( value_descriptor["env"], value_descriptor.get("value") ) else: return value_descriptor.get("value")