11use anyhow:: Result ;
22use spin_core:: wasmtime:: component:: Resource ;
3- use spin_world:: spin:: postgres:: postgres:: { self as v3} ;
3+ use spin_world:: spin:: postgres3_0_0:: postgres:: { self as v3} ;
4+ use spin_world:: spin:: postgres4_0_0:: postgres:: { self as v4} ;
45use spin_world:: v1:: postgres as v1;
56use spin_world:: v1:: rdbms_types as v1_types;
67use spin_world:: v2:: postgres:: { self as v2} ;
@@ -16,25 +17,25 @@ impl<CF: ClientFactory> InstanceState<CF> {
1617 async fn open_connection < Conn : ' static > (
1718 & mut self ,
1819 address : & str ,
19- ) -> Result < Resource < Conn > , v3 :: Error > {
20+ ) -> Result < Resource < Conn > , v4 :: Error > {
2021 self . connections
2122 . push (
2223 self . client_factory
2324 . get_client ( address)
2425 . await
25- . map_err ( |e| v3 :: Error :: ConnectionFailed ( format ! ( "{e:?}" ) ) ) ?,
26+ . map_err ( |e| v4 :: Error :: ConnectionFailed ( format ! ( "{e:?}" ) ) ) ?,
2627 )
27- . map_err ( |_| v3 :: Error :: ConnectionFailed ( "too many connections" . into ( ) ) )
28+ . map_err ( |_| v4 :: Error :: ConnectionFailed ( "too many connections" . into ( ) ) )
2829 . map ( Resource :: new_own)
2930 }
3031
3132 async fn get_client < Conn : ' static > (
3233 & self ,
3334 connection : Resource < Conn > ,
34- ) -> Result < & CF :: Client , v3 :: Error > {
35+ ) -> Result < & CF :: Client , v4 :: Error > {
3536 self . connections
3637 . get ( connection. rep ( ) )
37- . ok_or_else ( || v3 :: Error :: ConnectionFailed ( "no connection found" . into ( ) ) )
38+ . ok_or_else ( || v4 :: Error :: ConnectionFailed ( "no connection found" . into ( ) ) )
3839 }
3940
4041 async fn is_address_allowed ( & self , address : & str ) -> Result < bool > {
@@ -68,11 +69,15 @@ impl<CF: ClientFactory> InstanceState<CF> {
6869
6970fn v2_params_to_v3 (
7071 params : Vec < v2_types:: ParameterValue > ,
71- ) -> Result < Vec < v3 :: ParameterValue > , v2:: Error > {
72+ ) -> Result < Vec < v4 :: ParameterValue > , v2:: Error > {
7273 params. into_iter ( ) . map ( |p| p. try_into ( ) ) . collect ( )
7374}
7475
75- impl < CF : ClientFactory > spin_world:: spin:: postgres:: postgres:: HostConnection for InstanceState < CF > {
76+ fn v3_params_to_v4 ( params : Vec < v3:: ParameterValue > ) -> Vec < v4:: ParameterValue > {
77+ params. into_iter ( ) . map ( |p| p. into ( ) ) . collect ( )
78+ }
79+
80+ impl < CF : ClientFactory > v3:: HostConnection for InstanceState < CF > {
7681 #[ instrument( name = "spin_outbound_pg.open" , skip( self , address) , err( level = Level :: INFO ) , fields( otel. kind = "client" , db. system = "postgresql" , db. address = Empty , server. port = Empty , db. namespace = Empty ) ) ]
7782 async fn open ( & mut self , address : String ) -> Result < Resource < v3:: Connection > , v3:: Error > {
7883 spin_factor_outbound_networking:: record_address_fields ( & address) ;
@@ -86,7 +91,7 @@ impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for
8691 "address {address} is not permitted"
8792 ) ) ) ;
8893 }
89- self . open_connection ( & address) . await
94+ Ok ( self . open_connection ( & address) . await ? )
9095 }
9196
9297 #[ instrument( name = "spin_outbound_pg.execute" , skip( self , connection, params) , err( level = Level :: INFO ) , fields( otel. kind = "client" , db. system = "postgresql" , otel. name = statement) ) ]
@@ -96,10 +101,11 @@ impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for
96101 statement : String ,
97102 params : Vec < v3:: ParameterValue > ,
98103 ) -> Result < u64 , v3:: Error > {
99- self . get_client ( connection)
104+ Ok ( self
105+ . get_client ( connection)
100106 . await ?
101- . execute ( statement, params)
102- . await
107+ . execute ( statement, v3_params_to_v4 ( params) )
108+ . await ? )
103109 }
104110
105111 #[ instrument( name = "spin_outbound_pg.query" , skip( self , connection, params) , err( level = Level :: INFO ) , fields( otel. kind = "client" , db. system = "postgresql" , otel. name = statement) ) ]
@@ -109,13 +115,64 @@ impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for
109115 statement : String ,
110116 params : Vec < v3:: ParameterValue > ,
111117 ) -> Result < v3:: RowSet , v3:: Error > {
118+ Ok ( self
119+ . get_client ( connection)
120+ . await ?
121+ . query ( statement, v3_params_to_v4 ( params) )
122+ . await ?
123+ . into ( ) )
124+ }
125+
126+ async fn drop ( & mut self , connection : Resource < v3:: Connection > ) -> anyhow:: Result < ( ) > {
127+ self . connections . remove ( connection. rep ( ) ) ;
128+ Ok ( ( ) )
129+ }
130+ }
131+
132+ impl < CF : ClientFactory > v4:: HostConnection for InstanceState < CF > {
133+ #[ instrument( name = "spin_outbound_pg.open" , skip( self , address) , err( level = Level :: INFO ) , fields( otel. kind = "client" , db. system = "postgresql" , db. address = Empty , server. port = Empty , db. namespace = Empty ) ) ]
134+ async fn open ( & mut self , address : String ) -> Result < Resource < v4:: Connection > , v4:: Error > {
135+ spin_factor_outbound_networking:: record_address_fields ( & address) ;
136+
137+ if !self
138+ . is_address_allowed ( & address)
139+ . await
140+ . map_err ( |e| v4:: Error :: Other ( e. to_string ( ) ) ) ?
141+ {
142+ return Err ( v4:: Error :: ConnectionFailed ( format ! (
143+ "address {address} is not permitted"
144+ ) ) ) ;
145+ }
146+ self . open_connection ( & address) . await
147+ }
148+
149+ #[ instrument( name = "spin_outbound_pg.execute" , skip( self , connection, params) , err( level = Level :: INFO ) , fields( otel. kind = "client" , db. system = "postgresql" , otel. name = statement) ) ]
150+ async fn execute (
151+ & mut self ,
152+ connection : Resource < v4:: Connection > ,
153+ statement : String ,
154+ params : Vec < v4:: ParameterValue > ,
155+ ) -> Result < u64 , v4:: Error > {
156+ self . get_client ( connection)
157+ . await ?
158+ . execute ( statement, params)
159+ . await
160+ }
161+
162+ #[ instrument( name = "spin_outbound_pg.query" , skip( self , connection, params) , err( level = Level :: INFO ) , fields( otel. kind = "client" , db. system = "postgresql" , otel. name = statement) ) ]
163+ async fn query (
164+ & mut self ,
165+ connection : Resource < v4:: Connection > ,
166+ statement : String ,
167+ params : Vec < v4:: ParameterValue > ,
168+ ) -> Result < v4:: RowSet , v4:: Error > {
112169 self . get_client ( connection)
113170 . await ?
114171 . query ( statement, params)
115172 . await
116173 }
117174
118- async fn drop ( & mut self , connection : Resource < v3 :: Connection > ) -> anyhow:: Result < ( ) > {
175+ async fn drop ( & mut self , connection : Resource < v4 :: Connection > ) -> anyhow:: Result < ( ) > {
119176 self . connections . remove ( connection. rep ( ) ) ;
120177 Ok ( ( ) )
121178 }
@@ -133,10 +190,16 @@ impl<CF: ClientFactory> v3::Host for InstanceState<CF> {
133190 }
134191}
135192
193+ impl < CF : ClientFactory > v4:: Host for InstanceState < CF > {
194+ fn convert_error ( & mut self , error : v4:: Error ) -> Result < v4:: Error > {
195+ Ok ( error)
196+ }
197+ }
198+
136199/// Delegate a function call to the v3::HostConnection implementation
137200macro_rules! delegate {
138201 ( $self: ident. $name: ident( $address: expr, $( $arg: expr) ,* ) ) => { {
139- if !$self. is_address_allowed( & $address) . await . map_err( |e| v3 :: Error :: Other ( e. to_string( ) ) ) ? {
202+ if !$self. is_address_allowed( & $address) . await . map_err( |e| v4 :: Error :: Other ( e. to_string( ) ) ) ? {
140203 return Err ( v1:: PgError :: ConnectionFailed ( format!(
141204 "address {} is not permitted" , $address
142205 ) ) ) ;
@@ -145,7 +208,7 @@ macro_rules! delegate {
145208 Ok ( c) => c,
146209 Err ( e) => return Err ( e. into( ) ) ,
147210 } ;
148- <Self as v3 :: HostConnection >:: $name( $self, connection, $( $arg) ,* )
211+ <Self as v4 :: HostConnection >:: $name( $self, connection, $( $arg) ,* )
149212 . await
150213 . map_err( |e| e. into( ) )
151214 } } ;
0 commit comments