Event Sourcing toolkit for Go — decider pattern with generics, pluggable stores, snapshotting
Ash e22f258a67
Some checks failed
CI / test-nats (push) Failing after 6s
CI / test-codec-otelkit (push) Successful in 16s
CI / test-integration (push) Failing after 6s
CI / benchmarks (push) Failing after 4s
CI / test-sqlitestore (push) Failing after 38s
CI / test-pgstore (push) Failing after 1m23s
CI / test-core (push) Successful in 1m36s
refactor: rename module path from ash/eskit to nullsoft/eskit
Repository moved to nullsoft/eskit — module path now matches.
2026-03-14 10:29:22 +00:00
.forgejo/workflows ci: fix workflow for sub-modules, parallel test jobs 2026-02-22 16:41:18 +01:00
benchmarks refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
cmd refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
codec refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
command refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
commandlog refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
commandqueue refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
conformance refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
dcb refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
docs refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
embeddednats refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
eventstoretest refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
examples refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
gdpr refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
hooks refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
id fix: 3 race/correctness bugs found by red team audit — Mar 14 2026-03-14 03:07:28 +00:00
integration refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
liveprojection refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
metrics refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
middleware refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natsbatch refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natscluster refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natscommand refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natseventbus refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natslock refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natsnotifier refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
natsstore refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
otelkit refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
pgprocessor refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
pgqueue refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
pgstore refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
pgview refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
processor refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
projection refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
prommetrics refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
rebuild refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
runner refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
snapshot feat: configurable decider snapshots with schema versioning (#57) 2026-02-22 09:30:36 +01:00
sqlitequeue refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
sqlitestore refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
sqlview refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
subscription refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
typereg refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
.gitignore chore: add *.test to gitignore, remove test binaries 2026-03-06 12:26:08 +00:00
adversarial_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
alignment_test.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
append_options.go feat: add StreamType to core types, interfaces, and all store implementations — ref #191 2026-03-06 12:18:24 +00:00
append_options_edge_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
audit.go feat(metadata): add principal and causality tracking 2026-02-19 22:04:06 +00:00
bench-before.txt chore: save pre-refactor benchmarks — ref #191 2026-03-06 12:10:14 +00:00
BENCHMARK.txt feat: multi-tenancy with context-based tenant isolation 2026-02-19 20:31:28 +00:00
BENCHMARK_COMPARISON.md feat: benchmark comparison eskit vs Myrra (#11) 2026-02-20 08:23:23 +00:00
BENCHMARK_HINTS.txt feat(commandbus): add stream existence hints with options pattern — Refs #36 2026-02-20 01:29:48 +00:00
benchmark_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
changenotifier.go fix: race conditions in scanRing and MultiSubscription.Close 2026-03-09 23:12:44 +00:00
changenotifier_adversarial_test.go refactor: replace ChangeNotifier with O(1) broadcast implementation 2026-03-09 22:52:38 +00:00
changenotifier_bench_test.go refactor: replace ChangeNotifier with O(1) broadcast implementation 2026-03-09 22:52:38 +00:00
changenotifier_redteam_test.go feat: O(1) ChangeNotifier — close-channel broadcast replacing O(N) channel fan-out 2026-03-09 23:25:47 +00:00
changenotifier_test.go refactor: replace ChangeNotifier with O(1) broadcast implementation 2026-03-09 22:52:38 +00:00
context.go feat: context-based metadata propagation 2026-03-03 19:47:02 +00:00
context_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
crypto.go fix: red team round 3 — nonce pool leak, null byte injection, archive TOCTOU 2026-03-10 15:57:48 +00:00
crypto_test.go perf(crypto): add sync.Pool for AES cipher blocks — Closes #202 2026-03-10 07:14:06 +00:00
decider.go feat: replace StreamID() with Stream() on Command interface — ref #191 2026-03-06 12:42:15 +00:00
decider_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
decidertest.go refactor: use cmp.Diff throughout all tests 2026-02-19 14:17:51 +00:00
deletion.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
deletion_bench_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
deletion_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
deserialize.go refactor: remove deprecated watcher, serializer, and registry_store — Closes #231 2026-03-10 12:17:41 +00:00
deserialize_test.go refactor: remove deprecated watcher, serializer, and registry_store — Closes #231 2026-03-10 12:17:41 +00:00
dispatcher.go feat: add StreamType to core types, interfaces, and all store implementations — ref #191 2026-03-06 12:18:24 +00:00
dispatcher_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
dlq.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
dlq_test.go refactor: remove all backward compat — clean EventDispatcher + Subscription API 2026-02-24 17:04:14 +00:00
domains_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
errors.go fix: red team round 3 — nonce pool leak, null byte injection, archive TOCTOU 2026-03-10 15:57:48 +00:00
eskit fix: prevent double-start panic in Processor, TodoList, TodoProcessor (found by red team) 2026-03-03 14:36:25 +00:00
eskit_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
event.go fix: Stream.Validate missing ID length check + ResolveEventType nil panic + DeserializeWithCodec nil unmarshal 2026-03-08 03:05:43 +00:00
event_registry.go refactor: remove deprecated watcher, serializer, and registry_store — Closes #231 2026-03-10 12:17:41 +00:00
event_type_bench_test.go fix: harden Register[E] and command type system — production quality 2026-02-25 20:35:32 +00:00
event_type_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
eventbus.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
eventbus_channel.go feat: configurable timing/buffer constants + SSE fallback poll — Closes #129 2026-03-01 10:41:51 +00:00
eventbus_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
fuzz_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
gdpr.md feat: CommandBus, EventSubscription, GDPR crypto-shredding 2026-02-19 20:51:37 +00:00
go.mod refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
go.sum feat: per-projection SQLite database for parallel rebuild — Closes #162 2026-03-01 17:54:39 +00:00
handler.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
handler_retry_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
helpers.go feat: add StreamType to core types, interfaces, and all store implementations — ref #191 2026-03-06 12:18:24 +00:00
helpers_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
integration_sse_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
LICENSE Initial release: core decider pattern, memory & SQLite stores, snapshotting 2026-02-18 23:02:03 +00:00
lifecycle.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
lifecycle_edge_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
lifecycle_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
load_options_edge_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
memory.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
memory_deletion.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
memory_lifecycle.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
memory_store_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
metadata_test.go feat(metadata): add principal and causality tracking 2026-02-19 22:04:06 +00:00
middleware.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
middleware_builtin.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
middleware_builtin_edge_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
middleware_builtin_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
middleware_comprehensive_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
middleware_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
multi_subscription.go feat: O(1) ChangeNotifier — close-channel broadcast replacing O(N) channel fan-out 2026-03-09 23:25:47 +00:00
multi_subscription_test.go fix: eliminate all vet warnings 2026-03-09 23:28:29 +00:00
observability.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
observability_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
profiler.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
profiler_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
projection_comprehensive_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
projection_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
projectiontest.go feat: add ProjectionTest helper for projection unit testing 2026-02-26 18:45:32 +01:00
projectiontest_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
raw_event.go feat: add StreamType to core types, interfaces, and all store implementations — ref #191 2026-03-06 12:18:24 +00:00
raw_event_edge_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
README.md feat(subscription): projection lag health endpoint — Closes #233 2026-03-10 12:42:04 +00:00
redteam6_root_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
redteam_dlq_test.go fix: DLQ concurrent Replay double-process + order slice ghost accumulation 2026-02-28 03:06:15 +00:00
redteam_feb28_test.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
redteam_mar01_test.go fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241 2026-03-10 19:29:17 +00:00
redteam_mar02_test.go fix: update all test files to pass streamType parameter — ref #191 2026-03-06 12:25:37 +00:00
redteam_mar07_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
redteam_mar08_test.go feat: O(1) ChangeNotifier — close-channel broadcast replacing O(N) channel fan-out 2026-03-09 23:25:47 +00:00
redteam_mar10_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
redteam_mar12_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
redteam_mar13_test.go fix: MemorySnapshotStore null byte key collision — use length-prefixed keys 2026-03-13 03:05:13 +00:00
redteam_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
register.go feat: add RegisterAs[T] for backward-compatible wire names 2026-03-01 21:53:07 +00:00
register_test.go fix: harden Register[E] and command type system — production quality 2026-02-25 20:35:32 +00:00
runner.go refactor: remove all backward compat — clean EventDispatcher + Subscription API 2026-02-24 17:04:14 +00:00
runner_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
serve_changes.go refactor: replace ChangeNotifier with O(1) broadcast implementation 2026-03-09 22:52:38 +00:00
serve_changes_test.go feat: add MultiSubscription for multi-projection SSE — Closes #130 2026-03-01 10:52:10 +00:00
singlewriter.go fix: MemoryLockRegistry key collision via null byte separator 2026-03-12 03:07:43 +00:00
singlewriter_redteam_r5_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
singlewriter_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
snapshot_store.go fix: MemorySnapshotStore null byte key collision — use length-prefixed keys 2026-03-13 03:05:13 +00:00
stateview.go feat: CommandHandler auto-retry + uint64 bounds check — Closes #156, Closes #153 2026-03-01 12:40:05 +00:00
store.go feat: add StreamType to core types, interfaces, and all store implementations — ref #191 2026-03-06 12:18:24 +00:00
store_features_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00
STORE_PARITY.md docs: add store feature parity matrix — Refs #139 2026-03-10 08:04:04 +00:00
stream.go fix: Stream.Validate missing ID length check + ResolveEventType nil panic + DeserializeWithCodec nil unmarshal 2026-03-08 03:05:43 +00:00
tenant.go feat: add StreamType to core types, interfaces, and all store implementations — ref #191 2026-03-06 12:18:24 +00:00
tenant_test.go fix: update all test files to pass streamType parameter — ref #191 2026-03-06 12:25:37 +00:00
typecache.go perf(id,typecache): beat Myrra in all benchmark categories 2026-02-20 08:53:41 +00:00
typecache_test.go perf(id,typecache): beat Myrra in all benchmark categories 2026-02-20 08:53:41 +00:00
upcaster.go feat: schema evolution / event upcasting 2026-02-19 20:30:54 +00:00
upcaster_test.go feat: schema evolution / event upcasting 2026-02-19 20:30:54 +00:00
upcasting_integration_test.go refactor: rename module path from ash/eskit to nullsoft/eskit 2026-03-14 10:29:22 +00:00

eskit — Event Sourcing Toolkit for Go

A pragmatic, generic event sourcing framework for Go. Built around the decider pattern with first-class support for all three event modeling patterns: State Changes, State Views, and Automations.

go get git.nullsoft.is/nullsoft/eskit

Quick Start — Counter

package main

import (
	"context"
	"fmt"

	"git.nullsoft.is/nullsoft/eskit"
)

type Incremented struct{}
type Decremented struct{}
type Increment struct{}
type Decrement struct{}

var counter = eskit.Decider[int, any, any]{
	InitialState: func() int { return 0 },
	Decide: func(state int, cmd any) ([]any, error) {
		switch cmd.(type) {
		case Increment:
			return []any{Incremented{}}, nil
		case Decrement:
			return []any{Decremented{}}, nil
		default:
			return nil, fmt.Errorf("unknown command: %T", cmd)
		}
	},
	Evolve: func(state int, event any) int {
		switch event.(type) {
		case Incremented:
			return state + 1
		case Decremented:
			return state - 1
		default:
			return state
		}
	},
}

func main() {
	ctx := context.Background()
	store := eskit.NewMemoryStore[any]()
	handler := eskit.NewDeciderHandler(store, counter)

	state, _, _ := handler.Handle(ctx, "counter", "1", Increment{})
	state, _, _ = handler.Handle(ctx, "counter", "1", Increment{})
	state, _, _ = handler.Handle(ctx, "counter", "1", Decrement{})

	fmt.Println("Counter:", state) // Counter: 1
}

See examples/counter/ for the runnable version.

Features

  • Decider pattern — pure functions: Decide(state, cmd) → events, Evolve(state, event) → state
  • Generic stores — MemoryStore, SQLite, PostgreSQL, NATS JetStream
  • Subscriptions — durable, checkpointed event delivery via subscription.Subscription
  • EventDispatcher — unified event dispatch engine with O(1) type filtering for both projections and processors
  • Subscription — single struct for registering event handlers (name, event type filter, handler function)
  • State Views — StateView[E] struct for durable checkpoint-based projections, with sqlview helper for SQL-backed read models and OnChange callback for real-time notifications
  • Change notifications — ChangeNotifier for in-process pub/sub of projection changes, ChangeRelay interface for cross-server broadcast (PG LISTEN/NOTIFY implementation included)
  • Real-time SSE — ServeChanges and MultiSubscription for wiring projections to Server-Sent Events with fallback polling, debounce, and Datastar integration
  • Snapshots — configurable snapshot intervals with schema versioning
  • Middleware — logging, metrics, retry, single-writer, custom
  • Codecs — JSON, jsoniter, CBOR, Protobuf (pluggable via codec.Codec, multi-codec migration support)
  • Ordering guarantees — strict global sequence delivery with gap detection, self-healing SequenceChecker, and safe-by-default blocking (see docs/ordering-and-gaps.md)
  • Schema evolution — upcasters for event versioning
  • GDPR — crypto-shredding for PII in events (envelope encryption with AES-256-GCM, sync.Pool-optimized cipher and nonce reuse)
  • Multi-tenancy — tenant isolation middleware
  • Observability — OpenTelemetry tracing + metrics (separate otelkit module)
  • NATS clustering — distributed commands, event bus, locks
  • Dynamic Consistency Boundary (DCB) — cross-stream decision logic
  • Health checks — HTTP health endpoint for projection lag monitoring, suitable for load balancer checks (see docs/operations.md)
  • Testing — Given/When/Then helpers, conformance suites
  • Store Parity — feature parity matrix across all backends (STORE_PARITY.md)

Sub-Modules

Core eskit has zero heavy dependencies (only go-cmp for test helpers). Store backends, codecs, and integrations are separate modules — you only pull in what you use.

The core module includes deciders, MemoryStore, subscriptions, projections, live projections, middleware, snapshots, commands, GDPR crypto-shredding, DCB, processor, conformance testing, and all interfaces: everything that doesn't require CGO or heavy third-party dependencies.

Separate modules for heavy dependencies:

# Store backends
go get git.nullsoft.is/nullsoft/eskit/sqlitestore   # SQLite
go get git.nullsoft.is/nullsoft/eskit/pgstore       # PostgreSQL
go get git.nullsoft.is/nullsoft/eskit/natsstore     # NATS JetStream

# Serialization
go get git.nullsoft.is/nullsoft/eskit/codec         # jsoniter, CBOR, Protobuf

# Command queue
go get git.nullsoft.is/nullsoft/eskit/sqlitequeue   # SQLite command queue (durable, single-server)
go get git.nullsoft.is/nullsoft/eskit/pgqueue       # PostgreSQL command queue

# NATS integrations
go get git.nullsoft.is/nullsoft/eskit/natseventbus  # Event bus
go get git.nullsoft.is/nullsoft/eskit/natscommand   # Distributed CommandBus
go get git.nullsoft.is/nullsoft/eskit/natscluster   # Cluster command routing
go get git.nullsoft.is/nullsoft/eskit/natsbatch     # Batch event processing
go get git.nullsoft.is/nullsoft/eskit/natslock      # Distributed locks
go get git.nullsoft.is/nullsoft/eskit/natsnotifier  # Event notifications
go get git.nullsoft.is/nullsoft/eskit/embeddednats  # Embedded server (testing)

# Observability
go get git.nullsoft.is/nullsoft/eskit/otelkit       # OpenTelemetry tracing + metrics

# Metrics
go get git.nullsoft.is/nullsoft/eskit/metrics       # Recorder interface (zero deps)
go get git.nullsoft.is/nullsoft/eskit/prommetrics   # Prometheus implementation

# Runtime
go get git.nullsoft.is/nullsoft/eskit/runner        # Concurrent service runner

# Event processors
go get git.nullsoft.is/nullsoft/eskit/processor     # Event-reactive processors & todolist pattern

Append Modes

By default, Append uses optimistic concurrency control — pass the expected stream version to detect conflicts. For append-only streams where ordering doesn't matter, use AppendAny:

// Standard — optimistic concurrency check
store.Append(ctx, "order", "123", 3, events)

// Append-only mode — skip version check (useful for log/audit streams)
store.Append(ctx, "audit", "log-1", eskit.AppendAny, events)

See docs/stores.md for details on when to use each mode.

Type Registration

Events are plain structs — no interfaces needed. Register them once, and the wire name is derived from the Go type:

reg := eskit.NewEventRegistry()
eskit.Register[OrderPlaced](reg)      // → "sales.OrderPlaced"
eskit.Register[PaymentProcessed](reg) // → "finance.PaymentProcessed"

store, _ := sqlitestore.New[any](db, sqlitestore.WithRegistry[any](reg))

Commands are plain structs — no interface needed. Register them with the bus, providing the stream type and an ID extractor:

type PlaceOrder struct {
    OrderID string
    Items   []Item
}

command.Register(bus, store, orderDecider, "order", func(c PlaceOrder) string {
    return c.OrderID
})

No string names. No EventType() methods. No CommandName() methods. The Go type system is the single source of truth.
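
One way a wire name can be derived from a Go type is with reflection. The sketch below is an assumption about the general technique, not eskit's implementation: the registry type and the register and newByName helpers are hypothetical, and the name format here is simply what reflect reports for the type.

```go
package main

import (
	"fmt"
	"reflect"
)

// registry maps wire names to Go types. A toy stand-in for illustration.
type registry struct {
	types map[string]reflect.Type
}

func newRegistry() *registry {
	return &registry{types: make(map[string]reflect.Type)}
}

// register derives the wire name from T itself ("<package>.<Type>"),
// so callers never spell the name as a string.
func register[T any](r *registry) string {
	t := reflect.TypeOf((*T)(nil)).Elem()
	name := t.String()
	r.types[name] = t
	return name
}

// newByName returns a pointer to a fresh zero value of the registered type,
// ready to be filled in by a decoder.
func (r *registry) newByName(name string) (any, bool) {
	t, ok := r.types[name]
	if !ok {
		return nil, false
	}
	return reflect.New(t).Interface(), true
}

type OrderPlaced struct{ OrderID string }

func main() {
	reg := newRegistry()
	name := register[OrderPlaced](reg)
	fmt.Println("wire name:", name)

	v, ok := reg.newByName(name)
	fmt.Printf("%T %v\n", v, ok)
}
```

Because the name comes from the type, renaming a struct changes its wire name. That is the case a helper such as RegisterAs, which pins an older wire name, is meant to cover.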

The Decider Pattern

Domain logic as pure functions — easy to test, compose, and reason about:

Command → Decide(state, command) → []Event
State + Event → Evolve(state, event) → State

Test with Given/When/Then:

eskit.Given(decider, pastEvents...).
    When(command).
    Then(t, expectedEvents...)

Documentation

| Topic | Link |
| --- | --- |
| Store backends & comparison | docs/stores.md |
| Serialization & codecs | docs/serialization.md |
| Middleware | docs/middleware.md |
| Subscriptions & watchers | docs/subscriptions.md |
| Ordering guarantees & gap detection | docs/ordering-and-gaps.md |
| Snapshots | docs/snapshots.md |
| Advanced (GDPR, clustering, multi-tenancy, etc.) | docs/advanced.md |
| Architecture | docs/architecture.md |
| Processors (reactive + TodoProcessor) | docs/processor.md |
| Metadata, correlation & causation | docs/metadata.md |
| Observability (OTel tracing + metrics) | docs/observability.md |
| State Views (green slices) & OnChange | docs/state-views.md |
| Zero-downtime projection rebuild | docs/rebuild.md |
| Change notifications (ChangeNotifier, ChangeRelay) | docs/subscriptions.md |
| Real-time SSE & Datastar integration | docs/sse-guide.md |
| Quick start guide | docs/quickstart.md |
| Performance | docs/performance.md |
| Testing guide | docs/testing.md |
| FAQ | docs/faq.md |
| Schema separation (eventstore + readmodel) | docs/schema-separation.md |
| Production tuning (pools, checkpoints, Postgres) | docs/production-tuning.md |
| Known limitations & improvement opportunities | docs/limitations.md |
| Store feature parity matrix | STORE_PARITY.md |

Sub-Packages

| Package | Description |
| --- | --- |
| typereg | Thread-safe type registry for wire deserialization of commands and events |
| commandqueue | Transport-agnostic command queue interface with in-memory, SQLite, Postgres, and NATS implementations |
| processor | Event-reactive processors with typed handlers and todolist pattern |

See docs/command-queue.md for the command queue architecture and migration guide.

Conformance Suite

The eventstoretest package provides a comprehensive conformance test suite that any EventStore implementation must pass. Run it against your store with a single call:

func TestMyStore(t *testing.T) {
    eventstoretest.RunSuite(t, func(t *testing.T) eskit.EventStore[eventstoretest.TestEvent] {
        return mystore.New(t)
    })
}

The suite covers:

| Category | Tests |
| --- | --- |
| Basic CRUD | Append single/multiple, read back, empty streams, stream isolation, sequential appends |
| Concurrency | Optimistic concurrency control, wrong version detection, concurrent appends |
| Ordering | Insertion order preserved, monotonically increasing versions |
| Read Patterns | LoadFrom version offsets, beyond-end reads, empty stream reads |
| AppendAny | Unconditional appends to new/existing streams, concurrent AppendAny |
| Stream Type Isolation | Events scoped to stream types, cross-type isolation |
| Event Fields | StreamType, EventType, StreamID, GlobalSequence all populated correctly |
| Special Stream Names | Hyphens, underscores, UUIDs, slashes, unicode in stream IDs |
| Metadata | Preservation, many extra keys, batch application, unicode values |
| Metadata Edge Cases | Large metadata maps, batch metadata propagation, unicode metadata values |
| Idempotency | Duplicate version append rejection |
| Edge Cases | Large payloads, empty data, many streams, many events, unicode data, binary-like data, long stream IDs, unique event IDs, timestamp validation |
| Stress | Concurrent appends (different streams), concurrent OCC (same stream), concurrent readers + writers |

A companion RunBenchmarkSuite provides standardized performance benchmarks across stores.

Fuzz Testing

Fuzz tests cover codec deserialization (JSON, CBOR, Protobuf) and core operations (stream IDs, event types) to catch panics on untrusted input:

cd codec && go test -fuzz=FuzzJSONUnmarshal -fuzztime=30s
cd codec && go test -fuzz=FuzzCBORUnmarshal -fuzztime=30s
go test -fuzz=FuzzStreamID -fuzztime=30s
go test -fuzz=FuzzEventTypeName -fuzztime=30s

Lifecycle Hooks

The hooks package provides composable middleware hooks at three boundaries: event append, command dispatch, and projection processing.

Event Append Hooks

Wrap any EventStore with before/after append hooks:

import "git.nullsoft.is/nullsoft/eskit/hooks"

// Validation hook — reject events that fail business rules
store := hooks.WrapStore(innerStore,
    hooks.WithBeforeAppend(func(ctx context.Context, streamID string, events []MyEvent) error {
        if streamID == "" {
            return errors.New("stream ID required")
        }
        return nil // allow append
    }),
    hooks.WithAfterAppend(func(ctx context.Context, streamID string, events []eskit.Event[MyEvent]) {
        log.Printf("appended %d events to %s", len(events), streamID)
    }),
)

  • BeforeAppend can reject: returning an error stops the append, and no events are written
  • AfterAppend is fire-and-forget: panics are recovered and never break the append
  • Multiple hooks per point, executed in registration order
  • Zero overhead when no hooks are registered (returns unwrapped store)

Command Dispatch Hooks

mw := hooks.CommandHookMiddleware(
    []hooks.BeforeCommandFunc{
        func(ctx context.Context, cmd any) error {
            log.Printf("dispatching: %T", cmd)
            return nil
        },
    },
    []hooks.AfterCommandFunc{
        func(ctx context.Context, cmd any, err error) {
            if err != nil {
                log.Printf("command failed: %v", err)
            }
        },
    },
)

Projection Hooks

Handle projection errors with Skip/Retry/Halt strategies, and get notified when caught up:

handler := hooks.NewProjectionHandler(innerHandler,
    hooks.WithOnError(func(ctx context.Context, event eskit.Event[MyEvent], err error) hooks.ErrorAction {
        if isTransient(err) {
            return hooks.Retry
        }
        return hooks.Skip // skip poison events
    }),
    hooks.WithOnCaughtUp[MyEvent](func(ctx context.Context, position uint64) {
        log.Printf("projection caught up to position %d", position)
    }),
    hooks.WithMaxRetries[MyEvent](5),
)

Atomic Checkpoint (Crash-Safe Projections)

For non-idempotent side effects (sending emails, charging payments), use CheckpointInTx to save the checkpoint within the same database transaction as Evolve. If the process crashes after Evolve but before commit, both the projection update and checkpoint are rolled back — preventing double processing on restart.

var EmailView = pgview.Config[domain.Event]{
    Name:           "email-sender",
    EventTypes:     []string{"OrderConfirmed"},
    CheckpointInTx: true, // checkpoint saved in same TX as Evolve
    Evolve: func(ctx context.Context, tx pgx.Tx, event eskit.Event[domain.Event]) error {
        // Record the email in the DB (same transaction)
        _, err := tx.Exec(ctx, `INSERT INTO sent_emails ...`, ...)
        return err
    },
}

Works with both pgview and sqlview. The subscription automatically injects checkpoint info via context — no manual wiring needed.

Leader Election (Multi-Node)

Running N instances means N copies of every processor and projection — duplicate emails, duplicate payments. eskit solves this with leader election via Postgres advisory locks.

Why Not Just Checkpoints?

The checkpoint is a bookmark, not a lock. Two nodes with the same checkpoint name both read the same position, process the same events, and execute duplicate side effects. The checkpoint tracks progress — it doesn't coordinate access.

How It Works

  1. On Start(), the processor calls LeaderLock.Acquire(ctx, "processor:send-emails")
  2. Winner: holds the lock, reads events, executes side effects, advances checkpoint
  3. Others: block on Acquire, waiting for the lock — zero CPU, zero duplicates
  4. Graceful shutdown (SIGTERM/context cancel): lock is released via defer release() → instant handoff, next instance acquires immediately
  5. Hard crash: Postgres detects dead connection, releases advisory lock (~30s depending on TCP keepalive). New leader replays from checkpoint, rebuilds TodoList, resumes.

Rolling deploy scenario:

  1. Old node receives SIGTERM → defer release() runs → lock released
  2. New node was blocking on Acquire → acquires instantly
  3. Replays from shared checkpoint → rebuilds TodoList → executes pending items
  4. Zero downtime for normal deploys. 30s gap only on hard crashes.

⚠️ At-least-once guarantee: If a crash occurs after a side effect executes but before the checkpoint saves, the event replays and the side effect runs again. All Execute handlers and subscription Handlers with side effects must be idempotent. Use idempotency keys, INSERT ... ON CONFLICT DO NOTHING, or check-before-act patterns.
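
As a rough sketch of the idempotency-key approach, the in-memory map below plays the role of a table with a unique key written via INSERT ... ON CONFLICT DO NOTHING. The `emailSender` type is hypothetical; in a real system the key must be stored durably, ideally in the same transaction as any database write the handler makes.

```go
package main

import (
	"fmt"
	"sync"
)

// emailSender remembers which idempotency keys it has already handled.
type emailSender struct {
	mu   sync.Mutex
	seen map[string]bool
	sent int
}

// Send performs the side effect at most once per key and reports whether it ran.
func (s *emailSender) Send(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[key] {
		return false // replayed event: already handled
	}
	s.seen[key] = true
	s.sent++ // the real side effect would go here
	return true
}

func main() {
	s := &emailSender{seen: map[string]bool{}}
	first := s.Send("order-42/OrderConfirmed")
	replay := s.Send("order-42/OrderConfirmed") // same event delivered again
	fmt.Println(first, replay, s.sent)
}
```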

Configuration

Single-node (development) — no changes needed:

store := processor.NewMemoryStore[string]()
proc := processor.NewTodoProcessor(store, processor.TodoConfig[string]{
    Name:     "send-emails",
    Execute:  sendEmail,
    Interval: time.Second,
    // No leader lock → all instances process (fine for single node)
})

Multi-node (production) — add leader election:

store := processor.NewMemoryStore[string]()
proc := processor.NewTodoProcessor(store, processor.TodoConfig[string]{
    Name:       "send-emails",
    Execute:    sendEmail,
    LeaderLock: pgstore.NewPgLockRegistry(pool), // advisory lock: one leader
})

Subscription-only (projections):

sub, _ := subscription.New(subscription.Config[MyEvent]{
    ConsumerID: "order-summary",
    Reader:     pgStore,
    Checkpoint: pgstore.NewPgCheckpoint(pool, "order-summary"),
    Handler:    updateProjection,
    LeaderLock: pgstore.NewPgLockRegistry(pool),
})

Everything in Postgres. No extra infrastructure. No extra tables. Advisory locks use the same connection pool.

Batch checkpointing reduces DB writes during catch-up — save every 100 events instead of every event:

cfg := subscription.FromStateView(view, reader, cp,
    subscription.WithCheckpointEvery[MyEvent](100),           // save every 100 events
    subscription.WithCheckpointFlushInterval[MyEvent](time.Second), // or every 1s
)

See docs/subscriptions.md for details.

Projections (State Views) with Leader Election

Projections are inherently safer than processors because they're idempotent — they UPSERT to a read model. If an event replays, the projection writes the same data again. No harm done.

Single-node: no LeaderLock needed. The subscription reads, updates the projection, checkpoints. Crash → replay from checkpoint → re-UPSERT → same result.

Multi-node: add LeaderLock to prevent duplicate work (wasted CPU, not data corruption):

// Define the projection (pgview handles Evolve + CheckpointInTx)
var OrderSummary = pgview.Config[domain.Event]{
    Name:           "order-summary",
    EventTypes:     []string{"OrderPlaced", "OrderShipped"},
    CheckpointInTx: true,  // checkpoint in same TX as projection update
    Evolve: func(ctx context.Context, tx pgx.Tx, event eskit.Event[domain.Event]) error {
        _, err := tx.Exec(ctx, `INSERT INTO order_summaries ... ON CONFLICT DO UPDATE ...`)
        return err
    },
}

// Wire it with LeaderLock via subscription
sub, _ := subscription.New(subscription.Config[domain.Event]{
    ConsumerID: "order-summary",
    Reader:     pgStore,
    Checkpoint: pgstore.NewPgCheckpoint(pool, "order-summary"),
    Handler:    OrderSummary.Handler(pool),
    LeaderLock: pgstore.NewPgLockRegistry(pool),  // only one node runs this projection
})

Three levels of safety:

| Level | Config | Replay window | Use when |
| --- | --- | --- | --- |
| Basic | default | Handler → checkpoint save | Single node, idempotent projections |
| Atomic | CheckpointInTx: true | Zero (same TX) | Multi-node projections, paranoid mode |
| Leader + Atomic | Both | Zero + single writer | Production multi-node (recommended) |

CheckpointInTx eliminates the replay window. LeaderLock eliminates duplicate processing. Together: one node, zero gaps, zero duplicates.

When to Use Subscription vs Automation

Plain subscription + LeaderLock — for simple side effects (trigger → execute):

// Trigger event arrives → execute immediately → checkpoint advances
sub, _ := subscription.New(subscription.Config[MyEvent]{
    ConsumerID: "send-welcome-email",
    Reader:     pgStore,
    Checkpoint: pgstore.NewPgCheckpoint(pool, "send-welcome-email"),
    LeaderLock: pgstore.NewPgLockRegistry(pool),
    Handler: func(ctx context.Context, event GlobalEvent[MyEvent]) error {
        if event.EventType == "UserRegistered" {
            return sendWelcomeEmail(ctx, event.Data)  // execute inline
        }
        return nil
    },
})

Handler returns nil → checkpoint advances. Handler returns error → event replays. Crash → event replays. Simple, correct, no extra state.

Automation — for deferred execution, completion tracking, or retry with backoff:

The processor package adds a TodoList between event reading and execution. The checkpoint advances only after side effects execute — never before. This means:

  • Crash before execute → events replay, side effects re-run (at-least-once)
  • Failed items retry up to MaxAttempts → move to DLQ → checkpoint advances
  • Completion events can cancel pending items before they execute

Use Automation when you need to batch items, wait for external completion signals, or have complex retry logic. For everything else, prefer a plain subscription.

Processor

The processor package provides two patterns for event-driven processing: typed event handlers for immediate reactions, and the todolist pattern for stateful, filtered processing.

Pattern 1: Typed Event Handlers

Use processor.New with On[T] handlers when you want to react to specific events immediately. The processor subscribes only to the event types you register.

import "git.nullsoft.is/ash/eskit/processor"

p := processor.New(pgStore, checkpoint, bus, []processor.Handler{
    processor.On[OrderPlaced](func(ctx context.Context, e OrderPlaced) error {
        return bus.Send(ctx, ShipOrder{OrderID: e.OrderID})
    }),
    processor.On[PaymentFailed](func(ctx context.Context, e PaymentFailed) error {
        return bus.Send(ctx, CancelOrder{OrderID: e.OrderID})
    }),
}, processor.WithName("order-processor"))

go p.Start(ctx)

Pattern 2: TodoProcessor

Use NewTodoProcessor with a Store when you need to claim and process work items from a state-view table. Items are claimed exactly once across multiple servers via FOR UPDATE SKIP LOCKED; execution itself stays at-least-once, so Execute should be idempotent.

import (
    "git.nullsoft.is/ash/eskit/processor"
    "git.nullsoft.is/ash/eskit/pgprocessor"
)

// Store claims from a state-view table (pgview populates it)
store := pgprocessor.NewStore[ChargeItem](pool, pgprocessor.Config{
    Table:      "charges",
    ClaimWhere: "pending = true AND attempts < 3",
    MarkSQL:    "UPDATE charges SET pending = false WHERE item_key = $1",
})

// Same processor works with MemStore (tests) or Postgres (production)
charger := processor.NewTodoProcessor(store, processor.TodoConfig[ChargeItem]{
    Name:    "charge-processor",
    Workers: 4,
    Execute: func(ctx context.Context, key string, item ChargeItem) error {
        // ctx has metadata (correlation, causation) — flows to commands automatically
        return bus.Send(ctx, ChargeCard{OrderID: key, Amount: item.Amount})
    },
})

go charger.Start(ctx)
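
For intuition about exclusive claiming with several workers, here is an in-memory analogue. The mutex-guarded `queue` is a hypothetical stand-in for a table queried with FOR UPDATE SKIP LOCKED; it is not how pgprocessor is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// queue hands each pending key to exactly one caller of claim.
type queue struct {
	mu      sync.Mutex
	pending []string
}

// claim returns the next unclaimed key, or false when nothing is left.
func (q *queue) claim() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return "", false
	}
	key := q.pending[0]
	q.pending = q.pending[1:]
	return key, true
}

// runWorkers starts n workers and returns how many times each key was executed.
func runWorkers(q *queue, n int) map[string]int {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		runs = map[string]int{}
	)
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				key, ok := q.claim()
				if !ok {
					return
				}
				mu.Lock()
				runs[key]++ // stands in for Execute
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return runs
}

func main() {
	q := &queue{pending: []string{"order-1", "order-2", "order-3", "order-4", "order-5"}}
	fmt.Println(runWorkers(q, 4))
}
```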

Batch execution — process multiple items per commit for higher throughput:

proc := processor.NewTodoProcessor(store, processor.TodoConfig[ChargeItem]{
    Name:      "bulk-charger",
    BatchSize: 50,
    ExecuteBatch: func(ctx context.Context, items []processor.BatchItem[ChargeItem]) error {
        for _, it := range items {
            if err := chargeCard(ctx, it.Key, it.Item); err != nil {
                return err // entire batch rolls back
            }
        }
        return nil
    },
})

See docs/processor.md for batch configuration, benchmarks, and trade-offs.

Context Metadata — Zero Boilerplate

Metadata (correlation, causation, principal) flows through context.Context automatically. Set it once at the HTTP boundary:

// HTTP middleware — start a chain
ctx = eskit.NewChain(ctx, eskit.Principal{Kind: eskit.PrincipalUser, ID: userID})

// Handlers — just send commands, metadata flows through ctx
bus.Send(ctx, PlaceOrder{OrderID: "123"})

// Reactive processors — auto-propagated, zero boilerplate
processor.On[OrderPlaced](func(ctx context.Context, e OrderPlaced) error {
    return bus.Send(ctx, ChargePayment{OrderID: e.OrderID})
})

See Metadata Guide for details.

When to Use Which

| Pattern | Best for |
| --- | --- |
| On[T] handlers | Immediate reactions: event → command, no state needed |
| TodoProcessor | Work queues: claim items, retry on failure, exactly-once across servers |

Observability

Full OpenTelemetry instrumentation via the separate otelkit module:

bus.Use(otelkit.CommandTracing(tracer))
bus.Use(otelkit.CommandMetrics(meter))
store = otelkit.WithTracing(otelkit.WithMetrics(store, meter), tracer)

See Observability Guide for full wiring with all components.

Command Persistence

The commandlog package records every command dispatched through the CommandBus — invaluable for production debugging, audit trails, and replay analysis.

Setup

import (
    "git.nullsoft.is/ash/eskit/commandlog"
    "git.nullsoft.is/ash/eskit/commandlog/memlog"   // or sqlitelog
)

// Create a store (memory for dev, SQLite for production).
store := memlog.New()

// Add middleware to the command bus.
bus.Use(commandlog.Middleware(store))

With Correlation and Metadata

bus.Use(commandlog.Middleware(store,
    commandlog.WithCorrelationFunc(func(ctx context.Context, cmd command.Command) string {
        return correlationIDFromContext(ctx)
    }),
    commandlog.WithMetadataFunc(func(ctx context.Context, cmd command.Command) map[string]string {
        return map[string]string{"user": userFromContext(ctx)}
    }),
))

Querying the Log

// Get a specific command record.
record, _ := store.Get(ctx, "cmd-id")

// Find all commands in a saga/correlation chain.
chain, _ := store.FindByCorrelation(ctx, "saga-42")

// List recent commands with pagination.
recent, _ := store.List(ctx, commandlog.QueryOpts{Limit: 50})

// Find by command type with time range filtering.
orders, _ := store.FindByType(ctx, "PlaceOrder", commandlog.QueryOpts{
    After: time.Now().Add(-24 * time.Hour),
})

Backends

| Backend | Module | Use Case |
| --- | --- | --- |
| Memory | commandlog/memlog | Testing, development |
| SQLite | commandlog/sqlitelog | Production, single-node |

Write your own by implementing commandlog.CommandLogStore.

Examples

CLI Tool

The eskit CLI provides management and inspection of eskit event stores.

Installation

go install git.nullsoft.is/ash/eskit/cmd/eskit@latest

Usage

# Connect to a SQLite event store
eskit --store sqlite:///path/to/app.db <command>

# Output as JSON for scripting
eskit --store sqlite:///path/to/app.db --json streams list

# Decode CBOR payloads
eskit --store sqlite:///path/to/app.db --codec cbor events show 42

# Override stored codec (force JSON decode)
eskit --store sqlite:///path/to/app.db --codec json events show 42

# Show raw hex dump (skip decoding entirely)
eskit --store sqlite:///path/to/app.db --raw events show 42

Commands

| Command | Description |
| --- | --- |
| streams list | List all streams with event counts |
| streams show <id> | Show events in a stream, decoded |
| events search --correlation <id> | Find events by correlation ID |
| events search --type <event-type> | Find events by event type |
| events show <global-pos> | Show single event with full detail |
| projections list | Show projection checkpoint positions |
| projections rebuild <name> | Reset a projection checkpoint to 0 |
| commands list | List recent commands (from command log) |
| commands show <id> | Show command with resulting events |
| store stats | Event count, stream count, DB size |
| store health | Connectivity check |

Flags

| Flag | Description |
| --- | --- |
| --store | Store DSN (sqlite:///path or postgres://user:pass@host/db) |
| --json | Output as JSON instead of table |
| --codec | Payload codec (json, cbor, protobuf); auto-detect if omitted. Overrides the codec stored in each event row. |
| --raw | Show raw payload bytes as hex dump (skip all decoding) |
| --limit | Max results to return |

Examples

# List all streams
$ eskit --store sqlite:///tmp/myapp.db streams list
STREAM      EVENTS  LATEST VERSION  LAST EVENT
----------  ------  --------------  --------------------------
account-1   5       5               2025-01-15T10:30:00Z
order-42    3       3               2025-01-15T11:00:00Z

# Search events by correlation ID (JSON output)
$ eskit --store sqlite:///tmp/myapp.db --json events search --correlation req-abc123
[
  {"id": 1, "stream_id": "account-1", "version": 1, "event_type": "AccountOpened", ...},
  {"id": 5, "stream_id": "order-42", "version": 1, "event_type": "OrderCreated", ...}
]

# Check store health
$ eskit --store sqlite:///tmp/myapp.db store health
Store is healthy.

# Get store statistics
$ eskit --store sqlite:///tmp/myapp.db store stats
Events:  1523
Streams: 42
DB Size: 2.3 MB

Store Migration

The migrate subcommand copies all event data between eskit stores, preserving stream IDs, event order, versions, codecs, timestamps, and all metadata.

# Basic migration between SQLite databases
eskit migrate --from sqlite:///source.db --to sqlite:///target.db

# Custom batch size (default 500)
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --batch-size 1000

# Dry-run: count events/streams without writing
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --dry-run

# Verify event counts match after migration
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --verify

# Resumable migration with checkpoint file
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --checkpoint migrate.ckpt

# Re-encode events from JSON to CBOR during migration
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --recode-to cbor

| Flag | Description |
| --- | --- |
| --from | Source store DSN (required) |
| --to | Target store DSN (required) |
| --batch-size | Events per write transaction (default 500, max 100000) |
| --dry-run | Validate source readable, count events/streams, write nothing |
| --verify | Post-migration comparison of event counts per stream |
| --checkpoint | File to track completed streams for resumable migrations |
| --recode-to | Re-encode events to json or cbor during migration |

Progress is reported to stderr: [42/128 streams] [15,234 events] [stream: order-abc123]

The migration architecture uses MigrationSource and MigrationTarget interfaces, making it straightforward to add new backends (Postgres, NATS, etc.).
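
To illustrate how batching and a completed-streams checkpoint interact, here is a toy in-memory copy loop. It is a sketch under assumptions: the maps stand in for source and target stores, `copyStreams` is a hypothetical function (not the MigrationSource/MigrationTarget interfaces), and restarting a partially copied stream from scratch is an assumed policy, not documented behaviour.

```go
package main

import "fmt"

type event struct {
	stream  string
	version int
}

// copyStreams copies streams from src to dst in batches, skipping streams
// already marked in completed. It returns the number of write batches.
func copyStreams(src map[string][]event, order []string, dst map[string][]event, completed map[string]bool, batchSize int) int {
	batches := 0
	for _, id := range order {
		if completed[id] {
			continue // finished in an earlier run
		}
		events := src[id]
		dst[id] = nil // assumed policy: redo a partially copied stream
		for start := 0; start < len(events); start += batchSize {
			end := start + batchSize
			if end > len(events) {
				end = len(events)
			}
			dst[id] = append(dst[id], events[start:end]...) // one write transaction
			batches++
		}
		completed[id] = true // what a checkpoint file would persist
	}
	return batches
}

func main() {
	src := map[string][]event{
		"account-1": {{"account-1", 1}, {"account-1", 2}, {"account-1", 3}},
		"order-42":  {{"order-42", 1}},
	}
	order := []string{"account-1", "order-42"}

	// Resumed run: account-1 was fully copied before the interruption.
	dst := map[string][]event{"account-1": src["account-1"]}
	completed := map[string]bool{"account-1": true}

	batches := copyStreams(src, order, dst, completed, 2)
	fmt.Println(batches, len(dst["account-1"]), len(dst["order-42"]))
}
```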

Codec Support

The CLI automatically decodes event payloads based on the codec name stored in each event row:

| Codec | Decoding |
| --- | --- |
| json, jsoniter | Standard JSON decode |
| cbor | CBOR binary decode (maps normalized for JSON output) |
| protobuf | Generic wire-format decode (field numbers + values, no schema needed) |
| gob | Hex dump (requires registered types not available in CLI) |
| Unknown/custom | Hex dump with codec name shown |

Use --codec to override the stored codec (e.g., when metadata is wrong). Use --raw to skip decoding entirely and see the raw hex dump.
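
The decode-or-hex-dump behaviour can be approximated with the standard library alone. This sketch handles only JSON and treats every other codec as opaque; `render` is a hypothetical function, and falling back to a hex dump when JSON decoding fails is an assumption rather than documented CLI behaviour.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// render formats a payload for display. JSON payloads are decoded and
// re-encoded; anything else, or raw mode, produces a hex dump labelled
// with the codec name.
func render(codec string, payload []byte, raw bool) string {
	if !raw && (codec == "json" || codec == "jsoniter") {
		var v any
		if err := json.Unmarshal(payload, &v); err == nil {
			out, _ := json.Marshal(v)
			return string(out)
		}
	}
	return fmt.Sprintf("codec=%s\n%s", codec, hex.Dump(payload))
}

func main() {
	payload := []byte(`{"order_id":"42","amount":10}`)
	fmt.Println(render("json", payload, false)) // decoded
	fmt.Print(render("json", payload, true))    // --raw style hex dump
	fmt.Print(render("gob", []byte{0x01, 0x02}, false))
}
```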

License

MIT — see LICENSE.