Skip to content

Commit 1614780

Browse files
committed
Refactor handlers to share global contexts instead of passing per handler.
1 parent bcca2c8 commit 1614780

File tree

5 files changed

+110
-180
lines changed

5 files changed

+110
-180
lines changed

cmd/http.go

Lines changed: 56 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,20 @@ import (
77
"strconv"
88

99
"github.com/go-chi/chi/v5"
10-
"github.com/zerodha/dungbeetle/v2/internal/core"
1110
"github.com/zerodha/dungbeetle/v2/models"
1211
)
1312

1413
// reValidateName represents the character classes allowed in a job ID.
1514
var reValidateName = regexp.MustCompile("(?i)^[a-z0-9-_:]+$")
1615

17-
// handleGetTasksList returns the jobs list. If the optional query param ?sql=1
16+
// GetTasksList returns the jobs list. If the optional query param ?sql=1
1817
// is passed, it returns the raw SQL bodies as well.
19-
func handleGetTasksList(w http.ResponseWriter, r *http.Request) {
20-
var (
21-
co = r.Context().Value("core").(*core.Core)
22-
)
23-
24-
tasks := co.GetTasks()
18+
func (a *Handlers) GetTasksList(w http.ResponseWriter, r *http.Request) {
19+
tasks := a.co.GetTasks()
2520

2621
// Full task including SQL queries.
2722
if r.URL.Query().Get("sql") == "" {
28-
sendResponse(w, tasks)
23+
a.sendResponse(w, tasks)
2924
return
3025
}
3126

@@ -35,73 +30,58 @@ func handleGetTasksList(w http.ResponseWriter, r *http.Request) {
3530
out = append(out, name)
3631
}
3732

38-
sendResponse(w, out)
33+
a.sendResponse(w, out)
3934
}
4035

4136
// handleGetJobStatus returns the status of a given jobID.
42-
func handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
43-
var (
44-
co = r.Context().Value("core").(*core.Core)
37+
func (a *Handlers) handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
38+
var jobID = chi.URLParam(r, "jobID")
4539

46-
jobID = chi.URLParam(r, "jobID")
47-
)
48-
49-
out, err := co.GetJobStatus(jobID)
40+
out, err := a.co.GetJobStatus(jobID)
5041
if err != nil {
51-
lo.Error("could not get job status", "error", err, "job_id", jobID)
52-
sendErrorResponse(w, "job not found", http.StatusNotFound)
42+
a.log.Error("could not get job status", "error", err, "job_id", jobID)
43+
a.sendErrorResponse(w, "job not found", http.StatusNotFound)
5344
return
5445
}
5546

56-
sendResponse(w, out)
47+
a.sendResponse(w, out)
5748
}
5849

5950
// handleGetGroupStatus returns the status of a given groupID.
60-
func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
61-
var (
62-
co = r.Context().Value("core").(*core.Core)
63-
64-
groupID = chi.URLParam(r, "groupID")
65-
)
51+
func (a *Handlers) handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
52+
var groupID = chi.URLParam(r, "groupID")
6653

67-
out, err := co.GetJobGroupStatus(groupID)
54+
out, err := a.co.GetJobGroupStatus(groupID)
6855
if err != nil {
69-
lo.Error("could not get group job status", "error", err, "group_id", groupID)
70-
sendErrorResponse(w, err.Error(), http.StatusNotFound)
56+
a.log.Error("could not get group job status", "error", err, "group_id", groupID)
57+
a.sendErrorResponse(w, err.Error(), http.StatusNotFound)
7158
return
7259
}
7360

74-
sendResponse(w, out)
61+
a.sendResponse(w, out)
7562
}
7663

7764
// handleGetPendingJobs returns pending jobs in a given queue.
78-
func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
79-
var (
80-
co = r.Context().Value("core").(*core.Core)
81-
queue = chi.URLParam(r, "queue")
82-
)
65+
func (a *Handlers) handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
66+
var queue = chi.URLParam(r, "queue")
8367

84-
out, err := co.GetPendingJobs(queue)
68+
out, err := a.co.GetPendingJobs(queue)
8569
if err != nil {
86-
lo.Error("could not get pending jobs", "error", err, "queue", queue)
87-
sendErrorResponse(w, "error fetching pending tasks", http.StatusInternalServerError)
70+
a.log.Error("could not get pending jobs", "error", err, "queue", queue)
71+
a.sendErrorResponse(w, "error fetching pending tasks", http.StatusInternalServerError)
8872
return
8973
}
9074

91-
sendResponse(w, out)
75+
a.sendResponse(w, out)
9276
}
9377

9478
// handlePostJob creates a new job against a given task name.
95-
func handlePostJob(w http.ResponseWriter, r *http.Request) {
96-
var (
97-
co = r.Context().Value("core").(*core.Core)
98-
99-
taskName = chi.URLParam(r, "taskName")
100-
)
79+
func (a *Handlers) handlePostJob(w http.ResponseWriter, r *http.Request) {
80+
var taskName = chi.URLParam(r, "taskName")
10181

10282
if r.ContentLength == 0 {
103-
lo.Error("request body sent empty")
104-
sendErrorResponse(w, "request body is empty", http.StatusBadRequest)
83+
a.log.Error("request body sent empty")
84+
a.sendErrorResponse(w, "request body is empty", http.StatusBadRequest)
10585
return
10686
}
10787

@@ -110,105 +90,99 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) {
11090
req models.JobReq
11191
)
11292
if err := decoder.Decode(&req); err != nil {
113-
lo.Error("error parsing request JSON", "error", err, "task_name", taskName, "request", req)
114-
sendErrorResponse(w, "error parsing request JSON", http.StatusBadRequest)
93+
a.log.Error("error parsing request JSON", "error", err, "task_name", taskName, "request", req)
94+
a.sendErrorResponse(w, "error parsing request JSON", http.StatusBadRequest)
11595
return
11696
}
11797

11898
if !reValidateName.Match([]byte(req.JobID)) {
119-
sendErrorResponse(w, "invalid characters in the `job_id`", http.StatusBadRequest)
99+
a.sendErrorResponse(w, "invalid characters in the `job_id`", http.StatusBadRequest)
120100
return
121101
}
122102

123103
// Create the job signature.
124-
out, err := co.NewJob(req, taskName)
104+
out, err := a.co.NewJob(req, taskName)
125105
if err != nil {
126-
lo.Error("could not create new job", "error", err, "task_name", taskName, "request", req)
127-
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
106+
a.log.Error("could not create new job", "error", err, "task_name", taskName, "request", req)
107+
a.sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
128108
return
129109
}
130110

131-
sendResponse(w, out)
111+
a.sendResponse(w, out)
132112
}
133113

134114
// handlePostJobGroup creates multiple jobs under a group.
135-
func handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
115+
func (a *Handlers) handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
136116
var (
137-
co = r.Context().Value("core").(*core.Core)
138-
139117
decoder = json.NewDecoder(r.Body)
140118
req models.GroupReq
141119
)
142120

143121
if err := decoder.Decode(&req); err != nil {
144-
lo.Error("error parsing JSON body", "error", err, "request", req)
145-
sendErrorResponse(w, "error parsing JSON body", http.StatusBadRequest)
122+
a.log.Error("error parsing JSON body", "error", err, "request", req)
123+
a.sendErrorResponse(w, "error parsing JSON body", http.StatusBadRequest)
146124
return
147125
}
148126

149-
out, err := co.NewJobGroup(req)
127+
out, err := a.co.NewJobGroup(req)
150128
if err != nil {
151-
lo.Error("error creating job signature", "error", err, "request", req)
152-
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
129+
a.log.Error("error creating job signature", "error", err, "request", req)
130+
a.sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
153131
return
154132
}
155133

156-
sendResponse(w, out)
134+
a.sendResponse(w, out)
157135
}
158136

159137
// handleCancelJob deletes a job from the job queue. If the job is running,
160138
// it is cancelled first and then deleted.
161-
func handleCancelJob(w http.ResponseWriter, r *http.Request) {
139+
func (a *Handlers) handleCancelJob(w http.ResponseWriter, r *http.Request) {
162140
var (
163-
co = r.Context().Value("core").(*core.Core)
164-
165141
jobID = chi.URLParam(r, "jobID")
166142
purge, _ = strconv.ParseBool(r.URL.Query().Get("purge"))
167143
)
168144

169-
if err := co.CancelJob(jobID, purge); err != nil {
170-
lo.Error("could not cancel job", "error", err, "job_id", jobID)
171-
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
145+
if err := a.co.CancelJob(jobID, purge); err != nil {
146+
a.log.Error("could not cancel job", "error", err, "job_id", jobID)
147+
a.sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
172148
}
173149

174-
sendResponse(w, true)
150+
a.sendResponse(w, true)
175151
}
176152

177153
// handleCancelGroupJob deletes all the jobs of a group along with the group.
178154
// If the job is running, it is cancelled first, and then deleted.
179-
func handleCancelGroupJob(w http.ResponseWriter, r *http.Request) {
155+
func (a *Handlers) handleCancelGroupJob(w http.ResponseWriter, r *http.Request) {
180156
var (
181-
co = r.Context().Value("core").(*core.Core)
182-
183157
groupID = chi.URLParam(r, "groupID")
184158
purge, _ = strconv.ParseBool(r.URL.Query().Get("purge"))
185159
)
186160

187161
// Get state of group.
188-
if err := co.CancelJobGroup(groupID, purge); err != nil {
189-
lo.Error("could not cancel group job", "error", err, "group_id", groupID)
190-
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
162+
if err := a.co.CancelJobGroup(groupID, purge); err != nil {
163+
a.log.Error("could not cancel group job", "error", err, "group_id", groupID)
164+
a.sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
191165
return
192166
}
193167

194-
sendResponse(w, true)
168+
a.sendResponse(w, true)
195169
}
196170

197-
// sendErrorResponse sends a JSON envelope to the HTTP response.
198-
func sendResponse(w http.ResponseWriter, data interface{}) {
171+
// sendResponse sends a JSON envelope to the HTTP response.
172+
func (a *Handlers) sendResponse(w http.ResponseWriter, data interface{}) {
199173
w.Header().Set("Content-Type", "application/json; charset=utf-8")
200174
out, err := json.Marshal(models.HTTPResp{Status: "success", Data: data})
201175
if err != nil {
202-
lo.Error("could marshal response", "error", err)
203-
sendErrorResponse(w, "Internal Server Error", http.StatusInternalServerError)
176+
a.log.Error("could marshal response", "error", err)
177+
a.sendErrorResponse(w, "Internal Server Error", http.StatusInternalServerError)
204178
return
205179
}
206180

207181
w.Write(out)
208182
}
209183

210184
// sendErrorResponse sends a JSON error envelope to the HTTP response.
211-
func sendErrorResponse(w http.ResponseWriter, message string, code int) {
185+
func (a *Handlers) sendErrorResponse(w http.ResponseWriter, message string, code int) {
212186
w.Header().Set("Content-Type", "application/json; charset=utf-8")
213187
w.WriteHeader(code)
214188

cmd/init.go

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"context"
54
"embed"
65
"errors"
76
"fmt"
@@ -31,8 +30,8 @@ func initFlags(ko *koanf.Koanf) {
3130
// Command line flags.
3231
f := flag.NewFlagSet("config", flag.ContinueOnError)
3332
f.Usage = func() {
34-
lo.Info("DungBeetle")
35-
lo.Info(f.FlagUsages())
33+
log.Info("DungBeetle")
34+
log.Info(f.FlagUsages())
3635
os.Exit(0)
3736
}
3837

@@ -52,7 +51,7 @@ func initFlags(ko *koanf.Koanf) {
5251
}
5352

5453
func initConfig(ko *koanf.Koanf) {
55-
lo.Info("buildstring", "value", buildString)
54+
log.Info("buildstring", "value", buildString)
5655

5756
// Generate new config file.
5857
if ok := ko.Bool("new-config"); ok {
@@ -82,12 +81,12 @@ func initConfig(ko *koanf.Koanf) {
8281
case "ERROR":
8382
opts.Level = slog.LevelError
8483
default:
85-
lo.Error("incorrect log level in app")
84+
log.Error("incorrect log level in app")
8685
os.Exit(1)
8786
}
8887

8988
// Override the logger according to level
90-
lo = slog.New(slog.NewTextHandler(os.Stdout, opts))
89+
log = slog.New(slog.NewTextHandler(os.Stdout, opts))
9190
}
9291

9392
func generateConfig() error {
@@ -109,41 +108,41 @@ func generateConfig() error {
109108
}
110109

111110
// initHTTP is a blocking function that initializes and runs the HTTP server.
112-
func initHTTP(co *core.Core) {
111+
func initHTTP(h *Handlers, ko *koanf.Koanf) {
113112
r := chi.NewRouter()
114113

115-
// Middleware to attach the instance of core to every handler.
114+
// Request logging middleware.
116115
r.Use(func(next http.Handler) http.Handler {
117116
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
118-
lo.Debug("server received request",
117+
log.Debug("server received request",
119118
"method", r.Method,
120119
"header", r.Header,
121120
"uri", r.RequestURI,
122121
"remote-address", r.RemoteAddr,
123122
"content-length", r.ContentLength,
124123
"form", r.Form,
125124
)
126-
ctx := context.WithValue(r.Context(), "core", co)
127-
next.ServeHTTP(w, r.WithContext(ctx))
125+
126+
next.ServeHTTP(w, r)
128127
})
129128
})
130129

131130
// Register HTTP handlers.
132131
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
133-
sendResponse(w, fmt.Sprintf("dungbeetle %s", buildString))
132+
h.sendResponse(w, fmt.Sprintf("dungbeetle %s", buildString))
134133
})
135-
r.Get("/tasks", handleGetTasksList)
136-
r.Post("/tasks/{taskName}/jobs", handlePostJob)
137-
r.Get("/jobs/{jobID}", handleGetJobStatus)
138-
r.Delete("/jobs/{jobID}", handleCancelJob)
139-
r.Delete("/groups/{groupID}", handleCancelGroupJob)
140-
r.Get("/jobs/queue/{queue}", handleGetPendingJobs)
141-
r.Post("/groups", handlePostJobGroup)
142-
r.Get("/groups/{groupID}", handleGetGroupStatus)
143-
144-
lo.Info("starting HTTP server", "address", ko.String("server"))
134+
r.Get("/tasks", h.GetTasksList)
135+
r.Post("/tasks/{taskName}/jobs", h.handlePostJob)
136+
r.Get("/jobs/{jobID}", h.handleGetJobStatus)
137+
r.Delete("/jobs/{jobID}", h.handleCancelJob)
138+
r.Delete("/groups/{groupID}", h.handleCancelGroupJob)
139+
r.Get("/jobs/queue/{queue}", h.handleGetPendingJobs)
140+
r.Post("/groups", h.handlePostJobGroup)
141+
r.Get("/groups/{groupID}", h.handleGetGroupStatus)
142+
143+
log.Info("starting HTTP server", "address", ko.String("server"))
145144
if err := http.ListenAndServe(ko.String("server"), r); err != nil {
146-
lo.Error("shutting down http server", "error", err)
145+
log.Error("shutting down http server", "error", err)
147146
}
148147
os.Exit(0)
149148
}
@@ -152,11 +151,11 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) {
152151
// Source DBs config.
153152
var srcDBs map[string]dbpool.Config
154153
if err := ko.Unmarshal("db", &srcDBs); err != nil {
155-
lo.Error("error reading source DB config", "error", err)
154+
log.Error("error reading source DB config", "error", err)
156155
return nil, fmt.Errorf("error reading source DB config : %w", err)
157156
}
158157
if len(srcDBs) == 0 {
159-
lo.Error("found 0 source databases in config")
158+
log.Error("found 0 source databases in config")
160159
return nil, fmt.Errorf("found 0 source databases in config")
161160
}
162161

@@ -190,7 +189,7 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) {
190189
UnloggedTables: resDBs[name].Unlogged,
191190
}
192191

193-
backend, err := sqldb.NewSQLBackend(db, opt, lo)
192+
backend, err := sqldb.NewSQLBackend(db, opt, log)
194193
if err != nil {
195194
return nil, fmt.Errorf("error initializing result backend: %w", err)
196195
}

0 commit comments

Comments
 (0)