|
| 1 | +import os |
| 2 | +import typing as ty |
| 3 | +from pathlib import Path |
| 4 | +import re |
| 5 | +from contextlib import contextmanager |
| 6 | +import subprocess as sp |
| 7 | +from .utils import logger |
| 8 | + |
| 9 | + |
| 10 | +class FileSystemMountIndentifier: |
| 11 | + """Used to check the mount type that given file paths reside on in order to determine |
| 12 | + features that can be used (e.g. symlinks)""" |
| 13 | + |
| 14 | + @classmethod |
| 15 | + def on_cifs(cls, path: os.PathLike) -> bool: |
| 16 | + """ |
| 17 | + Check whether a file path is on a CIFS filesystem mounted in a POSIX host. |
| 18 | +
|
| 19 | + POSIX hosts are assumed to have the ``mount`` command. |
| 20 | +
|
| 21 | + On Windows, Docker mounts host directories into containers through CIFS |
| 22 | + shares, which has support for Minshall+French symlinks, or text files that |
| 23 | + the CIFS driver exposes to the OS as symlinks. |
| 24 | + We have found that under concurrent access to the filesystem, this feature |
| 25 | + can result in failures to create or read recently-created symlinks, |
| 26 | + leading to inconsistent behavior and ``FileNotFoundError`` errors. |
| 27 | +
|
| 28 | + This check is written to support disabling symlinks on CIFS shares. |
| 29 | +
|
| 30 | + NB: This function and sub-functions are copied from the nipype.utils.filemanip module |
| 31 | +
|
| 32 | +
|
| 33 | + NB: Adapted from https://github.com/nipy/nipype |
| 34 | + """ |
| 35 | + return cls.get_mount(path)[1] == "cifs" |
| 36 | + |
| 37 | + @classmethod |
| 38 | + def on_same_mount(cls, path1: os.PathLike, path2: os.PathLike) -> bool: |
| 39 | + """Checks whether two or paths are on the same logical file system""" |
| 40 | + return cls.get_mount(path1)[0] == cls.get_mount(path2)[0] |
| 41 | + |
| 42 | + @classmethod |
| 43 | + def get_mount(cls, path: os.PathLike) -> ty.Tuple[Path, str]: |
| 44 | + """Get the mount point for a given file-system path |
| 45 | +
|
| 46 | + Parameters |
| 47 | + ---------- |
| 48 | + path: os.PathLike |
| 49 | + the file-system path to identify the mount of |
| 50 | +
|
| 51 | + Returns |
| 52 | + ------- |
| 53 | + mount_point: os.PathLike |
| 54 | + the root of the mount the path sits on |
| 55 | + fstype : str |
| 56 | + the type of the file-system (e.g. ext4 or cifs)""" |
| 57 | + try: |
| 58 | + # Only the first match (most recent parent) counts, mount table sorted longest |
| 59 | + # to shortest |
| 60 | + return next( |
| 61 | + (Path(p), t) |
| 62 | + for p, t in cls.get_mount_table() |
| 63 | + if str(path).startswith(p) |
| 64 | + ) |
| 65 | + except StopIteration: |
| 66 | + return (Path("/"), "ext4") |
| 67 | + |
| 68 | + @classmethod |
| 69 | + def generate_cifs_table(cls) -> ty.List[ty.Tuple[str, str]]: |
| 70 | + """ |
| 71 | + Construct a reverse-length-ordered list of mount points that fall under a CIFS mount. |
| 72 | +
|
| 73 | + This precomputation allows efficient checking for whether a given path |
| 74 | + would be on a CIFS filesystem. |
| 75 | + On systems without a ``mount`` command, or with no CIFS mounts, returns an |
| 76 | + empty list. |
| 77 | +
|
| 78 | + """ |
| 79 | + exit_code, output = sp.getstatusoutput("mount") |
| 80 | + return cls.parse_mount_table(exit_code, output) |
| 81 | + |
| 82 | + @classmethod |
| 83 | + def parse_mount_table( |
| 84 | + cls, exit_code: int, output: str |
| 85 | + ) -> ty.List[ty.Tuple[str, str]]: |
| 86 | + """ |
| 87 | + Parse the output of ``mount`` to produce (path, fs_type) pairs. |
| 88 | +
|
| 89 | + Separated from _generate_cifs_table to enable testing logic with real |
| 90 | + outputs |
| 91 | +
|
| 92 | + """ |
| 93 | + # Not POSIX |
| 94 | + if exit_code != 0: |
| 95 | + return [] |
| 96 | + |
| 97 | + # Linux mount example: sysfs on /sys type sysfs (rw,nosuid,nodev,noexec) |
| 98 | + # <PATH>^^^^ ^^^^^<FSTYPE> |
| 99 | + # OSX mount example: /dev/disk2 on / (hfs, local, journaled) |
| 100 | + # <PATH>^ ^^^<FSTYPE> |
| 101 | + pattern = re.compile(r".*? on (/.*?) (?:type |\()([^\s,\)]+)") |
| 102 | + |
| 103 | + # Keep line and match for error reporting (match == None on failure) |
| 104 | + # Ignore empty lines |
| 105 | + matches = [(ll, pattern.match(ll)) for ll in output.strip().splitlines() if ll] |
| 106 | + |
| 107 | + # (path, fstype) tuples, sorted by path length (longest first) |
| 108 | + mount_info = sorted( |
| 109 | + (match.groups() for _, match in matches if match is not None), |
| 110 | + key=lambda x: len(x[0]), |
| 111 | + reverse=True, |
| 112 | + ) |
| 113 | + cifs_paths = [path for path, fstype in mount_info if fstype.lower() == "cifs"] |
| 114 | + |
| 115 | + # Report failures as warnings |
| 116 | + for line, match in matches: |
| 117 | + if match is None: |
| 118 | + logger.debug("Cannot parse mount line: '%s'", line) |
| 119 | + |
| 120 | + return [ |
| 121 | + mount |
| 122 | + for mount in mount_info |
| 123 | + if any(mount[0].startswith(path) for path in cifs_paths) |
| 124 | + ] |
| 125 | + |
| 126 | + @classmethod |
| 127 | + def get_mount_table(cls) -> ty.List[ty.Tuple[str, str]]: |
| 128 | + if cls._mount_table is None: |
| 129 | + cls._mount_table = cls.generate_cifs_table() |
| 130 | + return cls._mount_table |
| 131 | + |
| 132 | + @classmethod |
| 133 | + @contextmanager |
| 134 | + def patch_table(cls, mount_table: ty.List[ty.Tuple[str, str]]): |
| 135 | + """Patch the mount table with new values. Used in test routines""" |
| 136 | + orig_table = cls._mount_table |
| 137 | + cls._mount_table = list(mount_table) |
| 138 | + try: |
| 139 | + yield |
| 140 | + finally: |
| 141 | + cls._mount_table = orig_table |
| 142 | + |
| 143 | + _mount_table: ty.Optional[ty.List[ty.Tuple[str, str]]] = None |
0 commit comments