eskit — Event Sourcing Toolkit for Go
A pragmatic, generic event sourcing framework for Go. Built around the decider pattern with first-class support for all three event modeling patterns: State Changes, State Views, and Automations.
go get git.nullsoft.is/ash/eskit
Quick Start — Counter
package main
import (
"context"
"fmt"
"git.nullsoft.is/ash/eskit"
)
type Incremented struct{}
type Decremented struct{}
type Increment struct{}
type Decrement struct{}
var counter = eskit.Decider[int, any, any]{
InitialState: func() int { return 0 },
Decide: func(state int, cmd any) ([]any, error) {
switch cmd.(type) {
case Increment:
return []any{Incremented{}}, nil
case Decrement:
return []any{Decremented{}}, nil
default:
return nil, fmt.Errorf("unknown command: %T", cmd)
}
},
Evolve: func(state int, event any) int {
switch event.(type) {
case Incremented:
return state + 1
case Decremented:
return state - 1
default:
return state
}
},
}
func main() {
ctx := context.Background()
store := eskit.NewMemoryStore[any]()
handler := eskit.NewDeciderHandler(store, counter)
state, _, _ := handler.Handle(ctx, "counter", "1", Increment{})
state, _, _ = handler.Handle(ctx, "counter", "1", Increment{})
state, _, _ = handler.Handle(ctx, "counter", "1", Decrement{})
fmt.Println("Counter:", state) // Counter: 1
}
See examples/counter/ for the runnable version.
Features
- Decider pattern — pure functions: `Decide(state, cmd) → events`, `Evolve(state, event) → state`
- Generic stores — MemoryStore, SQLite, PostgreSQL, NATS JetStream
- Subscriptions — durable, checkpointed event delivery via `subscription.Subscription`
- EventDispatcher — unified event dispatch engine with O(1) type filtering for both projections and processors
- Subscription — single struct for registering event handlers (name, event type filter, handler function)
- State Views — `StateView[E]` struct for durable checkpoint-based projections, with `sqlview` helper for SQL-backed read models and `OnChange` callback for real-time notifications
- Change notifications — `ChangeNotifier` for in-process pub/sub of projection changes, `ChangeRelay` interface for cross-server broadcast (PG LISTEN/NOTIFY implementation included)
- Real-time SSE — `ServeChanges` and `MultiSubscription` for wiring projections to Server-Sent Events with fallback polling, debounce, and Datastar integration
- Snapshots — configurable snapshot intervals with schema versioning
- Middleware — logging, metrics, retry, single-writer, custom
- Codecs — JSON, jsoniter, CBOR, Protobuf (pluggable via `codec.Codec`, multi-codec migration support)
- Ordering guarantees — strict global sequence delivery with gap detection, self-healing `SequenceChecker`, and safe-by-default blocking (see docs/ordering-and-gaps.md)
- Schema evolution — upcasters for event versioning
- GDPR — crypto-shredding for PII in events (envelope encryption with AES-256-GCM, `sync.Pool`-optimized cipher and nonce reuse)
- Multi-tenancy — tenant isolation middleware
- Observability — OpenTelemetry tracing + metrics (separate `otelkit` module)
- NATS clustering — distributed commands, event bus, locks
- Dynamic Consistency Boundary (DCB) — cross-stream decision logic
- Health checks — HTTP health endpoint for projection lag monitoring, suitable for load balancer checks (see docs/operations.md)
- Testing — Given/When/Then helpers, conformance suites
- Store parity — feature parity matrix across all backends (STORE_PARITY.md)
Sub-Modules
Core eskit has zero heavy dependencies (only go-cmp for test helpers). Store backends, codecs, and integrations are separate modules — you only pull in what you use.
The core module includes: deciders, MemoryStore, subscriptions, projections, live projections, middleware, snapshots, commands, GDPR crypto-shredding, DCB, processor, conformance testing, and all interfaces. Everything that doesn't require CGO or heavy third-party dependencies.
Separate modules for heavy dependencies:
# Store backends
go get git.nullsoft.is/ash/eskit/sqlitestore # SQLite
go get git.nullsoft.is/ash/eskit/pgstore # PostgreSQL
go get git.nullsoft.is/ash/eskit/natsstore # NATS JetStream
# Serialization
go get git.nullsoft.is/ash/eskit/codec # jsoniter, CBOR, Protobuf
# Command queue
go get git.nullsoft.is/ash/eskit/sqlitequeue # SQLite command queue (durable, single-server)
go get git.nullsoft.is/ash/eskit/pgqueue # PostgreSQL command queue
# NATS integrations
go get git.nullsoft.is/ash/eskit/natseventbus # Event bus
go get git.nullsoft.is/ash/eskit/natscommand # Distributed CommandBus
go get git.nullsoft.is/ash/eskit/natscluster # Cluster command routing
go get git.nullsoft.is/ash/eskit/natsbatch # Batch event processing
go get git.nullsoft.is/ash/eskit/natslock # Distributed locks
go get git.nullsoft.is/ash/eskit/natsnotifier # Event notifications
go get git.nullsoft.is/ash/eskit/embeddednats # Embedded server (testing)
# Observability
go get git.nullsoft.is/ash/eskit/otelkit # OpenTelemetry tracing + metrics
# Metrics
go get git.nullsoft.is/ash/eskit/metrics # Recorder interface (zero deps)
go get git.nullsoft.is/ash/eskit/prommetrics # Prometheus implementation
# Runtime
go get git.nullsoft.is/ash/eskit/runner # Concurrent service runner
# Event processors
go get git.nullsoft.is/ash/eskit/processor # Event-reactive processors & todolist pattern
Append Modes
By default, Append uses optimistic concurrency control — pass the expected stream version to detect conflicts. For append-only streams where ordering doesn't matter, use AppendAny:
// Standard — optimistic concurrency check
store.Append(ctx, "order", "123", 3, events)
// Append-only mode — skip version check (useful for log/audit streams)
store.Append(ctx, "audit", "log-1", eskit.AppendAny, events)
See docs/stores.md for details on when to use each mode.
Type Registration
Events are plain structs — no interfaces needed. Register them once, and the wire name is derived from the Go type:
reg := eskit.NewEventRegistry()
eskit.Register[OrderPlaced](reg) // → "sales.OrderPlaced"
eskit.Register[PaymentProcessed](reg) // → "finance.PaymentProcessed"
store, _ := sqlitestore.New[any](db, sqlitestore.WithRegistry[any](reg))
Commands are plain structs — no interface needed. Register them with the bus, providing the stream type and an ID extractor:
type PlaceOrder struct {
OrderID string
Items []Item
}
command.Register(bus, store, orderDecider, "order", func(c PlaceOrder) string {
return c.OrderID
})
No string names. No EventType() methods. No CommandName() methods. The Go type system is the single source of truth.
The Decider Pattern
Domain logic as pure functions — easy to test, compose, and reason about:
Command → Decide(state, command) → []Event
State + Event → Evolve(state, event) → State
Test with Given/When/Then:
eskit.Given(decider, pastEvents...).
When(command).
Then(t, expectedEvents...)
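Because the decider is pure, Given/When/Then is just folding and comparing. A self-contained sketch of the mechanics, assuming nothing from eskit (the real helpers integrate with `testing.T` and the `Decider` struct):

```go
package main

import "fmt"

// GivenWhenThen folds past events into state with evolve, then runs
// decide on the command and returns the produced events for comparison.
func GivenWhenThen[S, E, C any](
	initial S,
	evolve func(S, E) S,
	decide func(S, C) []E,
	given []E,
	when C,
) []E {
	state := initial
	for _, e := range given {
		state = evolve(state, e)
	}
	return decide(state, when)
}

func main() {
	// The counter decider from the Quick Start, expressed with strings.
	evolve := func(s int, e string) int {
		if e == "Incremented" {
			return s + 1
		}
		return s - 1
	}
	decide := func(s int, cmd string) []string {
		if cmd == "Increment" {
			return []string{"Incremented"}
		}
		return []string{"Decremented"}
	}
	got := GivenWhenThen(0, evolve, decide, []string{"Incremented"}, "Increment")
	fmt.Println(got) // the expected events to assert against
}
```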
Documentation
| Topic | Link |
|---|---|
| Store backends & comparison | docs/stores.md |
| Serialization & codecs | docs/serialization.md |
| Middleware | docs/middleware.md |
| Subscriptions & watchers | docs/subscriptions.md |
| Ordering guarantees & gap detection | docs/ordering-and-gaps.md |
| Snapshots | docs/snapshots.md |
| Advanced (GDPR, clustering, multi-tenancy, etc.) | docs/advanced.md |
| Architecture | docs/architecture.md |
| Processors (reactive + TodoProcessor) | docs/processor.md |
| Metadata, correlation & causation | docs/metadata.md |
| Observability (OTel tracing + metrics) | docs/observability.md |
| State Views (green slices) & OnChange | docs/state-views.md |
| Zero-downtime projection rebuild | docs/rebuild.md |
| Change notifications (ChangeNotifier, ChangeRelay) | docs/subscriptions.md |
| Real-time SSE & Datastar integration | docs/sse-guide.md |
| Quick start guide | docs/quickstart.md |
| Performance | docs/performance.md |
| Testing guide | docs/testing.md |
| FAQ | docs/faq.md |
| Schema separation (eventstore + readmodel) | docs/schema-separation.md |
| Production tuning (pools, checkpoints, Postgres) | docs/production-tuning.md |
| Known limitations & improvement opportunities | docs/limitations.md |
| Store feature parity matrix | STORE_PARITY.md |
Sub-Packages
| Package | Description |
|---|---|
| `typereg` | Thread-safe type registry for wire deserialization of commands and events |
| `commandqueue` | Transport-agnostic command queue interface with in-memory, SQLite, Postgres, and NATS implementations |
| `processor` | Event-reactive processors with typed handlers and todolist pattern |
See docs/command-queue.md for the command queue architecture and migration guide.
Conformance Suite
The eventstoretest package provides a comprehensive conformance test suite that any EventStore implementation must pass. Run it against your store with a single call:
func TestMyStore(t *testing.T) {
eventstoretest.RunSuite(t, func(t *testing.T) eskit.EventStore[eventstoretest.TestEvent] {
return mystore.New(t)
})
}
The suite covers:
| Category | Tests |
|---|---|
| Basic CRUD | Append single/multiple, read back, empty streams, stream isolation, sequential appends |
| Concurrency | Optimistic concurrency control, wrong version detection, concurrent appends |
| Ordering | Insertion order preserved, monotonically increasing versions |
| Read Patterns | LoadFrom version offsets, beyond-end reads, empty stream reads |
| AppendAny | Unconditional appends to new/existing streams, concurrent AppendAny |
| Stream Type Isolation | Events scoped to stream types, cross-type isolation |
| Event Fields | StreamType, EventType, StreamID, GlobalSequence all populated correctly |
| Special Stream Names | Hyphens, underscores, UUIDs, slashes, unicode in stream IDs |
| Metadata | Preservation, many extra keys, batch application, unicode values |
| Metadata Edge Cases | Large metadata maps, batch metadata propagation, unicode metadata values |
| Idempotency | Duplicate version append rejection |
| Edge Cases | Large payloads, empty data, many streams, many events, unicode data, binary-like data, long stream IDs, unique event IDs, timestamp validation |
| Stress | Concurrent appends (different streams), concurrent OCC (same stream), concurrent readers + writers |
A companion RunBenchmarkSuite provides standardized performance benchmarks across stores.
Fuzz Testing
Fuzz tests cover codec deserialization (JSON, CBOR, Protobuf) and core operations (stream IDs, event types) to catch panics on untrusted input:
cd codec && go test -fuzz=FuzzJSONUnmarshal -fuzztime=30s
cd codec && go test -fuzz=FuzzCBORUnmarshal -fuzztime=30s
go test -fuzz=FuzzStreamID -fuzztime=30s
go test -fuzz=FuzzEventTypeName -fuzztime=30s
Lifecycle Hooks
The hooks package provides composable middleware hooks at three boundaries: event append, command dispatch, and projection processing.
Event Append Hooks
Wrap any EventStore with before/after append hooks:
import "git.nullsoft.is/ash/eskit/hooks"
// Validation hook — reject events that fail business rules
store := hooks.WrapStore(innerStore,
hooks.WithBeforeAppend(func(ctx context.Context, streamID string, events []MyEvent) error {
if streamID == "" {
return errors.New("stream ID required")
}
return nil // allow append
}),
hooks.WithAfterAppend(func(ctx context.Context, streamID string, events []eskit.Event[MyEvent]) {
log.Printf("appended %d events to %s", len(events), streamID)
}),
)
- BeforeAppend can reject (return error stops the append, no events written)
- AfterAppend is fire-and-forget (panics are recovered, never breaks the append)
- Multiple hooks per point, executed in registration order
- Zero overhead when no hooks are registered (returns unwrapped store)
Command Dispatch Hooks
mw := hooks.CommandHookMiddleware(
[]hooks.BeforeCommandFunc{
func(ctx context.Context, cmd any) error {
log.Printf("dispatching: %T", cmd)
return nil
},
},
[]hooks.AfterCommandFunc{
func(ctx context.Context, cmd any, err error) {
if err != nil {
log.Printf("command failed: %v", err)
}
},
},
)
Projection Hooks
Handle projection errors with Skip/Retry/Halt strategies, and get notified when caught up:
handler := hooks.NewProjectionHandler(innerHandler,
hooks.WithOnError(func(ctx context.Context, event eskit.Event[MyEvent], err error) hooks.ErrorAction {
if isTransient(err) {
return hooks.Retry
}
return hooks.Skip // skip poison events
}),
hooks.WithOnCaughtUp[MyEvent](func(ctx context.Context, position uint64) {
log.Printf("projection caught up to position %d", position)
}),
hooks.WithMaxRetries[MyEvent](5),
)
Atomic Checkpoint (Crash-Safe Projections)
For non-idempotent side effects (sending emails, charging payments), use CheckpointInTx to save the checkpoint within the same database transaction as Evolve. If the process crashes after Evolve but before commit, both the projection update and checkpoint are rolled back — preventing double processing on restart.
var EmailView = pgview.Config[domain.Event]{
Name: "email-sender",
EventTypes: []string{"OrderConfirmed"},
CheckpointInTx: true, // checkpoint saved in same TX as Evolve
Evolve: func(ctx context.Context, tx pgx.Tx, event eskit.Event[domain.Event]) error {
// Record the email in the DB (same transaction)
_, err := tx.Exec(ctx, `INSERT INTO sent_emails ...`, ...)
return err
},
}
Works with both pgview and sqlview. The subscription automatically injects checkpoint info via context — no manual wiring needed.
Leader Election (Multi-Node)
Running N instances means N copies of every processor and projection — duplicate emails, duplicate payments. eskit solves this with leader election via Postgres advisory locks.
Why Not Just Checkpoints?
The checkpoint is a bookmark, not a lock. Two nodes with the same checkpoint name both read the same position, process the same events, and execute duplicate side effects. The checkpoint tracks progress — it doesn't coordinate access.
How It Works
- On `Start()`, the processor calls `LeaderLock.Acquire(ctx, "processor:send-emails")`
- Winner: holds the lock, reads events, executes side effects, advances checkpoint
- Others: block on `Acquire`, waiting for the lock — zero CPU, zero duplicates
- Graceful shutdown (SIGTERM/context cancel): lock is released via `defer release()` — instant handoff, next instance acquires immediately
- Hard crash: Postgres detects the dead connection and releases the advisory lock (~30s, depending on TCP keepalive). The new leader replays from the checkpoint, rebuilds the TodoList, and resumes.
Rolling deploy scenario:
- Old node receives SIGTERM → `defer release()` runs → lock released
- New node was blocking on `Acquire` → acquires instantly
- Replays from shared checkpoint → rebuilds TodoList → executes pending items
- Zero downtime for normal deploys; a ~30s gap only on hard crashes
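The coordination contract (one winner does the work, others do nothing, release means instant handoff) can be modeled in-process with a try-lock. This is a toy stand-in for illustration only; production uses Postgres advisory locks over the connection pool, not an in-memory mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// LeaderLock is a minimal stand-in for an advisory lock: only the
// holder processes; everyone else stays idle.
type LeaderLock struct{ mu sync.Mutex }

func (l *LeaderLock) TryAcquire() bool { return l.mu.TryLock() }
func (l *LeaderLock) Release()         { l.mu.Unlock() }

func main() {
	var lock LeaderLock
	leaders := 0
	// Two "nodes" race for leadership; exactly one wins.
	for node := 0; node < 2; node++ {
		if lock.TryAcquire() {
			leaders++ // leader: read events, execute, checkpoint
		} // non-leaders would block/retry, executing nothing
	}
	fmt.Println("leaders:", leaders)

	lock.Release() // graceful shutdown → instant handoff
	if lock.TryAcquire() {
		fmt.Println("next node acquired after release")
	}
}
```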
⚠️ At-least-once guarantee: If a crash occurs after a side effect executes but before the checkpoint saves, the event replays and the side effect runs again. All Execute handlers and subscription Handlers with side effects must be idempotent. Use idempotency keys, INSERT ... ON CONFLICT DO NOTHING, or check-before-act patterns.
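The check-before-act pattern recommended above can be sketched in a few lines. This illustrates the idea with an in-memory key set; a durable implementation would use a unique-keyed table with `INSERT ... ON CONFLICT DO NOTHING` instead.

```go
package main

import "fmt"

// Dedup runs a side effect at most once per idempotency key, so a
// replayed event after a crash does not repeat the effect.
type Dedup struct{ seen map[string]bool }

func NewDedup() *Dedup { return &Dedup{seen: map[string]bool{}} }

// Execute reports whether the side effect actually ran.
func (d *Dedup) Execute(key string, sideEffect func()) bool {
	if d.seen[key] {
		return false // replayed event: skip the duplicate
	}
	d.seen[key] = true
	sideEffect()
	return true
}

func main() {
	d := NewDedup()
	sent := 0
	send := func() { sent++ }

	d.Execute("email:order-42", send) // first delivery
	d.Execute("email:order-42", send) // replay after crash: skipped
	fmt.Println("emails sent:", sent)
}
```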
Configuration
Single-node (development) — no changes needed:
store := processor.NewMemoryStore[string]()
proc := processor.NewTodoProcessor(store, processor.TodoConfig[string]{
Name: "send-emails",
Execute: sendEmail,
Interval: time.Second,
// No leader lock → all instances process (fine for single node)
})
Multi-node (production) — add leader election:
store := processor.NewMemoryStore[string]()
proc := processor.NewTodoProcessor(store, processor.TodoConfig[string]{
Name: "send-emails",
Execute: sendEmail,
LeaderLock: pgstore.NewPgLockRegistry(pool), // advisory lock — one leader
})
Subscription-only (projections):
sub, _ := subscription.New(subscription.Config[MyEvent]{
ConsumerID: "order-summary",
Reader: pgStore,
Checkpoint: pgstore.NewPgCheckpoint(pool, "order-summary"),
Handler: updateProjection,
LeaderLock: pgstore.NewPgLockRegistry(pool),
})
Everything in Postgres. No extra infrastructure. No extra tables. Advisory locks use the same connection pool.
Batch checkpointing reduces DB writes during catch-up — save every 100 events instead of every event:
cfg := subscription.FromStateView(view, reader, cp,
subscription.WithCheckpointEvery[MyEvent](100), // save every 100 events
subscription.WithCheckpointFlushInterval[MyEvent](time.Second), // or every 1s
)
See docs/subscriptions.md for details.
Projections (State Views) with Leader Election
Projections are inherently safer than processors because they're idempotent — they UPSERT to a read model. If an event replays, the projection writes the same data again. No harm done.
Single-node: no LeaderLock needed. The subscription reads, updates the projection, checkpoints. Crash → replay from checkpoint → re-UPSERT → same result.
Multi-node: add LeaderLock to prevent duplicate work (wasted CPU, not data corruption):
// Define the projection (pgview handles Evolve + CheckpointInTx)
var OrderSummary = pgview.Config[domain.Event]{
Name: "order-summary",
EventTypes: []string{"OrderPlaced", "OrderShipped"},
CheckpointInTx: true, // checkpoint in same TX as projection update
Evolve: func(ctx context.Context, tx pgx.Tx, event eskit.Event[domain.Event]) error {
_, err := tx.Exec(ctx, `INSERT INTO order_summaries ... ON CONFLICT DO UPDATE ...`)
return err
},
}
// Wire it with LeaderLock via subscription
sub, _ := subscription.New(subscription.Config[domain.Event]{
ConsumerID: "order-summary",
Reader: pgStore,
Checkpoint: pgstore.NewPgCheckpoint(pool, "order-summary"),
Handler: OrderSummary.Handler(pool),
LeaderLock: pgstore.NewPgLockRegistry(pool), // only one node runs this projection
})
Three levels of safety:
| Level | Config | Replay window | Use when |
|---|---|---|---|
| Basic | default | Handler → checkpoint save | Single node, idempotent projections |
| Atomic | `CheckpointInTx: true` | Zero (same TX) | Multi-node projections, paranoid mode |
| Leader + Atomic | Both | Zero + single writer | Production multi-node (recommended) |
CheckpointInTx eliminates the replay window. LeaderLock eliminates duplicate processing. Together: one node, zero gaps, zero duplicates.
When to Use Subscription vs Automation
Plain subscription + LeaderLock — for simple side effects (trigger → execute):
// Trigger event arrives → execute immediately → checkpoint advances
sub, _ := subscription.New(subscription.Config[MyEvent]{
ConsumerID: "send-welcome-email",
Reader: pgStore,
Checkpoint: pgstore.NewPgCheckpoint(pool, "send-welcome-email"),
LeaderLock: pgstore.NewPgLockRegistry(pool),
Handler: func(ctx context.Context, event GlobalEvent[MyEvent]) error {
if event.EventType == "UserRegistered" {
return sendWelcomeEmail(ctx, event.Data) // execute inline
}
return nil
},
})
Handler returns nil → checkpoint advances. Handler returns error → event replays. Crash → event replays. Simple, correct, no extra state.
Automation — for deferred execution, completion tracking, or retry with backoff:
The processor package adds a TodoList between event reading and execution. The checkpoint advances only after side effects execute — never before. This means:
- Crash before execute → events replay, side effects re-run (at-least-once)
- Failed items retry up to MaxAttempts → move to DLQ → checkpoint advances
- Completion events can cancel pending items before they execute
Use Automation when you need to batch items, wait for external completion signals, or have complex retry logic. For everything else, prefer a plain subscription.
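The TodoList idea (trigger events schedule work, completion events cancel it before execution) can be modeled in a few lines. An illustrative sketch with made-up event names, not the `processor` package itself:

```go
package main

import "fmt"

// TodoList sits between event reading and execution: trigger events add
// pending items; completion events remove them before they ever run.
type TodoList struct{ pending map[string]bool }

func (t *TodoList) Apply(eventType, key string) {
	switch eventType {
	case "OrderConfirmed":
		t.pending[key] = true // schedule work
	case "OrderCancelled":
		delete(t.pending, key) // cancel before execution
	}
}

func main() {
	todo := &TodoList{pending: map[string]bool{}}
	type ev struct{ typ, key string }
	events := []ev{
		{"OrderConfirmed", "order-1"},
		{"OrderConfirmed", "order-2"},
		{"OrderCancelled", "order-2"}, // arrives before order-2 executes
	}
	for _, e := range events {
		todo.Apply(e.typ, e.key)
	}
	// Only after this point would side effects run — and only order-1 remains.
	fmt.Println("items to execute:", len(todo.pending))
}
```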
Processor
The processor package provides two patterns for event-driven processing: typed event handlers for immediate reactions, and the todolist pattern for stateful, filtered processing.
Pattern 1: Typed Event Handlers
Use processor.New with On[T] handlers when you want to react to specific events immediately. The processor subscribes only to the event types you register.
import "git.nullsoft.is/ash/eskit/processor"
p := processor.New(pgStore, checkpoint, bus, []processor.Handler{
processor.On[OrderPlaced](func(ctx context.Context, e OrderPlaced) error {
return bus.Send(ctx, ShipOrder{OrderID: e.OrderID})
}),
processor.On[PaymentFailed](func(ctx context.Context, e PaymentFailed) error {
return bus.Send(ctx, CancelOrder{OrderID: e.OrderID})
}),
}, processor.WithName("order-processor"))
go p.Start(ctx)
Pattern 2: TodoProcessor
Use NewTodoProcessor with a Store when you need to claim and process work items from a state-view table. Exactly-once across multiple servers via FOR UPDATE SKIP LOCKED.
import (
"git.nullsoft.is/ash/eskit/processor"
"git.nullsoft.is/ash/eskit/pgprocessor"
)
// Store claims from a state-view table (pgview populates it)
store := pgprocessor.NewStore[ChargeItem](pool, pgprocessor.Config{
Table: "charges",
ClaimWhere: "pending = true AND attempts < 3",
MarkSQL: "UPDATE charges SET pending = false WHERE item_key = $1",
})
// Same processor works with MemStore (tests) or Postgres (production)
charger := processor.NewTodoProcessor(store, processor.TodoConfig[ChargeItem]{
Name: "charge-processor",
Workers: 4,
Execute: func(ctx context.Context, key string, item ChargeItem) error {
// ctx has metadata (correlation, causation) — flows to commands automatically
return bus.Send(ctx, ChargeCard{OrderID: key, Amount: item.Amount})
},
})
go charger.Start(ctx)
Batch execution — process multiple items per commit for higher throughput:
proc := processor.NewTodoProcessor(store, processor.TodoConfig[ChargeItem]{
Name: "bulk-charger",
BatchSize: 50,
ExecuteBatch: func(ctx context.Context, items []processor.BatchItem[ChargeItem]) error {
for _, it := range items {
if err := chargeCard(ctx, it.Key, it.Item); err != nil {
return err // entire batch rolls back
}
}
return nil
},
})
See docs/processor.md for batch configuration, benchmarks, and trade-offs.
Context Metadata — Zero Boilerplate
Metadata (correlation, causation, principal) flows through context.Context automatically. Set it once at the HTTP boundary:
// HTTP middleware — start a chain
ctx = eskit.NewChain(ctx, eskit.Principal{Kind: eskit.PrincipalUser, ID: userID})
// Handlers — just send commands, metadata flows through ctx
bus.Send(ctx, PlaceOrder{OrderID: "123"})
// Reactive processors — auto-propagated, zero boilerplate
processor.On[OrderPlaced](func(ctx context.Context, e OrderPlaced) error {
return bus.Send(ctx, ChargePayment{OrderID: e.OrderID})
})
See Metadata Guide for details.
When to Use Which
| Pattern | Best for |
|---|---|
| `On[T]` handlers | Immediate reactions: event → command, no state needed |
| TodoProcessor | Work queues: claim items, retry on failure, exactly-once across servers |
Observability
Full OpenTelemetry instrumentation via the separate otelkit module:
bus.Use(otelkit.CommandTracing(tracer))
bus.Use(otelkit.CommandMetrics(meter))
store = otelkit.WithTracing(otelkit.WithMetrics(store, meter), tracer)
See Observability Guide for full wiring with all components.
Command Persistence
The commandlog package records every command dispatched through the CommandBus — invaluable for production debugging, audit trails, and replay analysis.
Setup
import (
"git.nullsoft.is/ash/eskit/commandlog"
"git.nullsoft.is/ash/eskit/commandlog/memlog" // or sqlitelog
)
// Create a store (memory for dev, SQLite for production).
store := memlog.New()
// Add middleware to the command bus.
bus.Use(commandlog.Middleware(store))
With Correlation and Metadata
bus.Use(commandlog.Middleware(store,
commandlog.WithCorrelationFunc(func(ctx context.Context, cmd command.Command) string {
return correlationIDFromContext(ctx)
}),
commandlog.WithMetadataFunc(func(ctx context.Context, cmd command.Command) map[string]string {
return map[string]string{"user": userFromContext(ctx)}
}),
))
Querying the Log
// Get a specific command record.
record, _ := store.Get(ctx, "cmd-id")
// Find all commands in a saga/correlation chain.
chain, _ := store.FindByCorrelation(ctx, "saga-42")
// List recent commands with pagination.
recent, _ := store.List(ctx, commandlog.QueryOpts{Limit: 50})
// Find by command type with time range filtering.
orders, _ := store.FindByType(ctx, "PlaceOrder", commandlog.QueryOpts{
After: time.Now().Add(-24 * time.Hour),
})
Backends
| Backend | Module | Use Case |
|---|---|---|
| Memory | `commandlog/memlog` | Testing, development |
| SQLite | `commandlog/sqlitelog` | Production, single-node |
Write your own by implementing commandlog.CommandLogStore.
Examples
- MarketHub — THE reference architecture — 8 bounded contexts, projections, cross-context event flows
- Bank — all patterns, production-like deciders and projections
- Counter — minimal decider (this README)
- Order system — full domain with projections & processors
- SQLite app — persistent SQLite backend
- Subscription app — durable subscriptions
- Memory demo — in-memory store
- PostgreSQL app — PostgreSQL backend
- NATS app — NATS JetStream
- GDPR demo — crypto-shredding
CLI Tool
The eskit CLI provides management and inspection of eskit event stores.
Installation
go install git.nullsoft.is/ash/eskit/cmd/eskit@latest
Usage
# Connect to a SQLite event store
eskit --store sqlite:///path/to/app.db <command>
# Output as JSON for scripting
eskit --store sqlite:///path/to/app.db --json streams list
# Decode CBOR payloads
eskit --store sqlite:///path/to/app.db --codec cbor events show 42
# Override stored codec (force JSON decode)
eskit --store sqlite:///path/to/app.db --codec json events show 42
# Show raw hex dump (skip decoding entirely)
eskit --store sqlite:///path/to/app.db --raw events show 42
Commands
| Command | Description |
|---|---|
| `streams list` | List all streams with event counts |
| `streams show <id>` | Show events in a stream, decoded |
| `events search --correlation <id>` | Find events by correlation ID |
| `events search --type <event-type>` | Find events by event type |
| `events show <global-pos>` | Show a single event with full detail |
| `projections list` | Show projection checkpoint positions |
| `projections rebuild <name>` | Reset a projection checkpoint to 0 |
| `commands list` | List recent commands (from the command log) |
| `commands show <id>` | Show a command with its resulting events |
| `store stats` | Event count, stream count, DB size |
| `store health` | Connectivity check |
Flags
| Flag | Description |
|---|---|
| `--store` | Store DSN (`sqlite:///path` or `postgres://user:pass@host/db`) |
| `--json` | Output as JSON instead of a table |
| `--codec` | Payload codec (`json`, `cbor`, `protobuf`); auto-detected if omitted. Overrides the codec stored in each event row. |
| `--raw` | Show raw payload bytes as a hex dump (skip all decoding) |
| `--limit` | Maximum results to return |
Examples
# List all streams
$ eskit --store sqlite:///tmp/myapp.db streams list
STREAM EVENTS LATEST VERSION LAST EVENT
---------- ------ -------------- --------------------------
account-1 5 5 2025-01-15T10:30:00Z
order-42 3 3 2025-01-15T11:00:00Z
# Search events by correlation ID (JSON output)
$ eskit --store sqlite:///tmp/myapp.db --json events search --correlation req-abc123
[
{"id": 1, "stream_id": "account-1", "version": 1, "event_type": "AccountOpened", ...},
{"id": 5, "stream_id": "order-42", "version": 1, "event_type": "OrderCreated", ...}
]
# Check store health
$ eskit --store sqlite:///tmp/myapp.db store health
Store is healthy.
# Get store statistics
$ eskit --store sqlite:///tmp/myapp.db store stats
Events: 1523
Streams: 42
DB Size: 2.3 MB
Store Migration
The migrate subcommand copies all event data between eskit stores, preserving stream IDs, event order, versions, codecs, timestamps, and all metadata.
# Basic migration between SQLite databases
eskit migrate --from sqlite:///source.db --to sqlite:///target.db
# Custom batch size (default 500)
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --batch-size 1000
# Dry-run: count events/streams without writing
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --dry-run
# Verify event counts match after migration
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --verify
# Resumable migration with checkpoint file
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --checkpoint migrate.ckpt
# Re-encode events from JSON to CBOR during migration
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --recode-to cbor
| Flag | Description |
|---|---|
| `--from` | Source store DSN (required) |
| `--to` | Target store DSN (required) |
| `--batch-size` | Events per write transaction (default 500, max 100000) |
| `--dry-run` | Validate the source is readable, count events/streams, write nothing |
| `--verify` | Post-migration comparison of event counts per stream |
| `--checkpoint` | File to track completed streams for resumable migrations |
| `--recode-to` | Re-encode events to `json` or `cbor` during migration |
Progress is reported to stderr: [42/128 streams] [15,234 events] [stream: order-abc123]
The migration architecture uses MigrationSource and MigrationTarget interfaces, making it straightforward to add new backends (Postgres, NATS, etc.).
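Since the README doesn't show those interfaces, here is a hypothetical sketch of the shape such a source/target pair might take, with a batched copy loop over in-memory stores. The interface signatures and the `memStore` type are assumptions for illustration; the real CLI interfaces may differ.

```go
package main

import "fmt"

// Event carries the fields a migration must preserve (trimmed down).
type Event struct {
	StreamID string
	Version  int
	Data     []byte
}

// Hypothetical shapes: a source streams batches, a target appends them.
type MigrationSource interface{ Read(batch int) []Event }
type MigrationTarget interface{ Write(events []Event) error }

// memStore implements both sides for this sketch.
type memStore struct{ events []Event }

func (m *memStore) Read(batch int) []Event {
	n := batch
	if n > len(m.events) {
		n = len(m.events)
	}
	out := m.events[:n]
	m.events = m.events[n:]
	return out
}

func (m *memStore) Write(evs []Event) error {
	m.events = append(m.events, evs...)
	return nil
}

// Migrate copies everything in batches, preserving order; returns the total.
func Migrate(src MigrationSource, dst MigrationTarget, batch int) int {
	total := 0
	for {
		evs := src.Read(batch)
		if len(evs) == 0 {
			return total
		}
		dst.Write(evs)
		total += len(evs)
	}
}

func main() {
	src := &memStore{events: []Event{{"a", 1, nil}, {"a", 2, nil}, {"b", 1, nil}}}
	dst := &memStore{}
	fmt.Println("migrated:", Migrate(src, dst, 2), "events")
}
```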
Codec Support
The CLI automatically decodes event payloads based on the codec name stored in each event row:
| Codec | Decoding |
|---|---|
| `json`, `jsoniter` | Standard JSON decode |
| `cbor` | CBOR binary decode (maps normalized for JSON output) |
| `protobuf` | Generic wire-format decode (field numbers + values, no schema needed) |
| `gob` | Hex dump (requires registered types not available in the CLI) |
| Unknown/custom | Hex dump with the codec name shown |
Use --codec to override the stored codec (e.g., when metadata is wrong). Use --raw to skip decoding entirely and see the raw hex dump.
License
MIT — see LICENSE.