-
Notifications
You must be signed in to change notification settings - Fork 135
fix(gas): fix batch listen, fix history for graceful signal send in workflows #3479
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
3 Skipped Deployments
|
PR Review: Fix batch listen, fix history for graceful signal send in workflowsSummaryThis PR introduces two main improvements to the gasoline workflow engine:
Code Quality & Best PracticesStrengths:
Suggestions:
Potential Bugs & IssuesCritical:
Minor:
Performance ConsiderationsImprovements:
Concerns:
Security ConcernsNo security issues identified. The changes:
Test CoverageMissing test scenarios:
Recommended test additions: // Test graceful signal send with replay
#[test]
async fn test_graceful_signal_not_found_replay() {
// First run: target doesn't exist, should create removed event
// Second run: should replay removed event correctly
}
// Test batch signal ordering
#[test]
async fn test_batch_signal_timestamp_ordering() {
// Create signals with out-of-order timestamps
// Verify they're received in timestamp order
}Additional Notes
VerdictLGTM with minor suggestions This is a solid improvement to the workflow engine. The graceful signal sending feature is well-implemented with proper history tracking for replay consistency. The batch listen fixes address real bugs in signal ordering and indexed chunk handling. The loop optimization should provide measurable performance improvements. The code follows project conventions, has good documentation, and maintains backward compatibility. Recommended to merge after addressing the test coverage gaps. Recommendation: Approve with suggestion to add tests before merge. |
More templates
@rivetkit/cloudflare-workers
@rivetkit/db
@rivetkit/framework-base
@rivetkit/next-js
@rivetkit/react
rivetkit
@rivetkit/sql-loader
@rivetkit/engine-runner
@rivetkit/engine-runner-protocol
commit: |
25ece3d to
6ff29f6
Compare
0ca2fa2 to
321b2c3
Compare
|
Critical Issues - Must Fix Before Merge
Affected locations:
Example of what needs fixing - change from this pattern: To this pattern:
Positive Changes:
Recommendations:
The core concept is solid - adding graceful degradation for missing signal targets is the right approach. However, compilation errors need fixing before merge. |
Code Review: Batch Listen & Graceful Signal SendThis PR introduces important improvements to the workflow signal system, but there are several critical issues that need to be addressed before merging. Critical Issue: Missing runner2 ModuleSeverity: Critical - Breaks Compilation Multiple files reference workflows::runner2 but the module does not exist:
However, engine/packages/pegboard/src/workflows/mod.rs only declares pub mod actor and pub mod runner. Fix Required: Either add pub mod runner2 and create the module file, OR revert all references back to runner if runner2 was intended for a different PR. |
Code Quality Issues1. Potential Logic Issue in Batch Listen Sort/LimitSeverity: Medium In engine/packages/gasoline/src/db/kv/mod.rs:1945-2067, the limit is applied twice:
If there are multiple signal types in the filter, this could fetch more signals than needed. For example, with 3 signal types and limit=10, you might fetch up to 30 signals, then sort and take 10. Recommendation: Consider using limit: None in the initial fetch, or adjust the per-type limit to account for multiple signal types. 2. Inconsistent Event Type NamingSeverity: Low The signal batch receive event is called different things across the codebase:
Consider standardizing terminology. |
Good Practices Observed1. Excellent Error Handling PatternThe graceful_not_found() API is well-designed and much cleaner than the previous pattern of checking for specific error types. This is a clean builder API that improves code readability significantly! 2. Proper History Tracking for Graceful Not FoundThe implementation correctly handles replay scenarios by:
This maintains workflow determinism properly. 3. Loop Optimization ParallelizationThe loop optimization in engine/packages/gasoline/src/ctx/workflow.rs:866-979 is well-documented with detailed comments explaining the parallelization strategy, which is excellent for maintainability. |
Test CoverageMissing Tests for New FeaturesSeverity: Medium The PR adds significant new functionality but I do not see corresponding tests:
Recommendation: Add integration tests in engine/packages/gasoline/tests/workflows/ following the pattern of signal_test.rs. |
SummaryMust Fix Before Merge:
Should Fix: Nice to Have: Overall, this is a solid improvement to the signal system with good attention to replay consistency and error handling. The main blocker is the runner2 compilation issue. Once that is resolved and tests are added, this should be ready to merge. Great work on the graceful error handling pattern! |
6ff29f6 to
dba469c
Compare
321b2c3 to
aa9ef2a
Compare
f9664d9 to
a7d5ffd
Compare
Code ReviewThis PR introduces a graceful signal sending mechanism and fixes batch signal listening. Overall, the implementation is solid with good attention to workflow replay consistency. Here are my findings: ✅ Strengths
🔍 Potential IssuesCritical: Missing Index Initialization Bug FixLocation: The fix for missing // Fixed in these locations:
if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
input_chunks.push(entry);
} else {
current_event.indexed_input_chunks.insert(key.index, vec![entry]); // ✅ Now handles missing index
}Question: Are there other code paths that access Minor: Inconsistent Packed Key UsageLocation: let packed_key = self.subspace.pack(&key);This changed from Performance: Redundant Sorting?Location: Signals are now sorted by timestamp before the limit is applied. If there could be many signals, consider if sorting the entire collection is necessary or if a partial sort/heap would be more efficient. However, if signal counts are typically small, this is fine. 🎯 Best Practices & Style
🔒 SecurityNo security concerns identified. The changes don't introduce:
🧪 Test CoverageMissing: The PR doesn't include tests for:
Recommendation: Add integration tests covering these scenarios, especially the workflow replay with removed events. 📝 Additional Notes
SummaryThis is a well-implemented feature with good attention to workflow engine consistency. The main action items are:
The code follows repository conventions and integrates cleanly with the existing workflow engine. Nice work! 👍 |
PR Review: Fix batch listen, fix history for graceful signal send in workflowsSummaryThis PR introduces graceful handling of signal sending to workflows that may not exist, and fixes issues with batch signal listening and history tracking. The changes are well-structured and address real-world scenarios where workflows may have already been stopped. ✅ Strengths1. Graceful Signal Handling
2. Code Quality
3. Batch Listen Fix
4. Loop Optimization
🔍 Potential Issues1. Index Handling in Debug Code ( if let Some(input_chunks) =
current_event.indexed_input_chunks.get_mut(key.index)
{
input_chunks.push(entry);
} else {
current_event
.indexed_input_chunks
.insert(key.index, vec![entry]);
}This fallback insertion suggests the ensure check on line 926-929 may not be sufficient. If the index doesn't exist, we should fail rather than silently insert. This could hide corruption issues. Recommendation: Consider whether this else branch should be an error case instead, or if the ensure check needs adjustment. 2. Backward Compatibility in Pegboard (
Recommendation: Add a comment explaining the migration path and when the legacy support can be removed. 3. Test Coverage
Recommendation: Add test cases for:
4. Error Message Consistency ⚡ Performance ConsiderationsPositive:
Concern:
Recommendation: Consider if there's a way to merge-sort the streams or use a priority queue for very high-volume scenarios (though current approach is likely fine for typical use cases). 🔒 Security ConsiderationsNo security concerns identified. The graceful handling doesn't bypass any authorization checks and properly logs when workflows aren't found. 📝 Style & Convention AdherenceGood:
Minor:
Recommendations
Overall AssessmentThis is a solid PR that solves real problems with signal handling in workflows. The graceful handling API is well-designed and the batch listen fix addresses ordering issues properly. Main areas for improvement are test coverage and documentation around the migration path. Verdict: ✅ Approve with recommendations for follow-up test additions 🤖 Review generated by Claude Code |
c35fe5e to
d01b6fc
Compare
a7d5ffd to
41442d3
Compare
PR Review: Fix batch listen, fix history for graceful signal send in workflowsSummaryThis PR introduces a Code Quality & Best Practices✅ Strengths
|
PR Review: Fix batch listen, fix history for graceful signal send in workflowsSummaryThis PR introduces a ✅ Strengths
🐛 Potential IssuesCritical: Missing Input Chunk Initialization CheckLocation: if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
input_chunks.push(entry);
} else {
current_event.indexed_input_chunks.insert(key.index, vec\![entry]);
}Issue: This code inserts a new vector when an index doesn't exist, but it could indicate a logic error if indices should be sequential. If chunks arrive out of order, this could lead to incomplete data. Question: Should this be an error case instead? Or should there be validation that all expected indices are present before using the chunks? Bug: Wrong Signal Type in History KeyLocation: EventType::Signals, // Changed from EventType::SignalIssue: This appears to be a bug fix, but there's no context in the PR about what was broken. The change suggests there was a type mismatch between singular and plural signal events. Recommendation: Add a comment explaining why Potential Race Condition: Signal OrderingLocation: Issue: Signals are fetched from multiple streams in parallel, then sorted by timestamp. However, if signals arrive with identical or very close timestamps, the ordering could be non-deterministic. let mut signals = futures_util::stream::iter(owned_filter.clone())
.map(|signal_name| { /* fetch signals */ })
.flatten()
// ... collect all signals
.await?;
signals.sort_by_key(|key| key.ts); // Line 1977Recommendation: Use a stable sort or add a secondary sort key (like signal_id) for deterministic ordering: signals.sort_by_key(|key| (key.ts, key.signal_id));📝 Code Quality Concerns1. Inconsistent Error Handling PatternMultiple files changed from this pattern: let res = signal().send().await;
if let Some(WorkflowError::WorkflowNotFound) = res.as_ref().err()... {
// handle gracefully
} else {
res?;
}To this pattern: let res = signal().graceful_not_found().send().await?;
if res.is_none() {
// handle gracefully
}Issue: The new pattern is cleaner, but now pub enum SignalResult {
Sent(Id),
NotFound,
}This would make the API more self-documenting. 2. Deferred Future Pattern ComplexityLocation: The loop optimization with
3. Magic Number Without ConstantLocation: if iteration % LOOP_ITERS_PER_COMMIT == 0 {Good: Uses a constant. But the constant value isn't shown in the diff. Recommendation: Add a comment explaining why this specific interval was chosen (performance vs consistency tradeoff). 🔒 Security ConsiderationsNo significant security concerns identified. The changes are mostly internal workflow mechanics. 🧪 Test Coverage ConcernsMissing Test Cases:
Recommendation: Add integration tests for these scenarios, especially the replay cases which are critical for workflow correctness. 🎯 Specific File Feedback
|
d01b6fc to
5a41109
Compare
41442d3 to
3f3140e
Compare
3f3140e to
d321009
Compare
5a41109 to
5b7c52f
Compare
Merge activity
|

No description provided.