# hot_db — In-Memory Snapshot DB Pattern

This package implements a high-throughput query pattern:
The current implementation in this repo is shopping/orders-specific, but this README is intentionally written as a generic hot_db blueprint.
Init(ctx, sourceDB):
Fetch(...):
swap.Close():
- hot_db/hot_db.go: lifecycle, queue, worker, swap, refresh loop.
- hot_db/loader.go: schema creation and source->snapshot load.
- hot_db/query.go: query builder and row mapping.
- hot_db/*_test.go: correctness and concurrency/swap tests.

## hot_db Schema Guidance

For any domain, normalize repeating attributes into mapping tables:
- events (event_id, started_at, ended_at, status, tenant_id)
- event_tags (event_id, tag)

Indexes:

- events(started_at, ended_at)
- events(status)
- events(tenant_id)
- event_tags(tag, event_id)

This replaces CSV-style filtering hacks with indexed join/filter logic.
Current SQLite driver stack:
import _ "github.com/ncruces/go-sqlite3/driver"
import _ "github.com/ncruces/go-sqlite3/embed"
import _ "github.com/ncruces/go-sqlite3/vfs/memdb"
Add them:
go get github.com/ncruces/go-sqlite3/driver \
github.com/ncruces/go-sqlite3/embed \
github.com/ncruces/go-sqlite3/vfs/memdb
Treat this as a generic Snapshot Query Engine:
The build function receives the source handle (sourceDB) to pull the full snapshot window. The queue/swap mechanics are generic; only the loader/query mapping is domain-specific.
## hot_db Template (Copy/Paste Starter)

Use this if you want a fresh reusable package for arbitrary datasets.
### hot_db/manager.go

package hot_db
import (
"context"
"database/sql"
"fmt"
"log"
"sync/atomic"
"time"
)
// QueryParams carries one read request: a time window plus optional
// attribute filters (e.g. {"status": [...], "sku": [...]}).
type QueryParams struct {
	Ctx     context.Context // request-scoped cancellation/deadline
	From    time.Time       // inclusive lower bound of the window
	To      time.Time       // inclusive upper bound of the window
	Filters map[string][]string
}

// Row is a schema-agnostic result record: column name -> value.
type Row struct {
	Fields map[string]any
}

// QueryFunc runs a domain-specific query against the active snapshot DB.
type QueryFunc func(db *sql.DB, p QueryParams) ([]Row, error)

// BuildFunc builds a fresh snapshot DB from the source of truth.
type BuildFunc func(ctx context.Context, source *sql.DB) (*sql.DB, error)

// Engine owns the active snapshot and funnels all operations through a
// single worker goroutine (see worker), so snapshot replacement needs
// no explicit locking.
type Engine struct {
	active   atomic.Pointer[sql.DB] // current snapshot; swapped atomically
	reqCh    chan request           // all ops are sent to the worker here
	done     chan struct{}          // closed exactly once, by Close
	closed   atomic.Bool            // fast-path rejection flag for Fetch
	sourceDB *sql.DB
	build    BuildFunc
	query    QueryFunc
	refresh  time.Duration // snapshot rebuild interval
}

// opKind discriminates the request types handled by the worker.
type opKind int

const (
	opFetch opKind = iota // run a query against the active snapshot
	opSwap                // install a new snapshot, close the old one
	opClose               // shut the worker down
)

// request is the single message type sent to the worker goroutine.
// Only the fields relevant to its kind are populated.
type request struct {
	kind     opKind
	params   QueryParams   // opFetch
	respCh   chan result   // opFetch: receives exactly one result
	newDB    *sql.DB       // opSwap
	swapDone chan struct{} // opSwap: closed when the swap completed
	closeCh  chan struct{} // opClose: closed when the worker exited
}

// result is the worker's reply to an opFetch request.
type result struct {
	rows []Row
	err  error
}
// Init validates its inputs, builds the first snapshot synchronously so
// callers never observe an engine without data, then starts the worker
// and the periodic refresh loop.
//
// refreshEvery <= 0 falls back to a 2-hour default. The returned engine
// must be released with Close.
func Init(ctx context.Context, source *sql.DB, build BuildFunc, query QueryFunc, refreshEvery time.Duration) (*Engine, error) {
	// Fail fast with a descriptive error rather than panicking on a nil
	// callback the first time it is invoked.
	if build == nil {
		return nil, fmt.Errorf("hot_db: build function is required")
	}
	if query == nil {
		return nil, fmt.Errorf("hot_db: query function is required")
	}
	if refreshEvery <= 0 {
		refreshEvery = 2 * time.Hour
	}
	e := &Engine{
		reqCh:    make(chan request),
		done:     make(chan struct{}),
		sourceDB: source,
		build:    build,
		query:    query,
		refresh:  refreshEvery,
	}
	initial, err := e.build(ctx, e.sourceDB)
	if err != nil {
		return nil, fmt.Errorf("initial snapshot: %w", err)
	}
	e.active.Store(initial)
	go e.worker()
	go e.refreshLoop(ctx)
	return e, nil
}
// Fetch runs a query against the active snapshot, honoring p.Ctx for
// cancellation. Returns an error once the engine is closed.
func (e *Engine) Fetch(p QueryParams) ([]Row, error) {
	if e.closed.Load() {
		return nil, fmt.Errorf("hot_db engine closed")
	}
	// Tolerate callers that forget to set Ctx instead of panicking on
	// a nil-pointer ctx.Done() below.
	ctx := p.Ctx
	if ctx == nil {
		ctx = context.Background()
		p.Ctx = ctx
	}
	resp := make(chan result, 1)
	select {
	case e.reqCh <- request{kind: opFetch, params: p, respCh: resp}:
	case <-e.done:
		// Close raced past our closed-flag check above: the worker may
		// already be gone and would never receive, so don't block until
		// the caller's context (possibly Background) cancels.
		return nil, fmt.Errorf("hot_db engine closed")
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	// respCh is buffered (size 1), so the worker never blocks on the
	// reply even if we abandon the wait here.
	select {
	case r := <-resp:
		return r.rows, r.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// Swap installs newDB as the active snapshot and blocks until the
// worker has closed the previous one. If the engine has been closed,
// the orphaned newDB is closed here instead of being leaked.
func (e *Engine) Swap(newDB *sql.DB) {
	done := make(chan struct{})
	select {
	case e.reqCh <- request{kind: opSwap, newDB: newDB, swapDone: done}:
		<-done
	case <-e.done:
		// The worker exits after Close and will never receive; an
		// unguarded send here (e.g. from refreshLoop racing Close)
		// would deadlock this goroutine forever and leak newDB.
		if newDB != nil {
			_ = newDB.Close()
		}
	}
}
// Close shuts the engine down exactly once and releases the active
// snapshot. Subsequent calls are no-ops that return nil.
func (e *Engine) Close() error {
	// CompareAndSwap guarantees the shutdown sequence below runs at
	// most once, so only a single opClose is ever sent to the worker.
	if !e.closed.CompareAndSwap(false, true) {
		return nil
	}
	// Signal refreshLoop (and any done-aware waiters) to stop.
	close(e.done)
	ack := make(chan struct{})
	// The worker only exits on opClose, and exactly one opClose exists,
	// so the worker is still receiving and this send cannot deadlock.
	e.reqCh <- request{kind: opClose, closeCh: ack}
	<-ack
	// Worker has exited: take sole ownership of the snapshot pointer
	// and close the database, if any.
	active := e.active.Swap(nil)
	if active != nil {
		return active.Close()
	}
	return nil
}
// worker is the single goroutine that owns all engine operations. It
// serializes fetches, swaps, and shutdown through reqCh, which is what
// makes snapshot replacement safe without additional locking.
func (e *Engine) worker() {
	for req := range e.reqCh {
		switch req.kind {
		case opFetch:
			db := e.active.Load()
			if db == nil {
				req.respCh <- result{err: fmt.Errorf("hot_db has no active snapshot")}
				continue
			}
			// NOTE(review): queries execute inside the worker, so one
			// slow query blocks all other fetches and swaps behind it.
			rows, err := e.query(db, req.params)
			// respCh is buffered; this send never blocks.
			req.respCh <- result{rows: rows, err: err}
		case opSwap:
			// Install the new snapshot first, then close the displaced
			// one. A close failure is logged, not propagated.
			old := e.active.Swap(req.newDB)
			if old != nil {
				if err := old.Close(); err != nil {
					log.Printf("hot_db close old db: %v", err)
				}
			}
			if req.swapDone != nil {
				close(req.swapDone)
			}
		case opClose:
			// Ack shutdown and exit; Close() then owns the active DB.
			close(req.closeCh)
			return
		}
	}
}
// refreshLoop rebuilds the snapshot every e.refresh until ctx is
// cancelled or the engine is closed. Rebuilds are best-effort: a
// failed build is logged and the current snapshot keeps serving.
func (e *Engine) refreshLoop(ctx context.Context) {
	t := time.NewTicker(e.refresh)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-e.done:
			return
		case <-t.C:
			newDB, err := e.build(ctx, e.sourceDB)
			if err != nil {
				// Keep serving the existing snapshot on failure.
				log.Printf("hot_db refresh failed, keeping current: %v", err)
				continue
			}
			e.Swap(newDB)
			log.Printf("hot_db snapshot refreshed")
		}
	}
}
### hot_db/build.go

package hot_db
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/ncruces/go-sqlite3/driver"
_ "github.com/ncruces/go-sqlite3/embed"
_ "github.com/ncruces/go-sqlite3/vfs/memdb"
)
// Example theme: ecommerce order activity.
// Replace with your own normalized model.
//
// schemaSQL defines the snapshot schema: one row per order plus a
// normalized (order_id, sku) mapping table. The indexes line up with
// the filters QuerySnapshot builds: a created_at range scan, a status
// IN list, and a sku IN subquery against order_items.
const schemaSQL = `
CREATE TABLE orders (
order_id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
created_at TEXT NOT NULL,
fulfilled_at TEXT,
order_status TEXT NOT NULL
);
CREATE TABLE order_items (
order_id TEXT NOT NULL,
sku TEXT NOT NULL,
qty INTEGER NOT NULL,
PRIMARY KEY (order_id, sku)
);
CREATE INDEX idx_orders_created_at ON orders(created_at);
CREATE INDEX idx_orders_status ON orders(order_status);
CREATE INDEX idx_order_items_sku ON order_items(sku, order_id);
`
// BuildSnapshot materializes an in-memory SQLite snapshot (memdb VFS)
// of the last two years of order activity from source. On success the
// caller owns the returned DB; on any failure the partially built
// snapshot is closed and an error returned.
func BuildSnapshot(ctx context.Context, source *sql.DB) (*sql.DB, error) {
	// Unique memdb URI so concurrent builds never share a database.
	uri := fmt.Sprintf("file:/snapshot_%d.db?vfs=memdb", time.Now().UnixNano())
	mem, err := sql.Open("sqlite3", uri)
	if err != nil {
		return nil, err
	}
	if _, err := mem.ExecContext(ctx, schemaSQL); err != nil {
		_ = mem.Close()
		return nil, err
	}
	if err := loadSnapshot(ctx, source, mem); err != nil {
		_ = mem.Close()
		return nil, err
	}
	return mem, nil
}

// loadSnapshot copies the two-year snapshot window from source into mem
// inside a single transaction; any failure rolls the whole load back.
func loadSnapshot(ctx context.Context, source, mem *sql.DB) error {
	now := time.Now()
	rows, err := source.QueryContext(ctx, `
SELECT order_id, customer_id, created_at, fulfilled_at, order_status
FROM source_orders
WHERE created_at >= ? AND created_at <= ?
`, now.AddDate(-2, 0, 0), now)
	if err != nil {
		return err
	}
	defer rows.Close()
	tx, err := mem.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	if err := copyOrders(ctx, tx, source, rows); err != nil {
		_ = tx.Rollback()
		return err
	}
	return tx.Commit()
}

// copyOrders streams order rows (and, per order, their item rows) into
// tx using prepared inserts.
//
// NOTE(review): the per-order item query below is an N+1 pattern; for
// large windows, prefer one bulk query over source_order_items.
func copyOrders(ctx context.Context, tx *sql.Tx, source *sql.DB, rows *sql.Rows) error {
	insertOrder, err := tx.PrepareContext(ctx, `
INSERT INTO orders(order_id, customer_id, created_at, fulfilled_at, order_status)
VALUES(?,?,?,?,?)
`)
	if err != nil {
		return err
	}
	defer insertOrder.Close()
	insertItem, err := tx.PrepareContext(ctx, `
INSERT INTO order_items(order_id, sku, qty) VALUES(?,?,?)
`)
	if err != nil {
		return err
	}
	defer insertItem.Close()
	for rows.Next() {
		var (
			orderID     string
			customerID  string
			createdAt   string
			fulfilledAt sql.NullString // source column is nullable
			status      string
		)
		if err := rows.Scan(&orderID, &customerID, &createdAt, &fulfilledAt, &status); err != nil {
			return err
		}
		if _, err := insertOrder.ExecContext(ctx, orderID, customerID, createdAt, nullable(fulfilledAt), status); err != nil {
			return err
		}
		if err := copyOrderItems(ctx, source, insertItem, orderID); err != nil {
			return err
		}
	}
	return rows.Err()
}

// copyOrderItems copies one order's item rows from source into the
// snapshot via the prepared insertItem statement. Extracted into its
// own function so each iteration's defer closes itemRows promptly.
func copyOrderItems(ctx context.Context, source *sql.DB, insertItem *sql.Stmt, orderID string) error {
	itemRows, err := source.QueryContext(ctx, `
SELECT sku, qty FROM source_order_items WHERE order_id = ?
`, orderID)
	if err != nil {
		return err
	}
	defer itemRows.Close()
	for itemRows.Next() {
		var sku string
		var qty int
		if err := itemRows.Scan(&sku, &qty); err != nil {
			return err
		}
		if _, err := insertItem.ExecContext(ctx, orderID, sku, qty); err != nil {
			return err
		}
	}
	return itemRows.Err()
}
func nullable(v sql.NullString) any {
if v.Valid {
return v.String
}
return nil
}
### hot_db/query.go

package hot_db
import (
	"database/sql"
	"fmt"
	"strings"
	"time"
)
// QuerySnapshot queries the snapshot built by BuildSnapshot: orders in
// [p.From, p.To], optionally narrowed by status and sku filters, each
// row carrying a CSV of its SKUs and a fulfillment latency in hours.
// All filter values are bound as parameters (no SQL string splicing).
func QuerySnapshot(db *sql.DB, p QueryParams) ([]Row, error) {
	var b strings.Builder
	args := []any{p.From, p.To}
	b.WriteString(`
SELECT o.order_id, o.customer_id, o.created_at, o.fulfilled_at, o.order_status,
COALESCE(GROUP_CONCAT(DISTINCT oi.sku), '') AS skus
FROM orders o
LEFT JOIN order_items oi ON oi.order_id = o.order_id
WHERE o.created_at >= ? AND o.created_at <= ?`)
	if statuses := p.Filters["status"]; len(statuses) > 0 {
		b.WriteString(` AND o.order_status IN (` + placeholders(len(statuses)) + `)`)
		for _, s := range statuses {
			args = append(args, s)
		}
	}
	if skus := p.Filters["sku"]; len(skus) > 0 {
		b.WriteString(` AND o.order_id IN (
SELECT order_id FROM order_items WHERE sku IN (` + placeholders(len(skus)) + `))`)
		for _, s := range skus {
			args = append(args, s)
		}
	}
	b.WriteString(` GROUP BY o.order_id, o.customer_id, o.created_at, o.fulfilled_at, o.order_status
ORDER BY o.created_at DESC`)
	rows, err := db.QueryContext(p.Ctx, b.String(), args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var out []Row
	for rows.Next() {
		var orderID, customerID, createdAt, status, skus string
		var fulfilledAt sql.NullString
		if err := rows.Scan(&orderID, &customerID, &createdAt, &fulfilledAt, &status, &skus); err != nil {
			return nil, err
		}
		// Compute real fulfillment latency (was a hard-coded 1.0
		// placeholder). Timestamps that don't parse as RFC 3339 leave
		// the latency at 0 rather than reporting a bogus value.
		latencyHours := 0.0
		if fulfilledAt.Valid {
			if created, cErr := time.Parse(time.RFC3339, createdAt); cErr == nil {
				if fulfilled, fErr := time.Parse(time.RFC3339, fulfilledAt.String); fErr == nil {
					latencyHours = fulfilled.Sub(created).Hours()
				}
			}
		}
		out = append(out, Row{
			Fields: map[string]any{
				"order_id":      orderID,
				"customer_id":   customerID,
				"created_at":    createdAt,
				"fulfilled_at":  nullable(fulfilledAt),
				"order_status":  status,
				"skus_csv":      skus,
				"latency_hours": fmt.Sprintf("%.2f", latencyHours),
			},
		})
	}
	return out, rows.Err()
}

// placeholders returns n comma-separated "?" markers for an IN clause.
func placeholders(n int) string {
	return strings.TrimSuffix(strings.Repeat("?,", n), ",")
}
// Wire the generic engine to the example build/query implementations.
// Init blocks until the first snapshot has been built.
engine, err := hot_db.Init(
	ctx,
	sourceDB,
	hot_db.BuildSnapshot,
	hot_db.QuerySnapshot,
	2*time.Hour, // rebuild the snapshot every two hours
)
if err != nil {
	panic(err)
}
defer engine.Close()
// Query the last 30 days, narrowed by status and SKU filters.
rows, err := engine.Fetch(hot_db.QueryParams{
	Ctx:  ctx,
	From: time.Now().AddDate(0, 0, -30),
	To:   time.Now(),
	Filters: map[string][]string{
		"status": {"processing", "fulfilled"},
		"sku":    {"SKU-123", "SKU-999"},
	},
})
When adapting hot_db to another project: