22
33mod spin;
44
5- use std:: { collections:: HashMap , sync :: Arc } ;
5+ use std:: collections:: HashMap ;
66
7- use anyhow:: Result ;
7+ use anyhow:: { Context , Result } ;
88use async_trait:: async_trait;
99use futures:: StreamExt ;
1010use redis:: { Client , ConnectionLike } ;
11- use spin_manifest:: { ComponentMap , RedisConfig , RedisTriggerConfiguration , TriggerConfig } ;
12- use spin_redis:: SpinRedisData ;
13- use spin_trigger:: { cli:: NoArgs , TriggerExecutor } ;
11+ use serde:: { de:: IgnoredAny , Deserialize , Serialize } ;
12+ use spin_trigger_new:: { cli:: NoArgs , TriggerAppEngine , TriggerExecutor } ;
1413
1514use crate :: spin:: SpinRedisExecutor ;
1615
1716wit_bindgen_wasmtime:: import!( { paths: [ "../../wit/ephemeral/spin-redis.wit" ] , async : * } ) ;
1817
19- type ExecutionContext = spin_engine :: ExecutionContext < SpinRedisData > ;
20- type RuntimeContext = spin_engine :: RuntimeContext < SpinRedisData > ;
18+ pub ( crate ) type RuntimeData = spin_redis :: SpinRedisData ;
19+ pub ( crate ) type Store = spin_core :: Store < RuntimeData > ;
2120
2221/// The Spin Redis trigger.
23- #[ derive( Clone ) ]
2422pub struct RedisTrigger {
25- /// Trigger configuration.
26- trigger_config : RedisTriggerConfiguration ,
27- /// Component trigger configurations.
28- component_triggers : ComponentMap < RedisConfig > ,
29- /// Spin execution context.
30- engine : Arc < ExecutionContext > ,
31- /// Map from channel name to tuple of component name & index.
32- subscriptions : HashMap < String , usize > ,
23+ engine : TriggerAppEngine < Self > ,
24+ // Redis address to connect to
25+ address : String ,
26+ // Mapping of subscription channels to component IDs
27+ channel_components : HashMap < String , String > ,
3328}
3429
35- pub struct RedisTriggerConfig ( String , RedisConfig ) ;
36-
37- impl TryFrom < ( String , TriggerConfig ) > for RedisTriggerConfig {
38- type Error = spin_manifest:: Error ;
39-
40- fn try_from ( ( component, config) : ( String , TriggerConfig ) ) -> Result < Self , Self :: Error > {
41- Ok ( RedisTriggerConfig ( component, config. try_into ( ) ?) )
42- }
30+ /// Redis trigger configuration.
31+ #[ derive( Clone , Debug , Default , Deserialize , Serialize ) ]
32+ #[ serde( deny_unknown_fields) ]
33+ pub struct RedisTriggerConfig {
34+ /// Component ID to invoke
35+ pub component : String ,
36+ /// Channel to subscribe to
37+ pub channel : String ,
38+ /// Trigger executor (currently unused)
39+ #[ serde( default , skip_serializing) ]
40+ pub executor : IgnoredAny ,
4341}
4442
4543#[ async_trait]
4644impl TriggerExecutor for RedisTrigger {
47- type GlobalConfig = RedisTriggerConfiguration ;
45+ const TRIGGER_TYPE : & ' static str = "redis" ;
46+ type RuntimeData = RuntimeData ;
4847 type TriggerConfig = RedisTriggerConfig ;
4948 type RunConfig = NoArgs ;
50- type RuntimeContext = SpinRedisData ;
51-
52- fn new (
53- execution_context : ExecutionContext ,
54- global_config : Self :: GlobalConfig ,
55- trigger_configs : impl IntoIterator < Item = Self :: TriggerConfig > ,
56- ) -> Result < Self > {
57- let component_triggers: ComponentMap < RedisConfig > = trigger_configs
58- . into_iter ( )
59- . map ( |config| ( config. 0 , config. 1 ) )
60- . collect ( ) ;
61- let subscriptions = execution_context
62- . config
63- . components
64- . iter ( )
65- . enumerate ( )
66- . filter_map ( |( idx, component) | {
67- component_triggers
68- . get ( & component. id )
69- . map ( |redis_config| ( redis_config. channel . clone ( ) , idx) )
70- } )
49+
50+ fn new ( engine : TriggerAppEngine < Self > ) -> Result < Self > {
51+ let address = engine
52+ . app ( )
53+ . require_metadata ( "redis_address" )
54+ . context ( "Failed to configure Redis trigger" ) ?;
55+
56+ let channel_components = engine
57+ . trigger_configs ( )
58+ . map ( |( _, config) | ( config. channel . clone ( ) , config. component . clone ( ) ) )
7159 . collect ( ) ;
7260
7361 Ok ( Self {
74- trigger_config : global_config,
75- component_triggers,
76- engine : Arc :: new ( execution_context) ,
77- subscriptions,
62+ engine,
63+ address,
64+ channel_components,
7865 } )
7966 }
8067
8168 /// Run the Redis trigger indefinitely.
8269 async fn run ( self , _config : Self :: RunConfig ) -> Result < ( ) > {
83- let address = self . trigger_config . address . as_str ( ) ;
70+ let address = & self . address ;
8471
8572 tracing:: info!( "Connecting to Redis server at {}" , address) ;
8673 let mut client = Client :: open ( address. to_string ( ) ) ?;
8774 let mut pubsub = client. get_async_connection ( ) . await ?. into_pubsub ( ) ;
8875
8976 // Subscribe to channels
90- for ( subscription, idx) in self . subscriptions . iter ( ) {
91- let name = & self . engine . config . components [ * idx] . id ;
92- tracing:: info!(
93- "Subscribed component #{} ({}) to channel: {}" ,
94- idx,
95- name,
96- subscription
97- ) ;
98- pubsub. subscribe ( subscription) . await ?;
77+ for ( channel, component) in self . channel_components . iter ( ) {
78+ tracing:: info!( "Subscribing component {component:?} to channel {channel:?}" ) ;
79+ pubsub. subscribe ( channel) . await ?;
9980 }
10081
10182 let mut stream = pubsub. on_message ( ) ;
@@ -120,35 +101,12 @@ impl RedisTrigger {
120101 let channel = msg. get_channel_name ( ) ;
121102 tracing:: info!( "Received message on channel {:?}" , channel) ;
122103
123- if let Some ( idx) = self . subscriptions . get ( channel) . copied ( ) {
124- let component = & self . engine . config . components [ idx] ;
125- let executor = self
126- . component_triggers
127- . get ( & component. id )
128- . and_then ( |t| t. executor . clone ( ) )
129- . unwrap_or_default ( ) ;
130-
131- let follow = self
132- . engine
133- . config
134- . follow_components
135- . should_follow ( & component. id ) ;
136-
137- match executor {
138- spin_manifest:: RedisExecutor :: Spin => {
139- tracing:: trace!( "Executing Spin Redis component {}" , component. id) ;
140- let executor = SpinRedisExecutor ;
141- executor
142- . execute (
143- & self . engine ,
144- & component. id ,
145- channel,
146- msg. get_payload_bytes ( ) ,
147- follow,
148- )
149- . await ?
150- }
151- } ;
104+ if let Some ( component_id) = self . channel_components . get ( channel) {
105+ tracing:: trace!( "Executing Redis component {component_id:?}" ) ;
106+ let executor = SpinRedisExecutor ;
107+ executor
108+ . execute ( & self . engine , component_id, channel, msg. get_payload_bytes ( ) )
109+ . await ?
152110 } else {
153111 tracing:: debug!( "No subscription found for {:?}" , channel) ;
154112 }
@@ -163,11 +121,10 @@ impl RedisTrigger {
163121pub ( crate ) trait RedisExecutor : Clone + Send + Sync + ' static {
164122 async fn execute (
165123 & self ,
166- engine : & ExecutionContext ,
167- component : & str ,
124+ engine : & TriggerAppEngine < RedisTrigger > ,
125+ component_id : & str ,
168126 channel : & str ,
169127 payload : & [ u8 ] ,
170- follow : bool ,
171128 ) -> Result < ( ) > ;
172129}
173130
0 commit comments