Skip to content

Commit 444b5be

Browse files
authored
Fix concurrency warnings in AsyncProcess (#9430)
Squashes the warnings in AsyncProcess by adjusting required callbacks to be `@Sendable`, marking lock protected state as `nonisolated(unsafe)` and using a `ThreadSafeBox` to capture the process result. Fixes up warnings in AsyncProcessTests as well.
1 parent 9653927 commit 444b5be

File tree

5 files changed

+58
-59
lines changed

5 files changed

+58
-59
lines changed

Sources/Basics/Concurrency/AsyncProcess.swift

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -259,29 +259,26 @@ package final class AsyncProcess {
259259
#endif
260260

261261
/// Typealias for stdout/stderr output closure.
262-
package typealias OutputClosure = ([UInt8]) -> Void
262+
package typealias OutputClosure = @Sendable ([UInt8]) -> Void
263263

264-
/// Typealias for logging handling closure
265-
package typealias LoggingHandler = (String) -> Void
264+
/// Typealias for logging handling closure.
265+
package typealias LoggingHandler = @Sendable (String) -> Void
266266

267-
private static var _loggingHandler: LoggingHandler?
268-
private static let loggingHandlerLock = NSLock()
267+
/// Global logging handler storage.
268+
private static let _loggingHandler = ThreadSafeBox<LoggingHandler?>()
269269

270270
/// Global logging handler. Use with care! preferably use instance level instead of setting one globally.
271-
@available(
272-
*,
273-
deprecated,
274-
message: "use instance level `loggingHandler` passed via `init` instead of setting one globally."
275-
)
276271
package static var loggingHandler: LoggingHandler? {
277272
get {
278-
Self.loggingHandlerLock.withLock {
279-
self._loggingHandler
280-
}
281-
} set {
282-
Self.loggingHandlerLock.withLock {
283-
self._loggingHandler = newValue
284-
}
273+
return _loggingHandler.get() ?? nil
274+
}
275+
@available(
276+
*,
277+
deprecated,
278+
message: "use instance level `loggingHandler` passed via `init` instead of setting one globally."
279+
)
280+
set {
281+
_loggingHandler.put(newValue)
285282
}
286283
}
287284

@@ -332,8 +329,7 @@ package final class AsyncProcess {
332329
///
333330
/// Key: Executable name or path.
334331
/// Value: Path to the executable, if found.
335-
private static var validatedExecutablesMap = [String: AbsolutePath?]()
336-
private static let validatedExecutablesMapLock = NSLock()
332+
private static let validatedExecutablesMap = ThreadSafeKeyValueStore<String, AbsolutePath?>()
337333

338334
/// Create a new process instance.
339335
///
@@ -453,14 +449,7 @@ package final class AsyncProcess {
453449
}
454450
// This should cover the most common cases, i.e. when the cache is most helpful.
455451
if workingDirectory == localFileSystem.currentWorkingDirectory {
456-
return AsyncProcess.validatedExecutablesMapLock.withLock {
457-
if let value = AsyncProcess.validatedExecutablesMap[program] {
458-
return value
459-
}
460-
let value = lookup()
461-
AsyncProcess.validatedExecutablesMap[program] = value
462-
return value
463-
}
452+
return AsyncProcess.validatedExecutablesMap.memoize(program, body: lookup)
464453
} else {
465454
return lookup()
466455
}
@@ -816,17 +805,17 @@ package final class AsyncProcess {
816805
package func waitUntilExit() throws -> AsyncProcessResult {
817806
let group = DispatchGroup()
818807
group.enter()
819-
var processResult: Result<AsyncProcessResult, Swift.Error>?
808+
let resultBox = ThreadSafeBox<Result<AsyncProcessResult, Swift.Error>>()
820809
self.waitUntilExit { result in
821-
processResult = result
810+
resultBox.put(result)
822811
group.leave()
823812
}
824813
group.wait()
825-
return try processResult.unsafelyUnwrapped.get()
814+
return try resultBox.get().unsafelyUnwrapped.get()
826815
}
827816

828817
/// Executes the process I/O state machine, calling completion block when finished.
829-
private func waitUntilExit(_ completion: @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void) {
818+
private func waitUntilExit(_ completion: @Sendable @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void) {
830819
self.stateLock.lock()
831820
switch self.state {
832821
case .idle:
@@ -1103,7 +1092,7 @@ extension AsyncProcess {
11031092
environment: Environment = .current,
11041093
loggingHandler: LoggingHandler? = .none,
11051094
queue: DispatchQueue? = nil,
1106-
completion: @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void
1095+
completion: @Sendable @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void
11071096
) {
11081097
let completionQueue = queue ?? Self.sharedCompletionQueue
11091098

Sources/Commands/SwiftTestCommand.swift

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -980,7 +980,7 @@ final class TestRunner {
980980

981981
/// Executes and returns execution status. Prints test output on standard streams if requested
982982
/// - Returns: Result of spawning and running the test process, and the output stream result
983-
func test(outputHandler: @escaping (String) -> Void) -> Result {
983+
func test(outputHandler: @escaping @Sendable (String) -> Void) -> Result {
984984
var results = [Result]()
985985
for path in self.bundlePaths {
986986
let testSuccess = self.test(at: path, outputHandler: outputHandler)
@@ -1027,11 +1027,11 @@ final class TestRunner {
10271027
return args
10281028
}
10291029

1030-
private func test(at path: AbsolutePath, outputHandler: @escaping (String) -> Void) -> Result {
1030+
private func test(at path: AbsolutePath, outputHandler: @escaping @Sendable (String) -> Void) -> Result {
10311031
let testObservabilityScope = self.observabilityScope.makeChildScope(description: "running test at \(path)")
10321032

10331033
do {
1034-
let outputHandler = { (bytes: [UInt8]) in
1034+
let outputHandler: @Sendable ([UInt8]) -> Void = { (bytes: [UInt8]) in
10351035
if let output = String(bytes: bytes, encoding: .utf8) {
10361036
outputHandler(output)
10371037
}
@@ -1214,17 +1214,16 @@ final class ParallelTestRunner {
12141214
observabilityScope: self.observabilityScope,
12151215
library: .xctest // swift-testing does not use ParallelTestRunner
12161216
)
1217-
var output = ""
1218-
let outputLock = NSLock()
1217+
let output = ThreadSafeBox<String>("")
12191218
let start = DispatchTime.now()
1220-
let result = testRunner.test(outputHandler: { _output in outputLock.withLock{ output += _output }})
1219+
let result = testRunner.test(outputHandler: { _output in output.append(_output) })
12211220
let duration = start.distance(to: .now())
12221221
if result == .failure {
12231222
self.ranSuccessfully = false
12241223
}
12251224
self.finishedTests.enqueue(TestResult(
12261225
unitTest: test,
1227-
output: output,
1226+
output: output.get() ?? "",
12281227
success: result != .failure,
12291228
duration: duration
12301229
))

Tests/BasicsTests/AsyncProcessTests.swift

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,15 @@ final class AsyncProcessTests: XCTestCase {
7777
let args = ["whoami"]
7878
let answer = NSUserName()
7979
#endif
80-
var popenResult: Result<AsyncProcessResult, Error>?
80+
let popenResult = ThreadSafeBox<Result<AsyncProcessResult, Error>>()
8181
let group = DispatchGroup()
8282
group.enter()
8383
AsyncProcess.popen(arguments: args) { result in
84-
popenResult = result
84+
popenResult.put(result)
8585
group.leave()
8686
}
8787
group.wait()
88-
switch popenResult {
88+
switch popenResult.get() {
8989
case .success(let processResult):
9090
let output = try processResult.utf8Output()
9191
XCTAssertTrue(output.hasPrefix(answer))
@@ -243,9 +243,11 @@ final class AsyncProcessTests: XCTestCase {
243243
}
244244

245245
func testStdin() throws {
246-
var stdout = [UInt8]()
246+
let stdout = ThreadSafeBox<[UInt8]>([])
247247
let process = AsyncProcess(scriptName: "in-to-out\(ProcessInfo.batSuffix)", outputRedirection: .stream(stdout: { stdoutBytes in
248-
stdout += stdoutBytes
248+
stdout.mutate {
249+
$0?.append(contentsOf: stdoutBytes)
250+
}
249251
}, stderr: { _ in }))
250252
let stdinStream = try process.launch()
251253

@@ -256,7 +258,7 @@ final class AsyncProcessTests: XCTestCase {
256258

257259
try process.waitUntilExit()
258260

259-
XCTAssertEqual(String(decoding: stdout, as: UTF8.self), "hello\(ProcessInfo.EOL)")
261+
XCTAssertEqual(String(decoding: stdout.get(default: []), as: UTF8.self), "hello\(ProcessInfo.EOL)")
260262
}
261263

262264
func testStdoutStdErr() throws {
@@ -353,28 +355,37 @@ final class AsyncProcessTests: XCTestCase {
353355
}
354356

355357
func testStdoutStdErrStreaming() throws {
356-
var stdout = [UInt8]()
357-
var stderr = [UInt8]()
358+
let stdout = ThreadSafeBox<[UInt8]>([])
359+
let stderr = ThreadSafeBox<[UInt8]>([])
358360
let process = AsyncProcess(scriptName: "long-stdout-stderr\(ProcessInfo.batSuffix)", outputRedirection: .stream(stdout: { stdoutBytes in
359-
stdout += stdoutBytes
361+
stdout.mutate {
362+
$0?.append(contentsOf: stdoutBytes)
363+
}
360364
}, stderr: { stderrBytes in
361-
stderr += stderrBytes
365+
stderr.mutate {
366+
$0?.append(contentsOf: stderrBytes)
367+
}
362368
}))
363369
try process.launch()
364370
try process.waitUntilExit()
365371

366372
let count = 16 * 1024
367-
XCTAssertEqual(String(bytes: stdout, encoding: .utf8), String(repeating: "1", count: count))
368-
XCTAssertEqual(String(bytes: stderr, encoding: .utf8), String(repeating: "2", count: count))
373+
XCTAssertEqual(String(bytes: stdout.get(default: []), encoding: .utf8), String(repeating: "1", count: count))
374+
XCTAssertEqual(String(bytes: stderr.get(default: []), encoding: .utf8), String(repeating: "2", count: count))
369375
}
370376

371377
func testStdoutStdErrStreamingRedirected() throws {
372-
var stdout = [UInt8]()
373-
var stderr = [UInt8]()
378+
let stdout = ThreadSafeBox<[UInt8]>([])
379+
let stderr = ThreadSafeBox<[UInt8]>([])
380+
374381
let process = AsyncProcess(scriptName: "long-stdout-stderr\(ProcessInfo.batSuffix)", outputRedirection: .stream(stdout: { stdoutBytes in
375-
stdout += stdoutBytes
382+
stdout.mutate {
383+
$0?.append(contentsOf: stdoutBytes)
384+
}
376385
}, stderr: { stderrBytes in
377-
stderr += stderrBytes
386+
stderr.mutate {
387+
$0?.append(contentsOf: stderrBytes)
388+
}
378389
}, redirectStderr: true))
379390
try process.launch()
380391
try process.waitUntilExit()
@@ -387,8 +398,8 @@ final class AsyncProcessTests: XCTestCase {
387398
let expectedStdout = String(repeating: "12", count: count)
388399
let expectedStderr = ""
389400
#endif
390-
XCTAssertEqual(String(bytes: stdout, encoding: .utf8), expectedStdout)
391-
XCTAssertEqual(String(bytes: stderr, encoding: .utf8), expectedStderr)
401+
XCTAssertEqual(String(bytes: stdout.get(default: []), encoding: .utf8), expectedStdout)
402+
XCTAssertEqual(String(bytes: stderr.get(default: []), encoding: .utf8), expectedStderr)
392403
}
393404

394405
func testWorkingDirectory() throws {

Tests/BasicsTests/CancellatorTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ class ProcessStartedSemaphore {
451451
self.term = term
452452
}
453453

454-
func handleOutput(_ bytes: [UInt8]) {
454+
@Sendable func handleOutput(_ bytes: [UInt8]) {
455455
self.lock.withLock {
456456
guard !self.trapped else {
457457
return

Tests/CommandsTests/RunCommandTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ struct RunCommandTests {
370370
self.sync = sync
371371
}
372372

373-
func handle(bytes: [UInt8]) {
373+
@Sendable func handle(bytes: [UInt8]) {
374374
guard let output = String(bytes: bytes, encoding: .utf8) else {
375375
return
376376
}

0 commit comments

Comments
 (0)