Skip to content

Commit 5da84e7

Browse files
authored
Merge pull request #70 from rchicoli/free-up-resource
Free up resource
2 parents 325293b + 07eed1b commit 5da84e7

File tree

2 files changed

+80
-10
lines changed

2 files changed

+80
-10
lines changed

internal/pkg/errgroup/errgroup.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package errgroup
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/net/context"
7+
)
8+
9+
// A Group is a collection of goroutines working on subtasks that are part of
10+
// the same overall task.
11+
//
12+
// A zero Group is valid and does not cancel on error.
13+
type Group struct {
14+
cancel func()
15+
16+
wg sync.WaitGroup
17+
18+
errOnce sync.Once
19+
err error
20+
}
21+
22+
// WithContext returns a new Group and an associated Context derived from ctx.
23+
//
24+
// The derived Context is canceled the first time a function passed to Go
25+
// returns a non-nil error or the first time Wait returns, whichever occurs
26+
// first.
27+
func WithContext(ctx context.Context) (*Group, context.Context) {
28+
ctx, cancel := context.WithCancel(ctx)
29+
return &Group{cancel: cancel}, ctx
30+
}
31+
32+
// Wait blocks until all function calls from the Go method have returned, then
33+
// returns the first non-nil error (if any) from them.
34+
func (g *Group) Wait() error {
35+
g.wg.Wait()
36+
if g.cancel != nil {
37+
g.cancel()
38+
}
39+
return g.err
40+
}
41+
42+
// Stop cancels all goroutines
43+
func (g *Group) Stop() error {
44+
if g.cancel != nil {
45+
g.cancel()
46+
}
47+
return g.err
48+
}
49+
50+
// Go calls the given function in a new goroutine.
51+
//
52+
// The first call to return a non-nil error cancels the group; its error will be
53+
// returned by Wait.
54+
func (g *Group) Go(f func() error) {
55+
g.wg.Add(1)
56+
57+
go func() {
58+
defer g.wg.Done()
59+
60+
if err := f(); err != nil {
61+
g.errOnce.Do(func() {
62+
g.err = err
63+
if g.cancel != nil {
64+
g.cancel()
65+
}
66+
})
67+
}
68+
}()
69+
}

pkg/docker/driver.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@ import (
1212
"syscall"
1313
"time"
1414

15-
"golang.org/x/sync/errgroup"
16-
1715
"github.com/docker/docker/api/types/plugins/logdriver"
1816
"github.com/docker/docker/daemon/logger"
1917
"github.com/tonistiigi/fifo"
2018

19+
"github.com/rchicoli/docker-log-elasticsearch/internal/pkg/errgroup"
2120
"github.com/rchicoli/docker-log-elasticsearch/pkg/elasticsearch"
2221
"github.com/rchicoli/docker-log-elasticsearch/pkg/extension/grok"
2322

@@ -307,19 +306,12 @@ func (d *Driver) StopLogging(file string) error {
307306
}
308307
}
309308

310-
if d.esClient != nil {
311-
// l.Printf("INFO client: %v", d.esClient)
312-
if err := d.esClient.Close(); err != nil {
313-
l.Printf("error: closing client connection: %v", err)
314-
}
315-
d.esClient.Stop()
316-
}
317-
318309
if d.pipeline.group != nil {
319310
// l.Printf("INFO with pipeline: %v", d.pipeline)
320311

321312
// close(d.pipeline.inputCh)
322313
// close(d.pipeline.outputCh)
314+
d.pipeline.group.Stop()
323315
// d.pipeline.stopCh <- struct{}{}
324316

325317
// TODO: close channels gracefully
@@ -330,6 +322,15 @@ func (d *Driver) StopLogging(file string) error {
330322
// l.Printf("error with pipeline: %v", err)
331323
// }
332324
}
325+
326+
if d.esClient != nil {
327+
// l.Printf("INFO client: %v", d.esClient)
328+
if err := d.esClient.Close(); err != nil {
329+
l.Printf("error: closing client connection: %v", err)
330+
}
331+
d.esClient.Stop()
332+
}
333+
333334
// l.Printf("INFO done stop logging")
334335

335336
return nil

0 commit comments

Comments
 (0)