@@ -11,6 +11,7 @@ use spin_core::async_trait;
1111use spin_factor_blobstore:: { Error , Container , ContainerManager } ;
1212
1313pub struct BlobStoreS3 {
14+ builder : object_store:: aws:: AmazonS3Builder ,
1415 client : async_once_cell:: Lazy <
1516 aws_sdk_s3:: Client ,
1617 std:: pin:: Pin < Box < dyn std:: future:: Future < Output = aws_sdk_s3:: Client > + Send > > ,
@@ -69,6 +70,16 @@ impl BlobStoreS3 {
6970 region : String ,
7071 auth_options : BlobStoreS3AuthOptions ,
7172 ) -> Result < Self > {
73+ let builder = match & auth_options {
74+ BlobStoreS3AuthOptions :: RuntimeConfigValues ( config) =>
75+ object_store:: aws:: AmazonS3Builder :: new ( )
76+ . with_region ( & region)
77+ . with_access_key_id ( & config. access_key )
78+ . with_secret_access_key ( & config. secret_key )
79+ . with_token ( config. token . clone ( ) . unwrap_or_default ( ) ) ,
80+ BlobStoreS3AuthOptions :: Environmental => object_store:: aws:: AmazonS3Builder :: from_env ( ) ,
81+ } ;
82+
7283 let region_clone = region. clone ( ) ;
7384 let client_fut = Box :: pin ( async move {
7485 let sdk_config = match auth_options {
@@ -84,15 +95,18 @@ impl BlobStoreS3 {
8495 aws_sdk_s3:: Client :: new ( & sdk_config)
8596 } ) ;
8697
87- Ok ( Self { client : async_once_cell:: Lazy :: from_future ( client_fut) } )
98+ Ok ( Self { builder , client : async_once_cell:: Lazy :: from_future ( client_fut) } )
8899 }
89100}
90101
91102#[ async_trait]
92103impl ContainerManager for BlobStoreS3 {
93104 async fn get ( & self , name : & str ) -> Result < Arc < dyn Container > , Error > {
105+ let store = self . builder . clone ( ) . with_bucket_name ( name) . build ( ) . map_err ( |e| e. to_string ( ) ) ?;
106+
94107 Ok ( Arc :: new ( S3Container {
95108 name : name. to_owned ( ) ,
109+ store,
96110 client : self . client . get_unpin ( ) . await . clone ( ) ,
97111 } ) )
98112 }
@@ -108,6 +122,7 @@ impl ContainerManager for BlobStoreS3 {
108122
109123struct S3Container {
110124 name : String ,
125+ store : object_store:: aws:: AmazonS3 ,
111126 client : aws_sdk_s3:: Client ,
112127}
113128
@@ -183,14 +198,25 @@ impl Container for S3Container {
183198 Ok ( Box :: new ( S3IncomingData :: new ( resp) ) )
184199 }
185200
186- async fn connect_stm ( & self , name : & str , stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , finished_tx : tokio:: sync:: mpsc:: Sender < ( ) > ) -> anyhow:: Result < ( ) > {
187- let client = self . client . clone ( ) ;
188- let bucket = self . name . clone ( ) ;
189- let name = name. to_owned ( ) ;
190- let byte_stm = to_byte_stream ( stm) ;
201+ async fn connect_stm ( & self , name : & str , mut stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , finished_tx : tokio:: sync:: mpsc:: Sender < ( ) > ) -> anyhow:: Result < ( ) > {
202+ let store = self . store . clone ( ) ;
203+ let path = object_store:: path:: Path :: from ( name) ;
191204
192205 tokio:: spawn ( async move {
193- client. put_object ( ) . bucket ( & bucket) . key ( name) . body ( byte_stm) . send ( ) . await . unwrap ( ) ;
206+ use object_store:: ObjectStore ;
207+ let mupload = store. put_multipart ( & path) . await . unwrap ( ) ;
208+ let mut writer = object_store:: WriteMultipart :: new ( mupload) ;
209+ loop {
210+ use tokio:: io:: AsyncReadExt ;
211+ let mut buf = vec ! [ 0 ; 5 * 1024 * 1024 ] ;
212+ let read_amount = stm. read ( & mut buf) . await . unwrap ( ) ;
213+ if read_amount == 0 {
214+ break ;
215+ }
216+ buf. truncate ( read_amount) ;
217+ writer. put ( buf. into ( ) ) ;
218+ }
219+ writer. finish ( ) . await . unwrap ( ) ;
194220 finished_tx. send ( ( ) ) . await . expect ( "should sent finish tx" ) ;
195221 } ) ;
196222
@@ -203,14 +229,6 @@ impl Container for S3Container {
203229 }
204230}
205231
206- fn to_byte_stream ( read : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > ) -> aws_sdk_s3:: primitives:: ByteStream {
207- use futures:: StreamExt ;
208-
209- let stm = tokio_util:: io:: ReaderStream :: new ( read) . map ( |item| item. map ( |by| http_body:: Frame :: data ( by) ) ) ;
210- let stm_body = http_body_util:: StreamBody :: new ( stm) ;
211- aws_sdk_s3:: primitives:: ByteStream :: from_body_1_x ( stm_body)
212- }
213-
214232struct S3IncomingData {
215233 get_obj_resp : Option < get_object:: GetObjectOutput > ,
216234}
0 commit comments