Incremental Ledger Processing Design

Status: Design
Author: System Design
Date: 2024-11-02
Related: ModFin API - Ledger System

Overview

Add incremental inline ledger processing to complement the existing Spark batch system, providing low-latency ledger entries for newly created events while maintaining batch processing as the authoritative source of truth.

Background

Current System

Events flow through the system as follows:

Event API Request
    ↓
Push to Kinesis/S3
    ↓
Debounced Spark Batch Job (runs periodically)
    ↓
Read ALL events from S3
    ↓
Preprocess (dedups, rewrites, legacy transforms)
    ↓
Process through ledger rules
    ↓
Write to PostgreSQL (full table overwrite)

Latency: 10-15 minutes total (6-9 min processing + debouncing delay)
Consistency: Perfect (full recompute each time)
Complexity: Low (clean slate approach)

Problem

Users creating events via API cannot immediately query the resulting ledger entries. With batch processing taking 10-15 minutes (6-9 minutes of processing plus debouncing delay), callers that create an event and then read the ledger see nothing until the next batch run.

Goals

  1. Low latency: Provide ledger entries immediately in API response (~200-300ms vs 10-15 min batch)
  2. Safety: Never block event creation or break existing guarantees
  3. Simplicity: Reuse existing processor logic, minimal new code
  4. Best-effort: Inline processing can be incomplete; batch always wins

Non-Goals

Design

Hybrid Architecture

Event API Request
    ↓
    ├─→ [NEW] Inline Process ──→ Write Ledger (immediate, best-effort)
    │
    └─→ Push to Kinesis ──→ Batch Process (periodic, authoritative)

Both paths write to the same ledger table. Batch overwrites inline results with authoritative data.

Key Design Decisions

1. Time-Windowed History Loading

ℹ️ COMMENT: This section describes time-windowed loading, but should instead describe partition-based loading.

CRITICAL ISSUE - Spark Has No Partitioning: Based on analysis of ledgers/spark.py:

CORRECT APPROACH FOR INLINE PROCESSING: We don’t need to follow Spark’s partitioning constraints for inline processing - this is actually an advantage!

Instead of loading a time-windowed portion, we should load the full history for all accounts involved in the event:

Note: In practice, virtually all events involve only 1 account (only the LLM can generate group events with multiple accounts/components). So most of the time this will just be WHERE book_id = ? AND account = ? for a single account. But when the LLM generates multi-account events, loading all accounts works correctly.

This works because:

Key advantage over Spark: We can load all accounts involved in an event into one partition, then let the processing logic filter as needed. Unlike Spark (which would need to partition first), we can load multiple accounts because the processing logic handles filtering internally.
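
A minimal sketch of the account-extraction step (the extract_account_ids_from_events helper referenced in the swap-out discussion later in this document), assuming the event shape shown there: event.data.account plus optional event.data.components, each carrying an account:

def extract_account_ids_from_events(events):
    """Collect every account referenced by the events and their components."""
    account_ids = set()
    for event in events:
        account = getattr(event.data, 'account', None)
        if account:
            account_ids.add(account)
        for component in getattr(event.data, 'components', None) or ():
            component_account = getattr(component, 'account', None)
            if component_account:
                account_ids.add(component_account)
    return account_ids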

Instead of loading the entire ledger, only load recent entries (default: 90 days):

cutoff_date = datetime.utcnow() - timedelta(days=90)
partitioned_ledger = await load_recent_ledger(book_id, cutoff_date)

Rationale:

Trade-off:

2. Reuse Existing Processor Logic

The core processing functions are already pure Python and don’t depend on Spark:

from ledgers import processors

# Existing functions work as-is:
new_entries = processors.process_events(
    events,
    book_rulesets=book_rulesets,
    raises=False,
)

All the complex logic (waterfall allocation, balance calculations, reversals, etc.) is reused without modification.

3. Best-Effort Processing

Inline processing may produce incomplete or slightly incorrect results:

ℹ️ COMMENT: With partition-based loading (full history for all accounts involved in the event), reversals can find target entries from the initial ledger and mark them as reversed. However, there is a critical limitation: when Spark processes reversals, it reorders reversal events to come immediately after the event they reverse, and then reprocesses ALL subsequent events in the partition to ensure correct balances (especially important for waterfalls). With inline processing using an initial ledger, we cannot reprocess subsequent events - they’ve already been persisted. This means:

Optional alternative: Consider storing the event stream (in DB or memory) for the partition, and when a reversal occurs, reprocess the entire event stream for affected accounts and swap out the impacted portion of the ledger. This is similar to how the process worked in the past (which swapped out the entire ledger to pick up alterations but didn’t have access to the event stream, requiring the full process to correct subsequent events).

ℹ️ COMMENT: Most rewrites (account moves, contact moves, currency rewrites) are not relevant for inline processing. The current ledger already contains entries with updated accounts/contacts/currencies (if rewrites occurred, they were applied when those entries were created). New events being processed will also reference the current/updated accounts/contacts/currencies. Spark needs preprocessing because it processes historical events that may reference old account IDs, but inline processing works with the current state, so the mismatch doesn’t occur.

This is OK because:

4. Event Classification

ℹ️ COMMENT: With partition-based loading (full history for all accounts involved in the event), we don’t need the “too old” time-window check. All events can reference any historical entry for the accounts loaded into the partition. We should remove the cutoff_past check and only keep future date validation.

Only exclude events that cannot be processed without Spark preprocessing:

def filter_processable(events, past_days=90, future_days=365):
    """
    Generator that yields events suitable for inline processing.
    Logs and skips events that cannot be processed without Spark preprocessing.
    
    Events are excluded if they CANNOT be processed without Spark preprocessing:
    - Outside time window (too old or too far in future)
    - Reset events (affect all processing)
    - Venmo events (require deduplication)
    """
    now = datetime.utcnow()
    cutoff_past = now - timedelta(days=past_days)
    cutoff_future = now + timedelta(days=future_days)
    
    for event in events:
        effective_at = getattr(event.data, 'effective_at', None) or event.data.posted_at
        
        # Too old - won't have context in time window
        if effective_at < cutoff_past:
            logger.info(f"Skipping event {event.id}: too_old ({effective_at})")
            continue
        
        # Too far in future - probably data error
        if effective_at > cutoff_future:
            logger.info(f"Skipping event {event.id}: too_far_future ({effective_at})")
            continue
        
        # Reset affects all processing
        if event.data.event == 'reset':
            logger.info(f"Skipping event {event.id}: reset_event")
            continue
        
        # Venmo needs deduplication
        if is_venmo_event(event):
            logger.info(f"Skipping event {event.id}: venmo_dedup_required")
            continue
        
        # Processable
        yield event

Everything else is attempted inline.

Benefits of generator approach:

5. Schema Compatibility

Ledger table uses flat columns for amounts:

-- Database schema:
amount DECIMAL(38,9)
currency VARCHAR
reporting_amount DECIMAL(38,9)
reporting_currency VARCHAR

But processors expect nested dicts:

{
    'amount': {'value': Decimal('100'), 'currency': 'USD'},
    'reporting_amount': {'value': Decimal('100'), 'currency': 'USD'}
}

Simple transformation layer handles this:

def ledger_obj_to_processor_dict(entry):
    d = db.utils.obj2dict(entry)
    d['amount'] = {
        'currency': d.pop('currency', None),
        'value': d.pop('amount', None),
    }
    d['reporting_amount'] = {
        'currency': d.pop('reporting_currency', None),
        'value': d.pop('reporting_amount', None),
    }
    return d
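
Writing processor output back to the table needs the inverse mapping. A minimal sketch (the helper name processor_dict_to_ledger_row is illustrative, not an existing function):

def processor_dict_to_ledger_row(d):
    """Flatten nested amount dicts back into the ledger table's flat columns."""
    row = dict(d)
    amount = row.pop('amount', None) or {}
    row['amount'] = amount.get('value')
    row['currency'] = amount.get('currency')
    reporting = row.pop('reporting_amount', None) or {}
    row['reporting_amount'] = reporting.get('value')
    row['reporting_currency'] = reporting.get('currency')
    return row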

6. Synchronous vs Asynchronous Processing

Decision: Start with synchronous processing in the API endpoint.

Three options were considered:

Option 1: Synchronous (Blocking)

Process inline within the API request and return results immediately:

async def create_events(...):
    events = [build_event(...)]
    
    # Push to Kinesis
    r = await put_events(session, events, environment)
    
    # Process inline (blocks response)
    inline_result = await inline.process_events_inline(dbs, events, environment)
    
    return {
        'events': events,
        'kinesis': r,
        'inline_processed': inline_result.processed,  # Boolean flag
        'inline_entry_count': inline_result.entry_count,  # How many entries created
    }

Pros:

Cons:

Option 2: Asynchronous Task (Non-blocking)

Queue processing as background task:

@tasks.task
async def process_events_inline_task(events, environment):
    async with db.Session() as dbs:
        return await inline.process_events_inline(dbs, events, environment)

async def create_events(...):
    events = [build_event(...)]
    r = await put_events(session, events, environment)
    
    # Queue inline processing
    task = process_events_inline_task.delay(events, environment)
    
    return {
        'events': events,
        'kinesis': r,
        'task_id': task.id,  # User polls for results
    }

Pros:

Cons:

Option 3: Fire-and-Forget (Background)

Process in background without task tracking:

async def create_events(...):
    events = [build_event(...)]
    r = await put_events(session, events, environment)
    
    # Fire and forget
    asyncio.create_task(inline.process_events_inline(dbs, events, environment))
    
    return {'events': events, 'kinesis': r}

Pros:

Cons:

Chosen Approach: Synchronous with Timeout Protection

Rationale:

  1. Core value proposition: The whole point of inline processing is immediate availability. Batch processing takes 6-9 minutes + debouncing latency (typically 10-15 minutes total). Synchronous processing makes ledger entries available immediately (~200-300ms), so when the API response returns, the caller can immediately fetch from the ledger endpoint and get results. Async processing would still require ~2 seconds of waiting/polling before ledger is available. Synchronous provides truly immediate availability and dramatically better UX (sub-second vs 10-15 minutes).

ℹ️ COMMENT: Updated latency estimate: With the largest account having ~600 entries, actual latency is ~30-120ms (much better than the original ~200-300ms estimate). This makes the synchronous approach compelling. Note: If using the partition swap-out alternative approach (discussed in Implementation section), latency would be ~120-470ms, which may impact the sync/async decision.

  2. Expected latency is acceptable: With proper indexing:
    • Load recent ledger: ~50-100ms (indexed query)
    • Process events: ~50-100ms (pure Python, in-memory)
    • Insert entries: ~50-100ms (bulk insert)
    • Total: ~150-300ms (acceptable for data creation API)

ℹ️ COMMENT: Updated latency breakdown based on actual data (largest account has ~600 entries):

Note on partition swap-out alternative: If using the partition swap-out approach (discussed in Implementation section), latency would be ~100-400ms for the write operation, bringing total to ~120-470ms. This significantly impacts the sync/async decision and should be considered when evaluating whether swap-out is worth the additional correctness guarantees.

  3. Start simple, optimize later: If latency becomes problematic, can:
    • Move to async (easy refactor)
    • Optimize slow parts (caching, selective loading)
    • Add fast-path for simple events that don’t need history
  4. Monitoring will guide decision: Track p50/p95/p99 latency. If p95 > 500ms, reconsider.

Implementation with safety:

async def create_events(session, book, environment, forms, ...):
    events = [build_event(book, environment, form, ...) for form in forms]
    
    # Push to Kinesis (critical path - must succeed)
    r = await put_events(session, events, environment)
    
    # Inline processing with timeout protection
    inline_processed = False
    inline_entry_count = 0
    try:
        # Timeout after 2 seconds to prevent long requests
        async with asyncio.timeout(2.0):
            from ledgers import inline
            result = await inline.process_events_inline(
                dbs, events, environment
            )
            inline_processed = result.processed
            inline_entry_count = result.entry_count
    except asyncio.TimeoutError:
        logger.warning("Inline processing timed out, will be handled by batch")
    except Exception:
        logger.exception("Inline processing failed, will be handled by batch")
    
    # Dispatch hooks
    await dispatch.call('create_events', events=events, environment=environment)
    
    return {
        'events': events,
        'kinesis': r,
        'inline_processed': inline_processed,
        'inline_entry_count': inline_entry_count,
    }

Usage pattern for callers:

# Create events
result = await create_events(...)

# Check if inline processing succeeded
if result['inline_processed']:
    # Some or all events processed inline (skipped events logged)
    print(f"Created {result['inline_entry_count']} ledger entries")
    
    # Ledger is immediately available for processable events
    ledger = await get_ledger_entries(
        book_id=book.id,
        event_ids=[e.id for e in result['events']]
    )
    
    # Note: Some events may not have entries yet (logged as skipped)
    # They'll appear after batch runs
else:
    # All events skipped or processing failed (check logs)
    # Ledger will be available after batch (10-15 min)
    pass

Future Options:

If synchronous processing proves too slow:

Implementation

Core Module: ledgers/inline.py

@dataclasses.dataclass
class InlineResult:
    """Result of inline processing."""
    processed: bool  # Whether processing succeeded
    entry_count: int  # Number of ledger entries created


async def process_events_inline(dbs, events, environment, past_days=90, future_days=365):
    """
    Process events inline against recent ledger history.
    Best-effort processing - batch will fix any issues.
    
    Returns:
        InlineResult with processed flag and entry count
    """
    if not events:
        return InlineResult(processed=False, entry_count=0)
    
    # Filter to processable events (logs skipped ones)
    processable = list(filter_processable(events, past_days, future_days))
    
    if not processable:
        logger.info("No processable events after filtering")
        return InlineResult(processed=False, entry_count=0)
    
    try:
        # All events should be from same book
        book_id = processable[0].book.id
        LedgerModel = Ledger.by(environment)
        
        # Load rulesets
        book_rulesets = await load_book_rulesets(dbs, [book_id])
        
        # Load recent ledger (time-windowed)
        cutoff_date = datetime.utcnow() - timedelta(days=past_days)
        partitioned_ledger = await load_recent_ledger(
            dbs, LedgerModel, book_id, cutoff_date
        )
        
        # ℹ️ COMMENT: Should load full history for all accounts involved in the event:
        #
        # # Extract all accounts from event and components
        # account_ids = set()
        # if hasattr(event.data, 'account') and event.data.account:
        #     account_ids.add(event.data.account)
        # for component in getattr(event.data, 'components', None) or ():
        #     if hasattr(component, 'account') and component.account:
        #         account_ids.add(component.account)
        #
        # # Load full history for all accounts (not time-windowed)
        # # Unlike Spark, we can load multiple accounts - processing logic filters internally
        # partitioned_ledger = await load_partition_ledger(
        #     dbs, LedgerModel, book_id, account_ids=list(account_ids) if account_ids else None
        # )
        
        logger.info(f"Processing {len(processable)}/{len(events)} events inline")
        
        # Process using existing logic
        new_entries = list(processors.process_events(
            processable,
            book_rulesets=book_rulesets,
            raises=False,
        ))
        
        > **ℹ️ COMMENT - POTENTIAL CODE CHANGES NEEDED:**
        > 
        > **Current issue:** The `process_events()` function (via `process_grouped_events()`) starts with an empty `partitioned_ledger = []` and doesn't accept an initial ledger parameter. To support inline processing with historical context, we need to:
        > 
        > 1. **Modify `process_grouped_events()` to accept initial ledger:**
        >    ```python
        >    def process_grouped_events(events, initial_ledger=None, raises=False):
        >        partitioned_ledger = list(initial_ledger) if initial_ledger else []
        >        # ... rest of function
        >    ```
        > 
        > 2. **Modify `process_events()` to pass through initial ledger:**
        >    ```python
        >    def process_events(events, book_rulesets=None, initial_ledger=None, raises=False):
        >        # ...
        >        for _, items in itertools.groupby(events, key=bp):
        >            iterable = process_grouped_events(items, initial_ledger=initial_ledger, raises=raises)
        >            yield from (json_stringify(item) for item in iterable)
        >    ```
        > 
        > 3. **Update inline processing to pass loaded ledger:**
        >    ```python
        >    # partitioned_ledger was already loaded above
        >    # Extract account IDs if using partition-based loading instead of time-windowed
        >    new_entries = list(processors.process_events(
        >        processable,
        >        book_rulesets=book_rulesets,
        >        initial_ledger=partitioned_ledger,  # Pass existing entries to processor
        >        raises=False,
        >    ))
        >    ```
        > 
        > This allows the processor to have access to historical entries for calculations and reversals, while still building up the full partition state correctly.
        
        # Insert new entries (incremental approach)
        if new_entries:
            await bulk_insert_entries(dbs, LedgerModel, new_entries)
        
        > **ℹ️ COMMENT - IMPORTANT DISCUSSION POINT: Partition Swap-Out Alternative:**
        > 
        > **Current approach (incremental insert):** Only new entries are inserted. This is simpler and faster, but has limitations:
        > - Reversals mutate existing entries in-place (`reversal=True`), but those mutations are not persisted
        > - Subsequent entries that depend on reversed entries may have incorrect balances until batch runs
        > - Latency: ~10-50ms (simple bulk insert)
        > 
        > **Alternative approach (partition swap-out):** Swap out the entire partition by deleting all entries for the accounts involved and inserting the full partition ledger (initial + new entries). This would:
        > - Persist reversal mutations to existing entries
        > - Ensure complete correct state for the partition (entries processed so far)
        > - **Note:** Swap-out alone does NOT correct subsequent entries that depend on reversed entries. Only the full Spark process (which reorders reversals and reprocesses all subsequent events) can correct them, OR if we process the event stream for the partition (instead of just the historical ledger) and then swap out the partition.
        > - Latency: ~100-400ms (delete + insert, depends on partition size)
        > 
        > **When swap-out might be beneficial:**
        > - When reversals are common and subsequent entry correctness is critical
        > - When partition sizes are small enough that swap-out latency is acceptable
        > - When batch correction latency (10-15 min) is too long for business requirements
        > 
        > **When incremental insert is better:**
        > - When reversals are rare
        > - When latency must be minimized (API response time critical)
        > - When batch correction latency is acceptable
        > - When partition sizes are large (swap-out becomes prohibitively slow)
        > 
        > **Implementation of swap-out approach:**
        > ```python
        > # Swap out entire partition (delete + insert)
        > if new_entries:
        >     account_ids = extract_account_ids_from_events(processable)  # Accounts involved
        >     async with dbs.begin():  # Transaction for atomicity
        >         # Delete existing entries for partition
        >         await delete_partition_entries(dbs, LedgerModel, book_id, account_ids)
        >         # Insert full partition ledger (initial + new entries)
        >         full_partition = partitioned_ledger + new_entries  # Combined state
        >         await bulk_insert_entries(dbs, LedgerModel, full_partition)
        > ```
        > 
        > **Considerations:**
        > - Transaction safety: Delete and insert must be atomic (use transaction)
        > - Conflict detection: Need optimistic locking/versioning to detect conflicts with batch updates
        > - Sync vs Async: Higher latency may make async processing more attractive
        > - Selective swap-out: Could swap only accounts that had reversals, use incremental for others
        
        return InlineResult(processed=True, entry_count=len(new_entries))
        
    except Exception:
        logger.exception("Inline processing failed")
        return InlineResult(processed=False, entry_count=0)

Integration Point: events/crud.py

async def create_events(session, book, environment, forms, ...):
    events = [build_event(book, environment, form, ...) for form in forms]
    
    # Push to Kinesis (critical path - must succeed)
    r = await put_events(session, events, environment)
    
    # Inline processing with timeout protection
    inline_processed = False
    inline_entry_count = 0
    try:
        # Timeout after 2 seconds to prevent long requests
        async with asyncio.timeout(2.0):
            from ledgers import inline
            result = await inline.process_events_inline(dbs, events, environment)
            inline_processed = result.processed
            inline_entry_count = result.entry_count
    except asyncio.TimeoutError:
        logger.warning("Inline processing timed out, will be handled by batch")
    except Exception:
        logger.exception("Inline processing failed, batch will recover")
    
    # Dispatch hooks
    await dispatch.call('create_events', events=events, environment=environment)
    
    return {
        'events': events,
        'kinesis': r,
        'inline_processed': inline_processed,
        'inline_entry_count': inline_entry_count,
    }

Supporting Functions

Load Recent Ledger:

async def load_recent_ledger(dbs, LedgerModel, book_id, cutoff_date):
    qry = db.select(LedgerModel) \
        .where(LedgerModel.book_id == book_id) \
        .where(LedgerModel.effective_at >= cutoff_date) \
        .order_by(LedgerModel.effective_at, LedgerModel.created_at)
    
    result = await dbs.execute(qry)
    entries = result.scalars().all()
    
    return [ledger_obj_to_processor_dict(entry) for entry in entries]

ℹ️ COMMENT: Should load full history for all accounts involved in the event (not just book_id):

Load Partition Ledger:

async def load_partition_ledger(dbs, LedgerModel, book_id, account_ids):
    """
    Load full history for all accounts involved in the event.
    
    Unlike Spark (which currently has no partitioning at all), inline processing
    can load multiple accounts into the partition. The processing logic filters by account
    internally (ledger_window, build_calcs), so this is safe and correct.
    """
    qry = db.select(LedgerModel) \
        .where(LedgerModel.book_id == book_id)
    
    # Filter to accounts involved in this event
    if account_ids:
        qry = qry.where(LedgerModel.account.in_(account_ids))
    
    qry = qry.order_by(LedgerModel.effective_at, LedgerModel.created_at)
    
    result = await dbs.execute(qry)
    entries = result.scalars().all()
    
    return [ledger_obj_to_processor_dict(entry) for entry in entries]

Load Book Rulesets:

async def load_book_rulesets(dbs, book_ids):
    qry = db.select(
        LedgerBookRuleset.book_id,
        LedgerRule.name,
        LedgerRule.logic,
    ).join(
        LedgerRuleset, 
        LedgerRuleset.id == LedgerBookRuleset.ruleset_id
    ).join(
        LedgerRule,
        LedgerRule.id == db.sa.any_(LedgerRuleset.rule_ids)
    ).where(
        LedgerBookRuleset.book_id.in_(book_ids)
    )
    
    result = await dbs.execute(qry)
    
    book_rulesets = {}
    for book_id, rule_name, rule_logic in result:
        if book_id not in book_rulesets:
            book_rulesets[book_id] = {}
        book_rulesets[book_id][rule_name] = rule_logic
    
    return book_rulesets
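
Bulk Insert Entries:

bulk_insert_entries (and delete_partition_entries from the swap-out alternative) are referenced above but not defined in this document. Minimal sketches, assuming the processor_dict_to_ledger_row helper from the schema-compatibility section, the insert_one helper that appears in the tests, and that db.delete mirrors the db.select wrapper used elsewhere:

async def bulk_insert_entries(dbs, LedgerModel, entries):
    """Flatten processor dicts back to table columns and persist them."""
    # Loops insert_one for clarity; a real implementation would likely batch
    # these into a single bulk INSERT.
    for entry in entries:
        await LedgerModel.insert_one(dbs, **processor_dict_to_ledger_row(entry))


async def delete_partition_entries(dbs, LedgerModel, book_id, account_ids):
    """Remove a partition's existing entries (swap-out alternative only)."""
    qry = db.delete(LedgerModel).where(LedgerModel.book_id == book_id)
    if account_ids:
        qry = qry.where(LedgerModel.account.in_(account_ids))
    await dbs.execute(qry)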

Event Filter:

def filter_processable(events, past_days=90, future_days=365):
    """
    Generator that yields events suitable for inline processing.
    Logs and skips events that cannot be processed without Spark preprocessing.
    
    Args:
        past_days: How far back to load history (default 90)
        future_days: Maximum future date to process (default 365)
    """
    now = datetime.utcnow()
    cutoff_past = now - timedelta(days=past_days)
    cutoff_future = now + timedelta(days=future_days)
    
    for event in events:
        effective_at = getattr(event.data, 'effective_at', None) or event.data.posted_at
        
        # Too old - won't have proper context in time window
        if effective_at < cutoff_past:
            logger.info(f"Skipping event {event.id}: too_old ({effective_at})")
            continue
        
        # Too far in future - probably data error
        if effective_at > cutoff_future:
            logger.info(f"Skipping event {event.id}: too_far_future ({effective_at})")
            continue
        
        # Reset affects all processing
        if event.data.event == 'reset':
            logger.info(f"Skipping event {event.id}: reset_event")
            continue
        
        # Venmo needs deduplication
        if is_venmo_event(event):
            logger.info(f"Skipping event {event.id}: venmo_dedup_required")
            continue
        
        # Processable
        yield event

ℹ️ COMMENT: With partition-based loading, remove past_days parameter and “too old” check since we load full partition history:

def filter_processable(events, future_days=365):
    """
    Generator that yields events suitable for inline processing.
    Logs and skips events that cannot be processed without Spark preprocessing.
    
    NOTE: With partition-based loading, we no longer need past_days parameter
    since we load full partition history.
    """
    now = datetime.utcnow()
    cutoff_future = now + timedelta(days=future_days)
    
    for event in events:
        effective_at = getattr(event.data, 'effective_at', None) or event.data.posted_at
        
        # Too far in future - probably data error
        if effective_at > cutoff_future:
            logger.info(f"Skipping event {event.id}: too_far_future ({effective_at})")
            continue
        
        # NOTE: No longer checking for "too old" - partition loading includes full history
        
        # Reset affects all processing
        if event.data.event == 'reset':
            logger.info(f"Skipping event {event.id}: reset_event")
            continue
        
        # Venmo needs deduplication
        if is_venmo_event(event):
            logger.info(f"Skipping event {event.id}: venmo_dedup_required")
            continue
        
        # Processable
        yield event

Database Requirements

Index for Time-Windowed Queries

Add composite index to support time-windowed ledger queries in ledgers/models_spark.py:

class Ledger(SchemaMixin, db.mixins.Id, db.mixins.CreatedAt, db.mixins.CRUD):
    __tablename__ = consts.LEDGER_TABLE
    
    # ... existing column definitions ...
    
    @classmethod
    def build_table_args(cls, env):
        """
        Add composite index for inline processing time-windowed queries.
        Index name includes environment to avoid conflicts across schemas.
        """
        return (
            db.Index(
                f'idx_{env}_ledger_book_effective_created',
                'book_id', 'effective_at', 'created_at',
            ),
        )

This index supports the inline processing query pattern:

WHERE book_id = ? AND effective_at >= ?
ORDER BY effective_at, created_at

The index is critical for:

ℹ️ COMMENT: Should use partition-based indexes instead:

Index for Partition-Based Queries

Add composite index to support partition-based ledger queries:

@classmethod
def build_table_args(cls, env):
    return (
        # Index for loading by book + accounts (IN clause)
        db.Index(
            f'idx_{env}_ledger_book_account_effective',
            'book_id', 'account', 'effective_at', 'created_at',
        ),
        # Index for loading by book only (when no accounts specified)
        db.Index(
            f'idx_{env}_ledger_book_effective_created',
            'book_id', 'effective_at', 'created_at',
        ),
    )

Query patterns:
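
Both patterns mirror load_partition_ledger above and are served by the indexes shown:

-- Typical case: one or a few accounts per event
WHERE book_id = ? AND account IN (...)
ORDER BY effective_at, created_at

-- Fallback when no account IDs can be extracted from the event
WHERE book_id = ?
ORDER BY effective_at, created_at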

Testing Strategy

Unit Tests

@pytest.mark.asyncio
async def test_inline_matches_batch_on_fresh_book(dbs, sandbox_book, sample_event):
    """Verify inline produces same results as batch on empty ledger."""
    result = await inline.process_events_inline(dbs, [sample_event], 'sandbox')
    assert result.processed
    
    book_rulesets = await inline.load_book_rulesets(dbs, [sandbox_book.id])
    batch_entries = list(processors.process_events(
        [sample_event],
        book_rulesets=book_rulesets,
        raises=True,
    ))
    
    assert result.entry_count == len(batch_entries)
    
    # Compare persisted inline entries against batch output
    LedgerModel = Ledger.by('sandbox')
    inline_entries = await inline.load_partition_ledger(
        dbs, LedgerModel, sandbox_book.id, account_ids=None
    )
    for inline_e, batch_e in zip(inline_entries, batch_entries):
        assert inline_e['amount'] == batch_e['amount']

@pytest.mark.asyncio
async def test_payment_waterfall_with_history(dbs, sandbox_book):
    """Verify payment waterfalls against historical invoice."""
    LedgerModel = Ledger.by('sandbox')
    
    # Create invoice
    invoice_event = make_invoice_event(amount=100)
    await inline.process_events_inline(dbs, [invoice_event], 'sandbox')
    
    # Create payment
    payment_event = make_payment_event(amount=50)
    result = await inline.process_events_inline(dbs, [payment_event], 'sandbox')
    assert result.processed
    
    # Verify allocation against persisted entries
    entries = await inline.load_partition_ledger(
        dbs, LedgerModel, sandbox_book.id, account_ids=None
    )
    assert any(e.get('invoice') == invoice_event.id for e in entries)

@pytest.mark.asyncio
async def test_time_window_excludes_old_entries(dbs, sandbox_book):
    """Verify time windowing correctly limits loaded entries."""
    LedgerModel = Ledger.by('sandbox')
    
    # Create old entry (100 days ago)
    old_date = datetime.utcnow() - timedelta(days=100)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, effective_at=old_date, ...)
    
    # Create recent entry (30 days ago)
    recent_date = datetime.utcnow() - timedelta(days=30)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, effective_at=recent_date, ...)
    
    # Load with 90 day window
    cutoff = datetime.utcnow() - timedelta(days=90)
    entries = await inline.load_recent_ledger(dbs, LedgerModel, sandbox_book.id, cutoff)
    
    assert len(entries) == 1  # Only recent entry

ℹ️ COMMENT: Should test partition loading with multiple accounts:

@pytest.mark.asyncio
async def test_partition_loading_includes_all_accounts(dbs, sandbox_book):
    """Verify partition loading includes full history for all accounts involved in event."""
    LedgerModel = Ledger.by('sandbox')
    
    # Create entries for account A (old and recent)
    old_date = datetime.utcnow() - timedelta(days=100)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_a', effective_at=old_date, ...)
    recent_date = datetime.utcnow() - timedelta(days=30)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_a', effective_at=recent_date, ...)
    
    # Create entries for account B (recent only)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_b', effective_at=recent_date, ...)
    
    # Create entry for account C (should not be included)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_c', effective_at=recent_date, ...)
    
    # Load partition for accounts A and B (simulating event with components for both accounts)
    entries = await inline.load_partition_ledger(dbs, LedgerModel, sandbox_book.id, account_ids=['account_a', 'account_b'])
    
    assert len(entries) == 3  # Both old and recent entries for account_a, recent for account_b
    assert all(e['book_id'] == sandbox_book.id for e in entries)
    assert all(e['account'] in ['account_a', 'account_b'] for e in entries)
    assert 'account_c' not in [e['account'] for e in entries]

Integration Tests

Test with existing fixtures to ensure inline processing produces reasonable results for realistic scenarios.

Batch Reconciliation

Monitor logs for discrepancies between inline and batch results:

# In Spark processing
if inline_entry_exists and differs_from_batch:
    logger.warning(f"Inline/batch discrepancy detected: {details}")

Performance Considerations

Memory Usage

With 90-day window and typical volumes:

All manageable. The sort/group operations are O(n log n), which is fine for these sizes.

ℹ️ COMMENT: With partition-based loading, memory usage is for all accounts involved in the event:

With partition-based loading (full history for involved accounts) and typical volumes:

In practice, virtually all events involve only 1 account (only LLM generates multi-account group events), so memory usage is very small. Even when LLM generates multi-account events (typically 2-5 accounts), total memory is still very manageable.

Query Performance

With proper indexing, the time-windowed query should be fast:

Expected: <100ms for typical queries.

ℹ️ COMMENT: With partition-based queries:

With proper indexing, the partition-based query should be fast:

Expected: <50ms for typical account sizes. In practice, virtually all events involve only 1 account (only LLM generates multi-account events):

Optimization Opportunities

1. Cache book rulesets:

from cachetools import TTLCache
_ruleset_cache = TTLCache(maxsize=100, ttl=300)  # 5 min TTL
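
A sketch of how the cache might wrap the loader (per-book keys; load_book_rulesets_cached is an illustrative name, not existing code):

async def load_book_rulesets_cached(dbs, book_ids):
    """Serve rulesets from the TTL cache, loading only the missing books."""
    missing = [b for b in book_ids if b not in _ruleset_cache]
    if missing:
        for book_id, rulesets in (await load_book_rulesets(dbs, missing)).items():
            _ruleset_cache[book_id] = rulesets
    return {b: _ruleset_cache[b] for b in book_ids if b in _ruleset_cache}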

2. Skip loading for simple events:

if event.data.event == 'invoice' and not needs_waterfall(rule):
    # Just process without loading history
    entries = process_simple_event(event, rule)

Rollout Plan

Phase 1: Implementation (Week 1)

  1. Create ledgers/inline.py module
  2. Add database indexes
  3. Write unit tests

Phase 2: Integration (Week 2)

  1. Wire into events/crud.py
  2. Add logging/metrics
  3. Test in development environment

Phase 3: Staging (Week 3)

  1. Deploy to staging
  2. Monitor performance and correctness
  3. Compare inline vs batch results

Phase 4: Production (Week 4)

  1. Deploy to production (single org)
  2. Monitor metrics
  3. Gradual rollout to all orgs

Metrics to Monitor

ℹ️ COMMENT:

Edge Cases & Failure Modes

Case 1: Reversal Target Not Found

Scenario: Reversal event arrives for transaction >90 days old (not in loaded ledger window).

Inline behavior: The existing reverse_entries() function searches the loaded ledger for matching entries to reverse. If the target transaction is not in the loaded window (e.g., >90 days old), the search finds no matches and produces 0 entries. This is a graceful no-op - no errors, no special handling needed.

Batch behavior: Processes correctly with full history, finds target, generates reversal entries.

Impact: Reversal not immediately visible, appears in next batch run (10-15 min).

Acceptable: Yes, these are rare for newly-created events. The reversal processing logic naturally handles this case.

Note: No special classifier logic or future optimization needed. Reversals “just work” - they process if the target is found, no-op if not. Batch always corrects.

ℹ️ COMMENT: With partition-based loading (full history for all accounts involved in the event), reversals can find target entries correctly as long as they’re within the loaded account set. The concern about “reversals >90 days old” no longer applies since we load the full history for involved accounts. However, there is a critical limitation: reversals will correctly mark past entries and create reversal entries, but subsequent entries (especially waterfalls) that depend on those reversed entries will have incorrect balances until the full Spark batch reprocesses them. This is because Spark reorders reversals to come immediately after their target event and reprocesses all subsequent events, which inline processing cannot do when using an initial ledger. See the Best-Effort Processing section for more details.

Case 2: Account Merge During Processing

Scenario: Event references account A, which is merged to account B milliseconds later.

Inline behavior: Uses account A (original reference).

Batch behavior: Rewrites to account B (after preprocessing).

Impact: Temporary incorrect account assignment.

Acceptable: Yes, batch fixes quickly, extremely rare timing.

Case 3: Concurrent Events for Same Partition

Scenario: Two payments arrive simultaneously for same customer.

Inline behavior: Both load same historical ledger, process independently, both insert.

Batch behavior: Processes in correct order, produces authoritative result.

Impact: Possible temporary incorrect waterfall allocation.

Acceptable: Yes, batch reconciles. Could add advisory locks if problematic.
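
If this does become a problem, a transaction-scoped PostgreSQL advisory lock keyed on the book would serialize inline processing per partition. A minimal sketch, assuming db.sa exposes SQLAlchemy's text() and that hashing the book id to a 32-bit key is acceptable:

import zlib

async def lock_partition(dbs, book_id):
    """Serialize inline processing per book via a transaction-scoped advisory lock."""
    key = zlib.crc32(str(book_id).encode())
    # pg_advisory_xact_lock releases automatically at transaction end
    await dbs.execute(db.sa.text('SELECT pg_advisory_xact_lock(:key)'), {'key': key})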

Case 4: Venmo Duplicate Events

Scenario: Duplicate venmo transaction arrives.

Inline behavior: Skipped (not processed inline).

Batch behavior: Deduplication logic removes duplicate.

Impact: Duplicate not immediately visible, removed in batch.

Acceptable: Yes, correct by design.

Case 5: Database Insert Failure

Scenario: Inline processing succeeds but DB insert fails.

Inline behavior: Exception caught, logged, returns InlineResult(processed=False, entry_count=0).

Batch behavior: Processes normally from Kinesis.

Impact: No inline entries, user sees nothing until batch runs.

Acceptable: Yes, event creation still succeeds, batch provides entries.

Case 6: Future-Dated Events

Scenario: Event has effective_at in the future (e.g., scheduled invoice for next month).

Inline behavior: Processes normally if within 365 days. Can waterfall against existing historical entries (e.g., future payment against current invoices).

Batch behavior: Same - processes normally.

Impact: Future-dated ledger entries immediately queryable. Good for:

Acceptable: Yes, this is desirable behavior.

Edge case: Events > 365 days in future are skipped (likely data errors). Can be adjusted via the future_days parameter.

Note: The time-window query (effective_at >= cutoff_date) only bounds how far back history is loaded; it does not exclude future-dated entries. Either way, future events only need historical context for waterfall/balance calculations, not other future events.

ℹ️ COMMENT: With partition-based loading:

Note: Partition query loads all entries for the involved accounts (by book_id and account IN (...)), ordered by effective_at. This includes both past and future entries for those accounts. This is correct - events need the full context for all accounts involved (event + components) for waterfall/balance calculations, regardless of when entries were created.

Security Considerations

Monitoring & Alerting

Metrics

# Success rate
metrics.increment('ledger.inline.processed', len(new_entries))
metrics.increment('ledger.inline.skipped')

# Latency
with metrics.timer('ledger.inline.duration'):
    await process_events_inline(...)

# Ledger size
metrics.gauge('ledger.inline.entries_loaded', len(partitioned_ledger))

Alerts

Future Enhancements

1. Smart Time Windows

Adjust window based on ledger size and event characteristics.

ℹ️ COMMENT: With partition-based loading:

1. Smart Partition Loading

Optimize partition loading based on partition size (e.g., cache small partitions, lazy-load large ones).

2. Caching Layer

Use Redis to cache recent ledger entries and rulesets.

3. Async Processing

If synchronous processing proves too slow (p95 > 500ms or timeout rate > 5%):

4. Selective Loading

Only load ledger entries relevant to event type (e.g., only receivables for payments).

5. Incremental Snapshots

Generate inline snapshots (balances, aging) alongside ledger entries.

Alternatives Considered

Alternative 1: Event Database + Full History

Approach: Store events in PostgreSQL, always process against full ledger history.

Pros: Perfect consistency, handles all edge cases.

Cons:

Rejected because: Time windowing is simpler and handles 99% of cases.

ℹ️ COMMENT:

Rejected because: Partition-based loading matches Spark processing exactly and handles all cases correctly. Time windowing would break partition isolation.

Alternative 2: Materialized View

Approach: Create materialized view of recent ledger, refresh periodically.

Pros: Query performance.

Cons:

Rejected because: Doesn’t solve the core problem.

Alternative 3: Spark Streaming

Approach: Use Spark Streaming to process events as they arrive.

Pros: Consistent with batch processing.

Cons:

Rejected because: Overkill for the problem, prefer simpler Python approach.

References

Open Questions

  1. Time window tuning: Is 90 days optimal? Should it vary by org/book?

ℹ️ COMMENT:

  1. Partition size optimization: Should we cache small partitions? When is a partition too large for inline processing?
  2. Concurrent processing: Do we need advisory locks for same-partition events?
  3. Cache layer: Would Redis caching provide meaningful improvement?
  4. Snapshot generation: Should inline processing also update snapshots?

Appendix: Full Code Example

See implementation in ledgers/inline.py for complete, production-ready code.