arcana

package module
v0.2.1 Latest
Published: May 24, 2026 License: MIT Imports: 23 Imported by: 0

README

Arcana

Reactive Graph Sync Engine for Go — real-time data synchronization between PostgreSQL and any frontend via normalized diffs.

Inspired by ZeroCache, Replicache, and Electric SQL — built for Go backends with a TypeScript SDK for frontends.

Features

  • Embeddable library — not a standalone service, imports directly into your Go app
  • Graph-based subscriptions — define named SQL queries with typed parameters and table dependencies
  • Built-in WebSocket transport — zero-dependency real-time sync, no external services needed
  • Mutations — server-side write operations with automatic reactive invalidation
  • Normalized in-memory store — shared data across subscriptions with RefCount GC
  • JSON Patch diffs — RFC 6902 compliant, minimal data over the wire
  • Two update streams: table_diff (row data, broadcast) + view_diff (structure, per-client)
  • Reconnect sync — catch-up via version history or full snapshot fallback
  • Three change detection modes — explicit notify, PostgreSQL LISTEN/NOTIFY with auto-triggers, WAL logical replication
  • Pluggable transport — built-in WebSocket, Centrifugo, or any custom transport
  • TypeScript SDK — reactive client with React, Vue 3, and Svelte 5 adapters, offline persistence, and codegen
  • DevTools panel — built-in dashboard for runtime inspection of graphs, subscriptions, and connections
  • Production-ready — retry with backoff, reconnect, UUID validation, configurable limits

Installation

go get github.com/FrankFMY/arcana

Quick Start

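package main

import (
    "context"
    "log"
    "net/http"

    "github.com/FrankFMY/arcana"
    "github.com/jackc/pgx/v5/pgxpool"
)

func main() {
    ctx := context.Background()

    pool, err := pgxpool.New(ctx, "postgres://...")
    if err != nil {
        log.Fatalf("connect to PostgreSQL: %v", err)
    }
    defer pool.Close()

    engine := arcana.New(arcana.Config{
        Pool: arcana.PgxQuerier(pool),
        AuthFunc: func(r *http.Request) (*arcana.Identity, error) {
            // Demo only: a real AuthFunc derives the identity from a verified session or token.
            return &arcana.Identity{
                SeanceID:    r.Header.Get("X-Seance-ID"),
                UserID:      "user-1",
                WorkspaceID: "workspace-1",
            }, nil
        },
    })

    // UserList is the GraphDef defined under Core Concepts > Graphs.
    if err := engine.Register(UserList); err != nil {
        log.Fatalf("register graph: %v", err)
    }
    if err := engine.Start(ctx); err != nil {
        log.Fatalf("start engine: %v", err)
    }
    defer engine.Stop()

    mux := http.NewServeMux()
    mux.Handle("/arcana/", http.StripPrefix("/arcana", engine.Handler()))
    mux.Handle("/ws", engine.WSHandler())
    log.Fatal(http.ListenAndServe(":8080", mux))
}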

That's it — built-in WebSocket transport with zero external dependencies. For Centrifugo or custom transports, see Transport.

Core Concepts

Graphs

A graph is a named, parameterized SQL query that produces normalized data. When the underlying data changes, Arcana re-runs the query and pushes only the diff to subscribed clients.

var UserList = arcana.GraphDef{
    Key: "user_list",

    // Typed parameters with validation
    Params: arcana.ParamSchema{
        "org_id": arcana.ParamUUID().Required(),
        "limit":  arcana.ParamInt().Default(50),
        "role":   arcana.ParamString().OneOf("admin", "member", "viewer").Build(),
    },

    // Declares which tables/columns this graph depends on.
    // Changes to these columns trigger re-evaluation.
    Deps: []arcana.TableDep{
        {Table: "users", Columns: []string{"id", "name", "email", "role"}},
    },

    // Factory runs the SQL query and returns normalized refs + rows.
    Factory: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.Result, error) {
        wsID := arcana.WorkspaceID(ctx)

        rows, err := q.Query(ctx,
            `SELECT id, name, email, role FROM users
             WHERE workspace_id = $1 AND org_id = $2 ORDER BY name LIMIT $3`,
            wsID, p.UUID("org_id"), p.Int("limit"),
        )
        if err != nil {
            return nil, err
        }
        defer rows.Close()

        result := arcana.NewResult()
        for rows.Next() {
            var id, name, email, role string
            if err := rows.Scan(&id, &name, &email, &role); err != nil {
                return nil, err
            }

            // AddRow stores the normalized row data
            result.AddRow("users", id, map[string]any{
                "id": id, "name": name, "email": email, "role": role,
            })

            // AddRef defines the view structure (which rows belong to this view)
            result.AddRef(arcana.Ref{
                Table: "users", ID: id,
                Fields: []string{"id", "name", "email", "role"},
            })
        }
        return result, nil
    },
}

Normalized Store

Arcana maintains a 4-level normalized in-memory store: workspace -> table -> row_id -> fields. Multiple subscriptions can reference the same row — a RefCount GC automatically cleans up rows no longer referenced by any subscription.
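The idea of a shared store with reference counting can be sketched in a few lines. This is a simplified illustration, not Arcana's DataStore: the names store, storeRow, upsert, addRef, and gcAll are hypothetical, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// storeRow is a hypothetical stand-in for a stored row and its reference count.
type storeRow struct {
	fields   map[string]any
	refCount int
}

// store mirrors the workspace -> table -> row_id -> fields layout.
type store struct {
	rows map[string]map[string]map[string]*storeRow
}

func newStore() *store {
	return &store{rows: map[string]map[string]map[string]*storeRow{}}
}

// upsert creates the nested maps on demand and stores the row's fields.
func (s *store) upsert(ws, table, id string, fields map[string]any) {
	if s.rows[ws] == nil {
		s.rows[ws] = map[string]map[string]*storeRow{}
	}
	if s.rows[ws][table] == nil {
		s.rows[ws][table] = map[string]*storeRow{}
	}
	if r, ok := s.rows[ws][table][id]; ok {
		r.fields = fields
		return
	}
	s.rows[ws][table][id] = &storeRow{fields: fields}
}

// addRef adjusts a row's reference count by delta
// (+1 when a subscription starts using the row, -1 when it stops).
func (s *store) addRef(ws, table, id string, delta int) {
	if r, ok := s.rows[ws][table][id]; ok {
		r.refCount += delta
	}
}

// gcAll deletes every row whose reference count is zero or less and
// returns how many rows were removed.
func (s *store) gcAll() int {
	removed := 0
	for _, tables := range s.rows {
		for _, rows := range tables {
			for id, r := range rows {
				if r.refCount <= 0 {
					delete(rows, id)
					removed++
				}
			}
		}
	}
	return removed
}

func main() {
	s := newStore()
	s.upsert("ws1", "users", "u1", map[string]any{"name": "Alice"})
	s.addRef("ws1", "users", "u1", +1) // subscription A references the row
	s.addRef("ws1", "users", "u1", +1) // subscription B references the same row
	s.addRef("ws1", "users", "u1", -1) // A unsubscribes
	fmt.Println("removed:", s.gcAll()) // B still holds a reference
	s.addRef("ws1", "users", "u1", -1) // B unsubscribes
	fmt.Println("removed:", s.gcAll()) // the row is now unreferenced
}
```

Because both subscriptions point at one stored row, the row's data lives in memory once and is only collected after the last reference is released.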

Two Update Streams

When data changes, Arcana sends two types of messages:

Stream      Channel           Content                                Scope
table_diff  workspace:{id}    Row field changes (JSON Patch)         Broadcast to all clients in workspace
view_diff   views:{seanceID}  Refs structure changes + new row data  Per-client (seance)

This separation means row data is sent once to the workspace (shared), while each client only receives structural changes relevant to their subscriptions.
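As a rough illustration of the routing rule in the table above, the sketch below maps a message type to its channel name. The channel formats come from the table; the function names (workspaceChannel, seanceChannel, channelFor) are hypothetical and not part of Arcana's API.

```go
package main

import "fmt"

// workspaceChannel is the broadcast channel shared by all clients in a workspace.
func workspaceChannel(workspaceID string) string { return "workspace:" + workspaceID }

// seanceChannel is the private channel of a single client session.
func seanceChannel(seanceID string) string { return "views:" + seanceID }

// channelFor picks the channel for a message type: table_diff is broadcast
// to the workspace, view_diff goes to one seance.
func channelFor(msgType, workspaceID, seanceID string) (string, error) {
	switch msgType {
	case "table_diff":
		return workspaceChannel(workspaceID), nil
	case "view_diff":
		return seanceChannel(seanceID), nil
	default:
		return "", fmt.Errorf("unknown message type %q", msgType)
	}
}

func main() {
	for _, t := range []string{"table_diff", "view_diff"} {
		ch, err := channelFor(t, "workspace-1", "seance-42")
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(t, "->", ch)
	}
}
```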

Identity & Context

Every subscription runs within an authenticated context:

type Identity struct {
    SeanceID    string   // Unique client session ID
    UserID      string   // Authenticated user
    WorkspaceID string   // Tenant/organization scope
    Role        string   // User role
    Permissions []string // Fine-grained permissions
}

Inside a factory, use context helpers:

Factory: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.Result, error) {
    wsID := arcana.WorkspaceID(ctx)      // Workspace isolation
    user := arcana.User(ctx)             // Full identity
    if !user.HasPermission("read:users") {
        return nil, arcana.ErrForbidden
    }
    // ...
}

Mutations

Register server-side write operations that automatically trigger reactive updates:

var CreateTask = arcana.MutationDef{
    Key: "create_task",
    Params: arcana.ParamSchema{
        "title":  arcana.ParamString().Required(),
        "status": arcana.ParamString().Default("todo"),
    },
    Handler: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.MutationResult, error) {
        // Insert into database...
        id := "new-task-id"

        return &arcana.MutationResult{
            Data: map[string]any{"id": id},
            Changes: []arcana.Change{
                {Table: "tasks", RowID: id, Columns: []string{"id", "title", "status"}},
            },
        }, nil
    },
}

engine.RegisterMutation(CreateTask)

Execute via HTTP (POST /mutate) or WebSocket ({"type":"mutate","action":"create_task","params":{...}}). Changes listed in the result are automatically fed into the invalidation pipeline.

Change Detection

Arcana supports three modes for detecting data changes:

1. Explicit Notify (Default)

Call engine.Notify() or engine.NotifyTable() after each database write. This mode is simple, but every write path in the application must be instrumented.

func (r *UserRepo) Update(ctx context.Context, id, name string) error {
    _, err := r.pool.Exec(ctx, "UPDATE users SET name = $1 WHERE id = $2", name, id)
    if err == nil {
        engine.NotifyTable(ctx, "users", id, []string{"name"})
    }
    return err
}

2. PostgreSQL LISTEN/NOTIFY

Auto-generated triggers send change notifications via PostgreSQL's NOTIFY mechanism. Zero app instrumentation needed after setup.

// Generate and install triggers for all registered tables
stmts := arcana.GenerateTriggerSQL("arcana_changes", engine.Registry().RepTable())
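for _, sql := range stmts {
    if _, err := pool.Exec(ctx, sql); err != nil {
        log.Fatalf("install trigger: %v", err)
    }
}
// Or use the convenience function:
if err := arcana.EnsureTriggers(ctx, pool, "arcana_changes", engine.Registry().RepTable()); err != nil {
    log.Fatalf("ensure triggers: %v", err)
}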

// Configure the engine to listen
engine := arcana.New(arcana.Config{
    ChangeDetector: arcana.NewPGNotifyListener(arcana.PGNotifyConfig{
        Conn:    pgConn,
        Channel: "arcana_changes",
    }),
    // ...
})

PGNotifyListener features:

  • Automatic reconnect with exponential backoff
  • Configurable max retries
  • JSON payload parsing with table/row/column extraction

3. WAL Logical Replication

Listens to PostgreSQL's logical replication stream (pgoutput plugin). Most reliable — captures all DML operations with zero app changes. Requires wal_level=logical.

engine := arcana.New(arcana.Config{
    ChangeDetector: arcana.NewWALListener(arcana.WALConfig{
        ConnString:     "postgres://user:pass@localhost:5432/mydb",
        SlotName:       "arcana_slot",     // default
        Publication:    "arcana_pub",      // default
        Tables:         []string{"users", "orders"}, // or empty for all
        StandbyTimeout: 10 * time.Second,  // default
    }),
    // ...
})

WALListener features:

  • Automatic publication and replication slot creation
  • Relation cache for column name resolution
  • Changed column detection (only reports actually modified columns)
  • Standby status updates for PostgreSQL keepalive

Note: You can combine modes. When using PGNotifyListener or WALListener as the ChangeDetector, explicit engine.Notify() calls still work — both paths feed into the same invalidation pipeline.

Transport

The Transport interface abstracts message delivery to clients:

type Transport interface {
    SendToSeance(ctx context.Context, seanceID string, msg Message) error
    SendToWorkspace(ctx context.Context, workspaceID string, msg Message) error
    DisconnectSeance(ctx context.Context, seanceID string) error
}

WebSocket Transport (Built-in, Default)

When no Transport is provided, Arcana automatically creates a built-in WebSocket transport. No external services needed.

engine := arcana.New(arcana.Config{
    Pool:     querier,
    AuthFunc: authFunc,
    // WSTransport is created automatically
})
engine.Start(ctx)

mux.Handle("/ws", engine.WSHandler()) // Mount the WS endpoint

The WS transport supports two auth modes:

  • HTTP upgrade auth: AuthFunc reads cookies/headers during the upgrade
  • Token-based auth: client sends {"type":"auth","token":"..."} as the first message

arcana.WSTransportConfig{
    AuthFunc:      authFunc,       // HTTP upgrade auth
    TokenAuthFunc: tokenAuthFunc,  // Token-based auth (SPA/mobile)
    WriteBufferSize: 256,          // Messages buffered per connection (default: 256)
    PingInterval:    30 * time.Second,
    PingTimeout:     10 * time.Second,
}

WS protocol supports: subscribe, unsubscribe, sync, mutate — all via a single connection with request/reply correlation (id/reply_to).
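A client-side request could be built roughly as follows. The type, action, and params fields come from the mutate message shown under Mutations, and id is the correlation field mentioned above; the wsRequest struct, the string type of id, and the encodeMutate helper are assumptions made for this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wsRequest is a hypothetical client-side view of a WebSocket request.
// The server echoes the id back as reply_to so replies can be matched.
type wsRequest struct {
	ID     string         `json:"id,omitempty"`
	Type   string         `json:"type"`
	Action string         `json:"action,omitempty"`
	Params map[string]any `json:"params,omitempty"`
}

// encodeMutate serializes a mutate request for the given action.
func encodeMutate(id, action string, params map[string]any) (string, error) {
	b, err := json.Marshal(wsRequest{ID: id, Type: "mutate", Action: action, Params: params})
	if err != nil {
		return "", fmt.Errorf("encode mutate request: %w", err)
	}
	return string(b), nil
}

func main() {
	msg, err := encodeMutate("req-1", "create_task", map[string]any{"title": "Write docs"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg)
}
```

Keeping a map from id to a pending callback on the client is one simple way to pair each reply with the request that caused it.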

Centrifugo Transport

transport := arcana.NewCentrifugoTransport(arcana.CentrifugoConfig{
    APIURL:     "http://localhost:8000", // Base URL (without /api — appended automatically)
    APIKey:     "your-centrifugo-api-key",
    HTTPClient: &http.Client{Timeout: 5 * time.Second}, // optional
    Retries:    3, // optional, default: 0
})

Important: The Centrifugo transport appends /api to the APIURL internally. Pass the base URL (e.g., http://localhost:8000), not http://localhost:8000/api.

Features:

  • Batch publishing — sends multiple messages in a single HTTP call via /api/batch
  • Retry with exponential backoff — configurable retry count (100ms -> 500ms -> 2.5s)
  • Channel naming: workspace:{id} for table diffs, views:{seanceID} for view diffs
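The retry schedule listed above (100ms, 500ms, 2.5s) grows by a factor of five per attempt. The sketch below reproduces that schedule; it is not the transport's actual code, and backoffDelay and retry are hypothetical helpers. The sleep function is injected so the example runs without waiting.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry number attempt (0-based):
// 100ms, 500ms, 2.5s, and so on.
func backoffDelay(attempt int) time.Duration {
	d := 100 * time.Millisecond
	for i := 0; i < attempt; i++ {
		d *= 5
	}
	return d
}

// retry calls fn once and then up to retries more times, sleeping with
// growing delays between attempts. It returns the last error on failure.
func retry(retries int, sleep func(time.Duration), fn func() error) error {
	var err error
	for attempt := 0; ; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt >= retries {
			return err
		}
		sleep(backoffDelay(attempt))
	}
}

func main() {
	calls := 0
	err := retry(3,
		func(d time.Duration) { fmt.Println("would sleep", d) }, // fake sleep
		func() error {
			calls++
			return fmt.Errorf("attempt %d failed", calls)
		})
	fmt.Println("calls:", calls, "err:", err)
}
```

With Retries set to 3, this sketch makes four calls in total and waits three times.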

Custom Transport

Implement the Transport interface for any delivery mechanism (raw WebSocket, SSE, gRPC streams):

type MyTransport struct{}

func (t *MyTransport) SendToSeance(ctx context.Context, seanceID string, msg arcana.Message) error {
    // Deliver msg to the specific client session
    return nil
}

func (t *MyTransport) SendToWorkspace(ctx context.Context, wsID string, msg arcana.Message) error {
    // Broadcast msg to all clients in the workspace
    return nil
}

func (t *MyTransport) DisconnectSeance(ctx context.Context, seanceID string) error {
    // Force-disconnect a client session
    return nil
}

HTTP API

Mount the handler on your router:

mux.Handle("/arcana/", http.StripPrefix("/arcana", engine.Handler()))

Endpoints

Method  Path          Description
POST    /subscribe    Subscribe to a graph view
POST    /unsubscribe  Unsubscribe from a view
POST    /sync         Reconnect sync (catch-up or snapshot)
POST    /mutate       Execute a registered mutation
GET     /active       List active subscriptions for the current seance
GET     /schema       Representation table (all tables/columns across graphs)
GET     /health       Health check with engine stats

All endpoints require authentication via AuthFunc. Responses use the envelope format: {"ok": true, "data": {...}} or {"ok": false, "error": "..."}.
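A Go client can unwrap this envelope with a small helper. The sketch assumes only the three fields shown above (ok, data, error); the envelope type and decodeEnvelope function are hypothetical names, not part of the library.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope mirrors the response wrapper: {"ok": ..., "data": ..., "error": ...}.
type envelope struct {
	OK    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}

// decodeEnvelope parses body, returns the server error when ok is false,
// and otherwise decodes the data payload into out (if out is non-nil).
func decodeEnvelope(body []byte, out any) error {
	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		return fmt.Errorf("decode envelope: %w", err)
	}
	if !env.OK {
		return errors.New(env.Error)
	}
	if out == nil || len(env.Data) == 0 {
		return nil
	}
	return json.Unmarshal(env.Data, out)
}

func main() {
	var sub struct {
		ParamsHash string `json:"params_hash"`
		Version    int    `json:"version"`
	}
	body := []byte(`{"ok": true, "data": {"params_hash": "a1b2c3d4", "version": 1}}`)
	if err := decodeEnvelope(body, &sub); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sub.ParamsHash, sub.Version)

	err := decodeEnvelope([]byte(`{"ok": false, "error": "forbidden"}`), nil)
	fmt.Println("error:", err)
}
```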

Subscribe

POST /subscribe
{
    "view": "user_list",
    "params": {"org_id": "550e8400-...", "limit": 20}
}

Response:
{
    "ok": true,
    "data": {
        "params_hash": "a1b2c3d4",
        "version": 1,
        "refs": [
            {"table": "users", "id": "u1", "fields": ["id", "name", "email"]}
        ],
        "tables": {
            "users": {
                "u1": {"id": "u1", "name": "Alice", "email": "alice@example.com"}
            }
        },
        "total": 150
    }
}

// "total" is included when the factory calls result.SetTotal(n).
// Useful for paginated views where clients need the full count.

Sync (Reconnect)

When a client reconnects, it sends its last known version for each view. The engine responds with either catch-up patches or a full snapshot:

POST /sync
{
    "views": [
        {"view": "user_list", "params_hash": "a1b2c3d4", "version": 3}
    ]
}

Response (catch-up mode):
{
    "ok": true,
    "data": {
        "views": [{
            "view": "user_list",
            "params_hash": "a1b2c3d4",
            "mode": "catch_up",
            "patches": [
                {"version": 4, "refs_patch": [...], "tables": {...}},
                {"version": 5, "refs_patch": [...], "tables": {...}}
            ]
        }]
    }
}

Response (snapshot mode — version gap too large):
{
    "ok": true,
    "data": {
        "views": [{
            "view": "user_list",
            "params_hash": "a1b2c3d4",
            "mode": "snapshot",
            "version": 42,
            "refs": [...],
            "tables": {...}
        }]
    }
}

The SnapshotThreshold config controls when sync falls back to a full snapshot (default: 50 versions).
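The decision between the two modes can be pictured as a comparison of the version gap against the threshold. The sketch below is an assumption about the rule, based on the Config comment that a snapshot is sent when the version difference exceeds the threshold; syncMode is a hypothetical function, and the handling of a client that is ahead of the server is a guess.

```go
package main

import "fmt"

// syncMode returns "catch_up" when the client is at most threshold versions
// behind the server, and "snapshot" otherwise. A client version ahead of the
// server is treated as invalid and also gets a snapshot (an assumption).
func syncMode(clientVersion, serverVersion int64, threshold int) string {
	gap := serverVersion - clientVersion
	if gap < 0 || gap > int64(threshold) {
		return "snapshot"
	}
	return "catch_up"
}

func main() {
	fmt.Println(syncMode(3, 5, 50))  // small gap: replay patches 4 and 5
	fmt.Println(syncMode(3, 60, 50)) // gap of 57 exceeds the threshold
}
```

A real implementation also has to fall back to a snapshot when the needed patches are no longer in the version history, which this sketch does not model.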

DevTools

Mount the built-in DevTools dashboard for runtime inspection:

mux.Handle("/devtools/", http.StripPrefix("/devtools", engine.DevToolsHandler()))

Provides:

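  • GET /devtools/: dark-themed HTML dashboard with auto-refresh
  • GET /devtools/state: JSON endpoint with graphs, mutations, subscriptions, WS connections, data store stats

Paths are relative to the mount point shown above; mounting under /arcana/devtools/ instead yields /arcana/devtools/ and /arcana/devtools/state.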

Configuration

arcana.Config{
    // Required
    Pool:      arcana.PgxQuerier(pool),              // PostgreSQL connection pool

    // Optional: nil selects the built-in WebSocket transport
    Transport: transport,                             // Message delivery

    // Authentication
    AuthFunc:  func(r *http.Request) (*arcana.Identity, error) { ... },

    // Change detection (nil = ExplicitNotifier)
    ChangeDetector: nil,

    // Tuning
    InvalidationDebounce:      50 * time.Millisecond, // Batch changes within window
    MaxSubscriptionsPerSeance: 100,                    // Per-client subscription limit
    SnapshotThreshold:         50,                     // Version gap for full snapshot vs catch-up
    GCInterval:                time.Minute,            // Unreferenced row cleanup interval
}
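
To make the effect of InvalidationDebounce concrete: changes that arrive within one window can be merged before any factory is re-run. The README does not describe how Arcana batches internally, so the sketch below is only one plausible approach. The change type and coalesce function are hypothetical; the merge key (table plus row ID) and the column union are assumptions.

```go
package main

import "fmt"

// change is a local stand-in for a change event collected during one window.
type change struct {
	Table   string
	RowID   string
	Columns []string
}

// contains reports whether list includes s.
func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// coalesce merges changes that target the same row, keeping first-seen
// order and the union of their changed columns.
func coalesce(changes []change) []change {
	type key struct{ table, rowID string }
	index := map[key]int{}
	var out []change
	for _, c := range changes {
		k := key{c.Table, c.RowID}
		i, seen := index[k]
		if !seen {
			index[k] = len(out)
			out = append(out, change{
				Table:   c.Table,
				RowID:   c.RowID,
				Columns: append([]string(nil), c.Columns...), // copy, do not alias input
			})
			continue
		}
		for _, col := range c.Columns {
			if !contains(out[i].Columns, col) {
				out[i].Columns = append(out[i].Columns, col)
			}
		}
	}
	return out
}

func main() {
	batch := coalesce([]change{
		{Table: "users", RowID: "u1", Columns: []string{"name"}},
		{Table: "users", RowID: "u1", Columns: []string{"email", "name"}},
		{Table: "users", RowID: "u2", Columns: []string{"role"}},
	})
	for _, c := range batch {
		fmt.Println(c.Table, c.RowID, c.Columns)
	}
}
```

A larger window merges more writes per re-run at the cost of added latency before clients see an update.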

Params API

Defining Parameters

arcana.ParamSchema{
    "org_id":   arcana.ParamUUID().Required(),            // Required UUID
    "limit":    arcana.ParamInt().Default(50),             // Optional int with default
    "status":   arcana.ParamString().OneOf("active", "archived").Build(), // Enum
    "verbose":  arcana.ParamBool().Default(false),         // Optional boolean
    "min_price": arcana.ParamFloat().Build(),              // Optional float
}

Using Parameters in Factories

Factory: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.Result, error) {
    orgID := p.UUID("org_id")       // string
    limit := p.Int("limit")         // int (50 if not provided)
    status := p.String("status")    // string
    verbose := p.Bool("verbose")    // bool
    minPrice := p.Float("min_price") // float64
    raw := p.Raw()                  // map[string]any
    // ...
}

Validating Parameters

resolved, err := arcana.ValidateParams(schema, rawInput)
// Strict mode rejects unknown parameters:
resolved, err = arcana.ValidateParams(schema, rawInput, true)

Result API

result := arcana.NewResult()

// Add a normalized row (table, row_id, fields)
result.AddRow("users", "u1", map[string]any{
    "id": "u1", "name": "Alice", "email": "alice@example.com",
})

// Add a ref (defines view structure — which rows belong to this view)
result.AddRef(arcana.Ref{
    Table: "users", ID: "u1",
    Fields: []string{"id", "name", "email"},
})

// Nested refs for hierarchical data
result.AddRef(arcana.Ref{
    Table: "orders", ID: "o1",
    Fields: []string{"id", "total"},
    Nested: map[string]arcana.Ref{
        "customer": {Table: "users", ID: "u1", Fields: []string{"id", "name"}},
    },
})

// Pagination: set total count for paginated results
result.SetTotal(150) // total matching rows (before LIMIT/OFFSET)

// Inspect
result.RowCount()  // number of rows added
result.Refs()      // all refs
result.Tables()    // map[table]map[rowID]map[field]any
result.Total()     // total count set via SetTotal (0 if not set)

Errors

arcana.ErrForbidden            // 403 — permission denied
arcana.ErrNotFound             // 404 — graph or subscription not found
arcana.ErrInvalidParams        // 400 — parameter validation failed
arcana.ErrTooManySubscriptions // 429 — exceeded MaxSubscriptionsPerSeance
arcana.ErrAlreadyStarted       // engine.Start() called twice
arcana.ErrNotStarted           // operation attempted before engine.Start()
arcana.ErrTransportNotReady    // WSTransport used before engine.Start()

Engine Stats

stats := engine.Stats()
// EngineStats{
//     Running:             true,
//     RegisteredGraphs:    5,
//     ActiveSubscriptions: 42,
//     SeancesWithSubs:     12,
//     DataStoreRows:       350,
// }

TypeScript Codegen

Generate type-safe TypeScript definitions from your Go graph registry:

go run ./cmd/arcana-gen -output ./sdk/generated/

Produces tables.d.ts (all table schemas) and views.d.ts (graph parameters and dependencies).

Database Adapter

Arcana uses a Querier interface compatible with any SQL driver:

type Querier interface {
    Query(ctx context.Context, sql string, args ...any) (Rows, error)
    QueryRow(ctx context.Context, sql string, args ...any) Row
}

For pgx v5:

pool, _ := pgxpool.New(ctx, connStr)
querier := arcana.PgxQuerier(pool)

Project Structure

arcana.go                  Engine: New, Start, Stop, Notify, Handler, Stats
config.go                  Configuration with sensible defaults
types.go                   Core types: GraphDef, Ref, PatchOp, Params, ParamSchema, Identity
result.go                  Factory result accumulator (AddRow, AddRef, SetTotal)
mutation.go                Mutation types and execution logic
errors.go                  Exported error values
context.go                 Identity context helpers (WithIdentity, WorkspaceID, User)
registry.go                Graph + mutation registry with inverted indices
store.go                   4-level normalized DataStore with RefCount GC
diff.go                    JSON Patch (RFC 6902) diff engine
invalidator.go             Change -> Factory re-run -> diff -> transport pipeline
manager.go                 Subscription lifecycle, sync (catch-up/snapshot)
subscription.go            Subscription with version history ring buffer
handler.go                 HTTP endpoints (/subscribe, /unsubscribe, /sync, /mutate, /active, /schema, /health)
middleware.go              Auth middleware for HTTP handler
devtools.go                DevTools panel (HTML dashboard + JSON state endpoint)
transport.go               Transport interface
transport_ws.go            Built-in WebSocket transport (default)
transport_centrifugo.go    Centrifugo HTTP API (publish, batch, retry with backoff)
change.go                  ChangeDetector interface
change_explicit.go         Explicit notify (default, channel-based)
change_pgnotify.go         PostgreSQL LISTEN/NOTIFY with auto-reconnect
change_wal.go              PostgreSQL WAL logical replication (pgoutput)
pgnotify_triggers.go       Auto-generate PostgreSQL trigger functions
pgx_adapter.go             pgxpool.Pool -> Querier adapter
codegen.go                 TypeScript type generation
sdk/                       TypeScript client SDK
  sdk/client.ts            ArcanaClient with offline support
  sdk/adapters/react.ts    React hooks (useView, useRow, useMutation, useStatus)
  sdk/adapters/vue.ts      Vue 3 composables (useView, useRow, useMutation, useStatus)
  sdk/adapters/svelte.ts   Svelte 5 adapter (createSubscription)
  sdk/transports/ws.ts     WebSocket transport for SDK
cmd/arcana-gen/            Codegen CLI tool
examples/basic/            Minimal working example
examples/dashboard/        Real-time dashboard with WS, mutations, and DevTools

Testing

# Unit tests (fast, no Docker)
go test -short ./...

# Full suite including integration tests (requires Docker for testcontainers)
go test -race ./... -count=1

# Integration tests only
go test -run Integration -v -timeout 120s

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT

Author

Artem Pryanishnikov

Documentation

Constants

This section is empty.

Variables

var (
	// ErrForbidden is returned when a user lacks permission to access a graph.
	ErrForbidden = errors.New("arcana: forbidden")

	// ErrNotFound is returned when a requested graph or subscription does not exist.
	ErrNotFound = errors.New("arcana: not found")

	// ErrInvalidParams is returned when subscription parameters fail validation.
	ErrInvalidParams = errors.New("arcana: invalid params")

	// ErrTooManySubscriptions is returned when a seance exceeds its subscription limit.
	ErrTooManySubscriptions = errors.New("arcana: too many subscriptions")

	// ErrAlreadyStarted is returned when Start is called on a running engine.
	ErrAlreadyStarted = errors.New("arcana: already started")

	// ErrNotStarted is returned when operations are attempted before Start.
	ErrNotStarted = errors.New("arcana: not started")

	// ErrTransportNotReady is returned when WSTransport is used before Engine.Start.
	ErrTransportNotReady = errors.New("arcana: transport not ready")
)

Functions

func ComputeParamsHash

func ComputeParamsHash(graphKey string, params map[string]any) string

ComputeParamsHash produces a deterministic hash of graphKey + sorted params.
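
The exact hashing scheme is not documented here, so the following is only a sketch of how a deterministic hash over a graph key and sorted params can be built. The paramsHash function, the use of SHA-256, the zero-byte separators, and the truncation to eight hex characters (chosen to resemble the "a1b2c3d4" example in the README) are all assumptions.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"sort"
)

// paramsHash hashes graphKey followed by the params in sorted key order,
// so that map iteration order never affects the result.
func paramsHash(graphKey string, params map[string]any) (string, error) {
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	h.Write([]byte(graphKey))
	for _, k := range keys {
		// json.Marshal sorts nested map keys, so values encode deterministically.
		v, err := json.Marshal(params[k])
		if err != nil {
			return "", fmt.Errorf("encode param %q: %w", k, err)
		}
		h.Write([]byte{0}) // separator between key/value pairs
		h.Write([]byte(k))
		h.Write([]byte{0}) // separator between key and value
		h.Write(v)
	}
	return hex.EncodeToString(h.Sum(nil))[:8], nil
}

func main() {
	a, _ := paramsHash("user_list", map[string]any{"org_id": "550e8400", "limit": 20})
	b, _ := paramsHash("user_list", map[string]any{"limit": 20, "org_id": "550e8400"})
	fmt.Println(a == b) // same params, different literal order
}
```

A stable hash lets two clients that subscribe with identical params share one subscription key regardless of how their JSON was ordered.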

func EnsureTriggers

func EnsureTriggers(ctx context.Context, exec TriggerExec, channel string, tables map[string][]string) error

EnsureTriggers creates pg_notify triggers for all registered tables.

func GenerateTables

func GenerateTables(w io.Writer, registry *Registry) error

GenerateTables writes TypeScript type definitions for all tables based on the representation table.

func GenerateTriggerSQL

func GenerateTriggerSQL(channel string, tables map[string][]string) []string

GenerateTriggerSQL generates CREATE OR REPLACE FUNCTION + DROP/CREATE TRIGGER SQL for each table in the tables map (table_name -> []tracked_columns). channel is the pg_notify channel name (e.g. "arcana_changes"). Returns a slice of SQL statements (3 per table: function + drop trigger + create trigger).

func GenerateViews

func GenerateViews(w io.Writer, registry *Registry) error

GenerateViews writes TypeScript type definitions for all views with their params and refs structure.

func ValidateParams

func ValidateParams(schema ParamSchema, raw map[string]any, strict ...bool) (map[string]any, error)

ValidateParams validates raw input against a ParamSchema, applying defaults and type checks. Returns resolved values or an error. When strict is true, any parameter key not defined in the schema causes an error.

func WithIdentity

func WithIdentity(ctx context.Context, id *Identity) context.Context

WithIdentity stores an Identity in the context.

func WorkspaceID

func WorkspaceID(ctx context.Context) string

WorkspaceID is a shorthand to get workspace ID from the context identity.

Types

type AuthFunc

type AuthFunc func(r *http.Request) (*Identity, error)

AuthFunc extracts an Identity from an HTTP request.

type BatchMessage added in v0.1.1

type BatchMessage struct {
	Channel string
	Msg     Message
}

BatchMessage represents a single message in a batch publish.

type CentrifugoConfig

type CentrifugoConfig struct {
	APIURL     string
	APIKey     string
	HTTPClient *http.Client
	Retries    int
}

CentrifugoConfig holds configuration for CentrifugoTransport.

type CentrifugoTransport

type CentrifugoTransport struct {
	// contains filtered or unexported fields
}

CentrifugoTransport publishes messages via Centrifugo HTTP API.

func NewCentrifugoTransport

func NewCentrifugoTransport(cfg CentrifugoConfig) *CentrifugoTransport

NewCentrifugoTransport creates a transport that publishes via Centrifugo HTTP API.

func (*CentrifugoTransport) DisconnectSeance

func (t *CentrifugoTransport) DisconnectSeance(ctx context.Context, seanceID string) error

DisconnectSeance disconnects a client by seance ID.

func (*CentrifugoTransport) SendBatch added in v0.1.1

func (t *CentrifugoTransport) SendBatch(ctx context.Context, msgs []BatchMessage) error

SendBatch publishes multiple messages in a single HTTP call.

func (*CentrifugoTransport) SendToSeance

func (t *CentrifugoTransport) SendToSeance(ctx context.Context, seanceID string, msg Message) error

SendToSeance publishes a message to the seance-specific channel.

func (*CentrifugoTransport) SendToWorkspace

func (t *CentrifugoTransport) SendToWorkspace(ctx context.Context, workspaceID string, msg Message) error

SendToWorkspace publishes a message to the workspace channel.

type Change

type Change struct {
	Table   string   `json:"table"`
	RowID   string   `json:"id"`
	Op      string   `json:"op,omitempty"` // "INSERT", "UPDATE", "DELETE"
	Columns []string `json:"columns,omitempty"`
}

Change represents a data mutation event from the application or database.

func ParsePGNotifyPayload

func ParsePGNotifyPayload(payload string) (Change, error)

ParsePGNotifyPayload parses a JSON payload from pg_notify into a Change. Expected format: {"table":"users","id":"uuid","op":"UPDATE","columns":["name"]}
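
For readers who want to see what parsing that payload involves, here is a standalone sketch. The change struct copies the JSON tags of the Change type above; parsePayload is a hypothetical re-implementation, and its check for a missing table or id is an assumption rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// change mirrors the JSON shape of arcana.Change.
type change struct {
	Table   string   `json:"table"`
	RowID   string   `json:"id"`
	Op      string   `json:"op,omitempty"`
	Columns []string `json:"columns,omitempty"`
}

// parsePayload decodes a pg_notify payload and rejects payloads that do
// not identify a row.
func parsePayload(payload string) (change, error) {
	var c change
	if err := json.Unmarshal([]byte(payload), &c); err != nil {
		return change{}, fmt.Errorf("parse payload: %w", err)
	}
	if c.Table == "" || c.RowID == "" {
		return change{}, errors.New("parse payload: missing table or id")
	}
	return c, nil
}

func main() {
	c, err := parsePayload(`{"table":"users","id":"uuid","op":"UPDATE","columns":["name"]}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```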

type ChangeDetector

type ChangeDetector interface {
	// Start begins listening for changes. The handler is called for each detected change.
	Start(ctx context.Context, handler func(Change)) error

	// Stop terminates the change detection loop.
	Stop() error
}

ChangeDetector listens for data changes and dispatches them to the engine.

type Config

type Config struct {
	// Pool is the PostgreSQL connection pool (required).
	Pool Querier

	// Transport delivers messages to clients (optional; if nil, the built-in
	// WebSocket transport is created, see WSConfig).
	Transport Transport

	// AuthFunc extracts identity from an incoming HTTP request.
	AuthFunc AuthFunc

	// ChangeDetector determines how data changes are detected.
	// Defaults to an internal explicit notifier if nil.
	ChangeDetector ChangeDetector

	// InvalidationDebounce batches changes within this window. Default: 50ms.
	InvalidationDebounce time.Duration

	// MaxSubscriptionsPerSeance limits subscriptions per client session. Default: 100.
	MaxSubscriptionsPerSeance int

	// SnapshotThreshold sends a full snapshot instead of diffs when version
	// difference exceeds this value. Default: 50.
	SnapshotThreshold int

	// GCInterval is how often the store garbage-collects unreferenced rows. Default: 1min.
	GCInterval time.Duration

	// WSConfig configures the built-in WebSocket transport.
	// Used when Transport is nil (auto-creation mode).
	// If nil and Transport is nil, default WSTransportConfig is used.
	WSConfig *WSTransportConfig
}

Config holds the configuration for an Arcana engine instance.

type DataStore

type DataStore struct {
	// contains filtered or unexported fields
}

DataStore is the in-memory normalized data store. Structure: workspace_id → table_name → row_id → StoreRow.

func NewDataStore

func NewDataStore() *DataStore

NewDataStore creates an empty DataStore.

func (*DataStore) DecrRef

func (s *DataStore) DecrRef(wsID, table, rowID string) bool

DecrRef atomically decrements the reference count. Returns true if the count reached zero (candidate for GC).

func (*DataStore) Delete

func (s *DataStore) Delete(wsID, table, rowID string) (*StoreRow, bool)

Delete removes a row from the store. Returns the deleted row and true, or nil and false if the row didn't exist.

func (*DataStore) GC

func (s *DataStore) GC(wsID, table, rowID string)

GC removes a row if its RefCount is zero or negative.

func (*DataStore) GCAll

func (s *DataStore) GCAll()

GCAll iterates all rows and removes those with RefCount <= 0.

func (*DataStore) Get

func (s *DataStore) Get(wsID, table, rowID string) (*StoreRow, bool)

Get retrieves a row from the store. Returns nil, false if not found.

func (*DataStore) GetFields

func (s *DataStore) GetFields(wsID, table, rowID string) map[string]any

GetFields returns a copy of the row's fields. Returns nil if not found.

func (*DataStore) IncrRef

func (s *DataStore) IncrRef(wsID, table, rowID string)

IncrRef atomically increments the reference count of a row.

func (*DataStore) RowCount added in v0.1.1

func (s *DataStore) RowCount() int

RowCount returns the total number of rows across all workspaces and tables.

func (*DataStore) RowVersion

func (s *DataStore) RowVersion(wsID, table, rowID string) int64

RowVersion returns the version of a row, or 0 if not found.

func (*DataStore) Upsert

func (s *DataStore) Upsert(wsID, table, rowID string, fields map[string]any) (oldFields map[string]any, diff []PatchOp, isNew bool)

Upsert inserts or updates a row. Returns, in order, the old fields snapshot, the diff (patch operations), and whether the row was newly created.
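
A field-level diff in RFC 6902 form can be sketched as follows. The fields of Arcana's PatchOp are not shown on this page, so patchOp here is a local stand-in with the standard op, path, and value members, and diffFields is a hypothetical helper that only compares top-level fields.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"strings"
)

// patchOp is a local stand-in for one RFC 6902 operation.
// Note: a nil Value is omitted when encoded, so null fields need extra care.
type patchOp struct {
	Op    string `json:"op"`
	Path  string `json:"path"`
	Value any    `json:"value,omitempty"`
}

// pointerEscaper applies JSON Pointer (RFC 6901) escaping to a key.
var pointerEscaper = strings.NewReplacer("~", "~0", "/", "~1")

// diffFields compares two flat field maps and returns add, remove, and
// replace operations in sorted key order.
func diffFields(old, next map[string]any) []patchOp {
	keys := make([]string, 0, len(old)+len(next))
	for k := range old {
		keys = append(keys, k)
	}
	for k := range next {
		if _, dup := old[k]; !dup {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)

	var ops []patchOp
	for _, k := range keys {
		path := "/" + pointerEscaper.Replace(k)
		oldVal, inOld := old[k]
		newVal, inNext := next[k]
		switch {
		case !inOld:
			ops = append(ops, patchOp{Op: "add", Path: path, Value: newVal})
		case !inNext:
			ops = append(ops, patchOp{Op: "remove", Path: path})
		case !reflect.DeepEqual(oldVal, newVal):
			ops = append(ops, patchOp{Op: "replace", Path: path, Value: newVal})
		}
	}
	return ops
}

func main() {
	old := map[string]any{"name": "Alice", "role": "admin"}
	next := map[string]any{"name": "Alicia", "email": "alicia@example.com"}
	for _, op := range diffFields(old, next) {
		fmt.Printf("%+v\n", op)
	}
}
```

Unchanged fields produce no operation, which is what keeps the wire payload small when only one column of a row changes.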

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the main entry point for the Arcana reactive sync library.

func New

func New(config Config) *Engine

New creates a new Arcana engine with the given configuration.

func (*Engine) DevToolsHandler added in v0.2.0

func (e *Engine) DevToolsHandler() http.Handler

DevToolsHandler returns an http.Handler that serves a DevTools dashboard for inspecting engine state. Mount with StripPrefix at your chosen path:

mux.Handle("/devtools/", http.StripPrefix("/devtools", engine.DevToolsHandler()))

func (*Engine) Handler

func (e *Engine) Handler() http.Handler

Handler returns an http.Handler for the Arcana endpoints. Mount this on your router:

mux.Mount("/arcana", engine.Handler())

func (*Engine) Mutate added in v0.2.0

func (e *Engine) Mutate(ctx context.Context, req MutateRequest) (*MutateResponse, error)

Mutate executes a registered mutation. Identity must be in the context.

func (*Engine) Notify

func (e *Engine) Notify(ctx context.Context, change Change)

Notify sends a data change event to the engine for processing.

func (*Engine) NotifyTable

func (e *Engine) NotifyTable(ctx context.Context, table, rowID string, columns []string)

NotifyTable is a convenience method for notifying about a table change.

func (*Engine) Register

func (e *Engine) Register(defs ...GraphDef) error

Register adds graph definitions to the engine. Must be called before Start.

func (*Engine) RegisterMutation added in v0.2.0

func (e *Engine) RegisterMutation(defs ...MutationDef) error

RegisterMutation adds mutation definitions to the engine. Must be called before Start.

func (*Engine) Registry

func (e *Engine) Registry() *Registry

Registry returns the engine's graph registry (for codegen and inspection).

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start initializes all internal components and begins processing.

func (*Engine) Stats added in v0.1.1

func (e *Engine) Stats() EngineStats

Stats returns current engine statistics for monitoring/admin endpoints.

func (*Engine) Stop

func (e *Engine) Stop() error

Stop gracefully shuts down the engine.

func (*Engine) WSHandler added in v0.2.0

func (e *Engine) WSHandler() http.Handler

WSHandler returns the http.Handler for WebSocket connections. Mount this at your chosen path:

mux.Handle("/arcana/ws", engine.WSHandler())

Returns nil if the engine is not using WSTransport.

type EngineStats added in v0.1.1

type EngineStats struct {
	Running             bool `json:"running"`
	RegisteredGraphs    int  `json:"registered_graphs"`
	ActiveSubscriptions int  `json:"active_subscriptions"`
	SeancesWithSubs     int  `json:"seances_with_subs"`
	DataStoreRows       int  `json:"data_store_rows"`
}

EngineStats holds runtime statistics about the engine.

type ExplicitNotifier

type ExplicitNotifier struct {
	// contains filtered or unexported fields
}

ExplicitNotifier is the default ChangeDetector that receives changes via a channel. Application code calls engine.Notify() which sends to this channel.

func NewExplicitNotifier

func NewExplicitNotifier(bufferSize int) *ExplicitNotifier

NewExplicitNotifier creates a notifier with the given buffer size.

func (*ExplicitNotifier) Send

func (n *ExplicitNotifier) Send(change Change)

Send enqueues a change for processing.

func (*ExplicitNotifier) Start

func (n *ExplicitNotifier) Start(_ context.Context, handler func(Change)) error

Start begins processing changes from the channel.

func (*ExplicitNotifier) Stop

func (n *ExplicitNotifier) Stop() error

Stop terminates the processing loop.
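As an illustration of the channel-based flow that ExplicitNotifier describes, here is a minimal sketch of a buffered queue drained by one goroutine. Everything in it is hypothetical: `change`, `notifier`, and `run` are local stand-ins, not the package's types, and the real shutdown semantics may differ.

```go
package main

import "fmt"

// change is a hypothetical stand-in for arcana.Change, which is declared
// elsewhere in the package; its real fields are not assumed here.
type change struct{ table string }

// notifier is a hypothetical channel-backed change detector.
type notifier struct {
	ch   chan change
	done chan struct{}
}

func newNotifier(bufferSize int) *notifier {
	return &notifier{ch: make(chan change, bufferSize), done: make(chan struct{})}
}

// send enqueues a change; it blocks while the buffer is full.
func (n *notifier) send(c change) { n.ch <- c }

// start runs handler for each change on a single goroutine.
func (n *notifier) start(handler func(change)) {
	go func() {
		defer close(n.done)
		for c := range n.ch {
			handler(c)
		}
	}()
}

// stop closes the queue and waits until buffered changes are handled.
// Call it once, after start; send must not be called afterwards.
func (n *notifier) stop() {
	close(n.ch)
	<-n.done
}

// run pushes three changes through the notifier and returns the tables seen.
func run() []string {
	var seen []string
	n := newNotifier(8)
	n.start(func(c change) { seen = append(seen, c.table) })
	for _, t := range []string{"tasks", "users", "tasks"} {
		n.send(change{table: t})
	}
	n.stop()
	return seen
}

func main() { fmt.Println(run()) }
```

Because a single goroutine drains the channel, the handler sees changes in the order they were sent, and the buffer size bounds how far producers can run ahead of processing.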

type FactoryFunc

type FactoryFunc func(ctx context.Context, q Querier, p Params) (*Result, error)

FactoryFunc executes SQL queries and returns normalized refs + rows.

type GraphDef

type GraphDef struct {
	Key     string
	Params  ParamSchema
	Deps    []TableDep
	Factory FactoryFunc
}

GraphDef defines a reactive data graph — a named SQL-backed query with dependencies and a factory function that produces normalized results.

type Identity

type Identity struct {
	SeanceID    string
	UserID      string
	WorkspaceID string
	Role        string
	Permissions []string
}

Identity represents an authenticated user session.

func IdentityFromCtx

func IdentityFromCtx(ctx context.Context) *Identity

IdentityFromCtx retrieves the Identity from the context.

func User

func User(ctx context.Context) *Identity

User returns the Identity from context (alias for IdentityFromCtx).

func (*Identity) HasPermission

func (id *Identity) HasPermission(perm string) bool

HasPermission checks whether the identity has the given permission.
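The lookup-then-check flow behind IdentityFromCtx and HasPermission can be sketched with the standard context-value pattern. This is a sketch under assumptions, not the package's code: `withIdentity`, `canWrite`, and `demo` are hypothetical helpers, and both the nil return for a missing identity and the exact-match permission check are guesses.

```go
package main

import (
	"context"
	"fmt"
)

// Identity mirrors the documented struct.
type Identity struct {
	SeanceID    string
	UserID      string
	WorkspaceID string
	Role        string
	Permissions []string
}

// ctxKey is unexported so no other package can collide with this key.
type ctxKey struct{}

// withIdentity is a hypothetical helper that stores id in the context.
func withIdentity(ctx context.Context, id *Identity) context.Context {
	return context.WithValue(ctx, ctxKey{}, id)
}

// identityFromCtx returns the stored Identity. Returning nil when none is
// present is an assumption of this sketch.
func identityFromCtx(ctx context.Context) *Identity {
	id, _ := ctx.Value(ctxKey{}).(*Identity)
	return id
}

// hasPermission does an exact string match and is safe on a nil receiver.
func (id *Identity) hasPermission(perm string) bool {
	if id == nil {
		return false
	}
	for _, p := range id.Permissions {
		if p == perm {
			return true
		}
	}
	return false
}

// canWrite shows the lookup-then-check flow a handler might follow.
func canWrite(ctx context.Context) bool {
	return identityFromCtx(ctx).hasPermission("tasks:write")
}

// demo runs the check without and with an Identity in the context.
func demo() (anonymous, authenticated bool) {
	anon := context.Background()
	authed := withIdentity(anon, &Identity{UserID: "u1", Permissions: []string{"tasks:write"}})
	return canWrite(anon), canWrite(authed)
}

func main() { fmt.Println(demo()) }
```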

type Invalidator

type Invalidator struct {
	// contains filtered or unexported fields
}

Invalidator processes data changes and dispatches updates to affected subscriptions.

func NewInvalidator

func NewInvalidator(registry *Registry, store *DataStore, transport Transport, subs SubscriptionStore, pool Querier, logger *slog.Logger) *Invalidator

NewInvalidator creates an Invalidator.

func (*Invalidator) Invalidate

func (inv *Invalidator) Invalidate(ctx context.Context, change Change)

Invalidate processes a Change event: finds affected graphs, filters by columns, and dispatches table_diff and view_diff to affected subscriptions.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager orchestrates subscription lifecycle and connects Registry, DataStore, Invalidator, and Transport.

func NewManager

func NewManager(registry *Registry, store *DataStore, transport Transport, pool Querier, config *Config, logger *slog.Logger) *Manager

NewManager creates a Manager.

func (*Manager) GetActive

func (m *Manager) GetActive(seanceID string) []*Subscription

GetActive returns all active subscriptions for a seance.

func (*Manager) GetByGraphKey

func (m *Manager) GetByGraphKey(graphKey string) []*Subscription

GetByGraphKey returns all active subscriptions for a graph key. It implements SubscriptionStore for the Invalidator.

func (*Manager) Stats added in v0.1.1

func (m *Manager) Stats() (subs int, seances int)

Stats returns subscription count and seance count.

func (*Manager) Subscribe

func (m *Manager) Subscribe(ctx context.Context, req SubscribeRequest) (*SubscribeResponse, error)

Subscribe creates a new subscription, executes the factory, and returns initial data.

func (*Manager) SubscriptionCount

func (m *Manager) SubscriptionCount() int

SubscriptionCount returns total active subscriptions.

func (*Manager) Sync

func (m *Manager) Sync(ctx context.Context, req SyncRequest) (*SyncResponse, error)

Sync processes a reconnect request, returning catch-up diffs or full snapshots.

func (*Manager) Unsubscribe

func (m *Manager) Unsubscribe(seanceID, paramsHash string) error

Unsubscribe removes a subscription identified by seanceID and paramsHash.

func (*Manager) UnsubscribeAll

func (m *Manager) UnsubscribeAll(seanceID string)

UnsubscribeAll removes all subscriptions for a seance (on disconnect).

func (*Manager) UpdateSubscription

func (m *Manager) UpdateSubscription(subID string, refs []Ref, version int64, entry *VersionEntry)

UpdateSubscription updates a subscription's LastRefs and Version, and records a version history entry for catch-up sync.

type Message

type Message struct {
	Type string `json:"type"`
	Data any    `json:"data"`
}

Message is the envelope sent over the transport layer.

type MutateRequest added in v0.2.0

type MutateRequest struct {
	Action      string         `json:"action"`
	Params      map[string]any `json:"params"`
	SeanceID    string         `json:"-"`
	UserID      string         `json:"-"`
	WorkspaceID string         `json:"-"`
}

MutateRequest is the input for a mutation execution.

type MutateResponse added in v0.2.0

type MutateResponse struct {
	OK   bool `json:"ok"`
	Data any  `json:"data,omitempty"`
}

MutateResponse is returned after a successful mutation.

type MutationDef added in v0.2.0

type MutationDef struct {
	Key     string
	Params  ParamSchema
	Handler MutationFunc
}

MutationDef defines a named write operation with parameter validation and a handler function. Register mutations with Engine.RegisterMutation.

type MutationFunc added in v0.2.0

type MutationFunc func(ctx context.Context, q Querier, p Params) (*MutationResult, error)

MutationFunc executes a mutation. Identity is available via IdentityFromCtx(ctx). Returns a result containing response data and a list of changes for reactive invalidation.

type MutationResult added in v0.2.0

type MutationResult struct {
	// Data is returned to the client in the response.
	Data any `json:"data,omitempty"`

	// Changes lists the tables/rows affected by this mutation.
	// Arcana auto-notifies these after the handler returns,
	// triggering reactive invalidation of affected views.
	Changes []Change `json:"-"`
}

MutationResult holds the outcome of a mutation.

type Notification

type Notification struct {
	Channel string
	Payload string
}

Notification represents a PostgreSQL notification.

type PGNotifyConfig

type PGNotifyConfig struct {
	Conn       PGNotifyConn
	Channel    string // default: "arcana_changes"
	Logger     *slog.Logger
	MaxRetries int // 0 = unlimited retries
}

PGNotifyConfig configures the PGNotifyListener.

type PGNotifyConn

type PGNotifyConn interface {
	Exec(ctx context.Context, sql string, args ...any) error
	WaitForNotification(ctx context.Context) (*Notification, error)
}

PGNotifyConn abstracts a PostgreSQL connection capable of LISTEN/NOTIFY. Compatible with pgx.Conn.

type PGNotifyListener

type PGNotifyListener struct {
	// contains filtered or unexported fields
}

PGNotifyListener listens to PostgreSQL LISTEN/NOTIFY for change detection.

func NewPGNotifyListener

func NewPGNotifyListener(cfg PGNotifyConfig) *PGNotifyListener

NewPGNotifyListener creates a listener for PostgreSQL notifications.

func (*PGNotifyListener) Start

func (l *PGNotifyListener) Start(ctx context.Context, handler func(Change)) error

Start begins listening on the PostgreSQL notification channel.

func (*PGNotifyListener) Stop

func (l *PGNotifyListener) Stop() error

Stop terminates the listener.

type ParamDef

type ParamDef struct {
	Type         ParamType
	Required     bool
	DefaultValue any
	AllowedVals  []string
}

ParamDef describes a single parameter: its type, constraints, and default value.

type ParamDefBuilder

type ParamDefBuilder struct {
	// contains filtered or unexported fields
}

ParamDefBuilder provides a fluent API for constructing ParamDef values.

func ParamBool

func ParamBool() ParamDefBuilder

ParamBool creates a boolean parameter definition.

func ParamFloat

func ParamFloat() ParamDefBuilder

ParamFloat creates a float parameter definition.

func ParamInt

func ParamInt() ParamDefBuilder

ParamInt creates an integer parameter definition.

func ParamString

func ParamString() ParamDefBuilder

ParamString creates a string parameter definition.

func ParamUUID

func ParamUUID() ParamDefBuilder

ParamUUID creates a UUID parameter definition.

func (ParamDefBuilder) Build

func (b ParamDefBuilder) Build() ParamDef

Build returns the completed ParamDef without setting required or default.

func (ParamDefBuilder) Default

func (b ParamDefBuilder) Default(v any) ParamDef

Default sets a default value and returns the completed definition.

func (ParamDefBuilder) OneOf

func (b ParamDefBuilder) OneOf(vals ...string) ParamDefBuilder

OneOf restricts allowed string values (only for ParamTypeString).

func (ParamDefBuilder) Required

func (b ParamDefBuilder) Required() ParamDef

Required marks the parameter as required.

type ParamSchema

type ParamSchema map[string]ParamDef

ParamSchema defines the expected parameters for a graph subscription.
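The signatures above suggest a fluent pattern in which OneOf keeps the chain going while Required, Default, and Build end it by returning a ParamDef. The sketch below mirrors the documented names locally so it compiles without the module; the builder's internals are unexported in the real package, so the method bodies and the parameter names (`org_id`, `status`, `limit`) are assumptions.

```go
package main

import "fmt"

type ParamType int

const (
	ParamTypeString ParamType = iota
	ParamTypeInt
	ParamTypeUUID
)

// ParamDef mirrors the documented struct.
type ParamDef struct {
	Type         ParamType
	Required     bool
	DefaultValue any
	AllowedVals  []string
}

type ParamSchema map[string]ParamDef

// ParamDefBuilder is a local stand-in; the real fields are unexported.
type ParamDefBuilder struct{ def ParamDef }

func ParamString() ParamDefBuilder { return ParamDefBuilder{def: ParamDef{Type: ParamTypeString}} }
func ParamInt() ParamDefBuilder    { return ParamDefBuilder{def: ParamDef{Type: ParamTypeInt}} }
func ParamUUID() ParamDefBuilder   { return ParamDefBuilder{def: ParamDef{Type: ParamTypeUUID}} }

// OneOf returns a builder, so it can be followed by a terminal call.
func (b ParamDefBuilder) OneOf(vals ...string) ParamDefBuilder {
	b.def.AllowedVals = append([]string(nil), vals...)
	return b
}

// Required, Default, and Build are terminal: each returns a ParamDef.
func (b ParamDefBuilder) Required() ParamDef { b.def.Required = true; return b.def }
func (b ParamDefBuilder) Default(v any) ParamDef {
	b.def.DefaultValue = v
	return b.def
}
func (b ParamDefBuilder) Build() ParamDef { return b.def }

func main() {
	schema := ParamSchema{
		"org_id": ParamUUID().Required(),
		"status": ParamString().OneOf("open", "done").Default("open"),
		"limit":  ParamInt().Default(50),
	}
	fmt.Println(schema["org_id"].Required, schema["status"].AllowedVals, schema["limit"].DefaultValue)
}
```

Since Required and Default both return a ParamDef rather than a builder, a parameter is either required or defaulted, never both, which the type signatures enforce at compile time.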

type ParamType

type ParamType int

ParamType represents the type of a graph parameter.

const (
	ParamTypeString ParamType = iota
	ParamTypeInt
	ParamTypeUUID
	ParamTypeBool
	ParamTypeFloat
)

type Params

type Params struct {
	// contains filtered or unexported fields
}

Params provides typed access to resolved subscription parameters.

func NewParams

func NewParams(values map[string]any) Params

NewParams creates a Params instance from a raw value map.

func (Params) Bool

func (p Params) Bool(key string) bool

Bool returns the parameter value as a bool.

func (Params) Float

func (p Params) Float(key string) float64

Float returns the parameter value as a float64.

func (Params) Int

func (p Params) Int(key string) int

Int returns the parameter value as an int.

func (Params) Raw

func (p Params) Raw() map[string]any

Raw returns the underlying values map.

func (Params) String

func (p Params) String(key string) string

String returns the parameter value as a string.

func (Params) UUID

func (p Params) UUID(key string) string

UUID returns the parameter value as a string (UUID representation).

type PatchOp

type PatchOp struct {
	Op    string `json:"op"`
	Path  string `json:"path"`
	Value any    `json:"value,omitempty"`
}

PatchOp represents a single JSON Patch (RFC 6902) operation.

func DiffFields

func DiffFields(old, new map[string]any) []PatchOp

DiffFields computes a JSON Patch (RFC 6902) between two field maps of the same row. Only top-level keys are compared.
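To make the top-level comparison concrete, here is a minimal sketch of such a diff. It is not the package's implementation: `diffTopLevel` and `escapeSegment` are hypothetical names, and the choice of add/replace/remove operations, the `/key` path shape, and the sorted key order are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"strings"
)

// PatchOp mirrors the documented struct so the sketch compiles on its own.
type PatchOp struct {
	Op    string `json:"op"`
	Path  string `json:"path"`
	Value any    `json:"value,omitempty"`
}

// escapeSegment applies JSON Pointer (RFC 6901) escaping to one path segment.
func escapeSegment(key string) string {
	key = strings.ReplaceAll(key, "~", "~0")
	return strings.ReplaceAll(key, "/", "~1")
}

// diffTopLevel is a hypothetical stand-in for DiffFields. It compares only
// top-level keys and visits them in sorted order so the output is stable.
func diffTopLevel(old, next map[string]any) []PatchOp {
	seen := make(map[string]struct{}, len(old)+len(next))
	for k := range old {
		seen[k] = struct{}{}
	}
	for k := range next {
		seen[k] = struct{}{}
	}
	keys := make([]string, 0, len(seen))
	for k := range seen {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var ops []PatchOp
	for _, k := range keys {
		o, inOld := old[k]
		n, inNext := next[k]
		path := "/" + escapeSegment(k)
		switch {
		case !inOld:
			ops = append(ops, PatchOp{Op: "add", Path: path, Value: n})
		case !inNext:
			ops = append(ops, PatchOp{Op: "remove", Path: path})
		case !reflect.DeepEqual(o, n):
			ops = append(ops, PatchOp{Op: "replace", Path: path, Value: n})
		}
	}
	return ops
}

func main() {
	old := map[string]any{"title": "Draft", "votes": 1, "tmp": true}
	next := map[string]any{"title": "Final", "votes": 1, "tags": []string{"go"}}
	for _, op := range diffTopLevel(old, next) {
		fmt.Println(op.Op, op.Path)
	}
}
```

Because only top-level keys are compared, a change inside a nested value shows up as a single replace of the whole field rather than as a deeper path.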

func DiffRefs

func DiffRefs(old, new []Ref) []PatchOp

DiffRefs computes a JSON Patch between two ref lists. Matching is done by table:id, not by position.

type Querier

type Querier interface {
	Query(ctx context.Context, sql string, args ...any) (Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) Row
}

Querier abstracts database query execution (compatible with pgx).

func PgxQuerier

func PgxQuerier(pool *pgxpool.Pool) Querier

PgxQuerier adapts a pgxpool.Pool to the Querier interface. pgx.Rows and pgx.Row structurally satisfy arcana.Rows and arcana.Row.

type Ref

type Ref struct {
	Table  string         `json:"table"`
	ID     string         `json:"id"`
	Fields []string       `json:"fields"`
	Nested map[string]Ref `json:"nested,omitempty"`
}

Ref represents a reference to a normalized row, defining view structure.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry stores graph definitions, mutation definitions, and provides lookup indices.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates an empty Registry.

func (*Registry) Get

func (r *Registry) Get(key string) (*GraphDef, bool)

Get returns a graph definition by key.

func (*Registry) GetByTable

func (r *Registry) GetByTable(table string) []*GraphDef

GetByTable returns all graph definitions that depend on the given table.

func (*Registry) GetMutation added in v0.2.0

func (r *Registry) GetMutation(key string) (*MutationDef, bool)

GetMutation returns a mutation definition by key.

func (*Registry) GraphCount

func (r *Registry) GraphCount() int

GraphCount returns the number of registered graphs.

func (*Registry) HasColumn

func (r *Registry) HasColumn(table, column string) bool

HasColumn checks if a table+column is tracked by any graph.

func (*Registry) Keys

func (r *Registry) Keys() []string

Keys returns all registered graph keys.

func (*Registry) MutationKeys added in v0.2.0

func (r *Registry) MutationKeys() []string

MutationKeys returns all registered mutation keys.

func (*Registry) Register

func (r *Registry) Register(defs ...GraphDef) error

Register adds one or more graph definitions to the registry. Returns an error if a graph key is already registered.

func (*Registry) RegisterMutation added in v0.2.0

func (r *Registry) RegisterMutation(defs ...MutationDef) error

RegisterMutation adds one or more mutation definitions to the registry.

func (*Registry) RepTable

func (r *Registry) RepTable() map[string][]string

RepTable returns the representation table: table → sorted column list. This is the union of all columns declared in graph Deps for each table.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result accumulates the output of a graph Factory execution. It holds both normalized table rows and the ref structure that defines the view.

func NewResult

func NewResult() *Result

NewResult creates an empty Result.

func (*Result) AddRef

func (r *Result) AddRef(ref Ref)

AddRef appends a reference to the result's ref list.

func (*Result) AddRow

func (r *Result) AddRow(table, id string, fields map[string]any)

AddRow adds a normalized row to the result's table data. If the row already exists, fields are merged (new values overwrite).

func (*Result) Refs

func (r *Result) Refs() []Ref

Refs returns the accumulated references.

func (*Result) RowCount

func (r *Result) RowCount() int

RowCount returns the total number of rows across all tables.

func (*Result) SetTotal added in v0.1.3

func (r *Result) SetTotal(n int)

SetTotal stores the total row count for paginated graph results. This is typically populated from a COUNT(*) OVER() window function.

func (*Result) Tables

func (r *Result) Tables() map[string]map[string]map[string]any

Tables returns the accumulated table data. Returns: table_name → row_id → fields.

func (*Result) Total added in v0.1.3

func (r *Result) Total() int

Total returns the total row count set by SetTotal. Returns 0 if SetTotal was never called.

type Row

type Row interface {
	Scan(dest ...any) error
}

Row abstracts a single-row result from a database query.

type Rows

type Rows interface {
	Next() bool
	Scan(dest ...any) error
	Close()
	Err() error
}

Rows abstracts a result set from a database query.

type StoreRow

type StoreRow struct {
	Fields   map[string]any
	Version  int64
	RefCount int32 // managed atomically
	// contains filtered or unexported fields
}

StoreRow is a single row in the normalized store.
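The "managed atomically" note on RefCount, together with the RefCount GC mentioned in the feature list, suggests a pattern like the one below. This is a sketch under assumptions: `row`, `acquire`, and `release` are hypothetical names, and when the store actually collects a row is not documented here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// row is a trimmed stand-in for StoreRow.
type row struct {
	Fields   map[string]any
	RefCount int32 // only touched through sync/atomic
}

// acquire records one more subscription referencing the row.
func (r *row) acquire() int32 { return atomic.AddInt32(&r.RefCount, 1) }

// release drops one reference and reports whether the row is now
// unreferenced and could be collected.
func (r *row) release() bool { return atomic.AddInt32(&r.RefCount, -1) == 0 }

func main() {
	r := &row{Fields: map[string]any{"name": "Ada"}}
	r.acquire()              // subscription A
	r.acquire()              // subscription B
	fmt.Println(r.release()) // A leaves; B still holds the row
	fmt.Println(r.release()) // B leaves; the row can be collected
}
```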

type SubscribeRequest

type SubscribeRequest struct {
	GraphKey    string
	Params      map[string]any
	SeanceID    string
	UserID      string
	WorkspaceID string
}

SubscribeRequest contains the parameters for a Subscribe call.

type SubscribeResponse

type SubscribeResponse struct {
	ParamsHash string                               `json:"params_hash"`
	Version    int64                                `json:"version"`
	Refs       []Ref                                `json:"refs"`
	Tables     map[string]map[string]map[string]any `json:"tables"`
	Total      int                                  `json:"total,omitempty"`
	SeanceID   string                               `json:"seance_id"`
}

SubscribeResponse contains the initial data returned to the client.

type Subscription

type Subscription struct {
	ID          string
	SeanceID    string
	WorkspaceID string
	UserID      string
	GraphKey    string
	Params      map[string]any
	ParamsHash  string
	LastRefs    []Ref
	Version     int64
	CreatedAt   time.Time

	// VersionHistory stores recent view_diff messages for catch-up sync.
	// Ring buffer of size SnapshotThreshold.
	VersionHistory []VersionEntry
}

Subscription represents an active client subscription to a graph view.
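The VersionHistory comment describes a bounded buffer that feeds reconnect sync, where a client either catches up from recorded entries or receives a full snapshot. The sketch below shows one way that decision could work. The names `entry`, `history`, `push`, and `since` are hypothetical, and it assumes versions grow by exactly 1 per entry, which the documentation does not state.

```go
package main

import "fmt"

// entry is a trimmed stand-in for VersionEntry; only Version matters here.
type entry struct{ Version int64 }

// history keeps the most recent limit entries, oldest first.
// limit must be at least 1.
type history struct {
	limit   int
	entries []entry
}

// push appends e, evicting the oldest entry once the buffer is full.
func (h *history) push(e entry) {
	if len(h.entries) == h.limit {
		copy(h.entries, h.entries[1:])
		h.entries = h.entries[:h.limit-1]
	}
	h.entries = append(h.entries, e)
}

// since returns the entries a client at clientVersion still needs. ok is
// false when the buffer no longer covers the gap, in which case a caller
// would fall back to a full snapshot.
func (h *history) since(clientVersion, current int64) (missing []entry, ok bool) {
	if clientVersion == current {
		return nil, true // already up to date
	}
	if clientVersion > current || len(h.entries) == 0 || h.entries[0].Version > clientVersion+1 {
		return nil, false
	}
	for _, e := range h.entries {
		if e.Version > clientVersion {
			missing = append(missing, e)
		}
	}
	return missing, true
}

func main() {
	h := &history{limit: 3}
	for v := int64(1); v <= 5; v++ {
		h.push(entry{Version: v})
	}
	for _, client := range []int64{5, 3, 1} {
		missing, ok := h.since(client, 5)
		fmt.Println(client, len(missing), ok)
	}
}
```

With a limit of 3 and five recorded versions, a client at version 3 can catch up with two entries, while a client at version 1 has fallen outside the buffer and needs a snapshot.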

type SubscriptionStore

type SubscriptionStore interface {
	// GetByGraphKey returns all active subscriptions for a graph key.
	GetByGraphKey(graphKey string) []*Subscription

	// UpdateSubscription updates a subscription's LastRefs, Version, and records
	// a version history entry for catch-up sync.
	UpdateSubscription(subID string, refs []Ref, version int64, entry *VersionEntry)
}

SubscriptionStore provides access to active subscriptions for invalidation.

type SyncRequest

type SyncRequest struct {
	SeanceID    string
	WorkspaceID string
	Views       []SyncViewRequest
}

SyncRequest contains the parameters for a Sync call.

type SyncResponse

type SyncResponse struct {
	Views []SyncViewResponse `json:"views"`
}

SyncResponse contains the sync result for the client.

type SyncViewRequest

type SyncViewRequest struct {
	View       string `json:"view"`
	ParamsHash string `json:"params_hash"`
	Version    int64  `json:"version"`
}

SyncViewRequest describes a single view to sync.

type SyncViewResponse

type SyncViewResponse struct {
	View       string         `json:"view"`
	ParamsHash string         `json:"params_hash"`
	Mode       string         `json:"mode"` // "catch_up" or "snapshot"
	Patches    []VersionEntry `json:"patches,omitempty"`
	// Full snapshot fields (used when mode == "snapshot")
	Version int64                                `json:"version,omitempty"`
	Refs    []Ref                                `json:"refs,omitempty"`
	Tables  map[string]map[string]map[string]any `json:"tables,omitempty"`
	Total   int                                  `json:"total,omitempty"`
}

SyncViewResponse contains the sync result for a single view.

type TableDep

type TableDep struct {
	Table   string
	Columns []string
}

TableDep declares a dependency on a database table and specific columns. When any of these columns change, graphs depending on this table are invalidated.
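The column filter this implies can be sketched as a small predicate. `touches` is a hypothetical helper, not a package function, and treating an empty changed-column list as "everything changed" is an assumption of the sketch rather than documented behavior.

```go
package main

import "fmt"

// TableDep mirrors the documented struct.
type TableDep struct {
	Table   string
	Columns []string
}

// touches reports whether a change to table/changed should invalidate a
// graph that declares dep. Assumption (not confirmed by the docs): an empty
// changed-column list means the caller could not say which columns changed,
// so the sketch treats it as touching every column.
func touches(dep TableDep, table string, changed []string) bool {
	if dep.Table != table {
		return false
	}
	if len(changed) == 0 {
		return true
	}
	watched := make(map[string]struct{}, len(dep.Columns))
	for _, c := range dep.Columns {
		watched[c] = struct{}{}
	}
	for _, c := range changed {
		if _, ok := watched[c]; ok {
			return true
		}
	}
	return false
}

func main() {
	dep := TableDep{Table: "tasks", Columns: []string{"title", "status"}}
	fmt.Println(touches(dep, "tasks", []string{"status"}))     // watched column
	fmt.Println(touches(dep, "tasks", []string{"updated_at"})) // unwatched column
	fmt.Println(touches(dep, "users", []string{"status"}))     // other table
}
```

Declaring only the columns a graph actually reads keeps writes to unrelated columns, such as a bookkeeping timestamp, from re-running the factory.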

type TableStore

type TableStore struct {
	// contains filtered or unexported fields
}

TableStore holds all rows for a single table within a workspace.

type Transport

type Transport interface {
	// SendToSeance delivers a message to a specific client session.
	SendToSeance(ctx context.Context, seanceID string, msg Message) error

	// SendToWorkspace delivers a message to all sessions in a workspace.
	SendToWorkspace(ctx context.Context, workspaceID string, msg Message) error

	// DisconnectSeance forcibly disconnects a client session.
	DisconnectSeance(ctx context.Context, seanceID string) error
}

Transport abstracts the message delivery layer (Centrifugo, WebSocket, SSE).
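Since Transport is a three-method interface, a custom implementation can be quite small. The sketch below is a hypothetical in-memory transport of the kind one might use in tests. Message and Transport are redeclared locally to keep the example self-contained, and `memTransport`, `newMemTransport`, and `join` are invented names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message and Transport mirror the documented declarations.
type Message struct {
	Type string `json:"type"`
	Data any    `json:"data"`
}

type Transport interface {
	SendToSeance(ctx context.Context, seanceID string, msg Message) error
	SendToWorkspace(ctx context.Context, workspaceID string, msg Message) error
	DisconnectSeance(ctx context.Context, seanceID string) error
}

// memTransport is a hypothetical in-memory Transport that records messages
// per seance instead of writing them to a network connection.
type memTransport struct {
	mu          sync.Mutex
	workspaceOf map[string]string    // seanceID -> workspaceID
	inbox       map[string][]Message // seanceID -> delivered messages
}

var _ Transport = (*memTransport)(nil) // compile-time interface check

func newMemTransport() *memTransport {
	return &memTransport{workspaceOf: map[string]string{}, inbox: map[string][]Message{}}
}

// join registers a seance in a workspace (hypothetical helper).
func (t *memTransport) join(seanceID, workspaceID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.workspaceOf[seanceID] = workspaceID
}

func (t *memTransport) SendToSeance(_ context.Context, seanceID string, msg Message) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.workspaceOf[seanceID]; !ok {
		return fmt.Errorf("unknown seance %q", seanceID)
	}
	t.inbox[seanceID] = append(t.inbox[seanceID], msg)
	return nil
}

func (t *memTransport) SendToWorkspace(_ context.Context, workspaceID string, msg Message) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	for seanceID, ws := range t.workspaceOf {
		if ws == workspaceID {
			t.inbox[seanceID] = append(t.inbox[seanceID], msg)
		}
	}
	return nil
}

func (t *memTransport) DisconnectSeance(_ context.Context, seanceID string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.workspaceOf, seanceID)
	delete(t.inbox, seanceID)
	return nil
}

func main() {
	ctx := context.Background()
	t := newMemTransport()
	t.join("s1", "ws1")
	t.join("s2", "ws1")
	_ = t.SendToWorkspace(ctx, "ws1", Message{Type: "table_diff"})
	_ = t.SendToSeance(ctx, "s1", Message{Type: "view_diff"})
	fmt.Println(len(t.inbox["s1"]), len(t.inbox["s2"]))
}
```

The two send methods line up with the two update streams named in the README: a broadcast to every session in a workspace and a targeted message to one session.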

type TriggerExec

type TriggerExec interface {
	Exec(ctx context.Context, sql string, args ...any) (any, error)
}

TriggerExec can execute DDL statements. *pgxpool.Pool satisfies this.

type VersionEntry

type VersionEntry struct {
	Version   int64                                `json:"version"`
	RefsPatch []PatchOp                            `json:"refs_patch"`
	Tables    map[string]map[string]map[string]any `json:"tables,omitempty"`
}

VersionEntry records a single version's diff for catch-up sync.

type WALConfig added in v0.1.2

type WALConfig struct {
	// ConnString is the PostgreSQL connection string.
	// Must NOT include replication=database — it will be added automatically.
	ConnString string

	// SlotName is the replication slot name. Default: "arcana_slot".
	SlotName string

	// Publication is the publication name. Default: "arcana_pub".
	Publication string

	// Tables to replicate. If empty, all tables in the publication are used.
	Tables []string

	// Logger for structured logging.
	Logger *slog.Logger

	// StandbyTimeout is how often to send standby status updates. Default: 10s.
	StandbyTimeout time.Duration
}

WALConfig configures the WALListener.

type WALListener added in v0.1.2

type WALListener struct {
	// contains filtered or unexported fields
}

WALListener implements ChangeDetector using PostgreSQL logical replication. Requires wal_level=logical on the PostgreSQL server.

func NewWALListener added in v0.1.2

func NewWALListener(cfg WALConfig) *WALListener

NewWALListener creates a WAL-based change detector.

func (*WALListener) Start added in v0.1.2

func (l *WALListener) Start(ctx context.Context, handler func(Change)) error

Start begins listening for WAL changes.

func (*WALListener) Stop added in v0.1.2

func (l *WALListener) Stop() error

Stop terminates the WAL listener.

type WSTransport added in v0.2.0

type WSTransport struct {
	// contains filtered or unexported fields
}

WSTransport is a built-in WebSocket transport implementing Transport and http.Handler.

func NewWSTransport added in v0.2.0

func NewWSTransport(cfg WSTransportConfig) *WSTransport

NewWSTransport creates a built-in WebSocket transport.

func (*WSTransport) ConnStats added in v0.2.0

func (t *WSTransport) ConnStats() (conns int, workspaces int)

ConnStats returns connection count and workspace count for WSTransport.

func (*WSTransport) DisconnectSeance added in v0.2.0

func (t *WSTransport) DisconnectSeance(ctx context.Context, seanceID string) error

DisconnectSeance forcibly disconnects a client session.

func (*WSTransport) SendToSeance added in v0.2.0

func (t *WSTransport) SendToSeance(ctx context.Context, seanceID string, msg Message) error

SendToSeance delivers a message to a specific client session.

func (*WSTransport) SendToWorkspace added in v0.2.0

func (t *WSTransport) SendToWorkspace(ctx context.Context, workspaceID string, msg Message) error

SendToWorkspace delivers a message to all sessions in a workspace.

func (*WSTransport) ServeHTTP added in v0.2.0

func (t *WSTransport) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles WebSocket upgrade requests.

type WSTransportConfig added in v0.2.0

type WSTransportConfig struct {
	// AuthFunc authenticates the HTTP upgrade request (cookies, headers).
	// If nil, the engine's AuthFunc is used automatically.
	AuthFunc AuthFunc

	// TokenAuthFunc authenticates a token sent in the first WS message.
	// Used for SPA/mobile clients where cookies are unavailable.
	// If both AuthFunc and TokenAuthFunc are nil, connections are rejected.
	TokenAuthFunc func(token string) (*Identity, error)

	// WriteBufferSize is the number of messages buffered per connection
	// before drops occur. Default: 256.
	WriteBufferSize int

	// PingInterval is how often to send WebSocket pings. Default: 30s.
	PingInterval time.Duration

	// PingTimeout is how long to wait for a pong before disconnecting. Default: 10s.
	PingTimeout time.Duration

	// AcceptOptions are passed to websocket.Accept for the upgrade.
	AcceptOptions *websocket.AcceptOptions
}

WSTransportConfig configures the built-in WebSocket transport.
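The WriteBufferSize comment mentions drops once the per-connection buffer fills. One common way to get that behavior in Go is a non-blocking send on a buffered channel, sketched below. The `outbox` type is hypothetical, and the policy shown (discard the newest message and count it) is an assumption; the package may drop differently or disconnect slow clients instead.

```go
package main

import "fmt"

// outbox is a hypothetical per-connection send queue. Its capacity plays the
// role of WriteBufferSize.
type outbox struct {
	queue   chan string
	dropped int
}

func newOutbox(size int) *outbox {
	return &outbox{queue: make(chan string, size)}
}

// enqueue never blocks: when the buffer is full the message is dropped and
// counted. Not safe for concurrent callers as written (dropped is unguarded).
func (o *outbox) enqueue(msg string) bool {
	select {
	case o.queue <- msg:
		return true
	default:
		o.dropped++
		return false
	}
}

func main() {
	o := newOutbox(2)
	for _, m := range []string{"a", "b", "c"} {
		fmt.Println(m, o.enqueue(m))
	}
	fmt.Println("dropped:", o.dropped)
}
```

A non-blocking send keeps one slow client from stalling delivery to everyone else, at the cost of that client missing messages until it resynchronizes.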

type WorkspaceStore

type WorkspaceStore struct {
	// contains filtered or unexported fields
}

WorkspaceStore holds all table data for a single workspace.

Directories

Path Synopsis
cmd
arcana-gen command
Command arcana-gen generates TypeScript type definitions from a registered Arcana graph registry.
examples
basic command
Example: basic Arcana setup with explicit notify.
dashboard command
Example: real-time dashboard with WebSocket transport, mutations, and DevTools.
