Conversation

@jarhodes314 jarhodes314 commented Sep 12, 2025

Proposed changes

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#510

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@codecov

codecov bot commented Sep 12, 2025

@jarhodes314
Contributor Author

Database comparison

I've been comparing two database implementations. The first is fjall, an embedded key-value store written in Rust (what we settled on for the original hackathon implementation). The other is SQLite, which is advantageous if we want to expose the database to other users.

Performance

To measure the performance of the database APIs tedge-flows is using, I created a simple benchmark that inserts batches of data into three different series in a loop, then drains one of the series entirely. In practice, database IO will also come with the unavoidable cost of invoking a JavaScript-based flow, but since that executes identically with either database backend in use, I didn't attempt to measure its impact.
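The benchmark itself isn't shown in this thread; a minimal sketch of its shape, using a hypothetical `SeriesDb` trait standing in for the fjall/SQLite backends and a toy in-memory backend so it runs standalone, might look like:

```rust
use std::collections::HashMap;
use std::time::Instant;

// Hypothetical abstraction over the two backends (fjall / SQLite).
trait SeriesDb {
    fn insert_batch(&mut self, series: &str, values: &[(u64, f64)]);
    fn drain(&mut self, series: &str) -> Vec<(u64, f64)>;
}

// Toy in-memory backend so the sketch is runnable; the real benchmark
// would implement SeriesDb for the fjall and SQLite backends instead.
struct MemDb(HashMap<String, Vec<(u64, f64)>>);

impl SeriesDb for MemDb {
    fn insert_batch(&mut self, series: &str, values: &[(u64, f64)]) {
        self.0.entry(series.to_string()).or_default().extend_from_slice(values);
    }
    fn drain(&mut self, series: &str) -> Vec<(u64, f64)> {
        self.0.remove(series).unwrap_or_default()
    }
}

// Insert `batches` batches of `batch_size` values into three series,
// then drain one series entirely, timing both phases.
fn bench(db: &mut impl SeriesDb, batches: usize, batch_size: usize) -> usize {
    let series = ["temperature", "pressure", "humidity"];
    let start = Instant::now();
    for batch in 0..batches {
        for s in &series {
            let values: Vec<(u64, f64)> = (0..batch_size)
                .map(|i| ((batch * batch_size + i) as u64, 0.0))
                .collect();
            db.insert_batch(s, &values);
        }
    }
    let total = batches * batch_size * series.len();
    println!("Inserted {total} items (across {batches} batches of {batch_size}) in {:?}", start.elapsed());

    let start = Instant::now();
    let drained = db.drain("temperature");
    println!("Drained {} items in {:?}", drained.len(), start.elapsed());
    drained.len()
}

fn main() {
    let mut db = MemDb(HashMap::new());
    assert_eq!(bench(&mut db, 15, 1000), 15000);
}
```

The series names and the `SeriesDb` trait are illustrative assumptions, not the actual tedge-flows API.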

In terms of code changes, I changed very little of my original implementation before measuring the performance. The only change I made was to use transactions when inserting multiple values, as this has a hugely significant impact on SQLite insertion performance. I used the equivalent "batch" feature of fjall as well, though in that case the impact on insert performance was more modest.

In terms of performance, at least with my current implementation, fjall is significantly quicker (>10x) to insert into and somewhat quicker to drain from. SQLite takes up a bit less disk space, but not massively so.

> ./target/release/db_bench fjall bench2
Benchmarking fjall backend with 15000 inserts and 5000 drains from bench.fjall
Inserted 15000 items (across 15 batches of 1000) in 18.550153ms
Drained 5000 items in 9.597644ms

> ./target/release/db_bench sqlite
Benchmarking sqlite backend with 15000 inserts and 5000 drains from bench.sqlite
Inserted 15000 items (across 15 batches of 1000) in 300.421601ms
Drained 5000 items in 26.938409ms

> du -sh bench.*
4.2M    bench.fjall
3.3M    bench.sqlite

With larger insertions, the IO performance is much the same as before (though SQLite is quicker at draining the larger number of records). The disk usage of fjall, however, is significantly lower (~3x) than the disk usage of SQLite.

> rm -rf bench.*

> ./target/release/db_bench sqlite 100000
Benchmarking sqlite backend with 1500000 inserts and 500000 drains from bench.sqlite
Inserted 1500000 items (across 15 batches of 100000) in 16.10093498s
Drained 500000 items in 1.294142794s

> ./target/release/db_bench fjall 100000
Benchmarking fjall backend with 1500000 inserts and 500000 drains from bench.fjall
Inserted 1500000 items (across 15 batches of 100000) in 1.890891907s
Drained 500000 items in 931.588034ms

> du -sh bench.*
58M     bench.fjall
193M    bench.sqlite

Impact on binary size

Adding the fjall-based database to the flows feature in the tedge binary results in a 3.4% increase in binary size. The impact of (statically-linked) SQLite is over twice that (8.1%). Because we use musl builds, we cannot dynamically link to SQLite, so I didn't include that option in the comparison.

=== tedge Binary Size Comparison ===
Configuration        | Size (bytes) | Size (MB) | Overhead
---------------------|--------------|-----------|----------------
No Database:         | 18696688     | 17 MB     | baseline
Fjall Database:      | 19327584     | 18 MB     | +616.109375 KB
SQLite Database:     | 20212200     | 19 MB     | +1479.992188 KB

=== Database Overhead Analysis ===
Fjall overhead:  616.109375 KB (3.4%)
SQLite overhead: 1479.992188 KB (8.1%)
SQLite vs Fjall: +863.882812 KB (4.6%)


@didier-wenzek didier-wenzek left a comment


The current design must be revisited before adding new kinds of message sources.

tokio::select! {
    _ = interval.tick() => {
        let drained_messages = self.drain_db().await?;
        self.on_messages(MessageSource::MeaDB, drained_messages).await?;


This design is a legacy of the POC and is actually not so neat.

What is wrong is that messages drained from the db (in self.drain_db()) are then delivered to all the flows (in self.on_messages()), while at most one flow is interested in each: the flow whose source produced it! This is:

  • inefficient: messages drained from the DB are delivered even to flows unrelated to the MEA DB
  • complicated: each message must be tagged with a source such as MessageSource::MeaDB
  • not extensible: a new method is required for each kind of message source (similar to self.drain_db()).

A better approach would be to add an on_interval() method on the sources themselves. Only the messages produced by a flow's own source would then be processed by that flow. This could even be improved with a MessageSource trait. But the key point is that the flow actor would no longer have to be updated for each new source kind.

Comment on lines +88 to +102
pub fn accept(&self, source: MessageSource, message_topic: &str) -> bool {
    match &self.input {
        FlowInput::Mqtt {
            topics: input_topics,
        } => source == MessageSource::Mqtt && input_topics.accept_topic_name(message_topic),
        FlowInput::MeaDB { .. } => source == MessageSource::MeaDB,
    }
}

With the proposed design, a flow is no longer asked to accept or reject a message that was produced by an arbitrary source. A flow only has to process messages produced by its own source.

}
}

impl FlowInput {

The main idea of the proposed design is to let the source generate messages on some configured interval.

impl FlowInput {
    pub async fn on_interval(
        &mut self,
        _timestamp: &DateTime,
    ) -> Result<Vec<Message>, FlowError> {
        match self {
            FlowInput::Mqtt { .. } => Ok(vec![]),
            FlowInput::MeaDB { .. } => drain_db().await,
        }
    }
}
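The "MessageSource trait" improvement mentioned above could look roughly like the following sketch. The `Message` type, the source structs, and the polling loop are stand-ins (the real code is async and holds a database handle); the point is only that the flow actor polls a uniform interface and needs no per-source-kind methods:

```rust
// Sketch of a MessageSource trait: each source kind implements on_interval,
// and the flow actor polls whichever source a flow owns.
type Message = String; // stand-in for the real message type

trait MessageSource {
    fn on_interval(&mut self) -> Vec<Message>;
}

// MQTT messages arrive via subscription, so the interval tick yields nothing.
struct MqttSource;

impl MessageSource for MqttSource {
    fn on_interval(&mut self) -> Vec<Message> {
        Vec::new()
    }
}

// Stand-in for the MEA DB source: draining empties the pending buffer.
struct MeaDbSource {
    pending: Vec<Message>,
}

impl MessageSource for MeaDbSource {
    fn on_interval(&mut self) -> Vec<Message> {
        std::mem::take(&mut self.pending)
    }
}

fn main() {
    let mut sources: Vec<Box<dyn MessageSource>> = vec![
        Box::new(MqttSource),
        Box::new(MeaDbSource { pending: vec!["m1".into(), "m2".into()] }),
    ];
    // On each tick the actor polls every source; each source's messages
    // are routed only to its own flow, with no MessageSource enum tag needed.
    let drained: Vec<Message> = sources.iter_mut().flat_map(|s| s.on_interval()).collect();
    assert_eq!(drained, vec!["m1".to_string(), "m2".to_string()]);
}
```

Adding a new source kind then means implementing the trait, with no change to the flow actor itself.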

didier-wenzek and others added 8 commits October 13, 2025 12:00
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>