hot_db - In-Memory Snapshot DB Pattern

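This package implements a high-throughput read pattern: queries are served from an in-memory SQLite snapshot of the source data, which a background loop periodically rebuilds and atomically swaps in.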

The current implementation in this repo is shopping/orders-specific, but this README is intentionally written as a generic hot_db blueprint.


What This Package Does

  1. Init(ctx, sourceDB):
    • Builds the initial in-memory SQLite snapshot from source records (last 2 years).
    • Starts a single worker goroutine.
    • Starts a refresh loop (every 2 hours by default; configurable via refreshEvery).
  2. Fetch(...):
    • Enqueues a read request on the worker channel.
    • The worker executes the query against the current active snapshot and replies on a per-request channel.
  3. Refresh loop:
    • Builds a new in-memory DB off to the side.
    • Enqueues swap.
    • Worker swaps pointer and closes old DB.
  4. Close():
    • Signals the refresh loop to stop and shuts down the worker (it does not wait for an in-progress refresh build to finish).
    • Closes current snapshot.

Current Repo Files (Reference)


Generic hot_db Schema Guidance

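For any domain, normalize repeating (multi-valued) attributes into mapping tables. For example, rather than storing an order's SKUs as a comma-separated column on orders, store one row per (order_id, sku) in order_items and index it on (sku, order_id).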

This replaces CSV-style filtering hacks with indexed join/filter logic.


Driver + Dependencies

Current SQLite driver stack:

import _ "github.com/ncruces/go-sqlite3/driver"
import _ "github.com/ncruces/go-sqlite3/embed"
import _ "github.com/ncruces/go-sqlite3/vfs/memdb"

Add them:

go get github.com/ncruces/go-sqlite3/driver \
       github.com/ncruces/go-sqlite3/embed \
       github.com/ncruces/go-sqlite3/vfs/memdb

How To Reuse In Other Projects

Treat this as a generic Snapshot Query Engine:

  1. Define your normalized in-memory schema.
  2. Define the source query (run against sourceDB) that pulls the full snapshot window.
  3. Define query API for your use-case.
  4. Keep queue + worker + swap lifecycle unchanged.

The queue/swap mechanics are generic; only the loader (BuildFunc) and the query mapping (QueryFunc) are domain-specific.


Standalone hot_db Template (Copy/Paste Starter)

Use this if you want a fresh reusable package for arbitrary datasets.

hot_db/manager.go

package hot_db

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"sync/atomic"
	"time"
)

// QueryParams describes one read request passed through Fetch to the
// engine's QueryFunc.
//
//   - Ctx: Fetch waits on Ctx.Done() while queueing the request and while
//     awaiting the reply; QuerySnapshot also passes it to database/sql.
//   - From, To: time window; QuerySnapshot applies them inclusively to
//     orders.created_at.
//   - Filters: named value lists; QuerySnapshot reads the "status" and
//     "sku" keys and ignores any others.
type QueryParams struct {
	Ctx     context.Context
	From    time.Time
	To      time.Time
	Filters map[string][]string
}

// Row is a single result record stored as a field-name -> value map. Keys
// and value types are chosen by the QueryFunc; QuerySnapshot, for example,
// emits "order_id", "customer_id", "created_at", "fulfilled_at",
// "order_status", "skus_csv" and "latency_hours".
type Row struct {
	Fields map[string]any
}

// Domain-specific hooks supplied to Init.
type (
	// QueryFunc runs one read request against a snapshot database and
	// returns the resulting rows. The engine only invokes it from its
	// worker goroutine.
	QueryFunc func(db *sql.DB, p QueryParams) ([]Row, error)

	// BuildFunc builds a complete new snapshot from source. Once handed to
	// the engine, the returned *sql.DB is closed by the engine when it is
	// replaced by a newer snapshot or when the engine is closed.
	BuildFunc func(ctx context.Context, source *sql.DB) (*sql.DB, error)
)

// Engine serves queries from an in-memory snapshot database through a
// single worker goroutine. Construct it with Init; a zero Engine has no
// worker running and no snapshot.
//
// Fields:
//   - active: the current snapshot; swapped by the worker, cleared by Close.
//   - reqCh: unbuffered channel carrying fetch/swap/close requests to the worker.
//   - done: closed by Close to tell the refresh loop to stop.
//   - closed: makes Close idempotent and lets Fetch fail fast after Close.
//   - sourceDB, build, refresh: inputs and interval for periodic rebuilds.
//   - query: executed by the worker for each fetch request.
type Engine struct {
	active   atomic.Pointer[sql.DB]
	reqCh    chan request
	done     chan struct{}
	closed   atomic.Bool
	sourceDB *sql.DB
	build    BuildFunc
	query    QueryFunc
	refresh  time.Duration
}

// opKind tells the worker which operation a request asks for.
type opKind int

// Worker operations, with their values written out explicitly.
const (
	opFetch opKind = 0 // run the QueryFunc against the active snapshot
	opSwap  opKind = 1 // install a new snapshot and close the previous one
	opClose opKind = 2 // acknowledge shutdown and stop the worker
)

// request is one message to the worker goroutine. Which fields are used
// depends on kind:
//   - opFetch: params is passed to the QueryFunc; the result is sent on
//     respCh (Fetch makes it buffered with capacity 1).
//   - opSwap: newDB becomes the active snapshot; swapDone, if non-nil, is
//     closed once the swap and the close of the old snapshot are done.
//   - opClose: closeCh is closed to acknowledge shutdown.
type request struct {
	kind     opKind
	params   QueryParams
	respCh   chan result
	newDB    *sql.DB
	swapDone chan struct{}
	closeCh  chan struct{}
}

// result is the worker's reply to an opFetch request: the QueryFunc's rows
// and error, or an error alone when there is no active snapshot.
type result struct {
	rows []Row
	err  error
}

// Init builds the initial snapshot with build, starts the worker goroutine
// and the periodic refresh loop, and returns the running Engine.
//
// build and query must be non-nil. refreshEvery <= 0 selects the default
// interval of 2 hours. ctx is used for the initial build and for every
// refresh build. Cancelling it stops the refresh loop, while the worker keeps
// serving the last snapshot. Pass a context that lives as long as the Engine,
// not a short initialisation timeout.
func Init(ctx context.Context, source *sql.DB, build BuildFunc, query QueryFunc, refreshEvery time.Duration) (*Engine, error) {
	// Reject missing hooks here. A nil query would otherwise panic on the
	// worker goroutine, where the caller cannot recover it.
	if build == nil {
		return nil, fmt.Errorf("hot_db: nil BuildFunc")
	}
	if query == nil {
		return nil, fmt.Errorf("hot_db: nil QueryFunc")
	}
	if refreshEvery <= 0 {
		refreshEvery = 2 * time.Hour
	}
	e := &Engine{
		reqCh:    make(chan request),
		done:     make(chan struct{}),
		sourceDB: source,
		build:    build,
		query:    query,
		refresh:  refreshEvery,
	}
	initial, err := e.build(ctx, e.sourceDB)
	if err != nil {
		return nil, fmt.Errorf("initial snapshot: %w", err)
	}
	e.active.Store(initial)
	go e.worker()
	go e.refreshLoop(ctx)
	return e, nil
}

// Fetch queues p for the worker goroutine, which runs the engine's QueryFunc
// against the active snapshot, and returns the rows and error it produced.
//
// A nil p.Ctx is treated as context.Background(). Fetch returns an error if
// the engine is closed. It returns p.Ctx's error if the context ends before
// the worker accepts or answers the request.
func (e *Engine) Fetch(p QueryParams) ([]Row, error) {
	if e.closed.Load() {
		return nil, fmt.Errorf("hot_db engine closed")
	}
	if p.Ctx == nil {
		// p.Ctx.Done() below, and the QueryFunc's use of p.Ctx, would
		// panic on a nil interface.
		p.Ctx = context.Background()
	}
	resp := make(chan result, 1)
	select {
	case e.reqCh <- request{kind: opFetch, params: p, respCh: resp}:
	case <-e.done:
		// Close started after the check above and the worker may already
		// have exited. Without this case the send would block until
		// p.Ctx ends, which is forever for a context with no deadline.
		return nil, fmt.Errorf("hot_db engine closed")
	case <-p.Ctx.Done():
		return nil, p.Ctx.Err()
	}
	// The worker always answers an accepted fetch. resp is buffered, so the
	// worker never blocks if we stop waiting because the context ended.
	select {
	case r := <-resp:
		return r.rows, r.err
	case <-p.Ctx.Done():
		return nil, p.Ctx.Err()
	}
}

// Swap hands newDB to the worker, which installs it as the active snapshot
// and closes the previous one. Swap returns after that has happened.
//
// Ownership of newDB passes to the engine. If the engine has been closed, the
// worker may no longer be receiving. In that case Swap closes newDB itself and
// returns, rather than blocking forever and leaking both the caller and the DB.
func (e *Engine) Swap(newDB *sql.DB) {
	done := make(chan struct{})
	select {
	case e.reqCh <- request{kind: opSwap, newDB: newDB, swapDone: done}:
		<-done
	case <-e.done:
		if newDB != nil {
			if err := newDB.Close(); err != nil {
				log.Printf("hot_db close unused snapshot: %v", err)
			}
		}
	}
}

// Close shuts the engine down and closes the active snapshot. Only the first
// call does any work; later calls return nil immediately.
//
// It closes done (signalling the refresh loop), waits for the worker to
// acknowledge an opClose request and exit, then clears and closes the active
// snapshot. It does not wait for the refresh loop goroutine to return.
func (e *Engine) Close() error {
	if first := e.closed.CompareAndSwap(false, true); !first {
		return nil
	}
	close(e.done)

	// Requests are handled one at a time, so once the worker acknowledges,
	// no query is still using the snapshot.
	ack := make(chan struct{})
	e.reqCh <- request{kind: opClose, closeCh: ack}
	<-ack

	if snapshot := e.active.Swap(nil); snapshot != nil {
		return snapshot.Close()
	}
	return nil
}

// worker is the engine's single request loop. It serves fetches against the
// active snapshot, installs new snapshots (closing the replaced one), and
// returns after acknowledging an opClose request.
func (e *Engine) worker() {
	for {
		req, ok := <-e.reqCh
		if !ok {
			return
		}
		switch req.kind {
		case opClose:
			close(req.closeCh)
			return
		case opSwap:
			if prev := e.active.Swap(req.newDB); prev != nil {
				if err := prev.Close(); err != nil {
					log.Printf("hot_db close old db: %v", err)
				}
			}
			if req.swapDone != nil {
				close(req.swapDone)
			}
		case opFetch:
			var res result
			if snap := e.active.Load(); snap == nil {
				res.err = fmt.Errorf("hot_db has no active snapshot")
			} else {
				res.rows, res.err = e.query(snap, req.params)
			}
			req.respCh <- res
		}
	}
}

// refreshLoop rebuilds the snapshot every e.refresh and swaps it in. A failed
// build is logged and the current snapshot is kept. The loop returns when ctx
// is cancelled or when Close closes e.done.
func (e *Engine) refreshLoop(ctx context.Context) {
	t := time.NewTicker(e.refresh)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-e.done:
			return
		case <-t.C:
			fresh, err := e.build(ctx, e.sourceDB)
			if err != nil {
				if ctx.Err() != nil {
					// Cancelled mid-build: this is shutdown, not a refresh failure.
					return
				}
				log.Printf("hot_db refresh failed, keeping current: %v", err)
				continue
			}
			// A build can outlast Close, and after Close the worker no longer
			// accepts swaps. Release the new snapshot here instead of handing
			// it over. This narrows, but cannot fully close, the window in
			// which Close races with the Swap below.
			select {
			case <-e.done:
				if err := fresh.Close(); err != nil {
					log.Printf("hot_db close discarded snapshot: %v", err)
				}
				return
			default:
			}
			e.Swap(fresh)
			log.Printf("hot_db snapshot refreshed")
		}
	}
}

hot_db/build.go

package hot_db

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	_ "github.com/ncruces/go-sqlite3/driver"
	_ "github.com/ncruces/go-sqlite3/embed"
	_ "github.com/ncruces/go-sqlite3/vfs/memdb"
)

// schemaSQL creates the snapshot's tables and indexes. BuildSnapshot executes
// it as a single multi-statement Exec on a freshly opened in-memory database.
//
// Example theme: ecommerce order activity. Replace with your own normalized
// model. Multi-valued attributes (an order's SKUs) get their own mapping table
// (order_items) instead of a CSV column. The (sku, order_id) index serves
// QuerySnapshot's "orders containing any of these SKUs" subquery. The
// created_at and order_status indexes serve its window and status filters.
const schemaSQL = `
CREATE TABLE orders (
    order_id TEXT PRIMARY KEY,
    customer_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    fulfilled_at TEXT,
    order_status TEXT NOT NULL
);

CREATE TABLE order_items (
    order_id TEXT NOT NULL,
    sku TEXT NOT NULL,
    qty INTEGER NOT NULL,
    PRIMARY KEY (order_id, sku)
);

CREATE INDEX idx_orders_created_at ON orders(created_at);
CREATE INDEX idx_orders_status ON orders(order_status);
CREATE INDEX idx_order_items_sku ON order_items(sku, order_id);
`

// BuildSnapshot is the example BuildFunc. It opens a fresh in-memory SQLite
// database (memdb VFS, uniquely named), applies schemaSQL, and copies the
// last two years of source orders and their line items into it inside one
// transaction.
//
// On any error the partially built database is closed and (nil, err) is
// returned. On success the caller owns the returned *sql.DB.
//
// Line items are loaded with one bulk query after the orders result set has
// been closed, not with one query per order. This avoids N+1 round trips
// to the source. It also never holds two source connections at once: a
// per-order query issued while the orders rows are still open needs a second
// pooled connection, and blocks forever if source allows only one.
//
// NOTE(review): created_at/fulfilled_at are stored as whatever text the source
// value scans to (database/sql renders a time.Time as RFC 3339 text when the
// destination is a string — verify for your driver). Snapshot range filters
// compare that text lexically, so keep one format and one zone.
func BuildSnapshot(ctx context.Context, source *sql.DB) (*sql.DB, error) {
	uri := fmt.Sprintf("file:/snapshot_%d.db?vfs=memdb", time.Now().UnixNano())
	mem, err := sql.Open("sqlite3", uri)
	if err != nil {
		return nil, err
	}
	if err := populateSnapshot(ctx, source, mem); err != nil {
		// Discard the half-built snapshot; the build error is what matters.
		_ = mem.Close()
		return nil, err
	}
	return mem, nil
}

// populateSnapshot creates the schema in mem and copies the snapshot window
// from source in a single transaction, rolling back on any error.
func populateSnapshot(ctx context.Context, source, mem *sql.DB) error {
	if _, err := mem.ExecContext(ctx, schemaSQL); err != nil {
		return err
	}

	to := time.Now()
	from := to.AddDate(-2, 0, 0)

	tx, err := mem.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// After a successful Commit this returns sql.ErrTxDone, which is ignored.
	defer func() { _ = tx.Rollback() }()

	if err := copyOrders(ctx, source, tx, from, to); err != nil {
		return err
	}
	// Runs only after copyOrders has closed its source rows. Items whose
	// order row is missing (for example, one inserted at the source between the
	// two queries) are never reached by queries, which start from orders.
	if err := copyOrderItems(ctx, source, tx, from, to); err != nil {
		return err
	}
	return tx.Commit()
}

// copyOrders inserts every source_orders row with created_at in [from, to]
// into the snapshot's orders table.
func copyOrders(ctx context.Context, source *sql.DB, tx *sql.Tx, from, to time.Time) error {
	insert, err := tx.PrepareContext(ctx, `
		INSERT INTO orders(order_id, customer_id, created_at, fulfilled_at, order_status)
		VALUES(?,?,?,?,?)
	`)
	if err != nil {
		return err
	}
	defer insert.Close()

	rows, err := source.QueryContext(ctx, `
		SELECT order_id, customer_id, created_at, fulfilled_at, order_status
		FROM source_orders
		WHERE created_at >= ? AND created_at <= ?
	`, from, to)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var orderID, customerID, createdAt, status string
		var fulfilledAt sql.NullString
		if err := rows.Scan(&orderID, &customerID, &createdAt, &fulfilledAt, &status); err != nil {
			return err
		}
		if _, err := insert.ExecContext(ctx, orderID, customerID, createdAt, nullable(fulfilledAt), status); err != nil {
			return err
		}
	}
	return rows.Err()
}

// copyOrderItems inserts the line items of every source order with
// created_at in [from, to], fetched with a single joined query.
func copyOrderItems(ctx context.Context, source *sql.DB, tx *sql.Tx, from, to time.Time) error {
	insert, err := tx.PrepareContext(ctx, `
		INSERT INTO order_items(order_id, sku, qty) VALUES(?,?,?)
	`)
	if err != nil {
		return err
	}
	defer insert.Close()

	rows, err := source.QueryContext(ctx, `
		SELECT i.order_id, i.sku, i.qty
		FROM source_order_items i
		JOIN source_orders o ON o.order_id = i.order_id
		WHERE o.created_at >= ? AND o.created_at <= ?
	`, from, to)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var orderID, sku string
		var qty int
		if err := rows.Scan(&orderID, &sku, &qty); err != nil {
			return err
		}
		if _, err := insert.ExecContext(ctx, orderID, sku, qty); err != nil {
			return err
		}
	}
	return rows.Err()
}

// nullable converts v into a value suitable as a SQL argument or a Row field:
// nil when v is not valid (SQL NULL), otherwise the underlying string.
func nullable(v sql.NullString) any {
	if !v.Valid {
		return nil
	}
	return v.String
}

hot_db/query.go

package hot_db

import (
	"database/sql"
	"fmt"
	"strings"
)

// QuerySnapshot is the example QueryFunc for the orders schema.
//
// It returns orders with created_at in [p.From, p.To], newest first. Two
// optional filters narrow the result:
//   - p.Filters["status"] keeps orders whose order_status is in the list.
//   - p.Filters["sku"] keeps orders containing any of the listed SKUs.
//
// Each Row carries the order columns, all of the order's SKUs as a
// comma-separated "skus_csv" string, and "latency_hours" formatted with two
// decimals. latency_hours is (fulfilled_at - created_at) in hours, computed
// with SQLite's julianday(). It is "0.00" when the order is unfulfilled or when
// either timestamp is not in a format julianday() understands.
//
// A nil p.Ctx runs the query without a context. On error, rows are nil.
//
// NOTE(review): the created_at range filter compares the stored text with
// p.From/p.To as the driver binds them. This orders correctly only if both
// use the same timestamp format and zone — confirm for your data.
func QuerySnapshot(db *sql.DB, p QueryParams) ([]Row, error) {
	args := []any{p.From, p.To}

	// inList binds vals (appending them to args in order) and returns the
	// matching "?,?,..." placeholder list.
	inList := func(vals []string) string {
		for _, v := range vals {
			args = append(args, v)
		}
		return strings.TrimSuffix(strings.Repeat("?,", len(vals)), ",")
	}

	var b strings.Builder
	b.WriteString(`
		SELECT o.order_id, o.customer_id, o.created_at, o.fulfilled_at, o.order_status,
		       COALESCE(GROUP_CONCAT(DISTINCT oi.sku), '') AS skus,
		       (julianday(o.fulfilled_at) - julianday(o.created_at)) * 24.0 AS latency_hours
		FROM orders o
		LEFT JOIN order_items oi ON oi.order_id = o.order_id
		WHERE o.created_at >= ? AND o.created_at <= ?`)

	if statuses := p.Filters["status"]; len(statuses) > 0 {
		b.WriteString(` AND o.order_status IN (` + inList(statuses) + `)`)
	}
	// Filter via a subquery so the LEFT JOIN above still reports every SKU
	// of a matching order, not only the SKUs that matched.
	if skus := p.Filters["sku"]; len(skus) > 0 {
		b.WriteString(` AND o.order_id IN (
			SELECT order_id FROM order_items WHERE sku IN (` + inList(skus) + `))`)
	}

	b.WriteString(` GROUP BY o.order_id, o.customer_id, o.created_at, o.fulfilled_at, o.order_status
	               ORDER BY o.created_at DESC`)
	query := b.String()

	var (
		rows *sql.Rows
		err  error
	)
	if p.Ctx != nil {
		rows, err = db.QueryContext(p.Ctx, query, args...)
	} else {
		// database/sql must not be given a nil context; use the plain call.
		rows, err = db.Query(query, args...)
	}
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []Row
	for rows.Next() {
		var orderID, customerID, createdAt, status, skus string
		var fulfilledAt sql.NullString
		var latency sql.NullFloat64 // NULL when unfulfilled or unparseable
		if err := rows.Scan(&orderID, &customerID, &createdAt, &fulfilledAt, &status, &skus, &latency); err != nil {
			return nil, err
		}

		out = append(out, Row{
			Fields: map[string]any{
				"order_id":      orderID,
				"customer_id":   customerID,
				"created_at":    createdAt,
				"fulfilled_at":  nullable(fulfilledAt),
				"order_status":  status,
				"skus_csv":      skus,
				"latency_hours": fmt.Sprintf("%.2f", latency.Float64), // Float64 is 0 when NULL
			},
		})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

Usage

// ctx must outlive the engine: cancelling it stops the refresh loop.
engine, err := hot_db.Init(
	ctx,
	sourceDB,
	hot_db.BuildSnapshot,
	hot_db.QuerySnapshot,
	2*time.Hour,
)
if err != nil {
	panic(err)
}
defer engine.Close()

now := time.Now()
rows, err := engine.Fetch(hot_db.QueryParams{
	Ctx:  ctx,
	From: now.AddDate(0, 0, -30),
	To:   now,
	Filters: map[string][]string{
		"status": {"processing", "fulfilled"},
		"sku":    {"SKU-123", "SKU-999"},
	},
})
if err != nil {
	panic(err) // panic (unlike log.Fatal) still runs the deferred engine.Close
}
for _, r := range rows {
	fmt.Println(r.Fields["order_id"], r.Fields["order_status"], r.Fields["skus_csv"])
}

Operational Notes


Porting Checklist

When adapting hot_db to another project:


Why This Pattern Works Well