Skip to content

Commit 5accb4f

Browse files
author
rafael_chicoli
committed
driver: sleep on shutdown in order to avoid to empty the bulk queue
1 parent 3322d90 commit 5accb4f

File tree

1 file changed

+36
-20
lines changed

1 file changed

+36
-20
lines changed

pkg/docker/driver.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,10 @@ var l = log.New(os.Stderr, "", 0)
3232

3333
// Driver ...
3434
type Driver struct {
35-
mu *sync.Mutex
36-
logs map[string]*container
37-
logger logger.Logger
38-
39-
esClient elasticsearch.Client
40-
41-
groker *grok.Grok
42-
35+
mu *sync.Mutex
4336
container *container
44-
45-
pipeline pipeline
37+
pipeline pipeline
38+
esClient elasticsearch.Client
4639
}
4740

4841
type pipeline struct {
@@ -126,8 +119,7 @@ func (l LogMessage) timeOmityEmpty() *time.Time {
126119
// NewDriver ...
127120
func NewDriver() Driver {
128121
return Driver{
129-
logs: make(map[string]*container),
130-
mu: new(sync.Mutex),
122+
mu: new(sync.Mutex),
131123
}
132124
}
133125

@@ -160,6 +152,7 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
160152

161153
d.pipeline.group, d.pipeline.ctx = errgroup.WithContext(ctx)
162154
d.pipeline.inputCh = make(chan logdriver.LogEntry)
155+
d.pipeline.outputCh = make(chan LogMessage)
163156
// d.pipeline.stopCh = make(chan struct{})
164157

165158
d.pipeline.group.Go(func() error {
@@ -198,13 +191,11 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
198191
// custom log message fields
199192
msg := getLostashFields(cfg.fields, c.info)
200193

201-
d.groker, err = grok.NewGrok(cfg.grokMatch, cfg.grokPattern, cfg.grokPatternFrom, cfg.grokPatternSplitter, cfg.grokNamedCapture)
194+
groker, err := grok.NewGrok(cfg.grokMatch, cfg.grokPattern, cfg.grokPatternFrom, cfg.grokPatternSplitter, cfg.grokNamedCapture)
202195
if err != nil {
203196
return err
204197
}
205198

206-
d.pipeline.outputCh = make(chan LogMessage)
207-
208199
for m := range d.pipeline.inputCh {
209200

210201
logMessage = string(m.Line)
@@ -221,16 +212,20 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
221212

222213
// TODO: create a PR to grok upstream for parsing bytes
223214
// so that we avoid having to convert the message to string
224-
msg.GrokLine, msg.Line, err = d.groker.ParseLine(cfg.grokMatch, logMessage, m.Line)
215+
msg.GrokLine, msg.Line, err = groker.ParseLine(cfg.grokMatch, logMessage, m.Line)
225216
if err != nil {
226217
l.Printf("error: [%v] parsing log message: %v\n", c.info.ID(), err)
227218
}
228219

220+
// l.Printf("INFO: grokline: %v\n", msg.GrokLine)
221+
// l.Printf("INFO: line: %v\n", string(msg.Line))
222+
229223
select {
230224
case d.pipeline.outputCh <- msg:
231225
case <-d.pipeline.ctx.Done():
232226
return d.pipeline.ctx.Err()
233227
}
228+
234229
}
235230

236231
return nil
@@ -240,17 +235,34 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
240235

241236
err := d.esClient.NewBulkProcessorService(d.pipeline.ctx, cfg.Bulk.workers, cfg.Bulk.actions, cfg.Bulk.size, cfg.Bulk.flushInterval, cfg.Bulk.stats)
242237
if err != nil {
243-
l.Printf("error creating bulk processor: %v", err)
238+
l.Printf("error creating bulk processor: %v\n", err)
244239
}
245240

246-
for {
241+
// l.Printf("receving from output\n")
242+
243+
for doc := range d.pipeline.outputCh {
244+
245+
// l.Printf("sending doc: %#v\n", doc.GrokLine)
246+
d.esClient.Add(cfg.index, cfg.tzpe, doc)
247+
247248
select {
248-
case doc := <-d.pipeline.outputCh:
249-
d.esClient.Add(cfg.index, cfg.tzpe, doc)
250249
case <-d.pipeline.ctx.Done():
251250
return d.pipeline.ctx.Err()
251+
default:
252252
}
253253
}
254+
// for {
255+
// select {
256+
// case doc := <-d.pipeline.outputCh:
257+
// l.Printf("sending doc: %#v\n", doc.GrokLine)
258+
// d.esClient.Add(cfg.index, cfg.tzpe, doc)
259+
260+
// case <-d.pipeline.ctx.Done():
261+
// return d.pipeline.ctx.Err()
262+
// }
263+
// }
264+
265+
return nil
254266
})
255267

256268
// TODO: create metrics from stats
@@ -284,6 +296,10 @@ func (d *Driver) StopLogging(file string) error {
284296

285297
// log.Infof("info: stopping log: %s\n", file)
286298

299+
// TODO: count how many docs are in the queue before shutting down
300+
// alternative: sleep flush interval time
301+
time.Sleep(10 * time.Second)
302+
287303
if d.container != nil {
288304
// l.Printf("INFO container: %v", d.container)
289305
if err := d.container.stream.Close(); err != nil {

0 commit comments

Comments
 (0)