Messages Worker SDK

A Go SDK client for interacting with the messages-worker service. This SDK provides a convenient way to submit messages, manage workers, and check service health from Go applications.

Features

  • Message Operations: Submit single or bulk messages with different priorities
  • Worker Management: Monitor and scale workers dynamically
  • Health Checks: Check service health and availability
  • Callback Signature Verification: Verify HMAC-SHA256 signatures for secure callback handling
  • Error Handling: Comprehensive error handling with custom error types
  • Context Support: Full context.Context support for timeouts and cancellation
  • Type Safety: Strongly typed API with proper validation

Installation

go get github.com/ericbrisrubio/messages-worker-sdk

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    
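    // The import path ends in "messages-worker-sdk", which is not a valid
    // Go identifier, so name the package explicitly.
    sdk "github.com/ericbrisrubio/messages-worker-sdk"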
)

func main() {
    // Create a client
    client := sdk.NewClientWithDefaults()
    
    ctx := context.Background()
    
    // Submit a message
    resp, err := client.PostHighPriorityMessage(ctx, "pr-123", "https://example.com/callback", map[string]interface{}{
        "pull_request": map[string]interface{}{
            "id": 123,
            "title": "Add new feature",
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Printf("Message submitted: %s\n", resp.ID)
}

Client Configuration

Default Configuration

client := sdk.NewClientWithDefaults()
// Uses: http://localhost:8083 with 30s timeout

Custom Configuration

config := &sdk.Config{
    BaseURL: "https://messages-worker.example.com",
    Timeout: 60 * time.Second,
}
client := sdk.NewClient(config)

Message Operations

Single Message Submission

messageReq := &sdk.MessageRequest{
    ItemID:      "pr-123",
    Priority:    sdk.PriorityHigh,
    Topic:       sdk.TopicPullRequests,
    CallbackURL: "https://example.com/callback",
    ObjectBody: map[string]interface{}{
        "pull_request": map[string]interface{}{
            "id": 123,
            "title": "Add new feature",
        },
    },
}

resp, err := client.PostMessage(ctx, messageReq)

Bulk Message Submission

bulkReq := &sdk.BulkMessageRequest{
    Messages: []sdk.MessageRequest{
        {
            ItemID:      "pr-123",
            Priority:    sdk.PriorityHigh,
            Topic:       sdk.TopicPullRequests,
            CallbackURL: "https://example.com/callback1",
            ObjectBody:  map[string]interface{}{"data": "value1"},
        },
        {
            ItemID:      "pr-124",
            Priority:    sdk.PriorityMedium,
            Topic:       sdk.TopicPullRequests,
            CallbackURL: "https://example.com/callback2",
            ObjectBody:  map[string]interface{}{"data": "value2"},
        },
    },
}

resp, err := client.PostBulkMessages(ctx, bulkReq)

Convenience Methods

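// data is the message payload, for example a map[string]interface{}.
// Each call below is a standalone alternative. Within a single scope,
// declare resp and err once with := and assign with = afterwards.

// High priority message
resp, err := client.PostHighPriorityMessage(ctx, "pr-123", "https://example.com/callback", data)

// Medium priority message (default)
resp, err := client.PostMessageWithDefaults(ctx, "pr-123", "https://example.com/callback", data)

// Low priority message
resp, err := client.PostLowPriorityMessage(ctx, "pr-123", "https://example.com/callback", data)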

Worker Management

Get Worker Status

status, err := client.GetWorkerStatus(ctx)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Total workers: %d\n", status.TotalWorkers)
fmt.Printf("High priority workers: %d\n", status.HighPriority.Count)

Scale Workers

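// Each call below is a standalone alternative. Within a single scope,
// declare resp and err once with := and assign with = afterwards.

// Add workers
resp, err := client.AddWorkers(ctx, "high", 3)

// Remove workers
resp, err := client.RemoveWorkers(ctx, "medium", 2)

// Scale workers (positive to add, negative to remove)
resp, err := client.ScaleWorkers(ctx, "low", -1)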

Remove All Workers

resp, err := client.RemoveAllWorkers(ctx)
fmt.Printf("Removed %d workers\n", resp.TotalRemoved)

Get Worker Counts

// Get count for specific priority
count, err := client.GetWorkerCount(ctx, "high")

// Get total count
total, err := client.GetTotalWorkerCount(ctx)

Health Checks

Check Service Health

health, err := client.CheckHealth(ctx)
if err != nil {
    log.Printf("Service is unhealthy: %v", err)
} else {
    fmt.Printf("Service is healthy: %s\n", health.Status)
}

Simple Health Check

if client.IsHealthy(ctx) {
    fmt.Println("Service is healthy")
} else {
    fmt.Println("Service is unhealthy")
}

Ping Service

resp, err := client.Ping(ctx)

Error Handling

The SDK provides comprehensive error handling with custom error types:

resp, err := client.PostMessage(ctx, messageReq)
if err != nil {
    // errors.As (standard "errors" package) avoids a panic from a bare
    // type assertion if the APIError arrives wrapped.
    var apiErr *sdk.APIError
    if sdk.IsAPIError(err) && errors.As(err, &apiErr) {
        // Handle API errors (4xx, 5xx responses)
        fmt.Printf("API Error %d: %s\n", apiErr.StatusCode, apiErr.Message)
    } else {
        // Handle other errors (network, parsing, etc.)
        fmt.Printf("Error: %v\n", err)
    }
}

Error Types

  • APIError: Errors returned by the API (HTTP 4xx, 5xx)
  • Network errors: Connection failures, timeouts
  • Validation errors: Invalid request parameters
  • Parsing errors: JSON marshaling/unmarshaling failures

Message Priorities

The SDK supports three priority levels:

  • sdk.PriorityLow: Low priority messages (30s delay)
  • sdk.PriorityMedium: Medium priority messages (15s delay)
  • sdk.PriorityHigh: High priority messages (5s delay)

Topics

Currently supported topics:

  • sdk.TopicPullRequests: Pull request related messages

Context Support

All SDK methods support context.Context for timeouts and cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := client.PostMessage(ctx, messageReq)

Callback Signature Verification

The messages-worker can generate HMAC-SHA256 signatures for callback requests. Use the SDK to verify these signatures:

import (
    "io"
    "log"
    "net/http"
    "os"

    sdk "github.com/ericbrisrubio/messages-worker-sdk"
)

func handleCallback(w http.ResponseWriter, r *http.Request) {
    secret := os.Getenv("CALLBACK_SECRET")
    
    // Read the request body
    body, err := io.ReadAll(r.Body)
    if err != nil {
        log.Printf("Failed to read request body: %v", err)
        http.Error(w, "Failed to read request body", http.StatusBadRequest)
        return
    }
    
    // Get signature from header
    signature := r.Header.Get(sdk.GetSignatureHeader())
    
    // Verify signature
    valid, err := sdk.VerifyCallbackSignature(body, signature, secret)
    if err != nil {
        log.Printf("Signature verification error: %v", err)
        http.Error(w, "Signature verification failed", http.StatusUnauthorized)
        return
    }
    if !valid {
        log.Printf("Invalid signature")
        http.Error(w, "Invalid signature", http.StatusUnauthorized)
        return
    }
    
    // Process callback...
    log.Println("Signature verified successfully")
}

Signature Header

The messages-worker sends signatures in the X-Callback-Signature header:

X-Callback-Signature: <hmac-sha256-hex-signature>

Configuration

Set the CALLBACK_SECRET environment variable in your messages-worker configuration:

export CALLBACK_SECRET="your-secret-key-here"
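
To exercise a callback handler locally, it can help to compute a signature yourself. The following is a minimal sketch using only the standard library. It assumes the signature is the hex-encoded HMAC-SHA256 of the raw request body under the shared secret, which is what the header format above suggests but is not confirmed here. `signBody` and `verifyBody` are hypothetical helpers, and the JSON payload is made up.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// signBody returns the hex-encoded HMAC-SHA256 of body under secret.
// Assumption: the service signs the raw request body.
func signBody(body []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verifyBody reports whether signatureHex is the expected signature for body.
// hmac.Equal compares in constant time to avoid leaking timing information.
func verifyBody(body []byte, signatureHex, secret string) bool {
	got, err := hex.DecodeString(signatureHex)
	if err != nil {
		return false // not valid hex, so it cannot match
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	body := []byte(`{"item_id":"pr-123"}`) // made-up payload
	sig := signBody(body, "your-secret-key-here")

	fmt.Println(verifyBody(body, sig, "your-secret-key-here")) // true
	fmt.Println(verifyBody(body, sig, "wrong-secret"))         // false
}
```

In a real handler, prefer `sdk.VerifyCallbackSignature`, since it encodes whatever the service actually signs.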

Examples

See the examples/ directory for complete working examples:

  • examples/main.go: Comprehensive example showing all SDK features
  • examples/callback_handler.go: Complete callback handler with signature verification

API Reference

Client Methods

Message Operations

  • PostMessage(ctx, req) - Submit a single message
  • PostBulkMessages(ctx, req) - Submit multiple messages
  • PostMessageWithDefaults(ctx, itemID, callbackURL, objectBody) - Submit with defaults
  • PostHighPriorityMessage(ctx, itemID, callbackURL, objectBody) - Submit high priority
  • PostLowPriorityMessage(ctx, itemID, callbackURL, objectBody) - Submit low priority

Worker Management

  • GetWorkerStatus(ctx) - Get current worker status
  • ScaleWorkers(ctx, priority, count) - Scale workers (positive/negative count)
  • AddWorkers(ctx, priority, count) - Add workers
  • RemoveWorkers(ctx, priority, count) - Remove workers
  • RemoveAllWorkers(ctx) - Remove all workers
  • GetWorkerCount(ctx, priority) - Get worker count for priority
  • GetTotalWorkerCount(ctx) - Get total worker count

Health Checks

  • CheckHealth(ctx) - Check service health
  • IsHealthy(ctx) - Simple boolean health check
  • Ping(ctx) - Alias for CheckHealth

Types

Message Types

  • MessageRequest - Single message request
  • MessageResponse - Single message response
  • BulkMessageRequest - Bulk message request
  • BulkMessageResponse - Bulk message response

Worker Types

  • WorkerInfo - Individual worker information
  • PriorityWorkerInfo - Worker info for a priority level
  • WorkerStatusResponse - Complete worker status
  • ScaleWorkersResponse - Worker scaling response
  • RemoveAllWorkersResponse - Remove all workers response

Health Types

  • HealthResponse - Health check response

Configuration Types

  • Config - Client configuration
  • APIError - API error type

License

This SDK is part of the messages-worker project and follows the same license terms.
