Commit f68801b

Author: rafael_chicoli (committed)

driver: reuse logrus
1 parent 34a4579 commit f68801b

File tree

2 files changed: +51 -32 lines changed


main.go

Lines changed: 26 additions & 0 deletions
@@ -1,13 +1,39 @@
 package main
 
 import (
+	"fmt"
+	"os"
+
+	"github.com/Sirupsen/logrus"
 	"github.com/docker/go-plugins-helpers/sdk"
 
 	"github.com/rchicoli/docker-log-elasticsearch/pkg/docker"
 )
 
+var logLevels = map[string]logrus.Level{
+	"debug": logrus.DebugLevel,
+	"info":  logrus.InfoLevel,
+	"warn":  logrus.WarnLevel,
+	"error": logrus.ErrorLevel,
+}
+
 func main() {
 
+	levelVal := os.Getenv("LOG_LEVEL")
+	if levelVal == "" {
+		levelVal = "info"
+	}
+	if level, exists := logLevels[levelVal]; exists {
+		logrus.SetLevel(level)
+		logrus.SetFormatter(&logrus.TextFormatter{
+			DisableTimestamp: true,
+			QuoteEmptyFields: false,
+		})
+	} else {
+		fmt.Fprintln(os.Stderr, "invalid log level: ", levelVal)
+		os.Exit(1)
+	}
+
 	h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
 	d := docker.NewDriver()
 	docker.Handlers(&h, d)
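For context, here is a minimal, standalone sketch (not part of this commit) of how the logrus configuration introduced in main.go behaves once a structured entry is written; the container ID value is invented for illustration and the rendered output in the comment is approximate:

package main

import (
	"github.com/Sirupsen/logrus"
)

func main() {
	// The same settings main.go applies when LOG_LEVEL resolves to a known level.
	logrus.SetLevel(logrus.DebugLevel)
	logrus.SetFormatter(&logrus.TextFormatter{
		DisableTimestamp: true,
		QuoteEmptyFields: false,
	})

	// Fields added via WithField render as key=value pairs with the text formatter,
	// roughly: level=info msg="starting logging" containerID=0123456789ab
	logrus.WithField("containerID", "0123456789ab").Info("starting logging")

	// Debug entries are emitted only because the level was set to debug above.
	logrus.WithField("line", "hello world").Debug("pipe1")
}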

pkg/docker/driver.go

Lines changed: 25 additions & 32 deletions
@@ -7,15 +7,14 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"log"
-	"os"
 	"path"
 	"sync"
 	"syscall"
 	"time"
 
 	"golang.org/x/sync/errgroup"
 
+	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/tonistiigi/fifo"
@@ -30,8 +29,6 @@ const (
 	name = "elasticsearchlog"
 )
 
-var l = log.New(os.Stderr, "", 0)
-
 // Driver ...
 type Driver struct {
 	mu *sync.Mutex
@@ -154,7 +151,7 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 	d.logs[filename] = c
 	d.mu.Unlock()
 
-	l.Printf("info: starting logger for containerID=[%v] and socket=[%v]\n", c.info.ContainerID, filename)
+	logrus.WithField("containerID", c.info.ContainerID).WithField("socket", filename).Info("starting logging")
 
 	config := defaultLogOpt()
 	if err := config.validateLogOpt(c.info.Config); err != nil {
@@ -175,7 +172,7 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 
 	dec := protoio.NewUint32DelimitedReader(c.stream, binary.BigEndian, 1e6)
 	defer func() {
-		fmt.Printf("info: [%v] closing docker reader\n", c.info.ContainerID)
+		logrus.WithField("containerID", c.info.ContainerID).Info("closing docker stream")
 		dec.Close()
 		close(c.pipeline.inputCh)
 	}()
@@ -186,12 +183,10 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 	for {
 		if err = dec.ReadMsg(&buf); err != nil {
 			if err == io.EOF {
-				fmt.Printf("info: [%v] shutting down logger: %v\n", c.info.ContainerID, err)
+				logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("shutting down reader eof")
 				return nil
 			}
 			if err != nil {
-				// TODO: log only on debug mode
-				// l.Printf("error: panicing [%v]: %v\n", c.info.ContainerID, err)
 				// the connection has been closed
 				// stop looping and closing the input channel
 				// read /proc/self/fd/6: file already closed
@@ -203,22 +198,20 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 			dec = protoio.NewUint32DelimitedReader(c.stream, binary.BigEndian, 1e6)
 		}
 
-		// l.Printf("INFO pipe1 client: %#v\n", c.esClient)
-		l.Printf("info: pipe1 line: %v\n", string(buf.Line))
+		// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("pipe1")
 
 		// I guess this problem has been fixed with the break function above
 		// test it again
 		// BUG: (17.09.0~ce-0~debian) docker run command throws lots empty line messages
 		if len(bytes.TrimSpace(buf.Line)) == 0 {
-			// TODO: add log debug level
-			// l.Printf("error trimming")
+			logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("trim")
 			continue
 		}
 
 		select {
 		case c.pipeline.inputCh <- buf:
 		case <-c.pipeline.ctx.Done():
-			l.Printf("info: context done for pipe 1: %#v\n", c.pipeline.ctx.Err())
+			logrus.WithField("containerID", c.info.ContainerID).WithError(c.pipeline.ctx.Err()).Error("context closing pipe 1")
 			return c.pipeline.ctx.Err()
 		}
 		buf.Reset()
@@ -248,19 +241,19 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 	msg.Partial = m.Partial
 	msg.TimeNano = m.TimeNano
 
-	l.Printf("info: pipe2 line: %v\n", string(m.Line))
+	// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("pipe2")
 
 	// TODO: create a PR to grok upstream for parsing bytes
 	// so that we avoid having to convert the message to string
 	msg.GrokLine, msg.Line, err = groker.ParseLine(config.grokMatch, logMessage, m.Line)
 	if err != nil {
-		l.Printf("error: [%v] parsing log message: %v\n", c.info.ID(), err)
+		logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("parsing log message")
 	}
 
 	select {
 	case c.pipeline.outputCh <- msg:
 	case <-c.pipeline.ctx.Done():
-		l.Printf("error: context done for pipe 2: %#v\n", c.pipeline.ctx.Err())
+		logrus.WithField("containerID", c.info.ContainerID).WithError(c.pipeline.ctx.Err()).Error("context closing pipe 2")
 		return c.pipeline.ctx.Err()
 	}
 
@@ -273,33 +266,36 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 
 	err := c.esClient.NewBulkProcessorService(c.pipeline.ctx, config.Bulk.workers, config.Bulk.actions, config.Bulk.size, config.Bulk.flushInterval, config.Bulk.stats)
 	if err != nil {
-		l.Printf("error creating bulk processor: %v\n", err)
+		logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("creating bulk processor")
+		// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("pipe1")
+
 	}
 
 	defer func() {
 		if err := c.esClient.Flush(); err != nil {
-			l.Printf("error: flushing queue: %v", err)
+			logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("flushing queue")
 		}
 
-		l.Printf("info: closing client: %v", c.esClient)
+		// logrus.WithField("containerID", c.info.ContainerID).WithField("client", c.esClient).Debugf("closing client")
+
 		if err := c.esClient.Close(); err != nil {
-			l.Printf("error: closing client connection: %v\n", err)
+			logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("closing client connection")
 		}
 		c.esClient.Stop()
 	}()
 
 	// this was helpful to test if the pipeline has been closed successfully
 	// newTicker := time.NewTicker(1 * time.Second)
 	for doc := range c.pipeline.outputCh {
-		l.Printf("info: pipe3 line: %v\n", string(doc.Line))
-		// l.Printf("info: pipe3: %#v\n", doc.GrokLine)
+		// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(doc.Line)).WithField("grok", doc.GrokLine).Debugf("pipe3")
+
 		c.esClient.Add(config.index, config.tzpe, doc)
 		select {
 		case <-c.pipeline.ctx.Done():
-			l.Printf("info context done for pipe 3: %#v\n", c.pipeline.ctx.Err())
+			logrus.WithField("containerID", c.info.ContainerID).WithError(c.pipeline.ctx.Err()).Error("context closing pipe 3")
 			return c.pipeline.ctx.Err()
 		// case <-newTicker.C:
-		// 	l.Printf("info: still ticking")
+		// 	log.Printf("info: still ticking")
 		default:
 		}
 	}
@@ -351,29 +347,26 @@ func (d *Driver) StopLogging(file string) error {
 	delete(d.logs, file)
 	d.mu.Unlock()
 
-	l.Printf("info: stopping logger for containerID=[%v] and socket=[%v]\n", c.info.ContainerID, filename)
+	logrus.WithField("containerID", c.info.ContainerID).WithField("socket", filename).Info("stopping logging")
 
 	if c.stream != nil {
-		l.Printf("info: [%v] closing container stream\n", c.info.ID())
+		logrus.WithField("containerID", c.info.ContainerID).Info("closing container stream")
		c.stream.Close()
 	}
 
 	if c.pipeline.group != nil {
-		l.Printf("info: [%v] closing pipeline: %v\n", c.info.ContainerID, c.pipeline)
-		// close(c.pipeline.inputCh)
+		logrus.WithField("containerID", c.info.ContainerID).Info("closing pipeline")
 
 		// Check whether any goroutines failed.
 		if err := c.pipeline.group.Wait(); err != nil {
-			l.Printf("error with pipeline [%v]: %v\n", filename, err)
+			logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("pipeline wait group")
 		}
 	}
 
 	// if c.esClient != nil {
 	// close client connection on last pipeline
 	// }
 
-	// l.Printf("info: done stopping logger for containerID=[%v] and socket=[%v]\n", c.info.ContainerID, filename)
-
 	return nil
 }
 
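Every change in driver.go follows the same substitution: the package-level *log.Logger l is removed, and each message becomes a logrus entry that carries the container ID (and, where relevant, the error) as structured fields instead of interpolating them into a format string. A minimal sketch of that pattern, assuming the same Sirupsen/logrus API and using placeholder values:

package main

import (
	"errors"

	"github.com/Sirupsen/logrus"
)

func main() {
	containerID := "0123456789ab"                                  // placeholder value
	err := errors.New("read /proc/self/fd/6: file already closed") // placeholder error

	// Old style, removed by this commit:
	//   l.Printf("error with pipeline [%v]: %v\n", filename, err)
	// New style: one entry per event, with fields attached rather than formatted in.
	logrus.WithField("containerID", containerID).
		WithError(err).
		Error("pipeline wait group")
}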
