Skip to content

Commit 2f95742

Browse files
author
rafael_chicoli
committed
tests: readd missing bulk tests
1 parent 03c5e67 commit 2f95742

File tree

3 files changed

+194
-7
lines changed

3 files changed

+194
-7
lines changed

pkg/docker/container.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,83 @@ func (b *BulkWorker) Commit(ctx context.Context) {
290290
// b.logger.WithFields(log.Fields{"took": took}).Debug("bulk response time")
291291
}
292292

293+
// CustomBulkProcessor ... this will continue to be tested...
294+
// func (c *container) CustomBulkProcessor(ctx context.Context, workers int, indexName, tzpe string, actions, bulkSize int, flushInterval, timeout time.Duration) error {
295+
296+
// c.logger.Debug("starting pipeline: Log")
297+
298+
// for workerID := 0; workerID < workers; workerID++ {
299+
300+
// b, err := newWorker(c.esClient, c.logger, actions, workerID, flushInterval, timeout)
301+
// c.bulkService[workerID] = b
302+
// if err != nil {
303+
// return err
304+
// }
305+
306+
// b.logger.Debug("starting worker")
307+
// c.pipeline.group.Go(func() error {
308+
309+
// defer func() {
310+
// b.logger.Debug("closing worker")
311+
// // commit any left messages in the queue
312+
// b.Flush(ctx)
313+
// b.logger.Debug("stopping ticker")
314+
// b.ticker.Stop()
315+
// delete(c.bulkService, workerID)
316+
// }()
317+
318+
// // healthcheck := func() error {
319+
// // cctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
320+
// // defer cancel()
321+
// // resp, err := c.esClient.Do(cctx, "HEAD", "")
322+
// // if err != nil {
323+
// // c.logger.WithError(err).Debug("error head")
324+
// // time.Sleep(5 * time.Second)
325+
// // return err
326+
// // }
327+
// // if resp >= 200 && resp < 300 {
328+
// // return nil
329+
// // }
330+
// // c.logger.WithField("status", resp).Debug("status code")
331+
// // time.Sleep(3 * time.Second)
332+
// // return errors.New("not beetwen 200 and 300 status code")
333+
// // }
334+
// for {
335+
336+
// // for healthcheck() != nil {
337+
// // c.logger.Debug("healthcheck is checking")
338+
// // }
339+
340+
// select {
341+
// case doc, open := <-c.pipeline.outputCh:
342+
// if !open {
343+
// return nil
344+
// }
345+
// // c.logger.WithField("line", string(doc.Line)).Info("reached last pipeline")
346+
// b.Add(indexName, tzpe, doc)
347+
348+
// if b.CommitRequired(actions, bulkSize) {
349+
// b.Commit(ctx)
350+
// }
351+
352+
// case <-b.ticker.C:
353+
// // b.logger.WithField("ticker", b.ticker).Debug("ticking")
354+
// b.Flush(ctx)
355+
// case <-ctx.Done():
356+
// b.logger.WithError(ctx.Err()).Error("closing log pipeline: Log")
357+
// return ctx.Err()
358+
// // commit has to be in the same goroutine
359+
// // because of reset is called in the Do() func
360+
// // case c.pipeline.commitCh <- struct{}{}:
361+
// }
362+
363+
// }
364+
// })
365+
// }
366+
367+
// return nil
368+
// }
369+
293370
// Stats shows metrics related to the bulk service
294371
// func (d *Driver) Stats(filename string, config Configuration) error {
295372
// TODO: create metrics from stats

pkg/elasticsearch/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type Client interface {
2323
Flush() error
2424

2525
NewBulkProcessorService(ctx context.Context, workers, actions, size int, flushInterval, timeout time.Duration, stats bool, log *logrus.Entry) error
26-
// NewBulkProcessorService(ctx context.Context, workers, actions, size int, flushInterval time.Duration, stats bool, executionId int64, requests interface{}, response interface{}, err error) error
26+
2727
// Stop stops the background processes that the client is running,
2828
// i.e. sniffing the cluster periodically and running health checks
2929
// on the nodes.

tests/acceptance-tests/05-bulk.sh

Lines changed: 116 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,120 @@ function setUp(){
99
}
1010

1111
# tearDown runs after each test: dump debug output, then tear down the
# Elasticsearch deployment (stdout suppressed to keep test logs readable).
function tearDown(){
  _debug
  _make undeploy_elasticsearch 1>/dev/null
}
1515

16+
# Verify that with elasticsearch-bulk-actions=1 a single log line is
# committed immediately, long before the 30s flush interval elapses.
function test_bulk_commit_after_one_action(){

  description="[${BASHT_TEST_FILENAME##*/}] acceptance-tests (v${CLIENT_VERSION}): $BASHT_TEST_NUMBER - bulk commit after one action"
  echo "$description"

  name="${BASHT_TEST_FILENAME##*/}.${BASHT_TEST_NUMBER}"
  message="$((RANDOM)) $description"

  # Run a throwaway container that logs exactly one message through the
  # elasticsearch log driver; actions=1 forces a commit per message.
  basht_run docker run --rm -ti \
    --log-driver rchicoli/docker-log-elasticsearch:development \
    --log-opt elasticsearch-url="${ELASTICSEARCH_URL}" \
    --log-opt elasticsearch-version="${CLIENT_VERSION}" \
    --name "$name" \
    --log-opt elasticsearch-bulk-workers=2 \
    --log-opt elasticsearch-bulk-actions=1 \
    --log-opt elasticsearch-bulk-flush-interval=30s \
    alpine echo -n "$message"

  # give the driver time to ship the message
  sleep "${SLEEP_TIME}"

  # the message must already be searchable in Elasticsearch
  basht_run curl -s -G --connect-timeout 5 \
    "${ELASTICSEARCH_URL}/${ELASTICSEARCH_INDEX}/${ELASTICSEARCH_TYPE}/_search?pretty=true&size=1" \
    --data-urlencode "q=message:\"$message\""

  basht_assert "echo '${output}' | jq -r '.hits.hits[0]._source.message'" == "$message"

  docker rm -f "$name"

}
45+
46+
# Verify that with bulk actions and bulk size both disabled (-1), messages
# are only committed when the flush interval (10s) fires — not before.
function test_bulk_disable_actions_and_bulk_size(){

  description="[${BASHT_TEST_FILENAME##*/}] acceptance-tests (v${CLIENT_VERSION}): $BASHT_TEST_NUMBER - bulk disable actions and bulk size"
  echo "$description"

  name="${BASHT_TEST_FILENAME##*/}.${BASHT_TEST_NUMBER}"
  message="$((RANDOM)) $description"

  basht_run docker run --rm -ti \
    --log-driver rchicoli/docker-log-elasticsearch:development \
    --log-opt elasticsearch-url="${ELASTICSEARCH_URL}" \
    --log-opt elasticsearch-version="${CLIENT_VERSION}" \
    --name "$name" \
    --log-opt elasticsearch-bulk-workers=1 \
    --log-opt elasticsearch-bulk-actions="-1" \
    --log-opt elasticsearch-bulk-size="-1" \
    --log-opt elasticsearch-bulk-flush-interval="10s" \
    alpine echo -n "$message"

  # total numbers of hits should be zero, because the flush interval has not been reached
  basht_run curl -s -G --connect-timeout 5 \
    "${ELASTICSEARCH_URL}/${ELASTICSEARCH_INDEX}/${ELASTICSEARCH_TYPE}/_search?pretty=true&size=1" \
    --data-urlencode "q=message:\"$message\""
  basht_assert "echo '${output}' | jq -r '.hits.total'" == 0

  # wait for the flush interval time
  sleep 10s

  # after the interval the message must have been flushed and indexed
  basht_run curl -s -G --connect-timeout 5 \
    "${ELASTICSEARCH_URL}/${ELASTICSEARCH_INDEX}/${ELASTICSEARCH_TYPE}/_search?pretty=true&size=1" \
    --data-urlencode "q=message:\"$message\""

  basht_assert "echo '${output}' | jq -r '.hits.hits[0]._source.message'" == "$message"

  docker rm -f "$name"

}
83+
84+
# Send many messages (below the actions threshold) through the webapper
# container and verify every single one reaches Elasticsearch intact.
function test_bulk_multiple_messages(){

  description="[${BASHT_TEST_FILENAME##*/}] acceptance-tests (v${CLIENT_VERSION}): $BASHT_TEST_NUMBER - bulk multiple messages"
  echo "$description"

  name="${BASHT_TEST_FILENAME##*/}.${BASHT_TEST_NUMBER}"
  message="bulk-multi-message"

  # long-running webapper container; actions=100 with a 2s flush interval
  basht_run docker run -d \
    --log-driver rchicoli/docker-log-elasticsearch:development \
    --log-opt elasticsearch-url="${ELASTICSEARCH_URL}" \
    --log-opt elasticsearch-version="${CLIENT_VERSION}" \
    --name "$name" --ip="${WEBAPPER_IP}" --network="docker_development" \
    --log-opt elasticsearch-bulk-actions=100 \
    --log-opt elasticsearch-bulk-flush-interval='2s' \
    --log-opt elasticsearch-bulk-workers=2 \
    rchicoli/webapper

  # 99 messages: one short of the actions threshold, so the flush
  # interval is what drives the commits
  bulk_size=99

  for i in $(seq 1 "$bulk_size"); do
    basht_run curl -s -XPOST -H "Content-Type: application/json" --data "{\"message\":\"$message-$i\"}" "http://${WEBAPPER_IP}:${WEBAPPER_PORT}/log" >/dev/null
  done

  sleep "${SLEEP_TIME}"

  # stopping the container flushes any queued messages
  basht_run docker stop "$name"
  basht_run docker rm "$name"

  sleep "${SLEEP_TIME}"

  # total hit count must match the number of messages sent
  basht_run curl -s --connect-timeout 5 "${ELASTICSEARCH_URL}/${ELASTICSEARCH_INDEX}/${ELASTICSEARCH_TYPE}/_search?pretty=true&size=1"
  basht_assert "echo '${output}' | jq -r '.hits.total'" == "$bulk_size"

  # and every individual message must be retrievable by its content
  for i in $(seq 1 "$bulk_size"); do
    basht_run curl -G -s --connect-timeout 5 "${ELASTICSEARCH_URL}/${ELASTICSEARCH_INDEX}/${ELASTICSEARCH_TYPE}/_search?pretty=true&size=1" \
      --data-urlencode "q=message:\"${message}-$i\"" >/dev/null
    basht_assert "echo '${output}' | jq -r '.hits.hits[0]._source.message'" equals "$message-$i"
  done

}
125+
16126
# May 03 18:59:34 sunlight dockerd[7729]: time="2018-05-03T18:59:34+02:00" level=error
17127
# msg="level=info msg=\"response error message and status code\"
18128
# containerID=55eeb1ed63dbb828a7bb0ad2a371e1f1f6781b854e8811bd45a4a14ed92f762e
@@ -35,12 +145,12 @@ function test_bulk_rejections(){
35145
--log-opt elasticsearch-url="${ELASTICSEARCH_URL}" \
36146
--log-opt elasticsearch-version="${CLIENT_VERSION}" \
37147
--name "$name" --ip="${WEBAPPER_IP}" --network="docker_development" \
38-
--log-opt elasticsearch-bulk-actions=5000 \
39-
--log-opt elasticsearch-bulk-flush-interval='10s' \
40-
--log-opt elasticsearch-bulk-workers=50 \
148+
--log-opt elasticsearch-bulk-actions=500 \
149+
--log-opt elasticsearch-bulk-flush-interval='5s' \
150+
--log-opt elasticsearch-bulk-workers=100 \
41151
rchicoli/webapper
42152

43-
bulk_size=5
153+
bulk_size=20000
44154

45155
seq 1 "$bulk_size" | \
46156
xargs -n 1 -P 4 \
@@ -56,4 +166,4 @@ function test_bulk_rejections(){
56166
basht_run curl -s --connect-timeout 5 "${ELASTICSEARCH_URL}/${ELASTICSEARCH_INDEX}/${ELASTICSEARCH_TYPE}/_search?pretty=true&size=1"
57167
basht_assert "echo '${output}' | jq -r '.hits.total'" == "$bulk_size"
58168

59-
}
169+
}

0 commit comments

Comments
 (0)