auditlog

Version: v0.1.0
Published: Jun 18, 2026 | License: MIT | Imports: 17 | Imported by: 0

README

go-workflow-auditlog

Audit logging library for Azure/go-workflow. It records every step execution (attempts, retries, durations, errors, dependencies, final statuses) as timestamped events and exports them to JSON or NDJSON.

Why?

go-workflow runs steps concurrently in a DAG, but provides no built-in way to answer:

  • Which step took the longest?
  • How many retries did that flaky step need?
  • What's the full dependency graph?
  • Which steps were skipped, and why?

This library answers those questions by injecting audit callbacks into your workflow and capturing a complete, timestamped event stream.

Quick Start

package main

import (
    "context"
    "fmt"

    flow "github.com/Azure/go-workflow"

    auditlog "github.com/larsartmann/go-workflow-auditlog"
)

type FetchStep struct{ Data []byte }
func (s *FetchStep) Do(_ context.Context) error { s.Data = []byte("hello"); return nil }
func (s *FetchStep) String() string             { return "fetch" }

type SaveStep struct{ Input []byte }
func (s *SaveStep) Do(_ context.Context) error  { return nil }
func (s *SaveStep) String() string              { return "save" }

func main() {
    audit, _ := auditlog.New(auditlog.Config{
        Enabled:    true,
        WorkflowID: "my-pipeline",
    })

    fetch := &FetchStep{}
    save := &SaveStep{}

    w := &flow.Workflow{}
    w.Add(
        flow.Step(fetch),
        flow.Step(save).DependsOn(fetch),
    )

    // 1. Attach audit callbacks BEFORE running
    audit.Attach(w)

    // 2. Run the workflow
    _ = w.Do(context.Background())

    // 3. Snapshot final state AFTER running
    audit.Snapshot(w)

    // 4. Read the report
    report := audit.Report()
    fmt.Printf("Steps: %d, Events: %d\n", report.StepCount, report.EventCount)

    // Export
    _ = audit.ExportToFile("audit.json")
    _ = audit.ExportEventsToNDJSON("events.ndjson")
}

How It Works

go-workflow v0.1.13 provides per-step BeforeStep and AfterStep callbacks that fire once per attempt, including each retry. This library:

  1. Attach(w) — injects audit BeforeStep/AfterStep callbacks into every step in the workflow via State.MergeConfig. Must be called before w.Do(ctx).

  2. Execution — during w.Do(ctx), each step's callbacks fire on every attempt, recording timestamped attempt_start and attempt_end events with duration, error, and status.

  3. Snapshot(w) — reads the workflow's post-execution state to capture the full DAG structure, final statuses, retry/timeout config, and any steps that were skipped or canceled (which bypass callbacks entirely). Must be called after w.Do(ctx).
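
The per-attempt event flow described in step 2 can be pictured with a small stdlib-only sketch. It does not use go-workflow or this package: runWithAudit, event, and the record callback are hypothetical names, and the retry loop is a simplified stand-in for the real scheduler. It only illustrates that every try yields one attempt_start and one attempt_end event.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// event is a hypothetical, simplified stand-in for an audit event.
type event struct {
	Type     string
	Attempt  int
	Duration time.Duration
	Err      error
}

// errFlaky simulates a transient failure.
var errFlaky = errors.New("flaky")

// runWithAudit calls do up to maxAttempts times and reports a start and an
// end event for every attempt through record.
func runWithAudit(maxAttempts int, do func() error, record func(event)) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		record(event{Type: "attempt_start", Attempt: attempt})
		start := time.Now()
		err = do()
		record(event{Type: "attempt_end", Attempt: attempt, Duration: time.Since(start), Err: err})
		if err == nil {
			return nil
		}
	}
	return err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls == 1 {
			return errFlaky // fail only on the first try
		}
		return nil
	}

	var events []event
	err := runWithAudit(3, flaky, func(e event) { events = append(events, e) })

	for _, e := range events {
		fmt.Printf("%-13s attempt=%d err=%v\n", e.Type, e.Attempt, e.Err)
	}
	fmt.Println("final error:", err)
}
```

A step that fails once and then succeeds therefore leaves four events behind, which is how retry counts can be derived from the stream.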

Why Snapshot is needed

Steps settled inline by Conditions (Skipped/Canceled) never enter the interceptor/callback chain. Snapshot(w) reads w.StateOf(step) and w.UpstreamOf(step) to fill these gaps.

Report Structure

{
  "version": "0.1.0",
  "workflow_id": "my-pipeline",
  "run_id": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6",
  "exported_at": "2026-06-18T15:21:09Z",
  "event_count": 4,
  "step_count": 2,
  "succeeded_count": 2,
  "failed_count": 0,
  "skipped_count": 0,
  "canceled_count": 0,
  "total_duration_ms": 15.23,
  "workflow_succeeded": true,
  "dropped_event_count": 0,
  "steps": [
    {
      "step_name": "fetch",
      "step_type": "FetchStep",
      "step_id": 1,
      "status": "succeeded",
      "attempt_count": 1,
      "duration_ms": 10.5,
      "has_retry": false,
      "has_timeout": false,
      "dependents": [{ "step_name": "save" }]
    }
  ],
  "events": [
    {
      "run_id": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6",
      "sequence": 1,
      "timestamp": "2026-06-18T15:21:09Z",
      "event_type": "attempt_start",
      "phase": "before",
      "step_name": "fetch",
      "attempt": 1
    }
  ]
}
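
A consumer that only needs a few of these fields can decode the exported JSON with encoding/json. The sketch below is an illustration under assumptions: reportSummary and parseSummary are hypothetical local names that mirror a subset of the JSON keys shown above, not the package's WorkflowReport type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reportSummary is a hypothetical local mirror of a few report fields.
// Unknown JSON keys are ignored by encoding/json.
type reportSummary struct {
	WorkflowID        string  `json:"workflow_id"`
	StepCount         int     `json:"step_count"`
	FailedCount       int     `json:"failed_count"`
	TotalDurationMs   float64 `json:"total_duration_ms"`
	WorkflowSucceeded bool    `json:"workflow_succeeded"`
	Steps             []struct {
		Name   string `json:"step_name"`
		Status string `json:"status"`
	} `json:"steps"`
}

// parseSummary decodes the subset of fields declared on reportSummary.
func parseSummary(data []byte) (reportSummary, error) {
	var s reportSummary
	err := json.Unmarshal(data, &s)
	return s, err
}

func main() {
	raw := []byte(`{
	  "workflow_id": "my-pipeline",
	  "step_count": 2,
	  "failed_count": 0,
	  "total_duration_ms": 15.23,
	  "workflow_succeeded": true,
	  "steps": [{"step_name": "fetch", "status": "succeeded"}]
	}`)

	s, err := parseSummary(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s: %d steps, succeeded=%v\n", s.WorkflowID, s.StepCount, s.WorkflowSucceeded)
}
```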

API Reference

auditlog.New(config Config) (*Auditor, error)

Creates an auditor. When Config.Enabled is false, checks the WORKFLOW_AUDITLOG_ENABLED env var ("true", "1", "yes").

Auditor Methods

| Method | Description |
|---|---|
| Attach(w *flow.Workflow) *flow.Workflow | Injects audit callbacks into all steps. Call before Do. |
| Snapshot(w *flow.Workflow) | Captures final DAG state. Call after Do. |
| Report() WorkflowReport | Returns the consolidated report. |
| Events() []Event | Returns all captured events. |
| EventsCount() int | Event count without copying. |
| DroppedEventCount() int64 | Events dropped due to MaxEvents cap. |
| RunID() string | The run identifier stamped on every event (for correlation). |
| ReportFiltered(opts ...ReportOption) WorkflowReport | Returns a filtered report (by name/status/event-type/time). |
| ExportToFile(path string) error | Writes report as JSON. |
| ExportEventsToNDJSON(path string) error | Writes events as NDJSON. |
| WriteReportJSON(w io.Writer) error | Writes report JSON to writer. |
| WriteEventsNDJSON(w io.Writer) error | Writes NDJSON to writer. |
| ExportMermaid(path string) error | Writes Mermaid DAG to file. |
| ExportPlantUML(path string) error | Writes PlantUML DAG to file. |
| ExportGraphviz(path string) error | Writes Graphviz DOT DAG to file. |
| WriteMermaid(w io.Writer) error | Writes Mermaid DAG to writer. |
| WritePlantUML(w io.Writer) error | Writes PlantUML DAG to writer. |
| WriteGraphviz(w io.Writer) error | Writes Graphviz DOT DAG to writer. |

WorkflowReport Methods

| Method | Description |
|---|---|
| report.StepByName(name) | Find a step by name. |
| report.EventsByStep(name) | Filter events by step. |
| report.EventsByType(type) | Filter events by type. |
| report.FailedSteps() | All failed/canceled steps. |
| report.SucceededSteps() | All succeeded steps. |
| report.SkippedSteps() | All skipped steps. |
| report.RetriedSteps() | All steps with >1 attempt. |
| report.Filtered(opts ...ReportOption) WorkflowReport | Returns a filtered copy of the report. |
| report.Diff(other WorkflowReport) DiffResult | Compares two reports (added/removed/changed steps + duration delta). |
| report.Duration() time.Duration | Wall-clock duration spanned by all events (earliest → latest). |
| report.Summary() string | One-line human-readable summary. |
| report.WriteJSON(w io.Writer) error | Serialize report as JSON. |
| report.WriteNDJSON(w io.Writer) error | Serialize events as NDJSON. |
| report.WriteMermaid(w io.Writer) error | Mermaid diagram. |
| report.WritePlantUML(w io.Writer) error | PlantUML diagram. |
| report.WriteGraphviz(w io.Writer) error | Graphviz DOT diagram. |
| report.WriteMermaidString() (string, error) | Mermaid diagram as string. |
| report.WritePlantUMLString() (string, error) | PlantUML diagram as string. |
| report.WriteGraphvizString() (string, error) | Graphviz DOT diagram as string. |
| report.Validate() error | Checks internal consistency (counts, status drift). |

Package-Level Functions

| Function | Description |
|---|---|
| auditlog.LoadReport(path string) (WorkflowReport, error) | Load a JSON report from a file. |
| auditlog.LoadReportFromReader(r io.Reader) (WorkflowReport, error) | Load a JSON report from a reader. |
| auditlog.LoadReportFromBytes(b []byte) (WorkflowReport, error) | Load a JSON report from bytes. |
| auditlog.ReadEvents(r io.Reader) ([]Event, error) | Read NDJSON events (inverse of WriteEventsNDJSON). |
| auditlog.ReplayEvents(events []Event) (WorkflowReport, error) | Reconstruct a report from a flat event stream. |
| auditlog.NewReportIndex(r WorkflowReport) *ReportIndex | Precompute O(1) lookup maps over a report. |

Sentinel Errors

These exported errors are returned by Validate(), New(), and ReplayEvents(). Match them with errors.Is:

| Error | Returned when |
|---|---|
| auditlog.ErrWorkflowIDPathSep | Config.WorkflowID contains / or \. |
| auditlog.ErrEventCountMismatch | Report EventCount ≠ len(Events). |
| auditlog.ErrStepCountMismatch | Report StepCount ≠ len(Steps). |
| auditlog.ErrStatusDrift | A step's Status disagrees with its derived status. |
| auditlog.ErrReplayNoEvents | ReplayEvents received zero events. |
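
Matching sentinel errors with errors.Is keeps working even when the error has been wrapped with extra context. The following stdlib-only sketch shows the pattern; the lowercase errors, the report struct, and the validate and classify helpers are hypothetical stand-ins, not the package's own implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's exported sentinel errors (assumption: the
// real ones are matched the same way with errors.Is).
var (
	errEventCountMismatch = errors.New("event_count does not match len(events)")
	errStepCountMismatch  = errors.New("step_count does not match len(steps)")
)

// report is a hypothetical, trimmed-down report.
type report struct {
	EventCount int
	Events     []string
	StepCount  int
	Steps      []string
}

// validate wraps a sentinel with context so callers can still match it.
func validate(r report) error {
	if r.EventCount != len(r.Events) {
		return fmt.Errorf("%w: field=%d actual=%d", errEventCountMismatch, r.EventCount, len(r.Events))
	}
	if r.StepCount != len(r.Steps) {
		return fmt.Errorf("%w: field=%d actual=%d", errStepCountMismatch, r.StepCount, len(r.Steps))
	}
	return nil
}

// classify maps a validation error to a short label using errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errEventCountMismatch):
		return "event-count"
	case errors.Is(err, errStepCountMismatch):
		return "step-count"
	default:
		return "other"
	}
}

func main() {
	bad := report{EventCount: 4, Events: []string{"a"}, StepCount: 1, Steps: []string{"fetch"}}
	fmt.Println(classify(validate(bad)))
}
```

Branching on the sentinel rather than on the error text means the message can change without breaking callers.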

Config

| Field | Default | Description |
|---|---|---|
| Enabled | false (checks env var) | Turns audit logging on/off. |
| WorkflowID | "default" | Human-readable identifier. |
| RunID | auto-generated (128-bit hex) | Identifier for one execution; stamped on every event for trace correlation. Override to use your own trace ID. |
| OnEvent | nil | Callback fired after each event. Must not block. |
| MaxEvents | 0 (unlimited) | Caps stored events to prevent OOM. |
| InitialEventCapacity | 256 | Pre-allocates event slice. |
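
To make the MaxEvents and InitialEventCapacity semantics concrete, here is a minimal sketch of a capped, pre-allocated event buffer. It is an assumption-laden illustration (eventLog, newEventLog, and add are hypothetical names, events are plain strings, and there is no locking), not the recorder's actual code.

```go
package main

import "fmt"

// eventLog is a hypothetical capped buffer. It is not goroutine-safe.
type eventLog struct {
	events  []string
	max     int   // 0 means unlimited
	dropped int64 // events rejected once the cap was reached
}

// newEventLog pre-allocates the slice; a non-positive capacity falls back
// to 256, mirroring the documented default.
func newEventLog(max, initialCap int) *eventLog {
	if initialCap <= 0 {
		initialCap = 256
	}
	return &eventLog{events: make([]string, 0, initialCap), max: max}
}

// add appends e unless the cap is reached, in which case it only counts
// the drop. It reports whether the event was stored.
func (l *eventLog) add(e string) bool {
	if l.max > 0 && len(l.events) >= l.max {
		l.dropped++
		return false
	}
	l.events = append(l.events, e)
	return true
}

func main() {
	l := newEventLog(2, 0)
	for _, e := range []string{"a", "b", "c"} {
		l.add(e)
	}
	fmt.Printf("stored=%d dropped=%d\n", len(l.events), l.dropped)
}
```

In this sketch the earliest events are kept and later ones are counted as dropped, which matches the description of the recorder no longer appending after the cap.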

Concurrency Model

  • Single sync.RWMutex protects all mutable state.
  • BeforeStep/AfterStep callbacks acquire the write lock once per call.
  • OnEvent callback fires outside the lock to prevent user code from blocking.
  • BuildReport() uses RLock — concurrent reads don't block each other.
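
The locking rules above can be sketched with the standard library alone. This is a simplified illustration, not the package's Recorder: the recorder type, its record and snapshot methods, and string events are all assumptions made for the example. The point it shows is that the callback runs after the write lock is released, so user code may read from the recorder without deadlocking.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recorder is a hypothetical, minimal event recorder.
type recorder struct {
	mu      sync.RWMutex
	events  []string
	onEvent func(string)
}

// record takes the write lock once, then fires the callback outside it.
func (r *recorder) record(e string) {
	r.mu.Lock()
	r.events = append(r.events, e)
	cb := r.onEvent
	r.mu.Unlock()

	if cb != nil {
		cb(e) // no lock held here
	}
}

// snapshot returns a defensive copy under the read lock.
func (r *recorder) snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return append([]string(nil), r.events...)
}

func main() {
	var seen atomic.Int64 // callbacks run concurrently, so count atomically
	r := &recorder{onEvent: func(string) { seen.Add(1) }}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			r.record(fmt.Sprintf("event-%d", n))
		}(i)
	}
	wg.Wait()

	fmt.Println(len(r.snapshot()), seen.Load()) // prints "8 8"
}
```

If the callback were invoked while the write lock was still held, a callback that called snapshot would block forever on RLock.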

Step Naming

go-workflow uses flow.String(step) for display names. By default this returns *TypeName(0xpointer) which is non-deterministic. For clean audit output, implement String() on your step types:

func (s *MyStep) String() string { return "my-meaningful-name" }

Or use flow.Name(step, "name") when adding to the workflow.

Known Limitations

  • Step name collisions: step identity is tracked internally by the flow.Steper pointer (always unique), but the JSON step_name field relies on flow.String(step). If two steps produce the same String() output, their JSON output is ambiguous. Give each step a distinct String().
  • Snapshot is mandatory for full DAG: Attach captures per-attempt events; the dependency graph and skipped/canceled statuses are only filled in after Snapshot(w) reads the workflow's final state.
  • Replay loses DAG edges: ReplayEvents reconstructs a report from a flat event stream. Dependencies, MaxAttempts, HasRetry, and HasTimeout are not available from events alone — use a Snapshot-captured report for full fidelity.
  • go-workflow retry data race: DefaultRetryOption.Backoff shares a single ExponentialBackOff instance that races under concurrent use. This is a known upstream issue in v0.1.13.
  • WorkflowID cannot contain path separators (/ or \): it is used in export filenames and would break path safety. Use Config.RunID or your own export paths for arbitrary identifiers.

Installation

go get github.com/larsartmann/go-workflow-auditlog

Requires Go 1.26+ and github.com/Azure/go-workflow v0.1.13.

License

MIT

Documentation

Overview

Package auditlog provides an audit logging library for Azure/go-workflow.

It records every step execution (attempts, retries, durations, errors, dependencies, and final statuses) as timestamped events and exports them to JSON and NDJSON.

Quick start

audit, _ := auditlog.New(auditlog.Config{WorkflowID: "checkout"})
w := &flow.Workflow{}
w.Add(
	flow.Step(fetch),
	flow.Step(save).DependsOn(fetch),
)

audit.Attach(w)          // inject callbacks BEFORE Do
err := w.Do(ctx)         // run the workflow
audit.Snapshot(w)        // capture final DAG state AFTER Do

report := audit.Report() // machine-readable snapshot
_ = audit.ExportToFile("audit.json")

Example (BasicUsage)

Example_basicUsage shows the minimal workflow audit setup.

package main

import (
	"context"
	"fmt"

	flow "github.com/Azure/go-workflow"
	auditlog "github.com/larsartmann/go-workflow-auditlog"
)

// exampleStep is used by the Example functions.
type exampleStep struct {
	name string
}

func (s *exampleStep) Do(_ context.Context) error { return nil }
func (s *exampleStep) String() string             { return s.name }

func main() {
	audit, _ := auditlog.New(auditlog.Config{
		Enabled:    true,
		WorkflowID: "demo",
	})

	step := &exampleStep{name: "fetch"}
	w := &flow.Workflow{}
	w.Add(flow.Step(step))

	audit.Attach(w)
	_ = w.Do(context.Background())
	audit.Snapshot(w)

	report := audit.Report()
	fmt.Printf("Steps: %d, Succeeded: %d\n", report.StepCount, report.SucceededCount)

}

Output:
Steps: 1, Succeeded: 1

Example (ExportToFile)

Example_exportToFile shows how to export the audit report to JSON.

package main

import (
	"context"
	"fmt"
	"os"

	flow "github.com/Azure/go-workflow"
	auditlog "github.com/larsartmann/go-workflow-auditlog"
)

// exampleStep is used by the Example functions.
type exampleStep struct {
	name string
}

func (s *exampleStep) Do(_ context.Context) error { return nil }
func (s *exampleStep) String() string             { return s.name }

func main() {
	audit, _ := auditlog.New(auditlog.Config{Enabled: true})

	w := &flow.Workflow{}
	w.Add(flow.Step(&exampleStep{name: "step"}))

	audit.Attach(w)
	_ = w.Do(context.Background())
	audit.Snapshot(w)

	_ = audit.ExportToFile(os.TempDir() + "/audit-example.json")

	fmt.Println("exported")

}

Output:
exported

Example (Filtering)

Example_filtering shows how to filter a report.

package main

import (
	"context"
	"errors"
	"fmt"

	flow "github.com/Azure/go-workflow"
	auditlog "github.com/larsartmann/go-workflow-auditlog"
)

// exampleStep is used by the Example functions.
type exampleStep struct {
	name string
}

func (s *exampleStep) Do(_ context.Context) error { return nil }
func (s *exampleStep) String() string             { return s.name }

func main() {
	audit, _ := auditlog.New(auditlog.Config{Enabled: true})

	w := &flow.Workflow{}
	w.Add(
		flow.Step(&exampleStep{name: "ok"}),
		flow.Step(flow.Func("bad", func(_ context.Context) error { return errors.New("fail") })),
	)

	audit.Attach(w)
	_ = w.Do(context.Background())
	audit.Snapshot(w)

	filtered := audit.ReportFiltered(auditlog.WithStepsByStatus(auditlog.StepStatusSucceeded))
	fmt.Printf("Filtered steps: %d\n", filtered.StepCount)

}

Output:
Filtered steps: 1

Example (MermaidDiagram)

Example_mermaidDiagram shows how to generate a Mermaid DAG visualization. The diagram is written to any io.Writer — here we use io.Discard since the output is non-deterministic across runs.

package main

import (
	"context"
	"io"

	flow "github.com/Azure/go-workflow"
	auditlog "github.com/larsartmann/go-workflow-auditlog"
)

// exampleStep is used by the Example functions.
type exampleStep struct {
	name string
}

func (s *exampleStep) Do(_ context.Context) error { return nil }
func (s *exampleStep) String() string             { return s.name }

func main() {
	audit, _ := auditlog.New(auditlog.Config{Enabled: true})

	a := &exampleStep{name: "fetch"}
	b := &exampleStep{name: "save"}

	w := &flow.Workflow{}
	w.Add(
		flow.Step(a),
		flow.Step(b).DependsOn(a),
	)

	audit.Attach(w)
	_ = w.Do(context.Background())
	audit.Snapshot(w)

	report := audit.Report()
	_ = report.WriteMermaid(io.Discard)

}

Constants

const EnvKeyEnabled = "WORKFLOW_AUDITLOG_ENABLED"

EnvKeyEnabled is the environment variable that controls audit logging. Set to "true", "1", or "yes" to enable. Any other value (or unset) disables it.
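
The enable logic described here (explicit config wins, otherwise the environment variable decides) might look like the sketch below. The helper names are hypothetical, and the exact, case-sensitive comparison is an assumption: the documentation lists the accepted values but does not say whether case or surrounding whitespace is normalized.

```go
package main

import (
	"fmt"
	"os"
)

const envKeyEnabled = "WORKFLOW_AUDITLOG_ENABLED"

// enabledFromEnv reports whether value is one of the documented truthy
// strings. Assumption: the match is exact and case-sensitive.
func enabledFromEnv(value string) bool {
	switch value {
	case "true", "1", "yes":
		return true
	default:
		return false
	}
}

// resolveEnabled falls back to the environment only when the config flag
// is left at its zero value.
func resolveEnabled(configured bool, envValue string) bool {
	return configured || enabledFromEnv(envValue)
}

func main() {
	fmt.Println(resolveEnabled(false, os.Getenv(envKeyEnabled)))
}
```

One consequence of this design is that the environment variable can only turn auditing on; it cannot override Enabled: true.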

const SchemaVersion = "0.1.0"

SchemaVersion is the current report schema version.

Variables

var (
	ErrEmpty         = errors.New("ndjson input is empty")
	ErrNoEvents      = errors.New("ndjson input contains no events")
	ErrOversizedLine = errors.New("ndjson line exceeds maximum size")
)

Sentinel errors for NDJSON reading.

var (
	// ErrEventCountMismatch indicates the report's EventCount field does not
	// match the length of its Events slice.
	ErrEventCountMismatch = errors.New("event_count does not match len(events)")
	// ErrStepCountMismatch indicates the report's StepCount field does not
	// match the length of its Steps slice.
	ErrStepCountMismatch = errors.New("step_count does not match len(steps)")
	// ErrStatusDrift indicates a step's stored Status disagrees with the
	// status implied by its Error pointer (see [StepInfo.DeriveStatus]).
	ErrStatusDrift = errors.New("step status does not match derived status")
)

Sentinel errors returned by WorkflowReport.Validate. Consumers can match on these with errors.Is to distinguish validation failure modes without parsing error text.

var ErrReplayNoEvents = errors.New("no events to replay")

ErrReplayNoEvents is returned when ReplayEvents receives zero events.

var ErrWorkflowIDPathSep = errors.New("config.WorkflowID must not contain path separators")

ErrWorkflowIDPathSep is returned by Config.Validate (and thus New) when Config.WorkflowID contains a path separator, which would break file-based export paths. Consumers can match on it with errors.Is.

Types

type Auditor

type Auditor struct {
	// contains filtered or unexported fields
}

Auditor wraps a flow.Workflow with audit logging.

func New

func New(config Config) (*Auditor, error)

New creates an audit log Auditor.

When Config.Enabled is false (the zero value), New checks the WORKFLOW_AUDITLOG_ENABLED environment variable. Set it to "true", "1", or "yes" to enable audit logging without changing code.

If WorkflowID is empty it defaults to "default".

Returns an error if Config.Validate() fails.

func (*Auditor) Attach

func (a *Auditor) Attach(w *flow.Workflow) *flow.Workflow

Attach injects audit BeforeStep/AfterStep callbacks into every step in the workflow. Call this BEFORE w.Do(ctx).

The callbacks are merged into each step's existing config via State.MergeConfig, so user-defined callbacks (Input, Output, BeforeStep, AfterStep) are preserved. Audit callbacks are appended last so they observe the final error.

When the Auditor is disabled, Attach is a no-op.

func (*Auditor) DroppedEventCount

func (a *Auditor) DroppedEventCount() int64

DroppedEventCount returns the number of events dropped due to Config.MaxEvents.

func (*Auditor) Events

func (a *Auditor) Events() []Event

Events returns a defensive copy of all captured events.

func (*Auditor) EventsCount

func (a *Auditor) EventsCount() int

EventsCount returns the number of captured events without copying the slice.

func (*Auditor) ExportEventsToNDJSON

func (a *Auditor) ExportEventsToNDJSON(path string) error

ExportEventsToNDJSON writes every event as NDJSON to path.

func (*Auditor) ExportGraphviz

func (a *Auditor) ExportGraphviz(path string) error

ExportGraphviz writes the step DAG as Graphviz DOT to path.

func (*Auditor) ExportMermaid

func (a *Auditor) ExportMermaid(path string) error

ExportMermaid writes the step DAG as Mermaid to path.

func (*Auditor) ExportPlantUML

func (a *Auditor) ExportPlantUML(path string) error

ExportPlantUML writes the step DAG as PlantUML to path.

func (*Auditor) ExportToFile

func (a *Auditor) ExportToFile(path string) error

ExportToFile writes the full WorkflowReport as indented JSON to path.

func (*Auditor) Report

func (a *Auditor) Report() WorkflowReport

Report returns a consolidated snapshot of everything observed so far.

func (*Auditor) ReportFiltered

func (a *Auditor) ReportFiltered(opts ...ReportOption) WorkflowReport

ReportFiltered returns a filtered report. Convenience method on Auditor.

func (*Auditor) RunID

func (a *Auditor) RunID() string

RunID returns the run identifier stamped on every captured event. Useful for correlating the audit log with external systems (traces, logs) before a full report is built.

func (*Auditor) Snapshot

func (a *Auditor) Snapshot(w *flow.Workflow)

Snapshot reads the workflow's final state after Do() to capture the full DAG structure, final statuses, and any steps that were skipped or canceled (which bypass Before/After callbacks entirely).

Call this AFTER w.Do(ctx) returns.

When the Auditor is disabled, Snapshot is a no-op.

func (*Auditor) WriteEventsNDJSON

func (a *Auditor) WriteEventsNDJSON(writer io.Writer) error

WriteEventsNDJSON writes every captured event as line-delimited JSON to writer.

func (*Auditor) WriteGraphviz

func (a *Auditor) WriteGraphviz(writer io.Writer) error

WriteGraphviz writes the step DAG as a Graphviz DOT diagram to the writer.

func (*Auditor) WriteMermaid

func (a *Auditor) WriteMermaid(writer io.Writer) error

WriteMermaid writes the step DAG as a Mermaid diagram to the writer.

func (*Auditor) WritePlantUML

func (a *Auditor) WritePlantUML(writer io.Writer) error

WritePlantUML writes the step DAG as a PlantUML diagram to the writer.

func (*Auditor) WriteReportJSON

func (a *Auditor) WriteReportJSON(writer io.Writer) error

WriteReportJSON writes the full WorkflowReport as indented JSON to writer.

type Config

type Config struct {
	// Enabled turns audit logging on or off. When false the Auditor is a no-op.
	// If left as zero-value (false), New() checks the WORKFLOW_AUDITLOG_ENABLED env var.
	Enabled bool
	// WorkflowID is an optional human-readable identifier for the workflow.
	WorkflowID string
	// RunID is an optional identifier for a single execution ("run") of the
	// workflow. It is stamped on every Event and on the WorkflowReport so that
	// all observations from one execution can be correlated across systems
	// (e.g. matched to a distributed trace). If empty, New() generates a random
	// 128-bit hex ID.
	RunID string
	// OnEvent is called after each event is captured, outside the recorder
	// lock so it cannot deadlock the recorder. Must not block.
	// Note: concurrent steps invoke this concurrently — the callback must be
	// goroutine-safe (e.g. guard shared state with a mutex). Nil disables it.
	OnEvent func(Event)
	// MaxEvents caps the number of events stored in memory. When 0 (default),
	// events grow without bound. When > 0, the recorder stops appending new
	// events after reaching the cap and increments DroppedEventCount.
	MaxEvents int
	// InitialEventCapacity pre-allocates the events slice to avoid runtime
	// reallocations. When 0, defaults to 256.
	InitialEventCapacity int
}

Config controls the audit log behaviour.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error if the config is invalid.

type DiffResult

type DiffResult struct {
	AddedSteps    []StepDiff `json:"added_steps,omitempty"`
	RemovedSteps  []StepDiff `json:"removed_steps,omitempty"`
	StatusChanged []StepDiff `json:"status_changed,omitempty"`
	DurationDelta float64    `json:"duration_delta_ms"`
}

DiffResult describes the differences between two workflow reports.

func (DiffResult) HasChanges

func (d DiffResult) HasChanges() bool

HasChanges returns true if the diff found any differences.
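
As a rough picture of what a step-level diff involves, the sketch below compares two runs represented as name-to-status maps. It is not the package's Diff implementation: the diff type, diffStatuses, and the old/new direction are assumptions chosen for the example, and duration deltas are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// diff is a hypothetical, name-only counterpart of a diff result.
type diff struct {
	Added         []string
	Removed       []string
	StatusChanged []string
}

// hasChanges reports whether any category is non-empty.
func (d diff) hasChanges() bool {
	return len(d.Added)+len(d.Removed)+len(d.StatusChanged) > 0
}

// diffStatuses compares step statuses of an old and a new run.
func diffStatuses(oldRun, newRun map[string]string) diff {
	var d diff
	for name, status := range newRun {
		prev, existed := oldRun[name]
		switch {
		case !existed:
			d.Added = append(d.Added, name)
		case prev != status:
			d.StatusChanged = append(d.StatusChanged, name)
		}
	}
	for name := range oldRun {
		if _, still := newRun[name]; !still {
			d.Removed = append(d.Removed, name)
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(d.Added)
	sort.Strings(d.Removed)
	sort.Strings(d.StatusChanged)
	return d
}

func main() {
	oldRun := map[string]string{"fetch": "succeeded", "save": "failed"}
	newRun := map[string]string{"save": "succeeded", "notify": "succeeded"}
	d := diffStatuses(oldRun, newRun)
	fmt.Printf("%+v changed=%v\n", d, d.hasChanges())
}
```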

type Event

type Event struct {
	StepRef

	RunID      string     `json:"run_id,omitempty"`
	Sequence   int        `json:"sequence"`
	Timestamp  time.Time  `json:"timestamp"`
	EventType  EventType  `json:"event_type"`
	Phase      Phase      `json:"phase"`
	Attempt    int        `json:"attempt,omitempty"`
	DurationMs *float64   `json:"duration_ms,omitempty"`
	Error      *string    `json:"error,omitempty"`
	Status     StepStatus `json:"status,omitempty"`
}

Event is a single, timestamped observation from a workflow step execution.

func ReadEvents

func ReadEvents(reader io.Reader) ([]Event, error)

ReadEvents reads NDJSON-encoded events from reader. Each line must be a valid JSON Event object. Blank lines are skipped.

Returns ErrEmpty if the input contains no bytes, ErrNoEvents if all lines were blank, or ErrOversizedLine if any line exceeds 1 MB.
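
A reader with this contract could be built on bufio.Scanner with a 1 MB token limit. The sketch below is one possible shape under assumptions, not the package's implementation: the lowercase errors, the trimmed event struct, and both function names are local stand-ins.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

const maxLineBytes = 1 << 20 // 1 MB per line

// Local stand-ins for the package's NDJSON sentinel errors.
var (
	errEmpty         = errors.New("ndjson input is empty")
	errNoEvents      = errors.New("ndjson input contains no events")
	errOversizedLine = errors.New("ndjson line exceeds maximum size")
)

// event is a hypothetical subset of the Event fields.
type event struct {
	StepName  string `json:"step_name"`
	EventType string `json:"event_type"`
	Attempt   int    `json:"attempt,omitempty"`
}

// readEvents decodes one JSON object per line and skips blank lines.
func readEvents(r io.Reader) ([]event, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), maxLineBytes)

	var events []event
	sawInput := false
	for sc.Scan() {
		sawInput = true
		line := bytes.TrimSpace(sc.Bytes())
		if len(line) == 0 {
			continue
		}
		var e event
		if err := json.Unmarshal(line, &e); err != nil {
			return nil, fmt.Errorf("decode event %d: %w", len(events)+1, err)
		}
		events = append(events, e)
	}
	if err := sc.Err(); err != nil {
		if errors.Is(err, bufio.ErrTooLong) {
			return nil, errOversizedLine
		}
		return nil, err
	}
	switch {
	case !sawInput:
		return nil, errEmpty
	case len(events) == 0:
		return nil, errNoEvents
	}
	return events, nil
}

// readEventsFromString is a convenience wrapper for in-memory input.
func readEventsFromString(s string) ([]event, error) {
	return readEvents(strings.NewReader(s))
}

func main() {
	in := `{"step_name":"fetch","event_type":"attempt_start","attempt":1}

{"step_name":"fetch","event_type":"attempt_end","attempt":1}
`
	events, err := readEventsFromString(in)
	fmt.Println(len(events), err)
}
```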

func (Event) Duration

func (e Event) Duration() float64

Duration returns the event duration in milliseconds, or 0 if unavailable.

func (Event) HasError

func (e Event) HasError() bool

HasError returns true if the event recorded an error.

func (Event) IsAfter

func (e Event) IsAfter() bool

IsAfter returns true if the event is the end (after) phase of an operation.

func (Event) IsAttemptEnd

func (e Event) IsAttemptEnd() bool

IsAttemptEnd returns true if the event is an attempt-end event.

func (Event) IsAttemptStart

func (e Event) IsAttemptStart() bool

IsAttemptStart returns true if the event is an attempt-start event.

func (Event) IsBefore

func (e Event) IsBefore() bool

IsBefore returns true if the event is the start (before) phase of an operation.

type EventType

type EventType string

EventType categorizes audit log events.

Every event is one of two types, mirroring the two go-workflow callbacks: AttemptStart (from BeforeStep) and AttemptEnd (from AfterStep). EventType is intentionally redundant with Phase — an AttemptStart always carries PhaseBefore, an AttemptEnd always carries PhaseAfter. Both fields are kept so consumers can filter by either axis (event kind or lifecycle position) without cross-referencing.

const (
	// EventTypeAttemptStart fires when a step attempt begins (once per attempt, including retries).
	EventTypeAttemptStart EventType = "attempt_start"
	// EventTypeAttemptEnd fires when a step attempt finishes (once per attempt, including retries).
	EventTypeAttemptEnd EventType = "attempt_end"
)

func (EventType) Color

func (e EventType) Color() string

Color returns the CSS color token for this event type, used in HTML visualizations.

func (EventType) Label

func (e EventType) Label() string

Label returns the human-readable display label for this event type.

type Phase

type Phase string

Phase indicates whether an event is the start or end of an operation.

It is deliberately redundant with EventType: AttemptStart ↔ PhaseBefore and AttemptEnd ↔ PhaseAfter. The duplication is retained in the JSON output so that consumers can filter on lifecycle position ("before"/"after") without knowing the event-type vocabulary, and vice versa.

const (
	PhaseBefore Phase = "before"
	PhaseAfter  Phase = "after"
)

type Recorder

type Recorder struct {
	// contains filtered or unexported fields
}

Recorder captures workflow execution events in-memory with minimal overhead.

Locking Protocol

All mutable state is protected by a single sync.RWMutex (mu):

Write path: mu.Lock() — recordBeforeStep, recordAfterStep, snapshot
Read path:  mu.RLock() — BuildReport, Events, EventsCount

The onEvent callback is always called outside the lock to prevent user code from blocking or deadlocking the recorder.

func NewRecorder

func NewRecorder(workflowID, runID string, onEvent func(Event)) *Recorder

NewRecorder creates a new event recorder.

workflowID identifies the workflow (stable across runs); runID identifies a single execution and is stamped on every captured Event so all observations from one run can be correlated. Pass a non-empty runID (e.g. a trace ID) to integrate with external observability systems.

func (*Recorder) BuildReport

func (r *Recorder) BuildReport() WorkflowReport

BuildReport assembles a machine-readable WorkflowReport from all captured events and step records.

func (*Recorder) DroppedEventCount

func (r *Recorder) DroppedEventCount() int64

DroppedEventCount returns the number of events dropped due to MaxEvents cap.

func (*Recorder) Events

func (r *Recorder) Events() []Event

Events returns a defensive copy of all captured events.

func (*Recorder) EventsCount

func (r *Recorder) EventsCount() int

EventsCount returns the number of captured events without copying the slice.

func (*Recorder) RunID

func (r *Recorder) RunID() string

RunID returns the run identifier stamped on every captured event. It is safe to call concurrently.

type ReportIndex

type ReportIndex struct {
	// contains filtered or unexported fields
}

ReportIndex precomputes lookup maps over a WorkflowReport for repeated O(1) queries. Build it once from a final report, then query cheaply — useful when a consumer inspects the same report many times (e.g. in a UI or analysis loop).

The index shares the report's backing slices; it is read-only and must not outlive mutations to the underlying report. Rebuild the index (call NewReportIndex) if the report changes.

func NewReportIndex

func NewReportIndex(r WorkflowReport) *ReportIndex

NewReportIndex builds an O(1) lookup index over the given report. The report is not retained beyond its slices; callers may discard the WorkflowReport value but must not mutate its Steps or Events in place afterward.

func (*ReportIndex) EventsByStep

func (idx *ReportIndex) EventsByStep(name string) []Event

EventsByStep returns all events for the given step name (nil if none). O(1).

func (*ReportIndex) EventsByType

func (idx *ReportIndex) EventsByType(t EventType) []Event

EventsByType returns all events matching the given type (nil if none). O(1).

func (*ReportIndex) StepByID

func (idx *ReportIndex) StepByID(id int) *StepInfo

StepByID returns a pointer to the step with the given StepInfo.StepID, or nil. O(1).

func (*ReportIndex) StepByName

func (idx *ReportIndex) StepByName(name string) *StepInfo

StepByName returns a pointer to the first step with the given name, or nil. O(1).
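
The idea behind such an index is to pay one pass over the report up front and answer later lookups from maps. A minimal sketch, assuming simplified step and event types and hypothetical names (index, newIndex, stepByName, stepByID), could look like this; it is not the package's ReportIndex.

```go
package main

import "fmt"

// step and event are hypothetical, trimmed-down report entries.
type step struct {
	ID   int
	Name string
}

type event struct {
	StepName string
	Type     string
}

// index holds precomputed lookup maps.
type index struct {
	byName       map[string]*step
	byID         map[int]*step
	eventsByStep map[string][]event
}

// newIndex builds the maps in a single pass. Step pointers refer into the
// caller's slice, so the slice must not be mutated afterward.
func newIndex(steps []step, events []event) *index {
	idx := &index{
		byName:       make(map[string]*step, len(steps)),
		byID:         make(map[int]*step, len(steps)),
		eventsByStep: make(map[string][]event),
	}
	for i := range steps {
		s := &steps[i]
		if _, seen := idx.byName[s.Name]; !seen {
			idx.byName[s.Name] = s // keep the first step with this name
		}
		idx.byID[s.ID] = s
	}
	for _, e := range events {
		idx.eventsByStep[e.StepName] = append(idx.eventsByStep[e.StepName], e)
	}
	return idx
}

// stepByName returns the first step with the given name, or nil.
func (idx *index) stepByName(name string) *step { return idx.byName[name] }

// stepByID returns the step with the given ID, or nil.
func (idx *index) stepByID(id int) *step { return idx.byID[id] }

func main() {
	steps := []step{{ID: 1, Name: "fetch"}, {ID: 2, Name: "fetch"}, {ID: 3, Name: "save"}}
	events := []event{{StepName: "fetch", Type: "attempt_start"}, {StepName: "fetch", Type: "attempt_end"}}
	idx := newIndex(steps, events)
	fmt.Println(idx.stepByName("fetch").ID, idx.stepByID(2).Name, len(idx.eventsByStep["fetch"]))
}
```

When two steps share a name, only the ID lookup can tell them apart, which is why both maps are kept.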

type ReportOption

type ReportOption func(*reportFilter)

ReportOption configures a filter for WorkflowReport.Filtered.

func WithEventsByType

func WithEventsByType(eventType EventType) ReportOption

WithEventsByType filters to only events matching the given type.

func WithStepsByName

func WithStepsByName(names ...string) ReportOption

WithStepsByName filters to only steps matching any of the given names.

func WithStepsByStatus

func WithStepsByStatus(statuses ...StepStatus) ReportOption

WithStepsByStatus filters to only steps matching any of the given statuses.

func WithTimeRange

func WithTimeRange(from, to time.Time) ReportOption

WithTimeRange filters events to those within the given time range (inclusive).
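
ReportOption follows the functional-options pattern: each With... function returns a closure that adjusts an internal filter. The sketch below reproduces the pattern with local types. The names, the string-based statuses, and the rule that different options combine with AND are assumptions for illustration, since the documentation does not spell out how options interact.

```go
package main

import "fmt"

// step is a hypothetical, trimmed-down step record.
type step struct {
	Name   string
	Status string
}

// filter collects the criteria set by options; nil maps mean "no constraint".
type filter struct {
	names    map[string]bool
	statuses map[string]bool
}

// option mutates a filter, mirroring the shape of ReportOption.
type option func(*filter)

// withNames restricts results to the given step names.
func withNames(names ...string) option {
	return func(f *filter) {
		if f.names == nil {
			f.names = make(map[string]bool)
		}
		for _, n := range names {
			f.names[n] = true
		}
	}
}

// withStatuses restricts results to the given statuses.
func withStatuses(statuses ...string) option {
	return func(f *filter) {
		if f.statuses == nil {
			f.statuses = make(map[string]bool)
		}
		for _, s := range statuses {
			f.statuses[s] = true
		}
	}
}

// matches applies every configured constraint (AND semantics, assumed).
func (f *filter) matches(s step) bool {
	if f.names != nil && !f.names[s.Name] {
		return false
	}
	if f.statuses != nil && !f.statuses[s.Status] {
		return false
	}
	return true
}

// filterSteps returns the steps that satisfy all options.
func filterSteps(steps []step, opts ...option) []step {
	f := &filter{}
	for _, opt := range opts {
		opt(f)
	}
	var out []step
	for _, s := range steps {
		if f.matches(s) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	steps := []step{{"ok", "succeeded"}, {"bad", "failed"}}
	fmt.Println(len(filterSteps(steps, withStatuses("succeeded"))))
}
```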

type StepDiff

type StepDiff struct {
	Name      string     `json:"name"`
	Status    StepStatus `json:"status"`
	OldStatus StepStatus `json:"old_status,omitempty"`
	Duration  float64    `json:"duration_ms,omitempty"`
}

StepDiff captures a single step's state in a diff context. For status changes, Status holds the new value and OldStatus the previous one (OldStatus is empty for added steps).

type StepInfo

type StepInfo struct {
	StepRef

	// StepID is a 1-based, unique identifier assigned when the step is first
	// observed. It disambiguates steps that share the same Name (which can
	// happen when two step types produce identical String() output). Stable
	// within a single report/run; not guaranteed stable across runs.
	StepID       int        `json:"step_id,omitempty"`
	Status       StepStatus `json:"status"`
	AttemptCount int        `json:"attempt_count"`
	MaxAttempts  int        `json:"max_attempts,omitempty"`
	StartedAt    *time.Time `json:"started_at,omitempty"`
	FinishedAt   *time.Time `json:"finished_at,omitempty"`
	DurationMs   *float64   `json:"duration_ms,omitempty"`
	Dependencies []StepRef  `json:"dependencies,omitempty"`
	Dependents   []StepRef  `json:"dependents,omitempty"`
	Error        *string    `json:"error,omitempty"`
	HasRetry     bool       `json:"has_retry"`
	HasTimeout   bool       `json:"has_timeout"`
}

StepInfo aggregates all observed data for a single workflow step.

func (StepInfo) DeriveStatus

func (s StepInfo) DeriveStatus() StepStatus

DeriveStatus computes the step status from the step's own error pointer. This is the canonical derivation — the stored Status field should always match this method so it can never drift from the underlying data.

If Snapshot() has not been called yet, the status may be pending/running. After Snapshot(), the status reflects the workflow's final state.

func (StepInfo) Duration

func (s StepInfo) Duration() float64

Duration returns the step duration in milliseconds, or 0 if unavailable.

func (StepInfo) HasError

func (s StepInfo) HasError() bool

HasError returns true if the step recorded an error.

type StepRef

type StepRef struct {
	Name     string `json:"step_name"`
	StepType string `json:"step_type,omitempty"`
}

StepRef identifies a step within a workflow. Embedded in Event and StepInfo for JSON flattening.

type StepStatus

type StepStatus string

StepStatus mirrors flow.StepStatus as a stable string enum for JSON export.

const (
	StepStatusPending   StepStatus = "pending"
	StepStatusRunning   StepStatus = "running"
	StepStatusSucceeded StepStatus = "succeeded"
	StepStatusFailed    StepStatus = "failed"
	StepStatusCanceled  StepStatus = "canceled"
	StepStatusSkipped   StepStatus = "skipped"
)

func (StepStatus) Icon

func (s StepStatus) Icon() string

Icon returns a display emoji for this step status.

func (StepStatus) IsError

func (s StepStatus) IsError() bool

IsError returns true if the step failed or was canceled.

func (StepStatus) IsTerminal

func (s StepStatus) IsTerminal() bool

IsTerminal returns true if the step has reached a terminal state (succeeded, failed, canceled, or skipped).

func (StepStatus) Label

func (s StepStatus) Label() string

Label returns the human-readable display label for this step status.

func (StepStatus) String

func (s StepStatus) String() string

String returns the step status name.

type WorkflowReport

type WorkflowReport struct {
	Version           string    `json:"version"`
	WorkflowID        string    `json:"workflow_id"`
	RunID             string    `json:"run_id,omitempty"`
	ExportedAt        time.Time `json:"exported_at"`
	EventCount        int       `json:"event_count"`
	StepCount         int       `json:"step_count"`
	SucceededCount    int       `json:"succeeded_count"`
	FailedCount       int       `json:"failed_count"`
	SkippedCount      int       `json:"skipped_count"`
	CanceledCount     int       `json:"canceled_count"`
	TotalDurationMs   float64   `json:"total_duration_ms"`
	WorkflowSucceeded bool      `json:"workflow_succeeded"`
	DroppedEventCount int64     `json:"dropped_event_count"`
	// Reconstructed is true when the report was built by ReplayEvents from a
	// flat event stream rather than from live workflow hooks.
	Reconstructed bool       `json:"reconstructed,omitempty"`
	Events        []Event    `json:"events,omitempty"`
	Steps         []StepInfo `json:"steps"`
}

WorkflowReport is a consolidated, machine-readable snapshot of the audit log.

func LoadReport

func LoadReport(path string) (WorkflowReport, error)

LoadReport reads a JSON WorkflowReport from a file path. This is the inverse of ExportToFile.

func LoadReportFromBytes

func LoadReportFromBytes(data []byte) (WorkflowReport, error)

LoadReportFromBytes parses a JSON WorkflowReport from a byte slice.

func LoadReportFromReader

func LoadReportFromReader(reader io.Reader) (WorkflowReport, error)

LoadReportFromReader reads a JSON WorkflowReport from any io.Reader.
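
As a rough illustration of what the loaders consume, the sketch below decodes a report with the standard `encoding/json` package. The `report` struct and `loadFromBytes` are hypothetical local stand-ins, not the package's code; only the JSON keys are taken from the WorkflowReport struct tags above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// report mirrors a subset of the documented WorkflowReport JSON keys.
// It is a local stand-in, not the package's type.
type report struct {
	WorkflowID        string `json:"workflow_id"`
	StepCount         int    `json:"step_count"`
	FailedCount       int    `json:"failed_count"`
	WorkflowSucceeded bool   `json:"workflow_succeeded"`
}

// loadFromBytes is a hypothetical equivalent of LoadReportFromBytes:
// it parses the JSON document and wraps any decoding error.
func loadFromBytes(data []byte) (report, error) {
	var r report
	if err := json.Unmarshal(data, &r); err != nil {
		return report{}, fmt.Errorf("parse report: %w", err)
	}
	return r, nil
}

func main() {
	data := []byte(`{"workflow_id":"my-pipeline","step_count":2,"failed_count":0,"workflow_succeeded":true}`)
	r, err := loadFromBytes(data)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s: %d steps, succeeded=%t\n", r.WorkflowID, r.StepCount, r.WorkflowSucceeded)
}
```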

func ReplayEvents

func ReplayEvents(events []Event) (WorkflowReport, error)

ReplayEvents reconstructs a WorkflowReport from a flat event stream.

This is the inverse of ExportEventsToNDJSON: write events to NDJSON, then later read them back and reconstruct the report for offline analysis.

Limitations:

  • Dependencies/dependents are not inferred (events carry no DAG edges). Use Snapshot-captured reports for full DAG info.
  • MaxAttempts/HasRetry/HasTimeout are not available from events alone.
  • The report has Reconstructed=true.

func (WorkflowReport) Diff

Diff compares this report against another and returns the differences. Useful for detecting regressions between workflow runs.

Output slices are sorted by step name for deterministic results across runs.

func (WorkflowReport) Duration

func (r WorkflowReport) Duration() time.Duration

Duration returns the total wall-clock duration spanned by all events, from the earliest to the latest timestamp. This is different from TotalDurationMs (which sums individual step durations and may overcount when steps run in parallel).

func (WorkflowReport) EventsByStep

func (r WorkflowReport) EventsByStep(stepName string) []Event

EventsByStep returns all events for the given step name.

func (WorkflowReport) EventsByType

func (r WorkflowReport) EventsByType(t EventType) []Event

EventsByType returns all events matching the given event type.

func (WorkflowReport) FailedSteps

func (r WorkflowReport) FailedSteps() []StepInfo

FailedSteps returns all steps with an error status (failed or canceled).

func (WorkflowReport) Filtered

func (r WorkflowReport) Filtered(opts ...ReportOption) WorkflowReport

Filtered returns a new report containing only the steps and events that match all of the given filter options. Aggregate counts are recomputed.

With no options, returns a copy of the report.

func (WorkflowReport) RetriedSteps

func (r WorkflowReport) RetriedSteps() []StepInfo

RetriedSteps returns all steps that had more than one attempt.

func (WorkflowReport) SkippedSteps

func (r WorkflowReport) SkippedSteps() []StepInfo

SkippedSteps returns all steps that were skipped.

func (WorkflowReport) StepByName

func (r WorkflowReport) StepByName(name string) *StepInfo

StepByName returns the first StepInfo matching the given exact name. Returns nil if no step matches.
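
Because the result can be nil, callers should check it before dereferencing. The sketch below shows that pattern with a hypothetical local `stepInfo` type and `stepByName` helper; whether the real method returns a pointer into the report's slice or a pointer to a copy is not stated here, so the slice-element pointer is an assumption.

```go
package main

import "fmt"

// stepInfo is a local stand-in carrying only the fields this sketch needs.
type stepInfo struct {
	Name   string
	Status string
}

// stepByName returns a pointer to the first step with exactly this name,
// or nil when no step matches (hypothetical helper, see lead-in).
func stepByName(steps []stepInfo, name string) *stepInfo {
	for i := range steps {
		if steps[i].Name == name {
			return &steps[i]
		}
	}
	return nil
}

func main() {
	steps := []stepInfo{
		{Name: "fetch", Status: "succeeded"},
		{Name: "save", Status: "failed"},
	}
	if s := stepByName(steps, "save"); s != nil {
		fmt.Println(s.Name, s.Status)
	}
	if s := stepByName(steps, "missing"); s == nil {
		fmt.Println("missing: not found")
	}
}
```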

func (WorkflowReport) SucceededSteps

func (r WorkflowReport) SucceededSteps() []StepInfo

SucceededSteps returns all steps that succeeded.

func (WorkflowReport) Summary

func (r WorkflowReport) Summary() string

Summary returns a human-readable one-line summary of the report.

func (WorkflowReport) Validate

func (r WorkflowReport) Validate() error

Validate checks internal consistency of the report: denormalized count fields must match the actual slice lengths, and every step's Status must match its DeriveStatus. Returns nil if consistent.

The status drift check catches the case where a step's stored Status field disagrees with what its Error pointer implies — e.g., Status=Pending with a non-nil Error (which DeriveStatus would map to Failed).
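
A reduced version of these two checks might look like the sketch below. It uses hypothetical local types and covers only what the text states: the step count must equal the slice length, and a pending step with a non-nil error counts as drift. The real derivation presumably handles more cases.

```go
package main

import "fmt"

// stepInfo and report are local stand-ins with only the fields checked here.
type stepInfo struct {
	Name   string
	Status string
	Error  *string
}

type report struct {
	StepCount int
	Steps     []stepInfo
}

// validate is a hypothetical, simplified consistency check.
func validate(r report) error {
	// Denormalized count must match the actual slice length.
	if r.StepCount != len(r.Steps) {
		return fmt.Errorf("step_count is %d but report holds %d steps", r.StepCount, len(r.Steps))
	}
	for _, s := range r.Steps {
		// Only the documented drift example: pending status with a recorded error.
		if s.Error != nil && s.Status == "pending" {
			return fmt.Errorf("step %q: status %q disagrees with recorded error", s.Name, s.Status)
		}
	}
	return nil
}

func main() {
	msg := "boom"
	ok := report{StepCount: 1, Steps: []stepInfo{{Name: "fetch", Status: "succeeded"}}}
	badCount := report{StepCount: 2, Steps: ok.Steps}
	drift := report{StepCount: 1, Steps: []stepInfo{{Name: "save", Status: "pending", Error: &msg}}}
	for _, r := range []report{ok, badCount, drift} {
		fmt.Println(validate(r))
	}
}
```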

func (WorkflowReport) WriteGraphviz

func (r WorkflowReport) WriteGraphviz(writer io.Writer) error

WriteGraphviz writes the step dependency DAG as a Graphviz DOT digraph. Nodes are colored by status (green=succeeded, red=failed, gray=skipped). The output is valid DOT, consumable by `dot -Tsvg` or any Graphviz renderer.

func (WorkflowReport) WriteGraphvizString

func (r WorkflowReport) WriteGraphvizString() (string, error)

WriteGraphvizString returns the Graphviz DOT diagram as a string. Returns a non-nil error only if diagram generation fails.

func (WorkflowReport) WriteJSON

func (r WorkflowReport) WriteJSON(writer io.Writer) error

WriteJSON writes the report as indented JSON to the writer.

func (WorkflowReport) WriteMermaid

func (r WorkflowReport) WriteMermaid(writer io.Writer) error

WriteMermaid writes the step dependency DAG as a Mermaid flowchart diagram. Nodes are colored by status (green=succeeded, red=failed, gray=skipped).

func (WorkflowReport) WriteMermaidString

func (r WorkflowReport) WriteMermaidString() (string, error)

WriteMermaidString returns the Mermaid diagram as a string. Returns a non-nil error only if diagram generation fails.

func (WorkflowReport) WriteNDJSON

func (r WorkflowReport) WriteNDJSON(writer io.Writer) error

WriteNDJSON writes the report's events as newline-delimited JSON. Each line is a single Event object. This is the inverse of ReadEvents.

func (WorkflowReport) WritePlantUML

func (r WorkflowReport) WritePlantUML(writer io.Writer) error

WritePlantUML writes the step dependency DAG as a PlantUML component diagram.

func (WorkflowReport) WritePlantUMLString

func (r WorkflowReport) WritePlantUMLString() (string, error)

WritePlantUMLString returns the PlantUML diagram as a string. Returns a non-nil error only if diagram generation fails.

Directories

Path Synopsis
Command workflow-auditlog-demo demonstrates the go-workflow-auditlog library with a data pipeline: fetch → validate → transform → save, with retry, fan-out, error handling, and audit export.
