# eskit

Event Sourcing toolkit for Go. Built on the decider pattern with generics, organized around the three patterns of Event Modeling.

```
Command → Decide(state, command) → []Event → Evolve(state, event) → State
```
## The Three Patterns
Every event-sourced system is composed of exactly three patterns. eskit makes all three first-class:
| Pattern | What it does | eskit type |
|---|---|---|
| State Change | Command → Decide → Events | `Decider` + `CommandHandler` |
| State View | Events → Project → Read Model | `EventProjector` |
| Automation | Event → React → Command | `Reactor` + `ReactorRegistry` |
These map directly to the three swimlanes in an Event Model. Each vertical slice on the model corresponds to one of these patterns. eskit gives you the building blocks; you compose them.
```
┌────────────────────────────────────────────────────────┐
│                      Event Model                       │
│                                                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │ State Change │  │  State View  │  │  Automation  │  │
│  │              │  │              │  │              │  │
│  │ Command ──▶  │  │ Events ──▶   │  │ Event ──▶    │  │
│  │  Decide      │  │  Project     │  │  React       │  │
│  │ Events ◀──   │  │ Read Model   │  │ Command ◀──  │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└────────────────────────────────────────────────────────┘
```
## Why No Aggregates

eskit deliberately avoids the "Aggregate" concept from DDD tactical patterns. Here's why:

- **Aggregates become god objects.** An `OrderAggregate` handling create, add items, submit, ship, cancel, refund — that's six responsibilities in one file. Every developer touches the same code.
- **Deciders are pure functions.** No inheritance, no base classes, no framework coupling. Just `Decide(state, command) → events` and `Evolve(state, event) → state`.
- **Vertical slices eliminate conflicts.** Each slice (create-order, add-item, submit-order) is its own folder with its own types. Two developers — or two AI agents — can work on different slices with zero merge conflicts.

The decider pattern gives you everything aggregates promised, without the baggage.
## Install

```sh
go get git.nullsoft.is/ash/eskit
```
## Features
- **Three event modeling patterns** — state changes, state views, automations
- **Go generics** — type-safe commands, events, and state
- **Given/When/Then test DSL** — decider tests that read like specifications
- **Pluggable event stores** — in-memory, SQLite, PostgreSQL, NATS JetStream
- **Dynamic Consistency Boundary (DCB)** — tag-based event sourcing (dcb.events)
- **Single Writer** — per-stream lock registry eliminates optimistic concurrency retries
- **Middleware** — before/after hooks for command handling
- **Snapshotting** — optional state snapshots for long-lived streams
- **Pluggable serialization** — JSON (stdlib or jsoniter), Gob, custom
- **Event registry** — type-safe deserialization with factory-based type resolution
- **Built-in profiler** — rolling-window percentile stats, degradation detection
- **Observability** — structured logging (slog), instrumented stores
- **Clustering** — distributed locks (NATS KV / PG advisory), event bus, command routing
- **Embedded NATS** — in-process NATS server for dev/testing, single binary deployment
- **Zero magic** — no reflection, no code generation, no frameworks
## Pattern 1: State Change
The core pattern. A command arrives, the decider examines current state, and produces events.
### Define your domain

```go
// State
type BankAccount struct {
    Exists  bool
    Balance int
}

// Commands
type BankCommand interface{ isBankCommand() }

type OpenAccount struct{ InitialBalance int }
type Deposit struct{ Amount int }
type Withdraw struct{ Amount int }

func (OpenAccount) isBankCommand() {}
func (Deposit) isBankCommand()     {}
func (Withdraw) isBankCommand()    {}

// Events
type BankEvent interface{ isBankEvent() }

type AccountOpened struct{ InitialBalance int }
type MoneyDeposited struct{ Amount int }
type MoneyWithdrawn struct{ Amount int }

func (AccountOpened) isBankEvent()  {}
func (MoneyDeposited) isBankEvent() {}
func (MoneyWithdrawn) isBankEvent() {}
```
### Create a decider

```go
var bankDecider = eskit.Decider[BankAccount, BankCommand, BankEvent]{
    InitialState: func() BankAccount { return BankAccount{} },

    Decide: func(state BankAccount, cmd BankCommand) ([]BankEvent, error) {
        switch c := cmd.(type) {
        case OpenAccount:
            if state.Exists {
                return nil, errors.New("account already exists")
            }
            return []BankEvent{AccountOpened{c.InitialBalance}}, nil
        case Deposit:
            if !state.Exists {
                return nil, errors.New("account does not exist")
            }
            return []BankEvent{MoneyDeposited{c.Amount}}, nil
        case Withdraw:
            if !state.Exists {
                return nil, errors.New("account does not exist")
            }
            if state.Balance < c.Amount {
                return nil, errors.New("insufficient funds")
            }
            return []BankEvent{MoneyWithdrawn{c.Amount}}, nil
        }
        return nil, errors.New("unknown command")
    },

    Evolve: func(state BankAccount, event BankEvent) BankAccount {
        switch e := event.(type) {
        case AccountOpened:
            state.Exists = true
            state.Balance = e.InitialBalance
        case MoneyDeposited:
            state.Balance += e.Amount
        case MoneyWithdrawn:
            state.Balance -= e.Amount
        }
        return state
    },
}
```
### Test with Given/When/Then
Every decider test is a specification. Given some history, when a command arrives, then expect these events (or an error):
```go
func TestOpenAccount(t *testing.T) {
    eskit.Test(t, bankDecider).
        Given().
        When(OpenAccount{InitialBalance: 100}).
        ThenExpect(AccountOpened{InitialBalance: 100})
}

func TestOpenAccountAlreadyExists(t *testing.T) {
    eskit.Test(t, bankDecider).
        Given(AccountOpened{InitialBalance: 100}).
        When(OpenAccount{InitialBalance: 50}).
        ThenError("already exists")
}

func TestWithdrawInsufficientFunds(t *testing.T) {
    eskit.Test(t, bankDecider).
        Given(AccountOpened{InitialBalance: 50}).
        When(Withdraw{Amount: 100}).
        ThenError("insufficient")
}

func TestDepositToExistingAccount(t *testing.T) {
    eskit.Test(t, bankDecider).
        Given(AccountOpened{InitialBalance: 100}).
        When(Deposit{Amount: 50}).
        ThenExpect(MoneyDeposited{Amount: 50})
}
```
### Wire it up

```go
store := eskit.NewMemoryStore[BankEvent]() // testing
// store, _ := sqlitestore.New[BankEvent]("events.db") // single instance
// store, _ := pgstore.New[BankEvent](ctx, connString) // production

handler := &eskit.CommandHandler[BankAccount, BankCommand, BankEvent]{
    Decider: bankDecider,
    Store:   store,
}

state, events, err := handler.Handle(ctx, "acc-123", OpenAccount{InitialBalance: 100})
state, events, err = handler.Handle(ctx, "acc-123", Deposit{Amount: 50})
// state.Balance == 150
```
## Pattern 2: State View (Projections)
Events flow into read model handlers. Subscribe to events and build whatever view you need:
```go
projector := eskit.NewEventProjector[BankEvent]()

// Build a balance cache
projector.Register("balance-cache", func(ctx context.Context, event eskit.Event[BankEvent]) error {
    switch e := event.Data.(type) {
    case MoneyDeposited:
        cache.Add(event.StreamID, e.Amount)
    case MoneyWithdrawn:
        cache.Add(event.StreamID, -e.Amount)
    }
    return nil
})

// Audit log
projector.Register("audit-log", func(ctx context.Context, event eskit.Event[BankEvent]) error {
    log.Printf("[audit] %s v%d: %T", event.StreamID, event.Version, event.Data)
    return nil
})

// Wire to command handler — projects after each successful persist
handler := &eskit.CommandHandler[BankAccount, BankCommand, BankEvent]{
    Decider:   bankDecider,
    Store:     store,
    Projector: projector,
}

// Rebuild read models from scratch
projector.Replay(ctx, store, []string{"acc-123", "acc-456"})
```
## Pattern 3: Automation (Reactors)
An event in one slice triggers a command in another. This is how you compose slices without coupling them:
```go
reactors := eskit.NewReactorRegistry[OrderEvent]()

// When an order is submitted, reserve inventory
reactors.Register("reserve-inventory", eskit.Reactor[OrderEvent]{
    Trigger: func(event OrderEvent) bool {
        _, ok := event.(OrderSubmitted)
        return ok
    },
    Execute: func(ctx context.Context, event eskit.Event[OrderEvent]) error {
        // Send a command to the inventory decider
        _, _, err := inventoryHandler.Handle(ctx, event.StreamID, ReserveStock{Quantity: 1})
        return err
    },
})

// When an order is cancelled, send a notification
reactors.Register("cancellation-email", eskit.Reactor[OrderEvent]{
    Trigger: func(event OrderEvent) bool {
        _, ok := event.(OrderCancelled)
        return ok
    },
    Execute: func(ctx context.Context, event eskit.Event[OrderEvent]) error {
        return emailService.SendCancellation(ctx, event.StreamID)
    },
})

// Reactors plug into the projection system — they're just event handlers
projector := eskit.NewEventProjector[OrderEvent]()
projector.Register("automations", reactors.Handle)
```
## The composition
A complete system is just state changes, state views, and automations wired together:
```
┌─────────────┐      ┌────────────┐
│ CreateOrder │─────▶│ EventStore │──┐
│  (Decider)  │      └────────────┘  │
└─────────────┘                      │
                                     ▼
                             ┌──────────────┐
                             │  Projector   │
                             │              │
┌─────────────┐              │ ┌──────────┐ │
│  Inventory  │◀── command ──│ │ Reactor  │ │
│  (Decider)  │              │ └──────────┘ │     ┌──────────────┐
└─────────────┘              │ ┌──────────┐ │────▶│  Order List  │
                             │ │   View   │ │     │ (Read Model) │
                             │ └──────────┘ │     └──────────────┘
                             └──────────────┘
```
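In code, that wiring is only a few lines. A sketch, assuming the order decider and store from the earlier examples plus a hypothetical `orderListView` read model:

```go
// State views and automations hang off one projector; the command
// handler feeds it after every successful persist.
projector := eskit.NewEventProjector[OrderEvent]()
projector.Register("order-list", orderListView.Handle) // state view (hypothetical read model)

reactors := eskit.NewReactorRegistry[OrderEvent]()
projector.Register("automations", reactors.Handle) // automation

orderHandler := &eskit.CommandHandler[Order, OrderCommand, OrderEvent]{
    Decider:   orderDecider, // state change
    Store:     orderStore,
    Projector: projector,
}
```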
## Vertical Slices
Structure your code around business capabilities, not technical layers. Each slice is a self-contained folder:
```
examples/
  order-processing/
    state.go                 # Shared state type
    create-order/
      command.go, event.go, decider.go, decider_test.go
    add-item/
      command.go, event.go, decider.go, decider_test.go
    submit-order/
      command.go, event.go, decider.go, decider_test.go
    ship-order/
      command.go, event.go, decider.go, decider_test.go
    cancel-order/
      command.go, event.go, decider.go, decider_test.go
```
Each slice maps to one swimlane on the event model. Multiple developers (or AI agents) can work on different slices simultaneously with zero conflicts.
See `examples/order-processing/` for a complete working example.
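As a sketch, one slice could look like the following. The types and layout are illustrative, not the actual contents of `examples/order-processing`:

```go
// Hypothetical create-order/decider.go. In the real example the shared
// state type lives in state.go; everything here is illustrative.
package createorder

import "errors"

type Order struct {
    Exists     bool
    CustomerID string
}

type CreateOrder struct{ CustomerID string }

type OrderEvent interface{ isOrderEvent() }
type OrderCreated struct{ CustomerID string }

func (OrderCreated) isOrderEvent() {}

func Decide(state Order, cmd CreateOrder) ([]OrderEvent, error) {
    if state.Exists {
        return nil, errors.New("order already exists")
    }
    return []OrderEvent{OrderCreated{CustomerID: cmd.CustomerID}}, nil
}

func Evolve(state Order, event OrderEvent) Order {
    if e, ok := event.(OrderCreated); ok {
        state.Exists = true
        state.CustomerID = e.CustomerID
    }
    return state
}
```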
## Middleware
Wrap command handling with cross-cutting concerns:
```go
// Logging
handler.Use(eskit.LoggingMiddleware[S, C, E](logger))

// Single Writer — serialize writes per stream, eliminate retries
registry := eskit.NewMemoryLockRegistry()
handler.Use(eskit.SingleWriterMiddleware[S, C, E](registry, false))

// Profiling
profiler := eskit.NewProfiler()
handler.Use(eskit.ProfilerMiddleware[S, C, E](profiler))

// Custom
handler.Use(func(ctx context.Context, streamID string, cmd C,
    next eskit.HandleFunc[S, E],
) (S, []eskit.Event[E], error) {
    user := auth.FromContext(ctx)
    if user == nil {
        var zero S
        return zero, nil, errors.New("unauthorized")
    }
    return next(ctx)
})
```
## Snapshotting
For long-lived streams, snapshot state to avoid replaying thousands of events:
```go
snapStore := eskit.NewMemorySnapshotStore[BankAccount]()

handler := &eskit.CommandHandler[BankAccount, BankCommand, BankEvent]{
    Decider:       bankDecider,
    Store:         store,
    Snapshots:     snapStore,
    SnapshotEvery: 100,
}
```
## Dynamic Consistency Boundary (DCB)
An alternative to stream-per-entity. Events are tagged and live in a single stream per bounded context. See dcb.events.
import "git.nullsoft.is/ash/eskit/dcb"
store := dcb.NewMemoryStore()
store.Append(ctx, []dcb.Event{
{Type: "CourseCreated", Data: data, Tags: []dcb.Tag{{Key: "course", Value: "c1"}}},
})
// Query by type and/or tags
query := dcb.NewQuery(dcb.QueryItem{
Types: []string{"StudentSubscribed", "CourseCapacityChanged"},
Tags: []dcb.Tag{{Key: "course", Value: "c1"}},
})
events, position, _ := store.Read(ctx, query)
// Append with consistency condition
store.Append(ctx, newEvents, dcb.AppendCondition{
FailIfEventsMatch: query,
After: position,
})
## Serialization

```go
ser := eskit.NewJSONSerializer()    // encoding/json (default)
ser = eskit.NewJSONIterSerializer() // 3-6x faster, drop-in compatible
ser = eskit.NewGobSerializer()      // Go-native binary

// Event registry for type-safe deserialization
registry := eskit.NewEventRegistry()
registry.Register("OrderCreated", func() any { return &OrderCreated{} })
```
## Event Store Comparison

### Traditional Stream-Based ES

| Store | Package | Best For | Persistence |
|---|---|---|---|
| In-memory | `eskit` | Testing, prototyping | None |
| SQLite | `eskit/sqlitestore` | Single-instance apps | Disk |
| PostgreSQL | `eskit/pgstore` | Production, multi-instance | Server |
| NATS JetStream | `eskit/natsstore` | Distributed, event-driven | JetStream |
### DCB (Dynamic Consistency Boundary)

| Store | Package | Best For | Persistence |
|---|---|---|---|
| In-memory | `eskit/dcb` | Testing, prototyping | None |
| SQLite | `eskit/dcb` | Single-instance apps | Disk |
| PostgreSQL | planned | Production, multi-instance | Server |
**Why no NATS store for DCB?** DCB requires complex query-based reads (filter by event type + tags) and position-based optimistic concurrency across tag combinations. NATS JetStream is an ordered log — it can't do multi-criteria tag queries. PostgreSQL with GIN indexes on tags is the right tool for DCB at scale. NATS can still be used alongside Postgres to notify nodes about new events (pub/sub), but Postgres is the source of truth for DCB.
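To illustrate why Postgres fits, here is a hypothetical table shape with JSONB tags and a GIN index. This is a sketch, not the schema of the planned eskit Postgres DCB store:

```go
// Hypothetical DDL, embedded as a Go constant for illustration.
const dcbSchemaSketch = `
CREATE TABLE dcb_events (
    position BIGSERIAL PRIMARY KEY,
    type     TEXT  NOT NULL,
    tags     JSONB NOT NULL,
    data     BYTEA NOT NULL
);
CREATE INDEX dcb_events_tags ON dcb_events USING GIN (tags);

-- A DCB read: filter by type and tag, after a known position:
--   SELECT position, type, data FROM dcb_events
--    WHERE type IN ('StudentSubscribed', 'CourseCapacityChanged')
--      AND tags @> '{"course": "c1"}'
--      AND position > $1
--    ORDER BY position;
`
```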
Install the store packages you need:

```sh
go get git.nullsoft.is/ash/eskit/sqlitestore
go get git.nullsoft.is/ash/eskit/pgstore
go get git.nullsoft.is/ash/eskit/natsstore
```
Profiler & Observability
profiler := eskit.NewProfiler()
handler.Use(eskit.ProfilerMiddleware[S, C, E](profiler))
stats := profiler.Stats("CommandHandler.Handle")
fmt.Printf("P95: %.1fms, P99: %.1fms\n", stats.P95Ms, stats.P99Ms)
// HTTP dashboard (JSON)
http.Handle("/debug/profiler", profiler.Handler())
// Instrumented store — logs + profiles all operations
instrStore := eskit.NewInstrumentedEventStore[E](store, logger, profiler)
## Benchmarks

See `benchmarks/BASELINE.md` for full results.

```sh
go test ./benchmarks/ -bench=. -benchmem
```
Clustering & Scaling
eskit scales from a single binary to a multi-node cluster without rewriting your application.
### Three Scaling Tiers
| Tier | Setup | Users | Cost |
|---|---|---|---|
| 1. Single Node | 1 binary + SQLite | 0–1,000 | €6/mo |
| 2. Multi-Node | N binaries + PostgreSQL + NATS | 1,000–100,000 | €30–100/mo |
| 3. Full Cluster | NATS super-cluster + PG replicas | 100,000+ | €500+/mo |
Moving between tiers is a config change — swap the store, add NATS. App code doesn't change.
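For example, the move from Tier 1 to Tier 2 is a constructor swap. A sketch built from the stores and registries shown elsewhere in this README:

```go
// Tier 1: single binary, local everything
store, _ := sqlitestore.New[BankEvent]("events.db")
registry := eskit.NewMemoryLockRegistry()
bus := eskit.NewChannelEventBus()

// Tier 2: same deciders and handlers, distributed infrastructure
// store, _ := pgstore.New[BankEvent](ctx, connString)
// registry, _ := eskit.NewNATSLockRegistry(js, "node-1")
// bus, _ := eskit.NewNATSEventBus(natsConn)
```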
### Stream-Based ES vs DCB: Different Scaling Paths

| Concern | Stream-Based ES | DCB (Tag-Based ES) |
|---|---|---|
| Event Store | SQLite → PostgreSQL or NATS JetStream | SQLite → PostgreSQL (with GIN indexes) |
| Single Writer | `MemoryLockRegistry` → `NATSLockRegistry` or `PgLockRegistry` | PostgreSQL advisory locks or `SELECT FOR UPDATE` |
| Event Notification | `ChannelEventBus` → `NATSEventBus` | NATS pub/sub for notification only |
| Source of Truth | Any event store | PostgreSQL only (complex queries required) |
**Key insight:** NATS is great for ordered log operations (stream-based ES) and event notification (pub/sub). But DCB needs query-based reads with tag filtering and position-based concurrency — that's a relational database problem. Use Postgres for DCB at any scale.
### EventBus — Local or Distributed

```go
// Single node: in-process channels
bus := eskit.NewChannelEventBus()

// Multi-node: NATS pub/sub
bus, _ := eskit.NewNATSEventBus(natsConn)
```
After a successful append, publish events to the bus. Projections and automations subscribe via the bus instead of being called directly:
```go
// Distributed projection — NATS queue group ensures exactly-once per event
projRunner := eskit.NewProjectionRunner[MyEvent](bus, projector)
projRunner.Start(ctx, "projections") // queue group name

// Distributed automation — same pattern
autoRunner := eskit.NewAutomationRunner[MyEvent](bus, reactors)
autoRunner.Start(ctx, "automations")
```
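On the write side, publish once the append succeeds. The exact publish signature is defined by the `EventBus` interface, so treat this sketch as illustrative:

```go
// After a successful append, fan the new events out to the cluster.
state, events, err := handler.Handle(ctx, streamID, cmd)
if err == nil {
    for _, evt := range events {
        _ = bus.Publish(ctx, evt) // hypothetical signature; check the EventBus interface
    }
}
```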
### Distributed Single Writer

```go
// In-process (single node)
registry := eskit.NewMemoryLockRegistry()

// Distributed via NATS KV (multi-node)
registry, _ := eskit.NewNATSLockRegistry(js, "node-1",
    eskit.WithLockTTL(30*time.Second),
)

// Distributed via PostgreSQL advisory locks
registry := pgstore.NewPgLockRegistry(pool)

handler.Use(eskit.SingleWriterMiddleware[S, C, E](registry, false))
```
### Cluster-Aware Command Routing

```go
clusterHandler, _ := eskit.NewClusterCommandHandler(eskit.ClusterConfig[S, C, E]{
    Handler:          handler,
    LockReg:          natsLockReg,
    Bus:              natsBus,
    Conn:             natsConn,
    NodeID:           "node-1",
    MarshalCommand:   json.Marshal,
    UnmarshalCommand: func(b []byte) (MyCmd, error) { ... },
})

// Transparent: routes to lock owner if another node holds the stream
state, events, err := clusterHandler.Handle(ctx, "stream-1", myCommand)
```
### Embedded NATS for Development

```go
import "git.nullsoft.is/ash/eskit/embeddednats"

// Single binary with clustering built in
srv, _ := embeddednats.Start()
defer srv.Shutdown()

nc, _ := srv.Connect()
js, _ := nc.JetStream()
```
Perfect for development and testing — no external NATS needed. In production, connect to an external NATS cluster instead.
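Swapping to an external cluster is just a different connection (via `github.com/nats-io/nats.go`; the hostnames below are placeholders):

```go
import "github.com/nats-io/nats.go"

// Seed servers are comma-separated; the client discovers the rest.
nc, _ := nats.Connect("nats://nats-1.internal:4222,nats://nats-2.internal:4222")
js, _ := nc.JetStream()
```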
### PostgreSQL Cluster Support

```go
// Simple: single pool for reads and writes
store, _ := pgstore.NewClusterStore[MyEvent](ctx, pgstore.WithPool(pool))

// Scaled: read/write splitting with replicas
store, _ := pgstore.NewClusterStore[MyEvent](ctx,
    pgstore.WithWritePool(primaryPool),
    pgstore.WithReadPool(replicaPool),
)
```
**Compatibility:** Standard PostgreSQL, Citus, Neon, YugabyteDB. CockroachDB works for event storage but does not support advisory locks — use `NATSLockRegistry` instead of `PgLockRegistry` with CockroachDB.
**Partitioning (high-volume DCB):** For very high event volumes, partition the DCB events table by time range (monthly/weekly) or bounded context. Add time ranges to queries when possible. This is an optimization — not required until you have millions of events per bounded context.
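A hypothetical shape for that partitioning, again illustrative rather than the actual eskit/dcb schema:

```go
const dcbPartitionSketch = `
-- Range-partition by time so queries with a time range can prune partitions.
CREATE TABLE dcb_events (
    position    BIGSERIAL,
    type        TEXT  NOT NULL,
    tags        JSONB NOT NULL,
    data        BYTEA NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT now()
) PARTITION BY RANGE (occurred_at);

CREATE TABLE dcb_events_2025_06 PARTITION OF dcb_events
    FOR VALUES FROM ('2025-06-01') TO ('2025-07-01');
`
```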
## Design Decisions
- **Three patterns, not one.** Most ES libraries only give you state changes. eskit makes state views and automations equally first-class.
- **No aggregates.** Deciders are pure functions. No god objects, no base classes.
- **Vertical slices.** Code organized by business capability, not technical layer.
- **Given/When/Then testing.** Every decider test is a specification that maps to the event model.
- **Generics over interfaces** for domain types — your types, not ours.
- **Errors over panics** — panics only for programming errors.
- **TigerStyle** — assertions on boundaries, limits on everything, errors always wrapped.
## License
MIT