diff --git a/tests/test_mig_shared_fileio.py b/tests/test_mig_shared_fileio.py index 4e5336edb..9402e4339 100644 --- a/tests/test_mig_shared_fileio.py +++ b/tests/test_mig_shared_fileio.py @@ -3,7 +3,7 @@ # --- BEGIN_HEADER --- # # test_mig_shared_fileio - unit test of the corresponding mig shared module -# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH # # This file is part of MiG. # @@ -30,6 +30,7 @@ import binascii import os import sys +import time import unittest # NOTE: wrap next imports in try except to prevent autopep8 shuffling up @@ -45,19 +46,48 @@ DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®' DUMMY_UNICODE_LENGTH = len(DUMMY_UNICODE) DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk' +DUMMY_DIRECTORY_NESTED = 'fileio/nested/dir/structure' +DUMMY_DIRECTORY_EMPTY = 'fileio/empty_dir' +DUMMY_DIRECTORY_MOVE_SRC = 'fileio/move_dir_src' +DUMMY_DIRECTORY_MOVE_DST = 'fileio/move_dir_dst' +# File/dir paths for move/copy operations +DUMMY_FILE_MOVE_SRC = 'fileio/move_src' +DUMMY_FILE_MOVE_DST = 'fileio/move_dst' +DUMMY_FILE_COPY_SRC = 'fileio/copy_src' +DUMMY_FILE_COPY_DST = 'fileio/copy_dst' +# NOTE: getsize returns 4k for directories +DUMMY_DIRECTORY_SIZE = 4096 DUMMY_FILE_WRITEFILE = 'fileio/write_file' +DUMMY_FILE_WRITEFILELINES = 'fileio/write_file_lines' +DUMMY_FILE_READFILE = 'fileio/read_file' +DUMMY_FILE_READFILELINES = 'fileio/read_file_lines' +DUMMY_FILE_READHEADLINES = 'fileio/read_head_lines' +DUMMY_FILE_READTAILLINES = 'fileio/read_tail_lines' +DUMMY_FILE_DELETEFILE = 'fileio/delete_file' +DUMMY_FILE_GETFILESIZE = 'fileio/get_file_size' +DUMMY_FILE_MAKESYMLINKSRC = 'fileio/make_symlink/link' +DUMMY_FILE_MAKESYMLINKDST = 'fileio/make_symlink/target' +DUMMY_FILE_DELETESYMLINKSRC = 'fileio/delete_symlink/link' +DUMMY_FILE_DELETESYMLINKDST = 'fileio/delete_symlink/target' +# NOTE: getsize returns 4k for directories +DUMMY_DIRECTORY_SIZE = 4096 assert isinstance(DUMMY_BYTES, bytes) class MigSharedFileio__write_chunk(MigTestCase): - # TODO: Add docstrings to this class and its methods + """Test the write_chunk function from mig.shared.fileio module""" + def setUp(self): + """Initialize test environment for write_chunk tests""" super(MigSharedFileio__write_chunk, self).setUp() self.tmp_path = temppath(DUMMY_FILE_WRITECHUNK, self) + # Output dir is created by default here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) cleanpath(os.path.dirname(DUMMY_FILE_WRITECHUNK), self) def test_return_false_on_invalid_data(self): + """Test write_chunk returns False with invalid data input""" self.logger.forgive_errors() # NOTE: we make sure to disable any forced stringification here @@ -66,6 +96,7 @@ def test_return_false_on_invalid_data(self): self.assertFalse(did_succeed) def test_return_false_on_invalid_offset(self): + """Test write_chunk returns False with negative offset value""" self.logger.forgive_errors() did_succeed = fileio.write_chunk(self.tmp_path, DUMMY_BYTES, -42, @@ -73,6 +104,7 @@ def test_return_false_on_invalid_offset(self): self.assertFalse(did_succeed) def test_return_false_on_invalid_dir(self): + """Test write_chunk returns False when path is a directory""" self.logger.forgive_errors() os.makedirs(self.tmp_path) @@ -81,12 +113,14 @@ def test_return_false_on_invalid_dir(self): self.assertFalse(did_succeed) def test_creates_directory(self): + """Test write_chunk creates parent directory when needed""" fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger) path_kind = self.assertPathExists(DUMMY_FILE_WRITECHUNK) self.assertEqual(path_kind, "file") def test_store_bytes(self): + """Test write_chunk stores byte data correctly at offset 0""" fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger) with open(self.tmp_path, 'rb') as file: @@ -95,6 +129,7 @@ def test_store_bytes(self): self.assertEqual(content[:], DUMMY_BYTES) def test_store_bytes_at_offset(self): + """Test write_chunk stores byte data at specified offset""" offset = 3 fileio.write_chunk(self.tmp_path, DUMMY_BYTES, offset, self.logger) @@ -108,6 +143,7 @@ def test_store_bytes_at_offset(self): @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_bytes_in_text_mode(self): + """Test write_chunk stores byte data in text mode""" fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger, mode="r+") @@ -118,6 +154,7 @@ def test_store_bytes_in_text_mode(self): @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode(self): + """Test write_chunk stores unicode data in text mode""" fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger, mode='r+') @@ -128,6 +165,7 @@ def test_store_unicode(self): @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode_in_binary_mode(self): + """Test write_chunk stores unicode data in binary mode""" fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger, mode='r+b') @@ -138,12 +176,17 @@ def test_store_unicode_in_binary_mode(self): class MigSharedFileio__write_file(MigTestCase): + """Test the write_file function from mig.shared.fileio module""" + def setUp(self): + """Initialize test environment for write_file tests""" super(MigSharedFileio__write_file, self).setUp() self.tmp_path = temppath(DUMMY_FILE_WRITEFILE, self) + # Output dir is created by default here cleanpath(os.path.dirname(DUMMY_FILE_WRITEFILE), self) def test_return_false_on_invalid_data(self): + """Test write_file returns False with non-string data input""" self.logger.forgive_errors() # NOTE: we make sure to disable any forced stringification here @@ -152,32 +195,38 @@ def test_return_false_on_invalid_data(self): self.assertFalse(did_succeed) def test_return_false_on_invalid_dir(self): + """Test write_file returns False when path is a directory""" self.logger.forgive_errors() os.makedirs(self.tmp_path) - did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger) + did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, + self.logger) self.assertFalse(did_succeed) def test_return_false_on_missing_dir(self): + """Test write_file returns False on missing parent dir""" self.logger.forgive_errors() - did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger, - make_parent=False) + did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, + self.logger, make_parent=False) self.assertFalse(did_succeed) def test_creates_directory(self): + """Test write_file creates parent directory when needed""" # TODO: temporarily use empty string to avoid any byte/unicode issues - # did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger) + # did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, + # self.logger) did_succeed = fileio.write_file('', self.tmp_path, self.logger) self.assertTrue(did_succeed) path_kind = self.assertPathExists(DUMMY_FILE_WRITEFILE) self.assertEqual(path_kind, "file") - def test_store_bytes(self): + # TODO: replace next test once we have auto adjust mode in write helper + def test_store_bytes_with_manual_adjust_mode(self): + """Test write_file stores byte data in with manual adjust mode call""" mode = 'w' - # TODO: remove next once we have auto adjust mode in write helper mode = fileio._auto_adjust_mode(DUMMY_BYTES, mode) did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger, mode=mode) @@ -190,6 +239,7 @@ def test_store_bytes(self): @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_bytes_in_text_mode(self): + """Test write_file stores byte data when opening in text mode""" did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger, mode="w") self.assertTrue(did_succeed) @@ -201,6 +251,7 @@ def test_store_bytes_in_text_mode(self): @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode(self): + """Test write_file stores unicode string when opening in text mode""" did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path, self.logger, mode='w') self.assertTrue(did_succeed) @@ -212,6 +263,7 @@ def test_store_unicode(self): @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode_in_binary_mode(self): + """Test write_file handles unicode strings when opening in binary mode""" did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path, self.logger, mode='wb') self.assertTrue(did_succeed) @@ -222,5 +274,936 @@ def test_store_unicode_in_binary_mode(self): self.assertEqual(content[:], DUMMY_UNICODE) +class MigSharedFileio__write_file_lines(MigTestCase): + """Test the write_file_lines function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for write_file_lines tests""" + super(MigSharedFileio__write_file_lines, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_WRITEFILELINES, self) + # Output dir is created by default here + cleanpath(os.path.dirname(DUMMY_FILE_WRITEFILELINES), self) + + def test_write_lines(self): + """Test write_file_lines writes lines to a file""" + test_lines = ["line1\n", "line2\n", "line3"] + result = fileio.write_file_lines( + test_lines, self.tmp_path, self.logger) + self.assertTrue(result) + + # Verify with read_file_lines + lines = fileio.read_file_lines(self.tmp_path, self.logger) + self.assertEqual(lines, test_lines) + + def test_invalid_data(self): + """Test write_file_lines raises TypeError for non-list input""" + self.logger.forgive_errors() + with self.assertRaises(TypeError): + fileio.write_file_lines(4242, self.tmp_path, self.logger) + + def test_creates_directory(self): + """Test write_file_lines creates parent directory when needed""" + test_lines = ["test line"] + result = fileio.write_file_lines( + test_lines, self.tmp_path, self.logger) + self.assertTrue(result) + + path_kind = self.assertPathExists('fileio/write_file_lines') + self.assertEqual(path_kind, "file") + + def test_return_false_on_invalid_dir(self): + """Test write_file_lines returns False when path is directory""" + self.logger.forgive_errors() + os.makedirs(self.tmp_path) + result = fileio.write_file_lines(["dummy"], self.tmp_path, self.logger) + self.assertFalse(result) + + def test_return_false_on_missing_dir(self): + """Test write_file_lines fails when parent directory missing""" + self.logger.forgive_errors() + result = fileio.write_file_lines(["dummy"], self.tmp_path, self.logger, + make_parent=False) + self.assertFalse(result) + + +class MigSharedFileio__read_file(MigTestCase): + """Test the read_file function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for read_file tests""" + super(MigSharedFileio__read_file, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_READFILE, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_path), self) + + def test_reads_bytes(self): + """Test read_file returns byte content with binary mode""" + with open(self.tmp_path, 'wb') as fh: + fh.write(DUMMY_BYTES) + content = fileio.read_file(self.tmp_path, self.logger, mode='rb') + self.assertEqual(content, DUMMY_BYTES) + + def test_reads_text(self): + """Test read_file returns text with text mode""" + with open(self.tmp_path, 'w') as fh: + fh.write(DUMMY_UNICODE) + content = fileio.read_file(self.tmp_path, self.logger, mode='r') + self.assertEqual(content, DUMMY_UNICODE) + + def test_allows_missing_file(self): + """Test read_file returns None with allow_missing=True""" + content = fileio.read_file( + 'missing.txt', self.logger, allow_missing=True) + self.assertIsNone(content) + + def test_reports_missing_file(self): + """Test read_file returns None with allow_missing=False""" + self.logger.forgive_errors() + content = fileio.read_file( + 'missing.txt', self.logger, allow_missing=False) + self.assertIsNone(content) + + def test_handles_directory_path(self): + """Test read_file returns None when path is directory""" + self.logger.forgive_errors() + os.makedirs(self.tmp_path) + content = fileio.read_file(self.tmp_path, self.logger) + self.assertIsNone(content) + + +class MigSharedFileio__read_file_lines(MigTestCase): + """Test the read_file_lines function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for read_file_lines tests""" + super(MigSharedFileio__read_file_lines, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_READFILELINES, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_path), self) + + def test_returns_empty_list_for_empty_file(self): + """Test read_file_lines returns empty list for empty file""" + open(self.tmp_path, 'w').close() + lines = fileio.read_file_lines(self.tmp_path, self.logger) + self.assertEqual(lines, []) + + def test_reads_lines_from_file(self): + """Test read_file_lines returns lines from text file""" + with open(self.tmp_path, 'w') as fh: + fh.write("line1\nline2\nline3") + lines = fileio.read_file_lines(self.tmp_path, self.logger) + self.assertEqual(lines, ["line1\n", "line2\n", "line3"]) + + def test_none_for_missing_file(self): + self.logger.forgive_errors() + lines = fileio.read_file_lines('missing.txt', self.logger) + self.assertIsNone(lines) + + +class MigSharedFileio__get_file_size(MigTestCase): + """Test the get_file_size function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for get_file_size tests""" + super(MigSharedFileio__get_file_size, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_GETFILESIZE, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_path), self) + + def test_returns_file_size(self): + """Test get_file_size returns correct file size""" + with open(self.tmp_path, 'wb') as fh: + fh.write(DUMMY_BYTES) + size = fileio.get_file_size(self.tmp_path, self.logger) + self.assertEqual(size, DUMMY_BYTES_LENGTH) + + def test_handles_missing_file(self): + """Test get_file_size returns -1 for missing file""" + self.logger.forgive_errors() + size = fileio.get_file_size('missing.txt', self.logger) + # TODO: fix called function to return on exception and enable next line + # self.assertEqual(size, -1) + self.assertIsNone(size) + + def test_handles_directory(self): + """Test get_file_size returns -1 when path is directory""" + self.logger.forgive_errors() + os.makedirs(self.tmp_path) + size = fileio.get_file_size(self.tmp_path, self.logger) + self.assertEqual(size, DUMMY_DIRECTORY_SIZE) + + +class MigSharedFileio__delete_file(MigTestCase): + """Test the delete_file function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for delete_file tests""" + super(MigSharedFileio__delete_file, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_DELETEFILE, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(DUMMY_FILE_DELETEFILE), self) + + def test_deletes_existing_file(self): + """Test delete_file removes existing file""" + open(self.tmp_path, 'w').close() + result = fileio.delete_file(self.tmp_path, self.logger) + self.assertTrue(result) + self.assertFalse(os.path.exists(self.tmp_path)) + + def test_handles_missing_file_with_allow_missing(self): + """Test delete_file succeeds with allow_missing=True""" + result = fileio.delete_file( + 'missing.txt', self.logger, allow_missing=True) + self.assertTrue(result) + + def test_false_for_missing_file_without_allow_missing(self): + """Test delete_file returns False with allow_missing=False""" + self.logger.forgive_errors() + result = fileio.delete_file('missing.txt', + self.logger, + allow_missing=False) + self.assertFalse(result) + + +class MigSharedFileio__read_head_lines(MigTestCase): + """Test the read_head_lines function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for read_head_lines tests""" + super(MigSharedFileio__read_head_lines, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_READHEADLINES, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_path), self) + + def test_reads_requested_lines(self): + """Test read_head_lines returns requested number of lines""" + with open(self.tmp_path, 'w') as fh: + fh.write("line1\nline2\nline3\nline4") + lines = fileio.read_head_lines(self.tmp_path, 2, self.logger) + self.assertEqual(lines, ["line1\n", "line2\n"]) + + def test_returns_all_lines_when_requested_more(self): + """Test read_head_lines returns all lines when file has fewer""" + with open(self.tmp_path, 'w') as fh: + fh.write("line1\nline2") + lines = fileio.read_head_lines(self.tmp_path, 5, self.logger) + self.assertEqual(lines, ["line1\n", "line2"]) + + def test_returns_empty_list_for_empty_file(self): + """Test read_head_lines returns empty for empty file""" + open(self.tmp_path, 'w').close() + lines = fileio.read_head_lines(self.tmp_path, 3, self.logger) + self.assertEqual(lines, []) + + def test_empty_for_missing_file(self): + """Test read_head_lines returns [] for missing file""" + self.logger.forgive_errors() + lines = fileio.read_head_lines('missing.txt', 3, self.logger) + self.assertEqual(lines, []) + + +class MigSharedFileio__read_tail_lines(MigTestCase): + """Test the read_tail_lines function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for read_tail_lines tests""" + super(MigSharedFileio__read_tail_lines, self).setUp() + self.tmp_path = temppath(DUMMY_FILE_READTAILLINES, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_path), self) + + def test_reads_requested_lines(self): + """Test read_tail_lines returns requested number of lines""" + with open(self.tmp_path, 'w') as fh: + fh.write("line1\nline2\nline3\nline4") + lines = fileio.read_tail_lines(self.tmp_path, 2, self.logger) + self.assertEqual(lines, ["line3\n", "line4"]) + + def test_returns_all_lines_when_requested_more(self): + """Test read_tail_lines returns all lines when file has fewer""" + with open(self.tmp_path, 'w') as fh: + fh.write("line1\nline2") + lines = fileio.read_tail_lines(self.tmp_path, 5, self.logger) + self.assertEqual(lines, ["line1\n", "line2"]) + + def test_returns_empty_list_for_empty_file(self): + """Test read_tail_lines returns empty for empty file""" + open(self.tmp_path, 'w').close() + lines = fileio.read_tail_lines(self.tmp_path, 3, self.logger) + self.assertEqual(lines, []) + + def test_empty_for_missing_file(self): + """Test read_tail_lines returns [] for missing file""" + self.logger.forgive_errors() + lines = fileio.read_tail_lines('missing.txt', 3, self.logger) + self.assertEqual(lines, []) + + +class MigSharedFileio__make_symlink(MigTestCase): + """Test the make_symlink function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for make_symlink tests""" + super(MigSharedFileio__make_symlink, self).setUp() + self.tmp_link = temppath(DUMMY_FILE_MAKESYMLINKSRC, self) + self.tmp_target = temppath(DUMMY_FILE_MAKESYMLINKDST, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_link), exist_ok=True) + os.makedirs(os.path.dirname(self.tmp_target), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_link), self) + cleanpath(os.path.dirname(self.tmp_target), self) + with open(self.tmp_target, 'w') as fh: + fh.write("test") + + def test_creates_symlink(self): + """Test make_symlink creates working symlink""" + result = fileio.make_symlink( + self.tmp_target, self.tmp_link, self.logger) + self.assertTrue(result) + self.assertTrue(os.path.islink(self.tmp_link)) + self.assertEqual(os.readlink(self.tmp_link), self.tmp_target) + + def test_force_overwrites_existing_link(self): + """Test make_symlink force replaces existing link""" + os.symlink('/dummy', self.tmp_link) + result = fileio.make_symlink(self.tmp_target, self.tmp_link, self.logger, + force=True) + self.assertTrue(result) + self.assertEqual(os.readlink(self.tmp_link), self.tmp_target) + + def test_fails_on_existing_link_without_force(self): + """Test make_symlink fails on existing link without force""" + self.logger.forgive_errors() + os.symlink('/dummy', self.tmp_link) + result = fileio.make_symlink(self.tmp_target, self.tmp_link, self.logger, + force=False) + self.assertFalse(result) + + def test_handles_nonexistent_target(self): + """Test make_symlink still creates broken symlink""" + self.logger.forgive_errors() + broken_target = self.tmp_target + '-nonexistent' + result = fileio.make_symlink(broken_target, self.tmp_link, self.logger) + self.assertTrue(result) + self.assertTrue(os.path.islink(self.tmp_link)) + self.assertEqual(os.readlink(self.tmp_link), broken_target) + + +class MigSharedFileio__delete_symlink(MigTestCase): + """Test the delete_symlink function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for delete_symlink tests""" + super(MigSharedFileio__delete_symlink, self).setUp() + self.tmp_link = temppath(DUMMY_FILE_DELETESYMLINKSRC, self) + self.tmp_target = temppath(DUMMY_FILE_DELETESYMLINKDST, self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_link), exist_ok=True) + os.makedirs(os.path.dirname(self.tmp_target), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_link), self) + cleanpath(os.path.dirname(self.tmp_target), self) + with open(self.tmp_target, 'w') as fh: + fh.write("test") + + def create_symlink(self, target=None, link=None): + """Helper to create valid symlink before deletion""" + if target is None: + target = self.tmp_target + if link is None: + link = self.tmp_link + os.symlink(target, link) + + def test_deletes_existing_symlink(self): + """Test delete_symlink removes existing symlink""" + self.create_symlink() + result = fileio.delete_symlink(self.tmp_link, self.logger) + self.assertTrue(result) + self.assertFalse(os.path.exists(self.tmp_link)) + + def test_handles_missing_file_with_allow_missing(self): + """Test delete_symlink succeeds with allow_missing=True""" + # First make sure file doesn't exist + if os.path.exists(self.tmp_link): + os.remove(self.tmp_link) + result = fileio.delete_symlink(self.tmp_link, self.logger, + allow_missing=True) + self.assertTrue(result) + + def test_handles_missing_symlink_without_allow_missing(self): + """Test delete_symlink fails with allow_missing=False""" + self.logger.forgive_errors() + result = fileio.delete_symlink('missing_symlink', self.logger, + allow_missing=False) + self.assertFalse(result) + + @unittest.skip("TODO: implement check in tested function and enable again") + def test_rejects_regular_file(self): + """Test delete_symlink returns False when path is a regular file""" + self.logger.forgive_errors() + with open(self.tmp_link, 'w') as fh: + fh.write("dummy") + + result = fileio.delete_symlink(self.tmp_link, self.logger) + self.assertFalse(result) + + def test_deletes_broken_symlink(self): + """Test delete_symlink removes broken symlink""" + # Create broken symlink + broken_target = self.tmp_target + '-nonexistent' + self.create_symlink(broken_target) + self.assertTrue(os.path.islink(self.tmp_link)) + # Now delete it + result = fileio.delete_symlink(self.tmp_link, self.logger) + self.assertTrue(result) + + +class MigSharedFileio__touch(MigTestCase): + """Test the touch function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for touch tests""" + super(MigSharedFileio__touch, self).setUp() + self.tmp_path = temppath('fileio/touch', self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(os.path.dirname(self.tmp_path), self) + + def test_creates_new_file(self): + """Test touch creates new file if missing""" + self.assertFalse(os.path.exists(self.tmp_path)) + result = fileio.touch(self.tmp_path, self.configuration) + self.assertTrue(result) + self.assertTrue(os.path.exists(self.tmp_path)) + self.assertTrue(os.path.isfile(self.tmp_path)) + + @unittest.skip("TODO: fix invalid open 'r+w' in tested function and enable again") + def test_updates_timestamp_on_existing_file(self): + """Test touch updates timestamp on existing file""" + # Create initial file + with open(self.tmp_path, 'w') as fh: + fh.write("test") + orig_mtime = os.path.getmtime(self.tmp_path) + time.sleep(0.1) + result = fileio.touch(self.tmp_path, self.configuration) + self.assertTrue(result) + new_mtime = os.path.getmtime(self.tmp_path) + self.assertNotEqual(orig_mtime, new_mtime) + + @unittest.skip("TODO: fix handling of directory in tested function and enable again") + def test_succeeds_on_directory(self): + """Test touch succeeds for existing directory and updates timestamp""" + os.makedirs(self.tmp_path) + orig_mtime = os.path.getmtime(self.tmp_path) + time.sleep(0.1) + result = fileio.touch(self.tmp_path, self.configuration) + self.assertTrue(result) + self.assertTrue(os.path.isdir(self.tmp_path)) + new_mtime = os.path.getmtime(self.tmp_path) + self.assertNotEqual(orig_mtime, new_mtime) + + def test_fails_on_missing_parent(self): + """Test touch fails when parent directory doesn't exist""" + self.logger.forgive_errors() + nested_path = os.path.join(self.tmp_path, 'missing', 'file.txt') + result = fileio.touch(nested_path, self.configuration) + self.assertFalse(result) + self.assertFalse(os.path.exists(nested_path)) + + +class MigSharedFileio__remove_dir(MigTestCase): + """Test the remove_dir function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for remove_dir tests""" + super(MigSharedFileio__remove_dir, self).setUp() + self.tmp_path = temppath('fileio/remove_dir', self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(self.tmp_path, self) + os.makedirs(self.tmp_path) + + def test_removes_empty_directory(self): + """Test remove_dir removes empty directory""" + self.assertTrue(os.path.exists(self.tmp_path)) + result = fileio.remove_dir(self.tmp_path, self.configuration) + self.assertTrue(result) + self.assertFalse(os.path.exists(self.tmp_path)) + + def test_fails_on_nonempty_directory(self): + """Test remove_dir returns False for non-empty directory""" + self.logger.forgive_errors() + # Add a file to the directory + with open(os.path.join(self.tmp_path, 'test.txt'), 'w') as fh: + fh.write("test") + result = fileio.remove_dir(self.tmp_path, self.configuration) + self.assertFalse(result) + self.assertTrue(os.path.exists(self.tmp_path)) + + def test_fails_on_file(self): + """Test remove_dir returns False for file""" + self.logger.forgive_errors() + # Add a file to the directory + file_path = os.path.join(self.tmp_path, 'test.txt') + with open(file_path, 'w') as fh: + fh.write("test") + result = fileio.remove_dir(file_path, self.configuration) + self.assertFalse(result) + self.assertTrue(os.path.exists(file_path)) + + +class MigSharedFileio__remove_rec(MigTestCase): + """Test the remove_rec function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for remove_rec tests""" + super(MigSharedFileio__remove_rec, self).setUp() + self.tmp_path = temppath('fileio/remove_rec', self) + cleanpath(self.tmp_path, self) + # Create a nested directory structure with files + # fileio/remove_rec/ + # ├── file1.txt + # └── subdir/ + # └── file2.txt + os.makedirs(os.path.join(self.tmp_path, 'subdir')) + with open(os.path.join(self.tmp_path, 'file1.txt'), 'w') as fh: + fh.write("dummy") + with open(os.path.join(self.tmp_path, 'subdir', 'file2.txt'), 'w') as fh: + fh.write("dummy2") + + def test_removes_directory_recursively(self): + """Test remove_rec removes directory and contents""" + self.assertTrue(os.path.exists(self.tmp_path)) + result = fileio.remove_rec(self.tmp_path, self.configuration) + self.assertTrue(result) + self.assertFalse(os.path.exists(self.tmp_path)) + + def test_rejects_regular_file(self): + """Test remove_rec returns False when path is a regular file""" + file_path = os.path.join(self.tmp_path, 'file1.txt') + result = fileio.remove_rec(file_path, self.configuration) + self.assertFalse(result) + self.assertTrue(os.path.exists(file_path)) + + +class MigSharedFileio__move_file(MigTestCase): + """Test the move_file function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for move_file tests""" + super(MigSharedFileio__move_file, self).setUp() + self.tmp_src = temppath(DUMMY_FILE_MOVE_SRC, self) + os.makedirs(os.path.dirname(self.tmp_src), exist_ok=True) + self.tmp_dst = temppath(DUMMY_FILE_MOVE_DST, self) + os.makedirs(os.path.dirname(self.tmp_dst), exist_ok=True) + cleanpath(self.tmp_src, self) + cleanpath(self.tmp_dst, self) + with open(self.tmp_src, 'w') as fh: + fh.write("test") + + def test_moves_file(self): + """Test move_file successfully moves a file""" + success, msg = fileio.move_file(self.tmp_src, self.tmp_dst, + self.configuration) + self.assertTrue(success) + self.assertFalse(msg) + self.assertFalse(os.path.exists(self.tmp_src)) + self.assertTrue(os.path.exists(self.tmp_dst)) + + def test_overwrites_existing_destination(self): + """Test move_file overwrites existing destination file""" + # Create initial destination file + with open(self.tmp_dst, 'w') as fh: + fh.write("original") + success, msg = fileio.move_file(self.tmp_src, self.tmp_dst, + self.configuration) + self.assertTrue(success) + self.assertFalse(msg) + with open(self.tmp_dst, 'r') as fh: + content = fh.read() + self.assertEqual(content, "test") + + +class MigSharedFileio__move_rec(MigTestCase): + """Test the move_rec function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for move_rec tests""" + super(MigSharedFileio__move_rec, self).setUp() + self.tmp_src = temppath(DUMMY_DIRECTORY_MOVE_SRC, self) + self.tmp_dst = temppath(DUMMY_DIRECTORY_MOVE_DST, self) + cleanpath(self.tmp_src, self) + cleanpath(self.tmp_dst, self) + # Create a nested directory structure with files + # fileio/move_dir_src/ + # ├── file1.txt + # └── subdir/ + # └── file2.txt + os.makedirs(os.path.join(self.tmp_src, 'subdir')) + with open(os.path.join(self.tmp_src, 'file1.txt'), 'w') as fh: + fh.write("dummy1") + with open(os.path.join(self.tmp_src, 'subdir', 'file2.txt'), 'w') as fh: + fh.write("dummy2") + + def test_moves_directory_recursively(self): + """Test move_rec moves directory and contents""" + result = fileio.move_rec(self.tmp_src, self.tmp_dst, + self.configuration) + self.assertTrue(result) + self.assertFalse(os.path.exists(self.tmp_src)) + self.assertTrue(os.path.exists(self.tmp_dst)) + # Verify structure + self.assertTrue(os.path.exists(os.path.join(self.tmp_dst, + 'file1.txt'))) + self.assertTrue(os.path.exists(os.path.join(self.tmp_dst, 'subdir', + 'file2.txt'))) + + def test_extends_existing_destination(self): + """Test move_rec extends existing destination directory""" + # Create initial destination with some content + os.makedirs(os.path.join(self.tmp_dst, 'prior')) + success, msg = fileio.move_rec(self.tmp_src, self.tmp_dst, + self.configuration) + self.assertTrue(success) + self.assertFalse(msg) + + # Verify structure with new src subdir and existing dir + new_sub = os.path.basename(DUMMY_DIRECTORY_MOVE_SRC) + self.assertTrue(os.path.exists(os.path.join(self.tmp_dst, new_sub, + 'file1.txt'))) + self.assertTrue(os.path.exists(os.path.join(self.tmp_dst, new_sub, + 'subdir', 'file2.txt'))) + self.assertTrue(os.path.exists(os.path.join(self.tmp_dst, 'prior'))) + + +class MigSharedFileio__copy_file(MigTestCase): + """Test the copy_file function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for copy_file tests""" + super(MigSharedFileio__copy_file, self).setUp() + self.tmp_src = temppath(DUMMY_FILE_COPY_SRC, self) + self.tmp_dst = temppath(DUMMY_FILE_COPY_DST, self) + os.makedirs(os.path.dirname(self.tmp_src), exist_ok=True) + os.makedirs(os.path.dirname(self.tmp_dst), exist_ok=True) + cleanpath(self.tmp_src, self) + cleanpath(self.tmp_dst, self) + with open(self.tmp_src, 'w') as fh: + fh.write("test") + + def test_copies_file(self): + """Test copy_file successfully copies a file""" + result = fileio.copy_file( + self.tmp_src, self.tmp_dst, self.configuration) + self.assertTrue(result) + self.assertTrue(os.path.exists(self.tmp_src)) + self.assertTrue(os.path.exists(self.tmp_dst)) + + def test_overwrites_existing_destination(self): + """Test copy_file overwrites existing destination file""" + # Create initial destination file + with open(self.tmp_dst, 'w') as fh: + fh.write("original") + result = fileio.copy_file( + self.tmp_src, self.tmp_dst, self.configuration) + self.assertTrue(result) + with open(self.tmp_dst, 'r') as fh: + content = fh.read() + self.assertEqual(content, "test") + + +class MigSharedFileio__copy_rec(MigTestCase): + """Test the copy_rec function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for copy_rec tests""" + super(MigSharedFileio__copy_rec, self).setUp() + self.tmp_src = temppath('fileio/copy_dir_src', self) + self.tmp_dst = temppath('fileio/copy_dir_dst', self) + cleanpath(self.tmp_src, self) + cleanpath(self.tmp_dst, self) + # Create a nested directory structure with files + os.makedirs(os.path.join(self.tmp_src, 'subdir')) + with open(os.path.join(self.tmp_src, 'file1.txt'), 'w') as fh: + fh.write("dummy1") + with open(os.path.join(self.tmp_src, 'subdir', 'file2.txt'), 'w') as fh: + fh.write("dummy2") + + def test_copies_directory_recursively(self): + """Test copy_rec copies directory and contents""" + result = fileio.copy_rec( + self.tmp_src, self.tmp_dst, self.configuration) + self.assertTrue(result) + self.assertTrue(os.path.exists(self.tmp_src)) + self.assertTrue(os.path.exists(self.tmp_dst)) + # Verify structure + self.assertTrue(os.path.exists(os.path.join( + self.tmp_dst, 'file1.txt'))) + self.assertTrue(os.path.exists(os.path.join( + self.tmp_dst, 'subdir', 'file2.txt'))) + + +class MigSharedFileio__check_empty_dir(MigTestCase): + """Test the check_empty_dir function from mig.shared.fileio module""" + + def setUp(self): + """Initialize test environment for check_empty_dir tests""" + super(MigSharedFileio__check_empty_dir, self).setUp() + self.empty_path = temppath(DUMMY_DIRECTORY_EMPTY, self) + self.nonempty_path = temppath(DUMMY_DIRECTORY_NESTED, self) + cleanpath(DUMMY_DIRECTORY_EMPTY, self) + cleanpath(DUMMY_DIRECTORY_NESTED, self) + os.makedirs(self.empty_path) + # Create non-empty directory structure + os.makedirs(self.nonempty_path) + with open(os.path.join(self.nonempty_path, 'test.txt'), 'w') as fh: + fh.write("dummy") + + def test_returns_true_for_empty(self): + """Test check_empty_dir returns True for empty directory""" + self.assertTrue(fileio.check_empty_dir(self.empty_path)) + + def test_returns_false_for_nonempty(self): + """Test check_empty_dir returns False for non-empty directory""" + self.assertFalse(fileio.check_empty_dir(self.nonempty_path)) + + def test_returns_false_for_file(self): + """Test check_empty_dir returns False for file path""" + file_path = os.path.join(self.nonempty_path, 'test.txt') + result = fileio.check_empty_dir(file_path) + self.assertFalse(result) + + +class MigSharedFileio__makedirs_rec(MigTestCase): + """Test the makedirs_rec function from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for makedirs_rec tests""" + super(MigSharedFileio__makedirs_rec, self).setUp() + self.tmp_path = temppath('fileio/makedirs_rec', self) + # We generally need output dir to exist here + os.makedirs(os.path.dirname(self.tmp_path), exist_ok=True) + cleanpath(self.tmp_path, self) + + def test_creates_directory_path(self): + """Test makedirs_rec creates nested directories""" + nested_path = os.path.join(self.tmp_path, 'a', 'b', 'c') + result = fileio.makedirs_rec(nested_path, self.configuration) + self.assertTrue(result) + self.assertTrue(os.path.exists(nested_path)) + + def test_returns_true_for_existing_directory(self): + """Test makedirs_rec returns True for existing path""" + os.makedirs(self.tmp_path) + result = fileio.makedirs_rec(self.tmp_path, self.configuration) + self.assertTrue(result) + + def test_fails_for_file_path(self): + """Test makedirs_rec returns False if path is file""" + self.logger.forgive_errors() + # Create a file at the path + os.makedirs(self.tmp_path) + file_path = os.path.join(self.tmp_path, 'file.txt') + with open(file_path, 'w') as fh: + fh.write("dummy") + result = fileio.makedirs_rec(file_path, self.configuration) + self.assertFalse(result) + + +class MigSharedFileio__check_access(MigTestCase): + """Test the various check access functions from mig.shared.fileio module""" + + def _provide_configuration(self): + """Set up isolated test configuration and logger for the tests""" + return 'testconfig' + + def setUp(self): + """Initialize test environment for access check tests""" + super(MigSharedFileio__check_access, self).setUp() + self.tmp_dir = temppath('fileio/check_access', self) + os.makedirs(self.tmp_dir) + self.writeonly_file = os.path.join(self.tmp_dir, 'writeonly.txt') + self.readonly_file = os.path.join(self.tmp_dir, 'readonly.txt') + self.readwrite_file = os.path.join(self.tmp_dir, 'readwrite.txt') + + # Create test files with different permissions + with open(self.writeonly_file, 'w') as fh: + fh.write("writeonly") + with open(self.readonly_file, 'w') as fh: + fh.write("readonly") + with open(self.readwrite_file, 'w') as fh: + fh.write("read-write") + + # Set permissions + os.chmod(self.writeonly_file, 0o200) + os.chmod(self.readonly_file, 0o400) + os.chmod(self.readwrite_file, 0o600) + + def test_check_read_access_file(self): + """Test check_read_access with readable file""" + self.assertTrue(fileio.check_read_access(self.readwrite_file)) + self.assertTrue(fileio.check_read_access(self.readonly_file)) + self.assertTrue(fileio.check_read_access(self.tmp_dir, + parent_dir=True)) + # Super-user has access to read and write all files! + if os.getuid() == 0: + self.assertTrue(fileio.check_read_access(self.writeonly_file)) + else: + self.assertFalse(fileio.check_read_access(self.writeonly_file)) + self.assertFalse(fileio.check_read_access('/invalid/path')) + + def test_check_write_access_file(self): + """Test check_write_access with writable file""" + self.assertTrue(fileio.check_write_access(self.writeonly_file)) + self.assertTrue(fileio.check_write_access(self.readwrite_file)) + # Super-user has access to read and write all files! + if os.getuid() == 0: + self.assertTrue(fileio.check_write_access(self.readonly_file)) + else: + self.assertFalse(fileio.check_write_access(self.readonly_file)) + self.assertFalse(fileio.check_write_access('/invalid/path')) + + def test_check_read_access_with_parent(self): + """Test check_read_access with parent_dir True""" + sub_file = os.path.join(self.tmp_dir, 'file.txt') + result = fileio.check_read_access(sub_file, parent_dir=True) + self.assertTrue(result) + + def test_check_write_access_with_parent(self): + """Test check_write_access with parent_dir True""" + sub_file = os.path.join(self.tmp_dir, 'file.txt') + result = fileio.check_write_access(sub_file, parent_dir=True) + self.assertTrue(result) + + def test_check_readable(self): + """Test check_readable wrapper function""" + self.assertTrue(fileio.check_readable(self.configuration, + self.readwrite_file)) + self.assertTrue(fileio.check_readable(self.configuration, + self.readonly_file)) + # Super-user has access to read and write all files! + if os.getuid() == 0: + self.assertTrue(fileio.check_readable(self.configuration, + self.writeonly_file)) + else: + self.assertFalse(fileio.check_readable(self.configuration, + self.writeonly_file)) + self.assertFalse(fileio.check_readable(self.configuration, + '/invalid/path')) + + def test_check_writable(self): + """Test check_writable wrapper function""" + self.assertTrue(fileio.check_writable(self.configuration, + self.readwrite_file)) + self.assertTrue(fileio.check_writable(self.configuration, + self.writeonly_file)) + # Super-user has access to read and write all files! + if os.getuid() == 0: + self.assertTrue(fileio.check_writable(self.configuration, + self.readonly_file)) + else: + self.assertFalse(fileio.check_writable(self.configuration, + self.readonly_file)) + self.assertFalse(fileio.check_writable(self.configuration, + "/no/such/file")) + + def test_check_readonly(self): + """Test check_readonly wrapper function""" + # Super-user has access to read and write all files! + if os.getuid() == 0: + # Test with read-only file path + self.assertFalse(fileio.check_readonly(self.configuration, + self.readonly_file)) + + # Test with writable file + self.assertFalse(fileio.check_readonly(self.configuration, + self.writeonly_file)) + self.assertFalse(fileio.check_readonly(self.configuration, + self.readwrite_file)) + else: + # Test with read-only file path + self.assertTrue(fileio.check_readonly(self.configuration, + self.readonly_file)) + + # Test with writable file + self.assertFalse(fileio.check_readonly(self.configuration, + self.writeonly_file)) + self.assertFalse(fileio.check_readonly(self.configuration, + self.readwrite_file)) + + def test_check_readwritable(self): + """Test check_readwritable wrapper function""" + self.assertTrue(fileio.check_readwritable(self.configuration, + self.readwrite_file)) + # Super-user has access to read and write all files! + if os.getuid() == 0: + self.assertTrue(fileio.check_readwritable(self.configuration, + self.readonly_file)) + self.assertTrue(fileio.check_readwritable(self.configuration, + self.writeonly_file)) + else: + self.assertFalse(fileio.check_readwritable(self.configuration, + self.readonly_file)) + self.assertFalse(fileio.check_readwritable(self.configuration, + self.writeonly_file)) + + self.assertFalse(fileio.check_readwritable(self.configuration, + "/invalid/file")) + + def test_special_cases(self): + """Test various special cases for access checks""" + # Check directory paths + self.assertTrue(fileio.check_read_access(self.tmp_dir)) + self.assertTrue(fileio.check_write_access(self.tmp_dir)) + + # Check non-existent paths + missing_path = os.path.join(self.tmp_dir, 'missing.txt') + self.assertFalse(fileio.check_read_access(missing_path)) + self.assertFalse(fileio.check_write_access(missing_path)) + + # Check with custom follow_symlink=False + self.assertTrue(fileio.check_read_access(self.readwrite_file, + follow_symlink=False)) + self.assertTrue(fileio.check_read_access(self.tmp_dir, True, + follow_symlink=False)) + + if __name__ == '__main__': testmain()