Implement Dropped Event Tracking for Remote Access Subscriptions #41

@lxsaah

The Event struct in the AimX protocol includes a dropped field for backpressure signaling, but it's currently hardcoded to None. We need to implement proper tracking of dropped events when the subscription queue becomes full.

Background

The AimX protocol (defined in protocol.rs lines 139-156) includes the Event struct with a documented dropped field:

/// Event message from server (subscription push)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    pub subscription_id: String,
    pub sequence: u64,
    pub data: JsonValue,
    pub timestamp: String,
    
    /// Number of dropped events since last delivery (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dropped: Option<u64>,
}
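
Because of the skip_serializing_if attribute, the field is simply absent from the wire when it is None, so clients only ever see it when drops occurred. Illustrative wire forms (field values are made up):

```json
{"subscription_id": "sub-1", "sequence": 41, "data": {"value": 1}, "timestamp": "2025-01-01T00:00:00Z"}

{"subscription_id": "sub-1", "sequence": 42, "data": {"value": 2}, "timestamp": "2025-01-01T00:00:01Z", "dropped": 3}
```

A client seeing dropped: 3 on the second event knows three events were lost before it.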

Currently in handler.rs:1075, this field is always set to None:

let event = Event {
    subscription_id: subscription_id.clone(),
    sequence,
    data: json_value,
    timestamp,
    dropped: None, // TODO: Implement dropped event tracking
};

Problem Statement

When a subscription's queue (sized by config.subscription_queue_size, default 100) becomes full, new events may be dropped depending on the buffer type (SPMC Ring, SingleLatest, or Mailbox). Without tracking these drops:

  1. Clients have no visibility into whether they're keeping up with the event stream
  2. Backpressure signals are missing, making it difficult to diagnose slow consumers
  3. Protocol compliance is incomplete: the feature is documented but not implemented
  4. Monitoring gaps: operators can't detect subscription performance issues

Current Behavior

  • Events flow from subscribe_record_updates() through an mpsc::Receiver<serde_json::Value>
  • If the receiver is slow, events may be dropped by the underlying buffer
  • The dropped field is always None, providing no indication of data loss

Desired Behavior

  • Track the number of events dropped since the last successful delivery
  • Include this count in the Event.dropped field when non-zero
  • Reset the counter after reporting
  • Provide clients with actionable backpressure information

Technical Approach

Option 1: Sequence Gap Detection (Recommended)

Compare a producer-side sequence number, carried as metadata with each value, against the last one seen; any gap is the number of events dropped in between. Note this only works if the buffer stamps each value with such a sequence number:

async fn stream_subscription_events(
    subscription_id: String,
    mut value_rx: tokio::sync::mpsc::Receiver<serde_json::Value>,
    event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
) {
    let mut delivery_sequence: u64 = 1;
    let mut last_producer_sequence: Option<u64> = None;

    while let Some(json_value) = value_rx.recv().await {
        // Gap detection needs a producer-side sequence number embedded in
        // the value (the field name here is illustrative).
        let producer_sequence = json_value.get("_seq").and_then(|v| v.as_u64());

        let dropped = match (last_producer_sequence, producer_sequence) {
            (Some(last), Some(current)) if current > last + 1 => Some(current - last - 1),
            _ => None,
        };
        if producer_sequence.is_some() {
            last_producer_sequence = producer_sequence;
        }

        let event = Event {
            subscription_id: subscription_id.clone(),
            sequence: delivery_sequence,
            data: json_value,
            timestamp: generate_timestamp(),
            dropped,
        };

        if event_tx.send(event).is_err() {
            break; // client disconnected; stop streaming
        }
        delivery_sequence += 1;
    }
}

Option 2: Buffer-Level Instrumentation

Modify subscribe_record_updates() to return a channel that includes drop count metadata:

pub struct SubscriptionValue {
    pub data: serde_json::Value,
    pub dropped_since_last: u64,
}

pub fn subscribe_record_updates(
    &self,
    type_id: TypeId,
    queue_size: usize,
) -> DbResult<(
    tokio::sync::mpsc::Receiver<SubscriptionValue>,
    tokio::sync::oneshot::Sender<()>,
)>

This requires changes to the buffer implementations to track drops.
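
A minimal, runtime-agnostic sketch of the drop-counting idea behind this option, using a std bounded channel in place of the real buffer. Type and field names match the proposal above, but the counter placement and helper are assumptions, not the actual buffer code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Value delivered to the subscription stream, carrying the number of
/// events the buffer discarded since the previous successful delivery.
pub struct SubscriptionValue {
    pub data: String, // serde_json::Value in the real code
    pub dropped_since_last: u64,
}

/// Producer side: on a full queue, count the drop instead of blocking.
fn push(tx: &SyncSender<SubscriptionValue>, dropped: &AtomicU64, data: &str) {
    let value = SubscriptionValue {
        data: data.to_string(),
        // swap(0) reads and resets atomically, so each delivered value
        // reports only the drops since the last successful send.
        dropped_since_last: dropped.swap(0, Ordering::Relaxed),
    };
    if let Err(TrySendError::Full(rejected)) = tx.try_send(value) {
        // The value was rejected: count it, and restore the count it was
        // carrying so those drops are reported with the next delivery.
        dropped.fetch_add(rejected.dropped_since_last + 1, Ordering::Relaxed);
    }
}

fn demo() -> (u64, u64) {
    let dropped = AtomicU64::new(0);
    let (tx, rx) = sync_channel(1); // queue of 1 to force drops quickly

    push(&tx, &dropped, "a"); // delivered
    push(&tx, &dropped, "b"); // queue full: dropped
    push(&tx, &dropped, "c"); // queue full: dropped
    let first = rx.recv().unwrap(); // consumer catches up
    push(&tx, &dropped, "d"); // delivered, reports the two drops
    let second = rx.recv().unwrap();

    (first.dropped_since_last, second.dropped_since_last)
}

fn main() {
    assert_eq!(demo(), (0, 2));
    println!("ok");
}
```

The atomic swap gives the "reset the counter after reporting" behavior from the desired-behavior list for free, with no lock on the hot path.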

Option 3: Try-Receive Pattern

Drain the queue with try_recv() after each recv(), coalescing any backlog into the newest value and counting the older ones as dropped:

while let Some(mut json_value) = value_rx.recv().await {
    let mut dropped: u64 = 0;

    // Drain the backlog: keep only the newest value and count the rest.
    // Note the handler itself discards these events, so this approach
    // changes delivery semantics and mainly suits SingleLatest buffers.
    while let Ok(newer) = value_rx.try_recv() {
        json_value = newer;
        dropped += 1;
    }

    let event = Event {
        dropped: if dropped > 0 { Some(dropped) } else { None },
        // ...
    };
}

Implementation Checklist

  • Choose tracking approach (recommend Option 1 or 2)
  • Implement drop counter in stream_subscription_events()
  • Update buffer implementations if needed (for Option 2)
  • Add unit tests for drop detection
  • Add integration test with slow consumer scenario
  • Update protocol documentation with behavior details
  • Add metrics/tracing for dropped events
  • Document client handling recommendations in protocol docs

Test Cases

  1. No drops: Fast consumer should see dropped: None for all events
  2. Slow consumer: Simulate slow recv() and verify dropped: Some(n) appears
  3. Recovery: After drops, verify counter resets and continues correctly
  4. Multiple subscriptions: Ensure drop tracking is independent per subscription
  5. Buffer types: Test with SPMC Ring, SingleLatest, and Mailbox buffers
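
The slow-consumer case (test 2) could be sketched with a std bounded channel standing in for the subscription queue. The real test would drive the AimX handler under tokio, but the accounting invariant is the same: every produced event is either delivered or counted as dropped. The names and queue size here are illustrative:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// Returns (received, dropped) for a fast producer feeding a deliberately
/// slow consumer through a small bounded queue.
fn slow_consumer_run() -> (u64, u64) {
    let (tx, rx) = sync_channel::<u64>(4); // small stand-in for subscription_queue_size
    let producer = thread::spawn(move || {
        let mut dropped = 0u64;
        for i in 0..100u64 {
            if tx.try_send(i).is_err() {
                dropped += 1; // queue full: the event is lost
            }
        }
        dropped // tx is dropped here, closing the channel
    });

    let mut received = 0u64;
    // Slow consumer: sleep between reads so the queue stays full.
    while rx.recv().is_ok() {
        received += 1;
        thread::sleep(Duration::from_millis(1));
    }
    (received, producer.join().unwrap())
}

fn main() {
    let (received, dropped) = slow_consumer_run();
    // Invariant: nothing is unaccounted for, and the slow consumer
    // really did miss events.
    assert_eq!(received + dropped, 100);
    assert!(dropped > 0);
    println!("received={received} dropped={dropped}");
}
```

An equivalent assertion in the real integration test would be that the sum of delivered events plus all reported Event.dropped values equals the number of events published.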

Related Files

  • aimdb-core/src/remote/handler.rs:1075 - TODO comment location
  • aimdb-core/src/remote/protocol.rs:154 - Event struct definition
  • aimdb-core/src/database.rs - subscribe_record_updates() implementation
  • aimdb-core/src/buffer/*.rs - Buffer implementations

Acceptance Criteria

  • dropped field correctly reports event loss
  • No false positives (dropped=0 should be None, not Some(0))
  • Performance impact is minimal (<5% overhead)
  • Works across all buffer types
  • Documented in protocol specification
  • Covered by automated tests
