Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cmd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var reValidateName = regexp.MustCompile("(?i)^[a-z0-9-_:]+$")
// is passed, it returns the raw SQL bodies as well.
func handleGetTasksList(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)
)

tasks := co.GetTasks()
Expand All @@ -41,7 +41,7 @@ func handleGetTasksList(w http.ResponseWriter, r *http.Request) {
// handleGetJobStatus returns the status of a given jobID.
func handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)

jobID = chi.URLParam(r, "jobID")
)
Expand All @@ -59,7 +59,7 @@ func handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
// handleGetGroupStatus returns the status of a given groupID.
func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)

groupID = chi.URLParam(r, "groupID")
)
Expand All @@ -77,7 +77,7 @@ func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
// handleGetPendingJobs returns pending jobs in a given queue.
func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)
queue = chi.URLParam(r, "queue")
)

Expand All @@ -94,7 +94,7 @@ func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
// handlePostJob creates a new job against a given task name.
func handlePostJob(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)

taskName = chi.URLParam(r, "taskName")
)
Expand Down Expand Up @@ -134,7 +134,7 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) {
// handlePostJobGroup creates multiple jobs under a group.
func handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)

decoder = json.NewDecoder(r.Body)
req models.GroupReq
Expand All @@ -160,7 +160,7 @@ func handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
// it is cancelled first and then deleted.
func handleCancelJob(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)

jobID = chi.URLParam(r, "jobID")
purge, _ = strconv.ParseBool(r.URL.Query().Get("purge"))
Expand All @@ -178,7 +178,7 @@ func handleCancelJob(w http.ResponseWriter, r *http.Request) {
// If the job is running, it is cancelled first, and then deleted.
func handleCancelGroupJob(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value(coreKey).(*core.Core)

groupID = chi.URLParam(r, "groupID")
purge, _ = strconv.ParseBool(r.URL.Query().Get("purge"))
Expand Down
34 changes: 26 additions & 8 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/zerodha/dungbeetle/v2/internal/resultbackends/sqldb"
)

type contextKey string

const coreKey contextKey = "core"

var (
//go:embed config.sample.toml
efs embed.FS
Expand All @@ -30,9 +34,21 @@ var (
func initFlags(ko *koanf.Koanf) {
// Command line flags.
f := flag.NewFlagSet("config", flag.ContinueOnError)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This I feel is unnecessary. Let's leave the flags stdout untouched. The rest of the changes are fine.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, Let’s keep flags stdout as is.

const usage = `
DungBeetle is a distributed SQL job queue and worker system.
It allows you to run SQL queries as jobs in a distributed manner, with support for job queues, concurrency, and result storage.

Usage:
dungbeetle [flags]
Flags:
`

f.Usage = func() {
lo.Info("DungBeetle")
lo.Info(f.FlagUsages())
fmt.Fprint(os.Stderr, usage)
f.SetOutput(os.Stderr)
f.PrintDefaults()
fmt.Fprintln(os.Stderr)
os.Exit(0)
}

Expand All @@ -52,18 +68,20 @@ func initFlags(ko *koanf.Koanf) {
}

func initConfig(ko *koanf.Koanf) {
lo.Info("buildstring", "value", buildString)

// Generate new config file.
if ok := ko.Bool("new-config"); ok {
if err := generateConfig(); err != nil {
fmt.Println(err)
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
fmt.Println("config.toml generated. Edit and run --install.")
fmt.Fprintln(os.Stderr, "config.toml generated.")
os.Exit(0)
}

// Print build string.
lo.Info("buildstring", "value", buildString)

// Load the config file.
if err := ko.Load(file.Provider(ko.String("config")), toml.Parser()); err != nil {
slog.Error("error reading config", "error", err)
Expand Down Expand Up @@ -123,7 +141,7 @@ func initHTTP(co *core.Core) {
"content-length", r.ContentLength,
"form", r.Form,
)
ctx := context.WithValue(r.Context(), "core", co)
ctx := context.WithValue(r.Context(), coreKey, co)
next.ServeHTTP(w, r.WithContext(ctx))
})
})
Expand Down Expand Up @@ -199,10 +217,10 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) {
}

if v := ko.MustString("job_queue.broker.type"); v != "redis" {
return nil, fmt.Errorf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported.", v)
return nil, fmt.Errorf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported", v)
}
if v := ko.MustString("job_queue.state.type"); v != "redis" {
return nil, fmt.Errorf("unsupported job_queue.state.type '%s'. Only 'redis' is supported.", v)
return nil, fmt.Errorf("unsupported job_queue.state.type '%s'. Only 'redis' is supported", v)
}

lo := slog.Default()
Expand Down
2 changes: 1 addition & 1 deletion internal/dbpool/dbpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(mp map[string]Config) (map[string]*sql.DB, error) {
return out, nil
}

// connectDB creates and returns a database connection.
// NewConn creates and returns a database connection.
func NewConn(cfg Config) (*sql.DB, error) {
db, err := sql.Open(cfg.Type, cfg.DSN)
if err != nil {
Expand Down