Commit 0c10d62

Merge pull request #73 from rchicoli/development
rewrite testing framework and fix issue with cleaning up pipeline
2 parents 941e728 + 6b325d0 commit 0c10d62

35 files changed: +1348 −769 lines

.travis.yml

Lines changed: 10 additions & 6 deletions
@@ -23,8 +23,12 @@ before_install:
 script:
 # - echo
   - sudo make || exit 1
-  - sudo make integration_tests || exit 1
-  - sudo make acceptance_tests || exit 1
+  # - sudo make integration_tests || exit 1
+  # - sudo make acceptance_tests || exit 1
+  # - sudo env BATS_TESTFILE=01-integration.bats make suite_tests || exit 1
+  # - sudo env BATS_TESTFILE=02-acceptance.bats make suite_tests || exit 1
+  - sudo env SLEEP_TIME=10 ./scripts/basht.sh --test-dir tests/integration-tests
+  - sudo env SLEEP_TIME=10 ./scripts/basht.sh --test-dir tests/acceptance-tests

 stages:
 # - name: cache

@@ -74,7 +78,7 @@ after_install:

 # BUG: docker-ce version 17.09.0~ce-0~debian
 # it overloads docker-plugin by sending lots of empty lines
-addons:
-  apt:
-    packages:
-      - docker-ce=17.05.0~ce-0~ubuntu-trusty
+# addons:
+#   apt:
+#     packages:
+#       - docker-ce=17.05.0~ce-0~ubuntu-trusty

docker/docker-compose.yml

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@ services:
       driver: 'rchicoli/docker-log-elasticsearch:development'
       options:
         elasticsearch-url: 'http://172.31.0.2:9200'
+        elasticsearch-bulk-flush-interval: '1s'

   nginx:
     image: nginx:alpine
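Note: the new elasticsearch-bulk-flush-interval option is a Go duration handed to the bulk processor. The sketch below shows what a '1s' interval means at the client level, using the olivere/elastic v5 builder API that this plugin wraps; the builder chain here is illustrative, not the plugin's own code.

package main

import (
	"context"
	"time"

	elastic "gopkg.in/olivere/elastic.v5"
)

func main() {
	// connect to the same endpoint the compose file uses
	client, err := elastic.NewClient(elastic.SetURL("http://172.31.0.2:9200"))
	if err != nil {
		panic(err)
	}

	// with a 1s flush interval, buffered documents are committed at least
	// once per second, even when the actions/size thresholds are never hit
	processor, err := client.BulkProcessor().
		Workers(1).
		FlushInterval(time.Second).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	defer processor.Close()
}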

main.go

Lines changed: 26 additions & 0 deletions
@@ -1,13 +1,39 @@
 package main

 import (
+	"fmt"
+	"os"
+
+	"github.com/Sirupsen/logrus"
 	"github.com/docker/go-plugins-helpers/sdk"

 	"github.com/rchicoli/docker-log-elasticsearch/pkg/docker"
 )

+var logLevels = map[string]logrus.Level{
+	"debug": logrus.DebugLevel,
+	"info":  logrus.InfoLevel,
+	"warn":  logrus.WarnLevel,
+	"error": logrus.ErrorLevel,
+}
+
 func main() {

+	levelVal := os.Getenv("LOG_LEVEL")
+	if levelVal == "" {
+		levelVal = "info"
+	}
+	if level, exists := logLevels[levelVal]; exists {
+		logrus.SetLevel(level)
+		logrus.SetFormatter(&logrus.TextFormatter{
+			DisableTimestamp: true,
+			QuoteEmptyFields: false,
+		})
+	} else {
+		fmt.Fprintln(os.Stderr, "invalid log level: ", levelVal)
+		os.Exit(1)
+	}
+
 	h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
 	d := docker.NewDriver()
 	docker.Handlers(&h, d)
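The new block validates LOG_LEVEL against a whitelist before the plugin handler starts: an unset variable falls back to "info", and anything outside the map aborts startup. A standalone sketch of the same pattern (the helper name setLogLevel is ours, not the plugin's):

package main

import (
	"fmt"
	"os"

	"github.com/Sirupsen/logrus"
)

// setLogLevel mirrors the lookup in main.go above: empty input falls back
// to "info", anything outside the map is rejected so the plugin fails fast.
func setLogLevel(levelVal string) error {
	levels := map[string]logrus.Level{
		"debug": logrus.DebugLevel,
		"info":  logrus.InfoLevel,
		"warn":  logrus.WarnLevel,
		"error": logrus.ErrorLevel,
	}
	if levelVal == "" {
		levelVal = "info"
	}
	level, exists := levels[levelVal]
	if !exists {
		return fmt.Errorf("invalid log level: %s", levelVal)
	}
	logrus.SetLevel(level)
	return nil
}

func main() {
	if err := setLogLevel(os.Getenv("LOG_LEVEL")); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	logrus.Debug("only emitted when LOG_LEVEL=debug")
	logrus.Info("logger configured")
}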

pkg/docker/driver.go

Lines changed: 73 additions & 71 deletions
@@ -7,14 +7,14 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"log"
-	"os"
+	"path"
 	"sync"
 	"syscall"
 	"time"

 	"golang.org/x/sync/errgroup"

+	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/tonistiigi/fifo"

@@ -29,8 +29,6 @@ const (
 	name = "elasticsearchlog"
 )

-var l = log.New(os.Stderr, "", 0)
-
 // Driver ...
 type Driver struct {
 	mu *sync.Mutex

@@ -42,7 +40,7 @@ type pipeline struct {
 	ctx      context.Context
 	outputCh chan LogMessage
 	inputCh  chan logdriver.LogEntry
-	stopCh   chan struct{}
+	// stopCh chan struct{}
 }

 type container struct {

@@ -129,27 +127,32 @@ func NewDriver() *Driver {
 func (d *Driver) StartLogging(file string, info logger.Info) error {

 	// log.Printf("info: starting log: %s\n", file)
+	// full path: /run/docker/logging/4f8fdcf6793a3a72296e4aedf4f94f5bb5269b3f52eb17061bfe0fd75c66776a
+	filename := path.Base(file)

 	// container's configuration is stored in memory
 	d.mu.Lock()
-	if _, exists := d.logs[file]; exists {
+	if _, exists := d.logs[filename]; exists {
 		d.mu.Unlock()
-		return fmt.Errorf("error: [%v] a logger for this container already exists", info.ContainerID)
+		return fmt.Errorf("error: [%v] a logger for this container already exists", filename)
 	}
 	d.mu.Unlock()

+	// check if need to be stored in memory
 	ctx := context.Background()

 	f, err := fifo.OpenFifo(ctx, file, syscall.O_RDONLY, 0700)
 	if err != nil {
-		return fmt.Errorf("error: opening logger fifo: %q", file)
+		return fmt.Errorf("error: opening logger fifo: %q", info.ContainerID)
 	}

 	d.mu.Lock()
 	c := &container{stream: f, info: info}
-	d.logs[file] = c
+	d.logs[filename] = c
 	d.mu.Unlock()

+	logrus.WithField("containerID", c.info.ContainerID).WithField("socket", filename).Info("starting logging")
+
 	config := defaultLogOpt()
 	if err := config.validateLogOpt(c.info.Config); err != nil {
 		return fmt.Errorf("error: validating log options: %v", err)

@@ -163,18 +166,15 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 	c.pipeline.group, c.pipeline.ctx = errgroup.WithContext(ctx)
 	c.pipeline.inputCh = make(chan logdriver.LogEntry)
 	c.pipeline.outputCh = make(chan LogMessage)
-
-	l.Printf("INFO starting: %#v\n", c.info.ContainerID)
+	// c.pipeline.stopCh = make(chan struct{})

 	c.pipeline.group.Go(func() error {

 		dec := protoio.NewUint32DelimitedReader(c.stream, binary.BigEndian, 1e6)
 		defer func() {
-			fmt.Printf("info: [%v] closing dec.\n", c.info.ContainerID)
+			logrus.WithField("containerID", c.info.ContainerID).Info("closing docker stream")
 			dec.Close()
-			// close(c.pipeline.inputCh)
-			// close(c.pipeline.outputCh)
-			// c.pipeline.ctx.Done()
+			close(c.pipeline.inputCh)
 		}()

 		var buf logdriver.LogEntry

@@ -183,39 +183,45 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 		for {
 			if err = dec.ReadMsg(&buf); err != nil {
 				if err == io.EOF {
-					fmt.Printf("info: [%v] shutting down log logger: %v\n", c.info.ContainerID, err)
-					c.stream.Close()
+					logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("shutting down reader eof")
 					return nil
 				}
 				if err != nil {
-					l.Printf("error panicing: %v\n", err)
-					return err
+					// the connection has been closed
+					// stop looping and closing the input channel
+					// read /proc/self/fd/6: file already closed
+					break
+					// do not return, otherwise group.Go closes the pipeline
+					// return err
 				}

 				dec = protoio.NewUint32DelimitedReader(c.stream, binary.BigEndian, 1e6)
 			}

-			// l.Printf("INFO pipe1 client: %#v\n", c.esClient)
-			// l.Printf("INFO pipe1 line: %#v\n", string(buf.Line))
+			// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("pipe1")

+			// I guess this problem has been fixed with the break function above
+			// test it again
 			// BUG: (17.09.0~ce-0~debian) docker run command throws lots empty line messages
 			if len(bytes.TrimSpace(buf.Line)) == 0 {
-				l.Printf("error trimming")
-				// TODO: add log debug level
+				logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("trim")
 				continue
 			}

 			select {
 			case c.pipeline.inputCh <- buf:
 			case <-c.pipeline.ctx.Done():
-				l.Printf("ERROR pipe1: %#v\n", c.pipeline.ctx.Err())
+				logrus.WithField("containerID", c.info.ContainerID).WithError(c.pipeline.ctx.Err()).Error("context closing pipe 1")
 				return c.pipeline.ctx.Err()
 			}
 			buf.Reset()
 		}
+
+		return nil
 	})

 	c.pipeline.group.Go(func() error {
+		defer close(c.pipeline.outputCh)

 		groker, err := grok.NewGrok(config.grokMatch, config.grokPattern, config.grokPatternFrom, config.grokPatternSplitter, config.grokNamedCapture)
 		if err != nil {

@@ -235,19 +241,19 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 			msg.Partial = m.Partial
 			msg.TimeNano = m.TimeNano

-			// l.Printf("INFO pipe2: %#v\n", string(m.Line))
+			// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("pipe2")

 			// TODO: create a PR to grok upstream for parsing bytes
 			// so that we avoid having to convert the message to string
 			msg.GrokLine, msg.Line, err = groker.ParseLine(config.grokMatch, logMessage, m.Line)
 			if err != nil {
-				l.Printf("error: [%v] parsing log message: %v\n", c.info.ID(), err)
+				logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("parsing log message")
 			}

 			select {
 			case c.pipeline.outputCh <- msg:
 			case <-c.pipeline.ctx.Done():
-				l.Printf("ERROR pipe2: %#v\n", c.pipeline.ctx.Err())
+				logrus.WithField("containerID", c.info.ContainerID).WithError(c.pipeline.ctx.Err()).Error("context closing pipe 2")
 				return c.pipeline.ctx.Err()
 			}

@@ -260,35 +266,39 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {

 		err := c.esClient.NewBulkProcessorService(c.pipeline.ctx, config.Bulk.workers, config.Bulk.actions, config.Bulk.size, config.Bulk.flushInterval, config.Bulk.stats)
 		if err != nil {
-			l.Printf("error creating bulk processor: %v\n", err)
+			logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("creating bulk processor")
+			// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(buf.Line)).Debugf("pipe1")
+
 		}

-		// for {
-		// 	// l.Printf("INFO pipe3 starts")
-		// 	select {
-		// 	case doc := <-c.pipeline.outputCh:
-		// 		// l.Printf("INFO pipe3: %#v\n", string(doc.Line))
-		// 		// l.Printf("sending doc: %#v\n", doc.GrokLine)
-		// 		c.esClient.Add(config.index, config.tzpe, doc)
+		defer func() {
+			if err := c.esClient.Flush(); err != nil {
+				logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("flushing queue")
+			}
+
+			// logrus.WithField("containerID", c.info.ContainerID).WithField("client", c.esClient).Debugf("closing client")

-		// 	case <-c.pipeline.ctx.Done():
-		// 		// l.Printf("ERROR pipe3: %#v\n", c.pipeline.ctx.Err())
-		// 		return c.pipeline.ctx.Err()
-		// 	}
-		// }
+			if err := c.esClient.Close(); err != nil {
+				logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("closing client connection")
+			}
+			c.esClient.Stop()
+		}()

+		// this was helpful to test if the pipeline has been closed successfully
+		// newTicker := time.NewTicker(1 * time.Second)
 		for doc := range c.pipeline.outputCh {
-			// l.Printf("INFO pipe3: %#v\n", string(doc.Line))
-			// l.Printf("sending doc: %#v\n", doc.GrokLine)
+			// logrus.WithField("containerID", c.info.ContainerID).WithField("line", string(doc.Line)).WithField("grok", doc.GrokLine).Debugf("pipe3")
+
 			c.esClient.Add(config.index, config.tzpe, doc)
 			select {
 			case <-c.pipeline.ctx.Done():
-				l.Printf("ERROR pipe3: %#v\n", c.pipeline.ctx.Err())
+				logrus.WithField("containerID", c.info.ContainerID).WithError(c.pipeline.ctx.Err()).Error("context closing pipe 3")
 				return c.pipeline.ctx.Err()
+			// case <-newTicker.C:
+			// 	log.Printf("info: still ticking")
 			default:
 			}
 		}
-
 		return nil
 	})

@@ -318,52 +328,44 @@ func (d *Driver) StartLogging(file string, info logger.Info) error {
 }

 // StopLogging ...
-// TODO: change api interface
 func (d *Driver) StopLogging(file string) error {

-	// log.Infof("info: stopping log: %s\n", file)
+	// this is required for some environment like travis
+	// otherwise the start and stop function are executed
+	// too fast, even before messages are sent to the pipeline
+	time.Sleep(1 * time.Second)

 	d.mu.Lock()
-	c, exists := d.logs[file]
+	filename := path.Base(file)
+	// full path: /var/lib/docker/plugins/1ce514430f4da85be15e02ce6956e506246190ea790753a58f7821892b4639ef/
+	// rootfs/run/docker/logging/4f8fdcf6793a3a72296e4aedf4f94f5bb5269b3f52eb17061bfe0fd75c66776a
+	c, exists := d.logs[filename]
 	if !exists {
 		d.mu.Unlock()
-		return fmt.Errorf("error: logger not found for %v", file)
+		return fmt.Errorf("error: logger not found for %v", filename)
 	}
+	delete(d.logs, file)
+	d.mu.Unlock()
+
+	logrus.WithField("containerID", c.info.ContainerID).WithField("socket", filename).Info("stopping logging")

 	if c.stream != nil {
-		l.Printf("info: [%v] closing container stream\n", c.info.ID())
+		logrus.WithField("containerID", c.info.ContainerID).Info("closing container stream")
 		c.stream.Close()
 	}

-	// TODO: count how many docs are in the queue before shutting down
-	// alternative: sleep flush interval time
-	time.Sleep(10 * time.Second)
-
-	if c.esClient != nil {
-		// l.Printf("INFO client: %v\n", c.esClient)
-		if err := c.esClient.Close(); err != nil {
-			l.Printf("error: closing client connection: %v", err)
-		}
-		c.esClient.Stop()
-	}
-
-	delete(d.logs, file)
-
 	if c.pipeline.group != nil {
-		// l.Printf("INFO [%v] closing pipeline: %v\n", c.info.ContainerID, c.pipeline)
-
-		close(c.pipeline.inputCh)
-		close(c.pipeline.outputCh)
-		// d.pipeline.group.Stop()
+		logrus.WithField("containerID", c.info.ContainerID).Info("closing pipeline")

 		// Check whether any goroutines failed.
 		if err := c.pipeline.group.Wait(); err != nil {
-			l.Printf("error with pipeline: %v", err)
+			logrus.WithField("containerID", c.info.ContainerID).WithError(err).Error("pipeline wait group")
 		}
 	}
-	d.mu.Unlock()

-	// l.Printf("INFO done stop logging")
+	// if c.esClient != nil {
+	// 	close client connection on last pipeline
+	// }

 	return nil
 }
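The core of the cleanup fix: StopLogging no longer closes both channels itself (racing the producer goroutines), each pipeline stage now closes the channel it writes to when it returns, and the reader breaks out of its loop instead of returning an error, so the errgroup is not cancelled before downstream stages drain. A minimal, self-contained sketch of that shutdown pattern (stage names and payloads are illustrative, not the driver's own code):

package main

import (
	"context"
	"fmt"
	"strings"

	"golang.org/x/sync/errgroup"
)

func main() {
	group, ctx := errgroup.WithContext(context.Background())

	inputCh := make(chan string)
	outputCh := make(chan string)

	// stage 1: reader; closing inputCh on return signals stage 2 to drain
	group.Go(func() error {
		defer close(inputCh)
		for _, line := range []string{"foo", "bar", ""} {
			if line == "" {
				break // e.g. the fifo was closed; break, do not return an error
			}
			select {
			case inputCh <- line:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// stage 2: parser; closing outputCh on return signals stage 3
	group.Go(func() error {
		defer close(outputCh)
		for line := range inputCh {
			select {
			case outputCh <- strings.ToUpper(line):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// stage 3: sink; flush on the way out, as the driver's defer does
	group.Go(func() error {
		defer fmt.Println("flush and close client")
		for doc := range outputCh {
			fmt.Println("index:", doc)
		}
		return nil
	})

	if err := group.Wait(); err != nil {
		fmt.Println("pipeline error:", err)
	}
}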

pkg/elasticsearch/client.go

Lines changed: 2 additions & 0 deletions
@@ -26,6 +26,8 @@ type Client interface {
 	// Stop the bulk processor and do some cleanup
 	Close() error
 	// Stats()
+
+	Flush() error
 }

 // NewClient ...
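Adding Flush() widens the interface, so every backend must implement it. A compile-time check of the kind often used to enforce this (the check itself is not part of this commit, and it assumes *v1.Elasticsearch is the concrete type behind NewClient and that both package names match their directories):

package main

import (
	"github.com/rchicoli/docker-log-elasticsearch/pkg/elasticsearch"
	v1 "github.com/rchicoli/docker-log-elasticsearch/pkg/elasticsearch/v1"
)

// fails to compile as soon as a backend misses Flush (or any other method)
var _ elasticsearch.Client = (*v1.Elasticsearch)(nil)

func main() {}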

pkg/elasticsearch/v1/elasticsearch.go

Lines changed: 6 additions & 0 deletions
@@ -87,6 +87,12 @@ func (e *Elasticsearch) Close() error {
 	return e.BulkProcessor.Close()
 }

+// Flush manually asks all workers to commit their outstanding requests.
+// It returns only when all workers acknowledge completion.
+func (e *Elasticsearch) Flush() error {
+	return e.BulkProcessor.Flush()
+}
+
 // Stop stops the background processes that the client is running,
 // i.e. sniffing the cluster periodically and running health checks
 // on the nodes.
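Flush commits whatever the bulk workers have queued, Close then shuts the bulk processor down, and Stop ends the background sniffer and health checks. The driver's new defer performs them in exactly that order; extracted here as a sketch (the shutdown helper is ours, and it assumes Stop() is also part of the Client interface, as driver.go calls it on esClient):

package main

import (
	"github.com/Sirupsen/logrus"

	"github.com/rchicoli/docker-log-elasticsearch/pkg/elasticsearch"
)

// shutdown tears the client down in the order driver.go now uses:
// Flush commits queued documents, Close stops the bulk processor,
// Stop ends sniffing and health checks.
func shutdown(client elasticsearch.Client) {
	if err := client.Flush(); err != nil {
		logrus.WithError(err).Error("flushing queue")
	}
	if err := client.Close(); err != nil {
		logrus.WithError(err).Error("closing client connection")
	}
	client.Stop()
}

func main() {}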
