feat: competing consumers — leader election for projections and automations #26

Closed
opened 2026-02-19 22:24:41 +00:00 by ash · 3 comments
Owner

Problem

Running eskit on N servers means N instances of every projection and automation. This causes:

  1. Projections: N writes to the same Postgres/SQLite read model. Wasted work at best, data corruption at worst (concurrent UPSERTs racing).
  2. Automations: N executions of every side effect. N emails sent, N webhooks fired, N payments initiated.

Currently EventSubscription runs independently per instance with no coordination. There is no competing consumer pattern.

Building Blocks We Already Have

  • LockRegistry interface — Acquire(ctx, id) (release, error) / TryAcquire(id) (release, bool)
  • PgLockRegistry — Postgres advisory locks, session-scoped, auto-released on disconnect/crash
  • MemoryLockRegistry — single-process (dev/test)
  • SingleWriterMiddleware — uses LockRegistry for command serialization

Design

Core: Add LockRegistry to subscription.Config

```go
type Config[E any] struct {
    // ... existing fields ...

    // LeaderLock, when set, ensures only one instance processes events
    // for this subscription across all nodes. The subscription acquires
    // the lock before processing and holds it for its lifetime.
    // On failover, the new leader resumes from the checkpoint.
    LeaderLock LockRegistry
}
```

Subscription Behavior With LeaderLock

  1. Start() → attempt LeaderLock.Acquire(ctx, "sub:"+Name)
  2. If acquired → process events normally, checkpoint as usual
  3. If blocked → wait (another node is leader). When lock acquired (failover), resume from checkpoint.
  4. If context cancelled → release lock, stop
  5. If node crashes → Postgres detects dead connection, releases advisory lock. Another node acquires within pool health check interval.

Automation Behavior

Same pattern. Automation.Config gets LeaderLock field. Only the leader node processes the TodoList.

Two Levels of Protection

For automations with side effects, we want belt AND suspenders:

  1. Leader lock on the automation — only one node runs the automation loop (prevents duplicate triggers)
  2. Per-item idempotency lock — before executing each side effect, acquire advisory lock on "exec:"+item.Key. Even if leader election fails briefly (split-brain during Postgres failover), a specific side effect runs at most once.
```go
// In automation Execute func:
func sendPurchaseEmail(ctx context.Context, item TodoItem[string]) error {
    // Non-blocking: if another node already holds the per-item lock, skip.
    release, ok := locks.TryAcquire("exec:purchase-email:" + item.Key)
    if !ok {
        return nil // another node got it
    }
    defer release()

    // Idempotency: check if already executed
    if alreadySent(ctx, item.Key) {
        return nil
    }

    // Execute side effect
    if err := sendEmail(...); err != nil {
        return err // will retry
    }

    // Record execution (in same PG transaction as checkpoint if possible)
    markSent(ctx, item.Key)
    return nil
}
```

Failover Scenarios

| Scenario | Expected Behavior | Recovery Time |
|----------|-------------------|---------------|
| Leader node crashes | PG detects dead conn, releases lock. Standby acquires. | PG idle_in_transaction_session_timeout or TCP keepalive (~30s default) |
| Leader node graceful shutdown | Lock released on Stop(). Standby acquires. | < 1s |
| Network partition (leader can still reach PG) | Leader keeps lock, continues processing. | No downtime |
| Network partition (leader loses PG) | PG releases lock after timeout. Standby acquires. | ~30s |
| PG primary failover | All locks released. All nodes reconnect to new primary. One wins. | Depends on PG failover (typically 10-30s) |
| Split brain (two nodes both think they are leader) | Per-item idempotency lock prevents duplicate execution. | Self-healing |

What To Benchmark

  1. Advisory lock acquire/release latency — expect sub-ms, prove it
  2. Failover time — kill leader process, measure time until standby acquires
  3. Lock overhead per subscription — N subscriptions = N held advisory locks = N dedicated PG connections. Measure pool pressure.
  4. Throughput with vs without LeaderLock — should be negligible since lock is held for subscription lifetime, not per-event
  5. Hash collision rate — advisoryLockID uses FNV-1a to int64. For M subscription names, estimate the collision probability; it should be astronomically low, but verify.

Test Plan

  1. Unit: leader acquires, processes events — basic happy path
  2. Unit: standby blocks, resumes on leader release — failover simulation
  3. Unit: context cancellation releases lock — clean shutdown
  4. Integration: two processes, one PG — start two subscriptions with same name, verify only one processes
  5. Integration: kill leader, verify standby takes over — actual failover
  6. Integration: both attempt side effect — verify per-item lock prevents duplicate
  7. Stress: rapid leader cycling — repeatedly kill/restart, verify no missed or duplicated events
  8. Stress: checkpoint consistency after failover — new leader resumes from correct position, no reprocessing beyond checkpoint

Implementation Phases

  • Phase 1: Add LeaderLock to subscription.Config, acquire on Start, release on Stop
  • Phase 2: Add LeaderLock to automation.Config, same pattern
  • Phase 3: Per-item idempotency lock helper for automation Execute funcs
  • Phase 4: Benchmark suite — all measurements above
  • Phase 5: Failover integration tests with real Postgres
  • Phase 6: Documentation — clustering guide, deployment patterns

Connection Pool Implications

Each held advisory lock = 1 dedicated PG connection. With 20 projections + 5 automations = 25 connections just for locks. This is fine for most deployments but:

  • Document the formula: lock_connections = num_projections + num_automations
  • Add to PG pool sizing guidance
  • Consider pg_try_advisory_lock (non-blocking) with retry loop to avoid holding connections while waiting

NOT In Scope

  • Partition-based competing consumers (Kafka-style) — overkill for our use case
  • NATS-based leader election — we are removing NATS
  • Custom consensus (Raft etc) — Postgres IS our consensus layer

References

  • Existing: PgLockRegistry in pgstore/cluster.go
  • Existing: LockRegistry interface in singlewriter.go
  • Existing: SingleWriterMiddleware pattern
  • Related: eskit #26 (original clustering issue)
Author
Owner

Deferred to post-v1: design this when we actually need to scale beyond one instance.

Author
Owner

Re-prioritized to CRITICAL. YoYoPass requires clustering for production scalability. Not optional.

Author
Owner

Why NOT Now — Decision Record (2026-03-01)

Discussion with Axon engineers (Marc Klefter) confirmed: consistent hashing for command routing took Axon 15+ years to get right. The complexity is in rebalancing, failover, partition assignment, and split-brain handling — not the concept itself.

Current Production Baseline (stress tested 2026-03-01)

  • Single Postgres: ~5,000 appends/sec sustained (100K events, 100 goroutines)
  • Same-stream contention: advisory locks serialize cleanly, zero lost writes
  • Batch append: ~22,000 events/sec (single 10K batch)
  • Subscription throughput: ~5,000 events/sec with gap detection, zero gaps
  • Connection pool: 50 concurrent ops on 5 connections, zero deadlocks

Why Advisory Locks + Auto-Retry Is Sufficient

  1. pg_advisory_xact_lock(hashtext(streamID)) serializes same-stream writes — no clashing, no wasted retries (unlike the Ruby approach of optimistic retry loops)
  2. Different streams run fully parallel — no artificial bottleneck
  3. CommandHandler auto-retry (#156) handles rare edge cases (hash collisions, transient failures)
  4. No routing infrastructure needed — Postgres IS the coordinator

When To Build This

Trigger conditions (monitored via #146 lag monitoring):

  • Subscription lag consistently >1000 events behind
  • Append latency p99 >50ms under normal load
  • Need for >99.9% uptime (HA requirement)
  • Multi-region deployment requirement

Consistent Hashing Plan (when needed)

NATS JetStream provides consistent hashing out of the box:

  • Route commands by hashtext(streamID) % partitionCount to specific consumers
  • JetStream durable pull consumers with queue groups handle rebalancing
  • No custom rebalancing code needed — NATS manages consumer group membership
  • Partition count is fixed at deployment, not dynamic (simpler, sufficient)

This avoids building the 15 years of Axon infrastructure — we leverage NATS for the hard parts.

References

  • Stress test proof: commit c5d75a7 (pgstore/stress_test.go)
  • Advisory lock hash collision tracking: #160
  • Lag monitoring: #146
  • CommandHandler auto-retry: #156
ash changed title from Cluster scaling: partitioned commands, competing projections, rebalancing to feat: competing consumers — leader election for projections and automations 2026-03-02 21:00:30 +00:00
ash closed this issue 2026-03-02 22:16:03 +00:00