Skip to content

Commit 325293b

Browse files
authored
Merge pull request #68 from rchicoli/improve-pipeline
improve pipeline
2 parents 7ba98ff + 7fbca12 commit 325293b

File tree

9 files changed

+177
-121
lines changed

9 files changed

+177
-121
lines changed

.gitattributes

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
*.go linguist-language=Go
2+
tests/* linguist-vendored
3+
docker/* linguist-vendored
4+
config/* linguist-vendored
5+
scripts/* linguist-vendored

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ matrix:
5959
- stage: release
6060
# env: MAJOR_RELEASE=true
6161
# it would be nice to map github labels instead
62-
env: RELEASE_TAG=0.3.0
62+
# env: RELEASE_TAG=0.3.0
6363
if: type IN (push)
6464
script:
6565
- ./scripts/git-release.sh

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ Before creating a docker container, a healthy instance of Elasticsearch service
127127
- *examples*: 2
128128

129129
###### elasticsearch-bulk-size ######
130-
- *bulk-size* specifies when to flush based on the size (in bytes) of the actions currently added. Defaults to 5 MB and can be set to -1 to be disabled.
130+
- *bulk-size* specifies when to flush based on the size (in bytes) of the actions currently added. Set to -1 to disable it.
131131
- *examples*: 1024, -1
132132

133133
###### elasticsearch-bulk-flush-interval ######
@@ -171,6 +171,11 @@ $ docker run --rm -ti \
171171
--log-opt elasticsearch-timeout=10 \
172172
--log-opt elasticsearch-version=5 \
173173
--log-opt elasticsearch-fields=containerID,containerName,containerImageID,containerImageName,containerCreated \
174+
--log-opt elasticsearch-bulk-workers=1 \
175+
--log-opt elasticsearch-bulk-actions=1000 \
176+
--log-opt elasticsearch-bulk-size=1024 \
177+
--log-opt elasticsearch-bulk-flush-interval=1s \
178+
--log-opt elasticsearch-bulk-stats=false \
174179
alpine echo this is a test logging message
175180
```
176181

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ func main() {
1010

1111
h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
1212
d := docker.NewDriver()
13-
docker.Handlers(&h, d)
13+
docker.Handlers(&h, &d)
1414
if err := h.ServeUnix(d.Name(), 0); err != nil {
1515
panic(err)
1616
}

pkg/docker/driver.go

Lines changed: 101 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"io"
99
"log"
1010
"os"
11-
"strings"
1211
"sync"
1312
"syscall"
1413
"time"
@@ -33,13 +32,18 @@ var l = log.New(os.Stderr, "", 0)
3332

3433
// Driver ...
3534
type Driver struct {
36-
mu *sync.Mutex
37-
logs map[string]*container
38-
logger logger.Logger
39-
40-
esClient elasticsearch.Client
35+
mu *sync.Mutex
36+
container *container
37+
pipeline pipeline
38+
esClient elasticsearch.Client
39+
}
4140

42-
groker *grok.Grok
41+
type pipeline struct {
42+
group *errgroup.Group
43+
ctx context.Context
44+
outputCh chan LogMessage
45+
inputCh chan logdriver.LogEntry
46+
stopCh chan struct{}
4347
}
4448

4549
type container struct {
@@ -115,33 +119,25 @@ func (l LogMessage) timeOmityEmpty() *time.Time {
115119
// NewDriver ...
116120
func NewDriver() Driver {
117121
return Driver{
118-
logs: make(map[string]*container),
119-
mu: new(sync.Mutex),
122+
mu: new(sync.Mutex),
120123
}
121124
}
122125

123126
// StartLogging ...
124-
func (d Driver) StartLogging(file string, info logger.Info) error {
125-
d.mu.Lock()
126-
if _, exists := d.logs[file]; exists {
127-
d.mu.Unlock()
128-
return fmt.Errorf("error: logger for %q already exists", file)
127+
func (d *Driver) StartLogging(file string, info logger.Info) error {
129128

130-
}
131-
d.mu.Unlock()
129+
// log.Printf("info: starting log: %s\n", file)
132130

133131
ctx := context.Background()
134132

135-
// log.Printf("info: starting log: %s\n", file)
136-
137133
f, err := fifo.OpenFifo(ctx, file, syscall.O_RDONLY, 0700)
138134
if err != nil {
139135
return fmt.Errorf("error: opening logger fifo: %q", file)
140136
}
141137

142138
d.mu.Lock()
143139
c := &container{stream: f, info: info}
144-
d.logs[file] = c
140+
d.container = c
145141
d.mu.Unlock()
146142

147143
cfg := defaultLogOpt()
@@ -154,17 +150,13 @@ func (d Driver) StartLogging(file string, info logger.Info) error {
154150
return fmt.Errorf("error: cannot create an elasticsearch client: %v", err)
155151
}
156152

157-
d.groker, err = grok.NewGrok(cfg.grokMatch, cfg.grokPattern, cfg.grokPatternFrom, cfg.grokPatternSplitter, cfg.grokNamedCapture)
158-
if err != nil {
159-
return err
160-
}
161-
162-
msgCh := make(chan LogMessage)
163-
logCh := make(chan logdriver.LogEntry)
153+
d.pipeline.group, d.pipeline.ctx = errgroup.WithContext(ctx)
154+
d.pipeline.inputCh = make(chan logdriver.LogEntry)
155+
d.pipeline.outputCh = make(chan LogMessage)
156+
// d.pipeline.stopCh = make(chan struct{})
164157

165-
g, ectx := errgroup.WithContext(ctx)
158+
d.pipeline.group.Go(func() error {
166159

167-
g.Go(func() error {
168160
dec := protoio.NewUint32DelimitedReader(c.stream, binary.BigEndian, 1e6)
169161
defer dec.Close()
170162

@@ -182,28 +174,34 @@ func (d Driver) StartLogging(file string, info logger.Info) error {
182174
}
183175

184176
select {
185-
case logCh <- buf:
186-
case <-ectx.Done():
187-
return ectx.Err()
177+
case d.pipeline.inputCh <- buf:
178+
// case <-d.pipeline.stopCh:
179+
// return nil
180+
case <-d.pipeline.ctx.Done():
181+
return d.pipeline.ctx.Err()
188182
}
189183
buf.Reset()
190184
}
191185
})
192186

193-
g.Go(func() error {
187+
d.pipeline.group.Go(func() error {
194188

195189
var logMessage string
196190

197191
// custom log message fields
198192
msg := getLostashFields(cfg.fields, c.info)
199193

200-
for m := range logCh {
194+
groker, err := grok.NewGrok(cfg.grokMatch, cfg.grokPattern, cfg.grokPatternFrom, cfg.grokPatternSplitter, cfg.grokNamedCapture)
195+
if err != nil {
196+
return err
197+
}
198+
199+
for m := range d.pipeline.inputCh {
201200

202201
logMessage = string(m.Line)
203202

204203
// BUG: (17.09.0~ce-0~debian) docker run command throws lots empty line messages
205-
// TODO: profile: check for resource consumption
206-
if len(strings.TrimSpace(logMessage)) == 0 {
204+
if len(m.Line) == 0 {
207205
// TODO: add log debug level
208206
continue
209207
}
@@ -212,40 +210,63 @@ func (d Driver) StartLogging(file string, info logger.Info) error {
212210
msg.Partial = m.Partial
213211
msg.TimeNano = m.TimeNano
214212

215-
msg.GrokLine, msg.Line, err = d.groker.ParseLine(cfg.grokMatch, logMessage, m.Line)
213+
// TODO: create a PR to grok upstream for parsing bytes
214+
// so that we avoid having to convert the message to string
215+
msg.GrokLine, msg.Line, err = groker.ParseLine(cfg.grokMatch, logMessage, m.Line)
216216
if err != nil {
217217
l.Printf("error: [%v] parsing log message: %v\n", c.info.ID(), err)
218218
}
219219

220+
// l.Printf("INFO: grokline: %v\n", msg.GrokLine)
221+
// l.Printf("INFO: line: %v\n", string(msg.Line))
222+
220223
select {
221-
case msgCh <- msg:
222-
case <-ectx.Done():
223-
return ectx.Err()
224+
case d.pipeline.outputCh <- msg:
225+
case <-d.pipeline.ctx.Done():
226+
return d.pipeline.ctx.Err()
224227
}
228+
225229
}
226230

227231
return nil
228232
})
229233

230-
g.Go(func() error {
234+
d.pipeline.group.Go(func() error {
231235

232-
err := d.esClient.NewBulkProcessorService(ectx, cfg.Bulk.workers, cfg.Bulk.actions, cfg.Bulk.size, cfg.Bulk.flushInterval, cfg.Bulk.stats)
236+
err := d.esClient.NewBulkProcessorService(d.pipeline.ctx, cfg.Bulk.workers, cfg.Bulk.actions, cfg.Bulk.size, cfg.Bulk.flushInterval, cfg.Bulk.stats)
233237
if err != nil {
234-
l.Printf("error creating bulk processor: %v", err)
238+
l.Printf("error creating bulk processor: %v\n", err)
235239
}
236240

237-
for {
241+
// l.Printf("receving from output\n")
242+
243+
for doc := range d.pipeline.outputCh {
244+
245+
// l.Printf("sending doc: %#v\n", doc.GrokLine)
246+
d.esClient.Add(cfg.index, cfg.tzpe, doc)
247+
238248
select {
239-
case doc := <-msgCh:
240-
d.esClient.Add(cfg.index, cfg.tzpe, doc)
241-
case <-ectx.Done():
242-
return ectx.Err()
249+
case <-d.pipeline.ctx.Done():
250+
return d.pipeline.ctx.Err()
251+
default:
243252
}
244253
}
254+
// for {
255+
// select {
256+
// case doc := <-d.pipeline.outputCh:
257+
// l.Printf("sending doc: %#v\n", doc.GrokLine)
258+
// d.esClient.Add(cfg.index, cfg.tzpe, doc)
259+
260+
// case <-d.pipeline.ctx.Done():
261+
// return d.pipeline.ctx.Err()
262+
// }
263+
// }
264+
265+
return nil
245266
})
246267

247268
// TODO: create metrics from stats
248-
// g.Go(func() error {
269+
// d.pipeline.group.Go(func() error {
249270
// stats := d.esClient.Stats()
250271

251272
// fields := log.Fields{
@@ -266,32 +287,51 @@ func (d Driver) StartLogging(file string, info logger.Info) error {
266287
// }
267288
// })
268289

269-
// Check whether any goroutines failed.
270-
// if err := g.Wait(); err != nil {
271-
// panic(err)
272-
// }
273-
274290
return nil
275291
}
276292

277293
// StopLogging ...
278-
func (d Driver) StopLogging(file string) error {
294+
// TODO: change api interface
295+
func (d *Driver) StopLogging(file string) error {
279296

280297
// log.Infof("info: stopping log: %s\n", file)
281298

282-
d.mu.Lock()
283-
c, ok := d.logs[file]
284-
if ok {
285-
c.stream.Close()
286-
delete(d.logs, file)
299+
// TODO: count how many docs are in the queue before shutting down
300+
// alternative: sleep flush interval time
301+
time.Sleep(10 * time.Second)
302+
303+
if d.container != nil {
304+
// l.Printf("INFO container: %v", d.container)
305+
if err := d.container.stream.Close(); err != nil {
306+
l.Printf("error: [%v] closing container stream: %v", d.container.info.ID(), err)
307+
}
287308
}
288-
d.mu.Unlock()
289309

290310
if d.esClient != nil {
291-
d.esClient.Close()
311+
// l.Printf("INFO client: %v", d.esClient)
312+
if err := d.esClient.Close(); err != nil {
313+
l.Printf("error: closing client connection: %v", err)
314+
}
292315
d.esClient.Stop()
293316
}
294317

318+
if d.pipeline.group != nil {
319+
// l.Printf("INFO with pipeline: %v", d.pipeline)
320+
321+
// close(d.pipeline.inputCh)
322+
// close(d.pipeline.outputCh)
323+
// d.pipeline.stopCh <- struct{}{}
324+
325+
// TODO: close channels gracefully
326+
// sadly the errgroup does not export cancel function
327+
// create a PR to golang team with Stop() function
328+
// Check whether any goroutines failed.
329+
// if err := d.pipeline.group.Wait(); err != nil {
330+
// l.Printf("error with pipeline: %v", err)
331+
// }
332+
}
333+
// l.Printf("INFO done stop logging")
334+
295335
return nil
296336
}
297337

pkg/extension/grok/grok.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func NewGrok(grokMatch, grokPattern, grokPatternFrom, grokPatternSplitter string
4747
}
4848

4949
// ParseLine ...
50-
func (g Grok) ParseLine(pattern, logMessage string, line []byte) (map[string]string, []byte, error) {
50+
func (g *Grok) ParseLine(pattern, logMessage string, line []byte) (map[string]string, []byte, error) {
5151

5252
if g.Grok == nil {
5353
return nil, line, nil

0 commit comments

Comments
 (0)