@@ -23,14 +23,21 @@ type NotificationInjector interface {
2323 InjectSMIGRATED (ctx context.Context , seqID int64 , hostPort string , slots ... string ) error
2424
2525 // InjectMOVING injects a MOVING notification (for standalone)
26- InjectMOVING (ctx context.Context , seqID int64 , slot int ) error
26+ // Format: ["MOVING", seqID, timeS, endpoint]
27+ InjectMOVING (ctx context.Context , seqID int64 , timeS int64 , endpoint string ) error
2728
2829 // InjectMIGRATING injects a MIGRATING notification (for standalone)
2930 InjectMIGRATING (ctx context.Context , seqID int64 , slot int ) error
3031
3132 // InjectMIGRATED injects a MIGRATED notification (for standalone)
3233 InjectMIGRATED (ctx context.Context , seqID int64 , slot int ) error
3334
35+ // InjectFAILING_OVER injects a FAILING_OVER notification
36+ InjectFAILING_OVER (ctx context.Context , seqID int64 ) error
37+
38+ // InjectFAILED_OVER injects a FAILED_OVER notification
39+ InjectFAILED_OVER (ctx context.Context , seqID int64 ) error
40+
3441 // Start starts the injector (if needed)
3542 Start () error
3643
@@ -475,8 +482,8 @@ func (p *ProxyNotificationInjector) InjectSMIGRATED(ctx context.Context, seqID i
475482 return p .injectNotification (notification )
476483}
477484
478- func (p * ProxyNotificationInjector ) InjectMOVING (ctx context.Context , seqID int64 , slot int ) error {
479- notification := formatMovingNotification (seqID , slot )
485+ func (p * ProxyNotificationInjector ) InjectMOVING (ctx context.Context , seqID int64 , timeS int64 , endpoint string ) error {
486+ notification := formatMovingNotification (seqID , timeS , endpoint )
480487 return p .injectNotification (notification )
481488}
482489
@@ -490,6 +497,16 @@ func (p *ProxyNotificationInjector) InjectMIGRATED(ctx context.Context, seqID in
490497 return p .injectNotification (notification )
491498}
492499
500+ func (p * ProxyNotificationInjector ) InjectFAILING_OVER (ctx context.Context , seqID int64 ) error {
501+ notification := formatFailingOverNotification (seqID )
502+ return p .injectNotification (notification )
503+ }
504+
505+ func (p * ProxyNotificationInjector ) InjectFAILED_OVER (ctx context.Context , seqID int64 ) error {
506+ notification := formatFailedOverNotification (seqID )
507+ return p .injectNotification (notification )
508+ }
509+
493510func (p * ProxyNotificationInjector ) injectNotification (notification string ) error {
494511 url := p .apiBaseURL + "/send-to-all-clients?encoding=raw"
495512 resp , err := p .httpClient .Post (url , "application/octet-stream" , strings .NewReader (notification ))
@@ -541,9 +558,14 @@ func formatSMigratedNotification(seqID int64, endpoints ...string) string {
541558 return strings .Join (parts , "" )
542559}
543560
544- func formatMovingNotification (seqID int64 , slot int ) string {
545- slotStr := fmt .Sprintf ("%d" , slot )
546- return fmt .Sprintf (">3\r \n $6\r \n MOVING\r \n :%d\r \n $%d\r \n %s\r \n " , seqID , len (slotStr ), slotStr )
561+ func formatMovingNotification (seqID int64 , timeS int64 , endpoint string ) string {
562+ // Format: ["MOVING", seqID, timeS, endpoint]
563+ if endpoint == "" {
564+ // 3 elements: MOVING, seqID, timeS
565+ return fmt .Sprintf (">3\r \n $6\r \n MOVING\r \n :%d\r \n :%d\r \n " , seqID , timeS )
566+ }
567+ // 4 elements: MOVING, seqID, timeS, endpoint
568+ return fmt .Sprintf (">4\r \n $6\r \n MOVING\r \n :%d\r \n :%d\r \n $%d\r \n %s\r \n " , seqID , timeS , len (endpoint ), endpoint )
547569}
548570
549571func formatMigratingNotification (seqID int64 , slot int ) string {
@@ -556,6 +578,16 @@ func formatMigratedNotification(seqID int64, slot int) string {
556578 return fmt .Sprintf (">3\r \n $8\r \n MIGRATED\r \n :%d\r \n $%d\r \n %s\r \n " , seqID , len (slotStr ), slotStr )
557579}
558580
581+ func formatFailingOverNotification (seqID int64 ) string {
582+ // Format: ["FAILING_OVER", seqID]
583+ return fmt .Sprintf (">2\r \n $12\r \n FAILING_OVER\r \n :%d\r \n " , seqID )
584+ }
585+
586+ func formatFailedOverNotification (seqID int64 ) string {
587+ // Format: ["FAILED_OVER", seqID]
588+ return fmt .Sprintf (">2\r \n $11\r \n FAILED_OVER\r \n :%d\r \n " , seqID )
589+ }
590+
559591
560592// FaultInjectorNotificationInjector implements NotificationInjector using the real fault injector
561593type FaultInjectorNotificationInjector struct {
@@ -646,9 +678,9 @@ func (f *FaultInjectorNotificationInjector) InjectSMIGRATED(ctx context.Context,
646678 return fmt .Errorf ("SMIGRATED cannot be directly injected with real fault injector - it's generated when migration completes" )
647679}
648680
649- func (f * FaultInjectorNotificationInjector ) InjectMOVING (ctx context.Context , seqID int64 , slot int ) error {
650- // MOVING notifications are generated during slot migration
651- return fmt .Errorf ("MOVING cannot be directly injected with real fault injector - it's generated during migration " )
681+ func (f * FaultInjectorNotificationInjector ) InjectMOVING (ctx context.Context , seqID int64 , timeS int64 , endpoint string ) error {
682+ // MOVING notifications are generated during bind action
683+ return fmt .Errorf ("MOVING cannot be directly injected with real fault injector - it's generated during bind action " )
652684}
653685
654686func (f * FaultInjectorNotificationInjector ) InjectMIGRATING (ctx context.Context , seqID int64 , slot int ) error {
@@ -667,4 +699,13 @@ func (f *FaultInjectorNotificationInjector) InjectMIGRATED(ctx context.Context,
667699 return fmt .Errorf ("MIGRATED cannot be directly injected with real fault injector - it's generated when migration completes" )
668700}
669701
702+ func (f * FaultInjectorNotificationInjector ) InjectFAILING_OVER (ctx context.Context , seqID int64 ) error {
703+ // FAILING_OVER is generated automatically when failover starts
704+ return fmt .Errorf ("FAILING_OVER cannot be directly injected with real fault injector - it's generated when failover starts" )
705+ }
706+
707+ func (f * FaultInjectorNotificationInjector ) InjectFAILED_OVER (ctx context.Context , seqID int64 ) error {
708+ // FAILED_OVER is generated automatically when failover completes
709+ return fmt .Errorf ("FAILED_OVER cannot be directly injected with real fault injector - it's generated when failover completes" )
710+ }
670711
0 commit comments