Skip to content

Commit af984fc

Browse files
authored
🐛 Fix hanging processes and subprocess termination (#444)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description - Trying to solve how subprocesses are spawned so that their termination is done gracefully and no orphans are left hanging ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent ba5830e commit af984fc

File tree

9 files changed

+118
-36
lines changed

9 files changed

+118
-36
lines changed

changes/20240403170926.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:bug: Fix subprocess termination and command process setup, as described in [(go issue)](https://github.com/golang/go/issues/24050).

utils/subprocess/command_wrapper.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
* Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved.
33
* SPDX-License-Identifier: Apache-2.0
44
*/
5+
56
package subprocess
67

78
import (
89
"context"
910
"fmt"
10-
"os"
1111
"os/exec"
1212
"time"
1313

@@ -16,6 +16,7 @@ import (
1616
"github.com/ARM-software/golang-utils/utils/commonerrors"
1717
"github.com/ARM-software/golang-utils/utils/logs"
1818
"github.com/ARM-software/golang-utils/utils/parallelisation"
19+
"github.com/ARM-software/golang-utils/utils/proc"
1920
commandUtils "github.com/ARM-software/golang-utils/utils/subprocess/command"
2021
)
2122

@@ -46,7 +47,7 @@ func (c *cmdWrapper) Start() error {
4647
if c.cmd == nil {
4748
return fmt.Errorf("%w:undefined command", commonerrors.ErrUndefined)
4849
}
49-
return commonerrors.ConvertContextError(c.cmd.Start())
50+
return ConvertCommandError(c.cmd.Start())
5051
}
5152

5253
func (c *cmdWrapper) Run() error {
@@ -55,7 +56,7 @@ func (c *cmdWrapper) Run() error {
5556
if c.cmd == nil {
5657
return fmt.Errorf("%w:undefined command", commonerrors.ErrUndefined)
5758
}
58-
return commonerrors.ConvertContextError(c.cmd.Run())
59+
return ConvertCommandError(c.cmd.Run())
5960
}
6061

6162
func (c *cmdWrapper) Stop() error {
@@ -70,11 +71,11 @@ func (c *cmdWrapper) Stop() error {
7071
if subprocess != nil {
7172
pid := subprocess.Pid
7273
parallelisation.ScheduleAfter(ctx, 10*time.Millisecond, func(time.Time) {
73-
process, err := os.FindProcess(pid)
74+
process, err := proc.FindProcess(ctx, pid)
7475
if process == nil || err != nil {
7576
return
7677
}
77-
_ = process.Kill()
78+
_ = process.KillWithChildren(ctx)
7879
})
7980
}
8081
_ = c.cmd.Wait()
@@ -110,10 +111,11 @@ type command struct {
110111
func (c *command) createCommand(cmdCtx context.Context) *exec.Cmd {
111112
newCmd, newArgs := c.as.Redefine(c.cmd, c.args...)
112113
cmd := exec.CommandContext(cmdCtx, newCmd, newArgs...) //nolint:gosec
113-
cmd.Stdout = newOutStreamer(c.loggers)
114-
cmd.Stderr = newErrLogStreamer(c.loggers)
114+
cmd.Stdout = newOutStreamer(cmdCtx, c.loggers)
115+
cmd.Stderr = newErrLogStreamer(cmdCtx, c.loggers)
115116
cmd.Env = cmd.Environ()
116117
cmd.Env = append(cmd.Env, c.env...)
118+
setGroupAttrToCmd(cmd)
117119
return cmd
118120
}
119121

@@ -157,3 +159,31 @@ func newCommand(loggers logs.Loggers, as *commandUtils.CommandAsDifferentUser, e
157159
}
158160
return
159161
}
162+
163+
// ConvertCommandError converts an error returned by a command (e.g. from
// Start or Run) by delegating to proc.ConvertProcessError.
func ConvertCommandError(err error) error {
164+
return proc.ConvertProcessError(err)
165+
}
166+
167+
func CleanKillOfCommand(ctx context.Context, cmd *exec.Cmd) (err error) {
168+
if cmd == nil {
169+
return
170+
}
171+
defer func() {
172+
if cmd.Process != nil {
173+
_ = cmd.Process.Kill()
174+
}
175+
}()
176+
177+
thisP := cmd.Process
178+
if thisP == nil {
179+
return
180+
} else {
181+
p, subErr := proc.FindProcess(ctx, thisP.Pid)
182+
if subErr != nil {
183+
err = subErr
184+
return
185+
}
186+
err = p.KillWithChildren(ctx)
187+
}
188+
return
189+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//go:build darwin
2+
// +build darwin
3+
4+
package subprocess
5+
6+
import (
7+
"os/exec"
8+
"syscall"
9+
)
10+
11+
// See https://github.com/tgulacsi/go/blob/master/proc/
12+
// setGroupAttrToCmd sets Setpgid on the command's process attributes so that
// the spawned process does not share the parent's process group and can be
// killed together with its children.
//
// Attributes already set on c.SysProcAttr are kept; only Setpgid is
// overwritten. A nil command is ignored.
func setGroupAttrToCmd(c *exec.Cmd) {
	if c == nil {
		return
	}
	if c.SysProcAttr == nil {
		c.SysProcAttr = &syscall.SysProcAttr{}
	}
	c.SysProcAttr.Setpgid = true // to be able to kill all children, too
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
//go:build linux
2+
// +build linux
3+
4+
package subprocess
5+
6+
import (
7+
"os/exec"
8+
"syscall"
9+
)
10+
11+
// See https://github.com/tgulacsi/go/blob/master/proc/
12+
// setGroupAttrToCmd sets Setpgid and Pdeathsig on the command's process
// attributes: the spawned process does not share the parent's process group
// (so it can be killed together with its children) and it is sent SIGKILL
// when its parent dies.
//
// Attributes already set on c.SysProcAttr are kept; only Setpgid and
// Pdeathsig are overwritten. A nil command is ignored.
func setGroupAttrToCmd(c *exec.Cmd) {
	if c == nil {
		return
	}
	if c.SysProcAttr == nil {
		c.SysProcAttr = &syscall.SysProcAttr{}
	}
	c.SysProcAttr.Setpgid = true // to be able to kill all children, too
	// The kernel delivers Pdeathsig when the creating *thread* dies, which may
	// happen before the parent process exits (see https://go.dev/issue/27505).
	c.SysProcAttr.Pdeathsig = syscall.SIGKILL
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
//go:build windows
2+
// +build windows
3+
4+
package subprocess
5+
6+
import (
7+
"os/exec"
8+
"syscall"
9+
)
10+
11+
// setGroupAttrToCmd assigns fresh Windows process attributes to c: HideWindow
// is enabled and the creation flags CREATE_UNICODE_ENVIRONMENT and
// CREATE_NEW_PROCESS_GROUP are set. Any SysProcAttr previously set on c is
// replaced.
func setGroupAttrToCmd(c *exec.Cmd) {
12+
c.SysProcAttr = &syscall.SysProcAttr{
13+
HideWindow: true,
14+
// Windows Process Creation Flags: https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
15+
CreationFlags: syscall.CREATE_UNICODE_ENVIRONMENT | syscall.CREATE_NEW_PROCESS_GROUP,
16+
}
17+
}

utils/subprocess/executor.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ import (
1818
commandUtils "github.com/ARM-software/golang-utils/utils/subprocess/command"
1919
)
2020

21-
// Subprocess describes what a subproccess is as well as any monitoring it may need.
21+
// Subprocess describes what a subprocess is as well as any monitoring it may need.
2222
type Subprocess struct {
2323
mu deadlock.RWMutex
2424
isRunning atomic.Bool
2525
command *command
2626
processMonitoring *subprocessMonitoring
27-
messsaging *subprocessMessaging
27+
messaging *subprocessMessaging
2828
}
2929

3030
// New creates a subprocess description.
@@ -156,7 +156,7 @@ func (s *Subprocess) setup(ctx context.Context, loggers logs.Loggers, env []stri
156156
s.isRunning.Store(false)
157157
s.processMonitoring = newSubprocessMonitoring(ctx)
158158
s.command = newCommand(loggers, as, env, cmd, args...)
159-
s.messsaging = newSubprocessMessaging(loggers, withAdditionalMessages, messageOnSuccess, messageOnFailure, messageOnStart, s.command.GetPath())
159+
s.messaging = newSubprocessMessaging(loggers, withAdditionalMessages, messageOnSuccess, messageOnFailure, messageOnStart, s.command.GetPath())
160160
s.reset()
161161
return s.check()
162162
}
@@ -172,11 +172,11 @@ func (s *Subprocess) check() (err error) {
172172
if err != nil {
173173
return
174174
}
175-
if s.messsaging == nil {
175+
if s.messaging == nil {
176176
err = commonerrors.ErrNoLogger
177177
return
178178
}
179-
err = s.messsaging.Check()
179+
err = s.messaging.Check()
180180
return
181181
}
182182

@@ -212,26 +212,26 @@ func (s *Subprocess) Start() (err error) {
212212
cmd := s.getCmd()
213213
err = cmd.Start()
214214
if err != nil {
215-
s.messsaging.LogFailedStart(err)
215+
s.messaging.LogFailedStart(err)
216216
s.isRunning.Store(false)
217217
s.Cancel()
218218
return
219219
}
220220
pid, err := cmd.Pid()
221221
if err != nil {
222-
s.messsaging.LogFailedStart(err)
222+
s.messaging.LogFailedStart(err)
223223
s.isRunning.Store(false)
224224
s.Cancel()
225225
return
226226
}
227227

228228
s.isRunning.Store(true)
229-
s.messsaging.SetPid(pid)
230-
s.messsaging.LogStarted()
229+
s.messaging.SetPid(pid)
230+
s.messaging.LogStarted()
231231
return
232232
}
233233

234-
// Cancel interrupts an on-going process. This method is idempotent.
234+
// Cancel interrupts an ongoing process. This method is idempotent.
235235
func (s *Subprocess) Cancel() {
236236
s.processMonitoring.CancelSubprocess()
237237
}
@@ -251,13 +251,13 @@ func (s *Subprocess) Execute() (err error) {
251251
}
252252
s.processMonitoring.Reset()
253253
s.command.Reset()
254-
s.messsaging.LogStart()
254+
s.messaging.LogStart()
255255
s.runProcessMonitoring()
256256
cmd := s.getCmd()
257257
s.isRunning.Store(true)
258258
err = cmd.Run()
259259
s.isRunning.Store(false)
260-
s.messsaging.LogEnd(err)
260+
s.messaging.LogEnd(err)
261261
return
262262
}
263263

@@ -307,10 +307,10 @@ func (s *Subprocess) stop(cancel bool) (err error) {
307307
if !s.IsOn() {
308308
return
309309
}
310-
s.messsaging.LogStopping()
310+
s.messaging.LogStopping()
311311
err = s.getCmd().Stop()
312312
s.command.Reset()
313313
s.isRunning.Store(false)
314-
s.messsaging.LogEnd(nil)
314+
s.messaging.LogEnd(nil)
315315
return
316316
}

utils/subprocess/logging.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,20 @@
55
package subprocess
66

77
import (
8+
"context"
89
"io"
910
"strings"
1011

1112
"github.com/ARM-software/golang-utils/utils/logs"
1213
"github.com/ARM-software/golang-utils/utils/platform"
14+
"github.com/ARM-software/golang-utils/utils/safeio"
1315
)
1416

1517
var lineSep = platform.UnixLineSeparator()
1618

1719
// INTERNAL
1820
// Way of redirecting process output to a logger.
1921
type logStreamer struct {
20-
io.Writer
2122
IsStdErr bool
2223
Loggers logs.Loggers
2324
}
@@ -37,17 +38,17 @@ func (l *logStreamer) Write(p []byte) (n int, err error) {
3738
return len(p), nil
3839
}
3940

40-
func newLogStreamer(isStdErr bool, loggers logs.Loggers) *logStreamer {
41-
return &logStreamer{
41+
func newLogStreamer(ctx context.Context, isStdErr bool, loggers logs.Loggers) io.Writer {
42+
return safeio.ContextualWriter(ctx, &logStreamer{
4243
IsStdErr: isStdErr,
4344
Loggers: loggers,
44-
}
45+
})
4546
}
4647

47-
func newOutStreamer(loggers logs.Loggers) *logStreamer {
48-
return newLogStreamer(false, loggers)
48+
func newOutStreamer(ctx context.Context, loggers logs.Loggers) io.Writer {
49+
return newLogStreamer(ctx, false, loggers)
4950
}
5051

51-
func newErrLogStreamer(loggers logs.Loggers) *logStreamer {
52-
return newLogStreamer(true, loggers)
52+
func newErrLogStreamer(ctx context.Context, loggers logs.Loggers) io.Writer {
53+
return newLogStreamer(ctx, true, loggers)
5354
}

utils/subprocess/messaging.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,35 @@ type subprocessMessaging struct {
2727
pid atomic.Int64
2828
}
2929

30-
// Logs subprocess start.
30+
// LogStart logs subprocess start.
3131
func (s *subprocessMessaging) LogStart() {
3232
if s.withAdditionalMessages {
3333
s.loggers.Log(s.messageOnProcessStart)
3434
}
3535
}
3636

37-
// Logs when subprocess failed to start.
37+
// LogFailedStart logs when subprocess failed to start.
3838
func (s *subprocessMessaging) LogFailedStart(err error) {
3939
if s.withAdditionalMessages {
4040
s.loggers.LogError(fmt.Sprintf("Failed starting process `%v`: %v", s.commandPath, err))
4141
}
4242
}
4343

44-
// Logs when subprocess has started.
44+
// LogStarted logs when subprocess has started.
4545
func (s *subprocessMessaging) LogStarted() {
4646
if s.withAdditionalMessages {
4747
s.loggers.Log(fmt.Sprintf("Started process [%v]", s.pid.Load()))
4848
}
4949
}
5050

51-
// Logs when subprocess is asked to stop.
51+
// LogStopping logs when subprocess is asked to stop.
5252
func (s *subprocessMessaging) LogStopping() {
5353
if s.withAdditionalMessages {
5454
s.loggers.Log(fmt.Sprintf("Stopping process [%v]", s.pid.Load()))
5555
}
5656
}
5757

58-
// Logs subprocess end with err if an error occurred.
58+
// LogEnd logs subprocess end with err if an error occurred.
5959
func (s *subprocessMessaging) LogEnd(err error) {
6060
if !s.withAdditionalMessages {
6161
return
@@ -67,7 +67,7 @@ func (s *subprocessMessaging) LogEnd(err error) {
6767
}
6868
}
6969

70-
// Sets the process PID.
70+
// SetPid sets the process PID.
7171
func (s *subprocessMessaging) SetPid(pid int) {
7272
s.pid.Store(int64(pid))
7373
}

utils/subprocess/monitoring.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func newSubprocessMonitoring(parentCtx context.Context) *subprocessMonitoring {
3232
return m
3333
}
3434

35-
// Interrupts an on-going process.
35+
// CancelSubprocess interrupts an ongoing process.
3636
func (s *subprocessMonitoring) CancelSubprocess() {
3737
s.monitoringStopping.Store(true)
3838
s.cancelStore.Cancel()

0 commit comments

Comments
 (0)