44use crate :: config:: DatabaseConfig ;
55use anyhow:: { Result , anyhow, bail} ;
66use chrono:: { DateTime , Utc } ;
7- use native_db:: db_type:: { KeyDefinition , KeyOptions , ToKeyDefinition } ;
7+ use native_db:: db_type:: { KeyDefinition , KeyEntry , KeyOptions , ToKeyDefinition } ;
88use native_db:: transaction:: RTransaction ;
99use native_db:: transaction:: query:: SecondaryScanIterator ;
1010use native_db:: watch:: Event ;
1111use native_db:: { Key , Models , ToInput , ToKey } ;
12- use sandpolis_core:: { RealmName , InstanceId } ;
12+ use sandpolis_core:: { InstanceId , RealmName } ;
1313use serde:: { Deserialize , Serialize } ;
1414use std:: collections:: { BTreeMap , HashMap } ;
15- use std:: ops:: Range ;
15+ use std:: ops:: { Range , RangeFrom } ;
1616use std:: path:: Path ;
1717use std:: sync:: atomic:: AtomicU64 ;
1818use std:: { marker:: PhantomData , sync:: Arc } ;
@@ -83,6 +83,10 @@ impl DatabaseLayer {
8383 }
8484}
8585
86+ // TODO if we define update, etc on these traits and pass in the db as a
87+ // parameter, we can use dynamic dispatch to handle different impl for versioned
88+ // Data instead of separate helper struct
89+
8690/// `Data` is what's stored in a database!
8791pub trait Data
8892where
@@ -114,7 +118,8 @@ where
114118 }
115119}
116120
117- /// `Data` that expires and will be removed from the database.
121+ /// `Data` that expires and will be removed from the database after a certain
122+ /// time.
118123pub trait ExpiringData
119124where
120125 Self : Data ,
@@ -165,12 +170,10 @@ impl ToKey for DbTimestamp {
165170pub type DataIdentifier = u64 ;
166171pub type DataExpiration = DateTime < Utc > ;
167172
168- // TODO rename Resident?
169-
170173/// Maintains a real-time cache of persistent objects in the database of
171174/// type `T`.
172175#[ derive( Clone ) ]
173- pub struct Watch < T >
176+ pub struct Resident < T >
174177where
175178 T : Data ,
176179{
@@ -184,14 +187,14 @@ where
184187 cancel_token : CancellationToken ,
185188}
186189
187- impl < T : Data > Drop for Watch < T > {
190+ impl < T : Data > Drop for Resident < T > {
188191 fn drop ( & mut self ) {
189192 self . cancel_token . cancel ( ) ;
190193 self . db . unwatch ( self . watch_id ) . unwrap ( ) ;
191194 }
192195}
193196
194- impl < T : Data + Default + ' static > Watch < T > {
197+ impl < T : Data + Default + ' static > Resident < T > {
195198 /// Create a new `Watch` when there's only one row in the database.
196199 pub fn singleton ( db : Arc < native_db:: Database < ' static > > ) -> Result < Self > {
197200 let r = db. r_transaction ( ) ?;
@@ -260,7 +263,7 @@ impl<T: Data + Default + 'static> Watch<T> {
260263 }
261264}
262265
263- impl < T : Data > Watch < T > {
266+ impl < T : Data > Resident < T > {
264267 pub async fn update < F > ( & self , mutator : F ) -> Result < ( ) >
265268 where
266269 F : Fn ( & mut T ) -> Result < ( ) > ,
@@ -284,18 +287,32 @@ impl<T: Data> Watch<T> {
284287 }
285288}
286289
287- impl < T : HistoricalData > Watch < T > {
288- pub fn history ( & self , range : Range < DbTimestamp > ) -> Result < Vec < T > > {
290+ impl < T : HistoricalData > Resident < T > {
291+ pub async fn history ( & self , range : RangeFrom < DbTimestamp > ) -> Result < Vec < T > > {
292+ // Get values of all secondary keys
293+ let mut secondary_keys = ( * self . cache . read ( ) . await ) . native_db_secondary_keys ( ) ;
294+
295+ // Remove timestamp because we're going to set that one manually
296+ secondary_keys. retain ( |key_def, _| * key_def != T :: timestamp_key ( ) ) ;
297+
289298 let r = self . db . r_transaction ( ) ?;
290- Ok ( r. scan ( )
291- . secondary ( T :: timestamp_key ( ) ) ?
292- . range ( range) ?
293- . try_collect ( ) ?)
299+
300+ // TODO use the most restrictive condition first for performance
301+ let mut it = r. scan ( ) . secondary ( T :: timestamp_key ( ) ) ?. range ( range) ?;
302+
303+ for ( key_def, key) in secondary_keys. into_iter ( ) {
304+ it = it. and ( r. scan ( ) . secondary ( key_def) ?. equal ( match key {
305+ KeyEntry :: Default ( key) => key,
306+ KeyEntry :: Optional ( key) => key. unwrap ( ) , // TODO
307+ } ) ?) ;
308+ }
309+
310+ Ok ( it. try_collect ( ) ?)
294311 }
295312}
296313
297314#[ cfg( test) ]
298- mod test_database {
315+ mod test_resident {
299316 use super :: * ;
300317 use anyhow:: Result ;
301318 use native_db:: Models ;
@@ -318,6 +335,21 @@ mod test_database {
318335 pub b : String ,
319336 }
320337
338+ #[ derive( Serialize , Deserialize , PartialEq , Debug , Clone , Default , Data , HistoricalData ) ]
339+ #[ native_model( id = 6 , version = 1 ) ]
340+ #[ native_db]
341+ pub struct TestHistoryData {
342+ #[ primary_key]
343+ pub _id : DataIdentifier ,
344+
345+ #[ secondary_key]
346+ pub _timestamp : DbTimestamp ,
347+
348+ #[ secondary_key]
349+ pub a : String ,
350+ pub b : String ,
351+ }
352+
321353 #[ tokio:: test]
322354 async fn test_build_database ( ) -> Result < ( ) > {
323355 let models = Box :: leak ( Box :: new ( Models :: new ( ) ) ) ;
@@ -333,7 +365,7 @@ mod test_database {
333365 database. add_realm ( "default" . parse ( ) ?) . await ?;
334366
335367 let db = database. get ( Some ( "default" . parse ( ) ?) ) . await ?;
336- let watch: Watch < TestData > = Watch :: singleton ( db. clone ( ) ) ?;
368+ let watch: Resident < TestData > = Resident :: singleton ( db. clone ( ) ) ?;
337369
338370 // Update data a bunch of times
339371 for i in 1 ..10 {
@@ -370,15 +402,62 @@ mod test_database {
370402
371403 Ok ( ( ) )
372404 }
405+
406+ #[ tokio:: test]
407+ async fn test_historical_data ( ) -> Result < ( ) > {
408+ let models = Box :: leak ( Box :: new ( Models :: new ( ) ) ) ;
409+ models. define :: < TestHistoryData > ( ) . unwrap ( ) ;
410+
411+ let mut database = DatabaseLayer :: new (
412+ DatabaseConfig {
413+ storage : None ,
414+ ephemeral : true ,
415+ } ,
416+ models,
417+ ) ?;
418+
419+ let db = database. get ( None ) . await ?;
420+ let watch: Resident < TestHistoryData > = Resident :: singleton ( db. clone ( ) ) ?;
421+
422+ // Update data a bunch of times
423+ for i in 1 ..10 {
424+ watch
425+ . update ( |data| {
426+ data. b = format ! ( "test {i}" ) ;
427+ Ok ( ( ) )
428+ } )
429+ . await ?;
430+ }
431+
432+ // Database should have 10 items
433+ {
434+ let r = db. r_transaction ( ) ?;
435+ let items: Vec < TestHistoryData > = r. scan ( ) . primary ( ) ?. all ( ) ?. try_collect ( ) ?;
436+ assert_eq ! ( items. len( ) , 10 ) ;
437+ }
438+
439+ // Check history
440+ {
441+ assert_eq ! (
442+ watch
443+ . history( DbTimestamp :: Latest ( DateTime :: default ( ) ) ..)
444+ . await ?
445+ . len( ) ,
446+ 0
447+ ) ;
448+ }
449+
450+ Ok ( ( ) )
451+ }
373452}
374453
375454#[ derive( Clone ) ]
376- pub struct TimeResVec < T >
455+ pub struct ResidentVec < T >
377456where
378- T : HistoricalData ,
457+ T : Data ,
379458{
380459 db : Arc < native_db:: Database < ' static > > ,
381- cache : Arc < RwLock < T > > ,
460+ cache : Arc < RwLock < Vec < Resident < T > > > > ,
382461
383462 /// Used to stop the database from sending updates
384463 watch_id : u64 ,
@@ -387,9 +466,15 @@ where
387466 cancel_token : CancellationToken ,
388467}
389468
390- impl < T : HistoricalData > Drop for TimeResVec < T > {
469+ impl < T : Data > Drop for ResidentVec < T > {
391470 fn drop ( & mut self ) {
392471 self . cancel_token . cancel ( ) ;
393472 self . db . unwatch ( self . watch_id ) . unwrap ( ) ;
394473 }
395474}
475+
476+ impl < T : Data > ResidentVec < T > {
477+ pub fn is_detached ( & self ) -> bool {
478+ false
479+ }
480+ }
0 commit comments