 package docker
 
 import (
+	"bytes"
 	"context"
-	"fmt"
+	"encoding/binary"
 	"io"
-	"path"
-	"syscall"
+	"os"
+	"strings"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"
+	protoio "github.com/gogo/protobuf/io"
 	"github.com/rchicoli/docker-log-elasticsearch/pkg/elasticsearch"
+	"github.com/rchicoli/docker-log-elasticsearch/pkg/extension/grok"
 	"github.com/robfig/cron"
-	"github.com/tonistiigi/fifo"
 	"golang.org/x/sync/errgroup"
 )
 
 type container struct {
 	cron      *cron.Cron
 	esClient  elasticsearch.Client
 	indexName string
-	info      logger.Info
 	logger    *log.Entry
 	pipeline  pipeline
 	stream    io.ReadCloser
@@ -32,44 +34,179 @@ type pipeline struct {
 	outputCh chan LogMessage
 }
 
-// newContainer stores the container's configuration in memory
-// and returns a pointer to the container
-func (d *Driver) newContainer(ctx context.Context, file string) (*container, error) {
+// Read reads protobuf-encoded log messages from the container's stream
+// and forwards them to the pipeline's input channel
+func (c *container) Read(ctx context.Context) error {
 
-	filename := path.Base(file)
-	log.WithField("fifo", file).Debug("created fifo file")
+	c.logger.Debug("starting pipeline: Read")
 
-	d.mu.Lock()
-	if _, exists := d.logs[filename]; exists {
-		return nil, fmt.Errorf("error: a logger for this container already exists: %s", filename)
-	}
-	d.mu.Unlock()
+	c.pipeline.group.Go(func() error {
 
-	f, err := fifo.OpenFifo(ctx, file, syscall.O_RDONLY, 0700)
-	if err != nil {
-		return nil, fmt.Errorf("could not open fifo: %q", err)
-	}
+		dec := protoio.NewUint32DelimitedReader(c.stream, binary.BigEndian, 1e6)
+		defer func() {
+			c.logger.Info("closing docker stream")
+			dec.Close()
+			close(c.pipeline.inputCh)
+		}()
 
-	d.mu.Lock()
-	c := &container{stream: f}
-	d.logs[filename] = c
-	d.mu.Unlock()
+		var buf logdriver.LogEntry
+		var err error
 
-	return c, nil
+		for {
+			if err = dec.ReadMsg(&buf); err != nil {
+				if err == io.EOF {
+					c.logger.Debug("shutting down reader: eof")
+					break
+				}
+				// the fifo has been closed by the writer, e.g.
+				// "read /proc/self/fd/6: file already closed";
+				// stop looping so the deferred close shuts down
+				// the input channel
+				if strings.Contains(err.Error(), os.ErrClosed.Error()) {
+					c.logger.WithError(err).Debug("shutting down fifo: closed by the writer")
+					break
+				}
+				// any other read error stops the reader as well;
+				// do not return it, otherwise group.Go would tear
+				// down the whole pipeline
+				c.logger.WithError(err).Debug("shutting down fifo")
+				break
+			}
+
+			// skip messages that are empty or whitespace only,
+			// e.g. when a container emits lots of blank lines
+			if len(bytes.TrimSpace(buf.Line)) == 0 {
+				c.logger.WithField("line", string(buf.Line)).Debug("trim space")
+				continue
+			}
+
+			select {
+			case c.pipeline.inputCh <- buf:
+			case <-ctx.Done():
+				c.logger.WithError(ctx.Err()).Error("closing read pipeline")
+				return ctx.Err()
+			}
+			buf.Reset()
+		}
+
+		return nil
+	})
+
+	return nil
 }
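
For context on the wire format Read consumes: dockerd frames every log message on the plugin fifo as a uint32 big-endian length prefix followed by a protobuf-encoded logdriver.LogEntry, which is exactly what NewUint32DelimitedReader decodes. A minimal round-trip sketch of that framing, written as a hypothetical test (not part of this commit) with a bytes.Buffer standing in for the fifo:

package docker

import (
	"bytes"
	"encoding/binary"
	"testing"
	"time"

	"github.com/docker/docker/api/types/plugins/logdriver"
	protoio "github.com/gogo/protobuf/io"
)

// TestReadFrame is an illustrative sketch: it encodes one LogEntry the way
// dockerd writes to the fifo and decodes it again, mirroring what Read
// does with c.stream.
func TestReadFrame(t *testing.T) {
	var fifo bytes.Buffer

	// writer side: uint32 big-endian length prefix + protobuf payload
	enc := protoio.NewUint32DelimitedWriter(&fifo, binary.BigEndian)
	if err := enc.WriteMsg(&logdriver.LogEntry{
		Source:   "stdout",
		TimeNano: time.Now().UnixNano(),
		Line:     []byte("hello world"),
	}); err != nil {
		t.Fatal(err)
	}

	// reader side: same framing, 1 MB max message size as in Read
	dec := protoio.NewUint32DelimitedReader(&fifo, binary.BigEndian, 1e6)
	var entry logdriver.LogEntry
	if err := dec.ReadMsg(&entry); err != nil {
		t.Fatal(err)
	}
	if string(entry.Line) != "hello world" {
		t.Fatalf("unexpected line: %q", entry.Line)
	}
}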
 
-// getContainer retrieves the container's configuration from memory
-func (d *Driver) getContainer(file string) (*container, error) {
+// Parse enriches log messages with custom fields and, if configured,
+// parses the log line with grok
+func (c *container) Parse(ctx context.Context, info logger.Info, fields, grokMatch, grokPattern, grokPatternFrom, grokPatternSplitter string, grokNamedCapture bool) error {
+
+	c.logger.Debug("starting pipeline: Parse")
+
+	c.pipeline.group.Go(func() error {
+		defer close(c.pipeline.outputCh)
 
-	filename := path.Base(file)
+		groker, err := grok.NewGrok(grokMatch, grokPattern, grokPatternFrom, grokPatternSplitter, grokNamedCapture)
+		if err != nil {
+			return err
+		}
 
-	d.mu.Lock()
-	defer d.mu.Unlock()
+		var logMessage string
+		// custom log message fields
+		msg := getLogMessageFields(fields, info)
 
-	c, exists := d.logs[filename]
-	if !exists {
-		return nil, fmt.Errorf("error: logger not found for socket ID: %v", file)
-	}
+		for m := range c.pipeline.inputCh {
 
-	return c, nil
+			logMessage = string(m.Line)
+
+			// build the message to be shipped
+			msg.Source = m.Source
+			msg.Partial = m.Partial
+			msg.TimeNano = m.TimeNano
+
+			// TODO: send a PR upstream so grok can parse bytes,
+			// which would avoid converting the message to a string
+			msg.GrokLine, msg.Line, err = groker.ParseLine(grokMatch, logMessage, m.Line)
+			if err != nil {
+				c.logger.WithError(err).Error("could not parse line with grok")
+			}
+
+			select {
+			case c.pipeline.outputCh <- msg:
+			case <-ctx.Done():
+				c.logger.WithError(ctx.Err()).Error("closing parse pipeline")
+				return ctx.Err()
+			}
+		}
+
+		return nil
+	})
+
+	return nil
 }
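
Taken together, Read, Parse, and Log form the per-container pipeline, connected by inputCh and outputCh. The sketch below shows how a caller (e.g. the driver's StartLogging path) might wire the stages; it is hypothetical, not part of this commit, it assumes pipeline.group is an errgroup.Group, and every option value is a placeholder for what really comes from the container's log-opts:

// startPipeline is an illustrative wiring example: each stage registers a
// goroutine on c.pipeline.group, and the channels connect Read -> Parse -> Log.
func startPipeline(ctx context.Context, c *container, info logger.Info) error {
	if err := c.Read(ctx); err != nil {
		return err
	}
	// placeholder options: no extra fields, grok disabled
	if err := c.Parse(ctx, info, "", "", "", "", "", false); err != nil {
		return err
	}
	// placeholder options: one worker, flush every 1000 actions,
	// 5 MB bulk size, 5s flush interval, stats disabled
	if err := c.Log(ctx, 1, 1000, 5<<20, 5*time.Second, false, c.indexName, "log"); err != nil {
		return err
	}
	// block until all three stage goroutines have finished
	return c.pipeline.group.Wait()
}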
+
+// Log sends messages to the Elasticsearch bulk service
+func (c *container) Log(ctx context.Context, workers, actions, size int, flushInterval time.Duration, stats bool, indexName, docType string) error {
+
+	c.logger.Debug("starting pipeline: Log")
+
+	c.pipeline.group.Go(func() error {
+
+		err := c.esClient.NewBulkProcessorService(ctx, workers, actions, size, flushInterval, stats)
+		if err != nil {
+			c.logger.WithError(err).Error("could not create bulk processor")
+			// without a bulk processor nothing can be shipped,
+			// so abort this stage
+			return err
+		}
+
+		defer func() {
+			if err := c.esClient.Flush(); err != nil {
+				c.logger.WithError(err).Error("could not flush queue")
+			}
+
+			if err := c.esClient.Close(); err != nil {
+				c.logger.WithError(err).Error("could not close client connection")
+			}
+			c.esClient.Stop()
+		}()
+
+		for doc := range c.pipeline.outputCh {
+
+			c.esClient.Add(indexName, docType, doc)
+
+			select {
+			case <-ctx.Done():
+				c.logger.WithError(ctx.Err()).Error("closing log pipeline")
+				return ctx.Err()
+			default:
+			}
+		}
+		return nil
+	})
+
+	return nil
+}
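
For reference, this is the subset of the elasticsearch.Client contract that Log exercises, reconstructed purely from the call sites above. The real interface lives in pkg/elasticsearch and its exact signatures may differ:

// bulkClient is an inferred illustration, not the package's actual interface.
type bulkClient interface {
	// NewBulkProcessorService starts the background bulk workers.
	NewBulkProcessorService(ctx context.Context, workers, actions, size int,
		flushInterval time.Duration, stats bool) error
	// Add queues one document for the given index and document type.
	Add(index, docType string, doc interface{})
	// Flush forces any buffered documents to be committed.
	Flush() error
	// Close releases the underlying client resources.
	Close() error
	// Stop shuts the bulk workers down.
	Stop()
}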
+
+// Stats shows metrics related to the bulk service
+// func (d *Driver) Stats(filename string, config Configuration) error {
+// TODO: create metrics from stats
+// 	d.pipeline.group.Go(func() error {
+// 		stats := d.esClient.Stats()
+
+// 		fields := log.Fields{
+// 			"flushed":   stats.Flushed,
+// 			"committed": stats.Committed,
+// 			"indexed":   stats.Indexed,
+// 			"created":   stats.Created,
+// 			"updated":   stats.Updated,
+// 			"succeeded": stats.Succeeded,
+// 			"failed":    stats.Failed,
+// 		}
+
+// 		for i, w := range stats.Workers {
+// 			fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
+// 			fmt.Printf("          Last response time: %v\n", w.LastDuration)
+// 			fields[fmt.Sprintf("w%d.queued", i)] = w.Queued
+// 			fields[fmt.Sprintf("w%d.lastduration", i)] = w.LastDuration
+// 		}
+// 	})
+// }
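
One way the commented-out TODO above could be completed, assuming esClient.Stats() exposes the bulk-processor counters the draft already names (Flushed, Committed, Indexed, Created, Updated, Succeeded, Failed, and per-worker Queued/LastDuration), and logging them rather than printing; this hypothetical method would also need fmt added to the imports, and the container's cron field could schedule it periodically:

// logStats is an illustrative completion of the TODO: it collects the
// bulk-processor counters and emits them as one structured log entry.
func (c *container) logStats() {
	stats := c.esClient.Stats()

	fields := log.Fields{
		"flushed":   stats.Flushed,
		"committed": stats.Committed,
		"indexed":   stats.Indexed,
		"created":   stats.Created,
		"updated":   stats.Updated,
		"succeeded": stats.Succeeded,
		"failed":    stats.Failed,
	}
	// add per-worker queue depth and last response time
	for i, w := range stats.Workers {
		fields[fmt.Sprintf("w%d.queued", i)] = w.Queued
		fields[fmt.Sprintf("w%d.lastduration", i)] = w.LastDuration
	}
	c.logger.WithFields(fields).Info("bulk processor stats")
}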