Skip to content

Commit 144fe08

Browse files
committed
feat: create dojo
0 parents  commit 144fe08

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1877
-0
lines changed

README.md

Lines changed: 474 additions & 0 deletions
Large diffs are not rendered by default.

consumer/Dockerfile

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
FROM golang:1.16-alpine AS builder
2+
3+
WORKDIR /app
4+
5+
COPY go.* ./
6+
RUN go mod download
7+
8+
COPY main.go main.go
9+
10+
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o=consumer
11+
12+
13+
FROM scratch AS runner
14+
15+
COPY --from=builder /app/consumer /consumer
16+
17+
ENTRYPOINT ["/consumer"]
18+
CMD []

consumer/go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module gitlab.com/padok-team/training/exercise-kubernetes-scaling-to-zero/consumer
2+
3+
go 1.16
4+
5+
require (
6+
github.com/go-redis/redis/v8 v8.10.0
7+
github.com/gorilla/handlers v1.5.1
8+
github.com/gorilla/mux v1.8.0
9+
github.com/prometheus/client_golang v1.11.0
10+
)

consumer/go.sum

Lines changed: 205 additions & 0 deletions
Large diffs are not rendered by default.

consumer/main.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"log"
8+
"net/http"
9+
"os"
10+
"os/signal"
11+
"sync"
12+
"time"
13+
14+
"github.com/go-redis/redis/v8"
15+
"github.com/gorilla/mux"
16+
"github.com/prometheus/client_golang/prometheus"
17+
"github.com/prometheus/client_golang/prometheus/promauto"
18+
"github.com/prometheus/client_golang/prometheus/promhttp"
19+
)
20+
21+
// =============================================================================
22+
// CLI flags
23+
// =============================================================================
24+
25+
var (
26+
cpuBurn = flag.Bool("cpu-burn", false, "Whether to use CPU while processing messages")
27+
httpAddr = flag.String("http", ":8080", "Address to listen for requests on")
28+
redisAddr = flag.String("redis-server", "redis-master:6379", "Redis server to consume messages from")
29+
redisQueue = flag.String("redis-queue", "padok", "Redis queue to consume messages from")
30+
timePerMsg = flag.Duration("per-msg", time.Second, "The amount of time the consumer spends on each message")
31+
)
32+
33+
// =============================================================================
34+
// Main logic
35+
// =============================================================================
36+
37+
func main() {
38+
if err := run(); err != nil {
39+
log.Fatal(err)
40+
}
41+
}
42+
43+
func run() error {
44+
flag.Parse()
45+
46+
// This context will get canceled upon receiving a SIGINT signal from the
47+
// operating system. We use this to shut the consumer down gracefully.
48+
ctx, shutdown := signal.NotifyContext(context.Background(), os.Interrupt)
49+
50+
var wg sync.WaitGroup
51+
52+
// Start consuming messages from Redis.
53+
client := redis.NewClient(&redis.Options{Addr: *redisAddr})
54+
c := newConsumer(client, *redisQueue, *timePerMsg, *cpuBurn)
55+
wg.Add(1)
56+
go func() {
57+
defer wg.Done()
58+
c.consumeMessages(ctx)
59+
}()
60+
61+
// Start serving HTTP requests.
62+
srv := http.Server{Addr: *httpAddr, Handler: c.router}
63+
wg.Add(1)
64+
go func() {
65+
defer wg.Done()
66+
log.Printf("Listening on %s...", *httpAddr)
67+
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
68+
log.Printf("Error: unable to listen for requests: %s", err)
69+
shutdown()
70+
}
71+
}()
72+
// Shut the HTTP server down when the context is cancelled.
73+
go func() {
74+
<-ctx.Done()
75+
shutdownCtx, _ := context.WithTimeout(context.Background(), 5*time.Second)
76+
if err := srv.Shutdown(shutdownCtx); err != nil {
77+
log.Printf("Error: failed to shut down HTTP server gracefully: %s", err.Error())
78+
}
79+
}()
80+
81+
<-ctx.Done()
82+
log.Println("Shutting down...")
83+
wg.Wait()
84+
85+
return nil
86+
}
87+
88+
// =============================================================================
89+
// Consumer
90+
// =============================================================================
91+
92+
type consumer struct {
93+
wg sync.WaitGroup
94+
95+
router *mux.Router
96+
97+
client *redis.Client
98+
queue string
99+
100+
perMsg time.Duration
101+
cpuBurn bool
102+
103+
healthy bool
104+
105+
consumed prometheus.Counter
106+
}
107+
108+
func newConsumer(client *redis.Client, queue string, perMsg time.Duration, cpuBurn bool) *consumer {
109+
c := consumer{
110+
router: mux.NewRouter(),
111+
client: client,
112+
queue: queue,
113+
perMsg: perMsg,
114+
cpuBurn: cpuBurn,
115+
healthy: true,
116+
consumed: promauto.NewCounter(prometheus.CounterOpts{
117+
Name: "consumer_messages_consumed_total",
118+
Help: "The total number of consumed messages",
119+
}),
120+
}
121+
122+
c.router.HandleFunc("/healthz", c.handleHealthcheck).Methods("GET")
123+
c.router.Handle("/metrics", promhttp.Handler())
124+
125+
return &c
126+
}
127+
128+
// =============================================================================
129+
// HTTP handlers
130+
// =============================================================================
131+
132+
func (p *consumer) handleHealthcheck(w http.ResponseWriter, r *http.Request) {
133+
switch p.healthy {
134+
case true:
135+
fmt.Fprintf(w, "The server is healthy")
136+
case false:
137+
w.WriteHeader(http.StatusInternalServerError)
138+
fmt.Fprintf(w, "The server is unhealthy: the last attempt to publish a message failed")
139+
}
140+
}
141+
142+
// =============================================================================
143+
// Message consumption
144+
// =============================================================================
145+
146+
func (c *consumer) consumeMessages(ctx context.Context) {
147+
messages := make(chan string)
148+
go func() {
149+
var backoff time.Duration
150+
151+
for {
152+
select {
153+
case <-ctx.Done():
154+
close(messages)
155+
return
156+
case <-time.After(backoff):
157+
result, err := c.client.BLPop(ctx, 0, c.queue).Result()
158+
if err != nil {
159+
backoff = min(backoff+time.Second, 5*time.Second)
160+
log.Printf("Error getting message from queue: %s", err.Error())
161+
continue
162+
}
163+
backoff = 0
164+
messages <- result[1]
165+
}
166+
}
167+
}()
168+
169+
log.Println("Starting to comsume messages...")
170+
171+
for msg := range messages {
172+
log.Printf("Received a message: %q.", msg)
173+
log.Println("Processing message...")
174+
175+
if c.cpuBurn {
176+
done := time.After(c.perMsg)
177+
for {
178+
select {
179+
case <-done:
180+
break
181+
default:
182+
}
183+
}
184+
} else {
185+
time.Sleep(c.perMsg)
186+
}
187+
188+
c.consumed.Inc()
189+
log.Println("Message processed.")
190+
}
191+
192+
log.Println("Done consuming messages.")
193+
}
194+
195+
func min(a, b time.Duration) time.Duration {
196+
if a < b {
197+
return a
198+
}
199+
return b
200+
}
263 KB
Loading
721 KB
Loading

helm/charts/consumer/Chart.yaml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
apiVersion: v2
2+
name: consumer
3+
description: A simple service to consume messages
4+
5+
# A chart can be either an 'application' or a 'library' chart.
6+
#
7+
# Application charts are a collection of templates that can be packaged into versioned archives
8+
# to be deployed.
9+
#
10+
# Library charts provide useful utilities or functions for the chart developer. They're included as
11+
# a dependency of application charts to inject those utilities and functions into the rendering
12+
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
13+
type: application
14+
15+
# This is the chart version. This version number should be incremented each time you make changes
16+
# to the chart and its templates, including the app version.
17+
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18+
version: 0.1.0
19+
20+
# This is the version number of the application being deployed. This version number should be
21+
# incremented each time you make changes to the application. Versions are not expected to
22+
# follow Semantic Versioning. They should reflect the version the application is using.
23+
appVersion: 1.16.0
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{{/* vim: set filetype=mustache: */}}
2+
{{/*
3+
Expand the name of the chart.
4+
*/}}
5+
{{- define "consumer.name" -}}
6+
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
7+
{{- end }}
8+
9+
{{/*
10+
Create a default fully qualified app name.
11+
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
12+
If release name contains chart name it will be used as a full name.
13+
*/}}
14+
{{- define "consumer.fullname" -}}
15+
{{- if .Values.fullnameOverride }}
16+
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
17+
{{- else }}
18+
{{- $name := default .Chart.Name .Values.nameOverride }}
19+
{{- if contains $name .Release.Name }}
20+
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
21+
{{- else }}
22+
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
23+
{{- end }}
24+
{{- end }}
25+
{{- end }}
26+
27+
{{/*
28+
Create chart name and version as used by the chart label.
29+
*/}}
30+
{{- define "consumer.chart" -}}
31+
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
32+
{{- end }}
33+
34+
{{/*
35+
Common labels
36+
*/}}
37+
{{- define "consumer.labels" -}}
38+
helm.sh/chart: {{ include "consumer.chart" . }}
39+
{{ include "consumer.selectorLabels" . }}
40+
{{- if .Chart.AppVersion }}
41+
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
42+
{{- end }}
43+
app.kubernetes.io/managed-by: {{ .Release.Service }}
44+
{{- end }}
45+
46+
{{/*
47+
Selector labels
48+
*/}}
49+
{{- define "consumer.selectorLabels" -}}
50+
app.kubernetes.io/name: {{ include "consumer.name" . }}
51+
app.kubernetes.io/instance: {{ .Release.Name }}
52+
{{- end }}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
apiVersion: apps/v1
2+
kind: Deployment
3+
metadata:
4+
name: {{ include "consumer.fullname" . }}
5+
labels:
6+
{{ include "consumer.labels" . | indent 4 }}
7+
spec:
8+
selector:
9+
matchLabels:
10+
{{ include "consumer.selectorLabels" . | indent 6 }}
11+
template:
12+
metadata:
13+
labels:
14+
{{ include "consumer.labels" . | indent 8 }}
15+
spec:
16+
containers:
17+
- name: server
18+
image: padok.fr/consumer:{{ .Values.version }}
19+
ports:
20+
- name: http
21+
containerPort: 8080
22+
protocol: TCP
23+
livenessProbe:
24+
httpGet:
25+
path: /healthz
26+
port: http
27+
readinessProbe:
28+
httpGet:
29+
path: /healthz
30+
port: http

0 commit comments

Comments
 (0)