
Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 18, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project: rivetkit-serverless, Deployment: Error, Updated (UTC): Dec 2, 2025 7:11pm

3 Skipped Deployments:

  • rivet-cloud: Ignored, Dec 2, 2025 7:11pm
  • rivet-inspector: Ignored (Preview), Dec 2, 2025 7:11pm
  • rivet-site: Ignored (Preview), Dec 2, 2025 7:11pm

Contributor Author

MasterPtato commented Nov 18, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@pkg-pr-new

pkg-pr-new bot commented Nov 18, 2025


@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3483

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3483

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3483

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3483

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3483

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3483

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3483

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3483

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3483

commit: fa56065

@claude

claude bot commented Nov 18, 2025

Pull Request Review

Overview

This PR simplifies the runner workflow by removing the command/event handling logic from the runner workflow and moving event forwarding directly to actor workflows. This is a significant refactoring that reduces ~500 lines of code by eliminating intermediate state management.

Key Changes

  1. Event Forwarding Architecture Change

    • Before: Events were sent to the runner workflow, stored in state, then forwarded to actor workflows
    • After: Events are forwarded directly from ws_to_tunnel_task.rs to actor workflows using the actor_id tag
    • This eliminates the need for the runner workflow to track last_event_idx, last_event_ack_idx, and event storage
  2. Removed State Management (runner2.rs:29-40)

    • Removed last_event_idx, last_command_idx, and commands vector from State
    • Simplified LifecycleState by removing event tracking fields
    • This reduces memory overhead and workflow complexity
  3. Removed Activities

    • InsertDb: Database initialization logic removed (~160 lines)
    • ProcessInit: Init packet processing removed (~80 lines)
    • InsertEvents: Event storage removed (~25 lines)
    • InsertCommands: Command storage and batching removed (~35 lines)
  4. Signal Changes

    • Removed Command signal entirely
    • Kept Forward, CheckQueue, and Stop signals
    • Forward now only handles ToServerStopping and rejects other message types

Code Quality & Best Practices

Good:

  • Follows error handling conventions with anyhow and custom RivetError types
  • Proper use of structured logging (tracing::warn!(?actor_id, ...))
  • Consistent naming conventions (snake_case, past tense for timestamps)
  • Good use of workspace dependencies
  • Clear separation of concerns by moving event handling closer to where it's needed

⚠️ Concerns:

  1. Missing Actor ID in Event Forwarding (ws_to_tunnel_task.rs:341-356)

    protocol::ToServer::ToServerEvents(events) => {
        let res = ctx.signal(pegboard::workflows::runner2::Forward {
            inner: protocol::ToServer::try_from(msg)
                .context("failed to convert message for workflow forwarding")?,
        })
        .tag("actor_id", actor_id)  // ❌ ERROR: 'actor_id' not defined

    Issue: The variable actor_id is not defined in this scope. You need to extract the actor_id from the events first.

    Fix: Extract actor_id from the events before using it:

    protocol::ToServer::ToServerEvents(events) => {
        for event in &events.events {
            let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
            let res = ctx.signal(pegboard::workflows::actor::Event {
                inner: event.inner.clone(),
            })
            .tag("actor_id", &actor_id)
            .graceful_not_found()
            .send()
            .await?;
            if res.is_none() {
                tracing::warn!(?actor_id, "failed to send signal to actor workflow, likely already stopped");
            }
        }
    }
  2. TODO Comment Without Implementation Plan (runner2.rs:79)

    // TODO: Ack events

    Question: How will events be acknowledged now that the runner workflow no longer tracks them? This could lead to unbounded event queues or lost events if not handled properly.

  3. Potential Lost Init Handling (runner2.rs:98-108)
    The workflow now rejects ToServerInit messages, but there's no clear indication of where init logic has moved to. The init activity only writes to RunnerByKeyKey, which is much simpler than before.

    Question: Was the metadata and prepopulated actor names handling removed intentionally, or does it need to be reimplemented elsewhere?

  4. Variable Naming Improvement (conn.rs:79,89)
    ✅ Good change renaming packet to init_packet for clarity

Potential Bugs

CRITICAL: Compilation Error

  • Line 346 in ws_to_tunnel_task.rs: actor_id is undefined

⚠️ Possible Logic Issues:

  1. Event Acknowledgment: Without event tracking in the runner workflow, how are events acknowledged back to the client? This could cause the client to resend events unnecessarily.

  2. Command Flow: The Command signal was removed entirely. Need to verify that actor start commands are still being sent through an alternative path.

  3. Race Condition Handling: The old code had logic to prevent scheduling actors to draining runners (removed code around line 110-140 of the diff). Verify this is now handled elsewhere.

Performance Considerations

Improvements:

  • Reduced workflow state size (no more storing commands/events in memory)
  • Eliminated intermediate event storage and forwarding overhead
  • Simpler signal processing loop should have better throughput

⚠️ Potential Concerns:

  • Events are now forwarded individually rather than batched - could increase signal overhead if many events arrive simultaneously
  • Consider implementing batching in the new architecture if event volume is high
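One way to implement the batching suggested above is to group the flat incoming event list into per-actor batches before signaling, so each actor workflow receives one signal carrying many events. This is a minimal std-only sketch; the `Event` struct and its fields are hypothetical stand-ins for the real `protocol::Event`:

```rust
use std::collections::HashMap;

// Hypothetical event shape; the real protocol::Event carries more fields.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub actor_id: String,
    pub payload: u32,
}

// Group a flat list of incoming events into per-actor batches so one signal
// can carry many events for the same actor instead of one signal per event.
pub fn batch_by_actor(events: Vec<Event>) -> HashMap<String, Vec<Event>> {
    let mut batches: HashMap<String, Vec<Event>> = HashMap::new();
    for event in events {
        batches
            .entry(event.actor_id.clone())
            .or_default()
            .push(event);
    }
    batches
}
```

Each batch can then be sent as a single signal tagged with its actor_id, amortizing signal overhead when many events arrive at once.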

Security Concerns

✅ No new security issues introduced

  • Existing validation logic for actor ownership is preserved in KV request handling
  • Error messages don't leak sensitive information

Test Coverage

⚠️ Recommendations:

  1. Add integration tests for the new event forwarding path to ensure events reach actor workflows correctly
  2. Test the timeout/expiry behavior to ensure actors are properly marked as lost when runners disconnect
  3. Verify the draining behavior works correctly without the intermediate command handling
  4. Test the init flow to ensure runners can still connect and initialize properly

Additional Questions

  1. Was the InsertDb activity's functionality moved elsewhere, or is runner initialization now handled differently?
  2. How are commands being sent to runners now that the Command signal and command batching logic are removed?
  3. What's the plan for event acknowledgment mentioned in the TODO comment?

Summary

This is a valuable simplification that removes significant complexity from the runner workflow. However, there's a critical compilation error that needs to be fixed before this can be merged. Additionally, the missing event acknowledgment logic needs to be clarified or implemented.

Recommendation: Request changes to fix the compilation error and clarify the event acknowledgment and command flow architecture.


Review generated following CLAUDE.md conventions for error handling, logging patterns, and code style.

@claude

claude bot commented Nov 19, 2025

Code Review

Overview

This PR significantly simplifies the runner workflow by removing event and command tracking logic. The changes reduce the workflow state and delegate more responsibility to actor workflows.

Key Changes Analysis

1. Removed Event/Command State Management

The PR removes:

  • Event tracking and acknowledgment logic (last_event_idx, last_event_ack_idx)
  • Command buffering and indexing (last_command_idx, commands vector)
  • Associated activities: ProcessInit, InsertEvents, InsertCommands, InsertDb

Positive: This is a good simplification that reduces state size and complexity.

2. Event Forwarding Moved to WebSocket Handler

In ws_to_tunnel_task.rs:347-362:

protocol::ToServer::ToServerEvents(events) => {
    let res = ctx.signal(pegboard::workflows::runner2::Forward {
        inner: protocol::ToServer::try_from(msg)
            .context("failed to convert message for workflow forwarding")?,
    })
    .tag("actor_id", actor_id)
    .graceful_not_found()
    // ...
}

Issue Found: This code references actor_id on lines 352 and 357, but actor_id is not defined in scope. This will cause a compilation error.

Looking at the protocol, ToServerEvents likely contains a list of events, and you need to extract the actor_id from each event. The original code had crate::utils::event_actor_id(&event.inner).

Recommendation: You need to iterate over the events and extract actor_id properly:

protocol::ToServer::ToServerEvents(events) => {
    for event in &events.events {
        let actor_id = crate::utils::event_actor_id(&event.inner);
        let res = ctx.signal(pegboard::workflows::actor::Event {
            inner: event.inner.clone(),
        })
        .to_workflow::<pegboard::workflows::actor::Workflow>()
        .tag("actor_id", &actor_id.to_string())
        .graceful_not_found()
        .send()
        .await
        .with_context(|| {
            format!("failed to forward signal to actor workflow: {}", actor_id)
        })?;
        if res.is_none() {
            tracing::warn!(?actor_id, "failed to send signal to actor workflow, likely already stopped");
        }
    }
}

3. Removed Init Message Handling 🔍

The runner workflow no longer processes ToServerInit packets, which previously:

  • Populated actor names
  • Stored runner metadata
  • Inserted runner into database indexes
  • Sent ToClientInit response with configuration

Concerns:

  1. Where does database insertion happen now? The InsertDb activity that wrote critical runner state (namespace_id, name, key, version, slots, indexes) has been removed. The Init activity only writes to RunnerByKeyKey.
  2. Runner allocation index: The removed code populated RunnerAllocIdxKey which is used by AllocatePendingActors. Without this, new runners may not be discoverable for actor allocation.
  3. Missing client response: Runners expect a ToClientInit response. Where is this sent now?

Recommendation: Verify that runner initialization is complete elsewhere, or this may break runner connectivity and actor allocation.

4. Simplified Runner Workflow State

LifecycleState now only tracks:

struct LifecycleState {
    draining: bool,
}

Positive: Much cleaner state management.

5. Variable Naming Improvement

In conn.rs:67-77: Renamed packet to init_packet for clarity. Good improvement.

6. Removed Signal Type

Removed Command signal variant since commands are no longer buffered by the runner workflow.

Potential Issues

  1. 🔴 CRITICAL: Compilation Error

    • Line 352 & 357 in ws_to_tunnel_task.rs: actor_id is undefined
    • This will fail to compile
  2. 🟡 WARNING: Missing Runner Initialization

    • Database insertion logic removed but unclear where it is replaced
    • Runner allocation index population missing
    • ToClientInit response not sent
  3. 🟡 WARNING: Event Acknowledgment

    • Line 79 has // TODO: Ack events comment
    • Without acking, runners may resend events indefinitely
  4. 🟢 INFO: Event Batch Size Constant

    • EVENT_ACK_BATCH_SIZE constant on line 16 is now unused (can be removed)

Testing Recommendations

  1. Compile test: Verify the code compiles (currently blocked by undefined actor_id)
  2. Integration test: Test full runner connection flow:
    • Runner connects and sends ToServerInit
    • Verify runner appears in allocation indexes
    • Verify actors can be allocated to the runner
  3. Event forwarding test: Verify actor events are correctly forwarded to actor workflows
  4. Graceful degradation: Test what happens when actor workflow does not exist

Security Considerations

No security issues identified. The changes maintain proper actor ownership validation in KV operations.

Performance Considerations

Positive: Removing state buffering reduces memory usage and workflow replay time. The simplified state should improve workflow performance.

Code Quality

  • Follow CLAUDE.md conventions ✅
  • Proper error handling with anyhow ✅
  • Structured logging with tracing ✅
  • Lowercase log messages ✅

Summary

This is a good architectural simplification, but has critical compilation errors that must be fixed before merging. The undefined actor_id variable needs to be resolved, and the initialization flow needs verification to ensure runners are properly registered in the database.

Recommendation: Fix the compilation error, verify the init flow is complete, and add tests before merging.

@claude

claude bot commented Nov 19, 2025

Critical Bug Found: The variable actor_id is referenced on line 352 of ws_to_tunnel_task.rs but is never defined. This will cause a compilation error in the ToServerEvents match arm. The events need to be iterated individually and each event actor_id extracted to forward to the correct actor workflow.

@claude

claude bot commented Nov 19, 2025

PR Review Summary - Part 1:

This PR significantly simplifies the runner workflow by removing ~500 lines of code. Major changes include:

MUST FIX:

  1. Compilation error with undefined actor_id variable in ws_to_tunnel_task.rs:352
  2. Remove unused EVENT_ACK_BATCH_SIZE constant (line 15-16 of runner2.rs)
  3. Remove unused CommandRow struct (line 43-48 of runner2.rs)

CODE QUALITY ISSUES:

  • Missing event acknowledgment implementation (TODO at line 79)
  • Events not being acknowledged to runners could cause message buildup
  • Loss of backpressure mechanisms

ARCHITECTURAL CONCERNS:

  • Removed tracking of event/command indices (last_event_idx, last_event_ack_idx, last_command_idx)
  • Removed storage of commands and events in workflow state
  • Removed InsertDb, ProcessInit, InsertEvents, InsertCommands activities (300+ lines)
  • Removed Command signal from Main join_signal

Questions to address:

  • Where is event/command state now tracked?
  • How do runners recover from disconnections?
  • How is ordering guaranteed?
  • Is this a breaking change?

@claude

claude bot commented Nov 19, 2025

PR Review Summary - Part 2:

PERFORMANCE CONSIDERATIONS:

Positive:

  • Significantly reduced workflow state size
  • Fewer database transactions per message
  • Better separation of concerns (events route directly to actors)

Concerns:

  • Event forwarding will need to iterate events individually
  • Missing batch acknowledgment logic

TEST COVERAGE:
No test changes included. For a refactoring this large, should add:

  1. Integration tests for new event routing path
  2. Runner reconnection scenario tests
  3. Command delivery verification
  4. Concurrent event handling tests

POSITIVE CHANGES:

  • Good naming improvement: packet renamed to init_packet (conn.rs:67)
  • Cleaner separation between runner and actor workflows
  • Simplified workflow logic

Overall: This is a valuable simplification but needs the compilation fix and testing before merge. The architectural changes need documentation to help understand the new event flow and migration path for existing runners.

@MasterPtato force-pushed the 11-18-fix_pb_simplify_runner_wf branch from fefb6f0 to c088d42 on November 20, 2025 03:07
@MasterPtato force-pushed the 11-18-fix_gas_fix_loop_forgotten_bug_due_to_concurrency branch from 4294775 to 5458e50 on November 20, 2025 03:07
The vercel bot temporarily deployed to Preview (rivetkit-serverless) on November 20, 2025 03:07 (now inactive)
@claude

claude bot commented Nov 20, 2025

Code Review

I've reviewed this PR which simplifies the runner workflow. Here's my detailed feedback:

🎯 Overall Assessment

This is a substantial refactoring that introduces a new protocol version (v4/mk2) and simplifies the runner workflow architecture. The changes are generally well-structured, but there are several areas that need attention.


Strengths

  1. Good Protocol Versioning Strategy: The separation between mk1 and mk2 protocols using is_mk2() helper is clean and allows for graceful migration.

  2. Event Batching: The new ActorEventDemuxer properly batches events (up to 1024) which should improve performance - good optimization.

  3. Proper Shutdown Handling: The shutdown() method on ActorEventDemuxer properly awaits all spawned tasks, preventing potential data loss.


🔴 Critical Issues

1. Compilation Errors in tunnel_to_ws_task.rs

The refactored code has critical issues that will prevent compilation:

Lines ~45-65: The recv_msg() and handle_message_mk2() functions reference undefined variables:

  • tunnel_sub, eviction_sub, tunnel_to_ws_abort_rx are not in scope
  • ctx, conn, init_tx, event_demuxer are not in scope in the refactored functions
  • Function signatures don't match their usage

The functions appear to have been extracted but weren't properly parameterized. Example:

async fn recv_msg() -> Result<std::result::Result<Option<()>, LifecycleResult>> {
    let tunnel_msg = tokio::select! {
        res = tunnel_sub.next() => { // ❌ tunnel_sub not in scope

Fix: Add proper parameters to these functions or inline them back into the main task loop.

2. Missing Signal Type in conn.rs:149

ctx.signal(Init);  // ❌ What is Init?

This signal type doesn't appear to be defined anywhere in the visible code. This will fail to compile.

3. Undefined Helper Function in lib.rs

Line 6 references protocol::is_new() but the protocol module only exports is_mk2(). This function doesn't exist:

let workflow_id = if protocol::is_new(protocol_version) {  // ❌ Undefined function

Should probably be protocol::is_mk2().

4. Incorrect Return Type in tunnel_to_ws_task.rs

async fn recv_msg() -> Result<std::result::Result<Option<()>, LifecycleResult>> {
    // ...
    Ok(Some(data))  // ❌ Returns data but signature says Option<()>
}

The function signature says it returns Option<()> but it's actually returning data (a Vec<u8>).


⚠️ High Priority Issues

5. Potential Memory Leak in ActorEventDemuxer

File: actor_event_demuxer.rs:36-67

The last_seen timestamp is set when channels are created but never updated when events are ingested. This means:

  • Active channels will be garbage collected after 30 seconds even if they're receiving events
  • The TODO comment on line 77 about aborting being safe suggests uncertainty

Fix:

pub fn ingest(&mut self, actor_id: Id, event: protocol::Event) {
    if let Some(channel) = self.channels.get_mut(&actor_id) {
        channel.last_seen = Instant::now();  // ← add this
        let _ = channel.tx.send(event);
    }
}

6. Silent Error Handling

File: actor_event_demuxer.rs:38

let _ = channel.tx.send(event);  // ❌ Silently ignores send errors

If the channel is full or closed, events are silently dropped. This could lead to hard-to-debug issues.

Recommendation: At minimum, log when sends fail:

if let Err(err) = channel.tx.send(event) {
    tracing::warn!(?actor_id, ?err, "failed to send event to actor channel");
}

7. Missing Protocol Version in runner2.rs InsertDbInput

File: runner2.rs:289-299

The InsertDbInput struct includes protocol_version field (line 295), but when this struct is instantiated in the workflow (lines 74-82), the protocol_version field is not provided. This will cause a compilation error.

Fix: Add protocol_version: input.protocol_version to the struct initialization.

8. Incorrect Variable Reference in ping_task.rs

File: ping_task.rs:38

let wf = if protocol::is_mk2(conn.protocol_version) {  // ✓ Correct
    // ...
} else {
    // ...
};

// Later on line 75:
if protocol::is_mk2(conn.protocol_version) {  // ❌ Should be consistent

The check uses conn.protocol_version but the function definition suggests it should use the workflow's stored protocol version. Verify this is intentional.


📋 Code Quality Issues

9. Inconsistent Error Handling Pattern

The error handling varies between graceful degradation and hard failures without clear rationale:

  • actor_event_demuxer.rs:119: Logs warning but continues
  • tunnel_to_ws_task.rs: Returns error immediately on parse failure (after fix)

Recommendation: Document the error handling strategy - when to fail fast vs. gracefully degrade.

10. Missing Documentation

The new ActorEventDemuxer is a core component but lacks module-level documentation explaining:

  • Why batching is beneficial
  • The GC strategy and its implications
  • Thread safety considerations
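To make the GC strategy concrete, here is a minimal sketch of the bookkeeping the demuxer needs: `ingest` must refresh `last_seen` on every event, otherwise busy channels are collected alongside idle ones. The names (`Demuxer`, `Channel`, `GC_TIMEOUT_MS`) are illustrative rather than the PR's actual types, and a logical millisecond clock stands in for `Instant` to keep the behavior easy to verify:

```rust
use std::collections::HashMap;

const GC_TIMEOUT_MS: u64 = 30_000; // mirrors the 30-second GC window

struct Channel {
    last_seen_ms: u64,
    events: Vec<u32>, // stand-in for the real mpsc sender
}

#[derive(Default)]
pub struct Demuxer {
    channels: HashMap<String, Channel>,
}

impl Demuxer {
    pub fn ingest(&mut self, actor_id: &str, event: u32, now_ms: u64) {
        let channel = self
            .channels
            .entry(actor_id.to_string())
            .or_insert(Channel { last_seen_ms: now_ms, events: Vec::new() });
        // Refresh the timestamp on every event, not only at channel creation.
        channel.last_seen_ms = now_ms;
        channel.events.push(event);
    }

    // Drop channels that have not seen an event within the timeout.
    pub fn gc(&mut self, now_ms: u64) {
        self.channels
            .retain(|_, c| now_ms.saturating_sub(c.last_seen_ms) < GC_TIMEOUT_MS);
    }

    pub fn has_channel(&self, actor_id: &str) -> bool {
        self.channels.contains_key(actor_id)
    }
}
```

With this refresh in place, a channel that ingested an event 2 seconds before a GC pass survives, while one idle for the full window is dropped.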

🔍 Potential Bugs

11. Race Condition in Initialization

File: ws_to_tunnel_task.rs:46-55 and tunnel_to_ws_task.rs:23-32

For mk2 protocol, tunnel_to_ws task waits for init_rx signal before processing, while ws_to_tunnel sends it. However:

  • What happens if the init signal is sent before the tunnel_to_ws task starts waiting?
  • watch::channel(()) doesn't buffer, so early sends might be lost

Recommendation: Verify the initialization sequence is deterministic or use a different synchronization primitive.

12. Incomplete Migration Path

The code supports both mk1 (v3) and mk2 (v4) protocols but:

  • No migration strategy documented
  • No metrics/logging to track which protocol versions are in use
  • No deprecation timeline for mk1

Recommendation: Add telemetry to track protocol version usage.
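A simple form of that telemetry is tallying connections per protocol version, so the mk1 deprecation decision is data-driven. This sketch assumes the `PROTOCOL_MK1_VERSION`/`is_mk2` scheme quoted later in this thread; `count_versions` is a hypothetical helper, not an existing API:

```rust
use std::collections::HashMap;

// Assumed constant mirroring the PR's versioning scheme: anything above the
// mk1 version is treated as mk2.
const PROTOCOL_MK1_VERSION: u16 = 3;

pub fn is_mk2(protocol_version: u16) -> bool {
    protocol_version > PROTOCOL_MK1_VERSION
}

// Tally observed connections per protocol version; in production this would
// feed a metrics counter rather than an in-memory map.
pub fn count_versions(connections: &[u16]) -> HashMap<u16, usize> {
    let mut counts = HashMap::new();
    for &v in connections {
        *counts.entry(v).or_insert(0) += 1;
    }
    counts
}
```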


🧪 Testing Concerns

  • Missing Test Coverage: This is a major refactor with no apparent test additions
  • Protocol Compatibility: No evidence of tests validating mk1/mk2 compatibility
  • Error Paths: Edge cases like GC during active event processing aren't tested

Recommendation: Add integration tests covering:

  1. Protocol version negotiation
  2. ActorEventDemuxer GC behavior
  3. Graceful shutdown sequences

📝 Minor Issues

  1. Unclear Function Name: is_mk2 could be more descriptive, e.g. is_protocol_v4_or_later
  2. Magic Numbers: EVENT_ACK_BATCH_SIZE = 500 and batch size 1024 in ActorEventDemuxer should be constants with explanatory comments
  3. Inconsistent Naming: mk2 vs v4 - pick one naming convention

🎬 Action Items

Before merging:

  1. ✅ Fix all compilation errors (issues #1-4)
  2. ✅ Fix the memory leak in ActorEventDemuxer (#5)
  3. ✅ Add protocol_version to InsertDbInput instantiation (#7)
  4. ⚠️ Improve error handling for silent failures (#6)
  5. ⚠️ Add tests for new functionality
  6. ⚠️ Verify initialization race condition (#11)

Future improvements:

  • Add documentation for ActorEventDemuxer
  • Add protocol version metrics
  • Create migration guide for mk1 → mk2

📊 Overall Recommendation

Do not merge yet - there are critical compilation errors that need to be fixed first. Once the compilation issues are resolved and the memory leak is addressed, this refactor looks promising. The architecture simplification is a good direction, but the implementation needs polish.

Please address the critical issues and consider adding tests before merging.

@claude

claude bot commented Nov 20, 2025

PR Review: fix(pb): simplify runner wf

I've reviewed this PR which introduces a new protocol version (v4/mk2) for the pegboard runner workflow and adds an event demuxer for actor events.

Critical Issues

1. Compilation Errors in tunnel_to_ws_task.rs

  • Line 21: conn.protocol should be conn.protocol_version
  • Lines 36-44, 62-76: recv_msg() and handle_message_mk2() are called without required parameters
  • Line 51: Function signature declares no parameters but references tunnel_sub, eviction_sub, and tunnel_to_ws_abort_rx
  • Line 79: Function signature declares no parameters but uses tunnel_msg, ctx, and conn

These helper functions need to either accept the necessary parameters in their signatures OR be moved inside the main function as closures.

2. Compilation Errors in ws_to_tunnel_task.rs
Similar scoping issues with recv_msg() and handle_message_mk2() not receiving proper parameters.

3. Missing Signal Handling in conn.rs
Line 177 uses ctx.signal(Init) but Init is not defined, imported, or given a workflow target.

High Priority Issues

4. Potential Memory Leak in ActorEventDemuxer
The last_seen field is never updated after initial creation (line 64), so GC will incorrectly remove channels after 30 seconds. Use get_mut to update timestamp on each ingest.

5. Error Handling Silently Drops Send Failures
Line 38 in actor_event_demuxer.rs silently drops events if send fails. Should log warnings.

6. Protocol Version Checking Inconsistencies
conn.rs:150 uses protocol::is_new() which is not defined. Should be is_mk2?

Medium Priority Issues

7. Missing Protocol Version Field
InsertDbInput struct defines protocol_version field but never sets it when created (lines 74-82 of runner2.rs).

8. Incorrect Message Type Usage
Line 185 of runner2.rs uses protocol::ToRunner::ToRunnerClose but for mk2 should be protocol::mk2::ToRunner::ToRunnerClose.

Code Quality

Good:

  • Event batching up to 1024 events is excellent
  • Graceful shutdown waits for in-flight events
  • Proper structured logging

Suggestions:

  • Add metrics for demuxer channel count
  • Add tests for protocol version branching

Summary

Cannot merge as-is due to compilation errors. Priority fixes:

  1. Fix function signatures in tunnel_to_ws_task.rs and ws_to_tunnel_task.rs
  2. Fix or define Init signal in conn.rs
  3. Update last_seen timestamp in demuxer
  4. Add missing protocol_version field to InsertDbInput
  5. Define or remove protocol::is_new()

Once addressed, this is a solid simplification!

@MasterPtato force-pushed the 11-18-fix_pb_simplify_runner_wf branch from c088d42 to 657b813 on November 21, 2025 19:57
@MasterPtato force-pushed the 11-18-fix_gas_fix_loop_forgotten_bug_due_to_concurrency branch from 5458e50 to 8431b53 on November 21, 2025 19:57
@claude

claude bot commented Nov 26, 2025

Code Review: PR #3483 - fix(pb): simplify runner wf

I've reviewed this PR which simplifies the runner workflow and introduces protocol version 4. Here are my findings:

Critical Issues 🔴

1. Compilation Errors in tunnel_to_ws_task.rs and ws_to_tunnel_task.rs

Both refactored task files have multiple critical compilation errors:

  • tunnel_to_ws_task.rs:22: References undefined conn.protocol instead of conn.protocol_version
  • tunnel_to_ws_task.rs:37-38,44-45: recv_msg() is called without required parameters (needs tunnel_sub, eviction_sub, tunnel_to_ws_abort_rx)
  • tunnel_to_ws_task.rs:52-78: Function signature doesn't include the variables it references (tunnel_sub, eviction_sub, tunnel_to_ws_abort_rx)
  • tunnel_to_ws_task.rs:80-146: handle_message_mk2() and handle_message_mk1() both reference undefined variables (tunnel_msg, ctx, conn) that aren't in their function signatures
  • ws_to_tunnel_task.rs:62-76,79-109: Similar issues - recv_msg() and handle_message_mk2() reference undefined variables

These errors suggest the refactoring was incomplete. The functions need proper parameter passing.

2. ActorEventDemuxer: Potential Memory Leak

In actor_event_demuxer.rs:37-38:

if let Some(channel) = self.channels.get(&actor_id) {
    let _ = channel.tx.send(event);

The last_seen timestamp is never updated when events are ingested for existing channels. This means:

  • Channels receiving events regularly will still be garbage collected after 30 seconds
  • This defeats the purpose of the GC mechanism
  • Could cause active actor event channels to be aborted unexpectedly

Fix: Update last_seen when sending events:

if let Some(channel) = self.channels.get_mut(&actor_id) {
    channel.last_seen = Instant::now();
    let _ = channel.tx.send(event);
}

3. Silently Dropped Events

In actor_event_demuxer.rs:38 and throughout the new code, errors from channel.tx.send(event) are silently ignored with let _ =. If the channel is full or closed, events are dropped without logging.

Recommendation: Log failed sends at minimum:

if channel.tx.send(event).is_err() {
    tracing::warn!(?actor_id, "failed to send event to actor channel");
}

4. Protocol Version Helper Function Inconsistency

In lib.rs:14-16:

pub fn is_mk2(protocol_version: u16) -> bool {
    protocol_version > PROTOCOL_MK1_VERSION
}

But in conn.rs:129 and ping_task.rs:39, there's also a reference to protocol::is_new() which isn't defined in the diff. This inconsistency could cause issues.

High Priority Issues 🟠

5. Missing Signal Type in conn.rs

conn.rs:147: ctx.signal(Init) - The Init signal type is not imported or defined anywhere visible in the diff.

6. Unsafe Task Abort with TODO

actor_event_demuxer.rs:76-78:

// TODO: Verify aborting is safe here
channel.handle.abort();

Aborting tasks can leave actors in inconsistent states. This needs verification before merge, or implement graceful shutdown instead.

7. Incorrect Logging Convention

actor_event_demuxer.rs:89: Uses channels=?self.channels.len() which logs with debug formatting. According to CLAUDE.md, structured logging should use channels = self.channels.len() (no ? prefix for simple values).

Medium Priority Issues 🟡

8. Missing Error Context in Protocol Detection

In conn.rs:129-149, the code branches on protocol::is_new(protocol_version) but doesn't handle the case where protocol version is invalid/unsupported. Consider adding validation.

9. Unbounded Channel Risk

actor_event_demuxer.rs:40: Uses mpsc::unbounded_channel(). If event production outpaces consumption, this could lead to unbounded memory growth per actor.

10. Inconsistent Import Patterns

ws_to_tunnel_task.rs:1: Violates CLAUDE.md guideline - imports should be at the top of the file, not inline. The code uses:

use anyhow::Context;

Following CLAUDE.md: "Always add imports at the top of the file instead of inline within the function."

Though the current import is at the top, ensure new code doesn't add inline imports.

11. Potential Integer Overflow

ping_task.rs:25: rtt = now.saturating_sub(pong.ts) then cast to u32. If RTT exceeds u32::MAX, it's set to u32::MAX. Consider logging this edge case.
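The clamping behavior described above can be sketched as follows; the timestamp types and the logging call are assumptions (the actual field names in ping_task.rs may differ), but it shows both edge cases: negative RTT from clock skew, and values exceeding u32::MAX:

```rust
// Compute RTT from two i64 millisecond timestamps and clamp into u32,
// logging when the clamp actually fires so the edge case stays visible.
pub fn rtt_u32(now_ms: i64, sent_ms: i64) -> u32 {
    let rtt = now_ms.saturating_sub(sent_ms).max(0) as u64;
    if rtt > u32::MAX as u64 {
        // In the real code this would be tracing::warn!(rtt_ms = rtt, ...).
        eprintln!("rtt exceeds u32::MAX, clamping (rtt_ms = {rtt})");
        u32::MAX
    } else {
        rtt as u32
    }
}
```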

Positive Aspects ✅

  1. Good separation of concerns - The ActorEventDemuxer provides clean event routing
  2. Batching optimization - Event batching (1024 at a time) should improve performance
  3. Proper cleanup - The demuxer shutdown logic properly awaits all tasks
  4. Protocol versioning - Good backward compatibility structure with separate Mk1/Mk2 types
  5. Graceful workflow not found handling - Uses .graceful_not_found() pattern correctly

Security Considerations 🔒

No major security issues identified. The actor validation in KV requests (checking actor_belongs) is good.

Testing Recommendations 🧪

  1. Add tests for ActorEventDemuxer GC behavior
  2. Test protocol version switching between mk1 and mk2
  3. Test event batching under high load
  4. Test graceful shutdown with in-flight events
  5. Add integration tests for the new runner2 workflow

Summary

This PR cannot be merged in its current state due to critical compilation errors. The refactoring needs to be completed to properly pass parameters to the extracted functions. The ActorEventDemuxer also needs the last_seen update fix to function correctly.

Blocking issues before merge:

  • Fix all compilation errors in tunnel_to_ws_task.rs and ws_to_tunnel_task.rs
  • Fix ActorEventDemuxer last_seen not being updated
  • Define/import the Init signal type
  • Resolve is_new vs is_mk2 naming inconsistency

Would be happy to review again once these are addressed!

@claude

claude bot commented Nov 26, 2025

Pull Request Review

I've reviewed this PR and found several critical issues that prevent compilation:

Critical Compilation Errors

tunnel_to_ws_task.rs:

  • Lines 52-78: recv_msg() missing parameters (tunnel_sub, eviction_sub, tunnel_to_ws_abort_rx) that are used in body
  • Lines 80-164: handle_message_mk2() missing ctx, conn, tunnel_msg, data parameters
  • Line 166-252: handle_message_mk1() has same parameter issues
  • Line 201: Invalid continue statement outside loop
  • Line 22: Wrong field conn.protocol should be conn.protocol_version

ws_to_tunnel_task.rs:

  • Lines 79-117: recv_msg() missing parameters
  • Lines 119-464: handle_message_mk2() missing data parameter, wrong msg type
  • Lines 396-439: Contain misplaced workflow code with undefined variables (input, last_command_idx, etc.)
  • Lines 415, 428: Use the res_msg variable where msg is intended
  • Lines 467-759: handle_message_mk1() missing parameters
  • Lines 378, 738, 771: Calls to non-existent handle_tunnel_message function
  • Lines 761-768: Empty ack_commands() stub
  • Line 60: Wrong field conn.protocol_version

actor_event_demuxer.rs:

  • Lines 36-67: First event lost when creating new channel - only sends in Some branch
  • Lines 73-83: GC broken - last_seen never updated so all channels removed after 30s
  • Line 38: Silently drops events on send failure
  • Line 77-78: TODO suggests uncertainty about task abort safety

Other Issues

  • Significant code duplication in KV handling between mk1 and mk2
  • Multiple TODO comments about exposing errors to clients (security concern)
  • No tests mentioned despite protocol version complexity
  • Empty PR description

Recommendations

  1. Fix all compilation errors - code will not build
  2. Fix event demuxer bugs (lost events, broken GC)
  3. Remove misplaced workflow code
  4. Extract duplicate KV code
  5. Add error logging
  6. Add tests for both protocol versions
  7. Make last_seen updatable (use Mutex or refactor)
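A std-only sketch of what refreshed-on-ingest bookkeeping could look like (field and type names are hypothetical; the real Channel also holds an mpsc sender and a task handle, and `gc` taking `now` as a parameter is a testability choice, not the actual signature):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const MAX_LAST_SEEN: Duration = Duration::from_secs(30);

struct Channel {
    last_seen: Instant,
}

struct Demuxer {
    channels: HashMap<u64, Channel>,
}

impl Demuxer {
    fn ingest(&mut self, actor_id: u64) {
        // Every event refreshes the timestamp, so active channels survive GC;
        // the bug described above refreshed it only on creation
        self.channels
            .entry(actor_id)
            .and_modify(|c| c.last_seen = Instant::now())
            .or_insert(Channel { last_seen: Instant::now() });
    }

    fn gc(&mut self, now: Instant) {
        self.channels
            .retain(|_, c| now.duration_since(c.last_seen) < MAX_LAST_SEEN);
    }
}
```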

This needs significant work before merging. The errors suggest it wasn't tested after refactoring.

@claude

claude bot commented Nov 27, 2025

PR Review: fix(pb): simplify runner wf

Overview

This PR simplifies the runner workflow by introducing a new protocol version (mk2) and refactoring the WebSocket message handling architecture. The changes are significant and introduce several architectural improvements.

Code Quality & Best Practices

✅ Strengths

  1. Protocol Versioning: Good separation between mk1 and mk2 protocol versions, allowing backward compatibility
  2. Event Demultiplexing: The new ActorEventDemuxer provides batching and parallelization for actor events
  3. Shutdown Handling: Proper cleanup with event_demuxer.shutdown().await ensures in-flight events complete
  4. Structured Logging: Good use of tracing with structured fields (?actor_id, ?err)

⚠️ Issues Found

Critical: Compilation Errors in ws_to_tunnel_task.rs

The file has several critical issues that will prevent compilation:

  1. Lines 62-63, 70-71: Functions handle_message_mk2 and handle_message_mk1 are called with incorrect signatures

    handle_message_mk2(msg).await?  // Missing required parameters

    These functions expect ctx, conn, init_tx, event_demuxer, and msg but are called with only msg.

  2. Line 79: recv_msg() function signature declares it returns Result<std::result::Result<Option<()>, LifecycleResult>> but the function body operates on undefined variables ws_rx, eviction_sub2, tunnel_to_ws_abort_rx that should be parameters.

  3. Line 106: Returns Ok(Some(data)) but data is undefined - should be the binary message content.

  4. Line 125: Parameter msg: () should be data: Vec<u8> or similar.

  5. Lines 128, 137, 151, etc: Variable data is undefined throughout handle_message_mk2.

  6. Lines 378-381, 384-394, 397-439: References to undefined variables like input, last_command_idx, prepopulate_actor_names, metadata, runner_lost_threshold, init_data suggest incomplete refactoring.

  7. Line 447: Call to ack_commands(&ctx).await? but ack_commands function body is empty (lines 761-768).

  8. Lines 467, 738, 771, 830: The function handle_tunnel_message is called with inconsistent parameters.

  9. Line 748: protocol::ToServer::try_from(msg) - unclear if this conversion is defined.

Logic Issues in tunnel_to_ws_task.rs

The file appears incomplete with placeholder code:

  1. Lines 41-49: recv_msg() and handle_message_mk2() are called but not properly defined in this file's context; the actual message receiving and handling logic is missing.

  2. Lines 104-114: Function signatures do not match - similar issue to ws_to_tunnel_task.rs.

  3. Line 136: continue; statement outside of handle_message_mk1 - likely a refactoring error.

Bug in actor_event_demuxer.rs

Line 37: last_seen field is never updated after initial channel creation

if let Some(channel) = self.channels.get(&actor_id) {
    let _ = channel.tx.send(event);
    // BUG: Should update channel.last_seen here
}

This means channels will be garbage collected after MAX_LAST_SEEN even if actively receiving events. Should use:

if let Some(channel) = self.channels.get_mut(&actor_id) {
    let _ = channel.tx.send(event);
    channel.last_seen = Instant::now();  // Update timestamp
}

Code Style Issues

  1. Missing imports: Many files reference types without visible imports in the diff (e.g., Init signal in conn.rs:136).

  2. Inconsistent error handling: Line 38 in actor_event_demuxer.rs silently ignores send errors with let _ =. Should at least log failed sends.

  3. TODO comments: Several TODOs added that should be tracked:

    • actor_event_demuxer.rs:77: "Verify aborting is safe here"
    • ws_to_tunnel_task.rs:214: "Add queue and bg thread for processing kv ops"
    • ws_to_tunnel_task.rs:235, 279, 306, 333, 359: "Dont return actual error?"
    • runner2.rs:131: "Periodically ack events"

Performance Considerations

✅ Good

  1. Batching: Event demuxer uses recv_many(&mut buffer, 1024) for batch processing
  2. Parallel tasks: A separate spawn for each actor's event processing prevents blocking
  3. Unbounded channels: Appropriate for event ingestion (though consider bounds for backpressure)

⚠️ Concerns

  1. GC on every ingest: Lines 69-83 in actor_event_demuxer.rs run a GC check on every ingest() call. Consider using a dedicated GC task or less frequent checks.

  2. Synchronous KV operations: All KV operations in handle_message_mk2 are awaited inline. The TODO at line 214 correctly identifies this should be queued.

  3. Database transaction sizes: allocate_pending_actors activity scans potentially large ranges with StreamingMode::Iterator which could be slow for large queues.
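On the GC-frequency point, one cheap option short of a dedicated task is a throttle: the hot ingest path pays only for an elapsed-time comparison, and the real retain pass runs at most once per interval. A std-only sketch (names and the 5-second interval are illustrative; `should_gc` takes `now` as a parameter purely for testability):

```rust
use std::time::{Duration, Instant};

const GC_INTERVAL: Duration = Duration::from_secs(5);

struct GcThrottle {
    last_gc: Instant,
}

impl GcThrottle {
    // Returns true at most once per GC_INTERVAL; callers run the actual
    // channel sweep only when this returns true
    fn should_gc(&mut self, now: Instant) -> bool {
        if now.duration_since(self.last_gc) >= GC_INTERVAL {
            self.last_gc = now;
            true
        } else {
            false
        }
    }
}
```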

Security Concerns

⚠️ Issues

  1. Error message exposure: Lines 234-280 in ws_to_tunnel_task.rs return raw error messages to clients. TODOs acknowledge this, but it's a security risk:

    // TODO: Dont return actual error?
    message: err.to_string(),

    Should sanitize error messages before sending to client.

  2. Actor ownership validation: Good that line 190-211 validates actor belongs to runner before KV operations.

  3. No rate limiting: KV operations do not appear to have rate limiting per-runner or per-actor.
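On the error-exposure point, a sketch of the sanitization idea using a hypothetical error enum (not the PR's actual error type): map each failure to a fixed client-safe string and keep the detailed cause for server-side logs only:

```rust
#[derive(Debug)]
enum KvError {
    NotFound,
    PermissionDenied,
    Internal(String), // detailed cause, for server-side logs only
}

// What goes over the tunnel to the client
fn client_message(err: &KvError) -> &'static str {
    match err {
        KvError::NotFound => "key not found",
        KvError::PermissionDenied => "permission denied",
        // never echo internal details (hosts, paths, stack traces) to the client
        KvError::Internal(_) => "internal error",
    }
}
```

The detailed variant would still be logged with `?err` before the sanitized string is sent.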

Test Coverage

Missing: No test files included in this PR. Given the complexity of the changes, tests are critical:

  • Unit tests for ActorEventDemuxer (especially GC logic)
  • Integration tests for protocol version switching
  • Tests for error handling paths
  • Tests for concurrent event processing

Additional Recommendations

  1. Fix compilation errors first - The PR cannot be merged in its current state.

  2. Add missing signal definition: Init signal referenced in conn.rs:136 needs to be defined.

  3. Complete refactoring: Many functions reference undefined variables suggesting incomplete refactoring from the previous version.

  4. Document protocol versions: Add documentation explaining when to use mk1 vs mk2 and migration path.

  5. Consider bounds on event channels: While unbounded channels work, they could lead to memory issues under extreme load. Consider tokio's mpsc::channel(capacity) with appropriate backpressure handling.

  6. Improve error handling: Rather than silently ignoring send errors, log them and consider metrics/alerts for channel send failures.

  7. Add metrics: Consider adding metrics for:

    • Event demuxer queue depths
    • Event batch sizes
    • GC operations
    • Protocol version distribution
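A sketch of how such counters might look with std atomics (struct and method names are hypothetical; a production system would more likely export these through a metrics crate):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical counters for the demuxer metrics suggested above
struct DemuxerMetrics {
    events_ingested: AtomicU64,
    send_failures: AtomicU64,
}

impl DemuxerMetrics {
    // Called once per recv_many batch with the batch size
    fn record_ingest(&self, batch_size: u64) {
        self.events_ingested.fetch_add(batch_size, Ordering::Relaxed);
    }

    // Called wherever a channel send currently fails silently
    fn record_send_failure(&self) {
        self.send_failures.fetch_add(1, Ordering::Relaxed);
    }
}
```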

Summary

This PR introduces valuable architectural improvements but has critical compilation errors that must be fixed before merging. The ws_to_tunnel_task.rs file appears to be incomplete with undefined variables and function signature mismatches. The actor_event_demuxer.rs has a bug where last_seen is not updated for existing channels.

Recommendation: Request changes to fix compilation errors and the last_seen bug before approval.


Priority Items:

  1. ⛔ Fix compilation errors in ws_to_tunnel_task.rs
  2. ⛔ Complete refactoring (undefined variables)
  3. 🔴 Fix last_seen update bug in actor_event_demuxer.rs
  4. 🟡 Sanitize error messages sent to clients
  5. 🟡 Add test coverage

@graphite-app
Contributor

graphite-app bot commented Dec 3, 2025

Merge activity

  • Dec 3, 12:33 AM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Dec 3, 12:34 AM UTC: CI is running for this pull request on a draft pull request (#3577) due to your merge queue CI optimization settings.
  • Dec 3, 12:34 AM UTC: Merged by the Graphite merge queue via draft PR: #3577.

graphite-app bot pushed a commit that referenced this pull request Dec 3, 2025
@graphite-app graphite-app bot closed this Dec 3, 2025
@graphite-app graphite-app bot deleted the 11-18-fix_pb_simplify_runner_wf branch December 3, 2025 00:34