 use std::sync::Arc;
-use tokio::sync::Mutex;

 use anyhow::Result;
-// use azure_data_cosmos::{
-//     prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query},
-//     CosmosEntity,
-// };
 use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
-// use futures::StreamExt;
-// use serde::{Deserialize, Serialize};
 use spin_core::async_trait;
 use spin_factor_blobstore::{Error, Container, ContainerManager};

-pub struct BlobStoreAzureBlob {
-    client: BlobServiceClient,
-    // client: CollectionClient,
-}
+pub mod auth;
+mod incoming_data;
+mod object_names;

-/// Azure Cosmos Key / Value runtime config literal options for authentication
-#[derive(Clone, Debug)]
-pub struct BlobStoreAzureRuntimeConfigOptions {
-    account: String,
-    key: String,
-}
+use auth::AzureBlobAuthOptions;
+use incoming_data::AzureIncomingData;
+use object_names::AzureObjectNames;

-impl BlobStoreAzureRuntimeConfigOptions {
-    pub fn new(account: String, key: String) -> Self {
-        Self { account, key }
-    }
-}
-
-/// Azure Cosmos Key / Value enumeration for the possible authentication options
-#[derive(Clone, Debug)]
-pub enum BlobStoreAzureAuthOptions {
-    /// Runtime Config values indicates the account and key have been specified directly
-    RuntimeConfigValues(BlobStoreAzureRuntimeConfigOptions),
-    /// Environmental indicates that the environment variables of the process should be used to
-    /// create the StorageCredentials for the storage client. For now this uses old school credentials:
-    ///
-    /// STORAGE_ACCOUNT
-    /// STORAGE_ACCESS_KEY
-    ///
-    /// TODO: Thorsten pls make this proper with *hand waving* managed identity and stuff!
-    Environmental,
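+/// Provides access to containers in an Azure Blob Storage account.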
+pub struct AzureContainerManager {
+    client: BlobServiceClient,
 }

-impl BlobStoreAzureBlob {
+impl AzureContainerManager {
     pub fn new(
-        // account: String,
-        // container: String,
-        auth_options: BlobStoreAzureAuthOptions,
+        auth_options: AzureBlobAuthOptions,
     ) -> Result<Self> {
         let (account, credentials) = match auth_options {
-            BlobStoreAzureAuthOptions::RuntimeConfigValues(config) => {
+            AzureBlobAuthOptions::AccountKey(config) => {
                 (config.account.clone(), azure_storage::StorageCredentials::access_key(&config.account, config.key.clone()))
             },
-            BlobStoreAzureAuthOptions::Environmental => {
+            AzureBlobAuthOptions::Environmental => {
                 let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
                 let access_key = std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCESS_KEY");
                 (account.clone(), azure_storage::StorageCredentials::access_key(account, access_key))
@@ -68,10 +38,10 @@ impl BlobStoreAzureBlob {
 }

 #[async_trait]
-impl ContainerManager for BlobStoreAzureBlob {
+impl ContainerManager for AzureContainerManager {
     async fn get(&self, name: &str) -> Result<Arc<dyn Container>, Error> {
-        Ok(Arc::new(AzureBlobContainer {
-            _name: name.to_owned(),
+        Ok(Arc::new(AzureContainer {
+            _label: name.to_owned(),
             client: self.client.container_client(name),
         }))
     }
@@ -85,16 +55,16 @@ impl ContainerManager for BlobStoreAzureBlob {
     }
 }

-struct AzureBlobContainer {
-    _name: String,
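+/// A blob store container backed by an Azure Blob Storage container,
+/// stored under the label it was requested with.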
+struct AzureContainer {
+    _label: String,
     client: ContainerClient,
 }

 /// Azure doesn't provide us with a container creation time
 const DUMMY_CREATED_AT: u64 = 0;

 #[async_trait]
-impl Container for AzureBlobContainer {
+impl Container for AzureContainer {
     async fn exists(&self) -> anyhow::Result<bool> {
         Ok(self.client.exists().await?)
     }
@@ -150,32 +120,24 @@ impl Container for AzureBlobContainer {
             azure_core::request_options::Range::Range(start..(end + 1))
         };
         let client = self.client.blob_client(name);
-        Ok(Box::new(AzureBlobIncomingData::new(client, range)))
+        Ok(Box::new(AzureIncomingData::new(client, range)))
     }

     async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
         use tokio::io::AsyncReadExt;

-        // It seems like we can't construct a SeekableStream over a SimplexStream, which
-        // feels unfortunate. I am not sure that the outgoing-value interface gives
-        // us a way to construct a len-able stream, because we don't know until finish
-        // time how much the guest is going to write to it. (We might be able to do resettable...
-        // but len-able...) So for now we read it into a buffer and then zoosh that up in
-        // one go.
-        //
-        // We can kind of work around this by doing a series of Put Block calls followed by
-        // a Put Block List. So we need to buffer only each block. But that still requires
-        // care as you are limited to 50_000 committed / 100_000 uncommitted blocks.
-
-        const APPROX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
+        // Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
+        // enough for most use cases. If users need flexibility for larger blobs, we could make
+        // the block size configurable via the runtime config ("size hint" or something).
+        const BLOCK_SIZE: usize = 2 * 1024 * 1024;

         let client = self.client.blob_client(name);

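+        // Read the incoming stream into BLOCK_SIZE buffers, upload each as an
+        // uncommitted block, then commit them with a single Put Block List.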
         tokio::spawn(async move {
             let mut blocks = vec![];

             'put_blocks: loop {
-                let mut bytes = Vec::with_capacity(APPROX_BLOCK_SIZE);  // 2MB buffer x 50k blocks per blob = 100GB. WHICH SHOULD BE ENOUGH FOR ANYONE.
+                let mut bytes = Vec::with_capacity(BLOCK_SIZE);
                 loop {
                     let read = stm.read_buf(&mut bytes).await.unwrap();
                     let len = bytes.len();
@@ -188,7 +150,7 @@ impl Container for AzureBlobContainer {
                         blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
                         break 'put_blocks;
                     }
-                    if len >= APPROX_BLOCK_SIZE {
+                    if len >= BLOCK_SIZE {
                         let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                         let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                         client.put_block(block_id.clone(), bytes).await.unwrap();
@@ -211,171 +173,7 @@ impl Container for AzureBlobContainer {

     async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
         let stm = self.client.list_blobs().into_stream();
-        Ok(Box::new(AzureBlobBlobsList::new(stm)))
+        Ok(Box::new(AzureObjectNames::new(stm)))
     }
 }

-struct AzureBlobIncomingData {
-    // The Mutex is used to make it Send
-    stm: Mutex<Option<
-        azure_core::Pageable<
-            azure_storage_blobs::blob::operations::GetBlobResponse,
-            azure_core::error::Error
-        >
-    >>,
-    client: azure_storage_blobs::prelude::BlobClient,
-}
-
-impl AzureBlobIncomingData {
-    fn new(client: azure_storage_blobs::prelude::BlobClient, range: azure_core::request_options::Range) -> Self {
-        let stm = client.get().range(range).into_stream();
-        Self {
-            stm: Mutex::new(Some(stm)),
-            client,
-        }
-    }
-
-    fn consume_async_impl(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream {
-        use futures::TryStreamExt;
-        use tokio_util::compat::FuturesAsyncReadCompatExt;
-        let stm = self.consume_as_stream();
-        let ar = stm.into_async_read();
-        let arr = ar.compat();
-        wasmtime_wasi::pipe::AsyncReadStream::new(arr)
-    }
-
-    fn consume_as_stream(&mut self) -> impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> {
-        use futures::StreamExt;
-        let opt_stm = self.stm.get_mut();
-        let stm = opt_stm.take().unwrap();
-        let byte_stm = stm.flat_map(|chunk| streamify_chunk(chunk.unwrap().data));
-        byte_stm
-    }
-}
-
-fn streamify_chunk(chunk: azure_core::ResponseBody) -> impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> {
-    use futures::StreamExt;
-    chunk.map(|c| Ok(c.unwrap().to_vec()))
-}
-
-
-struct AzureBlobBlobsList {
-    // The Mutex is used to make it Send
-    stm: Mutex<
-        azure_core::Pageable<
-            azure_storage_blobs::container::operations::ListBlobsResponse,
-            azure_core::error::Error
-        >
-    >,
-    read_but_not_yet_returned: Vec<String>,
-    end_stm_after_read_but_not_yet_returned: bool,
-}
-
-impl AzureBlobBlobsList {
-    fn new(stm: azure_core::Pageable<
-        azure_storage_blobs::container::operations::ListBlobsResponse,
-        azure_core::error::Error
-    >) -> Self {
-        Self {
-            stm: Mutex::new(stm),
-            read_but_not_yet_returned: Default::default(),
-            end_stm_after_read_but_not_yet_returned: false,
-        }
-    }
-
-    async fn read_impl(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
-        use futures::StreamExt;
-
-        let len: usize = len.try_into().unwrap();
-
-        // If we have names outstanding, send that first. (We are allowed to send less than len,
-        // and so sending all pending stuff before paging, rather than trying to manage a mix of
-        // pending stuff with newly retrieved chunks, simplifies the code.)
-        if !self.read_but_not_yet_returned.is_empty() {
-            if self.read_but_not_yet_returned.len() <= len {
-                // We are allowed to send all pending names
-                let to_return = self.read_but_not_yet_returned.drain(..).collect();
-                return Ok((to_return, self.end_stm_after_read_but_not_yet_returned));
-            } else {
-                // Send as much as we can. The rest remains in the pending buffer to send,
-                // so this does not represent end of stream.
-                let to_return = self.read_but_not_yet_returned.drain(0..len).collect();
-                return Ok((to_return, false));
-            }
-        }
-
-        // Get one chunk and send as much as we can of it. Again, we don't need to try to
-        // pack the full length here - we can send chunk by chunk.
-
-        let Some(chunk) = self.stm.get_mut().next().await else {
-            return Ok((vec![], false));
-        };
-        let chunk = chunk.unwrap();
-
-        // TODO: do we need to prefix these with a prefix from somewhere or do they include it?
-        let mut names: Vec<_> = chunk.blobs.blobs().map(|blob| blob.name.clone()).collect();
-        let at_end = chunk.next_marker.is_none();
-
-        if names.len() <= len {
-            // We can send them all!
-            return Ok((names, at_end));
-        } else {
-            // We have more names than we can send in this response. Send what we can and
-            // stash the rest.
-            let to_return: Vec<_> = names.drain(0..len).collect();
-            self.read_but_not_yet_returned = names;
-            self.end_stm_after_read_but_not_yet_returned = at_end;
-            return Ok((to_return, false));
-        }
-    }
-}
-
-#[async_trait]
-impl spin_factor_blobstore::IncomingData for AzureBlobIncomingData {
-    async fn consume_sync(&mut self) -> anyhow::Result<Vec<u8>> {
-        use futures::StreamExt;
-        let mut data = vec![];
-        let Some(pageable) = self.stm.get_mut() else {
-            anyhow::bail!("oh no");
-        };
-
-        loop {
-            let Some(chunk) = pageable.next().await else {
-                break;
-            };
-            let chunk = chunk.unwrap();
-            let by = chunk.data.collect().await.unwrap();
-            data.extend(by.to_vec());
-        }
-
-        Ok(data)
-    }
-
-    fn consume_async(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream { // Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>>> {
-        self.consume_async_impl()
-    }
-
-    async fn size(&mut self) -> anyhow::Result<u64> {
-        // TODO: in theory this should be infallible once we have the IncomingData
-        // object. But in practice if we use the Pageable for that we don't get it until
-        // we do the first read. So that would force us to either pre-fetch the
-        // first chunk or to issue a properties request *just in case* size() was
-        // called. So I'm making it fallible for now.
-        Ok(self.client.get_properties().await?.blob.properties.content_length)
-    }
-}
-
-#[async_trait]
-impl spin_factor_blobstore::ObjectNames for AzureBlobBlobsList {
-    async fn read(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
-        self.read_impl(len).await // Separate function because rust-analyzer gives better intellisense when async_trait isn't in the picture!
-    }
-
-    async fn skip(&mut self, num: u64) -> anyhow::Result<(u64, bool)> {
-        // TODO: there is a question (raised as an issue on the repo) about the required behaviour
-        // here. For now I assume that skipping fewer than `num` is allowed as long as we are
-        // honest about it. Because it is easier that is why.
-        let (skipped, at_end) = self.read_impl(num).await?;
-        Ok((skipped.len().try_into().unwrap(), at_end))
-    }
-}
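
A minimal usage sketch of the renamed API (assuming an async caller whose error
type can absorb both the anyhow and blob store errors; "my-container" is a
placeholder name):

    let auth = AzureBlobAuthOptions::Environmental;
    let manager = AzureContainerManager::new(auth)?;
    let container = manager.get("my-container").await?;
    anyhow::ensure!(container.exists().await?);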