Skip to content

Commit 2f3aa35

Browse files
committed
feat: add sse
1 parent 3271da1 commit 2f3aa35

File tree

14 files changed

+2216
-0
lines changed

14 files changed

+2216
-0
lines changed

pkg/sse/README.md

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
## SSE
2+
3+
A high-performance Go language Server-Sent Events (SSE) server and client implementation, supporting uni-cast and broadcast events, automatic reconnection, message persistence, and other features.
4+
5+
### Features
6+
7+
- 🚀 High-performance Event Hub for managing client connections
8+
- 🔌 Supports automatic reconnection and event retransmission on disconnect
9+
- 📊 Built-in push statistics and performance monitoring
10+
- 🔒 Thread-safe client management
11+
- ⏱️ Supports timeout retries and asynchronous task processing
12+
- 💾 Optional persistent storage interface
13+
- ❤️ Built-in heartbeat detection mechanism
14+
15+
<br>
16+
17+
### Example of Use
18+
19+
#### Server Example
20+
21+
```go
22+
package main
23+
24+
import (
25+
"net/http"
26+
"strconv"
27+
"time"
28+
"math/rand"
29+
"github.com/gin-gonic/gin"
30+
"github.com/go-dev-frame/sponge/pkg/sse"
31+
)
32+
33+
func main() {
34+
// Initialize SSE Hub
35+
hub := sse.NewHub()
36+
defer hub.Close()
37+
38+
// Create Gin router
39+
r := gin.Default()
40+
41+
// SSE Event Stream Interface, requires authentication to set uid
42+
r.GET("/events", func(c *gin.Context) {
43+
var uid string
44+
u, isExists := c.Get("uid")
45+
if !isExists {
46+
uid = strconv.Itoa(rand.Intn(99) + 100) // mock uid
47+
} else {
48+
uid, _ = u.(string)
49+
}
50+
hub.Serve(c, uid)
51+
})
52+
53+
// Register event push endpoint, supports pushing to specified users and broadcast pushing
54+
// Push to specified users
55+
// curl -X POST -H "Content-Type: application/json" -d '{"uids":["u001"],"events":[{"event":"message","data":"hello_world"}]}' http://localhost:8080/push
56+
// Broadcast push, not specifying users means pushing to all users
57+
// curl -X POST -H "Content-Type: application/json" -d '{"events":[{"event":"message","data":"hello_world"}]}' http://localhost:8080/push
58+
r.POST("/push", hub.PushEventHandler())
59+
60+
// simulated event push
61+
go func() {
62+
i := 0
63+
for {
64+
time.Sleep(time.Second * 5)
65+
i++
66+
e := &sse.Event{Event: sse.DefaultEventType, Data: "hello_world_" + strconv.Itoa(i)}
67+
_ = hub.Push(nil, e) // broadcast push
68+
//_ = hub.Push([]string{uid}, e) // specified user push
69+
}
70+
}()
71+
72+
// Start HTTP server
73+
if err := http.ListenAndServe(":8080", r); err != nil {
74+
panic(err)
75+
}
76+
}
77+
```
78+
79+
<br>
80+
81+
#### Client Example
82+
83+
```go
84+
package main
85+
86+
import (
87+
"fmt"
88+
"github.com/go-dev-frame/sponge/pkg/sse"
89+
)
90+
91+
func main() {
92+
url := "http://localhost:8080/events"
93+
94+
// Create SSE client
95+
client := sse.NewClient(url)
96+
97+
client.OnEvent(sse.DefaultEventType, func(event *sse.Event) {
98+
fmt.Printf("Received: %#v\n", event)
99+
})
100+
101+
err := client.Connect()
102+
if err != nil {
103+
fmt.Printf("Connection failed: %v\n", err)
104+
return
105+
}
106+
107+
fmt.Println("SSE client started, press Ctrl+C to exit")
108+
<-client.Wait()
109+
}
110+
```
111+
112+
<br>
113+
114+
### Advanced Configuration
115+
116+
#### Using Persistent Storage
117+
118+
You can implement map, redis, mysql and other storage to achieve persistent storage and query of events. Example code:
119+
120+
```go
121+
// Implement the Store interface
122+
type MyStore struct{}
123+
124+
func (s *MyStore) Save(ctx context.Context, e *sse.Event) error {
125+
// Implement event storage logic
126+
return nil
127+
}
128+
129+
func (s *MyStore) ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*sse.Event, string, error) {
130+
// Implement event query logic, paginate query, return event list, last event ID
131+
return nil, nil
132+
}
133+
134+
// Create Hub with storage
135+
hub := sse.NewHub(sse.WithStore(&MyStore{}))
136+
```
137+
138+
<br>
139+
140+
#### Configure whether events need to be resent when the client disconnects and reconnects
141+
142+
To enable this feature, it needs to be used with event persistent storage. Example code:
143+
144+
```go
145+
hub := sse.NewHub(
146+
sse.WithStore(&MyStore{}),
147+
sse.WithEnableResendEvents(),
148+
)
149+
```
150+
151+
<br>
152+
153+
#### Customizing Push Failed Event Handling
154+
155+
Code example:
156+
157+
```go
158+
fn := func(uid string, event *sse.Event) {
159+
// Custom handling logic for push failures, such as logging or saving to database
160+
log.Printf("Push failed: User %s, Event ID %s", uid, event.ID)
161+
}
162+
163+
// Create Hub with push failed handling
164+
hub := sse.NewHub(sse.WithPushFailedHandleFn(fn))
165+
```
166+
167+
<br>
168+
169+
### API Reference
170+
171+
#### Hub Methods
172+
173+
- `NewHub(opts ...HubOption) *Hub`: Creates a new event hub, supporting custom persistence, re-sending events, logging, push event buffer size, and concurrent push event goroutine options.
174+
- `Push(uids []string, events ...*Event) error`: Pushes events to specified users or all users
175+
- `OnlineClientsNum() int`: Gets the number of online clients
176+
- `Close()`: Closes the event hub
177+
- `PrintPushStats()`: Prints push statistics
178+
179+
<br>
180+
181+
#### Serve Method
182+
183+
- `Serve(c *gin.Context, hub *Hub, uid string, opts...ServeOption)`: Handles SSE client connection requests, supports setting custom request headers.
184+
185+
<br>
186+
187+
#### Client Methods
188+
189+
- `NewClient(url string) *SSEClient`: Creates a new SSE client, supporting custom request headers, reconnection interval, and log options.
190+
- `Connect() error`: Connects to the server
191+
- `Disconnect()`: Disconnects
192+
- `OnEvent(eventType string, callback EventCallback)`: Registers an event callback
193+
194+
<br>
195+
196+
### Performance Tuning
197+
198+
- `WithChanBufferSize(size int)`: Sets the broadcast channel buffer size
199+
- `WithWorkerNum(num int)`: Sets the number of asynchronous worker goroutines

pkg/sse/benchmark_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package sse
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"sync"
8+
"sync/atomic"
9+
"testing"
10+
"time"
11+
12+
"github.com/gin-gonic/gin"
13+
14+
"github.com/go-dev-frame/sponge/pkg/utils"
15+
)
16+
17+
func BenchmarkSSEPushOneClient(b *testing.B) {
18+
port, _ := utils.GetAvailablePort()
19+
eventType := DefaultEventType
20+
ctx, cancel := context.WithCancel(context.Background())
21+
22+
// start sse server
23+
hub := NewHub(WithContext(ctx, cancel))
24+
25+
gin.SetMode(gin.ReleaseMode)
26+
r := gin.New()
27+
uid := "u001"
28+
r.GET("/events", func(c *gin.Context) {
29+
hub.Serve(c, uid)
30+
})
31+
32+
go func() {
33+
r.Run(":" + strconv.Itoa(port))
34+
}()
35+
36+
time.Sleep(100 * time.Millisecond)
37+
38+
var received int64
39+
40+
// run sse client
41+
client := NewClient(fmt.Sprintf("http://localhost:%d/events", port), WithClientReconnectTimeInterval(100*time.Millisecond))
42+
var wg sync.WaitGroup
43+
client.OnEvent(eventType, func(e *Event) {
44+
atomic.AddInt64(&received, 1)
45+
wg.Done()
46+
})
47+
err := client.Connect()
48+
if err != nil {
49+
b.Fatalf("Failed to connect SSE client: %v", err)
50+
}
51+
time.Sleep(100 * time.Millisecond)
52+
53+
uids := []string{uid}
54+
var event = &Event{
55+
Event: eventType,
56+
Data: "test-push-data",
57+
}
58+
59+
wg.Add(b.N)
60+
b.ResetTimer()
61+
for i := 0; i < b.N; i++ {
62+
_ = hub.Push(uids, event) // push event to one client
63+
}
64+
b.StopTimer()
65+
66+
done := make(chan struct{})
67+
go func() {
68+
wg.Wait()
69+
close(done)
70+
}()
71+
72+
// wait for all events to be received
73+
select {
74+
case <-done:
75+
case <-time.After(10 * time.Second):
76+
b.Error("timeout: SSE client did not receive all events")
77+
}
78+
79+
// Reporting metrics (events per second)
80+
elapsed := b.Elapsed()
81+
eventsPerSec := float64(b.N) / elapsed.Seconds()
82+
b.ReportMetric(eventsPerSec, "events/sec")
83+
}
84+
85+
func BenchmarkSSEServerBroadcast(b *testing.B) {
86+
port, _ := utils.GetAvailablePort()
87+
eventType := DefaultEventType
88+
ctx, cancel := context.WithCancel(context.Background())
89+
90+
// start sse server
91+
hub := NewHub(WithContext(ctx, cancel))
92+
93+
gin.SetMode(gin.ReleaseMode)
94+
r := gin.New()
95+
var count int64 = 10000
96+
r.GET("/events", func(c *gin.Context) {
97+
atomic.AddInt64(&count, 1)
98+
uid := fmt.Sprintf("%d", atomic.LoadInt64(&count)) // mock user id
99+
hub.Serve(c, uid)
100+
})
101+
go func() {
102+
r.Run(":" + strconv.Itoa(port))
103+
}()
104+
105+
time.Sleep(100 * time.Millisecond)
106+
107+
var received int64
108+
var wg sync.WaitGroup
109+
110+
// run sse clients
111+
clientNum := 10
112+
for i := 0; i < clientNum; i++ {
113+
client := NewClient(fmt.Sprintf("http://localhost:%d/events", port), WithClientReconnectTimeInterval(100*time.Millisecond))
114+
client.OnEvent(eventType, func(e *Event) {
115+
atomic.AddInt64(&received, 1)
116+
wg.Done()
117+
})
118+
err := client.Connect()
119+
if err != nil {
120+
b.Fatalf("Failed to connect SSE client: %v", err)
121+
}
122+
}
123+
time.Sleep(200 * time.Millisecond)
124+
125+
var event = &Event{
126+
Event: eventType,
127+
Data: "test-push-data",
128+
}
129+
130+
wg.Add(b.N * clientNum)
131+
b.ResetTimer()
132+
for i := 0; i < b.N; i++ {
133+
_ = hub.Push(nil, event) // push event to all clients
134+
}
135+
b.StopTimer()
136+
137+
done := make(chan struct{})
138+
go func() {
139+
wg.Wait()
140+
close(done)
141+
}()
142+
143+
// wait for all events to be received
144+
select {
145+
case <-done:
146+
case <-time.After(10 * time.Second):
147+
b.Error("timeout: SSE client did not receive all events")
148+
}
149+
150+
total, success, failed, _ := hub.PushStats.Snapshot()
151+
152+
// Reporting metrics (events per second)
153+
elapsed := b.Elapsed()
154+
eventsPerSec := float64(b.N) / elapsed.Seconds()
155+
b.ReportMetric(eventsPerSec, "events/sec")
156+
b.ReportMetric(float64(total), "total_push")
157+
b.ReportMetric(float64(success), "success_push")
158+
b.ReportMetric(float64(failed), "failed_push")
159+
}

0 commit comments

Comments
 (0)