job

package module
v0.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 39 Imported by: 8

README

go-job

go-job is a flexible job runner and scheduler written in Go that allows you to embed configuration metadata directly into your job files. By extracting configuration options from comments across different file types (YAML, shell, JavaScript, SQL, etc.), go-job makes it easy to define job behavior alongside your scripts.

Features

  • Multi-format Metadata Extraction: Supports extracting configuration from:
    • YAML Front Matter: Using the standard --- markers.
    • Shell Scripts: Metadata specified in comment lines using #.
    • SQL Scripts: Metadata specified using -- comments.
    • JavaScript:
      • Single-line Comments: e.g. // config ...
      • Block Comments: e.g.
        /** config
         * schedule: "0 12 * * *"
         * timeout: 300s
         * retries: 3
         * debug: true
         * run_once: true
         * script_type: shell
         * transaction: true
         * env:
         *  APP_NAME: "test"
         *  API_KEY: "my-secret-key"
         * metadata:
         *  key: value
         */
        
  • Multiple Execution Engines:
    • Shell Engine: Execute shell scripts with environment variables and timeout control
    • JavaScript Engine: Run JavaScript code with Node.js-like environment (uses goja)
    • SQL Engine: Execute SQL scripts with transaction support
  • Source Providers: Flexible system for loading script content from different sources:
    • FileSystem Provider: Load scripts from local directories
    • Database Provider: Load scripts from database tables
  • Configurable Registry: Store and retrieve jobs with the in-memory registry
  • Runner: Orchestrates job discovery, task creation, and registration
  • Task Scheduling: Integration with cron-based schedulers for automated job execution
  • Scheduling Helpers: Derive upcoming run times and task scheduling metadata without re-parsing YAML
  • Robust Timeout Handling: Configure timeouts at both the engine and job level
  • Metadata-driven Configuration: Extract job configuration directly from script file comments
  • Customizable Logging: Pluggable logger interface for integration with existing logging frameworks
  • Error Handling: Configurable error handlers for task creation and execution failures
  • Extensible Architecture: Easily add new script types or execution engines

Installation

go get github.com/goliatone/go-job

Usage

Complete Example with Scheduler
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/goliatone/go-command/cron"
    "github.com/goliatone/go-job"
)

// main discovers jobs under ./scripts, registers them with a cron scheduler
// and keeps running until SIGINT or SIGTERM is received.
func main() {
    // Root context: cancelling it is the signal for the whole program to stop.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Set up signal handling for graceful shutdown
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        log.Println("Shutdown signal received, stopping job runner...")
        cancel()
    }()

    // Create a task creator with filesystem source provider
    taskCreator := job.NewTaskCreator(
        job.NewFileSystemSourceProvider("./scripts"),
        []job.Engine{
            job.NewShellRunner(
                job.WithShellTimeout(60*time.Second),
            ),
            job.NewJSRunner(
                job.WithJSTimeout(30*time.Second),
            ),
            job.NewSQLRunner(
                job.WithSQLDatabase("postgres", os.Getenv("DATABASE_DSN")),
                job.WithSQLTimeout(60*time.Second),
            ),
        },
    )

    // Create the job runner
    runner := job.NewRunner(
        job.WithTaskCreator(taskCreator),
    )

    // Start the job runner to discover and register tasks
    if err := runner.Start(ctx); err != nil {
        log.Fatalf("Failed to start job runner: %v", err)
    }

    // Log the scheduling metadata of every discovered task
    tasks := runner.RegisteredTasks()
    log.Printf("Discovered %d tasks\n", len(tasks))
    for _, task := range tasks {
        schedule := job.TaskScheduleFromTask(task)
        log.Printf(
            "Task %s -> expression=%s run_once=%t retries=%d timeout=%s\n",
            task.GetID(), schedule.Expression, schedule.RunOnce, schedule.MaxRetries, schedule.Timeout,
        )
    }

    // Create a scheduler
    scheduler := cron.NewScheduler()

    // Register tasks with the scheduler; a task that fails to register is
    // logged and skipped so the remaining tasks still get scheduled.
    for _, task := range tasks {
        _, err := scheduler.AddHandler(task.GetHandlerConfig().ToCommandConfig(), task.GetHandler())
        if err != nil {
            log.Printf("Failed to register task %s: %v\n", task.GetID(), err)
            continue
        }
        log.Printf("Registered task: %s\n", task.GetID())
    }

    // Start the scheduler
    if err := scheduler.Start(ctx); err != nil {
        log.Fatalf("Failed to start scheduler: %v", err)
    }

    // Wait for context cancellation (from signals)
    <-ctx.Done()

    // Graceful shutdown. ctx is already cancelled at this point, so it must
    // not be reused here: a Stop implementation that honours its context
    // would see it as expired immediately. Use a fresh context with a bounded
    // grace period instead.
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer shutdownCancel()

    scheduler.Stop(shutdownCtx)
    runner.Stop(shutdownCtx)
    log.Println("Shutdown complete")
}
Dispatching Tasks Through go-command

TaskCommander lets you publish discovered tasks into a go-command router or dispatcher as Commander[*job.ExecutionMessage] handlers. Use the mux helper for flow/mux-driven dispatch and the dispatcher helper for bus-style dispatch.

package main

import (
	"context"

	"github.com/goliatone/go-command/dispatcher"
	"github.com/goliatone/go-command/router"
	"github.com/goliatone/go-job"
)

// main publishes the discovered tasks to a go-command mux and to the
// dispatcher, then triggers an execution through each path.
func main() {
	taskCreator := job.NewTaskCreator(
		job.NewFileSystemSourceProvider("./scripts"),
		[]job.Engine{job.NewShellRunner()},
	)

	// Surface discovery failures instead of silently continuing with no tasks.
	tasks, err := taskCreator.CreateTasks(context.Background())
	if err != nil {
		panic(err)
	}

	// Subscribe tasks to the mux (or dispatcher); keep the subscriptions to unsubscribe later.
	mux := router.NewMux()
	muxSubs := job.RegisterTasksWithMux(mux, tasks)
	defer func() {
		for _, s := range muxSubs {
			s.Unsubscribe()
		}
	}()

	dispatcherSubs := job.RegisterTasksWithDispatcher(tasks)
	defer func() {
		for _, s := range dispatcherSubs {
			s.Unsubscribe()
		}
	}()

	// Dispatch a message; TaskCommander will validate it and execute the task.
	dispatcher.Dispatch(context.Background(), &job.ExecutionMessage{
		JobID:      "my-script.sh",
		ScriptPath: "./scripts/my-script.sh",
	})

	// Nothing to route when no scripts were discovered; indexing tasks[0]
	// below would panic on an empty slice.
	if len(tasks) == 0 {
		return
	}

	// Or look up the mux pattern directly for flow-style routing:
	pattern := job.TaskCommandPattern(tasks[0])
	for _, entry := range mux.Get(pattern) {
		_ = entry.Handler.Execute(context.Background(), &job.ExecutionMessage{})
	}
}

Under the hood, TaskCommander wraps a job.Task, validates incoming ExecutionMessage payloads, and runs the task handler. Use this path when you need mux/dispatcher-driven execution instead of scheduler-driven execution.

ExecutionMessage validation & defaults

  • Required: job_id and script_path. TaskCommander/CompleteExecutionMessage will fill these from the task metadata, but if they remain empty the command fails fast with a validation error (text code JOB_EXEC_MSG_INVALID).
  • Defaults: parameters is normalized to an empty map, and dedup_policy defaults to ignore when unspecified so idempotency checks remain safe.
  • FSM correlation fields are additive and preserved by queue codecs/envelopes: machine_id, entity_id, execution_id, expected_state, expected_version, resume_event.
Queue Execution (Adapters + Worker)

go-job ships with a durable queue runtime under queue/:

  • queue interfaces and storage contract (queue.Storage).
  • queue/adapters/redis and queue/adapters/postgres implementations.
  • queue/worker runtime with ack/nack, retries, DLQ, cancellation hooks, lease heartbeat extension, and lifecycle hook metadata.
  • queue/idempotency shared idempotency stores for Redis/Postgres.
  • queue.StorageOutboxAdapter for orchestrator-style outbox dispatch loops (ClaimPending, MarkCompleted, MarkFailed).

Configuration sketch:

queue:
  backend: redis
  visibility_timeout: 60s
  dlq_enabled: true
worker:
  concurrency: 4
  idle_delay: 100ms
  retry:
    max_attempts: 10
    backoff: fixed
    interval: 500ms

Minimal wiring:

storage := queueRedis.NewStorage(redisClient, queueRedis.WithVisibilityTimeout(60*time.Second))
adapter := queueRedis.NewAdapter(storage)

worker := worker.NewWorker(adapter, worker.WithConcurrency(4))
_ = worker.RegisterAll(tasks)
_ = worker.Start(ctx)

_, _ = adapter.Enqueue(ctx, msg)
_, _ = adapter.EnqueueAfter(ctx, msg, 30*time.Second)

Outbox compatibility wiring:

outbox := queue.NewStorageOutboxAdapter(storage)
entries, _ := outbox.ClaimPending(ctx, "dispatcher-1", 100, 30*time.Second)
for _, entry := range entries {
	// dispatch entry.Message to your scheduler/executor
	_ = outbox.MarkCompleted(ctx, entry.ID, entry.LeaseToken)
}

Worker control/status surface:

_ = worker.Pause()
status := worker.Status() // status.Status == "paused"
_ = worker.Resume()
_ = worker.Stop(ctx)

Status values align with orchestrated execution expectations:

  • running: actively consuming queue deliveries.
  • paused: workers are alive but dequeue/dispatch is suspended.
  • stopped: worker has been stopped (or never started).
Basic Example (Manual Execution)
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/goliatone/go-job"
)

// main discovers the scripts under ./scripts and runs the task whose ID is
// "my-script.js" once, reporting any failure on stdout.
func main() {
    // Source of scripts and the engines able to run them.
    provider := job.NewFileSystemSourceProvider("./scripts")
    engines := []job.Engine{
        job.NewShellRunner(job.WithShellTimeout(time.Minute)),
        job.NewJSRunner(job.WithJSTimeout(time.Minute)),
    }
    taskCreator := job.NewTaskCreator(provider, engines)

    // Discover and create tasks.
    tasks, err := taskCreator.CreateTasks(context.Background())
    if err != nil {
        fmt.Printf("Error creating tasks: %v\n", err)
        return
    }

    // Skip everything until the wanted task shows up, run it, then stop.
    for _, candidate := range tasks {
        if candidate.GetID() != "my-script.js" {
            continue
        }
        run := candidate.GetHandler()
        if runErr := run(); runErr != nil {
            fmt.Printf("Error executing task: %v\n", runErr)
        }
        break
    }
}
Creating a Script with Metadata
JavaScript Example
/** config
 * schedule: "0 */5 * * * *"  // Run every 5 minutes
 * timeout: 30s
 * retries: 2
 * run_once: false
 * env:
 *   API_KEY: "my-secret-key"
 *   DEBUG: "true"
 */

console.log("Starting job execution");

// Access environment variables
console.log(`API Key: ${API_KEY}`);

// Make HTTP requests using fetch
fetch("https://api.example.com/data")
  .then(response => response.json())
  .then(data => {
    console.log("Received data:", data);
  })
  .catch(error => {
    console.error("Error fetching data:", error);
  });
Shell Script Example
#!/bin/bash
# config
# schedule: "0 0 * * *"  # Run daily at midnight
# timeout: 120s
# retries: 3
# env:
#   DB_HOST: localhost
#   DB_USER: admin

echo "Running backup script"
pg_dump -h "$DB_HOST" -U "$DB_USER" my_database > /backups/backup-$(date +%Y%m%d).sql
SQL Script Example
-- config
-- schedule: "0 4 * * *"
-- timeout: 60s
-- transaction: true
-- metadata:
--   driver: postgres
--   dsn: postgres://user:password@localhost/mydb

-- This script will run in a transaction
INSERT INTO audit_log (event_type, description)
VALUES ('DAILY_CLEANUP', 'Removing old records');

DELETE FROM temporary_data
WHERE created_at < NOW() - INTERVAL '30 days';
Using Database Source Provider
package main

import (
    "context"
    "database/sql"
    "log"
    "time"

    "github.com/goliatone/go-job"
    _ "github.com/lib/pq"
)

// main loads job scripts from the "scripts" database table and reports how
// many tasks were discovered.
func main() {
    ctx := context.Background()

    // sql.Open only validates its arguments and prepares a handle; it does
    // not contact the server.
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/jobs")
    if err != nil {
        log.Fatalf("Failed to open database: %v", err)
    }
    defer db.Close()

    // Verify that the database is actually reachable before using it, so a
    // bad DSN or unreachable server is reported here rather than as a task
    // discovery error later on.
    if err := db.PingContext(ctx); err != nil {
        log.Fatalf("Failed to connect to database: %v", err)
    }

    // Create database source provider
    // Expects a table with columns: path (string), content (bytea/text)
    // Defaults to PostgreSQL-style placeholders ($1, $2, ...)
    dbProvider := job.NewDBSourceProvider(db, "scripts")

    // Override placeholder style for drivers that use '?' (e.g. SQLite/MySQL)
    // dbProvider.WithPlaceholder(job.SQLQuestionPlaceholder)

    // Create task creator with database provider
    taskCreator := job.NewTaskCreator(
        dbProvider,
        []job.Engine{
            job.NewShellRunner(job.WithShellTimeout(time.Minute)),
            job.NewJSRunner(job.WithJSTimeout(time.Minute)),
        },
    )

    // Discover and create tasks from database
    tasks, err := taskCreator.CreateTasks(ctx)
    if err != nil {
        log.Fatalf("Error creating tasks: %v", err)
    }

    log.Printf("Discovered %d tasks from database\n", len(tasks))
}
Executing a Job Manually with Engine
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/goliatone/go-job"
)

// main runs ./scripts/backup.sh directly through the shell engine, without
// going through task discovery or a scheduler.
func main() {
    // Create a shell engine
    shellEngine := job.NewShellRunner(
        job.WithShellTimeout(time.Minute),
    )

    // Read script content
    content, err := os.ReadFile("./scripts/backup.sh")
    if err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    // Parse metadata and script content
    config, scriptContent, err := job.NewYAMLMetadataParser().Parse(content)
    if err != nil {
        fmt.Printf("Error parsing metadata: %v\n", err)
        // Stop here: config and scriptContent are not meaningful after a
        // parse failure and must not be handed to the engine.
        return
    }

    // Create execution message
    msg := &job.ExecutionMessage{
        JobID:      "backup.sh",
        ScriptPath: "./scripts/backup.sh",
        Config:     config,
        Parameters: map[string]any{
            "script": scriptContent,
        },
    }

    // Execute the script with an upper bound on total run time
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    if err := shellEngine.Execute(ctx, msg); err != nil {
        fmt.Printf("Error executing script: %v\n", err)
        return
    }

    fmt.Println("Script executed successfully")
}
Payload Envelope & Context

Use job.Envelope to standardize payloads with actor/scope metadata and an optional idempotency key. Helpers enforce size limits and validation:

env := job.Envelope{
    Actor: &job.Actor{ID: "user-123", Role: "admin"},
    Scope: job.Scope{TenantID: "acme"},
    Params: map[string]any{"export_id": 42},
    IdempotencyKey: "export-42",
}

payload, _ := job.EncodeEnvelope(env)      // JSON with size guard
decoded, _ := job.DecodeEnvelope(payload)  // round-trips with validation

Optional go-auth adapter (build with -tags goauth) can attach/extract actor context:

adapter := job.GoAuthAdapter{}
env := adapter.AttachActor(ctx, job.Envelope{Params: params})
ctx = adapter.InjectActor(ctx, env)
Result Metadata

Small execution results can be captured and stored via job.Result with size-guarded helpers:

res := job.Result{Status: "success", Message: "done", Size: 512, Duration: time.Second}
payload, _ := job.EncodeResult(res)          // JSON with max-size guard
decoded, _ := job.DecodeResult(payload)
runner.SetResult("job-id", decoded)          // persists in registry (memory by default)
stored, _ := runner.GetResult("job-id")      // retrieve for UIs/history
Idempotency / Deduplication

ExecutionMessage supports idempotency keys and dedup policies (drop|merge|replace|ignore) enforced by TaskCommander:

cmd := job.NewTaskCommander(task)
msg := &job.ExecutionMessage{
    JobID:          task.GetID(),
    ScriptPath:     task.GetPath(),
    IdempotencyKey: "export-42",
    DedupPolicy:    job.DedupPolicyDrop,
}
// second call with same key returns ErrIdempotentDrop when policy=drop
_ = cmd.Execute(ctx, msg)
Retry/Backoff Profiles

Configure retries per job with fixed or exponential backoff and optional jitter:

cfg := job.Config{
    Retries: 2,
    Backoff: job.BackoffConfig{
        Strategy:    job.BackoffExponential,
        Interval:    100 * time.Millisecond,
        MaxInterval: time.Second,
        Jitter:      true,
    },
}

task := job.NewBaseTask("id", "/tmp/script.sh", "shell", cfg, "echo hi", engine)
cmd := job.NewTaskCommander(task)
_ = cmd.Execute(ctx, &job.ExecutionMessage{JobID: task.GetID(), ScriptPath: task.GetPath()})

Configuration Options

Common Configuration Options
Option Description Default
schedule Cron expression for scheduling * * * * *
timeout Maximum execution time 1 minute
no_timeout Disable execution timeout false
retries Number of retry attempts 0
debug Enable debug mode false
run_once Run job only once false
script_type Override script type detection Auto-detected
env Environment variables for execution {}
metadata Additional metadata for engines {}
Engine-Specific Options
SQL Engine
Option Description
transaction Execute SQL in a transaction
driver SQL driver name (in metadata)
dsn Data source name (in metadata)
Shell Engine
Option Description
use_env Pass system environment variables (in metadata)

Advanced Features

Custom Logger

Integrate your own logging framework by implementing the Logger interface:

type Logger interface {
    Debug(format string, args ...any)
    Info(format string, args ...any)
    Warn(format string, args ...any)
    Error(format string, args ...any)
}

Example with a custom logger:

taskCreator := job.NewTaskCreator(provider, engines).
    WithLogger(myCustomLogger)
Custom Error Handling

Configure custom error handlers for task creation failures:

taskCreator := job.NewTaskCreator(provider, engines).
    WithErrorHandler(func(task job.Task, err error) {
        if task != nil {
            log.Printf("Task %s failed: %v", task.GetID(), err)
        } else {
            log.Printf("Task creation failed: %v", err)
        }
    })
Runner Configuration

The Runner orchestrates job discovery and task registration:

runner := job.NewRunner(
    job.WithTaskCreator(taskCreator),
    job.WithRegistry(customRegistry),
    job.WithErrorHandler(customErrorHandler),
)

Architecture

go-job uses a modular architecture with several key components:

  • Runner: Orchestrates the job discovery, task creation, and registration process
  • Engines: Execute specific script types (Shell, JavaScript, SQL)
  • Registry: Stores and manages task definitions (in-memory implementation provided)
  • MetadataParser: Extracts configuration from script file comments (supports YAML front matter)
  • SourceProvider: Loads script content from various sources (filesystem or database)
  • TaskCreator: Discovers scripts and creates task instances using appropriate engines
  • Task: Represents a schedulable job with its configuration, handler, and execution context

License

MIT

Scheduling Helpers

go-job exposes utilities to inspect and utilise scheduling metadata without re-reading script files.

package main

import (
    "fmt"
    "time"

    "github.com/goliatone/go-job"
)

// main demonstrates the scheduling helpers: computing upcoming run times and
// deriving run semantics from a job.Config.
func main() {
    now := time.Now()

    // Compute the next execution time for a cron expression using the same parser
    next, err := job.NextRun("0 */2 * * *", now)
    if err != nil {
        panic(err)
    }
    fmt.Println("Next run:", next)

    // Enable second-level precision when required. The error is checked here
    // as well, so an invalid expression is not reported as a zero time.
    nextSeconds, err := job.NextRun("*/15 * * * * *", now, job.WithSecondsPrecision())
    if err != nil {
        panic(err)
    }
    fmt.Println("Next run (seconds precision):", nextSeconds)

    // Extract run semantics from a task configuration
    cfg := job.Config{Schedule: "0 12 * * *", RunOnce: true, Retries: 2}
    schedule := job.NewTaskSchedule(cfg)
    fmt.Printf("Expression=%s RunOnce=%t Retries=%d\n", schedule.Expression, schedule.RunOnce, schedule.MaxRetries)
}

Documentation

Index

Constants

View Source
const (
	// DefaultEnvelopeMaxBytes caps encoded envelope size unless overridden via options.
	DefaultEnvelopeMaxBytes = 64 * 1024
	// MaxIdempotencyKeyLength enforces sane limits on idempotency keys.
	MaxIdempotencyKeyLength = 256
)
View Source
const (
	// DefaultResultMaxBytes caps encoded result size unless overridden.
	DefaultResultMaxBytes = 32 * 1024
)

Variables

View Source
var (
	// DefaultTimeout is used to setup the default timeout for tasks
	DefaultTimeout  = time.Minute
	DefaultSchedule = "* * * * *"
)
View Source
// DefaultMatchPatterns lists the built-in metadata header styles: YAML front
// matter, JavaScript line and block comments, shell comments and SQL comments.
// Each entry gives the pattern that opens the header, the pattern that closes
// it, and the comment prefix carried by the header lines.
//
// NOTE(review): several EndPattern values use a negative lookahead "(?!...)",
// which Go's standard regexp package (RE2 syntax) does not accept. Presumably
// these patterns are interpreted by another matcher or special-cased - confirm.
var DefaultMatchPatterns = []MatchPattern{
	// YAML front matter delimited by "---" lines; no comment prefix.
	{
		Name:          "yaml",
		StartPattern:  `^---\s*$`,
		EndPattern:    `^---\s*$`,
		CommentPrefix: "",
	},
	// JavaScript single-line comments: starts at "// config"; the end
	// pattern targets the first line that does not begin with "//".
	{
		Name:          "javascript",
		StartPattern:  `^/{2,}\s*config`,
		EndPattern:    `^(?!/{2,})`,
		CommentPrefix: "//",
		IsBlock:       false,
	},
	// JavaScript block comment: opens with "/** config", closes with "*/".
	{
		Name:          "javascript_block",
		StartPattern:  `^/\*\*\s*config(.*)$`,
		EndPattern:    `^\*/`,
		CommentPrefix: "*",
		IsBlock:       true,
	},
	// Shell comments: starts at "# config"; the end pattern targets the
	// first line that does not begin with "#".
	{
		Name:          "shell",
		StartPattern:  `^#{1,}\s*config`,
		EndPattern:    `^(?!#{1,})`,
		CommentPrefix: "#",
		IsBlock:       false,
	},
	// SQL comments: starts at "-- config"; the end pattern targets the
	// first line that does not begin with "--".
	{
		Name:          "sql",
		StartPattern:  `^-{2,}\s*config`,
		EndPattern:    `^(?!-{2,})`,
		CommentPrefix: "--",
		IsBlock:       false,
	},
}
View Source
var (
	ErrConcurrencyLimit = errors.New("concurrency limit reached", errors.CategoryRateLimit).
		WithCode(errors.CodeTooManyRequests)
)
View Source
var (
	ErrIdempotentDrop = errors.New("job dropped due to idempotency policy", errors.CategoryConflict).
		WithCode(errors.CodeConflict)
)
View Source
var (
	ErrQuotaExceeded = errors.New("quota exceeded", errors.CategoryRateLimit).
		WithCode(errors.CodeTooManyRequests)
)
View Source
var ErrScriptTooLarge = errors.New("script exceeds maximum size limit")

Functions

func DefaultTaskIDProvider added in v0.7.0

func DefaultTaskIDProvider(scriptPath string) string

DefaultTaskIDProvider preserves the existing behaviour of using the filename as the task ID.

func EncodeEnvelope added in v0.11.0

func EncodeEnvelope(env Envelope, opts ...EnvelopeOption) ([]byte, error)

EncodeEnvelope marshals the envelope to JSON applying validation, sanitization, and size limits.

func EncodeResult added in v0.11.0

func EncodeResult(res Result, opts ...ResultOption) ([]byte, error)

EncodeResult marshals the result with validation and size checks.

func NewMemoryRegistry

func NewMemoryRegistry() *memoryRegistry

func NewTaskCreator

func NewTaskCreator(provider SourceProvider, engines []Engine) *taskCreator

func NewTerminalError added in v0.15.0

func NewTerminalError(code TerminalErrorCode, reason string, err error) error

NewTerminalError constructs a non-retryable error marker.

func NewYAMLMetadataParser

func NewYAMLMetadataParser(patterns ...MatchPattern) *yamlMetadataParser

func NextRun added in v0.7.0

func NextRun(expression string, after time.Time, opts ...SchedulerOption) (time.Time, error)

NextRun returns the next execution time for the provided cron expression using the same parser configuration as the embedded scheduler utilities.

func RegisterTasksWithDispatcher added in v0.10.0

func RegisterTasksWithDispatcher(tasks []Task) []dispatcher.Subscription

RegisterTasksWithDispatcher subscribes tasks into the go-command dispatcher. Returns subscriptions so callers can manage lifecycle.

func RegisterTasksWithMux added in v0.9.0

func RegisterTasksWithMux(mux *router.Mux, tasks []Task) []router.Subscription

RegisterTasksWithMux registers tasks as commanders on the provided mux and returns subscriptions for later teardown.

func SQLQuestionPlaceholder added in v0.8.0

func SQLQuestionPlaceholder(int) string

SQLQuestionPlaceholder returns the standard question-mark placeholder used by drivers like SQLite or MySQL.

func SetupFetch added in v0.4.0

func SetupFetch(vm *goja.Runtime) error

SetupFetch preserves the previous public API and wires fetch to a background context.

func SetupFetchWithContext added in v0.9.0

func SetupFetchWithContext(ctx context.Context, vm *goja.Runtime) error

SetupFetchWithContext binds a fetch implementation to the provided context so requests are cancelled when the parent execution context is done.

func TaskCommandPattern added in v0.9.0

func TaskCommandPattern(task Task) string

TaskCommandPattern builds a mux pattern for the task commander.

func TestComputeBackoffDelay added in v0.11.0

func TestComputeBackoffDelay(attempt int, cfg BackoffConfig) time.Duration

TestComputeBackoffDelay exposes backoff calculation for tests.

func TestSetBackoffRand added in v0.11.0

func TestSetBackoffRand(r *rand.Rand) func()

TestSetBackoffRand replaces the jitter random source. Returns a restore func.

func TestSetBackoffSleep added in v0.11.0

func TestSetBackoffSleep(sleeper func(context.Context, time.Duration) error) func()

TestSetBackoffSleep replaces the sleeper used during retries. Returns a restore func.

Types

type Actor added in v0.11.0

type Actor struct {
	ID             string            `json:"id,omitempty"`
	Subject        string            `json:"subject,omitempty"`
	Role           string            `json:"role,omitempty"`
	ResourceRoles  map[string]string `json:"resource_roles,omitempty"`
	Metadata       map[string]any    `json:"metadata,omitempty"`
	ImpersonatorID string            `json:"impersonator_id,omitempty"`
	IsImpersonated bool              `json:"is_impersonated,omitempty"`
}

Actor captures who initiated the job.

type ActorAuthenticator added in v0.11.0

type ActorAuthenticator interface {
	ActorFromContext(ctx context.Context) (any, bool)
	WithActorContext(ctx context.Context, actor any) context.Context
}

ActorAuthenticator abstracts actor context extraction/injection so go-auth is optional.

type BackoffConfig added in v0.11.0

type BackoffConfig struct {
	Strategy    BackoffStrategy `json:"strategy" yaml:"strategy"`
	Interval    time.Duration   `json:"interval" yaml:"interval"`
	MaxInterval time.Duration   `json:"max_interval" yaml:"max_interval"`
	Jitter      bool            `json:"jitter" yaml:"jitter"`
}

BackoffConfig configures retry timing.

type BackoffStrategy added in v0.11.0

type BackoffStrategy string
const (
	BackoffNone        BackoffStrategy = "none"
	BackoffFixed       BackoffStrategy = "fixed"
	BackoffExponential BackoffStrategy = "exponential"
)

type BaseEngine

type BaseEngine struct {
	FileExtensions []string
	Timeout        time.Duration
	MetadataParser MetadataParser
	FS             fs.FS
	SourceProvider SourceProvider
	EngineType     string
	Self           Engine
	// contains filtered or unexported fields
}

func NewBaseEngine

func NewBaseEngine(self Engine, engingeType string, exts ...string) *BaseEngine

func (*BaseEngine) CanHandle

func (e *BaseEngine) CanHandle(path string) bool

CanHandle checks if this engine can process the given file based on its extension

func (*BaseEngine) GetExecutionContext

func (e *BaseEngine) GetExecutionContext(ctx context.Context) (context.Context, context.CancelFunc)

func (*BaseEngine) GetExecutionTimeout

func (e *BaseEngine) GetExecutionTimeout(ctx context.Context) time.Duration

func (*BaseEngine) GetScriptContent

func (e *BaseEngine) GetScriptContent(msg *ExecutionMessage) (string, error)

func (*BaseEngine) Name

func (e *BaseEngine) Name() string

Name returns the engine identifier

func (*BaseEngine) ParseJob

func (e *BaseEngine) ParseJob(path string, content []byte) (Task, error)

ParseJob extracts metadata and content from a job script file

func (*BaseEngine) SetLogger added in v0.7.0

func (e *BaseEngine) SetLogger(logger Logger)

SetLogger replaces the engine logger, falling back to the default provider when nil.

func (*BaseEngine) SetLoggerProvider added in v0.7.0

func (e *BaseEngine) SetLoggerProvider(provider LoggerProvider)

SetLoggerProvider swaps the underlying logger provider used by the engine.

func (*BaseEngine) SetTaskIDProvider added in v0.7.0

func (e *BaseEngine) SetTaskIDProvider(provider TaskIDProvider)

SetTaskIDProvider allows engines to override the default ID generation strategy.

type BasicQuotaChecker added in v0.11.0

type BasicQuotaChecker struct {
	PayloadSizeLimit int
	MaxRetries       int
}

BasicQuotaChecker enforces payload size and retry count limits.

func (BasicQuotaChecker) Check added in v0.11.0

type ConcurrencyLimiter added in v0.11.0

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter enforces per-key concurrency limits.

func NewConcurrencyLimiter added in v0.11.0

func NewConcurrencyLimiter() *ConcurrencyLimiter

func (*ConcurrencyLimiter) Acquire added in v0.11.0

func (c *ConcurrencyLimiter) Acquire(msg *ExecutionMessage, limit int) (func(), error)

Acquire reserves a slot for the given message respecting the limit. Returns a release func.

func (*ConcurrencyLimiter) WithScopeExtractor added in v0.11.0

func (c *ConcurrencyLimiter) WithScopeExtractor(fn func(*ExecutionMessage) string) *ConcurrencyLimiter

WithScopeExtractor sets a callback to derive scope keys (e.g., tenant) for per-scope limits.

type Config

// Config holds the per-job options that can be declared in a script's
// metadata header or set programmatically.
type Config struct {
	// Schedule is the cron expression used for scheduling.
	Schedule       string            `yaml:"schedule" json:"schedule"`
	// Retries is the number of retry attempts.
	Retries        int               `yaml:"retries" json:"retries"`
	// Timeout is the maximum execution time.
	// NOTE(review): the tags use the key "duration", whereas the README
	// documents and uses "timeout" - confirm the key mapping is intentional.
	Timeout        time.Duration     `yaml:"duration" json:"duration"`
	Deadline       time.Time         `yaml:"deadline" json:"deadline"`
	// NoTimeout disables the execution timeout.
	NoTimeout      bool              `yaml:"no_timeout" json:"no_timeout"`
	// Debug enables debug mode.
	Debug          bool              `yaml:"debug" json:"debug"`
	// RunOnce makes the job run only once.
	RunOnce        bool              `yaml:"run_once" json:"run_once"`
	MaxRuns        int               `yaml:"max_runs" json:"max_runs"`
	ExitOnError    bool              `yaml:"exit_on_error" json:"exit_on_error"`
	// ScriptType overrides script type detection.
	ScriptType     string            `yaml:"script_type" json:"script_type"`
	// Transaction requests that SQL is executed in a transaction.
	Transaction    bool              `yaml:"transaction" json:"transaction"`
	// Metadata carries additional engine-specific settings (for example
	// driver and dsn for SQL, use_env for shell).
	Metadata       map[string]any    `yaml:"metadata" json:"metadata"`
	// Env holds the environment variables for execution.
	Env            map[string]string `yaml:"env" json:"env"`
	// Backoff configures retry timing.
	Backoff        BackoffConfig     `yaml:"backoff" json:"backoff"`
	MaxConcurrency int               `yaml:"max_concurrency" json:"max_concurrency"`
}

Handler options: Deadline time.Time `json:"deadline"`; MaxRetries int `json:"max_retries"`; MaxRuns int `json:"max_runs"`; RunOnce bool `json:"run_once"`.

type CronManager added in v0.11.0

type CronManager struct {
	// contains filtered or unexported fields
}

CronManager provides runtime CRUD and reconciliation for cron schedules.

func NewCronManager added in v0.11.0

func NewCronManager(registry Registry, scheduler cronScheduler) *CronManager

NewCronManager wires schedule management against a task registry and a cron scheduler.

func (*CronManager) Delete added in v0.11.0

func (m *CronManager) Delete(ctx context.Context, id string) error

Delete removes a schedule and unsubscribes it from the scheduler.

func (*CronManager) List added in v0.11.0

func (m *CronManager) List() []ScheduleDefinition

List returns a copy of registered schedules.

func (*CronManager) Reconcile added in v0.11.0

func (m *CronManager) Reconcile(ctx context.Context, desired []ScheduleDefinition) (ReconcileResult, error)

Reconcile aligns current schedules with the desired set, adding, updating, and removing as needed.

func (*CronManager) Register added in v0.11.0

func (m *CronManager) Register(ctx context.Context, def ScheduleDefinition) error

Register registers a new cron schedule; returns an error if the ID already exists.

func (*CronManager) Update added in v0.11.0

func (m *CronManager) Update(ctx context.Context, def ScheduleDefinition) error

Update replaces an existing schedule in-place.

func (*CronManager) WithConcurrencyLimiter added in v0.11.0

func (m *CronManager) WithConcurrencyLimiter(limiter *ConcurrencyLimiter) *CronManager

WithConcurrencyLimiter overrides the limiter used for scheduled runs.

func (*CronManager) WithIdempotencyTracker added in v0.11.0

func (m *CronManager) WithIdempotencyTracker(tracker *IdempotencyTracker) *CronManager

WithIdempotencyTracker overrides the tracker used for scheduled runs.

func (*CronManager) WithQuotaChecker added in v0.11.0

func (m *CronManager) WithQuotaChecker(qc QuotaChecker) *CronManager

WithQuotaChecker overrides quota enforcement for scheduled runs.

type DBSourceProvider added in v0.4.0

type DBSourceProvider struct {
	Table string
	DB    *sql.DB
	// contains filtered or unexported fields
}

func NewDBSourceProvider added in v0.4.0

func NewDBSourceProvider(db *sql.DB, table string) *DBSourceProvider

func (*DBSourceProvider) GetScript added in v0.4.0

func (p *DBSourceProvider) GetScript(path string) ([]byte, error)

func (*DBSourceProvider) ListScripts added in v0.4.0

func (p *DBSourceProvider) ListScripts(ctx context.Context) ([]ScriptInfo, error)

func (*DBSourceProvider) WithPlaceholder added in v0.8.0

func (p *DBSourceProvider) WithPlaceholder(fn func(int) string) *DBSourceProvider

WithPlaceholder overrides the SQL placeholder generator used in parameterised queries.

type DeduplicationPolicy added in v0.11.0

type DeduplicationPolicy string
const (
	DedupPolicyIgnore  DeduplicationPolicy = "ignore"
	DedupPolicyDrop    DeduplicationPolicy = "drop"
	DedupPolicyMerge   DeduplicationPolicy = "merge"
	DedupPolicyReplace DeduplicationPolicy = "replace"
)

type Engine

type Engine interface {
	Name() string
	ParseJob(path string, content []byte) (Task, error)
	CanHandle(path string) bool
	Execute(ctx context.Context, msg *ExecutionMessage) error
}

type Envelope added in v0.11.0

type Envelope struct {
	Actor           *Actor         `json:"actor,omitempty"`
	Scope           Scope          `json:"scope,omitempty"`
	Params          map[string]any `json:"params,omitempty"`
	IdempotencyKey  string         `json:"idempotency_key,omitempty"`
	RawContentBytes int            `json:"-"`
}

Envelope wraps the payload passed to job handlers with optional actor/scope metadata and an idempotency key for upstream deduplication.

func DecodeEnvelope added in v0.11.0

func DecodeEnvelope(data []byte, opts ...EnvelopeOption) (Envelope, error)

DecodeEnvelope unmarshals JSON data into an Envelope, enforcing size limits and validation.

func (Envelope) EnvelopeActor added in v0.14.0

func (env Envelope) EnvelopeActor() any

EnvelopeActor returns the actor metadata for envelope codecs.

func (Envelope) EnvelopeIdempotencyKey added in v0.14.0

func (env Envelope) EnvelopeIdempotencyKey() string

EnvelopeIdempotencyKey returns the idempotency key for envelope codecs.

func (Envelope) EnvelopeParams added in v0.14.0

func (env Envelope) EnvelopeParams() map[string]any

EnvelopeParams returns params for envelope codecs.

func (Envelope) EnvelopeScope added in v0.14.0

func (env Envelope) EnvelopeScope() any

EnvelopeScope returns the scope metadata for envelope codecs.

func (*Envelope) SetEnvelopeParams added in v0.14.0

func (env *Envelope) SetEnvelopeParams(params map[string]any)

SetEnvelopeParams updates params after sanitization.

func (*Envelope) SetEnvelopeRawContentBytes added in v0.14.0

func (env *Envelope) SetEnvelopeRawContentBytes(size int)

SetEnvelopeRawContentBytes stores the decoded payload size.

func (Envelope) Validate added in v0.11.0

func (env Envelope) Validate() error

Validate enforces basic constraints on the envelope fields.

type EnvelopeCodec added in v0.14.0

type EnvelopeCodec interface {
	Encode(value any) ([]byte, error)
	Decode(data []byte, value any) error
}

EnvelopeCodec encodes and decodes envelope payloads.

type EnvelopeOption added in v0.11.0

type EnvelopeOption func(*envelopeConfig)

EnvelopeOption customizes encode/decode behaviour.

func WithEnvelopeMaxBytes added in v0.11.0

func WithEnvelopeMaxBytes(limit int) EnvelopeOption

WithEnvelopeMaxBytes sets the maximum allowed encoded size in bytes.

func WithEnvelopeSanitizer added in v0.11.0

func WithEnvelopeSanitizer(fn EnvelopeSanitizer) EnvelopeOption

WithEnvelopeSanitizer applies a sanitizer to Params before encoding/after decoding.

type EnvelopeParamsSetter added in v0.14.0

type EnvelopeParamsSetter interface {
	SetEnvelopeParams(map[string]any)
}

EnvelopeParamsSetter allows codecs to apply sanitized params.

type EnvelopePayload added in v0.14.0

type EnvelopePayload interface {
	EnvelopeActor() any
	EnvelopeScope() any
	EnvelopeIdempotencyKey() string
	EnvelopeParams() map[string]any
}

EnvelopePayload exposes envelope metadata without tying callers to a struct type.

type EnvelopeRawContentSetter added in v0.14.0

type EnvelopeRawContentSetter interface {
	SetEnvelopeRawContentBytes(int)
}

EnvelopeRawContentSetter allows codecs to capture decoded size.

type EnvelopeSanitizer added in v0.11.0

type EnvelopeSanitizer func(map[string]any) map[string]any

EnvelopeSanitizer allows callers to scrub params before encoding or after decoding.

type EnvelopeValidator added in v0.14.0

type EnvelopeValidator interface {
	Validate() error
}

EnvelopeValidator allows envelopes to provide custom validation.

type ExecutionMessage

type ExecutionMessage struct {
	// JobID identifies the task to run. Filled from Task.GetID() when using TaskCommander/CompleteExecutionMessage.
	JobID string `json:"job_id" yaml:"job_id"`
	// ScriptPath is the filesystem path to the script. Filled from Task.GetPath() when using TaskCommander/CompleteExecutionMessage.
	ScriptPath string `json:"script_path" yaml:"script_path"`
	// Canonical FSM correlation fields used by durable orchestrator execution.
	MachineID       string `json:"machine_id,omitempty" yaml:"machine_id,omitempty"`
	EntityID        string `json:"entity_id,omitempty" yaml:"entity_id,omitempty"`
	ExecutionID     string `json:"execution_id,omitempty" yaml:"execution_id,omitempty"`
	ExpectedState   string `json:"expected_state,omitempty" yaml:"expected_state,omitempty"`
	ExpectedVersion int64  `json:"expected_version,omitempty" yaml:"expected_version,omitempty"`
	ResumeEvent     string `json:"resume_event,omitempty" yaml:"resume_event,omitempty"`
	Config          Config `json:"config" yaml:"config"`
	// Parameters carries runtime inputs. Defaults to an empty map to avoid nil dereferences when normalized.
	Parameters     map[string]any `json:"parameters" yaml:"parameters"`
	IdempotencyKey string         `json:"idempotency_key" yaml:"idempotency_key"`
	// DedupPolicy determines how idempotency keys are handled. Defaults to ignore when left empty.
	DedupPolicy    DeduplicationPolicy         `json:"dedup_policy" yaml:"dedup_policy"`
	Result         *Result                     `json:"result,omitempty" yaml:"result,omitempty"`
	OutputCallback func(stdout, stderr string) `json:"-" yaml:"-"`
}

ExecutionMessage represents a request to execute a job script. Required fields: JobID and ScriptPath (either provided by the caller or by the Task metadata). Optional fields: Config, Parameters, IdempotencyKey, DedupPolicy, Result, and OutputCallback.

func BuildExecutionMessageForTask added in v0.9.0

func BuildExecutionMessageForTask(task Task, params map[string]any) (*ExecutionMessage, error)

BuildExecutionMessageForTask returns an ExecutionMessage populated with task metadata and cached script content, avoiding re-reading from providers.

func CompleteExecutionMessage added in v0.9.0

func CompleteExecutionMessage(task Task, msg *ExecutionMessage) (*ExecutionMessage, error)

CompleteExecutionMessage merges the provided message (which may already have overrides) with task defaults and cached script content.

func (ExecutionMessage) Type

func (msg ExecutionMessage) Type() string

Type returns the message type for the command system

func (ExecutionMessage) Validate

func (msg ExecutionMessage) Validate() error

Validate ensures the message contains required fields.

type ExecutionMessageBuilder added in v0.9.0

type ExecutionMessageBuilder interface {
	BuildExecutionMessage(params map[string]any) (*ExecutionMessage, error)
}

ExecutionMessageBuilder builds a message with cached script content.

type FetchOptions

type FetchOptions struct {
	Method  string            `json:"method"`
	Headers map[string]string `json:"headers"`
	Body    any               `json:"body"`
	Timeout int               `json:"timeout"` //milliseconds
}

FetchOptions represents the options for the fetch function

type FetchResponse

type FetchResponse struct {
	Status     int                 `json:"status"`
	StatusText string              `json:"statusText"`
	Headers    map[string][]string `json:"headers"`
	URL        string              `json:"url"`
	Body       []byte              `json:"-"`
}

FetchResponse represents the response from a fetch call

type FieldsLogger added in v0.7.0

type FieldsLogger interface {
	WithFields(fields map[string]any) Logger
}

FieldsLogger allows attaching persistent structured key/value pairs to a logger.

type FileSystemSourceProvider

type FileSystemSourceProvider struct {
	// contains filtered or unexported fields
}

func NewFileSystemSourceProvider

func NewFileSystemSourceProvider(rootDir string, fss ...fs.FS) *FileSystemSourceProvider

func (*FileSystemSourceProvider) GetScript

func (p *FileSystemSourceProvider) GetScript(path string) ([]byte, error)

func (*FileSystemSourceProvider) ListScripts

func (p *FileSystemSourceProvider) ListScripts(ctx context.Context) ([]ScriptInfo, error)

func (*FileSystemSourceProvider) WithIgnoreGlobs added in v0.9.0

func (p *FileSystemSourceProvider) WithIgnoreGlobs(patterns ...string) *FileSystemSourceProvider

WithIgnoreGlobs skips files or directories matching any glob pattern (filepath.Match semantics). Patterns are matched against paths relative to rootDir, using "/" separators.

func (*FileSystemSourceProvider) WithIgnorePaths added in v0.9.0

func (p *FileSystemSourceProvider) WithIgnorePaths(paths ...string) *FileSystemSourceProvider

WithIgnorePaths skips exact relative paths (files or directories) during discovery.

func (*FileSystemSourceProvider) WithMaxFileSize added in v0.7.0

func (p *FileSystemSourceProvider) WithMaxFileSize(limit int64) *FileSystemSourceProvider

type GoAuthAdapter added in v0.11.0

type GoAuthAdapter struct {
	Sanitizer        EnvelopeSanitizer
	Authenticator    ActorAuthenticator
	MapAuthActor     func(src any) (*Actor, Scope, error)
	BuildAuthContext func(env Envelope) any
}

GoAuthAdapter bridges an ActorAuthenticator into Envelope handling. MapAuthActor converts authenticator-specific actor contexts into our Actor/Scope. BuildAuthContext maps Envelope actor/scope back to the authenticator context shape.

func (GoAuthAdapter) AttachActor added in v0.11.0

func (a GoAuthAdapter) AttachActor(ctx context.Context, env Envelope) Envelope

AttachActor fills the envelope with actor/scope metadata from the authenticator context when missing.

func (GoAuthAdapter) InjectActor added in v0.11.0

func (a GoAuthAdapter) InjectActor(ctx context.Context, env Envelope) context.Context

InjectActor writes the envelope actor/scope metadata back into the authenticator context for downstream consumers.

type HandlerOptions added in v0.9.0

type HandlerOptions struct {
	command.HandlerConfig
	ExitOnError bool `json:"exit_on_error" yaml:"exit_on_error"`
}

HandlerOptions extends command.HandlerConfig with exit-on-error semantics and implements the getter interfaces expected by go-command runner configurators.

func (HandlerOptions) GetDeadline added in v0.9.0

func (h HandlerOptions) GetDeadline() time.Time

GetDeadline satisfies runner.DeadlineGetter.

func (HandlerOptions) GetExitOnError added in v0.9.0

func (h HandlerOptions) GetExitOnError() bool

GetExitOnError satisfies runner.ExitOnErrorGetter.

func (HandlerOptions) GetExpression added in v0.9.0

func (h HandlerOptions) GetExpression() string

GetExpression exposes the cron expression for schedulers.

func (HandlerOptions) GetMaxRetries added in v0.9.0

func (h HandlerOptions) GetMaxRetries() int

GetMaxRetries satisfies runner.MaxRetriesGetter.

func (HandlerOptions) GetMaxRuns added in v0.9.0

func (h HandlerOptions) GetMaxRuns() int

GetMaxRuns satisfies runner.MaxRunsGetter.

func (HandlerOptions) GetNoTimeout added in v0.9.0

func (h HandlerOptions) GetNoTimeout() bool

GetNoTimeout satisfies runner.NoTimeoutGetter when present.

func (HandlerOptions) GetRunOnce added in v0.9.0

func (h HandlerOptions) GetRunOnce() bool

GetRunOnce satisfies runner.RunOnceGetter.

func (HandlerOptions) GetTimeout added in v0.9.0

func (h HandlerOptions) GetTimeout() time.Duration

GetTimeout satisfies runner.TimeoutGetter.

func (HandlerOptions) ToCommandConfig added in v0.9.0

func (h HandlerOptions) ToCommandConfig() command.HandlerConfig

ToCommandConfig returns the embedded command.HandlerConfig.

type IdempotencyTracker added in v0.11.0

type IdempotencyTracker struct {
	// contains filtered or unexported fields
}

IdempotencyTracker tracks idempotency keys to enforce deduplication policies.

func NewIdempotencyTracker added in v0.11.0

func NewIdempotencyTracker() *IdempotencyTracker

func (*IdempotencyTracker) AfterExecute added in v0.11.0

func (t *IdempotencyTracker) AfterExecute(key string, policy DeduplicationPolicy, execErr error)

func (*IdempotencyTracker) BeforeExecute added in v0.11.0

func (t *IdempotencyTracker) BeforeExecute(key string, policy DeduplicationPolicy) (dedupDecision, error)

type JSEngine

type JSEngine struct {
	*BaseEngine
	// contains filtered or unexported fields
}

func NewJSRunner

func NewJSRunner(opts ...JSOption) *JSEngine

func (*JSEngine) Execute

func (e *JSEngine) Execute(ctx context.Context, msg *ExecutionMessage) error

Execute runs a JavaScript file in a Node-like environment using goja_nodejs' eventloop.

func (*JSEngine) SetTaskIDProvider added in v0.7.0

func (e *JSEngine) SetTaskIDProvider(provider TaskIDProvider)

SetTaskIDProvider overrides the ID derivation strategy for tasks parsed by the JS engine.

type JSONEnvelopeCodec added in v0.14.0

type JSONEnvelopeCodec struct {
	// contains filtered or unexported fields
}

JSONEnvelopeCodec marshals envelopes using JSON with size limits and sanitization.

func NewJSONEnvelopeCodec added in v0.14.0

func NewJSONEnvelopeCodec(opts ...EnvelopeOption) *JSONEnvelopeCodec

NewJSONEnvelopeCodec builds a JSON codec configured by envelope options.

func (*JSONEnvelopeCodec) Decode added in v0.14.0

func (c *JSONEnvelopeCodec) Decode(data []byte, value any) error

Decode unmarshals an envelope, enforcing size limits and sanitization.

func (*JSONEnvelopeCodec) Encode added in v0.14.0

func (c *JSONEnvelopeCodec) Encode(value any) ([]byte, error)

Encode marshals the envelope, applying sanitization and size limits.

type JSOption

type JSOption func(*JSEngine)

func WithJSExtension

func WithJSExtension(ext string) JSOption

func WithJSFS

func WithJSFS(dirfs fs.FS) JSOption

WithJSFS sets the fs.FS used by the JS engine.

func WithJSLogger added in v0.3.0

func WithJSLogger(logger Logger) JSOption

func WithJSMetadataParser

func WithJSMetadataParser(parser MetadataParser) JSOption

WithJSMetadataParser sets a custom metadata parser

func WithJSModuleLoader

func WithJSModuleLoader(loader func(path string) ([]byte, error)) JSOption

func WithJSPanicHandler

func WithJSPanicHandler(handler func(funcName string, fields ...map[string]any)) JSOption

func WithJSPathResolver

func WithJSPathResolver(resolver func(base, path string) string) JSOption

func WithJSTimeout

func WithJSTimeout(timeout time.Duration) JSOption

WithJSTimeout sets the default execution timeout

type LogLevel added in v0.7.0

type LogLevel int

LogLevel represents the minimum severity the standard logger should emit.

const (
	LevelTrace LogLevel = iota
	LevelDebug
	LevelInfo
	LevelWarn
	LevelError
	LevelFatal
)

func (LogLevel) String added in v0.7.0

func (l LogLevel) String() string

type Logger added in v0.3.0

type Logger interface {
	Trace(msg string, args ...any)
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
	Fatal(msg string, args ...any)
	WithContext(ctx context.Context) Logger
}

Logger defines the leveled logging contract used across go-job.

func GoLogger added in v0.7.0

func GoLogger(logger glog.Logger) Logger

GoLogger wraps a go-logger Logger into the job Logger contract.

type LoggerAware added in v0.7.0

type LoggerAware interface {
	SetLogger(logger Logger)
}

LoggerAware components can accept a logger instance.

type LoggerProvider added in v0.7.0

type LoggerProvider interface {
	GetLogger(name string) Logger
}

LoggerProvider produces named loggers. Implementations may scope logs by name.

func GoLoggerProvider added in v0.7.0

func GoLoggerProvider(provider glog.LoggerProvider) LoggerProvider

GoLoggerProvider converts a go-logger provider into the job LoggerProvider contract.

func NewStdLoggerProvider added in v0.7.0

func NewStdLoggerProvider(opts ...StdLoggerOption) LoggerProvider

NewStdLoggerProvider returns a lightweight logger provider that writes structured log lines to the supplied writer. By default it discards output, providing a silent fallback for dependants that do not configure logging explicitly.

type LoggerProviderAware added in v0.7.0

type LoggerProviderAware interface {
	SetLoggerProvider(provider LoggerProvider)
}

LoggerProviderAware components can accept a logger provider.

type MatchPattern

type MatchPattern struct {
	Name          string
	StartPattern  string
	EndPattern    string
	CommentPrefix string
	IsBlock       bool // true for block comment styles (e.g. /** ... */)
}

type MetadataParser

type MetadataParser interface {
	Parse(content []byte) (Config, string, error)
}

type NonRetryableError added in v0.15.0

type NonRetryableError interface {
	error
	NonRetryable() bool
	NonRetryableReason() string
}

NonRetryableError is implemented by errors that should not be retried by worker policy.

type Option

type Option func(*Runner)

func WithErrorHandler

func WithErrorHandler(handler func(Task, error)) Option

func WithLoggerProvider added in v0.7.0

func WithLoggerProvider(provider LoggerProvider) Option

func WithMetadataParser

func WithMetadataParser(parser MetadataParser) Option

func WithRegistry

func WithRegistry(registry Registry) Option

func WithTaskCreator

func WithTaskCreator(creator TaskCreator) Option

func WithTaskEventHandler added in v0.7.0

func WithTaskEventHandler(handler TaskEventHandler) Option

func WithTaskIDProvider added in v0.7.0

func WithTaskIDProvider(provider TaskIDProvider) Option

type Processor added in v0.2.0

type Processor interface {
	Process([]byte) ([]byte, error)
}

type QuotaChecker added in v0.11.0

type QuotaChecker interface {
	Check(*ExecutionMessage) error
}

type ReconcileResult added in v0.11.0

type ReconcileResult struct {
	Added   []string
	Updated []string
	Removed []string
}

ReconcileResult captures the diff outcome when aligning schedules.

type Registry

type Registry interface {
	List() []Task
	Add(job Task) error
	Get(id string) (Task, bool)
	SetResult(id string, result Result) error
	GetResult(id string) (Result, bool)
}

type Result added in v0.11.0

type Result struct {
	Status    string         `json:"status,omitempty"`
	Message   string         `json:"message,omitempty"`
	OutputURL string         `json:"output_url,omitempty"`
	Size      int64          `json:"size,omitempty"`
	Duration  time.Duration  `json:"duration,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

Result captures execution metadata for a job.

func DecodeResult added in v0.11.0

func DecodeResult(data []byte, opts ...ResultOption) (Result, error)

DecodeResult unmarshals result payload enforcing size limits and validation.

func (Result) Validate added in v0.11.0

func (r Result) Validate() error

Validate enforces simple constraints on the result metadata.

type ResultCodec added in v0.11.0

type ResultCodec interface {
	Marshal(Result) ([]byte, error)
	Unmarshal([]byte) (Result, error)
}

ResultCodec allows custom serialization of result metadata.

type ResultOption added in v0.11.0

type ResultOption func(*resultConfig)

ResultOption customizes encode/decode behaviour.

func WithResultCodec added in v0.11.0

func WithResultCodec(codec ResultCodec) ResultOption

WithResultCodec sets a custom codec for serialization.

func WithResultMaxBytes added in v0.11.0

func WithResultMaxBytes(limit int) ResultOption

WithResultMaxBytes sets the maximum allowed encoded size in bytes.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(opts ...Option) *Runner

func (*Runner) GetResult added in v0.11.0

func (r *Runner) GetResult(jobID string) (Result, bool)

GetResult retrieves result metadata for a given job ID.

func (*Runner) RegisteredTasks

func (r *Runner) RegisteredTasks() []Task

func (*Runner) SetResult added in v0.11.0

func (r *Runner) SetResult(jobID string, result Result) error

SetResult stores result metadata for a given job ID.

func (*Runner) Start

func (r *Runner) Start(ctx context.Context) error

func (*Runner) Stop

func (r *Runner) Stop(_ context.Context) error

type SQLEngine

type SQLEngine struct {
	*BaseEngine
	// contains filtered or unexported fields
}

func NewSQLRunner

func NewSQLRunner(opts ...SQLOption) *SQLEngine

func (*SQLEngine) Execute

func (e *SQLEngine) Execute(ctx context.Context, msg *ExecutionMessage) error

func (*SQLEngine) SetTaskIDProvider added in v0.7.0

func (e *SQLEngine) SetTaskIDProvider(provider TaskIDProvider)

SetTaskIDProvider overrides the ID derivation strategy for tasks parsed by the SQL engine.

type SQLOption

type SQLOption func(*SQLEngine)

func WithSQLClient added in v0.2.0

func WithSQLClient(db *sql.DB) SQLOption

WithSQLClient sets the db client

func WithSQLDatabase

func WithSQLDatabase(driverName, dataSourceName string) SQLOption

WithSQLDatabase sets the database connection from a driver name and data source name.

func WithSQLExtension

func WithSQLExtension(ext string) SQLOption

WithSQLExtension adds file extensions that this engine can handle

func WithSQLFS

func WithSQLFS(dirfs fs.FS) SQLOption

WithSQLFS sets the fs.FS used by the SQL engine.

func WithSQLLogger added in v0.3.0

func WithSQLLogger(logger Logger) SQLOption

func WithSQLMetadataParser

func WithSQLMetadataParser(parser MetadataParser) SQLOption

WithSQLMetadataParser sets a custom metadata parser

func WithSQLTimeout

func WithSQLTimeout(timeout time.Duration) SQLOption

WithSQLTimeout sets the default execution timeout

type ScheduleDefinition added in v0.11.0

type ScheduleDefinition struct {
	ID         string           `json:"id" yaml:"id"`
	Expression string           `json:"expression" yaml:"expression"`
	Message    ExecutionMessage `json:"message" yaml:"message"`
}

ScheduleDefinition describes a cron-driven job execution. Expression defines the cron spec, and Message carries the payload and execution options (including retries, backoff, idempotency, and limits).

func (ScheduleDefinition) Validate added in v0.11.0

func (d ScheduleDefinition) Validate() error

Validate ensures the schedule definition contains required fields.

type ScheduleLoader added in v0.11.0

type ScheduleLoader func(ctx context.Context) ([]ScheduleDefinition, error)

ScheduleLoader fetches desired schedules, e.g. from go-settings.

type ScheduleQuotesProcessor added in v0.2.0

type ScheduleQuotesProcessor struct{}

ScheduleQuotesProcessor ensures that schedule values like @every are properly quoted so the parser does not fail with an error.

func (*ScheduleQuotesProcessor) Process added in v0.2.0

func (s *ScheduleQuotesProcessor) Process(data []byte) ([]byte, error)

type ScheduleSyncCommand added in v0.11.0

type ScheduleSyncCommand struct {
	// contains filtered or unexported fields
}

ScheduleSyncCommand reconciles schedules from an external source (settings) into the CronManager.

func NewScheduleSyncCommand added in v0.11.0

func NewScheduleSyncCommand(manager *CronManager, loader ScheduleLoader, opts ...ScheduleSyncOption) *ScheduleSyncCommand

NewScheduleSyncCommand wires a sync command implementing both CLICommand and CronCommand.

func (*ScheduleSyncCommand) CLIHandler added in v0.11.0

func (c *ScheduleSyncCommand) CLIHandler() any

CLIHandler satisfies command.CLICommand to trigger reconciliation manually.

func (*ScheduleSyncCommand) CLIOptions added in v0.11.0

func (c *ScheduleSyncCommand) CLIOptions() command.CLIConfig

CLIOptions returns CLI metadata for registration.

func (*ScheduleSyncCommand) CronHandler added in v0.11.0

func (c *ScheduleSyncCommand) CronHandler() func() error

CronHandler satisfies command.CronCommand to run periodic reconciliation.

func (*ScheduleSyncCommand) CronOptions added in v0.11.0

func (c *ScheduleSyncCommand) CronOptions() command.HandlerConfig

CronOptions exposes the cron expression for the sync command.

type ScheduleSyncOption added in v0.11.0

type ScheduleSyncOption func(*ScheduleSyncCommand)

ScheduleSyncOption customizes the sync command.

func WithScheduleSyncCLIDescription added in v0.11.0

func WithScheduleSyncCLIDescription(desc string) ScheduleSyncOption

WithScheduleSyncCLIDescription overrides the CLI description.

func WithScheduleSyncCLIGroup added in v0.11.0

func WithScheduleSyncCLIGroup(group string) ScheduleSyncOption

WithScheduleSyncCLIGroup sets the CLI group.

func WithScheduleSyncCLIName added in v0.11.0

func WithScheduleSyncCLIName(name string) ScheduleSyncOption

WithScheduleSyncCLIName overrides the CLI command name.

func WithScheduleSyncCron added in v0.11.0

func WithScheduleSyncCron(expr string) ScheduleSyncOption

WithScheduleSyncCron overrides the cron expression for periodic reconciliation.

type SchedulerOption added in v0.7.0

type SchedulerOption func(*schedulerConfig)

SchedulerOption allows callers to control the behaviour of the NextRun helper.

func WithLocation added in v0.7.0

func WithLocation(loc *time.Location) SchedulerOption

WithLocation overrides the time zone used by NextRun when none is supplied.

func WithSecondsPrecision added in v0.7.0

func WithSecondsPrecision() SchedulerOption

WithSecondsPrecision enables second-level cron expressions when calculating NextRun.

type Scope added in v0.11.0

type Scope struct {
	TenantID       string            `json:"tenant_id,omitempty"`
	OrganizationID string            `json:"organization_id,omitempty"`
	Labels         map[string]string `json:"labels,omitempty"`
}

Scope captures tenant/organization or other scoping information for the job.

type ScriptInfo

type ScriptInfo struct {
	ID      string         `json:"id"`
	Path    string         `json:"path"`
	Content []byte         `json:"content"`
	Meta    map[string]any `json:"metadata"`
}

type ShellEngine

type ShellEngine struct {
	*BaseEngine
	// contains filtered or unexported fields
}

func NewShellRunner

func NewShellRunner(opts ...ShellOption) *ShellEngine

func (*ShellEngine) Execute

func (e *ShellEngine) Execute(ctx context.Context, msg *ExecutionMessage) error

func (*ShellEngine) SetTaskIDProvider added in v0.7.0

func (e *ShellEngine) SetTaskIDProvider(provider TaskIDProvider)

SetTaskIDProvider overrides the ID derivation strategy for tasks parsed by the shell engine.

type ShellOption

type ShellOption func(*ShellEngine)

func WithShellEnvironment

func WithShellEnvironment(env []string) ShellOption

WithShellEnvironment sets additional environment variables

func WithShellExtension

func WithShellExtension(ext string) ShellOption

WithShellExtension adds file extensions that this engine can handle

func WithShellFS

func WithShellFS(dirfs fs.FS) ShellOption

WithShellFS sets the fs.FS used by the shell engine.

func WithShellLogger added in v0.3.0

func WithShellLogger(logger Logger) ShellOption

func WithShellMetadataParser

func WithShellMetadataParser(parser MetadataParser) ShellOption

WithShellMetadataParser sets a custom metadata parser

func WithShellShell

func WithShellShell(shell string, args ...string) ShellOption

WithShellShell sets the shell executable and arguments

func WithShellTimeout

func WithShellTimeout(timeout time.Duration) ShellOption

WithShellTimeout sets the default execution timeout

func WithShellWorkingDirectory

func WithShellWorkingDirectory(dir string) ShellOption

WithShellWorkingDirectory sets the working directory for script execution

type SourceProvider

type SourceProvider interface {
	GetScript(path string) (content []byte, err error)
	ListScripts(ctx context.Context) ([]ScriptInfo, error)
}

type StdLoggerOption added in v0.7.0

type StdLoggerOption func(*stdLoggerProvider)

StdLoggerOption customises the behaviour of the standard logger provider.

func WithStdLoggerMinLevel added in v0.7.0

func WithStdLoggerMinLevel(level LogLevel) StdLoggerOption

WithStdLoggerMinLevel changes the minimum level emitted by the logger.

func WithStdLoggerTimestampFunc added in v0.7.0

func WithStdLoggerTimestampFunc(fn func() time.Time) StdLoggerOption

WithStdLoggerTimestampFunc overrides the time source used for log entries.

func WithStdLoggerWriter added in v0.7.0

func WithStdLoggerWriter(w io.Writer) StdLoggerOption

WithStdLoggerWriter overrides the destination for log lines.

type Task

type Task interface {
	GetID() string
	// GetHandler is the function that a command needs to implement in order to be executed in the background.
	GetHandler() func() error
	GetHandlerConfig() HandlerOptions
	GetConfig() Config
	GetPath() string
	GetEngine() Engine
	Execute(ctx context.Context, msg *ExecutionMessage) error
}

Task represents a schedulable job discovered from the filesystem

func NewBaseTask

func NewBaseTask(
	id, path, scriptType string,
	config Config,
	scriptContent string,
	engine Engine,
) Task

type TaskCommander added in v0.9.0

type TaskCommander struct {
	Task Task
	// contains filtered or unexported fields
}

TaskCommander adapts a Task to the command.Commander interface.

func NewTaskCommander added in v0.9.0

func NewTaskCommander(task Task) *TaskCommander

func (*TaskCommander) Execute added in v0.9.0

func (c *TaskCommander) Execute(ctx context.Context, msg *ExecutionMessage) error

func (*TaskCommander) WithConcurrencyLimiter added in v0.11.0

func (c *TaskCommander) WithConcurrencyLimiter(limiter *ConcurrencyLimiter) *TaskCommander

WithConcurrencyLimiter overrides the limiter used for concurrency control.

func (*TaskCommander) WithIdempotencyTracker added in v0.11.0

func (c *TaskCommander) WithIdempotencyTracker(tracker *IdempotencyTracker) *TaskCommander

WithIdempotencyTracker overrides the tracker used for deduplication checks.

func (*TaskCommander) WithQuotaChecker added in v0.11.0

func (c *TaskCommander) WithQuotaChecker(qc QuotaChecker) *TaskCommander

WithQuotaChecker overrides quota enforcement.

func (*TaskCommander) WithRetryOverride added in v0.14.0

func (c *TaskCommander) WithRetryOverride(maxRetries int) *TaskCommander

WithRetryOverride forces TaskCommander to use the provided retry count.

func (*TaskCommander) WithScopeExtractor added in v0.11.0

func (c *TaskCommander) WithScopeExtractor(fn func(*ExecutionMessage) string) *TaskCommander

WithScopeExtractor sets a scope extractor for concurrency keys.

func (*TaskCommander) WithSharedIdempotencyStore added in v0.15.0

func (c *TaskCommander) WithSharedIdempotencyStore(store qidempotency.Store, ttl time.Duration) *TaskCommander

WithSharedIdempotencyStore enables distributed idempotency checks across workers.

type TaskCreator

type TaskCreator interface {
	CreateTasks(ctx context.Context) ([]Task, error)
}

type TaskEvent added in v0.7.0

type TaskEvent struct {
	Type       TaskEventType
	TaskID     string
	ScriptPath string
	Task       Task
	Err        error
}

TaskEvent captures contextual information about task registration outcomes.

type TaskEventEmitter added in v0.7.0

type TaskEventEmitter interface {
	AddTaskEventHandler(TaskEventHandler)
}

TaskEventEmitter can be implemented by task creators to publish registration events upstream.

type TaskEventHandler added in v0.7.0

type TaskEventHandler func(TaskEvent)

TaskEventHandler consumes task registration events emitted by the runner lifecycle.

type TaskEventType added in v0.7.0

type TaskEventType string

TaskEventType discriminates between different kinds of task registration events.

const (
	// TaskEventRegistered signals that a task was successfully registered.
	TaskEventRegistered TaskEventType = "registered"
	// TaskEventRegistrationFailed signals that a task failed to register.
	TaskEventRegistrationFailed TaskEventType = "registration_failed"
)

type TaskIDProvider added in v0.7.0

type TaskIDProvider func(scriptPath string) string

TaskIDProvider defines the strategy used to derive a task identifier from a script path.

type TaskIDProviderAware added in v0.7.0

type TaskIDProviderAware interface {
	SetTaskIDProvider(TaskIDProvider)
}

TaskIDProviderAware is an interface that engines can implement to receive the active TaskIDProvider.

type TaskRunner

type TaskRunner interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	RegisteredTasks() []Task
}

type TaskSchedule added in v0.7.0

type TaskSchedule struct {
	Expression string        `json:"expression"`
	RunOnce    bool          `json:"run_once"`
	MaxRetries int           `json:"max_retries"`
	Timeout    time.Duration `json:"timeout"`
}

TaskSchedule captures scheduling semantics for a task.

func NewTaskSchedule added in v0.7.0

func NewTaskSchedule(cfg Config) TaskSchedule

NewTaskSchedule builds a TaskSchedule from a job Config.

func TaskScheduleFromTask added in v0.7.0

func TaskScheduleFromTask(task Task) TaskSchedule

TaskScheduleFromTask extracts scheduling semantics from a Task implementation.

type TerminalError added in v0.15.0

type TerminalError struct {
	Code   TerminalErrorCode
	Reason string
	Err    error
}

TerminalError represents a non-retryable execution failure.

func (*TerminalError) Error added in v0.15.0

func (e *TerminalError) Error() string

func (*TerminalError) NonRetryable added in v0.15.0

func (e *TerminalError) NonRetryable() bool

func (*TerminalError) NonRetryableReason added in v0.15.0

func (e *TerminalError) NonRetryableReason() string

func (*TerminalError) Unwrap added in v0.15.0

func (e *TerminalError) Unwrap() error

type TerminalErrorCode added in v0.15.0

type TerminalErrorCode string

TerminalErrorCode identifies non-retryable error classes.

const (
	// TerminalErrorCodeStaleStateMismatch marks stale/state-mismatch execution paths.
	TerminalErrorCodeStaleStateMismatch TerminalErrorCode = "stale_state_mismatch"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL