@@ -123,49 +123,12 @@ impl Container for AzureContainer {
123123 Ok ( Box :: new ( AzureIncomingData :: new ( client, range) ) )
124124 }
125125
126- async fn connect_stm ( & self , name : & str , mut stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , finished_tx : tokio:: sync:: mpsc:: Sender < ( ) > ) -> anyhow:: Result < ( ) > {
127- use tokio:: io:: AsyncReadExt ;
128-
129- // Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
130- // enough for most use cases. If users need flexibility for larger blobs, we could make
131- // the block size configurable via the runtime config ("size hint" or something).
132- const BLOCK_SIZE : usize = 2 * 1024 * 1024 ;
133-
126+ async fn connect_stm ( & self , name : & str , stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , finished_tx : tokio:: sync:: mpsc:: Sender < anyhow:: Result < ( ) > > ) -> anyhow:: Result < ( ) > {
134127 let client = self . client . blob_client ( name) ;
135128
136129 tokio:: spawn ( async move {
137- let mut blocks = vec ! [ ] ;
138-
139- ' put_blocks: loop {
140- let mut bytes = Vec :: with_capacity ( BLOCK_SIZE ) ;
141- loop {
142- let read = stm. read_buf ( & mut bytes) . await . unwrap ( ) ;
143- let len = bytes. len ( ) ;
144-
145- if read == 0 {
146- // end of stream - send the last block and go
147- let id_bytes = uuid:: Uuid :: new_v4 ( ) . as_bytes ( ) . to_vec ( ) ;
148- let block_id = azure_storage_blobs:: prelude:: BlockId :: new ( id_bytes) ;
149- client. put_block ( block_id. clone ( ) , bytes) . await . unwrap ( ) ;
150- blocks. push ( azure_storage_blobs:: blob:: BlobBlockType :: Uncommitted ( block_id) ) ;
151- break ' put_blocks;
152- }
153- if len >= BLOCK_SIZE {
154- let id_bytes = uuid:: Uuid :: new_v4 ( ) . as_bytes ( ) . to_vec ( ) ;
155- let block_id = azure_storage_blobs:: prelude:: BlockId :: new ( id_bytes) ;
156- client. put_block ( block_id. clone ( ) , bytes) . await . unwrap ( ) ;
157- blocks. push ( azure_storage_blobs:: blob:: BlobBlockType :: Uncommitted ( block_id) ) ;
158- break ;
159- }
160- }
161- }
162-
163- let block_list = azure_storage_blobs:: blob:: BlockList {
164- blocks
165- } ;
166- client. put_block_list ( block_list) . await . unwrap ( ) ;
167-
168- finished_tx. send ( ( ) ) . await . expect ( "should sent finish tx" ) ;
130+ let result = Self :: connect_stm_core ( stm, client) . await ;
131+ finished_tx. send ( result) . await . expect ( "should sent finish tx" ) ;
169132 } ) ;
170133
171134 Ok ( ( ) )
@@ -177,3 +140,46 @@ impl Container for AzureContainer {
177140 }
178141}
179142
143+ impl AzureContainer {
144+ async fn connect_stm_core ( mut stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , client : azure_storage_blobs:: prelude:: BlobClient ) -> anyhow:: Result < ( ) > {
145+ use tokio:: io:: AsyncReadExt ;
146+
147+ // Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
148+ // enough for most use cases. If users need flexibility for larger blobs, we could make
149+ // the block size configurable via the runtime config ("size hint" or something).
150+ const BLOCK_SIZE : usize = 2 * 1024 * 1024 ;
151+
152+ let mut blocks = vec ! [ ] ;
153+
154+ ' put_blocks: loop {
155+ let mut bytes = Vec :: with_capacity ( BLOCK_SIZE ) ;
156+ loop {
157+ let read = stm. read_buf ( & mut bytes) . await ?;
158+ let len = bytes. len ( ) ;
159+
160+ if read == 0 {
161+ // end of stream - send the last block and go
162+ let id_bytes = uuid:: Uuid :: new_v4 ( ) . as_bytes ( ) . to_vec ( ) ;
163+ let block_id = azure_storage_blobs:: prelude:: BlockId :: new ( id_bytes) ;
164+ client. put_block ( block_id. clone ( ) , bytes) . await ?;
165+ blocks. push ( azure_storage_blobs:: blob:: BlobBlockType :: Uncommitted ( block_id) ) ;
166+ break ' put_blocks;
167+ }
168+ if len >= BLOCK_SIZE {
169+ let id_bytes = uuid:: Uuid :: new_v4 ( ) . as_bytes ( ) . to_vec ( ) ;
170+ let block_id = azure_storage_blobs:: prelude:: BlockId :: new ( id_bytes) ;
171+ client. put_block ( block_id. clone ( ) , bytes) . await ?;
172+ blocks. push ( azure_storage_blobs:: blob:: BlobBlockType :: Uncommitted ( block_id) ) ;
173+ break ;
174+ }
175+ }
176+ }
177+
178+ let block_list = azure_storage_blobs:: blob:: BlockList {
179+ blocks
180+ } ;
181+ client. put_block_list ( block_list) . await ?;
182+
183+ Ok ( ( ) )
184+ }
185+ }
0 commit comments