loge

package module
v0.0.0-...-79852d6 Latest
Published: Jun 13, 2026 License: MIT Imports: 46 Imported by: 0

README

loge

"Let's capture your logs!"

loge captures logs into compressed SQLite files and lets you query them by time range, labels, and log-line content. It speaks a Grafana-Loki-style push API and ships a loge search CLI for querying from the terminal.

How it works

POST /api/v1/push ──► bucket workers batch ──► flush to <bucket>.sqlite.zst
        │                                              │
        └──► write-ahead log (fsync) ──► 200 OK        ▼
                                              background compactor merges
                                              many small files into larger
                                              segment-*.sqlite.zst with a
                                              trigram line index + time index
  • Ingest is batched in memory and flushed to small, immutable, zstd-compressed SQLite files. With the write-ahead log enabled (default) each payload is fsynced before /push is acknowledged, so an acknowledged log is not lost on a crash (at-least-once; replayed on restart). A checkpoint keeps the log bounded: once a payload is durably in a queryable segment, its log record is pruned, so the WAL only ever holds the un-flushed tail (seconds of data) rather than the whole session.
  • Compaction runs in the background, merging the many small flush files into fewer, larger, time-local segments and building the expensive indexes once per segment instead of once per flush. Each segment is recorded in a small local catalog (catalog.sqlite) with its time bounds and label keys.
  • Queries ask the catalog for the segments whose time bounds overlap the requested window (pruning the rest without opening any file), match labels with json_extract, accelerate line search with the segment trigram index, fan out across the surviving segments in parallel, and merge results newest-first.
  • S3 tiering (optional): segments older than --s3-rotate-age are rotated to S3 and the catalog is flipped to point at their public URL; recent segments stay local. Queries read cold (S3) segments back over HTTP via the seekable-zstd VFS, fetching only the bytes an indexed query needs (with decoded frames cached). Pruning means most queries touch S3 zero times; an S3 outage degrades a query to its local results rather than failing. Ingest durability is unaffected — S3 only ever receives already-durable, compacted segments.
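
The catalog pruning step described above can be sketched as a plain interval-overlap check. This is a minimal illustration, not loge's code: the Segment type, its field names, and the prune helper are hypothetical, and a zero bound is treated as unbounded to mirror the query API.

```go
package main

import "fmt"

// Segment is a hypothetical catalog row: a file name plus inclusive
// time bounds in unix nanoseconds.
type Segment struct {
	Name         string
	MinTS, MaxTS int64
}

// overlaps reports whether seg can contain rows inside [start, end].
// A zero start or end means "unbounded" on that side.
func overlaps(seg Segment, start, end int64) bool {
	if start != 0 && seg.MaxTS < start {
		return false // segment ends before the window begins
	}
	if end != 0 && seg.MinTS > end {
		return false // segment begins after the window ends
	}
	return true
}

// prune keeps only the segments worth opening for the window.
func prune(catalog []Segment, start, end int64) []Segment {
	var keep []Segment
	for _, s := range catalog {
		if overlaps(s, start, end) {
			keep = append(keep, s)
		}
	}
	return keep
}

func main() {
	catalog := []Segment{
		{"segment-a", 100, 199},
		{"segment-b", 200, 299},
		{"segment-c", 300, 399},
	}
	// Only segment-b and segment-c overlap the window [250, 320].
	for _, s := range prune(catalog, 250, 320) {
		fmt.Println(s.Name)
	}
}
```

Because the check needs only the bounds stored in the catalog, a segment that cannot match is skipped without opening its file.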

HTTP API

POST /api/v1/push

Accepts a stream payload as application/json, application/msgpack, or application/protobuf:

{
  "streams": [
    { "stream": { "app": "web" }, "values": [["<unix_ns>", "log line"]] }
  ]
}

Returns 200 OK when the payload is durably logged (write-ahead log enabled), or 202 Accepted when running with --durable=false (async, lost on crash). Invalid payloads (no streams, missing labels/values) return 400.
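
As a rough sketch, a JSON push body of the shape shown above could be built in Go like this. The type and function names are hypothetical, and encoding the timestamp as a decimal string is an assumption based on the quoted "<unix_ns>" placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// stream and pushPayload mirror the documented JSON keys only;
// the Go names are illustrative.
type stream struct {
	Stream map[string]string `json:"stream"`
	Values [][2]string       `json:"values"` // [timestamp, line] pairs
}

type pushPayload struct {
	Streams []stream `json:"streams"`
}

// buildPush encodes one log line for one label set.
func buildPush(labels map[string]string, tsNanos int64, line string) ([]byte, error) {
	p := pushPayload{Streams: []stream{{
		Stream: labels,
		// Assumption: the timestamp travels as a decimal string.
		Values: [][2]string{{strconv.FormatInt(tsNanos, 10), line}},
	}}}
	return json.Marshal(p)
}

func main() {
	body, err := buildPush(map[string]string{"app": "web"}, 1700000000000000000, "log line")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

The resulting bytes would be sent as the body of POST /api/v1/push with Content-Type: application/json.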

GET /api/v1/labels

Returns the set of label keys across all files: { "status": "success", "data": [...] }.

POST /api/v1/query

Body (all fields optional):

{
  "start": 1700000000000000000,
  "end": 1700000009000000000,
  "matchers": [{ "name": "app", "value": "web", "type": "=" }],
  "line": "error",
  "limit": 100
}

start/end are inclusive nanosecond timestamps (0 = unbounded). Matcher type is one of =, !=, =~, !~ (regex matchers are applied in Go). line is a substring filter on the log line. Response:

{
  "status": "success",
  "data": [
    {
      "timestamp": 1700000000000000000,
      "line": "...",
      "labels": { "app": "web" }
    }
  ]
}
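
The request and response bodies above map naturally onto Go structs. The following is a minimal sketch: the type names are hypothetical, only the JSON keys are taken from this README, and omitempty is used because every request field is optional.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// matcher and queryRequest mirror the documented request body.
type matcher struct {
	Name  string `json:"name"`
	Value string `json:"value"`
	Type  string `json:"type"` // one of =, !=, =~, !~
}

type queryRequest struct {
	Start    int64     `json:"start,omitempty"` // inclusive unix ns; 0 = unbounded
	End      int64     `json:"end,omitempty"`   // inclusive unix ns; 0 = unbounded
	Matchers []matcher `json:"matchers,omitempty"`
	Line     string    `json:"line,omitempty"`
	Limit    int       `json:"limit,omitempty"`
}

// queryEntry and queryResponse mirror the documented response body.
type queryEntry struct {
	Timestamp int64             `json:"timestamp"`
	Line      string            `json:"line"`
	Labels    map[string]string `json:"labels"`
}

type queryResponse struct {
	Status string       `json:"status"`
	Data   []queryEntry `json:"data"`
}

// encodeQuery renders a request body as JSON text.
func encodeQuery(q queryRequest) (string, error) {
	b, err := json.Marshal(q)
	return string(b), err
}

// decodeResponse parses a response body.
func decodeResponse(body string) (queryResponse, error) {
	var r queryResponse
	err := json.Unmarshal([]byte(body), &r)
	return r, err
}

func main() {
	req, err := encodeQuery(queryRequest{
		Start:    1700000000000000000,
		End:      1700000009000000000,
		Matchers: []matcher{{Name: "app", Value: "web", Type: "="}},
		Line:     "error",
		Limit:    100,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req)
}
```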

Instead of structured matchers/line, you can pass a single LogQL-style selector as query (e.g. { "query": "{app=\"web\"} |= \"error\"" }); the server parses it into matchers + a line filter, the same syntax loge search and the web UI use.

When --api-key is set, this endpoint (and /labels, /stats, /search/plan) requires Authorization: Bearer <key>; ingest (/push) stays open.
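
A client would attach the key as a bearer token, roughly as follows. This sketch only builds the request and does not send it; newQueryRequest is a hypothetical helper, and the JSON Content-Type header is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newQueryRequest prepares a POST /api/v1/query request. When apiKey is
// empty no Authorization header is set, matching a server without --api-key.
func newQueryRequest(addr, apiKey string, body []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, addr+"/api/v1/query", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json") // assumption: JSON body
	if apiKey != "" {
		req.Header.Set("Authorization", "Bearer "+apiKey)
	}
	return req, nil
}

func main() {
	req, err := newQueryRequest("http://localhost:3000", "s3cret", []byte(`{"line":"error"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
	fmt.Println(req.Header.Get("Authorization"))
}
```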

Client libraries

  • Go: slogloge is a log/slog handler that ships logs to /api/v1/push, batching them on a background goroutine. It uses only the standard library, so importing it does not pull in the server's dependencies.
  • Ruby / Rails: the loge gem is an asynchronous, batching ::Logger plus a Railtie that broadcasts Rails.logger to loge when config.loge.endpoint (or LOGE_ENDPOINT) is set. It uses only the Ruby standard library.

Both render each record as a JSON line, send the timestamp out-of-band, and attach the severity as a level stream label.

Web UI

The server hosts a small web UI at / so you can query logs from a browser when the CLI isn't handy. It runs the same LogQL selectors, supports light/dark mode, and can live-tail by polling. When --api-key is set it prompts for the key (stored in the browser and sent as a bearer token); otherwise it's open. The UI is built from web/ into webdist/ and embedded into the binary, so no extra serving is needed — just open http://localhost:3000/.

loge search queries a running server over the POST /api/v1/query API using a LogQL-style selector, so you can read logs from the terminal without crafting JSON. It builds the same request the API consumes, so its results match the server.

loge search '{app="web", level=~"err.*"} |= "timeout"' --addr http://localhost:3000 --since 1h

The positional selector has two parts:

  • Stream-label matchers in { ... } — comma-separated name op "value" pairs, where op is one of =, !=, =~, !~ (the same operators as the query API; =~/!~ are regex). These filter the JSON stream labels. The block may be empty ({}) or omitted when only a keyword filter is given.
  • An optional keyword filter |= "term" — a substring matched against the log line with a LIKE scan, with a per-segment binary-fuse trigram filter pruning segments that can't contain it. Only |= is supported (regex/negated line filters are not), and term must be at least 3 characters so the trigram filter applies.
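
To illustrate why the keyword needs at least 3 characters, here is a small sketch of trigram pruning. It is not loge's implementation: loge uses a binary-fuse filter per segment, while this sketch substitutes an exact set, and the byte-wise, case-sensitive tokenisation is an assumption.

```go
package main

import "fmt"

const minTermLen = 3 // a shorter term yields no trigram to test

// trigrams returns the distinct 3-byte windows of s, in order of first use.
func trigrams(s string) []string {
	seen := make(map[string]bool)
	var out []string
	for i := 0; i+3 <= len(s); i++ {
		t := s[i : i+3]
		if !seen[t] {
			seen[t] = true
			out = append(out, t)
		}
	}
	return out
}

// buildIndex collects the trigrams of every line in a segment.
func buildIndex(lines []string) map[string]bool {
	idx := make(map[string]bool)
	for _, l := range lines {
		for _, t := range trigrams(l) {
			idx[t] = true
		}
	}
	return idx
}

// mayContain reports whether a segment could hold term. False means the
// segment can be skipped; true means the substring scan is still required.
func mayContain(idx map[string]bool, term string) bool {
	if len(term) < minTermLen {
		return true // nothing to prune on
	}
	for _, t := range trigrams(term) {
		if !idx[t] {
			return false
		}
	}
	return true
}

func main() {
	idx := buildIndex([]string{"connection refused", "request ok"})
	fmt.Println(mayContain(idx, "refused")) // true: every trigram is present
	fmt.Println(mayContain(idx, "panic"))   // false: "pan" never occurs
}
```

A positive answer only means the segment might match, so the LIKE scan still decides the final result.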

Flag      Default                Description
--addr    http://localhost:3000  base URL of the running loge server
--since   (none)                 relative window start, e.g. 1h, 30m (from now)
--until   (none)                 relative window end, e.g. 5m ago (from now)
--start   (none)                 absolute window start: RFC3339 or unix nanoseconds (overrides --since)
--end     (none)                 absolute window end: RFC3339 or unix nanoseconds (overrides --until)
--limit   100                    max results (server caps at 5000)
--output  text                   output format: text or json

# all "web" logs in the last hour, human-readable
loge search '{app="web"}' --since 1h

# errors containing a keyword, as JSON, over an absolute window
loge search '{level=~"err.*"} |= "connection refused"' \
  --start 2026-06-01T00:00:00Z --end 2026-06-02T00:00:00Z --output json

# keyword-only search across all streams
loge search '|= "panic"'
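
The precedence between the absolute and relative window flags can be sketched as below. resolveBound is a hypothetical helper, not the CLI's code; returning 0 when neither flag is set is an assumption that relies on the query API treating 0 as unbounded.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// resolveBound turns one pair of flags (--start/--since or --end/--until)
// into a unix-nanosecond bound. The absolute value wins when present.
func resolveBound(abs string, rel time.Duration, nowNanos int64) (int64, error) {
	if abs != "" {
		// Try unix nanoseconds first, then RFC3339.
		if ns, err := strconv.ParseInt(abs, 10, 64); err == nil {
			return ns, nil
		}
		t, err := time.Parse(time.RFC3339, abs)
		if err != nil {
			return 0, fmt.Errorf("bound %q is neither RFC3339 nor unix nanoseconds: %w", abs, err)
		}
		return t.UnixNano(), nil
	}
	if rel > 0 {
		return nowNanos - int64(rel), nil // "1h" means one hour before now
	}
	return 0, nil // unset: leave the window open on this side
}

func main() {
	now := time.Now().UnixNano()
	start, err := resolveBound("", time.Hour, now)
	if err != nil {
		panic(err)
	}
	end, err := resolveBound("2026-06-02T00:00:00Z", 5*time.Minute, now)
	if err != nil {
		panic(err)
	}
	fmt.Println(start, end)
}
```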

Server flags

The server is the default command, so loge and loge serve are equivalent.

Flag                     Default  Description
--port                   3000     HTTP server port
--buckets                4        number of in-memory bucket workers
--payload-size           1000     batch size before a forced flush
--output-path            tmp/     directory for the SQLite files
--flush-interval         1s       how often a bucket flushes a non-empty batch
--compact-interval       30s      how often to merge small files into segments (0 disables)
--compact-min-files      8        minimum flush files before a compaction pass runs
--durable                true     fsync each payload to the write-ahead log before acknowledging
--checkpoint-interval    2s       how often to fsync new segments and prune the write-ahead log
--drop-on-backpressure   false    drop instead of blocking when the flush pipeline is saturated (ignored when --durable)
--query-concurrency      8        max segments a query opens in parallel
--api-key                ""       shared secret (bearer token) required to read/query and to load the web UI's data; empty disables auth
--s3-bucket              ""       S3 bucket to rotate old segments into (empty disables S3 tiering)
--s3-prefix              loge/    key prefix for uploaded segments
--s3-endpoint            ""       custom S3 endpoint (e.g. MinIO); empty uses AWS
--s3-region              ""       S3 region (else from the AWS environment)
--s3-force-path-style    false    path-style addressing (needed for MinIO)
--s3-read-url-base       ""       public/CDN base URL reads are served from; empty derives it
--s3-acl                 ""       canned ACL for uploads (e.g. public-read); empty relies on bucket policy
--s3-rotate-age          1h       rotate local segments older than this to S3
--s3-rotate-interval     1m       how often the rotation loop runs
--s3-rotate-grace        1m       keep a rotated segment's local copy this long before deleting it
--s3-frame-cache-size    512      per-file decoded zstd frames cached for segment reads (each frame is ~64 KiB)

S3 reads are public, path-based HTTP GETs (auth must not ride in the URL query string — go-sqlite3 strips it), so the bucket/prefix must be readable without signed query params (public objects, a bucket policy, or a CDN). Upload credentials come from the standard AWS chain (env / shared config / IAM role).

Development

task test       # ginkgo -race
task ruby:test  # rspec suite for the Ruby gem (ruby/)
task bench      # k6 push benchmarks (msgpack/json/protobuf)
task web        # bundle the web UI (web/ -> webdist/, esbuild)
task server     # run a local server (rebuilds the web UI first)

go build -o loge ./loge/main.go   # build the `loge` binary (serve + search)

The web UI bundle in webdist/ is committed so go build/go install work without a Node toolchain. After editing anything under web/, run task web and commit the regenerated webdist/.

Documentation

Overview

Package loge is an embedded log-storage engine: it ingests log payloads into rotating SQLite segments, compresses them with zstd, optionally tiers them to S3, and serves LogQL-style queries over the local and remote segments.

Functions

func MarshalLabels

func MarshalLabels(tags map[string]string) string

MarshalLabels renders a label set as a JSON object string, escaping keys and values. Key order follows Go's map iteration order and is not sorted.
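
Since the key order is not sorted, two renderings of the same label set may differ as strings. A caller that needs to compare them can decode both sides first, as in this sketch (sameLabels is a hypothetical helper, not part of the package):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sameLabels reports whether two JSON label objects hold the same
// key/value pairs, regardless of key order.
func sameLabels(a, b string) (bool, error) {
	var ma, mb map[string]string
	if err := json.Unmarshal([]byte(a), &ma); err != nil {
		return false, err
	}
	if err := json.Unmarshal([]byte(b), &mb); err != nil {
		return false, err
	}
	if len(ma) != len(mb) {
		return false, nil
	}
	for k, v := range ma {
		if w, ok := mb[k]; !ok || w != v {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	same, err := sameLabels(`{"app":"web","level":"error"}`, `{"level":"error","app":"web"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(same) // true: same pairs, different order
}
```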

func ParseSelector

func ParseSelector(input string) ([]managers.Matcher, string, error)

ParseSelector parses a LogQL-style selector into stream-label matchers and an optional substring line filter. The accepted grammar is:

{name op "value", ...} |= "keyword"

where op is one of =, !=, =~, !~ (the four stream-label operators), and the optional trailing |= "keyword" is the only supported line filter — a substring match (a LIKE scan, with a per-segment binary-fuse trigram filter pruning segments that cannot contain it). The brace block may be empty ({}) or omitted entirely when only a |= filter is given. The keyword must be at least minTrigramLen characters so the trigram filter applies.
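
The grammar is small enough to sketch with two regular expressions. The parser below is an independent illustration, not the package's ParseSelector: Matcher is a hypothetical stand-in for managers.Matcher, minTrigramLen is assumed to be 3 (the value the README gives), and escape sequences inside values are kept verbatim rather than unescaped.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Matcher is a hypothetical stand-in for managers.Matcher.
type Matcher struct{ Name, Type, Value string }

const minTrigramLen = 3 // assumption: the README's 3-character minimum

var (
	// name op "value"; the two-character operators are listed before "=".
	matcherRe = regexp.MustCompile(`^([A-Za-z_][A-Za-z0-9_]*)\s*(=~|!~|!=|=)\s*"((?:[^"\\]|\\.)*)"\s*`)
	// |= "keyword" as the whole remainder of the input.
	lineRe = regexp.MustCompile(`^\|=\s*"((?:[^"\\]|\\.)*)"$`)
)

func parseSelector(input string) ([]Matcher, string, error) {
	rest := strings.TrimSpace(input)
	var ms []Matcher
	if strings.HasPrefix(rest, "{") {
		rest = rest[1:]
		for {
			rest = strings.TrimLeft(rest, " \t")
			if strings.HasPrefix(rest, "}") {
				rest = rest[1:]
				break
			}
			m := matcherRe.FindStringSubmatch(rest)
			if m == nil {
				return nil, "", fmt.Errorf("invalid matcher near %q", rest)
			}
			ms = append(ms, Matcher{Name: m[1], Type: m[2], Value: m[3]})
			rest = rest[len(m[0]):]
			if strings.HasPrefix(rest, ",") {
				rest = rest[1:]
			} else if !strings.HasPrefix(rest, "}") {
				return nil, "", fmt.Errorf("expected ',' or '}' near %q", rest)
			}
		}
	}
	rest = strings.TrimSpace(rest)
	if rest == "" {
		return ms, "", nil // label matchers only
	}
	lm := lineRe.FindStringSubmatch(rest)
	if lm == nil {
		return nil, "", fmt.Errorf("unsupported line filter %q", rest)
	}
	if len(lm[1]) < minTrigramLen {
		return nil, "", fmt.Errorf("keyword must be at least %d characters", minTrigramLen)
	}
	return ms, lm[1], nil
}

func main() {
	ms, line, err := parseSelector(`{app="web", level=~"err.*"} |= "timeout"`)
	if err != nil {
		panic(err)
	}
	fmt.Println(ms, line)
}
```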

func RemoveWAL

func RemoveWAL(dir string) error

RemoveWAL deletes all log segments in dir. It is safe to call only after the ingest pipeline has durably flushed every replayed and appended payload (i.e. after a clean shutdown).

func ReplaySegments

func ReplaySegments(segments []string, fn func(*Payload)) (int, error)

ReplaySegments reads the given segments in order and invokes fn for each intact record. A torn tail (short read or CRC mismatch) ends a segment's replay without erroring, since it represents an interrupted write. It returns the number of records replayed.
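
The torn-tail behaviour can be illustrated with a simple framed log. The record layout below (length, CRC-32, payload) is an assumption made for the sketch; loge's actual on-disk WAL format is not documented here.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
)

const maxRecordLen = 16 << 20 // guard against a corrupt length field

// appendRecord frames payload as [length uint32][crc32 uint32][payload].
func appendRecord(buf *bytes.Buffer, payload []byte) {
	var hdr [8]byte
	binary.LittleEndian.PutUint32(hdr[0:4], uint32(len(payload)))
	binary.LittleEndian.PutUint32(hdr[4:8], crc32.ChecksumIEEE(payload))
	buf.Write(hdr[:])
	buf.Write(payload)
}

// replay calls fn for each intact record and stops quietly at the first
// short read or CRC mismatch, returning how many records it replayed.
func replay(r io.Reader, fn func([]byte)) int {
	n := 0
	for {
		var hdr [8]byte
		if _, err := io.ReadFull(r, hdr[:]); err != nil {
			return n // clean end of log or torn header
		}
		size := binary.LittleEndian.Uint32(hdr[0:4])
		if size > maxRecordLen {
			return n // implausible length: treat as a torn tail
		}
		payload := make([]byte, size)
		if _, err := io.ReadFull(r, payload); err != nil {
			return n // torn payload
		}
		if crc32.ChecksumIEEE(payload) != binary.LittleEndian.Uint32(hdr[4:8]) {
			return n // corrupt payload
		}
		fn(payload)
		n++
	}
}

// replayTornLog writes three records, cuts the last one short to simulate
// a crash mid-write, and returns the number of records replayed.
func replayTornLog() int {
	var log bytes.Buffer
	appendRecord(&log, []byte("first"))
	appendRecord(&log, []byte("second"))
	appendRecord(&log, []byte("third"))
	torn := log.Bytes()[:log.Len()-2]
	return replay(bytes.NewReader(torn), func(p []byte) {
		fmt.Printf("replayed %q\n", p)
	})
}

func main() {
	fmt.Println("records:", replayTornLog()) // the torn third record is skipped
}
```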

func ReplayWAL

func ReplayWAL(dir string, fn func(*Payload)) (int, error)

ReplayWAL replays every segment found in dir.

Types

type BucketOption

type BucketOption func(*Buckets)

BucketOption configures optional Buckets behaviour.

func WithDurableReport

func WithDurableReport(fn func(filename string, seqs []uint64)) BucketOption

WithDurableReport registers a callback invoked after a file has been flushed and compressed, reporting the WAL sequence numbers it contains so they can be checkpointed.

func WithFlushCompression

func WithFlushCompression(level zstd.EncoderLevel) BucketOption

WithFlushCompression overrides the zstd level used to compress short-lived flush files on the latency-sensitive ingest path. Lower levels (SpeedFastest/SpeedDefault) cut compressor-worker CPU under sustained load; the durable on-disk/S3 size is governed by compaction (SpeedBestCompression), which recompresses surviving segments, so the flush-tier level trades hot-path CPU for the size of transient, soon-recompacted files.

func WithFlushCompressionName

func WithFlushCompressionName(name string) (BucketOption, error)

WithFlushCompressionName returns a flush-compression BucketOption selected by CLI level name (fastest/default/better/best). An unknown name is an error.

func WithFlushInterval

func WithFlushInterval(interval time.Duration) BucketOption

WithFlushInterval overrides how often a bucket worker flushes a non-empty batch. A non-positive duration is ignored.

func WithFlushQueue

func WithFlushQueue(n int) BucketOption

WithFlushQueue overrides the depth of the flush and compress job queues, which default to the bucket count. A deeper queue absorbs ingest bursts before backpressure engages, at the cost of memory; a non-positive value keeps the default.

func WithFlushWorkers

func WithFlushWorkers(n int) BucketOption

WithFlushWorkers overrides the number of flush and compress worker goroutines. They default to max(buckets/2, 2); a non-positive value keeps that default. This is the main lever for flush throughput on a machine with more CPUs than buckets/2, decoupling flush parallelism from the (memory-heavy) bucket count.

func WithWAL

func WithWAL(wal *WAL) BucketOption

WithWAL makes Append durably log each payload (and assign it a sequence number) before queueing it.
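
The options above follow Go's functional-options pattern. The sketch below shows the shape with a hypothetical buckets struct; its fields, and the choice to compute defaults in the constructor, are illustrative rather than the package's internals. The defaults match the values stated above.

```go
package main

import (
	"fmt"
	"time"
)

// buckets is a hypothetical stand-in for the package's Buckets type.
type buckets struct {
	count         int
	flushInterval time.Duration
	flushWorkers  int
	flushQueue    int
}

type option func(*buckets)

// withFlushInterval ignores a non-positive duration.
func withFlushInterval(d time.Duration) option {
	return func(b *buckets) {
		if d > 0 {
			b.flushInterval = d
		}
	}
}

// withFlushWorkers keeps the default for a non-positive count.
func withFlushWorkers(n int) option {
	return func(b *buckets) {
		if n > 0 {
			b.flushWorkers = n
		}
	}
}

// withFlushQueue keeps the default for a non-positive depth.
func withFlushQueue(n int) option {
	return func(b *buckets) {
		if n > 0 {
			b.flushQueue = n
		}
	}
}

func newBuckets(count int, opts ...option) *buckets {
	workers := count / 2
	if workers < 2 {
		workers = 2 // default is max(buckets/2, 2)
	}
	b := &buckets{
		count:         count,
		flushInterval: time.Second, // the --flush-interval default
		flushWorkers:  workers,
		flushQueue:    count, // queue depth defaults to the bucket count
	}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

func main() {
	b := newBuckets(4, withFlushWorkers(6), withFlushInterval(-1))
	fmt.Println(b.flushWorkers, b.flushQueue, b.flushInterval)
}
```

Each option validates its own argument, so a caller can pass flag values through unchanged and still get the defaults for zero or negative input.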

type Buckets

type Buckets struct {
	// contains filtered or unexported fields
}

Buckets is the ingest front end: it shards incoming payloads across a fixed set of buckets, each batching and flushing to its own SQLite segment files.

func NewBuckets

func NewBuckets(
	ctx context.Context,
	size int,
	payloadSize int,
	outputPath string,
	dropOnBackpressure bool,
	opts ...BucketOption,
) (*Buckets, error)

NewBuckets creates size buckets that flush under outputPath, batching up to payloadSize rows per segment. When dropOnBackpressure is set, Append drops payloads instead of blocking once the bucket queues are full.

func (*Buckets) Append

func (b *Buckets) Append(payload Payload) error

Append queues a payload for flushing. When a write-ahead log is attached the payload is durably logged (and assigned a sequence number) before queueing, and any error is returned so the caller can reject the request; otherwise it always succeeds.
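
The difference between blocking and dropping under backpressure comes down to how the payload is sent on a bounded queue. This is a minimal sketch with a hypothetical queue type; it leaves out the write-ahead log and sharding entirely.

```go
package main

import "fmt"

// queue is a hypothetical bounded ingest queue.
type queue struct {
	ch                 chan string
	dropOnBackpressure bool
}

// append reports whether the payload was queued. With dropOnBackpressure
// set it never blocks; otherwise it waits for a free slot.
func (q *queue) append(p string) bool {
	if q.dropOnBackpressure {
		select {
		case q.ch <- p:
			return true
		default:
			return false // queue full: drop instead of blocking
		}
	}
	q.ch <- p // blocks until a worker drains the queue
	return true
}

func main() {
	q := &queue{ch: make(chan string, 1), dropOnBackpressure: true}
	fmt.Println(q.append("first"))  // true: the single slot was free
	fmt.Println(q.append("second")) // false: the slot is taken and nothing drains it
}
```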

func (*Buckets) Close

func (b *Buckets) Close() error

Close gracefully shuts down all bucket workers, flushers, and compressors. It ensures all in-flight data is flushed before returning.

type CLI

type CLI struct {
	Serve  ServeCmd  `cmd:"" default:"withargs"                                             help:"run the loge ingest+query HTTP server (default)"`
	Search SearchCmd `cmd:"" help:"query a running loge server with a LogQL-style selector"`
}

CLI is the top-level command. The server is the default command (so bare flags like `loge --port 6500` still start it), and `loge search` queries a running server. CLI itself has no Run() method on purpose: kong invokes the Run() of every selected node's ancestors, so a CLI.Run() would also fire for the search subcommand.

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

Checkpointer bounds the write-ahead log. It receives reports that segments have been written, fsyncs those segments (so the data is durable on its own), then advances the WAL's durable watermark and lets it delete the now-redundant log segments. Reports for files that were already compacted away are treated as durable, since the compactor fsyncs its merged segment before deleting the sources.
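
One way to picture the durable watermark is as the highest sequence number below which every payload is already in a durable segment. The sketch below assumes sequence numbers start at 1 and are contiguous, and that reports can arrive out of order; the checkpoint type is hypothetical and the real bookkeeping may differ.

```go
package main

import "fmt"

// checkpoint tracks which WAL sequence numbers are durable in segments.
type checkpoint struct {
	watermark uint64          // every sequence <= watermark is durable
	pending   map[uint64]bool // durable sequences above the watermark
}

// report records newly durable sequences and advances the watermark
// across any contiguous run, returning the new watermark.
func (c *checkpoint) report(seqs []uint64) uint64 {
	for _, s := range seqs {
		if s > c.watermark {
			c.pending[s] = true
		}
	}
	for c.pending[c.watermark+1] {
		c.watermark++
		delete(c.pending, c.watermark)
	}
	return c.watermark
}

func main() {
	c := &checkpoint{pending: make(map[uint64]bool)}
	fmt.Println(c.report([]uint64{1, 2, 4})) // 2: sequence 3 is still missing
	fmt.Println(c.report([]uint64{3}))       // 4: the gap is filled
}
```

Log records at or below the watermark are redundant with the segments, which is what makes them safe to delete.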

func NewCheckpointer

func NewCheckpointer(wal *WAL, interval time.Duration) *Checkpointer

NewCheckpointer creates a checkpointer for wal. A non-positive interval uses the default.

func (*Checkpointer) Report

func (c *Checkpointer) Report(filename string, seqs []uint64)

Report notes that filename is now a durable segment containing seqs. It is the callback passed to buckets via WithDurableReport and is safe to call after Stop (the report is simply dropped).

func (*Checkpointer) Stop

func (c *Checkpointer) Stop()

Stop ends the checkpointer. It must be called only after the flush/compress pipeline has drained (so no further reports arrive).

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor merges the many small per-flush SQLite files into fewer, larger time-local "segment" files, building the expensive trigram line index and the timestamp/label indexes once per segment instead of once per flush.

func NewCompactor

func NewCompactor(dir string, minFiles, maxFiles int, interval time.Duration, opts ...CompactorOption) *Compactor

NewCompactor builds a Compactor for dir. Zero values select defaults.

func (*Compactor) Compact

func (c *Compactor) Compact(ctx context.Context) (int, error)

Compact merges one batch of the oldest flush files into a single segment. It returns the number of source files merged (0 when there is nothing to do).

func (*Compactor) Run

func (c *Compactor) Run(ctx context.Context)

Run compacts on a ticker until ctx is cancelled.

type CompactorOption

type CompactorOption func(*Compactor)

CompactorOption configures optional Compactor behaviour.

func WithCompactorCatalog

func WithCompactorCatalog(catalog *managers.Catalog) CompactorOption

WithCompactorCatalog records each new segment in the catalog so the query path can prune by time without opening files.

type DefaultJSONSerializer

type DefaultJSONSerializer struct{}

DefaultJSONSerializer implements JSON encoding using github.com/goccy/go-json.

func (DefaultJSONSerializer) Deserialize

func (d DefaultJSONSerializer) Deserialize(c *echo.Context, i interface{}) error

Deserialize reads JSON from the request body and decodes it into i.

func (DefaultJSONSerializer) Serialize

func (d DefaultJSONSerializer) Serialize(c *echo.Context, i interface{}, indent string) error

Serialize encodes i as JSON and writes it to the response. Pass a non-empty indent to pretty-print the output.

type Entry

type Entry struct {
	Stream Stream `json:"stream" msg:"stream"`
	Values Values `json:"values" msg:"values"`
}

Entry pairs a stream's labels with its log values.

func (*Entry) DecodeMsg

func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Entry) EncodeMsg

func (z *Entry) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Entry) MarshalMsg

func (z *Entry) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Entry) Msgsize

func (z *Entry) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Entry) UnmarshalMsg

func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type LabelResponse

type LabelResponse struct {
	Status string   `json:"status" msg:"status"`
	Data   []string `json:"data"   msg:"data"`
}

LabelResponse is the JSON body returned by the label-listing endpoints.

func (*LabelResponse) DecodeMsg

func (z *LabelResponse) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*LabelResponse) EncodeMsg

func (z *LabelResponse) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*LabelResponse) MarshalMsg

func (z *LabelResponse) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*LabelResponse) Msgsize

func (z *LabelResponse) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*LabelResponse) UnmarshalMsg

func (z *LabelResponse) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ObjectStore

type ObjectStore interface {
	// Put uploads the file at localPath under key and returns the public URL it
	// can be read back from over HTTP.
	Put(ctx context.Context, key, localPath string) (readURL string, err error)
	// Size returns the size of the stored object, for upload verification.
	Size(ctx context.Context, key string) (int64, error)
	// List returns the object keys stored under prefix.
	List(ctx context.Context, prefix string) ([]string, error)
	// ReadURL returns the public HTTP URL a key is read back from.
	ReadURL(key string) string
}

ObjectStore is the remote storage the uploader rotates segments to. It is an interface so rotation can be tested without real S3.
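
Because ObjectStore is an interface, a test can substitute an in-memory fake. The sketch below copies the interface from above and adds a hypothetical memStore; the base URL and the object key are made-up example values.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"sort"
	"strings"
	"sync"
)

// ObjectStore is copied from the package documentation above.
type ObjectStore interface {
	Put(ctx context.Context, key, localPath string) (readURL string, err error)
	Size(ctx context.Context, key string) (int64, error)
	List(ctx context.Context, prefix string) ([]string, error)
	ReadURL(key string) string
}

// memStore is a hypothetical in-memory fake for tests.
type memStore struct {
	mu      sync.Mutex
	base    string
	objects map[string][]byte
}

var _ ObjectStore = (*memStore)(nil) // compile-time interface check

func newMemStore(base string) *memStore {
	return &memStore{base: base, objects: make(map[string][]byte)}
}

func (m *memStore) Put(ctx context.Context, key, localPath string) (string, error) {
	data, err := os.ReadFile(localPath)
	if err != nil {
		return "", err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.objects[key] = data
	return m.ReadURL(key), nil
}

func (m *memStore) Size(ctx context.Context, key string) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	data, ok := m.objects[key]
	if !ok {
		return 0, fmt.Errorf("object %q not found", key)
	}
	return int64(len(data)), nil
}

func (m *memStore) List(ctx context.Context, prefix string) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	var keys []string
	for k := range m.objects {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys, nil
}

func (m *memStore) ReadURL(key string) string { return m.base + "/" + key }

// roundTrip uploads a temp file and returns the stored size and read URL.
func roundTrip(content string) (int64, string, error) {
	f, err := os.CreateTemp("", "segment-*")
	if err != nil {
		return 0, "", err
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString(content); err != nil {
		return 0, "", err
	}
	if err := f.Close(); err != nil {
		return 0, "", err
	}
	store := newMemStore("https://example.test")
	url, err := store.Put(context.Background(), "loge/segment-demo.sqlite.zst", f.Name())
	if err != nil {
		return 0, "", err
	}
	size, err := store.Size(context.Background(), "loge/segment-demo.sqlite.zst")
	return size, url, err
}

func main() {
	size, url, err := roundTrip("hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(size, url)
}
```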

type Payload

type Payload struct {
	Streams Streams `json:"streams" msg:"streams"`
}

Payload is the wire form of a push request: one or more labeled streams.

func (*Payload) DecodeMsg

func (z *Payload) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Payload) EncodeMsg

func (z *Payload) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Payload) MarshalMsg

func (z *Payload) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Payload) Msgsize

func (z *Payload) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Payload) UnmarshalMsg

func (z *Payload) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Payload) Valid

func (p *Payload) Valid() (string, bool)

Valid reports whether the payload is well-formed. On failure it returns a human-readable reason and false; on success it returns "" and true.

type PlanResponse

type PlanResponse struct {
	Status    string                `json:"status"`
	Hot       []managers.QueryEntry `json:"hot"`
	Segments  []PlanSegment         `json:"segments"`
	ExpiresAt int64                 `json:"expires_at"`
}

PlanResponse answers a client-side search plan request. The server scans the hot tier itself (Hot) and hands the client the pruned, presigned cold segments (Segments) to scan on its own machine; ExpiresAt is when the presigned URLs stop working.

type PlanSegment

type PlanSegment struct {
	ID  string `json:"id"`
	URL string `json:"url"`
}

PlanSegment is one cold-tier (S3) segment in a client-side search plan: its catalog ID and a short-lived presigned URL the client reads it from directly.

type QueryResponse

type QueryResponse struct {
	Status string                `json:"status"`
	Data   []managers.QueryEntry `json:"data"`
}

QueryResponse is the JSON shape returned by the query endpoint.

type S3Config

type S3Config struct {
	Bucket         string
	Prefix         string
	Endpoint       string // custom endpoint for S3-compatible stores (MinIO); empty = AWS
	Region         string
	ForcePathStyle bool
	ReadURLBase    string // public/CDN base for reads; empty = derived
	ACL            string // optional canned ACL, e.g. "public-read"
}

S3Config configures an S3Store. Credentials come from the standard AWS chain (environment, shared config, IAM role) — never from flags.

type S3Store

type S3Store struct {
	// contains filtered or unexported fields
}

S3Store is an ObjectStore backed by S3 (or any S3-compatible endpoint) via aws-sdk-go-v2. Reads happen out-of-band as plain public HTTP GETs (see ReadURLBase), so the store is only used for the write/verify path.

func NewS3Store

func NewS3Store(ctx context.Context, cfg S3Config) (*S3Store, error)

NewS3Store builds an S3Store from cfg.

func (*S3Store) List

func (s *S3Store) List(ctx context.Context, prefix string) ([]string, error)

List returns every object key stored under prefix, paginating through the bucket listing.

func (*S3Store) Presign

func (s *S3Store) Presign(ctx context.Context, key string, expiry time.Duration) (string, error)

Presign returns a short-lived presigned GET URL for key, valid for expiry. The URL carries the SigV4 signature in its query string, so a client can read a private object directly from S3 without AWS credentials. It is signed against the real S3 endpoint/bucket (not ReadURLBase/CDN), so it works for private buckets, and because Range is not a signed header the same URL serves every range read of a segment.

func (*S3Store) Put

func (s *S3Store) Put(ctx context.Context, key, localPath string) (string, error)

Put uploads localPath under key (multipart) and returns its public read URL.

func (*S3Store) ReadURL

func (s *S3Store) ReadURL(key string) string

ReadURL returns the public, path-based HTTP URL a key is read back from.

func (*S3Store) Size

func (s *S3Store) Size(ctx context.Context, key string) (int64, error)

Size returns the stored object's size via HeadObject.

type SearchCmd

type SearchCmd struct {
	Selector string `arg:"" help:"LogQL-style selector, e.g. '{app=\"web\"} |= \"timeout\"'"`

	Addr   string        `default:"http://localhost:3000"                                              help:"base URL of the loge server"`
	Since  time.Duration `help:"relative window start, e.g. 1h (from now); ignored if --start is set"`
	Until  time.Duration `help:"relative window end, e.g. 5m ago (from now); ignored if --end is set"`
	Start  string        `help:"absolute window start: RFC3339 or unix nanoseconds; overrides --since"`
	End    string        `help:"absolute window end: RFC3339 or unix nanoseconds; overrides --until"`
	Limit  int           `default:"100"                                                                help:"max results (server caps at 5000)"`
	Output string        `` /* 138-byte string literal not displayed */

	// Client-side ("--local") search: instead of having the server scan
	// everything, fetch a plan and scan the cold (S3) segments on this machine,
	// moving the heavy historical scan off the server.
	Local          bool   `help:"scan the cold (S3) segments on this machine; the server only scans its hot tier"`
	APIKey         string `` /* 171-byte string literal not displayed */
	Concurrency    int    `` /* 175-byte string literal not displayed */
	FrameCacheSize int    `` /* 180-byte string literal not displayed */

	// Out is where results are written; nil defaults to os.Stdout. Excluded from
	// kong flag parsing so tests can capture output.
	Out io.Writer `kong:"-"`
}

SearchCmd queries a running loge server over its HTTP API using a LogQL-style selector. It is a thin client: it builds the same managers.QueryRequest the server's POST /api/v1/query handler consumes, so CLI results match the server.

func (*SearchCmd) Run

func (c *SearchCmd) Run() error

Run executes the search subcommand: it parses the selector and queries the server (or, in --local mode, the segments directly) and writes the results.

type ServeCmd

type ServeCmd struct {
	Port               int           `` /* 172-byte string literal not displayed */
	Buckets            int           `` /* 172-byte string literal not displayed */
	PayloadSize        int           `` /* 172-byte string literal not displayed */
	OutputPath         string        `` /* 172-byte string literal not displayed */
	DropOnBackpressure bool          `default:"false"  help:"drop data instead of blocking when backpressure occurs"`
	FlushInterval      time.Duration `default:"1s"     help:"how often a bucket flushes a non-empty batch"`
	FlushCompression   string        `` /* 334-byte string literal not displayed */
	FlushWorkers       int           `` /* 153-byte string literal not displayed */
	FlushQueue         int           `` /* 160-byte string literal not displayed */
	CompactInterval    time.Duration `default:"30s"    help:"how often to compact small files into segments (0 disables)"`
	CompactMinFiles    int           `default:"8"      help:"minimum number of flush files before a compaction pass runs"`
	Durable            bool          `default:"true"   help:"write-ahead log each payload (fsync) before acknowledging; disable for faster, lossy-on-crash ingest"`
	CheckpointInterval time.Duration `default:"2s"     help:"how often to fsync new segments and prune the write-ahead log"`
	QueryConcurrency   int           `default:"8"      help:"max segments a query opens in parallel"`

	// Profiling (opt-in). The pprof listener binds to loopback only because
	// heap profiles can expose log contents.
	PprofPort          int `default:"0" help:"serve net/http/pprof on 127.0.0.1:<port> (0 disables)"                            name:"pprof-port"`
	PprofBlockRate     int `default:"0" help:"runtime.SetBlockProfileRate sampling rate in ns (0 disables block profiling)"     name:"pprof-block-rate"`
	PprofMutexFraction int `` /* 127-byte string literal not displayed */

	// S3 tiered storage (rotate old segments to S3, read them back over HTTP).
	// Leave --s3-bucket empty to keep everything local.
	S3Bucket         string        `` /* 197-byte string literal not displayed */
	S3Prefix         string        `default:"loge/" help:"key prefix for uploaded segments"                                                     name:"s3-prefix"`
	S3Endpoint       string        `` /* 199-byte string literal not displayed */
	S3Region         string        `` /* 197-byte string literal not displayed */
	S3ForcePathStyle bool          `` /* 134-byte string literal not displayed */
	S3ReadURLBase    string        `` /* 131-byte string literal not displayed */
	S3ACL            string        `default:""      help:"canned ACL for uploaded objects (e.g. public-read); empty relies on bucket policy"    name:"s3-acl"`
	S3RotateAge      time.Duration `` /* 128-byte string literal not displayed */
	S3RotateInterval time.Duration `` /* 133-byte string literal not displayed */
	S3RotateGrace    time.Duration `` /* 130-byte string literal not displayed */
	S3FrameCacheSize int           `` /* 134-byte string literal not displayed */
	S3PresignExpiry  time.Duration `` /* 132-byte string literal not displayed */

	// APIKey gates the read + search endpoints (/query, /labels, /stats,
	// /search/plan) and the web UI's data calls. When set, callers must present
	// it as a bearer token; when empty everything is unauthenticated (matching
	// the trusted-network posture). Ingest (/push) and the static UI assets stay
	// open regardless, so the login page can load.
	APIKey string `` /* 159-byte string literal not displayed */
}

ServeCmd runs the ingest+query HTTP server. Its fields are every server flag.

func (*ServeCmd) Run

func (c *ServeCmd) Run() error

Run starts the loge ingest+query HTTP server and blocks until it shuts down.

type StatsResponse

type StatsResponse struct {
	Status         string `json:"status"`
	SegmentsTotal  int    `json:"segments_total"`
	SegmentsLocal  int    `json:"segments_local"`
	SegmentsRemote int    `json:"segments_remote"`
	RowsTotal      int64  `json:"rows_total"`
	MinTimestamp   int64  `json:"min_timestamp"`
	MaxTimestamp   int64  `json:"max_timestamp"`
}

StatsResponse summarizes the catalog (segment tiering, row count, time span) for benchmarking and observability.

type Stream

type Stream map[string]string

Stream is the set of label key/value pairs identifying a log stream.

func (*Stream) DecodeMsg

func (z *Stream) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (Stream) EncodeMsg

func (z Stream) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Stream) MarshalMsg

func (z Stream) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Stream) Msgsize

func (z Stream) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Stream) UnmarshalMsg

func (z *Stream) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Streams

type Streams []Entry

Streams is a list of label-keyed entries, the body of a Payload.

func (*Streams) DecodeMsg

func (z *Streams) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (Streams) EncodeMsg

func (z Streams) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Streams) MarshalMsg

func (z Streams) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Streams) Msgsize

func (z Streams) Msgsize() (s int)

Msgsize returns an upper-bound estimate of the number of bytes occupied by the serialized message.

func (*Streams) UnmarshalMsg

func (z *Streams) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Uploader

type Uploader struct {
	// contains filtered or unexported fields
}

Uploader rotates compacted segments older than a threshold to an ObjectStore (cold tier), flips the catalog to point at the remote copy, and deletes the local copy after a grace window. Recent segments stay local (hot tier).

func NewUploader

func NewUploader(dir string, catalog *managers.Catalog, store ObjectStore, opts ...UploaderOption) *Uploader

NewUploader builds an Uploader for dir.

func (*Uploader) ReconcileRemote

func (u *Uploader) ReconcileRemote(ctx context.Context) (int, error)

ReconcileRemote rebuilds catalog rows for remote (S3) segments from a bucket listing alone, so a fresh server with an empty catalog rediscovers cold-tier segments without opening any file. Keys whose bounds are encoded in the filename are cataloged from the name; legacy-named keys fall back to opening the segment over HTTP (the expensive path). It returns how many remote rows it added. Run it once at startup before the rotation loop starts.

func (*Uploader) Rotate

func (u *Uploader) Rotate(ctx context.Context) (int, error)

Rotate uploads every local segment older than the rotate age and flips the catalog to remote. A failed upload is logged and retried next cycle (the catalog is only flipped after the upload is verified), so an S3 outage never loses data — the segment just stays local.

func (*Uploader) Run

func (u *Uploader) Run(ctx context.Context)

Run rotates and sweeps on a ticker until ctx is cancelled.

func (*Uploader) Sweep

func (u *Uploader) Sweep(ctx context.Context) error

Sweep deletes the local copies of remote segments once their grace window has elapsed, by which time any query that resolved them locally has finished.

type UploaderOption

type UploaderOption func(*Uploader)

UploaderOption configures an Uploader.

func WithRotateAge

func WithRotateAge(d time.Duration) UploaderOption

WithRotateAge sets how old (since sealing) a local segment must be before it rotates to remote storage.

func WithRotateGrace

func WithRotateGrace(d time.Duration) UploaderOption

WithRotateGrace sets how long a rotated segment's local copy is kept (so in-flight queries that resolved it locally finish) before deletion.

func WithRotateInterval

func WithRotateInterval(d time.Duration) UploaderOption

WithRotateInterval sets how often the rotation loop runs.

func WithUploadPrefix

func WithUploadPrefix(prefix string) UploaderOption

WithUploadPrefix sets the key prefix for uploaded objects.

func WithUploaderVFS

func WithUploaderVFS(name string) UploaderOption

WithUploaderVFS sets the sqlite VFS name used to open remote segments when ReconcileRemote must fall back to reading a legacy-named segment over HTTP.
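
The UploaderOption functions above follow Go's functional-options pattern. A minimal sketch of that pattern, using a local stand-in type rather than the real Uploader: the field names and the default values below are assumptions made for illustration, since the real fields are unexported and the defaults are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// uploader is a stand-in for the real Uploader; its fields are hypothetical.
type uploader struct {
	rotateAge time.Duration
	grace     time.Duration
	interval  time.Duration
	prefix    string
}

// option has the same shape as the documented UploaderOption type.
type option func(*uploader)

func withRotateAge(d time.Duration) option { return func(u *uploader) { u.rotateAge = d } }
func withRotateGrace(d time.Duration) option { return func(u *uploader) { u.grace = d } }
func withUploadPrefix(p string) option { return func(u *uploader) { u.prefix = p } }

// newUploader applies options over defaults. The defaults are invented here.
func newUploader(opts ...option) *uploader {
	u := &uploader{
		rotateAge: 24 * time.Hour,
		grace:     5 * time.Minute,
		interval:  time.Minute,
	}
	for _, opt := range opts {
		opt(u) // each option mutates only the field it owns
	}
	return u
}

func main() {
	u := newUploader(withRotateAge(6*time.Hour), withUploadPrefix("segments/"))
	fmt.Println(u.rotateAge, u.grace, u.interval, u.prefix)
	// prints: 6h0m0s 5m0s 1m0s segments/
}
```

The pattern keeps the constructor signature stable as new settings are added: callers pass only the options they care about and everything else keeps its default.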

type Value

type Value [2]string

Value is a single log entry: a [timestamp, line] pair, both as strings.

func (*Value) DecodeMsg

func (z *Value) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Value) EncodeMsg

func (z *Value) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Value) MarshalMsg

func (z *Value) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Value) Msgsize

func (z *Value) Msgsize() (s int)

Msgsize returns an upper-bound estimate of the number of bytes occupied by the serialized message.

func (*Value) Timestamp

func (v *Value) Timestamp() (int64, error)

Timestamp parses the nanosecond timestamp string. It returns an error (rather than silently returning 0) so callers can reject or skip values with malformed timestamps instead of corrupting time-range metadata.
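
A sketch of how a caller might use an error-returning Timestamp to skip malformed values while computing time bounds. The value type and the bounds helper are local stand-ins, and parsing the string as a base-10 integer is an assumption about the format.

```go
package main

import (
	"fmt"
	"strconv"
)

// value is a local stand-in for the documented Value type: [timestamp, line].
type value [2]string

// timestamp parses the first element as base-10 nanoseconds (assumed format).
func (v *value) timestamp() (int64, error) {
	return strconv.ParseInt(v[0], 10, 64)
}

// bounds returns the lowest and highest parseable timestamps and the number
// of values skipped because their timestamp was malformed.
func bounds(vals []value) (lo, hi int64, skipped int) {
	first := true
	for i := range vals {
		ts, err := vals[i].timestamp()
		if err != nil {
			skipped++ // skip instead of letting a zero corrupt the bounds
			continue
		}
		if first || ts < lo {
			lo = ts
		}
		if first || ts > hi {
			hi = ts
		}
		first = false
	}
	return lo, hi, skipped
}

func main() {
	vals := []value{
		{"1700000000000000000", "started"},
		{"not-a-time", "garbled entry"},
		{"1700000000000000005", "ready"},
	}
	lo, hi, skipped := bounds(vals)
	fmt.Println(lo, hi, skipped)
	// prints: 1700000000000000000 1700000000000000005 1
}
```

Had the malformed entry parsed silently as 0, the lower bound would have collapsed to 0, which is the kind of time-range corruption the doc comment describes.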

func (*Value) UnmarshalMsg

func (z *Value) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Values

type Values []Value

Values is an ordered list of log entries within a stream.

func (*Values) DecodeMsg

func (z *Values) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (Values) EncodeMsg

func (z Values) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Values) MarshalMsg

func (z Values) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Values) Msgsize

func (z Values) Msgsize() (s int)

Msgsize returns an upper-bound estimate of the number of bytes occupied by the serialized message.

func (*Values) UnmarshalMsg

func (z *Values) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL is an append-only, group-committing write-ahead log. A single writer goroutine batches concurrent appends and fsyncs them together, assigning each a sequence number.
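
The group-commit idea can be sketched in a few dozen lines. This is not the loge implementation: groupLog and its helpers are hypothetical, the "file" is an in-memory slice, and the fsync is simulated by a counter. Sequence numbers start at 1 here because sequence 0 is documented below as meaning untracked.

```go
package main

import (
	"fmt"
	"sync"
)

// appendReq carries one record and a channel for its assigned sequence number.
type appendReq struct {
	data []byte
	seq  chan uint64
}

// groupLog is a hypothetical in-memory stand-in for a group-committing log.
type groupLog struct {
	reqs    chan appendReq
	stopped chan struct{}
	records [][]byte // stand-in for file contents
	syncs   int      // number of simulated fsyncs
}

func newGroupLog() *groupLog {
	g := &groupLog{reqs: make(chan appendReq, 64), stopped: make(chan struct{})}
	go g.run()
	return g
}

// run is the single writer: it takes one request, drains whatever else is
// already queued, writes the batch, syncs once, then acknowledges each append.
func (g *groupLog) run() {
	defer close(g.stopped)
	next := uint64(1)
	for first := range g.reqs {
		batch := []appendReq{first}
	drain:
		for {
			select {
			case r, ok := <-g.reqs:
				if !ok {
					break drain
				}
				batch = append(batch, r)
			default:
				break drain
			}
		}
		for _, r := range batch {
			g.records = append(g.records, r.data)
		}
		g.syncs++ // one simulated fsync covers the whole batch
		for _, r := range batch {
			r.seq <- next
			next++
		}
	}
}

// append blocks until the record's batch has been "synced".
func (g *groupLog) append(data []byte) uint64 {
	r := appendReq{data: data, seq: make(chan uint64, 1)}
	g.reqs <- r
	return <-r.seq
}

// close stops the writer after it has processed everything queued.
func (g *groupLog) close() {
	close(g.reqs)
	<-g.stopped
}

// appendConcurrently issues n appends from n goroutines and returns their seqs.
func appendConcurrently(g *groupLog, n int) []uint64 {
	seqs := make([]uint64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			seqs[i] = g.append([]byte(fmt.Sprintf("record-%d", i)))
		}(i)
	}
	wg.Wait()
	return seqs
}

func main() {
	g := newGroupLog()
	seqs := appendConcurrently(g, 8)
	g.close()
	fmt.Printf("%d appends, %d records, %d simulated fsyncs\n", len(seqs), len(g.records), g.syncs)
}
```

The number of simulated fsyncs depends on scheduling: it never exceeds the number of appends and drops below it whenever several appends arrive while the writer is busy, which is the point of batching.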

func OpenWAL

func OpenWAL(dir string) (*WAL, error)

OpenWAL starts a WAL writer for dir, creating the directory if needed. New records are written to fresh segments; any pre-existing segments are recorded as "recovered" for ReplaySegments and are only removed by RemoveWAL after a clean shutdown.

func (*WAL) Append

func (w *WAL) Append(payload *Payload) (uint64, error)

Append durably writes payload to the log and returns its sequence number once it (and any batched concurrent appends) have been fsynced.

func (*WAL) Close

func (w *WAL) Close() error

Close stops the writer after flushing any queued appends.

func (*WAL) MarkDurable

func (w *WAL) MarkDurable(seqs []uint64) error

MarkDurable records that the given sequence numbers have reached a durable segment, advances the contiguous watermark, and deletes any sealed WAL segments that lie entirely below it. Sequence 0 (untracked/replayed payloads) is ignored.
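
The contiguous-watermark bookkeeping can be illustrated with a small sketch. The watermark and sealedSegment types are hypothetical and track state in memory only; how the real WAL stores its watermark or names its segments is not shown on this page.

```go
package main

import "fmt"

// watermark tracks which sequence numbers are durable. Every sequence at or
// below mark is durable; pending holds durable sequences above a gap.
type watermark struct {
	mark    uint64
	pending map[uint64]struct{}
}

// sealedSegment is a hypothetical record of a sealed log file and the highest
// sequence number it contains.
type sealedSegment struct {
	name   string
	maxSeq uint64
}

func newWatermark() *watermark {
	return &watermark{pending: make(map[uint64]struct{})}
}

// markDurable records seqs and advances mark across any contiguous run.
// Sequence 0 is ignored, as in the documented behavior.
func (w *watermark) markDurable(seqs []uint64) uint64 {
	for _, s := range seqs {
		if s == 0 || s <= w.mark {
			continue
		}
		w.pending[s] = struct{}{}
	}
	for {
		if _, ok := w.pending[w.mark+1]; !ok {
			break
		}
		delete(w.pending, w.mark+1)
		w.mark++
	}
	return w.mark
}

// deletable lists sealed segments that lie entirely at or below the watermark.
func (w *watermark) deletable(segs []sealedSegment) []string {
	var names []string
	for _, s := range segs {
		if s.maxSeq <= w.mark {
			names = append(names, s.name)
		}
	}
	return names
}

func main() {
	w := newWatermark()
	segs := []sealedSegment{{"seg-a", 3}, {"seg-b", 6}}
	fmt.Println(w.markDurable([]uint64{2, 3})) // prints 0: sequence 1 is still missing
	fmt.Println(w.markDurable([]uint64{1}))    // prints 3: the gap is filled
	fmt.Println(w.markDurable([]uint64{5, 0})) // prints 3: 4 is missing, 0 is ignored
	fmt.Println(w.deletable(segs))             // prints [seg-a]
	fmt.Println(w.markDurable([]uint64{4, 6})) // prints 6
	fmt.Println(w.deletable(segs))             // prints [seg-a seg-b]
}
```

Advancing only across contiguous sequences means an out-of-order flush can never cause a segment to be deleted while an earlier record in it is still un-flushed.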

func (*WAL) Recovered

func (w *WAL) Recovered() []string

Recovered returns the pre-existing segments captured at open time, to be replayed before the WAL is used for new writes.

func (*WAL) Rotate

func (w *WAL) Rotate()

Rotate seals the active segment (if it holds data) so its records become eligible for deletion once they are marked durable.

Directories

Path Synopsis
cmd
loge-loadgen command
Command loge-loadgen generates realistic web-access and structured JSON app logs and pushes them to a loge server's /api/v1/push endpoint.
Package filewatcher tracks the set of files in a directory that match a pattern, keeping it current as files are created, removed, or renamed, and lets callers iterate over the live set.
Command loge is the CLI entry point for the loge log-storage engine: it parses the command line and dispatches to the selected subcommand.
Package managers indexes the engine's SQLite segments in a catalog and answers queries by fanning out across the local and S3-tiered segments.
Package slogloge provides a log/slog Handler that ships structured logs to a loge server's /api/v1/push endpoint.
