Status: Design
Author: System Design
Date: 2024-11-02
Related: ModFin API - Ledger System
Add incremental inline ledger processing to complement the existing Spark batch system, providing low-latency ledger entries for newly created events while maintaining batch processing as the authoritative source of truth.
Events flow through the system as follows:
Event API Request
↓
Push to Kinesis/S3
↓
Debounced Spark Batch Job (runs periodically)
↓
Read ALL events from S3
↓
Preprocess (dedups, rewrites, legacy transforms)
↓
Process through ledger rules
↓
Write to PostgreSQL (full table overwrite)
Latency: 10-15 minutes total (6-9 min processing + debouncing delay)
Consistency: Perfect (full recompute each time)
Complexity: Low (clean slate approach)
Users creating events via the API cannot immediately query the resulting ledger entries. With batch processing taking 10-15 minutes (6-9 minutes of processing plus the debouncing delay), this creates a poor experience for users who need to read back ledger entries immediately after creating events.
Event API Request
↓
├─→ [NEW] Inline Process ──→ Write Ledger (immediate, best-effort)
│
└─→ Push to Kinesis ──→ Batch Process (periodic, authoritative)
Both paths write to the same ledger table. Batch overwrites inline results with authoritative data.
ℹ️ COMMENT: This section describes time-windowed loading, but should instead describe partition-based loading.
CRITICAL ISSUE - Spark Has No Partitioning: Based on analysis of `ledgers/spark.py`:
- Events are partitioned by `(book.id, partition.key, partition.value)` (line 221)
- For all orgs, `partition.key = None` and `partition.value = None` (classpay is legacy and no longer used)
- This means Spark is NOT actually partitioning at all - all events for a book are processed together in one massive partition
- This is a major problem that needs to be fixed ASAP - without partitioning, Spark can't parallelize processing, there's no point in using Spark, and this explains why processing times and costs are spiraling
- Note: Spark doesn't need to strictly partition by account - it could partition by account or by something else. The problem is that we haven't chosen any partition key at all.
CORRECT APPROACH FOR INLINE PROCESSING: We don’t need to follow Spark’s partitioning constraints for inline processing - this is actually an advantage!
Instead of loading a time-windowed portion, we should load the full history for all accounts involved in the event:
- Base partition: `WHERE book_id = ?`
- Then filter to accounts: `WHERE book_id = ? AND account IN (account_from_event, account_from_component_1, account_from_component_2, ...)`

Note: In practice, virtually all events involve only 1 account (only the LLM can generate group events with multiple accounts/components). So most of the time this will just be `WHERE book_id = ? AND account = ?` for a single account. But when the LLM generates multi-account events, loading all accounts works correctly.

This works because:
- All processing logic filters by account internally (`ledger_window` line 103, `build_calcs` filters by `$account`)
- Waterfall operations use `ledger_window`, which filters to the account from the entry being processed
- Credit calculations filter by account from inputs
- So loading multiple accounts into the partition is safe - operations naturally filter to the correct account
Key advantage over Spark: We can load all accounts involved in an event into one partition, then let the processing logic filter as needed. Unlike Spark (which would need to partition first), we can load multiple accounts because the processing logic handles filtering internally.
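As an illustration, the sketch below collects every account an event touches - the event's own account plus any component accounts - before loading the partition. The helper name is illustrative (it mirrors the comment code later in this document), not an existing API:

```python
def extract_account_ids(event):
    """Collect the event's account plus any component accounts (illustrative sketch)."""
    account_ids = set()
    account = getattr(event.data, 'account', None)
    if account:
        account_ids.add(account)
    # Group events (LLM-generated) may carry components for other accounts
    for component in getattr(event.data, 'components', None) or ():
        component_account = getattr(component, 'account', None)
        if component_account:
            account_ids.add(component_account)
    return account_ids
```

The resulting set feeds the `account IN (...)` filter above; an empty set falls back to loading the whole book.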
Instead of loading the entire ledger, only load recent entries (default: 90 days):
cutoff_date = datetime.utcnow() - timedelta(days=90)
partitioned_ledger = await load_recent_ledger(book_id, cutoff_date)
Rationale:
Trade-off:
The core processing functions are already pure Python and don’t depend on Spark:
from ledgers import processors
# Existing functions work as-is:
new_entries = processors.process_events(
events,
book_rulesets=book_rulesets,
raises=False,
)
All the complex logic (waterfall allocation, balance calculations, reversals, etc.) is reused without modification.
Inline processing may produce incomplete or slightly incorrect results:
ℹ️ COMMENT: With partition-based loading (full history for all accounts involved in the event), reversals can find target entries from the initial ledger and mark them as reversed. However, there is a critical limitation: when Spark processes reversals, it reorders reversal events to come immediately after the event they reverse, and then reprocesses ALL subsequent events in the partition to ensure correct balances (especially important for waterfalls). With inline processing using an initial ledger, we cannot reprocess subsequent events - they’ve already been persisted. This means:
- Reversals will correctly mark past entries as reversed and create reversal entries
- BUT subsequent entries (especially waterfalls) that depend on those reversed entries will have incorrect balances until the full Spark batch reprocesses them
- The full Spark process will be needed to correct any subsequent entries
Optional alternative: Consider storing the event stream (in DB or memory) for the partition, and when a reversal occurs, reprocess the entire event stream for affected accounts and swap out the impacted portion of the ledger. This is similar to how the process worked in the past (which swapped out the entire ledger to pick up alterations but didn’t have access to the event stream, requiring the full process to correct subsequent events).
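A rough sketch of that alternative, assuming hypothetical helpers (`load_partition_events` for the stored event stream, plus the `delete_partition_entries` / `bulk_insert_entries` helpers named in the swap-out discussion below) rather than existing APIs:

```python
async def reprocess_partition_after_reversal(dbs, LedgerModel, book_id, account_ids,
                                             book_rulesets, environment):
    """Sketch: replay the stored event stream for the affected accounts and
    swap out that slice of the ledger (helpers are hypothetical)."""
    # Reload the full event stream for the affected accounts
    events = await load_partition_events(dbs, book_id, account_ids, environment)
    # Recompute the partition from scratch with the existing pure-Python processors
    new_partition = list(processors.process_events(
        events,
        book_rulesets=book_rulesets,
        raises=False,
    ))
    # Atomically replace the impacted portion of the ledger
    async with dbs.begin():
        await delete_partition_entries(dbs, LedgerModel, book_id, account_ids)
        await bulk_insert_entries(dbs, LedgerModel, new_partition)
```

This trades the higher swap-out latency discussed later for subsequent entries that are already correct, instead of waiting for the full Spark run.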
ℹ️ COMMENT: Most rewrites (account moves, contact moves, currency rewrites) are not relevant for inline processing. The current ledger already contains entries with updated accounts/contacts/currencies (if rewrites occurred, they were applied when those entries were created). New events being processed will also reference the current/updated accounts/contacts/currencies. Spark needs preprocessing because it processes historical events that may reference old account IDs, but inline processing works with the current state, so the mismatch doesn’t occur.
This is OK because:
ℹ️ COMMENT: With partition-based loading (full history for all accounts involved in the event), we don't need the "too old" time-window check. All events can reference any historical entry for the accounts loaded into the partition. We should remove the `cutoff_past` check and keep only the future date validation.
Only exclude events that cannot be processed without Spark preprocessing:
def filter_processable(events, past_days=90, future_days=365):
"""
Generator that yields events suitable for inline processing.
Logs and skips events that cannot be processed without Spark preprocessing.
Events are excluded if they CANNOT be processed without Spark preprocessing:
- Outside time window (too old or too far in future)
- Reset events (affect all processing)
- Venmo events (require deduplication)
"""
now = datetime.utcnow()
cutoff_past = now - timedelta(days=past_days)
cutoff_future = now + timedelta(days=future_days)
for event in events:
effective_at = getattr(event.data, 'effective_at', None) or event.data.posted_at
# Too old - won't have context in time window
if effective_at < cutoff_past:
logger.info(f"Skipping event {event.id}: too_old ({effective_at})")
continue
# Too far in future - probably data error
if effective_at > cutoff_future:
logger.info(f"Skipping event {event.id}: too_far_future ({effective_at})")
continue
# Reset affects all processing
if event.data.event == 'reset':
logger.info(f"Skipping event {event.id}: reset_event")
continue
# Venmo needs deduplication
if is_venmo_event(event):
logger.info(f"Skipping event {event.id}: venmo_dedup_required")
continue
# Processable
yield event
Everything else is attempted inline.
Benefits of generator approach:
Ledger table uses flat columns for amounts:
-- Database schema:
amount DECIMAL(38,9)
currency VARCHAR
reporting_amount DECIMAL(38,9)
reporting_currency VARCHAR
But processors expect nested dicts:
{
'amount': {'value': Decimal('100'), 'currency': 'USD'},
'reporting_amount': {'value': Decimal('100'), 'currency': 'USD'}
}
Simple transformation layer handles this:
def ledger_obj_to_processor_dict(entry):
d = db.utils.obj2dict(entry)
d['amount'] = {
'currency': d.pop('currency', None),
'value': d.pop('amount', None),
}
# ... similar for reporting_amount
return d
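Writing entries back requires the inverse mapping before insert. A minimal sketch (the function name is illustrative, and the column names follow the schema above):

```python
def processor_dict_to_ledger_row(d):
    """Flatten nested amount dicts back into the ledger table's flat columns (sketch)."""
    row = dict(d)
    amount = row.pop('amount', None) or {}
    row['amount'] = amount.get('value')
    row['currency'] = amount.get('currency')
    reporting = row.pop('reporting_amount', None) or {}
    row['reporting_amount'] = reporting.get('value')
    row['reporting_currency'] = reporting.get('currency')
    return row
```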
Decision: Start with synchronous processing in the API endpoint.
Three options were considered:
Process inline within the API request and return results immediately:
async def create_events(...):
events = [build_event(...)]
# Push to Kinesis
r = await put_events(session, events, environment)
# Process inline (blocks response)
inline_result = await inline.process_events_inline(dbs, events, environment)
return {
'events': events,
'kinesis': r,
'inline_processed': inline_result.processed, # Boolean flag
'inline_entry_count': inline_result.entry_count, # How many entries created
}
Pros:
Cons:
Queue processing as background task:
@tasks.task
async def process_events_inline_task(events, environment):
async with db.Session() as dbs:
return await inline.process_events_inline(dbs, events, environment)
async def create_events(...):
events = [build_event(...)]
r = await put_events(session, events, environment)
# Queue inline processing
task = process_events_inline_task.delay(events, environment)
return {
'events': events,
'kinesis': r,
'task_id': task.id, # User polls for results
}
Pros:
Cons:
Process in background without task tracking:
async def create_events(...):
events = [build_event(...)]
r = await put_events(session, events, environment)
# Fire and forget
asyncio.create_task(inline.process_events_inline(dbs, events, environment))
return {'events': events, 'kinesis': r}
Pros:
Cons:
Rationale:
ℹ️ COMMENT: Updated latency estimate: With the largest account having ~600 entries, actual latency is ~30-120ms (much better than the original ~200-300ms estimate). This makes the synchronous approach compelling. Note: If using the partition swap-out alternative approach (discussed in Implementation section), latency would be ~120-470ms, which may impact the sync/async decision.
ℹ️ COMMENT: Updated latency breakdown based on actual data (largest account has ~600 entries):
- Load partition ledger: ~10-20ms (indexed query for single account)
- Process events: ~10-50ms (pure Python, in-memory)
- Insert entries: ~10-50ms (bulk insert, incremental approach)
- Total: ~30-120ms (acceptable for data creation API)
Note on partition swap-out alternative: If using the partition swap-out approach (discussed in Implementation section), latency would be ~100-400ms for the write operation, bringing total to ~120-470ms. This significantly impacts the sync/async decision and should be considered when evaluating whether swap-out is worth the additional correctness guarantees.
Implementation with safety:
async def create_events(session, book, environment, forms, ...):
events = [build_event(book, environment, form, ...) for form in forms]
# Push to Kinesis (critical path - must succeed)
r = await put_events(session, events, environment)
# Inline processing with timeout protection
inline_processed = False
inline_entry_count = 0
try:
# Timeout after 2 seconds to prevent long requests
async with asyncio.timeout(2.0):
from ledgers import inline
result = await inline.process_events_inline(
dbs, events, environment
)
inline_processed = result.processed
inline_entry_count = result.entry_count
except asyncio.TimeoutError:
logger.warning("Inline processing timed out, will be handled by batch")
except Exception:
logger.exception("Inline processing failed, will be handled by batch")
# Dispatch hooks
await dispatch.call('create_events', events=events, environment=environment)
return {
'events': events,
'kinesis': r,
'inline_processed': inline_processed,
'inline_entry_count': inline_entry_count,
}
Usage pattern for callers:
# Create events
result = await create_events(...)
# Check if inline processing succeeded
if result['inline_processed']:
# Some or all events processed inline (skipped events logged)
print(f"Created {result['inline_entry_count']} ledger entries")
# Ledger is immediately available for processable events
ledger = await get_ledger_entries(
book_id=book.id,
event_ids=[e.id for e in result['events']]
)
# Note: Some events may not have entries yet (logged as skipped)
# They'll appear after batch runs
else:
# All events skipped or processing failed (check logs)
# Ledger will be available after batch (10-15 min)
pass
Future Options:
If synchronous processing proves too slow:
ledgers/inline.py

@dataclasses.dataclass
class InlineResult:
"""Result of inline processing."""
processed: bool # Whether processing succeeded
entry_count: int # Number of ledger entries created
async def process_events_inline(dbs, events, environment, past_days=90, future_days=365):
"""
Process events inline against recent ledger history.
Best-effort processing - batch will fix any issues.
Returns:
InlineResult with processed flag and entry count
"""
if not events:
return InlineResult(processed=False, entry_count=0)
# Filter to processable events (logs skipped ones)
processable = list(filter_processable(events, past_days, future_days))
if not processable:
logger.info("No processable events after filtering")
return InlineResult(processed=False, entry_count=0)
try:
# All events should be from same book
book_id = processable[0].book.id
LedgerModel = Ledger.by(environment)
# Load rulesets
book_rulesets = await load_book_rulesets(dbs, [book_id])
# Load recent ledger (time-windowed)
cutoff_date = datetime.utcnow() - timedelta(days=past_days)
partitioned_ledger = await load_recent_ledger(
dbs, LedgerModel, book_id, cutoff_date
)
# ℹ️ COMMENT: Should load full history for all accounts involved in the event:
#
# # Extract all accounts from event and components
# account_ids = set()
# if hasattr(event.data, 'account') and event.data.account:
# account_ids.add(event.data.account)
# for component in getattr(event.data, 'components', None) or ():
# if hasattr(component, 'account') and component.account:
# account_ids.add(component.account)
#
# # Load full history for all accounts (not time-windowed)
# # Unlike Spark, we can load multiple accounts - processing logic filters internally
# partitioned_ledger = await load_partition_ledger(
# dbs, LedgerModel, book_id, account_ids=list(account_ids) if account_ids else None
# )
logger.info(f"Processing {len(processable)}/{len(events)} events inline")
# Process using existing logic
new_entries = list(processors.process_events(
processable,
book_rulesets=book_rulesets,
raises=False,
))
> **ℹ️ COMMENT - POTENTIAL CODE CHANGES NEEDED:**
>
> **Current issue:** The `process_events()` function (via `process_grouped_events()`) starts with an empty `partitioned_ledger = []` and doesn't accept an initial ledger parameter. To support inline processing with historical context, we need to:
>
> 1. **Modify `process_grouped_events()` to accept initial ledger:**
> ```python
> def process_grouped_events(events, initial_ledger=None, raises=False):
> partitioned_ledger = list(initial_ledger) if initial_ledger else []
> # ... rest of function
> ```
>
> 2. **Modify `process_events()` to pass through initial ledger:**
> ```python
> def process_events(events, book_rulesets=None, initial_ledger=None, raises=False):
> # ...
> for _, items in itertools.groupby(events, key=bp):
> iterable = process_grouped_events(items, initial_ledger=initial_ledger, raises=raises)
> yield from (json_stringify(item) for item in iterable)
> ```
>
> 3. **Update inline processing to pass loaded ledger:**
> ```python
> # partitioned_ledger was already loaded above (line 493)
> # Extract account IDs if using partition-based loading instead of time-windowed
> new_entries = list(processors.process_events(
> processable,
> book_rulesets=book_rulesets,
> initial_ledger=partitioned_ledger, # Pass existing entries to processor
> raises=False,
> ))
> ```
>
> This allows the processor to have access to historical entries for calculations and reversals, while still building up the full partition state correctly.
# Insert new entries (incremental approach)
if new_entries:
await bulk_insert_entries(dbs, LedgerModel, new_entries)
> **ℹ️ COMMENT - IMPORTANT DISCUSSION POINT: Partition Swap-Out Alternative:**
>
> **Current approach (incremental insert):** Only new entries are inserted. This is simpler and faster, but has limitations:
> - Reversals mutate existing entries in-place (`reversal=True`), but those mutations are not persisted
> - Subsequent entries that depend on reversed entries may have incorrect balances until batch runs
> - Latency: ~10-50ms (simple bulk insert)
>
> **Alternative approach (partition swap-out):** Swap out the entire partition by deleting all entries for the accounts involved and inserting the full partition ledger (initial + new entries). This would:
> - Persist reversal mutations to existing entries
> - Ensure complete correct state for the partition (entries processed so far)
> - **Note:** Swap-out alone does NOT correct subsequent entries that depend on reversed entries. Only the full Spark process (which reorders reversals and reprocesses all subsequent events) can correct them, OR if we process the event stream for the partition (instead of just the historical ledger) and then swap out the partition.
> - Latency: ~100-400ms (delete + insert, depends on partition size)
>
> **When swap-out might be beneficial:**
> - When reversals are common and subsequent entry correctness is critical
> - When partition sizes are small enough that swap-out latency is acceptable
> - When batch correction latency (10-15 min) is too long for business requirements
>
> **When incremental insert is better:**
> - When reversals are rare
> - When latency must be minimized (API response time critical)
> - When batch correction latency is acceptable
> - When partition sizes are large (swap-out becomes prohibitively slow)
>
> **Implementation of swap-out approach:**
> ```python
> # Swap out entire partition (delete + insert)
> if new_entries:
> account_ids = extract_account_ids_from_events(processable) # Accounts involved
> async with dbs.begin(): # Transaction for atomicity
> # Delete existing entries for partition
> await delete_partition_entries(dbs, LedgerModel, book_id, account_ids)
> # Insert full partition ledger (initial + new entries)
> full_partition = partitioned_ledger + new_entries # Combined state
> await bulk_insert_entries(dbs, LedgerModel, full_partition)
> ```
>
> **Considerations:**
> - Transaction safety: Delete and insert must be atomic (use transaction)
> - Conflict detection: Need optimistic locking/versioning to detect conflicts with batch updates
> - Sync vs Async: Higher latency may make async processing more attractive
> - Selective swap-out: Could swap only accounts that had reversals, use incremental for others
return InlineResult(processed=True, entry_count=len(new_entries))
except Exception as e:
logger.exception("Inline processing failed")
return InlineResult(processed=False, entry_count=0)
events/crud.py

async def create_events(session, book, environment, forms, ...):
events = [build_event(book, environment, form, ...) for form in forms]
# Push to Kinesis (critical path - must succeed)
r = await put_events(session, events, environment)
# Inline processing with timeout protection
inline_processed = False
inline_entry_count = 0
try:
# Timeout after 2 seconds to prevent long requests
async with asyncio.timeout(2.0):
from ledgers import inline
result = await inline.process_events_inline(dbs, events, environment)
inline_processed = result.processed
inline_entry_count = result.entry_count
except asyncio.TimeoutError:
logger.warning("Inline processing timed out, will be handled by batch")
except Exception:
logger.exception("Inline processing failed, batch will recover")
# Dispatch hooks
await dispatch.call('create_events', events=events, environment=environment)
return {
'events': events,
'kinesis': r,
'inline_processed': inline_processed,
'inline_entry_count': inline_entry_count,
}
Load Recent Ledger:
async def load_recent_ledger(dbs, LedgerModel, book_id, cutoff_date):
qry = db.select(LedgerModel) \
.where(LedgerModel.book_id == book_id) \
.where(LedgerModel.effective_at >= cutoff_date) \
.order_by(LedgerModel.effective_at, LedgerModel.created_at)
result = await dbs.execute(qry)
entries = result.scalars().all()
return [ledger_obj_to_processor_dict(entry) for entry in entries]
ℹ️ COMMENT: Should load full history for all accounts involved in the event (not just book_id):
Load Partition Ledger:
async def load_partition_ledger(dbs, LedgerModel, book_id, account_ids):
    """
    Load full history for all accounts involved in the event.

    Unlike Spark (which currently has no partitioning at all), inline
    processing can load multiple accounts into the partition. The processing
    logic filters by account internally (ledger_window, build_calcs), so this
    is safe and correct.
    """
    qry = db.select(LedgerModel) \
        .where(LedgerModel.book_id == book_id)
    # Filter to accounts involved in this event
    if account_ids:
        qry = qry.where(LedgerModel.account.in_(account_ids))
    qry = qry.order_by(LedgerModel.effective_at, LedgerModel.created_at)
    result = await dbs.execute(qry)
    entries = result.scalars().all()
    return [ledger_obj_to_processor_dict(entry) for entry in entries]
Load Book Rulesets:
async def load_book_rulesets(dbs, book_ids):
qry = db.select(
LedgerBookRuleset.book_id,
LedgerRule.name,
LedgerRule.logic,
).join(
LedgerRuleset,
LedgerRuleset.id == LedgerBookRuleset.ruleset_id
).join(
LedgerRule,
LedgerRule.id == db.sa.any_(LedgerRuleset.rule_ids)
).where(
LedgerBookRuleset.book_id.in_(book_ids)
)
result = await dbs.execute(qry)
book_rulesets = {}
for book_id, rule_name, rule_logic in result:
if book_id not in book_rulesets:
book_rulesets[book_id] = {}
book_rulesets[book_id][rule_name] = rule_logic
return book_rulesets
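Bulk Insert Entries (sketch):

`bulk_insert_entries` is called by the inline module but not defined elsewhere in this document. A minimal sketch, assuming the model's `insert_one` helper used in the tests and the hypothetical `processor_dict_to_ledger_row` transform sketched earlier; a real implementation would use a single multi-row insert:

```python
async def bulk_insert_entries(dbs, LedgerModel, entries):
    """Persist processor output (sketch); insert_one is used only because it
    already appears elsewhere in this document."""
    for entry in entries:
        row = processor_dict_to_ledger_row(entry)  # flatten nested amounts
        await LedgerModel.insert_one(dbs, **row)
```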
Event Filter:
def filter_processable(events, past_days=90, future_days=365):
"""
Generator that yields events suitable for inline processing.
Logs and skips events that cannot be processed without Spark preprocessing.
Args:
past_days: How far back to load history (default 90)
future_days: Maximum future date to process (default 365)
"""
now = datetime.utcnow()
cutoff_past = now - timedelta(days=past_days)
cutoff_future = now + timedelta(days=future_days)
for event in events:
effective_at = getattr(event.data, 'effective_at', None) or event.data.posted_at
# Too old - won't have proper context in time window
if effective_at < cutoff_past:
logger.info(f"Skipping event {event.id}: too_old ({effective_at})")
continue
# Too far in future - probably data error
if effective_at > cutoff_future:
logger.info(f"Skipping event {event.id}: too_far_future ({effective_at})")
continue
# Reset affects all processing
if event.data.event == 'reset':
logger.info(f"Skipping event {event.id}: reset_event")
continue
# Venmo needs deduplication
if is_venmo_event(event):
logger.info(f"Skipping event {event.id}: venmo_dedup_required")
continue
# Processable
yield event
ℹ️ COMMENT: With partition-based loading, remove the `past_days` parameter and the "too old" check, since we load full partition history:

def filter_processable(events, future_days=365):
    """
    Generator that yields events suitable for inline processing.
    Logs and skips events that cannot be processed without Spark preprocessing.
    NOTE: With partition-based loading, we no longer need the past_days
    parameter since we load full partition history.
    """
    now = datetime.utcnow()
    cutoff_future = now + timedelta(days=future_days)
    for event in events:
        effective_at = getattr(event.data, 'effective_at', None) or event.data.posted_at
        # Too far in future - probably data error
        if effective_at > cutoff_future:
            logger.info(f"Skipping event {event.id}: too_far_future ({effective_at})")
            continue
        # NOTE: No longer checking for "too old" - partition loading includes full history
        # Reset affects all processing
        if event.data.event == 'reset':
            logger.info(f"Skipping event {event.id}: reset_event")
            continue
        # Venmo needs deduplication
        if is_venmo_event(event):
            logger.info(f"Skipping event {event.id}: venmo_dedup_required")
            continue
        # Processable
        yield event
Add composite index to support time-windowed ledger queries in ledgers/models_spark.py:
class Ledger(SchemaMixin, db.mixins.Id, db.mixins.CreatedAt, db.mixins.CRUD):
__tablename__ = consts.LEDGER_TABLE
# ... existing column definitions ...
@classmethod
def build_table_args(cls, env):
"""
Add composite index for inline processing time-windowed queries.
Index name includes environment to avoid conflicts across schemas.
"""
return (
db.Index(
f'idx_{env}_ledger_book_effective_created',
'book_id', 'effective_at', 'created_at',
),
)
This index supports the inline processing query pattern:
WHERE book_id = ? AND effective_at >= ?
ORDER BY effective_at, created_at
The index is critical for:
ℹ️ COMMENT: Should use partition-based indexes instead:
Index for Partition-Based Queries
Add composite index to support partition-based ledger queries:
@classmethod
def build_table_args(cls, env):
    return (
        # Index for loading by book + accounts (IN clause)
        db.Index(
            f'idx_{env}_ledger_book_account_effective',
            'book_id', 'account', 'effective_at', 'created_at',
        ),
        # Index for loading by book only (when no accounts specified)
        db.Index(
            f'idx_{env}_ledger_book_effective_created',
            'book_id', 'effective_at', 'created_at',
        ),
    )

Query patterns:
- WHERE book_id = ? AND account IN (?, ?, ...) ORDER BY effective_at, created_at
- WHERE book_id = ? ORDER BY effective_at, created_at (when no account filter)
@pytest.mark.asyncio
async def test_inline_matches_batch_on_fresh_book(dbs, sandbox_book, sample_event):
    """Verify inline produces the same results as batch on an empty ledger."""
    result = await inline.process_events_inline(dbs, [sample_event], 'sandbox')
    assert result.processed
    book_rulesets = await inline.load_book_rulesets(dbs, [sandbox_book.id])
    batch_entries = list(processors.process_events(
        [sample_event],
        book_rulesets=book_rulesets,
        raises=True,
    ))
    assert result.entry_count == len(batch_entries)
    # Compare the persisted inline entries against the batch output
    LedgerModel = Ledger.by('sandbox')
    inline_entries = await inline.load_partition_ledger(
        dbs, LedgerModel, sandbox_book.id, account_ids=None
    )
    assert len(inline_entries) == len(batch_entries)
    for inline_e, batch_e in zip(inline_entries, batch_entries):
        assert inline_e['amount'] == batch_e['amount']
@pytest.mark.asyncio
async def test_payment_waterfall_with_history(dbs, sandbox_book):
    """Verify a payment waterfalls against a historical invoice."""
    # Create invoice
    invoice_event = make_invoice_event(amount=100)
    await inline.process_events_inline(dbs, [invoice_event], 'sandbox')
    # Create payment
    payment_event = make_payment_event(amount=50)
    result = await inline.process_events_inline(dbs, [payment_event], 'sandbox')
    assert result.processed
    # Verify allocation against the persisted entries
    LedgerModel = Ledger.by('sandbox')
    entries = await inline.load_partition_ledger(
        dbs, LedgerModel, sandbox_book.id, account_ids=None
    )
    assert any(e.get('invoice') == invoice_event.id for e in entries)
@pytest.mark.asyncio
async def test_time_window_excludes_old_entries(dbs, sandbox_book):
"""Verify time windowing correctly limits loaded entries."""
LedgerModel = Ledger.by('sandbox')
# Create old entry (100 days ago)
old_date = datetime.utcnow() - timedelta(days=100)
await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, effective_at=old_date, ...)
# Create recent entry (30 days ago)
recent_date = datetime.utcnow() - timedelta(days=30)
await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, effective_at=recent_date, ...)
# Load with 90 day window
cutoff = datetime.utcnow() - timedelta(days=90)
entries = await inline.load_recent_ledger(dbs, LedgerModel, sandbox_book.id, cutoff)
assert len(entries) == 1 # Only recent entry
ℹ️ COMMENT: Should test partition loading with multiple accounts:
@pytest.mark.asyncio
async def test_partition_loading_includes_all_accounts(dbs, sandbox_book):
    """Verify partition loading includes full history for all accounts involved in event."""
    LedgerModel = Ledger.by('sandbox')
    # Create entries for account A (old and recent)
    old_date = datetime.utcnow() - timedelta(days=100)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_a', effective_at=old_date, ...)
    recent_date = datetime.utcnow() - timedelta(days=30)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_a', effective_at=recent_date, ...)
    # Create entries for account B (recent only)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_b', effective_at=recent_date, ...)
    # Create entry for account C (should not be included)
    await LedgerModel.insert_one(dbs, book_id=sandbox_book.id, account='account_c', effective_at=recent_date, ...)
    # Load partition for accounts A and B (simulating event with components for both accounts)
    entries = await inline.load_partition_ledger(
        dbs, LedgerModel, sandbox_book.id, account_ids=['account_a', 'account_b']
    )
    assert len(entries) == 3  # Both old and recent entries for account_a, recent for account_b
    assert all(e['book_id'] == sandbox_book.id for e in entries)
    assert all(e['account'] in ['account_a', 'account_b'] for e in entries)
    assert 'account_c' not in [e['account'] for e in entries]
Test with existing fixtures to ensure inline processing produces reasonable results for realistic scenarios.
Monitor logs for discrepancies between inline and batch results:
# In Spark processing
if inline_entry_exists and differs_from_batch:
logger.warning(f"Inline/batch discrepancy detected: {details}")
With 90-day window and typical volumes:
All manageable. The sort/group operations are O(n log n), which is fine for these sizes.
ℹ️ COMMENT: With partition-based loading, memory usage is for all accounts involved in the event:
With partition-based loading (full history for involved accounts) and typical volumes:
- Single account (vast majority): 1 account × entries (largest account currently has ~600 entries) → ~1-10MB in memory
- Multi-account events (LLM-generated, rare): 2-5 accounts × entries each → typically 2-50MB total
In practice, virtually all events involve only 1 account (only LLM generates multi-account group events), so memory usage is very small. Even when LLM generates multi-account events (typically 2-5 accounts), total memory is still very manageable.
With proper indexing, the time-windowed query should be fast:
Expected: <100ms for typical queries.
ℹ️ COMMENT: With partition-based queries:
With proper indexing, the partition-based query should be fast:
- Book filter: Reduces to single book’s data
- Account filter (IN clause): Composite index supports efficient filtering
- Ordered results: Composite index supports ORDER BY
Expected: <50ms for typical account sizes. In practice, virtually all events involve only 1 account (only LLM generates multi-account events):
- Single account (vast majority of cases, largest account has ~600 entries): <10-20ms
- Multi-account events (LLM-generated, 2-5 accounts): 20-50ms
- Very large multi-account events (rare): 50-100ms (still acceptable for inline processing)
1. Cache book rulesets:
from cachetools import TTLCache
_ruleset_cache = TTLCache(maxsize=100, ttl=300) # 5 min TTL
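A minimal sketch of wiring that cache into the existing load_book_rulesets helper (the wrapper name is illustrative):

```python
async def cached_book_rulesets(dbs, book_id):
    """Serve rulesets from the TTL cache when possible (sketch)."""
    cached = _ruleset_cache.get(book_id)
    if cached is not None:
        return {book_id: cached}
    book_rulesets = await load_book_rulesets(dbs, [book_id])
    if book_id in book_rulesets:
        _ruleset_cache[book_id] = book_rulesets[book_id]
    return book_rulesets
```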
2. Skip loading for simple events:
if event.data.event == 'invoice' and not needs_waterfall(rule):
# Just process without loading history
entries = process_simple_event(event, rule)
- ledgers/inline.py module
- events/crud.py

ℹ️ COMMENT:
- Query performance: Time to load partition ledger (should be < 50ms - virtually all events are single account with largest having ~600 entries, rare LLM-generated multi-account events are 2-5 accounts)
- Partition size distribution: Track entries per account to identify outliers (very rare - only LLM can generate multi-account events)
Scenario: Reversal event arrives for transaction >90 days old (not in loaded ledger window).
Inline behavior: The existing `reverse_entries()` function searches the loaded ledger for matching entries to reverse. If the target transaction is not in the loaded window (e.g., >90 days old), the search finds no matches and produces 0 entries. This is a graceful no-op - no errors, no special handling needed.
Batch behavior: Processes correctly with full history, finds target, generates reversal entries.
Impact: Reversal not immediately visible, appears in next batch run (10-15 min).
Acceptable: Yes, these are rare for newly-created events. The reversal processing logic naturally handles this case.
Note: No special classifier logic or future optimization needed. Reversals “just work” - they process if the target is found, no-op if not. Batch always corrects.
ℹ️ COMMENT: With partition-based loading (full history for all accounts involved in the event), reversals can find target entries correctly as long as they’re within the loaded account set. The concern about “reversals >90 days old” no longer applies since we load the full history for involved accounts. However, there is a critical limitation: reversals will correctly mark past entries and create reversal entries, but subsequent entries (especially waterfalls) that depend on those reversed entries will have incorrect balances until the full Spark batch reprocesses them. This is because Spark reorders reversals to come immediately after their target event and reprocesses all subsequent events, which inline processing cannot do when using an initial ledger. See the Best-Effort Processing section for more details.
Scenario: Event references account A, which is merged to account B milliseconds later.
Inline behavior: Uses account A (original reference).
Batch behavior: Rewrites to account B (after preprocessing).
Impact: Temporary incorrect account assignment.
Acceptable: Yes, batch fixes quickly, extremely rare timing.
Scenario: Two payments arrive simultaneously for same customer.
Inline behavior: Both load same historical ledger, process independently, both insert.
Batch behavior: Processes in correct order, produces authoritative result.
Impact: Possible temporary incorrect waterfall allocation.
Acceptable: Yes, batch reconciles. Could add advisory locks if problematic.
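If that proves necessary, a minimal sketch using a transaction-scoped PostgreSQL advisory lock; the helper, the key derivation, and the direct use of SQLAlchemy's text() are illustrative assumptions:

```python
import zlib

from sqlalchemy import text

async def lock_partition(dbs, book_id, account_id):
    """Sketch: serialize concurrent inline runs for the same (book, account)
    partition using a transaction-scoped Postgres advisory lock."""
    key = zlib.crc32(f"{book_id}:{account_id}".encode())  # fold to a 32-bit int key
    await dbs.execute(text("SELECT pg_advisory_xact_lock(:key)"), {"key": key})
```

The lock releases automatically when the surrounding transaction commits or rolls back, so no explicit unlock is needed.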
Scenario: Duplicate venmo transaction arrives.
Inline behavior: Skipped (not processed inline).
Batch behavior: Deduplication logic removes duplicate.
Impact: Duplicate not immediately visible, removed in batch.
Acceptable: Yes, correct by design.
Scenario: Inline processing succeeds but DB insert fails.
Inline behavior: Exception caught and logged; process_events_inline returns InlineResult(processed=False, entry_count=0).
Batch behavior: Processes normally from Kinesis.
Impact: No inline entries, user sees nothing until batch runs.
Acceptable: Yes, event creation still succeeds, batch provides entries.
Scenario: Event has effective_at in the future (e.g., scheduled invoice for next month).
Inline behavior: Processes normally if within 365 days. Can waterfall against existing historical entries (e.g., future payment against current invoices).
Batch behavior: Same - processes normally.
Impact: Future-dated ledger entries immediately queryable. Good for:
Acceptable: Yes, this is desirable behavior.
Edge case: Events > 365 days in the future are skipped (likely data errors). Can be adjusted via the future_days parameter.
Note: The time window query effective_at >= cutoff_date has no upper bound, so it loads all already-persisted entries from the cutoff onward (including future-dated ones). This is fine - future events only need historical context for waterfall/balance calculations, not other future events.
ℹ️ COMMENT: With partition-based loading:
Note: The partition query loads all entries for the involved accounts (by `book_id` and `account IN (...)`), ordered by `effective_at`. This includes both past and future entries for those accounts. This is correct - events need the full context for all accounts involved (event + components) for waterfall/balance calculations, regardless of when entries were created.
# Success rate
metrics.increment('ledger.inline.processed', len(new_entries))
metrics.increment('ledger.inline.skipped')
# Latency
with metrics.timer('ledger.inline.duration'):
await process_events_inline(...)
# Ledger size
metrics.gauge('ledger.inline.entries_loaded', len(partitioned_ledger))
Adjust window based on ledger size and event characteristics.
ℹ️ COMMENT: With partition-based loading:
1. Smart Partition Loading
Optimize partition loading based on partition size (e.g., cache small partitions, lazy-load large ones).
Use Redis to cache recent ledger entries and rulesets.
If synchronous processing proves too slow (p95 > 500ms or timeout rate > 5%):
Only load ledger entries relevant to event type (e.g., only receivables for payments).
Generate inline snapshots (balances, aging) alongside ledger entries.
Approach: Store events in PostgreSQL, always process against full ledger history.
Pros: Perfect consistency, handles all edge cases.
Cons:
Rejected because: Time windowing is simpler and handles 99% of cases.
ℹ️ COMMENT:
Rejected because: Partition-based loading matches Spark processing exactly and handles all cases correctly. Time windowing would break partition isolation.
Approach: Create materialized view of recent ledger, refresh periodically.
Pros: Query performance.
Cons:
Rejected because: Doesn’t solve the core problem.
Approach: Use Spark Streaming to process events as they arrive.
Pros: Consistent with batch processing.
Cons:
Rejected because: Overkill for the problem, prefer simpler Python approach.
ℹ️ COMMENT:
- Partition size optimization: Should we cache small partitions? When is a partition too large for inline processing?
- Concurrent processing: Do we need advisory locks for same-partition events?
- Cache layer: Would Redis caching provide meaningful improvement?
- Snapshot generation: Should inline processing also update snapshots?
See implementation in ledgers/inline.py for complete, production-ready code.