Skip to content

Commit cdf721f

Browse files
authored
Avoid zombie pipe channels when receiving writeEOF (#3408)
Motivation In rare cases where the inbound side of a pipe channel is already closed (either because it was never open or because it was closed during use), we can get into trouble if the reader drops the read side of our write pipe. In that context, the Linux kernel will deliver an EPOLLHUP, but while we'll close the FD we'll let the channel hang out as a zombie. This is, obviously, suboptimal. Modifications In writeEOF, we check whether the read side is already closed. If it is, we trigger a close internally to shut the channel down. Added a test for this as needed, and modified an existing test that would now trip over this behaviour. Also added a new test for the reverse case to ensure we don't end up with zombies there (we don't). Results No zombie channels.
1 parent 4e8f4b1 commit cdf721f

File tree

2 files changed

+63
-4
lines changed

2 files changed

+63
-4
lines changed

Sources/NIOPosix/PipeChannel.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,17 @@ final class PipeChannel: BaseStreamSocketChannel<PipePair>, @unchecked Sendable
121121
}
122122
try! self.selectableEventLoop.deregister(channel: self, mode: .output)
123123
try! outputSPH.close()
124+
125+
// Only close the entire channel if the input is already closed.
126+
// If input is still open, we can continue half-duplex operation.
127+
var inputIsClosed = true
128+
if let inputSPH = self.pipePair.input {
129+
inputIsClosed = !inputSPH.isOpen
130+
}
131+
if inputIsClosed {
132+
let error = IOError(errnoCode: EPIPE, reason: "Broken pipe")
133+
self.close0(error: error, mode: .all, promise: nil)
134+
}
124135
}
125136

126137
override func shutdownSocket(mode: CloseMode) throws {

Tests/NIOPosixTests/PipeChannelTest.swift

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,17 @@ final class PipeChannelTest: XCTestCase {
9999

100100
func testWriteErrorsCloseChannel() {
101101
XCTAssertNoThrow(try self.channel.setOption(.allowRemoteHalfClosure, value: true).wait())
102-
self.fromChannel.closeFile()
103-
var buffer = self.channel.allocator.buffer(capacity: 1)
104-
buffer.writeString("X")
105-
XCTAssertThrowsError(try self.channel.writeAndFlush(buffer).wait()) { error in
102+
103+
// We need to wedge the EL open here to make sure that the close of `fromChannel` does
104+
// not potentially cause us to handle writeEOF before we attempt to make the write. We
105+
// want the _write_ to discover the issue first, not writeEOF.
106+
let writeFuture = self.channel.eventLoop.flatSubmit { [fromChannel, channel] in
107+
fromChannel!.closeFile()
108+
var buffer = channel!.allocator.buffer(capacity: 1)
109+
buffer.writeString("X")
110+
return channel!.writeAndFlush(buffer)
111+
}
112+
XCTAssertThrowsError(try writeFuture.wait()) { error in
106113
if let error = error as? IOError {
107114
XCTAssert([EPIPE, EBADF].contains(error.errnoCode), "unexpected errno: \(error)")
108115
} else {
@@ -199,6 +206,47 @@ final class PipeChannelTest: XCTestCase {
199206
)
200207
XCTAssertEqual(UInt8(ascii: "X"), spaceForX)
201208
}
209+
210+
func testWriteEndGoingAway() throws {
211+
try withPipe { readHandle, writeHandle in
212+
let chan = try NIOPipeBootstrap(group: self.group)
213+
.takingOwnershipOfDescriptor(output: try writeHandle.takeDescriptorOwnership())
214+
.wait()
215+
216+
let writeResult = chan.writeAndFlush(ByteBuffer(repeating: 0x41, count: 32 * 1024 * 1024))
217+
try chan.eventLoop.submit {}.wait() // wait for write to be enqueued
218+
219+
try readHandle.close()
220+
221+
XCTAssertThrowsError(try writeResult.wait()) { error in
222+
XCTAssertTrue(error is IOError, "Expected IOError but got \(type(of: error))")
223+
if let ioError = error as? IOError {
224+
XCTAssertEqual(ioError.errnoCode, EPIPE, "Expected EPIPE but got \(ioError.errnoCode)")
225+
}
226+
}
227+
228+
// Channel should be closed after the write error
229+
XCTAssertNoThrow(try chan.closeFuture.wait())
230+
231+
return []
232+
}
233+
}
234+
235+
func testReadEndGoingAway() throws {
236+
try withPipe { readHandle, writeHandle in
237+
let chan = try NIOPipeBootstrap(group: self.group)
238+
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
239+
.takingOwnershipOfDescriptor(input: try readHandle.takeDescriptorOwnership())
240+
.wait()
241+
242+
try writeHandle.close()
243+
244+
// Channel should be closed because the read side is dead.
245+
XCTAssertNoThrow(try chan.closeFuture.wait())
246+
247+
return []
248+
}
249+
}
202250
}
203251

204252
extension FileHandle {

0 commit comments

Comments
 (0)