@@ -13,7 +13,7 @@ use sandpolis_core::{InstanceId, RealmName};
1313use serde:: de:: Visitor ;
1414use serde:: { Deserialize , Deserializer , Serialize , Serializer } ;
1515use std:: collections:: { BTreeMap , HashMap } ;
16- use std:: ops:: { Range , RangeBounds , RangeFrom } ;
16+ use std:: ops:: { Add , Range , RangeBounds , RangeFrom } ;
1717use std:: path:: Path ;
1818use std:: sync:: atomic:: AtomicU64 ;
1919use std:: { marker:: PhantomData , sync:: Arc } ;
@@ -138,44 +138,32 @@ impl RealmDatabase {
138138 // Safe to end the transaction once the watcher is registered
139139 drop ( r) ;
140140
141- let cancel_token = CancellationToken :: new ( ) ;
142- let token = cancel_token. clone ( ) ;
143- let inner = Arc :: new ( RwLock :: new ( item) ) ;
141+ let token = CancellationToken :: new ( ) ;
142+ let resident = Resident {
143+ inner : Arc :: new ( RwLock :: new ( item) ) ,
144+ db : self . 0 . clone ( ) ,
145+ watch : Some ( ( watch_id, token. clone ( ) ) ) ,
146+ } ;
147+ let resident_clone = resident. clone ( ) ;
144148
145149 tokio:: spawn ( {
146- let inner_clone = Arc :: clone ( & inner) ;
147150 async move {
148151 loop {
149152 tokio:: select! {
150153 _ = token. cancelled( ) => {
151154 break ;
152155 }
153156 event = channel. recv( ) => match event {
154- Some ( event) => match event {
155- Event :: Insert ( data) => { }
156- Event :: Update ( data) => match data. inner_new( ) {
157- Ok ( d) => {
158- let mut c = inner_clone. write( ) . await ;
159- * c = d;
160- } ,
161- Err ( _) => { } ,
162- }
163- Event :: Delete ( _) => warn!( "Deleting a singleton is undefined" ) ,
164- }
165- None => {
166- break ;
167- }
157+ Some ( event) => resident_clone. handle_event( event) . await ,
158+ None => break ,
159+
168160 }
169161 }
170162 }
171163 }
172164 } ) ;
173165
174- Ok ( Resident {
175- inner,
176- db : self . 0 . clone ( ) ,
177- watch : Some ( ( watch_id, cancel_token) ) ,
178- } )
166+ Ok ( resident)
179167 }
180168
181169 /// Create a `ResidentVec` for items matching the given query.
@@ -257,9 +245,13 @@ impl RealmDatabase {
257245 }
258246 Event :: Update ( data) => match data. inner_new:: <T >( ) {
259247 Ok ( d) => {
260- let mut c = inner_clone. write( ) . await ;
248+ let c = inner_clone. write( ) . await ;
261249 match ( * c) . get( & d. id( ) ) {
262- Some ( _) => todo!( ) ,
250+ Some ( r) => {
251+ let mut r = r. inner. write( ) . await ;
252+ * r = d;
253+ } ,
254+ // Got an update before insert?
263255 None => todo!( ) ,
264256 }
265257 } ,
@@ -287,44 +279,83 @@ impl RealmDatabase {
287279/// `Data` are rows in a database.
288280pub trait Data
289281where
290- Self : ToInput + Clone + PartialEq + Send + Sync + Default ,
282+ Self : ToInput + Clone + PartialEq + Send + Sync + Default + std :: fmt :: Debug ,
291283{
292284 /// Get the unique identifier for this particular `Data`.
293285 fn id ( & self ) -> DataIdentifier ;
294286
287+ fn set_id ( & mut self , id : DataIdentifier ) ;
288+
295289 fn revision ( & self ) -> DataRevision ;
296290
291+ fn set_revision ( & mut self , revision : DataRevision ) ;
292+
297293 fn revision_key ( ) -> KeyDefinition < KeyOptions > {
298294 KeyDefinition :: new (
299295 Self :: native_model_id ( ) ,
300296 Self :: native_model_version ( ) ,
301297 "_revision" ,
302- <String >:: key_names ( ) ,
298+ <DataRevision >:: key_names ( ) ,
299+ KeyOptions {
300+ unique : false ,
301+ optional : false ,
302+ } ,
303+ )
304+ }
305+
306+ fn creation ( & self ) -> DataCreation ;
307+
308+ fn creation_key ( ) -> KeyDefinition < KeyOptions > {
309+ KeyDefinition :: new (
310+ Self :: native_model_id ( ) ,
311+ Self :: native_model_version ( ) ,
312+ "_creation" ,
313+ <DataCreation >:: key_names ( ) ,
303314 KeyOptions {
304315 unique : false ,
305316 optional : false ,
306317 } ,
307318 )
308319 }
309320
310- /// If an expiration timestamp is given, previous generations of the `Data`
321+ /// If an expiration timestamp is given, previous revisions of the `Data`
311322 /// will be kept until the expiration date. Otherwise, only the latest
312- /// generation is kept.
323+ /// revision is kept.
313324 fn expiration ( & self ) -> Option < DataExpiration > ;
314325}
315326
316327// TODO Eq based only on inner?
317328
318329/// Revisions are previous versions of `Data`.
319- #[ derive( Eq , PartialEq , Debug , Clone , Copy , Hash ) ]
330+ #[ derive( Eq , Debug , Clone , Copy , Hash ) ]
320331pub enum DataRevision {
321- Latest ( DateTime < Utc > ) ,
322- Previous ( DateTime < Utc > ) ,
332+ Latest ( i64 ) ,
333+ Previous ( i64 ) ,
334+ }
335+
336+ impl Add < i64 > for DataRevision {
337+ type Output = DataRevision ;
338+
339+ fn add ( self , rhs : i64 ) -> Self :: Output {
340+ match self {
341+ DataRevision :: Latest ( v) => DataRevision :: Latest ( v + rhs) ,
342+ DataRevision :: Previous ( v) => DataRevision :: Previous ( v + rhs) ,
343+ }
344+ }
323345}
324346
325- impl DataRevision {
326- pub fn latest ( ) -> Self {
327- Self :: Latest ( DateTime :: default ( ) )
347+ impl From < DataRevision > for i64 {
348+ fn from ( value : DataRevision ) -> Self {
349+ match value {
350+ DataRevision :: Latest ( v) => v,
351+ DataRevision :: Previous ( v) => v,
352+ }
353+ }
354+ }
355+
356+ impl PartialEq for DataRevision {
357+ fn eq ( & self , other : & Self ) -> bool {
358+ Into :: < i64 > :: into ( * self ) == Into :: < i64 > :: into ( * other)
328359 }
329360}
330361
@@ -334,9 +365,9 @@ impl Serialize for DataRevision {
334365 S : Serializer ,
335366 {
336367 serializer. serialize_i64 ( match self {
337- DataRevision :: Latest ( dt ) => dt . timestamp_millis ( ) ,
368+ DataRevision :: Latest ( v ) => * v ,
338369 // Previous values are stored as negative so we can tell the difference
339- DataRevision :: Previous ( dt ) => -dt . timestamp_millis ( ) ,
370+ DataRevision :: Previous ( v ) => -v ,
340371 } )
341372 }
342373}
@@ -355,15 +386,9 @@ impl<'de> Visitor<'de> for DataGenerationVisitor {
355386 E : serde:: de:: Error ,
356387 {
357388 if value < 0 {
358- Ok ( DataRevision :: Previous (
359- DateTime :: from_timestamp_millis ( -value)
360- . ok_or ( serde:: de:: Error :: custom ( "Timestamp out of range" ) ) ?,
361- ) )
389+ Ok ( DataRevision :: Previous ( -value) )
362390 } else {
363- Ok ( DataRevision :: Latest (
364- DateTime :: from_timestamp_millis ( value)
365- . ok_or ( serde:: de:: Error :: custom ( "Timestamp out of range" ) ) ?,
366- ) )
391+ Ok ( DataRevision :: Latest ( value) )
367392 }
368393 }
369394}
@@ -379,17 +404,17 @@ impl<'de> Deserialize<'de> for DataRevision {
379404
380405impl Default for DataRevision {
381406 fn default ( ) -> Self {
382- Self :: Latest ( DateTime :: default ( ) )
407+ Self :: Latest ( 1 )
383408 }
384409}
385410
386411impl ToKey for DataRevision {
387412 fn to_key ( & self ) -> native_db:: Key {
388413 native_db:: Key :: new (
389414 match self {
390- DataRevision :: Latest ( dt ) => dt . timestamp_millis ( ) ,
415+ DataRevision :: Latest ( v ) => * v ,
391416 // Previous values are stored as negative so we can tell the difference
392- DataRevision :: Previous ( dt ) => -dt . timestamp_millis ( ) ,
417+ DataRevision :: Previous ( v ) => -v ,
393418 }
394419 . to_be_bytes ( )
395420 . to_vec ( ) ,
@@ -404,6 +429,8 @@ impl ToKey for DataRevision {
404429/// Uniquely identifies a record in the database.
405430pub type DataIdentifier = u64 ;
406431
432+ /// When some `Data` will expire and no longer be returnable by queries.
433+ /// Eventually it will be removed from the database altogether.
407434#[ derive( Serialize , Deserialize , Debug , PartialEq , Eq , Copy , Clone ) ]
408435pub struct DataExpiration ( DateTime < Utc > ) ;
409436
@@ -423,6 +450,27 @@ impl ToKey for DataExpiration {
423450 }
424451}
425452
453+ /// When some `Data` was first added to the database. Modification times are not
454+ /// tracked for individual `Data` because revisions handle that.
455+ #[ derive( Serialize , Deserialize , Debug , PartialEq , Eq , Copy , Clone ) ]
456+ pub struct DataCreation ( DateTime < Utc > ) ;
457+
458+ impl Default for DataCreation {
459+ fn default ( ) -> Self {
460+ Self ( Utc :: now ( ) )
461+ }
462+ }
463+
464+ impl ToKey for DataCreation {
465+ fn to_key ( & self ) -> native_db:: Key {
466+ native_db:: Key :: new ( self . 0 . timestamp_millis ( ) . to_be_bytes ( ) . to_vec ( ) )
467+ }
468+
469+ fn key_names ( ) -> Vec < String > {
470+ vec ! [ "DataCreation" . to_string( ) ]
471+ }
472+ }
473+
426474// TODO special case of ResVec?
427475/// Maintains a real-time cache of persistent objects in the database of
428476/// type `T`.
@@ -452,30 +500,59 @@ impl<T: Data + 'static> Resident<T> {
452500 pub async fn read ( & self ) -> RwLockReadGuard < ' _ , T > {
453501 self . inner . read ( ) . await
454502 }
503+
504+ async fn handle_event ( & self , event : Event ) {
505+ match event {
506+ Event :: Insert ( data) => { }
507+ Event :: Update ( data) => match data. inner_new ( ) {
508+ Ok ( d) => {
509+ // TODO check revision and discard updates from `update`
510+ let mut c = self . inner . write ( ) . await ;
511+ * c = d;
512+ }
513+ Err ( _) => { }
514+ } ,
515+ Event :: Delete ( _) => warn ! ( "Deleting a singleton is undefined" ) ,
516+ }
517+ }
455518}
456519
457520impl < T : Data > Resident < T > {
458521 pub async fn update < F > ( & self , mutator : F ) -> Result < ( ) >
459522 where
460523 F : Fn ( & mut T ) -> Result < ( ) > ,
461524 {
462- let cache = self . inner . read ( ) . await ;
463- let mut next = cache . clone ( ) ;
525+ let mut inner = self . inner . write ( ) . await ;
526+ let mut next = inner . clone ( ) ;
464527 mutator ( & mut next) ?;
465528
466- if next != * cache {
467- let rw = self . db . rw_transaction ( ) ?;
468- // TODO make sure revision is one less
469- rw. upsert ( next. clone ( ) ) ?;
470- rw. commit ( ) ?;
529+ if next == * inner {
530+ trace ! ( "Update did not change state" ) ;
531+ return Ok ( ( ) ) ;
532+ }
533+ assert_eq ! ( next. id( ) , inner. id( ) , "Primary key changed" ) ;
534+
535+ // Bump revisions
536+ next. set_revision ( next. revision ( ) + 1 ) ;
537+ inner. set_revision ( DataRevision :: Previous ( next. revision ( ) . into ( ) ) ) ;
538+
539+ trace ! ( next = ?next, "Updated" ) ;
540+
541+ let rw = self . db . rw_transaction ( ) ?;
471542
472- drop ( cache) ;
543+ if inner. expiration ( ) . is_some ( ) {
544+ // New id
545+ next. set_id ( DataIdentifier :: default ( ) ) ;
473546
474- // TODO the watcher should probably do this
475- let mut cache = self . inner . write ( ) . await ;
476- * cache = next;
547+ rw. insert ( next. clone ( ) ) ?;
548+ rw. upsert ( inner. clone ( ) ) ?;
549+ } else {
550+ rw. upsert ( next. clone ( ) ) ?;
477551 }
478552
553+ rw. commit ( ) ?;
554+
555+ * inner = next;
479556 Ok ( ( ) )
480557 }
481558
@@ -530,6 +607,7 @@ mod test_resident {
530607 }
531608
532609 #[ tokio:: test]
610+ #[ test_log:: test]
533611 async fn test_data ( ) -> Result < ( ) > {
534612 let models = Box :: leak ( Box :: new ( Models :: new ( ) ) ) ;
535613 models. define :: < TestData > ( ) . unwrap ( ) ;
@@ -570,7 +648,8 @@ mod test_resident {
570648 let rw = db. rw_transaction ( ) ?;
571649 rw. upsert ( TestData {
572650 _id : res. read ( ) . await . _id ,
573- _revision : DataRevision :: latest ( ) ,
651+ _revision : DataRevision :: default ( ) ,
652+ _creation : DataCreation :: default ( ) ,
574653 a : "test 10" . into ( ) ,
575654 b : "" . into ( ) ,
576655 } ) ?;
@@ -589,7 +668,7 @@ mod test_resident {
589668 let models = Box :: leak ( Box :: new ( Models :: new ( ) ) ) ;
590669 models. define :: < TestHistoryData > ( ) . unwrap ( ) ;
591670
592- let mut database = DatabaseLayer :: new (
671+ let database = DatabaseLayer :: new (
593672 DatabaseConfig {
594673 storage : None ,
595674 ephemeral : true ,
@@ -738,6 +817,19 @@ pub trait DataQuery<T: Data> {
738817 }
739818}
740819
820+ macro_rules! collect_conditions {
821+ ( $scan: ident, $condition: ident, $( $conditions: ident) ,* ) => {
822+ match condition. clone( ) {
823+ DataCondition :: Equal ( key, value) => scan. secondary( key) ?. equal( value) ?. try_collect( ) ?,
824+ DataCondition :: Range ( key, value) => {
825+ let secondary = scan. secondary( key) ?;
826+ let it = secondary. range( value) ?;
827+ it. try_collect( ) ?
828+ }
829+ }
830+ } ;
831+ }
832+
741833impl < T : Data > DataQuery < T > for ( ) {
742834 fn query ( & self , scan : RScan ) -> Result < Vec < T > > {
743835 Ok ( scan. primary ( ) ?. all ( ) ?. try_collect ( ) ?)
@@ -843,7 +935,8 @@ mod test_resident_vec {
843935 let rw = db. rw_transaction ( ) ?;
844936 rw. upsert ( TestData {
845937 _id : data. read ( ) . await . _id ,
846- _revision : DataRevision :: latest ( ) ,
938+ _revision : DataRevision :: default ( ) ,
939+ _creation : DataCreation :: default ( ) ,
847940 a : "test 10" . into ( ) ,
848941 b : "" . into ( ) ,
849942 } ) ?;
0 commit comments