feat: Distributed Log Replay serialization/deserialization #1503
Codecov Report
@@            Coverage Diff             @@
##             main    #1503      +/-   ##
==========================================
+ Coverage   84.55%   84.85%   +0.29%
==========================================
  Files         123      129       +6
  Lines       33366    34826    +1460
  Branches    33366    34826    +1460
==========================================
+ Hits        28213    29550    +1337
- Misses       3768     3830      +62
- Partials     1385     1446      +61
        predicate_schema,
        column_mapping_mode: self.state_info.column_mapping_mode,
    };
    let internal_state_blob = serde_json::to_vec(&internal_state)
I assume we don't care too much about compatibility across binary versions here (if we do, then we might want to include a version byte, so readers can understand if the serialization format has changed).
Hmm good callout. I'll put that as a warning in the docs.
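As a rough illustration of the version-byte idea raised above (not what this PR implements), a serialized blob could be prefixed with a single format-version byte that the reader checks before decoding the rest. The names `FORMAT_VERSION`, `encode_versioned`, and `decode_versioned` are hypothetical and do not appear in the PR; the payload stands in for the JSON bytes produced by `serde_json::to_vec`.

```rust
/// Hypothetical format version (an assumption for illustration, not part of the PR).
const FORMAT_VERSION: u8 = 1;

/// Prefix an already-serialized payload with a single version byte.
fn encode_versioned(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 1);
    out.push(FORMAT_VERSION);
    out.extend_from_slice(payload);
    out
}

/// Split off the version byte and reject blobs written in an unknown format.
fn decode_versioned(blob: &[u8]) -> Result<&[u8], String> {
    match blob.split_first() {
        // Known version: hand back the remaining bytes for normal deserialization.
        Some((&FORMAT_VERSION, payload)) => Ok(payload),
        // Any other leading byte means the writer used a different format.
        Some((&other, _)) => Err(format!("unsupported state version: {}", other)),
        None => Err("empty state blob".to_string()),
    }
}
```

With a check like this, a reader built against a different format version fails with an explicit error instead of misinterpreting the bytes.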
    // Deserialize internal state from json
    let internal_state: InternalScanState = serde_json::from_slice(&state.internal_state_blob)
        .map_err(|e| Error::generic(format!("Failed to deserialize internal state: {}", e)))?;
for safety, should we check that there aren't any unexpected keys?
Good catch. Added a test for that :)
    obj["new_field"] = serde_json::json!("my_new_value");
    let invalid_blob = obj.to_string();

    let internal_res: Result<InternalScanState, _> = serde_json::from_str(&invalid_blob);
please fix.
    #[test]
    fn deserialize_internal_state_with_extry_fields_fails() {
        // Test that missing predicate_schema when predicate exists is detected
copy-pasta?
🥞 Stacked PR
Use this link to review incremental changes.
What changes are proposed in this pull request?
This PR introduces serialization and deserialization for the ScanLogReplayProcessor.
This PR adds serialization and deserialization for LogReplayProcessor.
It also removes the StaticInsert transform since:
How was this change tested?
A round-trip test is performed with the LogReplayProcessor. Deserializing an invalid internal state is expected to return an error.