Command persistence middleware for production debugging #114

Closed
opened 2026-02-23 09:30:17 +00:00 by ash · 0 comments
Owner

Summary

Add a middleware that persists every command sent through the CommandBus, capturing the full lifecycle: intent → decision → resulting events.

Nobody in Go ES does this. Axon (Java) has it and it's their strongest enterprise selling point. We'd bring enterprise-grade observability to Go event sourcing.

Philosophy: Bricks, Not Cathedrals

eskit is a toolkit of composable bricks. Users pick the pieces they need and assemble them. Command persistence follows the same principle:

  • Interface first — CommandLogStore is the contract. We provide implementations, but users can bring their own (Redis, DynamoDB, flat files, whatever).
  • Each backend is an independent module — import only what you use. SQLite users don't pull in Postgres deps.
  • The middleware is decoupled from storage — WithCommandAudit(bus, store) works with ANY CommandLogStore implementation.
  • Zero lock-in — swap backends without changing application code.

What gets stored

type CommandRecord struct {
    ID            string
    CorrelationID string
    CommandType   string
    Payload       []byte        // serialized command
    Metadata      Metadata      // user, IP, timestamp, etc.
    Status        string        // accepted, rejected, failed
    Error         string        // if rejected/failed
    EventIDs      []string      // resulting event IDs
    Duration      time.Duration
    Timestamp     time.Time
}

API

// Pick your brick:
auditStore := sqlitelog.New(db)       // or pglog.New(pool) or natslog.New(js) or your own
auditBus := eskit.WithCommandAudit(bus, auditStore)

// Or bring your own:
type CommandLogStore interface {
    Record(ctx context.Context, record CommandRecord) error
    Get(ctx context.Context, id string) (CommandRecord, error)
    FindByCorrelation(ctx context.Context, correlationID string) ([]CommandRecord, error)
    FindByType(ctx context.Context, commandType string, opts QueryOpts) ([]CommandRecord, error)
    List(ctx context.Context, opts QueryOpts) ([]CommandRecord, error)
}

// Users implement this interface for custom backends — Redis, Mongo, S3, anything.

Provided Backends (Bricks)

  • Memory — commandlog/memlog/ — for testing and dev
  • SQLite — commandlog/sqlitelog/ — command_log table, indexed on correlation_id, command_type, timestamp
  • Postgres — commandlog/pglog/ — command_log table, same indexes, JSONB for payload/metadata
  • NATS — commandlog/natslog/ — dedicated JetStream stream, KV for lookups

Each backend is its own Go sub-module (same pattern as event stores). Users import only what they need.

Custom Backends

Users who want a backend we don't provide just implement CommandLogStore. The conformance test suite (commandlogtest/) is exported so they can verify their implementation passes all requirements.

// In user's code:
func TestMyRedisLog(t *testing.T) {
    store := myredislog.New(redisClient)
    commandlogtest.RunSuite(t, store)  // proves it works
}

Testing Requirements

  • Full unit tests for middleware (success, rejection, failure, timeout paths)
  • Conformance test suite (commandlogtest/) — shared tests ALL backends must pass, exported for custom implementations
  • Integration tests with each provided backend (Memory, SQLite, Postgres, NATS)
  • Correlation ID chain tests (command → events linkage)
  • Concurrent command logging (race-free under -race)
  • Edge cases: nil metadata, empty payload, very large payloads
  • cmp.Diff assertions, Given/When/Then style
  • Query tests: by correlation, by type, by time range, pagination
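
The correlation-chain test could take this Given/When/Then shape. The real suite would assert with cmp.Diff from github.com/google/go-cmp; reflect.DeepEqual stands in here so the sketch stays stdlib-only, and the store is reduced to a slice:

```go
package main

import (
	"fmt"
	"reflect"
)

// Trimmed record type for the sketch.
type CommandRecord struct {
	ID            string
	CorrelationID string
	EventIDs      []string
}

// findByCorrelation stands in for the store query under test.
func findByCorrelation(records []CommandRecord, corrID string) []CommandRecord {
	var out []CommandRecord
	for _, r := range records {
		if r.CorrelationID == corrID {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	// Given: two commands recorded under the same correlation ID
	records := []CommandRecord{
		{ID: "c1", CorrelationID: "corr-1", EventIDs: []string{"e1"}},
		{ID: "c2", CorrelationID: "corr-1", EventIDs: []string{"e2", "e3"}},
		{ID: "c3", CorrelationID: "corr-2"},
	}

	// When: querying the chain
	got := findByCorrelation(records, "corr-1")

	// Then: both commands and their linked events come back, in order
	want := []CommandRecord{records[0], records[1]}
	if !reflect.DeepEqual(got, want) {
		panic("correlation chain mismatch")
	}
	fmt.Println("chain length:", len(got)) // chain length: 2
}
```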

Benchmark Requirements

  • Benchmark middleware overhead vs bare CommandBus (must be < 5% for in-memory)
  • Benchmark write throughput per backend (Memory, SQLite, Postgres, NATS)
  • Benchmark query by correlation ID per backend
  • Benchmark with varying payload sizes (100B, 1KB, 10KB, 100KB)
  • Compare JSON vs CBOR serialization for command records
  • Memory allocation profiling (zero-alloc hot path where possible)
  • Compare all backends head-to-head (same workload, same hardware)
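
The overhead benchmark could be sketched with the standard library's testing.Benchmark, which runs outside go test. Both dispatch functions here are stand-ins (a tiny work loop and a counter in place of store.Record), not eskit API:

```go
package main

import (
	"fmt"
	"testing"
)

var sink int

// bareDispatch stands in for a handler on the unwrapped CommandBus.
func bareDispatch() {
	for i := 0; i < 100; i++ {
		sink += i
	}
}

var auditWrites int

// auditedDispatch is the same work plus a stand-in for store.Record.
func auditedDispatch() {
	bareDispatch()
	auditWrites++
}

func main() {
	bare := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			bareDispatch()
		}
	})
	audited := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			auditedDispatch()
		}
	})
	overhead := float64(audited.NsPerOp()-bare.NsPerOp()) /
		float64(bare.NsPerOp()) * 100
	fmt.Printf("bare: %dns/op audited: %dns/op overhead: %.1f%%\n",
		bare.NsPerOp(), audited.NsPerOp(), overhead)
}
```

A real backend write will dominate the numbers; the in-memory case is what the < 5% budget above applies to.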

Why

  • Production debugging: "what command caused this mess?"
  • Full correlation chain: command → events across streams
  • Rejected command visibility (events only show successes)
  • Performance analysis (command duration tracking)
  • Foundation for the Management UI correlation tracer

Build order

This should be built FIRST — small surface area, immediately useful, and both CLI and UI depend on it.

ash closed this issue 2026-02-23 10:59:00 +00:00