Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Lib/importlib/_bootstrap_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,8 @@ def _write_atomic(path, data, mode=0o666):
try:
# We first write data to a temporary file, and then use os.replace() to
# perform an atomic rename.
with _io.FileIO(fd, 'wb') as file:
bytes_written = file.write(data)
if bytes_written != len(data):
# Raise an OSError so the 'except' below cleans up the partially
# written file.
raise OSError("os.write() didn't write the full pyc file")
with _io.open(fd, 'wb') as file:
file.write(data)
_os.replace(path_tmp, path)
except OSError:
try:
Expand Down
71 changes: 55 additions & 16 deletions Lib/test/test_importlib/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,31 +788,70 @@ def test_complete_multi_phase_init_module(self):
self.run_with_own_gil(script)


class MiscTests(unittest.TestCase):
def test_atomic_write_should_notice_incomplete_writes(self):
class PatchAtomicWrites():
def __init__(self, truncate_at_length=100, never_complete=False):
self.truncate_at_length = truncate_at_length
self.never_complete = never_complete
self.seen_write = False
self._children = []

def __enter__(self):
import _pyio

oldwrite = os.write
seen_write = False

truncate_at_length = 100

# Emulate an os.write that only writes partial data.
def write(fd, data):
nonlocal seen_write
seen_write = True
return oldwrite(fd, data[:truncate_at_length])
if self.seen_write and self.never_complete:
return None
self.seen_write = True
return oldwrite(fd, data[:self.truncate_at_length])

# Need to patch _io to be _pyio, so that io.FileIO is affected by the
# os.write patch.
with (support.swap_attr(_bootstrap_external, '_io', _pyio),
support.swap_attr(os, 'write', write)):
with self.assertRaises(OSError):
# Make sure we write something longer than the point where we
# truncate.
content = b'x' * (truncate_at_length * 2)
_bootstrap_external._write_atomic(os_helper.TESTFN, content)
assert seen_write
self.children = [
support.swap_attr(_bootstrap_external, '_io', _pyio),
support.swap_attr(os, 'write', write)
]
for child in self.children:
child.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
for child in self.children:
child.__exit__(exc_type, exc_val, exc_tb)


class MiscTests(unittest.TestCase):

def test_atomic_write_retries_incomplete_writes(self):
truncate_at_length = 100
length = truncate_at_length * 2

with PatchAtomicWrites(truncate_at_length=truncate_at_length) as cm:
# Make sure we write something longer than the point where we
# truncate.
content = b'x' * length
_bootstrap_external._write_atomic(os_helper.TESTFN, content)
assert cm.seen_write

assert os.stat(support.os_helper.TESTFN).st_size == length
os.unlink(support.os_helper.TESTFN)

def test_atomic_write_errors_if_unable_to_complete(self):
truncate_at_length = 100

with (
PatchAtomicWrites(
truncate_at_length=truncate_at_length, never_complete=True,
) as cm,
self.assertRaises(OSError)
):
# Make sure we write something longer than the point where we
# truncate.
content = b'x' * (truncate_at_length * 2)
_bootstrap_external._write_atomic(os_helper.TESTFN, content)
assert cm.seen_write

with self.assertRaises(OSError):
os.stat(support.os_helper.TESTFN) # Check that the file did not get written.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use Python's regular file object to ensure that writes to ``.pyc`` files are retried, or an appropriate error is raised.
Loading