|
1 | | -import os |
2 | | -import typing as ty |
3 | | -import itertools |
4 | | -from pathlib import Path |
5 | | -from fileformats.core.fileset import FileSet |
6 | | -from fileformats.core.exceptions import ( |
7 | | - FormatMismatchError, |
8 | | - UnconstrainedExtensionException, |
9 | | -) |
10 | | -from fileformats.core import hook, SampleFileGenerator |
11 | | -from fileformats.core.utils import classproperty |
12 | | -from fileformats.core.mixin import WithClassifiers |
| 1 | +from .fsobject import FsObject # noqa: F401 |
| 2 | +from .file import File # noqa: F401 |
| 3 | +from .directory import Directory, DirectoryContaining # noqa: F401 |
| 4 | +from .set import TypedSet, SetOf # noqa: F401 |
| 5 | +from . import generate_sample_data # noqa: F401 |
13 | 6 |
|
14 | 7 |
|
class FsObject(FileSet, os.PathLike):
    """Generic file-system object, can be either a file or a directory.

    Implements ``os.PathLike`` (via ``__fspath__``) so that an instance can be handed
    to any function that accepts a path, e.g. the built-in ``open``.
    """

    @hook.required
    @property
    def fspath(self):
        """The single path held in ``fspaths``.

        Raises
        ------
        FormatMismatchError
            if more than one path was provided, making the primary path ambiguous
        """
        if len(self.fspaths) > 1:
            # Interpolate the type rather than `self`: formatting `self` goes through
            # __str__, which reads this very property, so the message could never be
            # built and a RecursionError would mask the intended FormatMismatchError
            raise FormatMismatchError(
                f"More than one fspath ({self.fspaths}) provided to {type(self)}, "
                f"primary path is ambiguous"
            )
        return next(iter(self.fspaths))

    def __str__(self):
        """String form of the primary path"""
        return str(self.fspath)

    def __fspath__(self):
        """Render to string, so can be treated as any other file-system path, i.e. passed
        to functions like file 'open'"""
        return str(self)

    @property
    def stem(self):
        """Name of the primary path with its last suffix removed"""
        return self.fspath.with_suffix("").name

    @classproperty
    def unconstrained(cls) -> bool:
        """Whether the file-format is unconstrained by extension, magic number or another
        constraint"""
        # `fspath` is itself registered as a required property, so one is subtracted:
        # a format only counts as constrained if it requires something beyond a path
        return not (len(list(cls.required_properties())) - 1)
47 | | - |
48 | | - |
class File(FsObject):
    """Generic file type"""

    binary = True  # selects "rb" (true) or "r" (false) mode in read_contents()
    is_dir = False

    @hook.required
    @property
    def fspath(self):
        """The path selected by extension, which must not be a directory.

        Raises
        ------
        FormatMismatchError
            if the path matching the extension is a directory
        """
        fspath = self.select_by_ext()
        if fspath.is_dir():
            # fspath is guaranteed to exist
            raise FormatMismatchError(
                f'Path that matches extension of {type(self)}, "{fspath}", '
                f"is a directory not a file"
            )
        return fspath

    @classproperty
    def unconstrained(cls) -> bool:
        """Whether the file-format is unconstrained by extension, magic number or another
        constraint"""
        return super().unconstrained and (cls.ext is None or None in cls.alternate_exts)

    @classmethod
    def copy_ext(
        cls,
        old_path: Path,
        new_path: Path,
        decomposition_mode=FileSet.ExtensionDecomposition.none,
    ):
        """Copy extension from the old path to the new path, ensuring that all
        of the extension is used (e.g. 'my.gz' instead of 'gz')

        Parameters
        ----------
        old_path: Path or str
            The path from which to copy the extension from
        new_path: Path or str
            The path to append the extension to
        decomposition_mode : FileSet.ExtensionDecomposition, optional
            if the file doesn't have an explicit extension, how to interpret "." within
            the filename

        Returns
        -------
        Path
            The new path with the copied extension

        Raises
        ------
        FormatMismatchError
            if the extension of ``old_path`` does not match ``cls.ext``
        """
        if not cls.matching_exts([old_path], [cls.ext]):
            raise FormatMismatchError(
                f"Extension of old path ('{str(old_path)}') does not match that "
                f"of file, '{cls.ext}'"
            )
        # Prefer the class' declared extension; otherwise fall back to whatever
        # extension the decomposition of the old path yields
        suffix = (
            cls.ext
            if cls.ext
            else cls.decompose_fspath(old_path, mode=decomposition_mode)[-1]
        )
        return Path(new_path).with_suffix(suffix)

    @property
    def contents(self):
        """The whole contents of the file, as returned by ``read_contents()``"""
        return self.read_contents()

    def read_contents(self, size=None, offset=0):
        """Read (part of) the file.

        Parameters
        ----------
        size : int, optional
            amount to read, passed straight to ``read``; ``None`` reads to the end
        offset : int, optional
            amount to skip from the start of the file before reading

        Returns
        -------
        bytes or str
            bytes when ``binary`` is true, str otherwise
        """
        with open(self.fspath, "rb" if self.binary else "r") as f:
            if offset:
                # Skip by reading so the offset is counted in the same units as
                # `size` (bytes in binary mode, characters in text mode)
                f.read(offset)
            contents = f.read(size)
        return contents

    @property
    def actual_ext(self):
        """The actual file extension (out of the primary and alternate extensions possible)

        Raises
        ------
        UnconstrainedExtensionException
            if the file name ends with none of the non-``None`` possible extensions
        """
        constrained_exts = [
            e for e in self.possible_exts if e is not None
        ]  # strip out unconstrained
        matching = [e for e in constrained_exts if self.fspath.name.endswith(e)]
        if not matching:
            raise UnconstrainedExtensionException(
                f"Cannot determine actual extension of {self.fspath}, as it doesn't "
                f"match any of the defined extensions {constrained_exts} "
                "(i.e. matches the None extension)"
            )
        # Return the longest matching extension, useful for optional extensions
        return max(matching, key=len)

    @property
    def stem(self):
        """File name with the actual extension stripped, always as a ``str``.

        Propagates ``UnconstrainedExtensionException`` from ``actual_ext``.
        """
        ext = self.actual_ext  # resolve once rather than once per use
        name = self.fspath.name
        # An empty extension must return the bare name (not the full Path object),
        # and must not slice with `[:-0]`, which would yield an empty string
        return name[: -len(ext)] if ext else name
144 | | - |
145 | | - |
class Directory(FsObject):
    """Base directory to be overridden by subtypes that represent directories but don't
    want to inherit content type "qualifiers" (i.e. most of them)"""

    is_dir = True

    # Formats of which at least one match must be present inside the directory
    content_types = ()

    @hook.required
    @property
    def fspath(self):
        """The single directory path, checked to hold a match for every content type.

        Raises
        ------
        FormatMismatchError
            if zero or several non-file paths were provided, or if some content type
            has no matching entry directly inside the directory
        """
        # Anything that is not a regular file is treated as a directory candidate
        dirs = [candidate for candidate in self.fspaths if not candidate.is_file()]
        if not dirs:
            raise FormatMismatchError(f"No directory paths provided {repr(self)}")
        if len(dirs) > 1:
            raise FormatMismatchError(
                f"More than one directory path provided {dirs} to {repr(self)}"
            )
        (fspath,) = dirs
        missing = []
        for required_type in self.content_types:
            for entry in fspath.iterdir():
                try:
                    required_type([entry])
                except FormatMismatchError:
                    continue
                break  # first entry that instantiates cleanly satisfies this type
            else:
                # loop ran out of entries without a successful match
                missing.append(required_type)
        if missing:
            raise FormatMismatchError(
                f"Did not find matches for {missing} content types in {repr(self)}"
            )
        return fspath

    @property
    def contents(self):
        """Lazily yield each directory entry wrapped in every content type it matches,
        grouped by content type"""
        for wrapper_type in self.content_types:
            for entry in self.fspath.iterdir():
                try:
                    yield wrapper_type([entry])
                except FormatMismatchError:
                    continue

    @classproperty
    def unconstrained(cls) -> bool:
        """Whether the file-format is unconstrained by extension, magic number or another
        constraint"""
        return super().unconstrained and not cls.content_types

    @hook.check
    def validate_contents(self):
        """Check that every content type is matched by at least one directory entry.

        Raises
        ------
        FormatMismatchError
            if one or more content types have no matching entry
        """
        if not self.content_types:
            return
        not_found = set(self.content_types)
        for entry in self.fspath.iterdir():
            # iterate over a snapshot, as the set is mutated inside the loop
            for pending_type in tuple(not_found):
                if not pending_type.matches(entry):
                    continue
                not_found.remove(pending_type)
                if not not_found:
                    return
        assert not_found
        raise FormatMismatchError(
            f"Did not find the required content types, {not_found}, within the "
            f"directory {self.fspath} of {self}"
        )

    def hash_files(self, relative_to=None, **kwargs):
        """Hash the files, relative to the directory itself unless told otherwise"""
        base = self.fspath if relative_to is None else relative_to
        return super().hash_files(relative_to=base, **kwargs)
221 | | - |
222 | | - |
class TypedSet(FileSet):
    """List of specific file types (similar to the contents of a directory but not
    enclosed in one)"""

    # Formats of which at least one match must be present amongst the paths
    content_types = ()

    @property
    def contents(self):
        """Lazily yield each path wrapped in every content type it matches, grouped by
        content type"""
        for wrapper_type in self.content_types:
            for member in self.fspaths:
                try:
                    yield wrapper_type([member])
                except FormatMismatchError:
                    continue

    @hook.check
    def validate_contents(self):
        """Check that every content type is matched by at least one of the paths.

        Raises
        ------
        FormatMismatchError
            if one or more content types have no matching path
        """
        if not self.content_types:
            return
        not_found = set(self.content_types)
        for member in self.fspaths:
            # iterate over a snapshot, as the set is mutated inside the loop
            for pending_type in tuple(not_found):
                if not pending_type.matches(member):
                    continue
                not_found.remove(pending_type)
                if not not_found:
                    return
        assert not_found
        raise FormatMismatchError(
            f"Did not find the required content types, {not_found}, within the "
            f"given list {self.fspaths}"
        )
254 | | - |
255 | | - |
class DirectoryContaining(WithClassifiers, Directory):
    """Generic directory classified by the formats of its contents"""

    # Class attributes required by WithClassifiers
    generically_classifies = True
    allowed_classifiers = (FileSet,)
    classifiers_attr_name = "content_types"
263 | | - |
264 | | - |
class SetOf(WithClassifiers, TypedSet):
    # Class attributes required by WithClassifiers; the classifiers are stored in
    # the `content_types` attribute declared on TypedSet
    generically_classifies = True
    allowed_classifiers = (FileSet,)
    classifiers_attr_name = "content_types"
270 | | - |
271 | | - |
# Methods to generate sample files, typically used in testing

# Value passed as `fill` to the generator for every sample file created below
FILE_FILL_LENGTH = 256


@FileSet.generate_sample_data.register
def fsobject_generate_sample_data(
    fsobject: FsObject,
    generator: SampleFileGenerator,
) -> ty.Iterable[Path]:
    """Sample data for a generic file-system object: a single generated ``File``"""
    sample_path = generator.generate(File, fill=FILE_FILL_LENGTH)
    return [sample_path]
282 | | - |
283 | | - |
@FileSet.generate_sample_data.register
def file_generate_sample_data(
    file: File,
    generator: SampleFileGenerator,
) -> ty.Iterable[Path]:
    """Generate a sample file, plus samples for any header and side-car types.

    For binary formats that declare a ``magic_number``, the file starts with
    ``magic_number_offset`` random bytes followed by the magic number (decoded from
    hex when given as a string).

    Raises
    ------
    NotImplementedError
        for binary formats that declare ``magic_pattern`` but no ``magic_number``
    """
    contents = None
    if file.binary and hasattr(file, "magic_number"):
        # random padding up to the offset at which the magic number is expected
        padding = os.urandom(getattr(file, "magic_number_offset", 0))
        signature = file.magic_number
        if isinstance(signature, str):
            signature = bytes.fromhex(signature)
        contents = padding + signature
    elif file.binary and hasattr(file, "magic_pattern"):
        raise NotImplementedError(
            "Sampling of magic version file types is not implemented yet"
        )
    fspaths = [generator.generate(file, contents=contents, fill=FILE_FILL_LENGTH)]
    if hasattr(file, "header_type"):
        fspaths += file.header_type.sample_data(generator)
    if hasattr(file, "side_car_types"):
        for companion_type in file.side_car_types:
            fspaths += companion_type.sample_data(generator)
    return fspaths
309 | | - |
310 | | - |
@FileSet.generate_sample_data.register
def directory_generate_sample_data(
    directory: Directory,
    generator: SampleFileGenerator,
) -> ty.Iterable[Path]:
    """Create a sample directory holding one sample ``File`` and return its path"""
    sample_dir = generator.generate_fspath(Directory)
    sample_dir.mkdir()
    # Add a sample file for good measure
    nested_generator = generator.child(dest_dir=sample_dir)
    File.sample_data(nested_generator)
    return [sample_dir]
322 | | - |
323 | | - |
@FileSet.generate_sample_data.register
def directory_containing_generate_sample_data(
    directory: DirectoryContaining,
    generator: SampleFileGenerator,
) -> ty.Iterable[Path]:
    """Create a sample directory populated with sample data for each content type"""
    sample_dir = generator.generate_fspath(Directory)
    sample_dir.mkdir()
    for content_type in directory.content_types:
        # a fresh child generator per content type, writing into the new directory
        nested_generator = generator.child(dest_dir=sample_dir)
        content_type.sample_data(nested_generator)
    return [sample_dir]
334 | | - |
335 | | - |
@FileSet.generate_sample_data.register
def set_of_sample_data(
    set_of: SetOf,
    generator: SampleFileGenerator,
) -> ty.Iterable[Path]:
    """Generate sample data for every content type and return all paths in one list"""
    # Request every batch up front (one child generator each) before flattening
    batches = [
        content_type.sample_data(generator.child())
        for content_type in set_of.content_types
    ]
    return list(itertools.chain.from_iterable(batches))
| 8 | +__all__ = ["FsObject", "File", "Directory", "DirectoryContaining", "TypedSet", "SetOf"] |
0 commit comments