A Go SDK client for interacting with the messages-worker service. This SDK provides a convenient way to submit messages, manage workers, and check service health from Go applications.
- Message Operations: Submit single or bulk messages with different priorities
- Worker Management: Monitor and scale workers dynamically
- Health Checks: Check service health and availability
- Callback Signature Verification: Verify HMAC-SHA256 signatures for secure callback handling
- Error Handling: Comprehensive error handling with custom error types
- Context Support: Full context.Context support for timeouts and cancellation
- Type Safety: Strongly typed API with proper validation
go get github.com/ericbrisrubio/messages-worker-sdkpackage main
import (
"context"
"fmt"
"log"
"github.com/ericbrisrubio/messages-worker-sdk"
)
func main() {
// Create a client
client := sdk.NewClientWithDefaults()
ctx := context.Background()
// Submit a message
resp, err := client.PostHighPriorityMessage(ctx, "pr-123", "https://example.com/callback", map[string]interface{}{
"pull_request": map[string]interface{}{
"id": 123,
"title": "Add new feature",
},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message submitted: %s\n", resp.ID)
}client := sdk.NewClientWithDefaults()
// Uses: http://localhost:8083 with 30s timeoutconfig := &sdk.Config{
BaseURL: "https://messages-worker.example.com",
Timeout: 60 * time.Second,
}
client := sdk.NewClient(config)messageReq := &sdk.MessageRequest{
ItemID: "pr-123",
Priority: sdk.PriorityHigh,
Topic: sdk.TopicPullRequests,
CallbackURL: "https://example.com/callback",
ObjectBody: map[string]interface{}{
"pull_request": map[string]interface{}{
"id": 123,
"title": "Add new feature",
},
},
}
resp, err := client.PostMessage(ctx, messageReq)bulkReq := &sdk.BulkMessageRequest{
Messages: []sdk.MessageRequest{
{
ItemID: "pr-123",
Priority: sdk.PriorityHigh,
Topic: sdk.TopicPullRequests,
CallbackURL: "https://example.com/callback1",
ObjectBody: map[string]interface{}{"data": "value1"},
},
{
ItemID: "pr-124",
Priority: sdk.PriorityMedium,
Topic: sdk.TopicPullRequests,
CallbackURL: "https://example.com/callback2",
ObjectBody: map[string]interface{}{"data": "value2"},
},
},
}
resp, err := client.PostBulkMessages(ctx, bulkReq)// High priority message
resp, err := client.PostHighPriorityMessage(ctx, "pr-123", "https://example.com/callback", data)
// Medium priority message (default)
resp, err := client.PostMessageWithDefaults(ctx, "pr-123", "https://example.com/callback", data)
// Low priority message
resp, err := client.PostLowPriorityMessage(ctx, "pr-123", "https://example.com/callback", data)status, err := client.GetWorkerStatus(ctx)
fmt.Printf("Total workers: %d\n", status.TotalWorkers)
fmt.Printf("High priority workers: %d\n", status.HighPriority.Count)// Add workers
resp, err := client.AddWorkers(ctx, "high", 3)
// Remove workers
resp, err := client.RemoveWorkers(ctx, "medium", 2)
// Scale workers (positive to add, negative to remove)
resp, err := client.ScaleWorkers(ctx, "low", -1)resp, err := client.RemoveAllWorkers(ctx)
fmt.Printf("Removed %d workers\n", resp.TotalRemoved)// Get count for specific priority
count, err := client.GetWorkerCount(ctx, "high")
// Get total count
total, err := client.GetTotalWorkerCount(ctx)health, err := client.CheckHealth(ctx)
if err != nil {
log.Printf("Service is unhealthy: %v", err)
} else {
fmt.Printf("Service is healthy: %s\n", health.Status)
}if client.IsHealthy(ctx) {
fmt.Println("Service is healthy")
} else {
fmt.Println("Service is unhealthy")
}resp, err := client.Ping(ctx)The SDK provides comprehensive error handling with custom error types:
resp, err := client.PostMessage(ctx, messageReq)
if err != nil {
if sdk.IsAPIError(err) {
// Handle API errors (4xx, 5xx responses)
apiErr := err.(*sdk.APIError)
fmt.Printf("API Error %d: %s\n", apiErr.StatusCode, apiErr.Message)
} else {
// Handle other errors (network, parsing, etc.)
fmt.Printf("Error: %v\n", err)
}
}- APIError: Errors returned by the API (HTTP 4xx, 5xx)
- Network errors: Connection failures, timeouts
- Validation errors: Invalid request parameters
- Parsing errors: JSON marshaling/unmarshaling failures
The SDK supports three priority levels:
sdk.PriorityLow: Low priority messages (30s delay)sdk.PriorityMedium: Medium priority messages (15s delay)sdk.PriorityHigh: High priority messages (5s delay)
Currently supported topics:
sdk.TopicPullRequests: Pull request related messages
All SDK methods support context.Context for timeouts and cancellation:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := client.PostMessage(ctx, messageReq)The messages-worker can generate HMAC-SHA256 signatures for callback requests. Use the SDK to verify these signatures:
import (
"io"
"github.com/ericbrisrubio/messages-worker-sdk"
)
func handleCallback(w http.ResponseWriter, r *http.Request) {
secret := os.Getenv("CALLBACK_SECRET")
// Read the request body
body, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read request body: %v", err)
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
// Get signature from header
signature := r.Header.Get(sdk.GetSignatureHeader())
// Verify signature
valid, err := sdk.VerifyCallbackSignature(body, signature, secret)
if err != nil {
log.Printf("Signature verification error: %v", err)
http.Error(w, "Signature verification failed", http.StatusUnauthorized)
return
}
if !valid {
log.Printf("Invalid signature")
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
// Process callback...
log.Println("Signature verified successfully")
}The messages-worker sends signatures in the X-Callback-Signature header:
X-Callback-Signature: <hmac-sha256-hex-signature>
Set the CALLBACK_SECRET environment variable in your messages-worker configuration:
export CALLBACK_SECRET="your-secret-key-here"See the examples/ directory for complete working examples:
examples/main.go: Comprehensive example showing all SDK featuresexamples/callback_handler.go: Complete callback handler with signature verification
PostMessage(ctx, req)- Submit a single messagePostBulkMessages(ctx, req)- Submit multiple messagesPostMessageWithDefaults(ctx, itemID, callbackURL, objectBody)- Submit with defaultsPostHighPriorityMessage(ctx, itemID, callbackURL, objectBody)- Submit high priorityPostLowPriorityMessage(ctx, itemID, callbackURL, objectBody)- Submit low priority
GetWorkerStatus(ctx)- Get current worker statusScaleWorkers(ctx, priority, count)- Scale workers (positive/negative count)AddWorkers(ctx, priority, count)- Add workersRemoveWorkers(ctx, priority, count)- Remove workersRemoveAllWorkers(ctx)- Remove all workersGetWorkerCount(ctx, priority)- Get worker count for priorityGetTotalWorkerCount(ctx)- Get total worker count
CheckHealth(ctx)- Check service healthIsHealthy(ctx)- Simple boolean health checkPing(ctx)- Alias for CheckHealth
MessageRequest- Single message requestMessageResponse- Single message responseBulkMessageRequest- Bulk message requestBulkMessageResponse- Bulk message response
WorkerInfo- Individual worker informationPriorityWorkerInfo- Worker info for a priority levelWorkerStatusResponse- Complete worker statusScaleWorkersResponse- Worker scaling responseRemoveAllWorkersResponse- Remove all workers response
HealthResponse- Health check response
Config- Client configurationAPIError- API error type
This SDK is part of the messages-worker project and follows the same license terms.