Skip to content

Conversation

@MasterPtato
Copy link
Contributor

No description provided.

@vercel
Copy link

vercel bot commented Nov 18, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivetkit-serverless Error Error Dec 2, 2025 7:11pm
3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Dec 2, 2025 7:11pm
rivet-inspector Ignored Ignored Preview Dec 2, 2025 7:11pm
rivet-site Ignored Ignored Preview Dec 2, 2025 7:11pm

Copy link
Contributor Author

MasterPtato commented Nov 18, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude
Copy link

claude bot commented Nov 18, 2025

PR Review: Fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces two main improvements to the gasoline workflow engine:

  1. Graceful signal sending - adds graceful_not_found() method to signal builders that returns Ok(None) instead of erroring when the target workflow is not found
  2. Batch listen fix - corrects signal batching to properly sort by timestamp and handle indexed input chunks

Code Quality & Best Practices

Strengths:

  • Consistent API design across both common/signal.rs and workflow/signal.rs builders
  • Good documentation with doc comments explaining behavior (lines 107-108, 118-119 in both files)
  • Follows existing error handling patterns using Result<Option<Id>>
  • Proper tracing/logging throughout with structured logging as per CLAUDE.md guidelines
  • Loop optimization improvements maintain good performance characteristics

Suggestions:

  1. Type annotation consistency (gasoline/src/db/kv/mod.rs:1945):

    let mut signals = futures_util::stream::iter(owned_filter.clone())

    The mut is added here, but signals were already being collected into a Vec. This is correct now for the sort operation on line 1980, but consider adding a comment explaining why this needs to be mutable.

  2. Error message clarity (api-peer/src/actors/delete.rs:74-77):
    The warning message could be more specific about which operation failed:

    tracing::warn!(
        actor_id=?path.actor_id,
        "actor workflow not found when sending destroy signal, likely already stopped"
    );

Potential Bugs & Issues

Critical:

  1. Database index insertion logic (gasoline/src/db/kv/debug.rs:931-939 and gasoline/src/db/kv/mod.rs:1505-1512):

    if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
        input_chunks.push(entry);
    } else {
        current_event.indexed_input_chunks.insert(key.index, vec![entry]);
    }

    This is a good fix! The previous code had a logic error where it would only push to existing indices but never create new ones. This could have caused data loss in signal batching. The fix properly handles both cases.

  2. Timestamp consistency (gasoline/src/db/kv/mod.rs:1980-2012):
    Good catch fixing the timestamp! The code now:

    • Sorts signals by key.ts (line 1980)
    • Uses a single now timestamp for all acks (line 1982, 2011)
    • Applies limit AFTER sorting but BEFORE processing (line 1995)

    This ensures consistent ordering and prevents race conditions.

Minor:

  1. Event type mismatch fix (gasoline/src/db/kv/keys/history.rs:1645):
    EventType::Signal -> EventType::Signals
    Good catch! This was incorrectly using the singular form for batch operations.

Performance Considerations

Improvements:

  1. Loop event upsertion parallelization (gasoline/src/ctx/workflow.rs:872-978):
    The new approach defers loop event upserts to run in parallel with the next iteration. This is a clever optimization that should reduce latency in tight loops. The 4-way join on lines 910-940 is well-documented with comments explaining the parallel execution strategy.

  2. Signal batching optimization (gasoline/src/db/kv/mod.rs:1995):
    Applying .take(limit) after sorting but before processing is more efficient than the previous approach of sorting after collection.

Concerns:

  1. The buffer_unordered(1024) on line 2063 could cause memory pressure if processing 1024 signals simultaneously. Consider if this limit should be configurable or tied to the input limit parameter.

Security Concerns

No security issues identified. The changes:

  • Properly handle missing workflows without exposing sensitive information
  • Maintain transactional integrity with conflict ranges
  • Don't introduce new attack surfaces

Test Coverage

Missing test scenarios:

  1. Test that graceful_not_found() properly creates and replays removed events in workflow history
  2. Test signal batching with timestamps in non-chronological order to verify sorting
  3. Test the indexed input chunks fix with multiple signals in a batch
  4. Test loop event upsertion parallelization with rapid iterations

Recommended test additions:

// Test graceful signal send with replay
#[test]
async fn test_graceful_signal_not_found_replay() {
    // First run: target doesn't exist, should create removed event
    // Second run: should replay removed event correctly
}

// Test batch signal ordering
#[test]
async fn test_batch_signal_timestamp_ordering() {
    // Create signals with out-of-order timestamps
    // Verify they're received in timestamp order
}

Additional Notes

  1. Display text changes (engine/src/util/wf/mod.rs): The changes from "signal receive" to "signal recv" for consistency are good, but ensure this doesn't break any external tooling that parses these strings.

  2. Logging level change (universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs:67): Changing from info! to debug! for conflict logging is appropriate as this could be noisy in production.

  3. TypeScript comment (runner-protocol/src/index.ts:56-58): The TODO comment about renaming createTs to update_ts should be addressed in a future PR to maintain naming consistency per CLAUDE.md conventions.

Verdict

LGTM with minor suggestions

This is a solid improvement to the workflow engine. The graceful signal sending feature is well-implemented with proper history tracking for replay consistency. The batch listen fixes address real bugs in signal ordering and indexed chunk handling. The loop optimization should provide measurable performance improvements.

The code follows project conventions, has good documentation, and maintains backward compatibility. Recommended to merge after addressing the test coverage gaps.


Recommendation: Approve with suggestion to add tests before merge.

@pkg-pr-new
Copy link

pkg-pr-new bot commented Nov 18, 2025

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3479

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3479

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3479

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3479

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3479

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3479

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3479

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3479

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3479

commit: 321b2c3

@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from 25ece3d to 6ff29f6 Compare November 18, 2025 23:50
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from 0ca2fa2 to 321b2c3 Compare November 18, 2025 23:50
@claude
Copy link

claude bot commented Nov 18, 2025

Critical Issues - Must Fix Before Merge

  1. Missing Variable Assignments - In several files, the code calls .await? but then tries to use res which was never assigned. This will cause compilation errors.

Affected locations:

  • engine/packages/api-peer/src/actors/delete.rs:67-73
  • engine/packages/guard/src/routing/pegboard_gateway.rs:195-207
  • engine/packages/pegboard-serverless/src/lib.rs:483-507
  • engine/packages/pegboard/src/workflows/runner.rs (4 occurrences)

Example of what needs fixing - change from this pattern:

.graceful_not_found()
.send()
.await?;
if res.is_none() {

To this pattern:

let res = ctx.signal(...)
    .graceful_not_found()
    .send()
    .await?;
if res.is_none() {
  1. Batch Listen Limit Mismatch - In db/kv/mod.rs:1995, the limit is applied AFTER the history event is inserted but BEFORE acknowledgment. This could cause signals beyond the limit to be lost or re-delivered incorrectly.

Positive Changes:

  • Better API ergonomics with graceful_not_found builder pattern
  • Proper history handling via removed events for replay determinism
  • Batch signal sorting by timestamp for correct ordering
  • Loop event upsertion properly parallelized

Recommendations:

  • Fix missing res variable assignments (critical)
  • Verify batch listen limit application (critical)
  • Add tests for graceful signal sending and replay
  • Add more context to debug logs
  • Verify EventType::Signals change backward compatibility

The core concept is solid - adding graceful degradation for missing signal targets is the right approach. However, compilation errors need fixing before merge.

@claude
Copy link

claude bot commented Nov 18, 2025

Code Review: Batch Listen & Graceful Signal Send

This PR introduces important improvements to the workflow signal system, but there are several critical issues that need to be addressed before merging.


Critical Issue: Missing runner2 Module

Severity: Critical - Breaks Compilation

Multiple files reference workflows::runner2 but the module does not exist:

  • engine/packages/pegboard/src/lib.rs:17 - Tries to register runner2::Workflow
  • engine/packages/pegboard/src/workflows/actor/mod.rs:6 - Imports runner2::AllocatePendingActorsInput
  • engine/packages/pegboard-runner/src/ping_task.rs:24 - References runner2::Input
  • engine/packages/pegboard-serverless/src/lib.rs:483+ - References runner2::Stop and runner2::Workflow

However, engine/packages/pegboard/src/workflows/mod.rs only declares pub mod actor and pub mod runner.

Fix Required: Either add pub mod runner2 and create the module file, OR revert all references back to runner if runner2 was intended for a different PR.

@claude
Copy link

claude bot commented Nov 18, 2025

Code Quality Issues

1. Potential Logic Issue in Batch Listen Sort/Limit

Severity: Medium

In engine/packages/gasoline/src/db/kv/mod.rs:1945-2067, the limit is applied twice:

  1. First, when fetching signals from each signal type (up to limit per type)
  2. Second, after sorting all signals by timestamp (taking only limit total)

If there are multiple signal types in the filter, this could fetch more signals than needed. For example, with 3 signal types and limit=10, you might fetch up to 30 signals, then sort and take 10.

Recommendation: Consider using limit: None in the initial fetch, or adjust the per-type limit to account for multiple signal types.

2. Inconsistent Event Type Naming

Severity: Low

The signal batch receive event is called different things across the codebase:

  • In db/debug.rs:125 - signal receive
  • In db/kv/keys/history.rs:1645 - EventType::Signals
  • In engine util/wf/mod.rs - signal recv

Consider standardizing terminology.

@claude
Copy link

claude bot commented Nov 18, 2025

Good Practices Observed

1. Excellent Error Handling Pattern

The graceful_not_found() API is well-designed and much cleaner than the previous pattern of checking for specific error types. This is a clean builder API that improves code readability significantly!

2. Proper History Tracking for Graceful Not Found

The implementation correctly handles replay scenarios by:

  • Checking is_removed() during replay (engine/packages/gasoline/src/builder/workflow/signal.rs:117-126)
  • Creating a removed event when the target is not found (lines 176-190)
  • Comparing removed events during replay to ensure consistency

This maintains workflow determinism properly.

3. Loop Optimization Parallelization

The loop optimization in engine/packages/gasoline/src/ctx/workflow.rs:866-979 is well-documented with detailed comments explaining the parallelization strategy, which is excellent for maintainability.

@claude
Copy link

claude bot commented Nov 18, 2025

Test Coverage

Missing Tests for New Features

Severity: Medium

The PR adds significant new functionality but I do not see corresponding tests:

  1. No test for graceful_not_found() - Should test:

    • Signal to non-existent workflow returns Ok(None)
    • Replay of gracefully-not-found signal works correctly
    • Without graceful_not_found(), signal to non-existent workflow still returns error
  2. No test for batch listen - Should test:

    • Multiple signals received in correct timestamp order
    • Limit is applied correctly
    • Multiple signal types are handled properly

Recommendation: Add integration tests in engine/packages/gasoline/tests/workflows/ following the pattern of signal_test.rs.

@claude
Copy link

claude bot commented Nov 18, 2025

Summary

Must Fix Before Merge:

  1. Resolve the missing runner2 module issue (critical compilation error)

Should Fix:
2. Add test coverage for graceful_not_found and batch listen
3. Review the batch listen limit logic (potential over-fetching)
4. Standardize signal event naming across the codebase

Nice to Have:
5. Review the log level change in universaldb transaction_conflict_tracker.rs:67 (changed from info to debug)


Overall, this is a solid improvement to the signal system with good attention to replay consistency and error handling. The main blocker is the runner2 compilation issue. Once that is resolved and tests are added, this should be ready to merge.

Great work on the graceful error handling pattern!

@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from 6ff29f6 to dba469c Compare November 19, 2025 22:06
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from 321b2c3 to aa9ef2a Compare November 19, 2025 22:06
@claude
Copy link

claude bot commented Nov 26, 2025

Code Review

This PR introduces a graceful signal sending mechanism and fixes batch signal listening. Overall, the implementation is solid with good attention to workflow replay consistency. Here are my findings:

Strengths

  1. Excellent Replay Consistency: The graceful_not_found feature properly handles workflow replay by checking for removed events (line 117 in builder/workflow/signal.rs) and inserting removed events when workflows aren't found (lines 176-185). This ensures deterministic replay behavior.

  2. API Consistency: Both common and workflow signal builders expose the same .graceful_not_found() API, making it intuitive for developers.

  3. Batch Listen Fix: The signal sorting and limit application fixes in db/kv/mod.rs (lines 1976-1977, 1992) address critical correctness issues:

    • Sorting signals by timestamp before processing ensures correct ordering
    • Moving .take(limit) after sorting prevents processing wrong signals
  4. Performance Optimization: The loop event parallelization improvements (lines 910-923 in ctx/workflow.rs) are well-designed, running init/upsert operations concurrently with loop execution.

  5. Code Cleanup: Simplified error handling using graceful_not_found() reduces boilerplate across multiple call sites.

🔍 Potential Issues

Critical: Missing Index Initialization Bug Fix

Location: engine/packages/gasoline/src/db/kv/debug.rs:935-938 and engine/packages/gasoline/src/db/kv/mod.rs:1505-1508

The fix for missing indexed_input_chunks is duplicated but only in debug and batch listen code paths. This suggests a potential bug in other signal handling paths:

// Fixed in these locations:
if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
    input_chunks.push(entry);
} else {
    current_event.indexed_input_chunks.insert(key.index, vec![entry]);  // ✅ Now handles missing index
}

Question: Are there other code paths that access indexed_input_chunks that might have the same issue? Consider searching for similar patterns.

Minor: Inconsistent Packed Key Usage

Location: engine/packages/gasoline/src/db/kv/mod.rs:1998

let packed_key = self.subspace.pack(&key);

This changed from tx.pack(&key) in the original code. While both should work, verify that self.subspace.pack() and tx.pack() are equivalent in this context to avoid subtle bugs.

Performance: Redundant Sorting?

Location: engine/packages/gasoline/src/db/kv/mod.rs:1976-1977

Signals are now sorted by timestamp before the limit is applied. If there could be many signals, consider if sorting the entire collection is necessary or if a partial sort/heap would be more efficient. However, if signal counts are typically small, this is fine.

🎯 Best Practices & Style

  1. ✅ Logging Conventions: Follows repository standards with lowercase messages and structured logging:

    tracing::debug!(?x, "signal target not found")  // Good
  2. ✅ Error Handling: Properly uses anyhow errors without glob imports, following CLAUDE.md guidelines.

  3. ✅ Documentation: Clear doc comments on public APIs explaining the graceful_not_found behavior.

  4. Minor Naming: loop_event_commit_futloop_event_init_fut is a good improvement but consider loop_event_upsert_fut could be loop_event_persist_fut for clarity.

🔒 Security

No security concerns identified. The changes don't introduce:

  • Command injection risks
  • XSS vulnerabilities
  • SQL injection (using typed parameters)
  • Improper authentication/authorization

🧪 Test Coverage

Missing: The PR doesn't include tests for:

  1. graceful_not_found() behavior in both replay and normal execution
  2. Batch listen with signals arriving out-of-order
  3. Edge case: Empty signal batches
  4. Removed event replay for gracefully not found signals

Recommendation: Add integration tests covering these scenarios, especially the workflow replay with removed events.

📝 Additional Notes

  1. Runner Workflow Migration (pegboard-serverless/src/lib.rs:483-511): The fallback to old runner workflow is a good transitional pattern. Document when this can be removed.

  2. pnpm-lock.yaml Changes: Unrelated dependency version changes should ideally be in a separate commit, but not a blocker.

  3. Debug vs Production Code: The indexed_input_chunks fix appears in both debug and production paths—verify this is intentional and covers all cases.

Summary

This is a well-implemented feature with good attention to workflow engine consistency. The main action items are:

  1. High Priority: Audit all indexed_input_chunks access patterns for the missing-index bug
  2. Medium Priority: Add test coverage for graceful signal sending scenarios
  3. Low Priority: Consider performance implications of sorting all signals before limiting

The code follows repository conventions and integrates cleanly with the existing workflow engine. Nice work! 👍

@claude
Copy link

claude bot commented Nov 26, 2025

PR Review: Fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces graceful handling of signal sending to workflows that may not exist, and fixes issues with batch signal listening and history tracking. The changes are well-structured and address real-world scenarios where workflows may have already been stopped.

✅ Strengths

1. Graceful Signal Handling

  • The new graceful_not_found() API is well-designed and intuitive
  • Return type change from Result<Id> to Result<Option<Id>> is semantically correct
  • Properly handles both replay scenarios and new executions
  • Good use of removed events for workflow history consistency

2. Code Quality

  • Clear documentation on when graceful_not_found() should be used
  • Consistent implementation across both builder/workflow/signal.rs and builder/common/signal.rs
  • Proper error handling with fallback to old behavior when not using graceful mode
  • Good use of structured logging (following CLAUDE.md conventions)

3. Batch Listen Fix

  • Sorting signals by timestamp before processing (engine/packages/gasoline/src/db/kv/mod.rs:1976-1977)
  • Applying limit before enumeration prevents index mismatches
  • Fixed event type from EventType::Signal to EventType::Signals in history keys

4. Loop Optimization

  • Deferred loop event upsertion for parallelization is clever (engine/packages/gasoline/src/ctx/workflow.rs:966-978)
  • Good documentation explaining the parallel execution strategy in comments

🔍 Potential Issues

1. Index Handling in Debug Code (engine/packages/gasoline/src/db/kv/debug.rs:935-939)

if let Some(input_chunks) =
    current_event.indexed_input_chunks.get_mut(key.index)
{
    input_chunks.push(entry);
} else {
    current_event
        .indexed_input_chunks
        .insert(key.index, vec![entry]);
}

This fallback insertion suggests the ensure check on line 926-929 may not be sufficient. If the index doesn't exist, we should fail rather than silently insert. This could hide corruption issues.

Recommendation: Consider whether this else branch should be an error case instead, or if the ensure check needs adjustment.

2. Backward Compatibility in Pegboard (engine/packages/pegboard-serverless/src/lib.rs:492-510)
The retry with old runner workflow is good for migration, but:

  • No comment explaining why we need to support both runner and runner2
  • No indication of when the fallback can be removed
  • Could lead to confusion for future maintainers

Recommendation: Add a comment explaining the migration path and when the legacy support can be removed.

3. Test Coverage

  • No new tests added for graceful_not_found() functionality
  • No tests verifying the removed event is properly created and replayed
  • No tests for the batch listen ordering fix

Recommendation: Add test cases for:

  • Signal sending with graceful_not_found() when target doesn't exist
  • Replaying a workflow with a gracefully failed signal send
  • Batch signal reception with proper ordering

4. Error Message Consistency
The warning message "workflow not found, likely already stopped" is used in multiple places. Consider extracting to a constant or using a shared logging function for consistency.

⚡ Performance Considerations

Positive:

  • Loop event upsertion parallelization is excellent
  • Batch signal sorting happens once before processing
  • Proper use of tokio::join! for parallel operations

Concern:

  • In engine/packages/gasoline/src/db/kv/mod.rs:1942-1973, signals are fetched from all streams, collected into a Vec, then sorted. For workflows listening to many signal types with high volume, this could be memory-intensive.

Recommendation: Consider if there's a way to merge-sort the streams or use a priority queue for very high-volume scenarios (though current approach is likely fine for typical use cases).

🔒 Security Considerations

No security concerns identified. The graceful handling doesn't bypass any authorization checks and properly logs when workflows aren't found.

📝 Style & Convention Adherence

Good:

  • ✅ Follows structured logging patterns (e.g., tracing::info!(?x, "foo"))
  • ✅ Log messages are lowercase as per CLAUDE.md
  • ✅ Proper use of anyhow::Result
  • ✅ No glob imports from anyhow

Minor:

  • The pnpm-lock.yaml changes seem unrelated to the PR's main purpose and might indicate dependency drift

Recommendations

  1. Add test coverage for the new graceful_not_found functionality
  2. Document the runner/runner2 migration in pegboard-serverless
  3. Review the debug.rs index handling - should it fail instead of fallback insert?
  4. Consider adding a test that verifies batch signal ordering
  5. Add a TODO or issue for removing runner legacy support once migration is complete

Overall Assessment

This is a solid PR that solves real problems with signal handling in workflows. The graceful handling API is well-designed and the batch listen fix addresses ordering issues properly. Main areas for improvement are test coverage and documentation around the migration path.

Verdict: ✅ Approve with recommendations for follow-up test additions


🤖 Review generated by Claude Code

@claude
Copy link

claude bot commented Nov 27, 2025

PR Review: Fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces a graceful_not_found() method for signal sending that allows workflows to handle missing signal targets without throwing errors. It also fixes batch signal listening by properly sorting signals by timestamp and fixes index handling for signal chunks.

Code Quality & Best Practices

✅ Strengths

  1. Consistent API Design: The graceful_not_found() pattern is well-implemented across both common and workflow signal builders with proper documentation.

  2. Proper Error Handling: The PR correctly handles the "not found" case by:

    • Returning Option<Id> instead of just Id
    • Recording a "removed" event in workflow history for replay consistency
    • Maintaining deterministic replay behavior
  3. Good Documentation: Function-level documentation clearly explains the behavior of graceful_not_found() and the return type semantics.

  4. Follows Repository Patterns: Uses structured logging correctly with field-level parameters as specified in CLAUDE.md.

⚠️ Areas for Improvement

  1. Inconsistent Index Handling (gasoline/src/db/kv/debug.rs:936 and gasoline/src/db/kv/mod.rs:1506):

    Issue: The code removed the assertion ensure!(current_event.indexed_input_chunks.len() == key.index, ...) and now uses get_mut() + fallback insertion. This could mask data corruption issues where indices arrive out of order.

    Recommendation: Consider re-adding validation or at least a debug assertion to ensure indices are sequential. If out-of-order is expected, add a comment explaining why.

  2. Loop Optimization Complexity (gasoline/src/ctx/workflow.rs:910-924):
    The deferred future pattern for loop_event_upsert_fut is clever but complex.

    Suggestion: Add a comment explaining the timing diagram - when iteration N runs, it commits the state from iteration N-1 in parallel. This would help future maintainers.

Potential Bugs

  1. Signal Acknowledgment Change (gasoline/src/db/kv/mod.rs:1998):
    The code changed from tx.pack(&key) to self.subspace.pack(&key).

    Question: Is this intentional? Need to verify this does not affect transaction isolation.

  2. Missing Graceful Fallback in pegboard-runner (pegboard-runner/src/ping_task.rs:29):
    Changed from runner to runner2 without graceful fallback. If this is part of a migration, consider handling both versions like in pegboard-serverless/src/lib.rs:493-509.

Performance Considerations

  1. ✅ Parallel Transaction Optimization: The loop optimization that defers loop_event_upsert_fut to run in parallel with the next iteration is excellent for reducing latency.

  2. ✅ Signal Batch Processing: Sorting signals by timestamp before limiting ensures the oldest signals are processed first, which is correct for FIFO semantics.

Security Concerns

No significant security issues identified. The graceful_not_found() pattern properly validates inputs and maintains audit trails through the "removed" event mechanism.

Test Coverage

⚠️ Missing Tests: This PR does not include test files. Given the complexity of the changes, especially:

  • The graceful signal not found behavior
  • The deferred loop optimization
  • Signal batch sorting

Recommendation: Add tests covering:

  1. Signal send with graceful_not_found() when target does not exist
  2. Workflow replay with removed signal events
  3. Batch signal processing ordering
  4. Loop event upsertion timing

Additional Notes

  1. pnpm-lock.yaml changes: The lock file shows dependency updates unrelated to the Rust changes. Consider whether these should be in a separate PR.

  2. Logging Level Change (universaldb/.../transaction_conflict_tracker.rs:67):
    Changed from tracing::info! to tracing::debug! for conflict range logging. This is reasonable but ensure this does not hide important diagnostic information in production.

  3. Event Type Fix (gasoline/src/db/kv/keys/history.rs:1645):
    Good catch fixing EventType::Signal to EventType::Signals for batch operations.

Verdict

Overall Assessment: Good with minor concerns

The core functionality is well-implemented with good error handling and API design. The main concerns are:

  1. The removed index validation could mask bugs
  2. Missing test coverage
  3. Need clarification on the tx.pack() vs self.subspace.pack() change

Recommendations before merge:

  • Add test coverage for graceful signal handling
  • Clarify/validate the index handling logic
  • Add explanatory comment for loop optimization timing
  • Verify the subspace.pack() change is intentional

@claude
Copy link

claude bot commented Nov 27, 2025

PR Review: Fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces a graceful_not_found() pattern for signal sending and fixes issues with batch signal listening and loop event handling in the Gasoline workflow engine.


✅ Strengths

  1. Excellent API Design: The graceful_not_found() builder pattern is clean and ergonomic, making intent explicit at call sites
  2. Comprehensive Implementation: Handles both common signal builder and workflow signal builder consistently
  3. Good Replay Handling: Properly handles replay of graceful not found signals through removed events (workflow/signal.rs:117-126)
  4. Performance Optimization: The loop event parallelization is well-thought-out with clear comments explaining the complex join logic (ctx/workflow.rs:901-944)
  5. Batch Listen Fix: Properly sorts signals by timestamp and applies limit correctly (db/kv/mod.rs:1975-1992)

🐛 Potential Issues

Critical: Missing Input Chunk Initialization Check

Location: engine/packages/gasoline/src/db/kv/mod.rs:1503-1509 and engine/packages/gasoline/src/db/kv/debug.rs:932-938

if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
    input_chunks.push(entry);
} else {
    current_event.indexed_input_chunks.insert(key.index, vec\![entry]);
}

Issue: This code inserts a new vector when an index doesn't exist, but it could indicate a logic error if indices should be sequential. If chunks arrive out of order, this could lead to incomplete data.

Question: Should this be an error case instead? Or should there be validation that all expected indices are present before using the chunks?


Bug: Wrong Signal Type in History Key

Location: engine/packages/gasoline/src/db/kv/keys/history.rs:1645

EventType::Signals,  // Changed from EventType::Signal

Issue: This appears to be a bug fix, but there's no context in the PR about what was broken. The change suggests there was a type mismatch between singular and plural signal events.

Recommendation: Add a comment explaining why Signals (plural) is correct here, or add a test case that would have caught this.


Potential Race Condition: Signal Ordering

Location: engine/packages/gasoline/src/db/kv/mod.rs:1942-1977

Issue: Signals are fetched from multiple streams in parallel, then sorted by timestamp. However, if signals arrive with identical or very close timestamps, the ordering could be non-deterministic.

let mut signals = futures_util::stream::iter(owned_filter.clone())
    .map(|signal_name| { /* fetch signals */ })
    .flatten()
    // ... collect all signals
    .await?;

signals.sort_by_key(|key| key.ts);  // Line 1977

Recommendation: Use a stable sort or add a secondary sort key (like signal_id) for deterministic ordering:

signals.sort_by_key(|key| (key.ts, key.signal_id));

📝 Code Quality Concerns

1. Inconsistent Error Handling Pattern

Multiple files changed from this pattern:

let res = signal().send().await;
if let Some(WorkflowError::WorkflowNotFound) = res.as_ref().err()... {
    // handle gracefully
} else {
    res?;
}

To this pattern:

let res = signal().graceful_not_found().send().await?;
if res.is_none() {
    // handle gracefully
}

Issue: The new pattern is cleaner, but now send() returns Result<Option<Id>> which is a complex return type. Consider whether a custom enum would be clearer:

pub enum SignalResult {
    Sent(Id),
    NotFound,
}

This would make the API more self-documenting.


2. Deferred Future Pattern Complexity

Location: engine/packages/gasoline/src/ctx/workflow.rs:910-944

The loop optimization with loop_event_init_fut and loop_event_upsert_fut is complex and has a large comment explaining it. While the optimization is valuable, consider:

  1. Extract this into a helper function with a clear name
  2. Add unit tests specifically for the parallelization logic
  3. Consider using a state machine pattern to make the flow clearer

3. Magic Number Without Constant

Location: engine/packages/gasoline/src/ctx/workflow.rs:953

if iteration % LOOP_ITERS_PER_COMMIT == 0 {

Good: Uses a constant. But the constant value isn't shown in the diff.

Recommendation: Add a comment explaining why this specific interval was chosen (performance vs consistency tradeoff).


🔒 Security Considerations

No significant security concerns identified. The changes are mostly internal workflow mechanics.


🧪 Test Coverage Concerns

Missing Test Cases:

  1. Graceful not found replay: Test that replaying a workflow with a gracefully-not-found signal correctly skips the removed event
  2. Batch signal ordering: Test that signals with close timestamps are processed in a deterministic order
  3. Loop event parallelization: Test that the deferred futures execute correctly across iterations
  4. Signal limit enforcement: Verify that the limit is applied after sorting (line 1992)

Recommendation: Add integration tests for these scenarios, especially the replay cases which are critical for workflow correctness.


🎯 Specific File Feedback

engine/packages/pegboard-serverless/src/lib.rs:487-519

Issue: Backwards compatibility fallback for old runner workflow:

if res.is_none() {
    // Retry with old runner wf
    let res = ctx.signal(runner::Stop { ... })
        .to_workflow::<runner::Workflow>()
        .graceful_not_found()
        .send()
        .await?;
}

Questions:

  • How long does this backwards compatibility need to be maintained?
  • Should there be a deprecation warning?
  • Is there a migration path to upgrade old runner workflows?

engine/packages/pegboard/src/lib.rs:17

registry.register_workflow::<runner2::Workflow>()?;

Issue: A new runner2 workflow is registered but there's no explanation in the PR of what changed between runner and runner2.

Recommendation: Add a module-level comment in the runner2 module explaining the differences and migration path.


⚡ Performance Considerations

Positive:

  • Loop event upsertion is properly parallelized (ctx/workflow.rs:965-978)
  • Signal sorting happens once before processing (db/kv/mod.rs:1977)

Potential Issue:

  • The signal batch listen fetches ALL signals from each stream up to limit, sorts them, then takes limit. If there are many signals, this could fetch and sort more than necessary.

Optimization: Consider using a priority queue approach to merge pre-sorted streams:

// Instead of: fetch all -> sort all -> take limit
// Do: merge K sorted streams -> take limit

This would be O(Klimit) instead of O(KN) where N could be much larger than limit.


📋 Documentation Recommendations

  1. Add rustdoc comments to graceful_not_found() explaining when to use it
  2. Document the Result<Option<Id>> return type on send() with examples
  3. Add inline comments explaining the backwards compatibility code with runner2
  4. Document the loop parallelization pattern for future maintainers

🔍 Questions for Author

  1. What was the original bug that prompted the EventType::Signals fix in history.rs:1645?
  2. Are there any known edge cases with the signal timestamp sorting?
  3. What's the migration timeline for runner -> runner2?
  4. Should the indexed_input_chunks insertion be an error case instead of silent insertion?

✨ Minor Nits

  1. engine/packages/universaldb/.../transaction_conflict_tracker.rs:67: Changed log level from info\! to debug\! - good change, but consider if this will make debugging conflicts harder in production
  2. Multiple files use if res.is_none() pattern - consider using if let None = res or matches\!(res, None) for consistency with Rust idioms
  3. Comment at line ctx/workflow.rs:992 has good explanation but could be formatted better

🎖️ Overall Assessment

Grade: B+

This is solid systems programming work with good attention to replay semantics and performance optimization. The main concerns are:

  1. Potential race condition in signal ordering (should be fixed)
  2. Complex return type Result<Option<Id>> (consider refactoring)
  3. Missing test coverage for critical replay paths (should be added)
  4. Unclear backwards compatibility story (needs documentation)

The code quality is generally high and follows the project conventions well. With the suggested fixes, this would be an A-level PR.


🚀 Recommendation

Approve with minor changes requested

Required before merge:

  • Fix signal ordering to be deterministic (add signal_id as secondary sort key)
  • Add test for graceful not found replay

Nice to have:

  • Document runner -> runner2 migration
  • Extract loop parallelization into helper function
  • Add rustdoc examples for graceful_not_found()

@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from d01b6fc to 5a41109 Compare November 27, 2025 00:58
@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from 41442d3 to 3f3140e Compare November 27, 2025 00:58
@graphite-app
Copy link
Contributor

graphite-app bot commented Dec 3, 2025

Merge activity

  • Dec 3, 12:33 AM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Dec 3, 12:34 AM UTC: CI is running for this pull request on a draft pull request (#3577) due to your merge queue CI optimization settings.
  • Dec 3, 12:34 AM UTC: Merged by the Graphite merge queue via draft PR: #3577.

graphite-app bot pushed a commit that referenced this pull request Dec 3, 2025
@graphite-app graphite-app bot closed this Dec 3, 2025
@graphite-app graphite-app bot deleted the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch December 3, 2025 00:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants