Skip to content

Commit 24a8eb6

Browse files
committed
wip: ResidentVecBuilder
1 parent 45dfb8e commit 24a8eb6

File tree

3 files changed

+188
-57
lines changed

3 files changed

+188
-57
lines changed

sandpolis-database/src/lib.rs

Lines changed: 176 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@
44
use crate::config::DatabaseConfig;
55
use anyhow::{Result, anyhow, bail};
66
use chrono::{DateTime, Utc};
7-
use native_db::db_type::{KeyDefinition, KeyEntry, KeyOptions, ToKeyDefinition};
7+
use native_db::db_type::{KeyDefinition, KeyEntry, KeyOptions, KeyRange, ToKeyDefinition};
88
use native_db::transaction::RTransaction;
9-
use native_db::transaction::query::SecondaryScanIterator;
9+
use native_db::transaction::query::{SecondaryScan, SecondaryScanIterator};
1010
use native_db::watch::Event;
1111
use native_db::{Key, Models, ToInput, ToKey};
1212
use sandpolis_core::{InstanceId, RealmName};
1313
use serde::{Deserialize, Serialize};
1414
use std::collections::{BTreeMap, HashMap};
15-
use std::ops::{Range, RangeFrom};
15+
use std::ops::{Range, RangeBounds, RangeFrom};
1616
use std::path::Path;
1717
use std::sync::atomic::AtomicU64;
1818
use std::{marker::PhantomData, sync::Arc};
19-
use tokio::sync::{RwLock, RwLockReadGuard};
19+
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
2020
use tokio_util::sync::CancellationToken;
2121
use tracing::instrument::WithSubscriber;
2222
use tracing::{debug, trace};
@@ -27,59 +27,78 @@ pub mod config;
2727
pub struct DatabaseLayer {
2828
config: DatabaseConfig,
2929
models: &'static Models,
30-
inner: Arc<RwLock<HashMap<Option<RealmName>, Arc<native_db::Database<'static>>>>>,
30+
inner: Arc<RwLock<HashMap<RealmName, RealmDatabase>>>,
3131
}
3232

3333
impl DatabaseLayer {
3434
pub fn new(config: DatabaseConfig, models: &'static Models) -> Result<Self> {
35-
let system = if let Some(path) = config.get_storage_dir()? {
36-
let path = path.join("system.db");
37-
debug!(path = %path.display(), "Initializing persistent system database");
35+
let default = if let Some(path) = config.get_storage_dir()? {
36+
let path = path.join("default.db");
3837

39-
Arc::new(native_db::Builder::new().create(models, path)?)
38+
debug!(path = %path.display(), "Initializing persistent default database");
39+
native_db::Builder::new().create(models, path)?
4040
} else {
41-
debug!("Initializing ephemeral system database");
42-
Arc::new(native_db::Builder::new().create_in_memory(models)?)
41+
debug!("Initializing ephemeral default database");
42+
native_db::Builder::new().create_in_memory(models)?
4343
};
4444

4545
Ok(Self {
4646
config,
4747
models,
48-
inner: Arc::new(RwLock::new(HashMap::from([(None, system)]))),
48+
inner: Arc::new(RwLock::new(HashMap::from([(
49+
RealmName::default(),
50+
RealmDatabase::new(default),
51+
)]))),
4952
})
5053
}
5154

52-
/// Load or create a new database for the given realm.
53-
pub async fn add_realm(
54-
&mut self,
55-
name: RealmName,
56-
) -> Result<Arc<native_db::Database<'static>>> {
57-
// Check for duplicates
58-
let mut inner = self.inner.write().await;
59-
if inner.contains_key(&Some(name.clone())) {
60-
bail!("Duplicate realm");
55+
/// Get a `RealmDatabase` for the given realm.
56+
pub async fn realm(&mut self, name: RealmName) -> Result<Arc<RealmDatabase>> {
57+
{
58+
let inner = self.inner.read().await;
59+
if let Some(db) = inner.get(&name) {
60+
return Ok(db.clone());
61+
}
6162
}
6263

64+
let mut inner = self.inner.write().await;
65+
6366
let db = if let Some(path) = self.config.get_storage_dir()? {
6467
let path = path.join(format!("{name}.db"));
6568

6669
debug!(realm = %name, path = %path.display(), "Initializing persistent realm database");
67-
Arc::new(native_db::Builder::new().create(self.models, path)?)
70+
Arc::new(RealmDatabase::new(
71+
native_db::Builder::new().create(self.models, path)?,
72+
))
6873
} else {
6974
debug!(realm = %name, "Initializing ephemeral realm database");
70-
Arc::new(native_db::Builder::new().create_in_memory(self.models)?)
75+
Arc::new(RealmDatabase::new(
76+
native_db::Builder::new().create_in_memory(self.models)?,
77+
))
7178
};
72-
inner.insert(Some(name), db.clone());
79+
inner.insert(name, db.clone());
7380

7481
Ok(db)
7582
}
83+
}
84+
85+
/// Database containing all `Data` for a particular realm.
86+
#[derive(Clone)]
87+
pub struct RealmDatabase(Arc<native_db::Database<'static>>);
7688

77-
pub async fn get(&self, name: Option<RealmName>) -> Result<Arc<native_db::Database<'static>>> {
78-
let inner = self.inner.read().await;
79-
if let Some(db) = inner.get(&name) {
80-
return Ok(db.clone());
89+
impl RealmDatabase {
90+
fn new(inner: native_db::Database<'static>) -> Self {
91+
Self(Arc::new(inner))
92+
}
93+
94+
pub fn singleton() {}
95+
96+
pub fn query<T: Data>(&self) -> ResidentVecBuilder<T> {
97+
ResidentVecBuilder {
98+
db: self.0.clone(),
99+
_phantom: PhantomData,
100+
conditions: Vec::new(),
81101
}
82-
bail!("Realm not found");
83102
}
84103
}
85104

@@ -180,17 +199,21 @@ where
180199
db: Arc<native_db::Database<'static>>,
181200
inner: Arc<RwLock<T>>,
182201

183-
/// Used to stop the database from sending updates
184-
watch_id: u64,
185-
186-
/// Allows the background update thread to be stopped
187-
cancel_token: CancellationToken,
202+
watch: Option<(u64, CancellationToken)>,
188203
}
189204

190205
impl<T: Data> Drop for Resident<T> {
191206
fn drop(&mut self) {
192-
self.cancel_token.cancel();
193-
self.db.unwatch(self.watch_id).unwrap();
207+
if let Some((watch_id, token)) = self.watch.as_ref() {
208+
token.cancel();
209+
self.db.unwatch(*watch_id).unwrap();
210+
}
211+
}
212+
}
213+
214+
impl<T: Data> Resident<T> {
215+
pub fn detached(value: T) -> Result<Self> {
216+
todo!()
194217
}
195218
}
196219

@@ -252,9 +275,8 @@ impl<T: Data + 'static> Resident<T> {
252275

253276
Ok(Self {
254277
inner: cache,
255-
watch_id,
256-
cancel_token,
257278
db,
279+
watch: Some((watch_id, cancel_token)),
258280
})
259281
}
260282

@@ -451,30 +473,134 @@ where
451473
db: Arc<native_db::Database<'static>>,
452474
inner: Arc<RwLock<Vec<Resident<T>>>>,
453475

454-
/// Used to stop the database from sending updates
455-
watch_id: u64,
456-
457-
/// Allows the background update thread to be stopped
458-
cancel_token: CancellationToken,
476+
watch: Option<(u64, CancellationToken)>,
459477
}
460478

461479
impl<T: Data> Drop for ResidentVec<T> {
462480
fn drop(&mut self) {
463-
self.cancel_token.cancel();
464-
self.db.unwatch(self.watch_id).unwrap();
481+
if let Some((watch_id, token)) = self.watch.as_ref() {
482+
token.cancel();
483+
self.db.unwatch(*watch_id).unwrap();
484+
}
465485
}
466486
}
467487

468488
impl<T: Data> ResidentVec<T> {
469489
pub fn is_detached(&self) -> bool {
470-
false
490+
self.watch.is_none()
471491
}
472492

473-
pub async fn read(&self) -> RwLockReadGuard<'_, Vec<Resident<T>>> {
474-
self.inner.read().await
493+
/// Create a new detached `ResidentVec` which will not receive any updates
494+
/// after its created.
495+
fn detached(db: Arc<native_db::Database<'static>>, conditions: Vec<Condition>) -> Result<Self> {
496+
let r = db.r_transaction()?;
497+
let scan = r.scan();
498+
499+
// We have to store these temporarily until we collect()
500+
let mut scans = Vec::new();
501+
502+
Ok(Self {
503+
inner: Arc::new(RwLock::new(
504+
conditions
505+
.into_iter()
506+
.map(|condition| match condition {
507+
Condition::Equal { key, value } => {
508+
let scan = scan.secondary(key).unwrap();
509+
let it = {
510+
let it_ref = scan.equal(value).unwrap();
511+
it_ref
512+
};
513+
scans.push(scan);
514+
it
515+
}
516+
Condition::Range { key, value } => {
517+
todo!()
518+
}
519+
})
520+
.reduce(|x, y| x.and(y))
521+
.unwrap()
522+
.map(|item| item.map(|i| Resident::detached(i).unwrap()))
523+
.try_collect()?,
524+
)),
525+
watch: None,
526+
db,
527+
})
475528
}
529+
}
476530

477-
pub async fn write(&self) -> RwLockWriteGuard<'_, Vec<Resident<T>>> {
478-
self.inner.write().await
531+
// Replicate some of the Vec API
532+
impl<T: Data> ResidentVec<T> {
533+
/// Returns the number of elements in the vector, also referred to
534+
/// as its 'length'.
535+
pub async fn len(&self) -> usize {
536+
self.inner.read().await.len()
479537
}
538+
539+
/// Appends an element to the back of a collection.
540+
pub async fn push(&self, value: T) -> Result<()> {
541+
// TODO check id collision?
542+
if self.watch.is_some() {
543+
// Insert and let watcher update `inner`
544+
let rw = self.db.rw_transaction()?;
545+
rw.insert(value)?;
546+
rw.commit()?;
547+
548+
// TODO wait for update?
549+
} else {
550+
// Detached, so simple append
551+
self.inner.write().await.push(Resident::detached(value)?);
552+
}
553+
554+
Ok(())
555+
}
556+
}
557+
558+
enum Condition {
559+
Equal {
560+
key: KeyDefinition<KeyOptions>,
561+
value: Key,
562+
},
563+
Range {
564+
key: KeyDefinition<KeyOptions>,
565+
value: KeyRange,
566+
},
567+
}
568+
569+
pub struct ResidentVecBuilder<T>
570+
where
571+
T: Data,
572+
{
573+
db: Arc<native_db::Database<'static>>,
574+
_phantom: PhantomData<T>,
575+
conditions: Vec<Condition>,
576+
}
577+
578+
impl<T: Data> ResidentVecBuilder<T> {
579+
pub fn equal(mut self, key: impl ToKeyDefinition<KeyOptions>, value: impl ToKey) -> Self {
580+
self.conditions.push(Condition::Equal {
581+
key: key.key_definition(),
582+
value: value.to_key(),
583+
});
584+
self
585+
}
586+
pub fn range<R: RangeBounds<impl ToKey>>(
587+
mut self,
588+
key: impl ToKeyDefinition<KeyOptions>,
589+
value: R,
590+
) -> Self {
591+
self.conditions.push(Condition::Range {
592+
key: key.key_definition(),
593+
value: KeyRange::new(value),
594+
});
595+
self
596+
}
597+
598+
pub fn current(mut self) -> Result<ResidentVec<T>> {
599+
ResidentVec::detached(self.db, self.conditions)
600+
}
601+
}
602+
603+
impl<T: HistoricalData> ResidentVecBuilder<T> {
604+
pub fn latest(mut self) {}
605+
pub fn previous(mut self) {}
480606
}

sandpolis-realm/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl RealmLayer {
4949
instance: InstanceLayer,
5050
) -> Result<Self> {
5151
// Load all realm databases
52-
let db = database.get(None).await?;
52+
let db = database.get(None).await?; // TODO default realm
5353
let r = db.r_transaction()?;
5454
for realm in r.scan().primary::<RealmData>()?.all()? {
5555
let realm = realm?;
@@ -110,7 +110,8 @@ impl RealmLayer {
110110
/// A realm is a set of clients and agents that can interact. Each realm has a
111111
/// global CA certificate that signs certificates used to connect to the server.
112112
///
113-
/// All servers have a default realm called "default".
113+
/// All servers have a default realm called "default". All `RealmData` entries
114+
/// are stored in the default realm.
114115
#[derive(Validate)]
115116
#[data]
116117
pub struct RealmData {

sandpolis-sysinfo/src/os/user/agent.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@ use crate::os::user::UserDataKey;
33
use anyhow::Result;
44
use native_db::Database;
55
use sandpolis_agent::Collector;
6-
use sandpolis_database::{DbTimestamp, ResidentVec};
6+
use sandpolis_database::{DbTimestamp, RealmDatabase, ResidentVec};
77
use sysinfo::Users;
88

99
pub struct UserCollector {
10-
data: ResidentVec<UserData>,
10+
db: RealmDatabase,
1111
users: Users,
1212
}
1313

1414
impl UserCollector {
15-
pub fn new(data: ResidentVec<UserData>) -> Self {
15+
pub fn new(db: RealmDatabase) -> Self {
1616
Self {
1717
users: Users::new(),
18-
data,
18+
db,
1919
}
2020
}
2121
}
@@ -25,7 +25,11 @@ impl Collector for UserCollector {
2525
self.users.refresh();
2626
trace!(info = ?self.users, "Polled user info");
2727

28-
let resident_users = self.data.write().await;
28+
let users: ResidentVec<UserData> = self
29+
.db
30+
.query()
31+
.equal(UserDataKey::uid, **user.id() as u64)
32+
.latest();
2933

3034
// Update or add any new users
3135
for user in self.users.list() {

0 commit comments

Comments
 (0)