From dc7f7c5be873441c8cbcf97e6ef42cffba737d5d Mon Sep 17 00:00:00 2001
From: Teque5
Date: Tue, 23 Dec 2025 13:53:55 -0800
Subject: [PATCH] fix read_samples from SigMF archive

* When reading from a SigMF (.sigmf) archive, slicing and reading the entire
  file worked, but reading a specific sample count did not
* add a test for archive read_samples and refactor the related tests
---
 sigmf/__init__.py     |   2 +-
 sigmf/sigmffile.py    |  35 +++---
 tests/test_archive.py | 246 +++++++++++++++++++++++-------------------
 3 files changed, 160 insertions(+), 123 deletions(-)

diff --git a/sigmf/__init__.py b/sigmf/__init__.py
index 63e6b39..b5bdcf3 100644
--- a/sigmf/__init__.py
+++ b/sigmf/__init__.py
@@ -5,7 +5,7 @@
 # SPDX-License-Identifier: LGPL-3.0-or-later
 
 # version of this python module
-__version__ = "1.2.13"
+__version__ = "1.2.14"
 
 # matching version of the SigMF specification
 __specification__ = "1.2.5"
diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py
index 81f6683..66aa0b9 100644
--- a/sigmf/sigmffile.py
+++ b/sigmf/sigmffile.py
@@ -177,6 +177,7 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu
         """
         super().__init__()
         self.data_file = None
+        self.data_buffer = None
         self.sample_count = 0
         self._memmap = None
         self.is_complex_data = False  # numpy.iscomplexobj(self._memmap) is not adequate for fixed-point complex case
@@ -490,23 +491,28 @@ def _count_samples(self):
         use 0.
         For complex data, a 'sample' includes both the real and imaginary part.
         """
-        if self.data_file is None:
+        if self.data_file is None and self.data_buffer is None:
             sample_count = self._get_sample_count_from_annotations()
         else:
             header_bytes = sum([c.get(self.HEADER_BYTES_KEY, 0) for c in self.get_captures()])
-            file_size = self.data_file.stat().st_size if self.data_size_bytes is None else self.data_size_bytes
-            file_data_size = file_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes  # bytes
+            if self.data_file is not None:
+                file_bytes = self.data_file.stat().st_size if self.data_size_bytes is None else self.data_size_bytes
+            elif self.data_buffer is not None:
+                file_bytes = len(self.data_buffer.getbuffer()) if self.data_size_bytes is None else self.data_size_bytes
+            else:
+                file_bytes = 0
+            sample_bytes = file_bytes - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes
             sample_size = self.get_sample_size()  # size of a sample in bytes
             num_channels = self.get_num_channels()
-            sample_count = file_data_size // sample_size // num_channels
-            if file_data_size % (sample_size * num_channels) != 0:
+            sample_count = sample_bytes // sample_size // num_channels
+            if sample_bytes % (sample_size * num_channels) != 0:
                 warnings.warn(
-                    f"File `{self.data_file}` does not contain an integer number of samples across channels. "
+                    "Data source does not contain an integer number of samples across channels. "
                     "It may be invalid data."
                 )
             if self._get_sample_count_from_annotations() > sample_count:
                 warnings.warn(
-                    f"File `{self.data_file}` ends before the final annotation in the corresponding SigMF metadata."
+                    "Data source ends before the final annotation in the corresponding SigMF metadata."
                 )
         self.sample_count = sample_count
         return sample_count
@@ -735,7 +741,9 @@ def _read_datafile(self, first_byte, nitems, autoscale, raw_components):
             fp.seek(first_byte, 0)
             data = np.fromfile(fp, dtype=data_type_in, count=nitems)
         elif self.data_buffer is not None:
-            data = np.frombuffer(self.data_buffer.getbuffer(), dtype=data_type_in, count=nitems)
+            # handle offset for data_buffer like we do for data_file
+            buffer_data = self.data_buffer.getbuffer()[first_byte:]
+            data = np.frombuffer(buffer_data, dtype=data_type_in, count=nitems)
         else:
             data = self._memmap
 
@@ -1065,10 +1073,13 @@ def fromarchive(archive_path, dir=None, skip_checksum=False):
 
 def fromfile(filename, skip_checksum=False):
     """
-    Creates and returns a SigMFFile or SigMFCollection instance with metadata
-    loaded from the specified file. The filename may be that of either a
-    sigmf-meta file, a sigmf-data file, a sigmf-collection file, or a sigmf
-    archive.
+    Creates and returns a SigMFFile or SigMFCollection instance with metadata loaded from the specified file.
+
+    The file can be one of:
+    * A SigMF Metadata file (.sigmf-meta)
+    * A SigMF Dataset file (.sigmf-data)
+    * A SigMF Collection file (.sigmf-collection)
+    * A SigMF Archive file (.sigmf)
 
     Parameters
     ----------
diff --git a/tests/test_archive.py b/tests/test_archive.py
index 1db92e0..c9d6e70 100644
--- a/tests/test_archive.py
+++ b/tests/test_archive.py
@@ -7,127 +7,132 @@
 """Tests for SigMFArchive"""
 
 import codecs
+import copy
 import json
+import shutil
 import tarfile
 import tempfile
+import unittest
 from pathlib import Path
 
 import jsonschema
 import numpy as np
-import pytest
 
-from sigmf import error
+from sigmf import SigMFFile, __specification__, error, fromfile
 from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT
 
 from .testdata import TEST_FLOAT32_DATA, TEST_METADATA
 
 
-def create_test_archive(test_sigmffile, tmpfile):
-    sigmf_archive = test_sigmffile.archive(fileobj=tmpfile)
-    sigmf_tarfile = tarfile.open(sigmf_archive, mode="r", format=tarfile.PAX_FORMAT)
-    return sigmf_tarfile
-
-
-def test_without_data_file_throws_fileerror(test_sigmffile):
-    test_sigmffile.data_file = None
-    with tempfile.NamedTemporaryFile() as temp:
-        with pytest.raises(error.SigMFFileError):
-            test_sigmffile.archive(name=temp.name)
-
-
-def test_invalid_md_throws_validationerror(test_sigmffile):
-    del test_sigmffile._metadata["global"]["core:datatype"]  # required field
-    with tempfile.NamedTemporaryFile() as temp:
-        with pytest.raises(jsonschema.exceptions.ValidationError):
-            test_sigmffile.archive(name=temp.name)
-
-
-def test_name_wrong_extension_throws_fileerror(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        with pytest.raises(error.SigMFFileError):
-            test_sigmffile.archive(name=temp.name + ".zip")
-
-
-def test_fileobj_extension_ignored(test_sigmffile):
-    with tempfile.NamedTemporaryFile(suffix=".tar") as temp:
-        test_sigmffile.archive(fileobj=temp)
-
-
-def test_name_used_in_fileobj(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_archive = test_sigmffile.archive(name="testarchive", fileobj=temp)
-        sigmf_tarfile = tarfile.open(sigmf_archive, mode="r")
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
-        assert basedir.name == "testarchive"
-
-        def filename(tarinfo):
-            return Path(tarinfo.name).stem
-
-        assert filename(file1) == "testarchive"
-        assert filename(file2) == "testarchive"
-
-
-def test_fileobj_not_closed(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        test_sigmffile.archive(fileobj=temp)
-        assert not temp.file.closed
-
-
-def test_unwritable_fileobj_throws_fileerror(test_sigmffile):
-    with tempfile.NamedTemporaryFile(mode="rb") as temp:
-        with pytest.raises(error.SigMFFileError):
-            test_sigmffile.archive(fileobj=temp)
-
-
-def test_unwritable_name_throws_fileerror(test_sigmffile):
-    # Cannot assume /root/ is unwritable (e.g. Docker environment)
-    # so use invalid filename
-    unwritable_file = "/bad_name/"
-    with pytest.raises(error.SigMFFileError):
-        test_sigmffile.archive(name=unwritable_file)
-
-
-def test_tarfile_layout(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
-        assert tarfile.TarInfo.isdir(basedir)
-        assert tarfile.TarInfo.isfile(file1)
-        assert tarfile.TarInfo.isfile(file2)
-
-
-def test_tarfile_names_and_extensions(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
+class TestSigMFArchive(unittest.TestCase):
+    """Tests for SigMF Archive functionality"""
+
+    def setUp(self):
+        """Create temporary directory and test SigMFFile"""
+        self.temp_dir = Path(tempfile.mkdtemp())
+        self.temp_path_data = self.temp_dir / "trash.sigmf-data"
+        self.temp_path_meta = self.temp_dir / "trash.sigmf-meta"
+        self.temp_path_archive = self.temp_dir / "test.sigmf"
+        TEST_FLOAT32_DATA.tofile(self.temp_path_data)
+        self.sigmf_object = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=self.temp_path_data)
+        self.sigmf_object.tofile(self.temp_path_meta)
+        self.sigmf_object.tofile(self.temp_path_archive, toarchive=True)
+        self.sigmf_tarfile = tarfile.open(self.temp_path_archive, mode="r", format=tarfile.PAX_FORMAT)
+
+    def tearDown(self):
+        """Clean up temporary directory"""
+        shutil.rmtree(self.temp_dir)
+
+    def test_archive_creation_requires_data_file(self):
+        """Test that archiving without a data file raises an error"""
+        self.sigmf_object.data_file = None
+        with self.assertRaises(error.SigMFFileError):
+            self.sigmf_object.archive(name=self.temp_path_archive)
+
+    def test_archive_creation_validates_metadata(self):
+        """Test that invalid metadata raises an error"""
+        del self.sigmf_object._metadata["global"]["core:datatype"]  # required field
+        with self.assertRaises(jsonschema.exceptions.ValidationError):
+            self.sigmf_object.archive(name=self.temp_path_archive)
+
+    def test_archive_creation_validates_extension(self):
+        """Test that a wrong extension raises an error"""
+        wrong_name = self.temp_dir / "temp_archive.zip"
+        with self.assertRaises(error.SigMFFileError):
+            self.sigmf_object.archive(name=wrong_name)
+
+    def test_fileobj_ignores_extension(self):
+        """Test that the file object's extension is ignored"""
+        temp_archive_tar = self.temp_dir / "test.sigmf.tar"
+        with open(temp_archive_tar, "wb") as temp:
+            self.sigmf_object.archive(fileobj=temp)
+
+    def test_custom_name_overrides_fileobj_name(self):
+        """Test that an explicit name overrides the file object's name"""
+        with open(self.temp_path_archive, "wb") as temp:
+            sigmf_archive = self.sigmf_object.archive(name="testarchive", fileobj=temp)
+            sigmf_tarfile = tarfile.open(sigmf_archive, mode="r")
+            basedir, file1, file2 = sigmf_tarfile.getmembers()
+            self.assertEqual(basedir.name, "testarchive")
+            self.assertEqual(Path(file1.name).stem, "testarchive")
+            self.assertEqual(Path(file2.name).stem, "testarchive")
+
+    def test_fileobj_remains_open_after_archive(self):
+        """Test that the file object is not closed after archiving"""
+        with open(self.temp_path_archive, "wb") as temp:
+            self.sigmf_object.archive(fileobj=temp)
+            self.assertFalse(temp.closed)
+
+    def test_readonly_fileobj_raises_error(self):
+        """Test that an unwritable file object raises an error"""
+        temp_path = self.temp_dir / "temp_archive.sigmf"
+        temp_path.touch()
+        with open(temp_path, "rb") as temp:
+            with self.assertRaises(error.SigMFFileError):
+                self.sigmf_object.archive(fileobj=temp)
+
+    def test_invalid_path_raises_error(self):
+        """Test that an invalid path raises an error"""
+        # Cannot assume /root/ is unwritable (e.g. Docker environment)
+        # so use invalid filename
+        unwritable_file = "/bad_name/"
+        with self.assertRaises(error.SigMFFileError):
+            self.sigmf_object.archive(name=unwritable_file)
+
+    def test_archive_contains_directory_and_files(self):
+        """Test archive layout structure"""
+        basedir, file1, file2 = self.sigmf_tarfile.getmembers()
+        self.assertTrue(tarfile.TarInfo.isdir(basedir))
+        self.assertTrue(tarfile.TarInfo.isfile(file1))
+        self.assertTrue(tarfile.TarInfo.isfile(file2))
+
+    def test_archive_files_have_correct_names_and_extensions(self):
+        """Test tarfile names and extensions"""
+        basedir, file1, file2 = self.sigmf_tarfile.getmembers()
         archive_name = basedir.name
-        assert archive_name == Path(temp.name).name
+        self.assertEqual(archive_name, self.temp_path_archive.stem)
         file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT}
         file1_name, file1_ext = Path(file1.name).stem, Path(file1.name).suffix
-        assert file1_name == archive_name
-        assert file1_ext in file_extensions
+        self.assertEqual(file1_name, archive_name)
+        self.assertIn(file1_ext, file_extensions)
         file_extensions.remove(file1_ext)
         file2_name, file2_ext = Path(file2.name).stem, Path(file2.name).suffix
-        assert file2_name == archive_name
-        assert file2_ext in file_extensions
-
-
-def test_tarfile_persmissions(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
-        assert basedir.mode == 0o755
-        assert file1.mode == 0o644
-        assert file2.mode == 0o644
-
-
-def test_contents(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
+        self.assertEqual(file2_name, archive_name)
+        self.assertIn(file2_ext, file_extensions)
+
+    def test_archive_files_have_correct_permissions(self):
+        """Test tarfile permissions"""
+        basedir, file1, file2 = self.sigmf_tarfile.getmembers()
+        self.assertEqual(basedir.mode, 0o755)
+        self.assertEqual(file1.mode, 0o644)
+        self.assertEqual(file2.mode, 0o644)
+
+    def test_archive_contents_match_original_data(self):
+        """Test archive contents"""
+        _, file1, file2 = self.sigmf_tarfile.getmembers()
         if file1.name.endswith(SIGMF_METADATA_EXT):
             mdfile = file1
             datfile = file2
@@ -136,18 +141,39 @@ def test_contents(test_sigmffile):
             datfile = file1
 
         bytestream_reader = codecs.getreader("utf-8")  # bytes -> str
-        mdfile_reader = bytestream_reader(sigmf_tarfile.extractfile(mdfile))
-        assert json.load(mdfile_reader) == TEST_METADATA
+        mdfile_reader = bytestream_reader(self.sigmf_tarfile.extractfile(mdfile))
+        self.assertEqual(json.load(mdfile_reader), TEST_METADATA)
 
-        datfile_reader = sigmf_tarfile.extractfile(datfile)
+        datfile_reader = self.sigmf_tarfile.extractfile(datfile)
         # calling `fileno` on `tarfile.ExFileObject` throws error (?), but
        # np.fromfile requires it, so we need this extra step
         data = np.frombuffer(datfile_reader.read(), dtype=np.float32)
-        assert np.array_equal(data, TEST_FLOAT32_DATA)
-
-
-def test_tarfile_type(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        assert sigmf_tarfile.format == tarfile.PAX_FORMAT
+        np.testing.assert_array_equal(data, TEST_FLOAT32_DATA)
+
+    def test_tarfile_format(self):
+        """Tar file format is PAX"""
+        self.assertEqual(self.sigmf_tarfile.format, tarfile.PAX_FORMAT)
+
+    def test_archive_read_samples(self):
+        """Test that read_samples works correctly with archived data"""
+        # load from archive
+        archive_mdfile = fromfile(self.temp_path_archive)
+
+        # verify sample count matches
+        expected_sample_count = len(self.sigmf_object)
+        self.assertEqual(archive_mdfile.sample_count, expected_sample_count)
+
+        # verify read_samples returns same as slice
+        samples_orig = TEST_FLOAT32_DATA[3:13]
+        samples_read = archive_mdfile.read_samples(start_index=3, count=10)
+        samples_sliced = archive_mdfile[3:13]
+        np.testing.assert_array_equal(samples_orig, samples_sliced)
+        np.testing.assert_array_equal(samples_orig, samples_read)
+
+    def test_archive_read_samples_beyond_end(self):
+        """Test that read_samples beyond the end of the data raises an error"""
+        meta = fromfile(self.temp_path_archive)
+        # FIXME: Should this raise a SigMFFileError instead?
+        with self.assertRaises(OSError):
+            meta.read_samples(start_index=meta.sample_count + 10, count=5)
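A minimal usage sketch of the behavior exercised by the new tests above, for context. The archive filename below is hypothetical; fromfile(), read_samples(), and slicing are the same calls used in test_archive_read_samples, and the dataset is assumed to be float32 as in the test fixture:

    import numpy as np

    from sigmf import fromfile

    # Load metadata and samples directly from a SigMF archive (.sigmf).
    recording = fromfile("example.sigmf")  # hypothetical archive path

    # Before this fix, only slicing and whole-file reads worked on archives;
    # an explicit start/count read now returns the same samples as the slice.
    chunk = recording.read_samples(start_index=3, count=10)
    assert np.array_equal(chunk, recording[3:13])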