dispatch

Version: v0.0.0-...-57df71f

Warning: This package is not in the latest version of its module.
Published: Feb 13, 2026 License: MIT Imports: 7 Imported by: 0

README

dispatch

A flexible message routing framework for event-driven Go applications.

Features

  • Multi-Source Routing — Route messages from webhooks, message queues, or custom formats through a single processor
  • Discriminator Pattern — Cheap detection before expensive parsing for O(1) hot-path matching
  • Typed Handlers — Automatic JSON unmarshaling and validation with generics
  • Proc/Func Pattern — Fire-and-forget procedures or request-response functions
  • Replier Interface — Built-in support for request-response transports (Step Functions, etc.)
  • Pluggable Hooks — Observability without coupling to specific logging or metrics systems
  • Format Agnostic — Inspector/View abstraction supports JSON, protobuf, or custom formats
  • Zero Allocation Matching — Uses gjson for efficient JSON field lookups

Installation

go get github.com/bjaus/dispatch

Requires Go 1.25 or later.

Quick Start

package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/bjaus/dispatch"
)

// Define your payload type
type UserCreatedPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

// Define a procedure (fire-and-forget)
type UserCreatedProc struct{}

func (p *UserCreatedProc) Run(ctx context.Context, payload UserCreatedPayload) error {
    log.Printf("User created: %s (%s)", payload.UserID, payload.Email)
    return nil
}

// Define a source to parse your message format
type mySource struct{}

func (s *mySource) Name() string { return "my-source" }

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.HasFields("type", "payload")
}

func (s *mySource) Parse(raw []byte) (dispatch.Message, error) {
    var env struct {
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil {
        return dispatch.Message{}, err
    }
    return dispatch.Message{Key: env.Type, Payload: env.Payload}, nil
}

func main() {
    // Create router
    r := dispatch.New()

    // Add source
    r.AddSource(&mySource{})

    // Register procedure
    dispatch.RegisterProc(r, "user/created", &UserCreatedProc{})

    // Process a message
    msg := []byte(`{"type": "user/created", "payload": {"user_id": "123", "email": "test@example.com"}}`)
    if err := r.Process(context.Background(), msg); err != nil {
        log.Fatal(err)
    }
}

Architecture

The package separates concerns into three layers:

Layer     Responsibility
--------  ------------------------------------------------------
Sources   Parse raw bytes, extract routing key + payload
Router    Match keys to handlers, orchestrate dispatch flow
Handlers  Pure business logic with typed payloads (Proc or Func)

Proc vs Func

The package provides two handler patterns:

// Proc: Fire-and-forget (returns only error)
type Proc[T any] interface {
    Run(ctx context.Context, payload T) error
}

// Func: Request-response (returns result and error)
type Func[T, R any] interface {
    Call(ctx context.Context, payload T) (R, error)
}

Use Proc for event handlers where you don't need to send a response. Use Func for request-response patterns like Step Functions tasks.

Discriminator Pattern

Sources implement a two-phase matching strategy:

  1. Discriminator — Cheap field presence/value checks using the Inspector/View abstraction
  2. Parse — Full envelope parsing only after discriminator matches

This avoids expensive parsing when messages don't match, and enables O(1) hot-path matching via adaptive ordering (last successful source is tried first).

func (s *mySource) Discriminator() dispatch.Discriminator {
    // Cheap check: does the message have these fields?
    return dispatch.And(
        dispatch.HasFields("source", "detail-type", "detail"),
        dispatch.FieldEquals("source", "my.service"),
    )
}

Inspector Groups

By default, all sources use the JSON inspector. For mixed formats (e.g., JSON + protobuf), use groups:

r := dispatch.New()

// Default group uses JSON inspector
r.AddSource(webhookSource)
r.AddSource(apiSource)

// Custom group for protobuf messages
r.AddGroup(protoInspector, grpcSource, kafkaSource)

Handler Registration

// Register a procedure (fire-and-forget)
dispatch.RegisterProc(r, "user/created", &UserCreatedProc{})

// Register a function (request-response)
dispatch.RegisterFunc(r, "lookup-user", &LookupUserFunc{})

// Or use function adapters for simple cases
dispatch.RegisterProcFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
    return nil
})

dispatch.RegisterFuncFunc(r, "echo", func(ctx context.Context, in Input) (*Output, error) {
    return &Output{Value: in.Value}, nil
})

Replier Interface

For transports that require sending responses back (like Step Functions), sources can provide a Replier:

type Replier interface {
    Reply(ctx context.Context, result json.RawMessage) error
    Fail(ctx context.Context, err error) error
}

Example Step Functions source:

type sfnReplier struct {
    sfn   SFNClient
    token string
}

func (r *sfnReplier) Reply(ctx context.Context, result json.RawMessage) error {
    return r.sfn.SendTaskSuccess(ctx, r.token, result)
}

func (r *sfnReplier) Fail(ctx context.Context, err error) error {
    return r.sfn.SendTaskFailure(ctx, r.token, err)
}

func (s *sfnSource) Parse(raw []byte) (dispatch.Message, error) {
    // ... parse envelope ...
    return dispatch.Message{
        Key:     taskType,
        Payload: payload,
        Replier: &sfnReplier{sfn: s.sfn, token: token},
    }, nil
}

When a Replier is present:

  • On success: router calls Replier.Reply with the marshaled result (or {} for Procs)
  • On error: router calls Replier.Fail with the error

Discriminators

Composable predicates for source matching:

// Check field presence
dispatch.HasFields("type", "payload")

// Check field value
dispatch.FieldEquals("source", "aws.events")

// Combine with And/Or
dispatch.And(
    dispatch.HasFields("detail-type"),
    dispatch.Or(
        dispatch.FieldEquals("source", "service.a"),
        dispatch.FieldEquals("source", "service.b"),
    ),
)

Hooks

Add observability without coupling to specific systems:

r := dispatch.New(
    dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
        slog.InfoContext(ctx, "parsing message", "source", source, "key", key)
        return ctx
    }),
    dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
        slog.InfoContext(ctx, "handler succeeded", "source", source, "key", key, "duration", d)
    }),
    dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
        slog.ErrorContext(ctx, "handler failed", "source", source, "key", key, "error", err, "duration", d)
    }),
)

Available Hooks

Hook                   Called When
---------------------  ----------------------------------------------
WithOnParse            After source parses message (enriches context)
WithOnDispatch         Just before handler executes
WithOnSuccess          After handler succeeds
WithOnFailure          After handler fails
WithOnNoSource         No source matches the message
WithOnNoHandler        No handler registered for key
WithOnUnmarshalError   JSON unmarshal fails
WithOnValidationError  Payload validation fails

Source-Specific Hooks

Sources can implement hook interfaces for source-specific behavior:

type OnParseHook interface {
    OnParse(ctx context.Context, key string) context.Context
}

type OnSuccessHook interface {
    OnSuccess(ctx context.Context, key string, duration time.Duration)
}

Validation

Payloads implementing Validate() error are automatically validated:

type UserPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (p *UserPayload) Validate() error {
    if p.UserID == "" {
        return errors.New("user_id is required")
    }
    if p.Email == "" {
        return errors.New("email is required")
    }
    return nil
}

Works with any validation library (ozzo-validation, go-playground/validator, etc.) as long as your payload has a Validate() error method.

Error Handling

Error hooks control skip vs. fail behavior:

r := dispatch.New(
    // Skip unknown events (go to DLQ)
    dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
        log.Printf("skipping unknown event: %s", key)
        return nil // nil = skip, error = fail
    }),

    // Skip malformed payloads
    dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
        log.Printf("bad payload: %v", err)
        return nil
    }),
)

Integration Patterns

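HTTP Webhook Handler

func webhookHandler(w http.ResponseWriter, r *http.Request) {
    // Reject the request if the body cannot be read instead of routing partial data.
    body, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, "failed to read request body", http.StatusBadRequest)
        return
    }
    if err := router.Process(r.Context(), body); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    w.WriteHeader(http.StatusOK)
}

Message Queue Consumer

func consume(ctx context.Context, queue MessageQueue) error {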
    for {
        msg, err := queue.Receive(ctx)
        if err != nil {
            return err
        }
        if err := router.Process(ctx, msg.Body); err != nil {
            msg.Nack() // retry later
            continue
        }
        msg.Ack()
    }
}

Kafka Consumer

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        if err := c.router.Process(session.Context(), msg.Value); err != nil {
            slog.Error("processing failed", "error", err, "topic", msg.Topic)
            continue
        }
        session.MarkMessage(msg, "")
    }
    return nil
}

Testing

go test -v ./...

License

MIT License - see LICENSE for details.

Documentation

Overview

Package dispatch provides a flexible message routing framework for event-driven systems.

The dispatch package routes messages from multiple sources (EventBridge, SNS, Step Functions, Kinesis, or custom formats) to typed handlers. It handles envelope parsing, payload unmarshaling, validation, and response semantics — letting you focus on business logic.

Quick Start

Define a procedure (no return value) for fire-and-forget patterns:

type UserCreatedProc struct {
    onboarding Onboarding
}

type UserCreatedPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (p *UserCreatedProc) Run(ctx context.Context, payload UserCreatedPayload) error {
    return p.onboarding.RegisterUser(ctx, payload.UserID, payload.Email)
}

Or define a function (returns result) for request-response patterns:

type LookupUserFunc struct {
    client IdentityClient
}

func (f *LookupUserFunc) Call(ctx context.Context, in LookupInput) (*LookupResult, error) {
    user, err := f.client.GetUser(ctx, in.UserID)
    if err != nil {
        return nil, err
    }
    return &LookupResult{Email: user.Email}, nil
}

Create a router, add sources, and register handlers:

r := dispatch.New()

r.AddSource(myEventBridgeSource)

dispatch.RegisterProc(r, "my.service:user/created", &UserCreatedProc{onboarding})
dispatch.RegisterFunc(r, "my.service:lookup-user", &LookupUserFunc{client})

// Process messages
err := r.Process(ctx, rawMessageBytes)

Design Philosophy

The package separates concerns into three layers:

  • Sources: Parse raw bytes and extract routing keys + payloads
  • Router: Matches keys to handlers, orchestrates the dispatch flow
  • Handlers: Pure business logic with typed payloads (Proc or Func)

This separation allows:

  • Multiple message formats on a single queue
  • Transport-agnostic handler code
  • Consistent observability via hooks
  • Easy testing with mock sources

Proc vs Func

The package provides two handler patterns:

  • Proc[T]: For fire-and-forget operations (Run returns only error)
  • Func[T, R]: For request-response operations (Call returns result and error)

Sources can set Message.Replier to enable response handling. When a Replier is present, the router automatically calls Replier.Reply on success (with the marshaled result for Func, or {} for Proc) or Replier.Fail on error.

Discriminator Pattern

Sources implement a two-phase matching strategy for efficient routing:

  1. Discriminator: Cheap field presence/value checks
  2. Parse: Full envelope parsing only after discriminator matches

This avoids expensive JSON parsing when messages don't match a source, and enables O(1) hot-path matching via adaptive ordering (the last successful source is tried first on subsequent messages).

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.And(
        dispatch.HasFields("source", "detail-type"),
        dispatch.FieldEquals("source", "my.service"),
    )
}

Composable discriminators are provided:

  • HasFields: Check for field presence
  • FieldEquals: Check field value
  • And: All discriminators must match
  • Or: Any discriminator must match
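
To make the composition concrete, here is a self-contained sketch of how such predicates could be built. It is not the package's implementation: `matchFunc` and `mapView` are hypothetical helpers, the View is reduced to two methods, and the results for empty `And`/`Or` calls are assumptions.

```go
package main

import "fmt"

// View is a reduced stand-in for the package's View interface.
type View interface {
	HasField(path string) bool
	GetString(path string) (string, bool)
}

// Discriminator mirrors the documented single-method interface.
type Discriminator interface {
	Match(v View) bool
}

// matchFunc adapts a plain function to Discriminator (hypothetical helper).
type matchFunc func(v View) bool

func (f matchFunc) Match(v View) bool { return f(v) }

// HasFields matches when every path is present.
func HasFields(paths ...string) Discriminator {
	return matchFunc(func(v View) bool {
		for _, p := range paths {
			if !v.HasField(p) {
				return false
			}
		}
		return true
	})
}

// FieldEquals matches when the path exists and holds the given string.
func FieldEquals(path, value string) Discriminator {
	return matchFunc(func(v View) bool {
		got, ok := v.GetString(path)
		return ok && got == value
	})
}

// And matches when all discriminators match (assumed true for an empty list).
func And(ds ...Discriminator) Discriminator {
	return matchFunc(func(v View) bool {
		for _, d := range ds {
			if !d.Match(v) {
				return false
			}
		}
		return true
	})
}

// Or matches when any discriminator matches (assumed false for an empty list).
func Or(ds ...Discriminator) Discriminator {
	return matchFunc(func(v View) bool {
		for _, d := range ds {
			if d.Match(v) {
				return true
			}
		}
		return false
	})
}

// mapView is a toy View over flat string fields.
type mapView map[string]string

func (m mapView) HasField(path string) bool            { _, ok := m[path]; return ok }
func (m mapView) GetString(path string) (string, bool) { s, ok := m[path]; return s, ok }

func main() {
	d := And(
		HasFields("detail-type"),
		Or(FieldEquals("source", "service.a"), FieldEquals("source", "service.b")),
	)
	fmt.Println(d.Match(mapView{"source": "service.b", "detail-type": "x"}))
	fmt.Println(d.Match(mapView{"source": "service.c", "detail-type": "x"}))
}
```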

Inspector and View

The Inspector/View abstraction enables format-agnostic field access:

type Inspector interface {
    Inspect(raw []byte) (View, error)
}

type View interface {
    HasField(path string) bool
    GetString(path string) (string, bool)
    GetBytes(path string) ([]byte, bool)
}

By default, the router uses JSONInspector for all sources. For mixed formats (e.g., JSON and protobuf), use AddGroup with a custom inspector:

r := dispatch.New()
r.AddSource(jsonSource)                          // Uses default JSON inspector
r.AddGroup(protoInspector, grpcSource, kafkaSource) // Custom inspector
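
As an illustration of what a custom inspector might look like, the sketch below implements the two interfaces for an invented `key=value;key=value` text format. The interfaces are copied from above so the example compiles on its own; `kvInspector`, `kvView`, and the format itself are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// View and Inspector are local copies of the interfaces documented above.
type View interface {
	HasField(path string) bool
	GetString(path string) (string, bool)
	GetBytes(path string) ([]byte, bool)
}

type Inspector interface {
	Inspect(raw []byte) (View, error)
}

// kvView holds fields parsed from the hypothetical key=value format.
type kvView map[string]string

func (v kvView) HasField(path string) bool            { _, ok := v[path]; return ok }
func (v kvView) GetString(path string) (string, bool) { s, ok := v[path]; return s, ok }

func (v kvView) GetBytes(path string) ([]byte, bool) {
	s, ok := v[path]
	if !ok {
		return nil, false
	}
	return []byte(s), true
}

// kvInspector splits the input on ';' and each pair on the first '='.
type kvInspector struct{}

func (kvInspector) Inspect(raw []byte) (View, error) {
	view := kvView{}
	for _, pair := range strings.Split(string(raw), ";") {
		if pair == "" {
			continue // tolerate a trailing separator
		}
		key, value, ok := strings.Cut(pair, "=")
		if !ok || key == "" {
			return nil, errors.New("malformed pair: " + pair)
		}
		view[key] = value
	}
	return view, nil
}

func main() {
	var insp Inspector = kvInspector{}
	v, err := insp.Inspect([]byte("event=ping;id=42"))
	if err != nil {
		fmt.Println("inspect failed:", err)
		return
	}
	event, _ := v.GetString("event")
	fmt.Println(v.HasField("id"), event)
}
```

Because discriminators only see the View, the same `HasFields` and `FieldEquals` checks would apply to this format unchanged.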

Sources

A Source parses raw message bytes and returns routing information:

type Source interface {
    Name() string
    Discriminator() Discriminator
    Parse(raw []byte) (Message, error)
}

Sources are evaluated using their Discriminator, in registration order except that the last source to match is tried first (the adaptive ordering described under Discriminator Pattern). Once a matching source is found, the router calls its Parse method. If Parse returns an error, processing stops and the error is returned; the router does not fall back to other sources.
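
The adaptive ordering mentioned under Discriminator Pattern (last successful source tried first) can be sketched as follows. This is a simplified, single-goroutine illustration under stated assumptions: `source`, `matcher`, and `prefixSource` are hypothetical, a plain function stands in for the Discriminator, and a router that is safe for concurrent use would need to guard the remembered index.

```go
package main

import (
	"fmt"
	"strings"
)

// source is a simplified stand-in; match plays the role of the Discriminator.
type source struct {
	name  string
	match func(raw string) bool
}

// matcher remembers which source matched last and tries it first next time.
type matcher struct {
	sources []source
	last    int // index of the last matching source, or -1
}

func newMatcher(sources ...source) *matcher {
	return &matcher{sources: sources, last: -1}
}

// find returns the matching source name and how many discriminators ran.
func (m *matcher) find(raw string) (name string, checks int, ok bool) {
	if m.last >= 0 {
		checks++
		if m.sources[m.last].match(raw) {
			return m.sources[m.last].name, checks, true
		}
	}
	// Fall back to registration order, skipping the one already tried.
	for i, s := range m.sources {
		if i == m.last {
			continue
		}
		checks++
		if s.match(raw) {
			m.last = i
			return s.name, checks, true
		}
	}
	return "", checks, false
}

// prefixSource builds a source that matches messages starting with "<name>:".
func prefixSource(name string) source {
	return source{name: name, match: func(raw string) bool {
		return strings.HasPrefix(raw, name+":")
	}}
}

func main() {
	m := newMatcher(prefixSource("a"), prefixSource("b"), prefixSource("c"))
	for _, raw := range []string{"c:1", "c:2", "a:1"} {
		name, checks, _ := m.find(raw)
		fmt.Printf("%s -> source %s after %d check(s)\n", raw, name, checks)
	}
}
```

When consecutive messages come from the same source, only one discriminator runs per message, which is the hot-path case the documentation describes.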

The Message struct contains:

  • Key: routing key to match against registered handlers
  • Version: optional schema version for version-aware routing
  • Payload: raw JSON to unmarshal into the handler's type
  • Replier: optional interface for request-response patterns

Example source implementation:

type mySource struct{}

func (s *mySource) Name() string { return "my-source" }

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.HasFields("type", "payload")
}

func (s *mySource) Parse(raw []byte) (dispatch.Message, error) {
    var env struct {
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil {
        return dispatch.Message{}, err
    }
    if env.Type == "" {
        return dispatch.Message{}, errors.New("missing type field")
    }
    return dispatch.Message{
        Key:     env.Type,
        Payload: env.Payload,
    }, nil
}

Use SourceFunc for simple sources without a struct:

r.AddSource(dispatch.SourceFunc("custom", dispatch.HasFields("event"), parseFunc))

Handlers

Procedures implement the Proc interface (fire-and-forget):

type Proc[T any] interface {
    Run(ctx context.Context, payload T) error
}

Functions implement the Func interface (request-response):

type Func[T, R any] interface {
    Call(ctx context.Context, payload T) (R, error)
}

The router automatically:

  • Unmarshals the JSON payload to the handler's type
  • Validates the payload if it implements Validate() error
  • Calls the handler with the typed payload
  • Sends the response via Replier if present

Use ProcFunc/FuncFunc for simple cases without a struct:

dispatch.RegisterProcFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
    return nil
})

dispatch.RegisterFuncFunc(r, "lookup", func(ctx context.Context, in Input) (*Result, error) {
    return &Result{...}, nil
})

Replier

Sources can provide a Replier in Message for transport-specific response handling. For example, Step Functions requires SendTaskSuccess or SendTaskFailure after processing:

type sfnReplier struct {
    sfn   SFNClient
    token string
}

func (r *sfnReplier) Reply(ctx context.Context, result json.RawMessage) error {
    return r.sfn.SendTaskSuccess(ctx, r.token, result)
}

func (r *sfnReplier) Fail(ctx context.Context, err error) error {
    return r.sfn.SendTaskFailure(ctx, r.token, err)
}

func (s *sfnSource) Parse(raw []byte) (dispatch.Message, error) {
    // ... parse envelope ...
    return dispatch.Message{
        Key:     taskType,
        Payload: payload,
        Replier: &sfnReplier{sfn: s.sfn, token: token},
    }, nil
}

Hooks

Hooks provide observability without coupling to specific logging or metrics systems. Use functional options to configure hooks:

r := dispatch.New(
    dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
        return logx.WithCtx(ctx, slog.String("source", source), slog.String("key", key))
    }),
    dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
        metrics.Timing("dispatch.success", d, "source:"+source)
    }),
    dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
        metrics.Incr("dispatch.error", "source:"+source)
    }),
)

Available hooks:

  • WithOnParse: Called after parsing, enriches context
  • WithOnDispatch: Called just before handler executes
  • WithOnSuccess: Called after handler succeeds
  • WithOnFailure: Called after handler fails
  • WithOnNoSource: Called when no source matches
  • WithOnNoHandler: Called when no handler is registered
  • WithOnUnmarshalError: Called on JSON unmarshal errors
  • WithOnValidationError: Called on validation errors

Multiple hooks of the same type are called in order.

Source-Specific Hooks

Sources can implement optional hook interfaces to add source-specific behavior. These hooks run after global hooks, and both are always called:

type OnParseHook interface {
    OnParse(ctx context.Context, key string) context.Context
}

type OnSuccessHook interface {
    OnSuccess(ctx context.Context, key string, duration time.Duration)
}

type OnFailureHook interface {
    OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}

For error-returning hooks, if either the global hook or the source hook returns an error, that error is returned. This allows sources to override global skip/fail policies.

Validation

Payloads that implement Validate() error are automatically validated after unmarshaling:

type UserPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (p *UserPayload) Validate() error {
    return validation.ValidateStruct(p,
        validation.Field(&p.UserID, validation.Required),
        validation.Field(&p.Email, validation.Required, is.Email),
    )
}

Validation errors trigger the OnValidationError hook.

Error Handling

The OnNoSource, OnNoHandler, OnUnmarshalError, and OnValidationError hooks control what happens when errors occur:

  • Return nil to skip the message (it goes to DLQ if configured)
  • Return an error to fail (message retries based on queue configuration)

By default, all errors cause failures. Override with hooks to skip bad messages:

r := dispatch.New(
    dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
        logger.Error("bad payload", "error", err)
        return nil // skip to DLQ, don't retry
    }),
)

Thread Safety

Router is safe for concurrent use after configuration is complete. Do not call AddSource, AddGroup, or RegisterProc/RegisterFunc after calling Process.

Example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/bjaus/dispatch"
)

// UserCreatedPayload is the payload for user/created events.
type UserCreatedPayload struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
}

// UserCreatedHandler handles user/created events.
type UserCreatedHandler struct{}

// Run implements dispatch.Proc for UserCreatedPayload.
func (h *UserCreatedHandler) Run(ctx context.Context, p UserCreatedPayload) error {
	fmt.Printf("User created: %s (%s)\n", p.UserID, p.Email)
	return nil
}

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Message, error) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil {
		return dispatch.Message{}, err
	}
	if env.Type == "" {
		return dispatch.Message{}, fmt.Errorf("missing type field")
	}
	return dispatch.Message{
		Key:     env.Type,
		Payload: env.Payload,
	}, nil
}

func main() {
	// Create router with hooks
	r := dispatch.New(
		dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
			log.Printf("[%s] %s succeeded (%v)", source, key, d)
		}),
		dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
			log.Printf("[%s] %s failed: %v (%v)", source, key, err, d)
		}),
	)

	// Add source
	r.AddSource(&simpleSource{})

	// Register the procedure
	dispatch.RegisterProc(r, "user/created", &UserCreatedHandler{})

	// Process a message
	msg := []byte(`{"type": "user/created", "payload": {"user_id": "123", "email": "test@example.com"}}`)
	if err := r.Process(context.Background(), msg); err != nil {
		log.Fatal(err)
	}

}

Output:
User created: 123 (test@example.com)

Example (Completion)

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

// completionSource demonstrates a source that reports completion through a Replier.
type completionSource struct{}

func (s *completionSource) Name() string { return "completion" }

func (s *completionSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("task", "token", "payload")
}

func (s *completionSource) Parse(raw []byte) (dispatch.Message, error) {
	var env struct {
		Task    string          `json:"task"`
		Token   string          `json:"token"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil {
		return dispatch.Message{}, err
	}
	if env.Token == "" {
		return dispatch.Message{}, fmt.Errorf("missing token field")
	}
	return dispatch.Message{
		Key:     env.Task,
		Payload: env.Payload,
		Replier: &printReplier{token: env.Token},
	}, nil
}

// printReplier prints the task outcome instead of calling a real transport.
type printReplier struct{ token string }

func (r *printReplier) Reply(ctx context.Context, result json.RawMessage) error {
	fmt.Printf("Task %s succeeded\n", r.token)
	return nil
}

func (r *printReplier) Fail(ctx context.Context, err error) error {
	fmt.Printf("Task %s failed: %v\n", r.token, err)
	return nil
}

func main() {
	r := dispatch.New()
	r.AddSource(&completionSource{})

	dispatch.RegisterProcFunc(r, "process", func(ctx context.Context, p struct{ Value int }) error {
		fmt.Println("Processing value:", p.Value)
		return nil
	})

	msg := []byte(`{"task": "process", "token": "abc123", "payload": {"value": 42}}`)
	_ = r.Process(context.Background(), msg)

}

Output:
Processing value: 42
Task abc123 succeeded

Example (HandlerFunc)

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Message, error) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil {
		return dispatch.Message{}, err
	}
	if env.Type == "" {
		return dispatch.Message{}, fmt.Errorf("missing type field")
	}
	return dispatch.Message{
		Key:     env.Type,
		Payload: env.Payload,
	}, nil
}

func main() {
	r := dispatch.New()
	r.AddSource(&simpleSource{})

	// Register with a function instead of a struct
	dispatch.RegisterProcFunc(r, "ping", func(ctx context.Context, p struct{ Message string }) error {
		fmt.Println("Ping:", p.Message)
		return nil
	})

	msg := []byte(`{"type": "ping", "payload": {"message": "hello"}}`)
	_ = r.Process(context.Background(), msg)

}

Output:
Ping: hello

Example (MultipleHooks)

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/bjaus/dispatch"
)

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Message, error) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil {
		return dispatch.Message{}, err
	}
	if env.Type == "" {
		return dispatch.Message{}, fmt.Errorf("missing type field")
	}
	return dispatch.Message{
		Key:     env.Type,
		Payload: env.Payload,
	}, nil
}

func main() {
	// Pass multiple hooks to New
	r := dispatch.New(
		dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
			fmt.Printf("Processing %s from %s\n", key, source)
		}),
		dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
			fmt.Printf("Metric: %s.%s.success\n", source, key)
		}),
	)
	r.AddSource(&simpleSource{})

	dispatch.RegisterProcFunc(r, "test", func(ctx context.Context, p struct{}) error {
		return nil
	})

	msg := []byte(`{"type": "test", "payload": {}}`)
	_ = r.Process(context.Background(), msg)

}

Output:
Processing test from simple
Metric: simple.test.success

Example (SkipBadMessages)

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Message, error) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil {
		return dispatch.Message{}, err
	}
	if env.Type == "" {
		return dispatch.Message{}, fmt.Errorf("missing type field")
	}
	return dispatch.Message{
		Key:     env.Type,
		Payload: env.Payload,
	}, nil
}

func main() {
	r := dispatch.New(
		dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
			fmt.Println("Skipping unknown event:", key)
			return nil // return nil to skip, error to fail
		}),
		dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
			fmt.Println("Skipping bad payload:", err)
			return nil // skip bad payloads
		}),
	)

	r.AddSource(&simpleSource{})

	// No handler registered for "unknown" - will be skipped
	msg := []byte(`{"type": "unknown", "payload": {}}`)
	err := r.Process(context.Background(), msg)
	fmt.Println("Error:", err)

}

Output:
Skipping unknown event: unknown
Error: <nil>

Example (SourceFunc)

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

func main() {
	r := dispatch.New()

	// Use SourceFunc for simple sources
	r.AddSource(dispatch.SourceFunc("custom", dispatch.HasFields("event", "data"), func(raw []byte) (dispatch.Message, error) {
		var env struct {
			Event string          `json:"event"`
			Data  json.RawMessage `json:"data"`
		}
		if err := json.Unmarshal(raw, &env); err != nil {
			return dispatch.Message{}, err
		}
		if env.Event == "" {
			return dispatch.Message{}, fmt.Errorf("missing event field")
		}
		return dispatch.Message{Key: env.Event, Payload: env.Data}, nil
	}))

	dispatch.RegisterProcFunc(r, "hello", func(ctx context.Context, p struct{ Name string }) error {
		fmt.Println("Hello,", p.Name)
		return nil
	})

	msg := []byte(`{"event": "hello", "data": {"name": "World"}}`)
	_ = r.Process(context.Background(), msg)

}

Output:
Hello, World

Constants

This section is empty.

Variables

var ErrInvalidJSON = errors.New("invalid JSON")

ErrInvalidJSON is returned when the input is not valid JSON.

Functions

func RegisterFunc

func RegisterFunc[T, R any](r *Router, key string, f Func[T, R])

RegisterFunc adds a function (returns result) for a routing key. The key must match the Key field returned by a source's Parse method.

Example:

dispatch.RegisterFunc(r, "lookup-user", &LookupUserFunc{client: client})

func RegisterFuncFunc

func RegisterFuncFunc[T, R any](r *Router, key string, fn func(ctx context.Context, payload T) (R, error))

RegisterFuncFunc is a convenience wrapper that registers a plain Go function as a Func handler, without defining a struct.

Example:

dispatch.RegisterFuncFunc(r, "lookup-user", func(ctx context.Context, in Input) (*Result, error) {
    return &Result{...}, nil
})

func RegisterProc

func RegisterProc[T any](r *Router, key string, p Proc[T])

RegisterProc adds a procedure (no result) for a routing key. The key must match the Key field returned by a source's Parse method.

This is a package-level function (not a method) due to Go generics limitations: methods cannot have type parameters independent of the receiver.

Example:

dispatch.RegisterProc(r, "user/created", &UserCreatedProc{db: db})
dispatch.RegisterProc(r, "user/deleted", &UserDeletedProc{db: db})

func RegisterProcFunc

func RegisterProcFunc[T any](r *Router, key string, fn func(ctx context.Context, payload T) error)

RegisterProcFunc is a convenience wrapper that registers a plain Go function as a Proc handler, without defining a struct.

Example:

dispatch.RegisterProcFunc(r, "user/created", func(ctx context.Context, p Payload) error {
    return nil
})

Types

type Discriminator

type Discriminator interface {
	Match(v View) bool
}

Discriminator determines if a source should handle a message based on the message content. Discriminators are cheap to evaluate compared to full parsing.

func And

func And(ds ...Discriminator) Discriminator

And returns a Discriminator that matches when all discriminators match.

func FieldEquals

func FieldEquals(path, value string) Discriminator

FieldEquals returns a Discriminator that matches when the path exists and equals the given string value.

func HasFields

func HasFields(paths ...string) Discriminator

HasFields returns a Discriminator that matches when all paths exist.

func Or

func Or(ds ...Discriminator) Discriminator

Or returns a Discriminator that matches when any discriminator matches.

type Func

type Func[T, R any] interface {
	Call(ctx context.Context, payload T) (R, error)
}

Func (function) processes a message and returns a typed result. Use this for request-response patterns like Step Functions tasks.

The type parameters are: T for input payload, R for result. The router automatically unmarshals T, validates it, and marshals R.

Example:

type LookupUserFunc struct {
    client IdentityClient
}

func (f *LookupUserFunc) Call(ctx context.Context, in LookupInput) (*LookupResult, error) {
    user, err := f.client.GetUser(ctx, in.UserID)
    if err != nil {
        return nil, err
    }
    return &LookupResult{Email: user.Email}, nil
}

type FuncFunc

type FuncFunc[T, R any] func(ctx context.Context, payload T) (R, error)

FuncFunc is a function adapter for Func. Use for simple functions that don't need a struct:

dispatch.RegisterFunc(r, "lookup-user", dispatch.FuncFunc[Input, *Result](func(ctx context.Context, in Input) (*Result, error) {
    return &Result{...}, nil
}))

func (FuncFunc[T, R]) Call

func (f FuncFunc[T, R]) Call(ctx context.Context, payload T) (R, error)

Call implements the Func interface.

type Inspector

type Inspector interface {
	Inspect(raw []byte) (View, error)
}

Inspector examines raw bytes and returns a View for field queries. Different inspectors handle different formats (JSON, protobuf, etc.).

func JSONInspector

func JSONInspector() Inspector

JSONInspector returns an Inspector that uses gjson for field access.
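A custom Inspector only needs to turn raw bytes into something that answers the three View queries. As a rough sketch, assuming a hypothetical "key=value;key=value" wire format, an inspector could look like the following. Inspector and View are re-declared locally; kvInspector and kvView are illustrative names that do not exist in the package.

```go
package main

import (
	"fmt"
	"strings"
)

// View and Inspector mirror the documented interfaces.
type View interface {
	HasField(path string) bool
	GetString(path string) (string, bool)
	GetBytes(path string) ([]byte, bool)
}

type Inspector interface {
	Inspect(raw []byte) (View, error)
}

// kvView holds fields parsed from the hypothetical "k=v;k=v" format.
type kvView map[string]string

func (v kvView) HasField(path string) bool { _, ok := v[path]; return ok }

func (v kvView) GetString(path string) (string, bool) { s, ok := v[path]; return s, ok }

func (v kvView) GetBytes(path string) ([]byte, bool) {
	s, ok := v[path]
	if !ok {
		return nil, false
	}
	return []byte(s), true
}

// kvInspector splits the input into pairs and rejects malformed ones.
type kvInspector struct{}

func (kvInspector) Inspect(raw []byte) (View, error) {
	view := kvView{}
	for _, pair := range strings.Split(string(raw), ";") {
		if pair == "" {
			continue // tolerate a trailing separator
		}
		k, val, ok := strings.Cut(pair, "=")
		if !ok || k == "" {
			return nil, fmt.Errorf("kv: malformed pair %q", pair)
		}
		view[k] = val
	}
	return view, nil
}

func main() {
	var insp Inspector = kvInspector{}
	v, err := insp.Inspect([]byte("type=user/created;id=42"))
	if err != nil {
		fmt.Println("inspect failed:", err)
		return
	}
	key, _ := v.GetString("type")
	fmt.Println(key, v.HasField("id"), v.HasField("missing"))
}
```

An inspector like this would be passed to WithInspector or AddGroup, so that discriminators such as HasFields run against the custom format.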

type Message

type Message struct {
	// Key is the routing key used to find the handler.
	// This is matched against keys passed to RegisterProc/RegisterFunc.
	Key string

	// Version is the schema version of the payload, if available.
	// Sources should populate this for version-aware routing.
	Version string

	// Payload is the raw JSON to unmarshal into the handler's type.
	Payload json.RawMessage

	// Replier handles sending responses back to the caller.
	// For fire-and-forget sources (EventBridge, SNS), this is nil.
	// For request-response sources (Step Functions), this sends results back.
	//
	// When Replier is set and a Func is registered:
	//   - On success: router marshals result and calls Replier.Reply
	//   - On error: router calls Replier.Fail
	//
	// When Replier is set and a Proc is registered:
	//   - On success: router calls Replier.Reply with empty JSON ({})
	//   - On error: router calls Replier.Fail
	Replier Replier
}

Message contains the result of source parsing.

type OnDispatchFunc

type OnDispatchFunc func(ctx context.Context, source, key string)

OnDispatchFunc is called just before the handler executes.

type OnDispatchHook

type OnDispatchHook interface {
	OnDispatch(ctx context.Context, key string)
}

OnDispatchHook is an optional interface that sources can implement to add source-specific pre-dispatch behavior. Called after global OnDispatch hooks.
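Optional interfaces of this kind are usually detected with a type assertion on the source. The sketch below shows one plausible ordering, global hooks first and then the source's own hook, as described above. The types are re-declared locally; auditSource, named, and fireOnDispatch are hypothetical, and the router's real internals may differ.

```go
package main

import (
	"context"
	"fmt"
)

// OnDispatchFunc and OnDispatchHook mirror the documented declarations.
type OnDispatchFunc func(ctx context.Context, source, key string)

type OnDispatchHook interface {
	OnDispatch(ctx context.Context, key string)
}

// named is the only part of Source this sketch needs.
type named interface{ Name() string }

// auditSource is a hypothetical source that opts in to the hook.
type auditSource struct{ log *[]string }

func (s auditSource) Name() string { return "audit" }

func (s auditSource) OnDispatch(_ context.Context, key string) {
	*s.log = append(*s.log, "source:"+key)
}

// fireOnDispatch runs the global hooks in order, then the source hook if the
// source implements OnDispatchHook.
func fireOnDispatch(ctx context.Context, globals []OnDispatchFunc, src named, key string) {
	for _, fn := range globals {
		fn(ctx, src.Name(), key)
	}
	if h, ok := src.(OnDispatchHook); ok {
		h.OnDispatch(ctx, key)
	}
}

// run records the order in which the hooks fire.
func run() []string {
	var log []string
	globals := []OnDispatchFunc{
		func(_ context.Context, source, key string) {
			log = append(log, "global:"+source+":"+key)
		},
	}
	fireOnDispatch(context.Background(), globals, auditSource{log: &log}, "user/created")
	return log
}

func main() {
	for _, line := range run() {
		fmt.Println(line)
	}
}
```

The same pattern would apply to the other source-level hook interfaces (OnParseHook, OnSuccessHook, OnFailureHook, and so on).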

type OnFailureFunc

type OnFailureFunc func(ctx context.Context, source, key string, err error, duration time.Duration)

OnFailureFunc is called after the handler fails.

type OnFailureHook

type OnFailureHook interface {
	OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}

OnFailureHook is an optional interface that sources can implement to add source-specific behavior on handler failure. Called after global OnFailure hooks.

type OnNoHandlerFunc

type OnNoHandlerFunc func(ctx context.Context, source, key string) error

OnNoHandlerFunc is called when no handler is registered for the routing key. Return nil to skip, return an error to fail.

type OnNoHandlerHook

type OnNoHandlerHook interface {
	OnNoHandler(ctx context.Context, key string) error
}

OnNoHandlerHook is an optional interface that sources can implement to add source-specific behavior when no handler is found. Called after global hooks; if either returns an error, that error is used.

type OnNoSourceFunc

type OnNoSourceFunc func(ctx context.Context, raw []byte) error

OnNoSourceFunc is called when no source can parse the message. Return nil to skip the message, return an error to fail.

type OnParseErrorFunc

type OnParseErrorFunc func(ctx context.Context, source string, err error) error

OnParseErrorFunc is called when a source's Parse method returns an error. Return nil to skip the message, return an error to fail.

type OnParseFunc

type OnParseFunc func(ctx context.Context, source, key string) context.Context

OnParseFunc is called after a source successfully parses a message. Use this to enrich the context with logging fields or trace spans. The returned context is used for the rest of the request.

type OnParseHook

type OnParseHook interface {
	OnParse(ctx context.Context, key string) context.Context
}

OnParseHook is an optional interface that sources can implement to add source-specific context enrichment. Called after global OnParse hooks.

type OnSuccessFunc

type OnSuccessFunc func(ctx context.Context, source, key string, duration time.Duration)

OnSuccessFunc is called after the handler completes successfully.

type OnSuccessHook

type OnSuccessHook interface {
	OnSuccess(ctx context.Context, key string, duration time.Duration)
}

OnSuccessHook is an optional interface that sources can implement to add source-specific behavior on handler success. Called after global OnSuccess hooks.

type OnUnmarshalErrorFunc

type OnUnmarshalErrorFunc func(ctx context.Context, source, key string, err error) error

OnUnmarshalErrorFunc is called when JSON unmarshaling fails. Return nil to skip, return an error to fail.

type OnUnmarshalErrorHook

type OnUnmarshalErrorHook interface {
	OnUnmarshalError(ctx context.Context, key string, err error) error
}

OnUnmarshalErrorHook is an optional interface that sources can implement to add source-specific behavior on unmarshal errors. Called after global hooks; if either returns an error, that error is used.

type OnValidationErrorFunc

type OnValidationErrorFunc func(ctx context.Context, source, key string, err error) error

OnValidationErrorFunc is called when payload validation fails. Return nil to skip, return an error to fail.

type OnValidationErrorHook

type OnValidationErrorHook interface {
	OnValidationError(ctx context.Context, key string, err error) error
}

OnValidationErrorHook is an optional interface that sources can implement to add source-specific behavior on validation errors. Called after global hooks; if either returns an error, that error is used.

type Option

type Option func(*Router)

Option configures Router behavior.
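Option is an instance of the functional options pattern: each With... constructor returns a closure that mutates the router under construction. A minimal sketch of the pattern follows; router, option, and the fields shown are hypothetical stand-ins, since the real Router keeps its fields unexported.

```go
package main

import "fmt"

// router is a stand-in with two illustrative fields.
type router struct {
	inspector string
	hooks     []func(key string)
}

// option has the same shape as dispatch.Option.
type option func(*router)

// withInspector replaces the default inspector.
func withInspector(name string) option {
	return func(r *router) { r.inspector = name }
}

// withOnDispatch appends a hook, so repeated options accumulate in order.
func withOnDispatch(fn func(key string)) option {
	return func(r *router) { r.hooks = append(r.hooks, fn) }
}

// newRouter sets defaults first, then applies options in the order given.
func newRouter(opts ...option) *router {
	r := &router{inspector: "json"}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newRouter(
		withInspector("proto"),
		withOnDispatch(func(key string) { fmt.Println("first:", key) }),
		withOnDispatch(func(key string) { fmt.Println("second:", key) }),
	)
	fmt.Println(r.inspector, len(r.hooks))
	for _, h := range r.hooks {
		h("user/created")
	}
}
```

Appending rather than overwriting is what lets several hooks of the same kind coexist, which matches the "multiple hooks are called in order" wording used by the With... options.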

func WithInspector

func WithInspector(i Inspector) Option

WithInspector sets the default inspector for sources added with AddSource.

func WithOnDispatch

func WithOnDispatch(fn OnDispatchFunc) Option

WithOnDispatch adds a hook called just before the handler executes. Multiple hooks are called in order.

Example:

dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
    logger.Info(ctx, "dispatching event", "key", key)
})

func WithOnFailure

func WithOnFailure(fn OnFailureFunc) Option

WithOnFailure adds a hook called after the handler fails. Multiple hooks are called in order.

Example:

dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
    metrics.Incr("dispatch.failure", "source:"+source)
    logger.Error(ctx, "handler failed", "error", err)
})

func WithOnNoHandler

func WithOnNoHandler(fn OnNoHandlerFunc) Option

WithOnNoHandler adds a hook called when no handler is registered for the key. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
    logger.Warn(ctx, "no handler", "key", key)
    return nil // skip
})
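"First error wins" can be read as a simple loop over the hooks. The sketch below stops at the first non-nil error; whether the router still invokes later hooks after an error is not stated here, so treat that early return as an assumption. OnNoHandlerFunc is re-declared locally, and runNoHandlerHooks and demo are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// OnNoHandlerFunc mirrors the documented hook signature.
type OnNoHandlerFunc func(ctx context.Context, source, key string) error

// runNoHandlerHooks calls hooks in order and returns the first error.
// A nil result means every hook chose to skip the message.
func runNoHandlerHooks(ctx context.Context, hooks []OnNoHandlerFunc, source, key string) error {
	for _, h := range hooks {
		if err := h(ctx, source, key); err != nil {
			return err
		}
	}
	return nil
}

// demo wires three hooks: a logger, a strict check, and a fallback.
func demo(key string) error {
	hooks := []OnNoHandlerFunc{
		func(_ context.Context, source, k string) error {
			fmt.Printf("no handler: source=%s key=%s\n", source, k)
			return nil // logging only, keep going
		},
		func(_ context.Context, _, k string) error {
			if k == "user/deleted" {
				return errors.New("strict: no handler for " + k)
			}
			return nil
		},
		func(context.Context, string, string) error {
			return errors.New("fallback: unhandled")
		},
	}
	return runNoHandlerHooks(context.Background(), hooks, "my-source", key)
}

func main() {
	fmt.Println(demo("user/deleted"))
	fmt.Println(demo("user/archived"))
}
```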

func WithOnNoSource

func WithOnNoSource(fn OnNoSourceFunc) Option

WithOnNoSource adds a hook called when no source can parse the message. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnNoSource(func(ctx context.Context, raw []byte) error {
    logger.Warn(ctx, "unknown message format")
    return nil // skip the message
})

func WithOnParse

func WithOnParse(fn OnParseFunc) Option

WithOnParse adds a hook called after a source successfully parses a message. Multiple hooks are called in order, with context chaining through each.

Example:

dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
    return logx.WithCtx(ctx, slog.String("source", source))
})
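Context chaining means each hook receives the context returned by the previous one, so later hooks can see values added earlier. A small sketch of that behaviour, using only the standard library: OnParseFunc is re-declared locally, and ctxKey, runOnParse, and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// OnParseFunc mirrors the documented hook signature.
type OnParseFunc func(ctx context.Context, source, key string) context.Context

// ctxKey is a private key type, as recommended for context values.
type ctxKey string

// runOnParse threads the context through every hook in order.
func runOnParse(ctx context.Context, hooks []OnParseFunc, source, key string) context.Context {
	for _, h := range hooks {
		ctx = h(ctx, source, key)
	}
	return ctx
}

// demo shows the second hook reading a value stored by the first.
func demo() string {
	hooks := []OnParseFunc{
		func(ctx context.Context, source, _ string) context.Context {
			return context.WithValue(ctx, ctxKey("source"), source)
		},
		func(ctx context.Context, _, key string) context.Context {
			src, _ := ctx.Value(ctxKey("source")).(string)
			return context.WithValue(ctx, ctxKey("route"), src+"/"+key)
		},
	}
	ctx := runOnParse(context.Background(), hooks, "my-source", "user/created")
	route, _ := ctx.Value(ctxKey("route")).(string)
	return route
}

func main() {
	fmt.Println(demo())
}
```

A hook that has nothing to add should return the context it was given rather than a fresh one, otherwise values from earlier hooks are lost.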

func WithOnParseError

func WithOnParseError(fn OnParseErrorFunc) Option

WithOnParseError adds a hook called when a source's Parse method returns an error. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnParseError(func(ctx context.Context, source string, err error) error {
    logger.Error(ctx, "parse failed", "source", source, "error", err)
    return nil // skip bad messages
})

func WithOnSuccess

func WithOnSuccess(fn OnSuccessFunc) Option

WithOnSuccess adds a hook called after the handler completes successfully. Multiple hooks are called in order.

Example:

dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
    metrics.Timing("dispatch.success", d, "source:"+source)
})

func WithOnUnmarshalError

func WithOnUnmarshalError(fn OnUnmarshalErrorFunc) Option

WithOnUnmarshalError adds a hook called when JSON unmarshaling fails. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
    logger.Error(ctx, "bad payload", "error", err)
    return nil // skip bad payloads
})

func WithOnValidationError

func WithOnValidationError(fn OnValidationErrorFunc) Option

WithOnValidationError adds a hook called when payload validation fails. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnValidationError(func(ctx context.Context, source, key string, err error) error {
    logger.Error(ctx, "validation failed", "error", err)
    return nil // skip invalid payloads
})

type Proc

type Proc[T any] interface {
	Run(ctx context.Context, payload T) error
}

Proc (procedure) processes a message without returning a result. Use this for fire-and-forget patterns like event handlers.

The type parameter T is the payload type. The router automatically unmarshals JSON to T and validates it if T implements Validate() error.

Example:

type UserCreatedProc struct {
    db *sql.DB
}

func (p *UserCreatedProc) Run(ctx context.Context, payload UserCreatedPayload) error {
    _, err := p.db.ExecContext(ctx, "INSERT INTO users ...", payload.UserID)
    return err
}
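The unmarshal-then-validate step can be pictured as a small generic helper. This is a sketch under stated assumptions, not the router's code: validatable is a local stand-in for whatever interface the package checks, and decode is a hypothetical helper. It checks both T and *T so that value and pointer receivers are covered.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// validatable is a local stand-in for the Validate() error contract.
type validatable interface{ Validate() error }

type UserCreatedPayload struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
}

// Validate rejects payloads without a user ID.
func (p UserCreatedPayload) Validate() error {
	if p.UserID == "" {
		return errors.New("user_id is required")
	}
	return nil
}

// decode unmarshals raw into T and validates it when T (or *T) has a
// Validate method. Types without Validate pass through unchecked.
func decode[T any](raw json.RawMessage) (T, error) {
	var v T
	if err := json.Unmarshal(raw, &v); err != nil {
		return v, fmt.Errorf("unmarshal: %w", err)
	}
	val, ok := any(v).(validatable)
	if !ok {
		val, ok = any(&v).(validatable)
	}
	if ok {
		if err := val.Validate(); err != nil {
			return v, fmt.Errorf("validate: %w", err)
		}
	}
	return v, nil
}

func main() {
	p, err := decode[UserCreatedPayload](json.RawMessage(`{"user_id":"u1","email":"a@example.com"}`))
	fmt.Println(p.UserID, err)

	_, err = decode[UserCreatedPayload](json.RawMessage(`{"email":"a@example.com"}`))
	fmt.Println(err)
}
```

In the router, the two failure modes are reported separately, through the unmarshal-error and validation-error hooks, which is why the sketch wraps them with distinct prefixes.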

type ProcFunc

type ProcFunc[T any] func(ctx context.Context, payload T) error

ProcFunc is a function adapter for Proc. Use for simple procedures that don't need a struct:

dispatch.RegisterProcFunc(r, "user/created", func(ctx context.Context, p Payload) error {
    return nil
})

func (ProcFunc[T]) Run

func (f ProcFunc[T]) Run(ctx context.Context, payload T) error

Run implements the Proc interface.

type Replier

type Replier interface {
	// Reply sends a successful response with the given JSON payload.
	Reply(ctx context.Context, result json.RawMessage) error

	// Fail sends a failure response with the given error.
	Fail(ctx context.Context, err error) error
}

Replier sends responses back to the message originator. Implement this for request-response transport patterns.
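For tests, an in-memory Replier that records what it was asked to send is often enough. The sketch below pairs such a recorder with helpers that apply the rules listed on Message.Replier (marshal and Reply on success, Fail on error, and an empty JSON object for a Proc). Replier is re-declared locally; recordingReplier, respondFunc, respondProc, and lookupResult are hypothetical, and what the router returns to its caller after a successful Fail is an assumption.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Replier mirrors the documented interface.
type Replier interface {
	Reply(ctx context.Context, result json.RawMessage) error
	Fail(ctx context.Context, err error) error
}

// recordingReplier is a test double that stores every response.
type recordingReplier struct {
	replies  []string
	failures []string
}

func (r *recordingReplier) Reply(_ context.Context, result json.RawMessage) error {
	r.replies = append(r.replies, string(result))
	return nil
}

func (r *recordingReplier) Fail(_ context.Context, err error) error {
	r.failures = append(r.failures, err.Error())
	return nil
}

// respondFunc handles the outcome of a Func: Fail on error, otherwise
// marshal the result and Reply.
func respondFunc(ctx context.Context, rep Replier, result any, callErr error) error {
	if rep == nil {
		return callErr // fire-and-forget source: nothing to send
	}
	if callErr != nil {
		return rep.Fail(ctx, callErr)
	}
	body, err := json.Marshal(result)
	if err != nil {
		return rep.Fail(ctx, err)
	}
	return rep.Reply(ctx, body)
}

// respondProc handles the outcome of a Proc: success replies with {}.
func respondProc(ctx context.Context, rep Replier, callErr error) error {
	if rep == nil {
		return callErr
	}
	if callErr != nil {
		return rep.Fail(ctx, callErr)
	}
	return rep.Reply(ctx, json.RawMessage(`{}`))
}

type lookupResult struct {
	Email string `json:"email"`
}

func demo() *recordingReplier {
	rep := &recordingReplier{}
	ctx := context.Background()
	_ = respondFunc(ctx, rep, lookupResult{Email: "a@example.com"}, nil)
	_ = respondFunc(ctx, rep, nil, errors.New("user not found"))
	_ = respondProc(ctx, rep, nil)
	return rep
}

func main() {
	rep := demo()
	fmt.Println(rep.replies)
	fmt.Println(rep.failures)
}
```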

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router dispatches messages to registered handlers based on routing keys.

Usage:

  1. Create a router with New
  2. Add sources with AddSource (or AddGroup for custom inspectors)
  3. Register handlers with RegisterProc, RegisterFunc, or RegisterProcFunc
  4. Process messages with Process

Router is safe for concurrent use once configuration is complete. Do not call AddSource, AddGroup, or any of the Register functions after the first call to Process.

func New

func New(opts ...Option) *Router

New creates a Router with the given options.

By default, the router uses JSONInspector for source matching. Use WithInspector to override.

Example:

r := dispatch.New(
    dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
        return logx.WithCtx(ctx, slog.String("source", source))
    }),
    dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
        metrics.Timing("dispatch.success", d)
    }),
)

func (*Router) AddGroup

func (r *Router) AddGroup(inspector Inspector, sources ...Source)

AddGroup registers sources with a custom inspector. Use this when you have sources that use a different message format (e.g., protobuf).

Groups are checked after the default group, in registration order.

Example:

r.AddGroup(protoInspector, grpcSource, kafkaSource)

func (*Router) AddSource

func (r *Router) AddSource(s Source)

AddSource adds a source to the default inspector group. Sources are tried in registration order: a source's Discriminator must match before its Parse method is called.

Example:

r.AddSource(eventBridgeSource)
r.AddSource(snsSource)
r.AddSource(sfnSource)

func (*Router) Process

func (r *Router) Process(ctx context.Context, raw []byte) error

Process parses the raw message, routes to the appropriate handler, and sends responses via the Replier if present.

The processing flow:

  1. Use discriminators to find a matching source
  2. Parse the message with the matched source
  3. Look up the handler by the parsed routing key
  4. Unmarshal the payload to the handler's type
  5. Validate the payload if it implements Validatable
  6. Call the handler
  7. Send response via Replier if present (success or failure)

Hooks are called at appropriate points throughout this flow.
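Steps 1 through 6 of the flow above can be sketched in a few dozen lines of standard-library Go. This is an illustration under heavy simplification, not the router's implementation: message, source, handler, typed, and process are hypothetical stand-ins, the discriminator is reduced to a crude byte search, errors are returned directly instead of going through hooks, and the Replier step is left out.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// message and source are trimmed stand-ins for Message and Source.
type message struct {
	Key     string
	Payload json.RawMessage
}

type source struct {
	name  string
	match func(raw []byte) bool // plays the role of the Discriminator
	parse func(raw []byte) (message, error)
}

// handler is a registered Proc with its payload type erased.
type handler func(payload json.RawMessage) error

type userCreated struct {
	UserID string `json:"user_id"`
}

func (p userCreated) Validate() error {
	if p.UserID == "" {
		return errors.New("user_id is required")
	}
	return nil
}

// typed wraps a typed function: unmarshal (4), validate (5), call (6).
func typed[T interface{ Validate() error }](fn func(T) error) handler {
	return func(raw json.RawMessage) error {
		var p T
		if err := json.Unmarshal(raw, &p); err != nil {
			return fmt.Errorf("unmarshal: %w", err)
		}
		if err := p.Validate(); err != nil {
			return fmt.Errorf("validate: %w", err)
		}
		return fn(p)
	}
}

// process finds a source (1), parses (2), looks up the handler (3),
// and hands over the payload.
func process(sources []source, handlers map[string]handler, raw []byte) error {
	for _, s := range sources {
		if !s.match(raw) {
			continue
		}
		msg, err := s.parse(raw)
		if err != nil {
			return fmt.Errorf("%s: parse: %w", s.name, err)
		}
		h, ok := handlers[msg.Key]
		if !ok {
			return fmt.Errorf("%s: no handler for %q", s.name, msg.Key)
		}
		return h(msg.Payload)
	}
	return errors.New("no source matched")
}

func demo(raw string) error {
	envelope := source{
		name:  "my-source",
		match: func(raw []byte) bool { return bytes.Contains(raw, []byte(`"type"`)) },
		parse: func(raw []byte) (message, error) {
			var env struct {
				Type    string          `json:"type"`
				Payload json.RawMessage `json:"payload"`
			}
			if err := json.Unmarshal(raw, &env); err != nil {
				return message{}, err
			}
			return message{Key: env.Type, Payload: env.Payload}, nil
		},
	}
	handlers := map[string]handler{
		"user/created": typed(func(p userCreated) error {
			fmt.Println("user created:", p.UserID)
			return nil
		}),
	}
	return process([]source{envelope}, handlers, []byte(raw))
}

func main() {
	fmt.Println(demo(`{"type":"user/created","payload":{"user_id":"u1"}}`))
	fmt.Println(demo(`{"type":"user/created","payload":{}}`))
	fmt.Println(demo(`{"other":true}`))
}
```

In the real router each failure branch is routed to the matching hook (OnNoSource, OnParseError, OnNoHandler, OnUnmarshalError, OnValidationError), where returning nil skips the message instead of failing.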

Example:

// In an SQS consumer
func (s *Subscriber) ProcessMessage(ctx context.Context, msg sqs.Message) error {
    return s.router.Process(ctx, []byte(*msg.Body))
}

// In a Lambda handler
func handler(ctx context.Context, event json.RawMessage) error {
    return router.Process(ctx, event)
}

type Source

type Source interface {
	// Name returns the source identifier for logging and metrics.
	Name() string

	// Discriminator returns a predicate for cheap message detection.
	// The router calls this before Parse to avoid expensive parsing
	// when the message format doesn't match.
	Discriminator() Discriminator

	// Parse attempts to parse raw bytes as this source's format.
	// Returns the parsed message and nil if successful, or an error
	// describing why parsing failed.
	Parse(raw []byte) (Message, error)
}

Source parses raw message bytes and extracts routing information.

Sources are registered with Router.AddSource and matched using their Discriminator before Parse is called. This allows cheap detection before expensive parsing.

Implement Source to support different message formats:

  • EventBridge events
  • SNS notifications
  • Step Functions task tokens
  • Kinesis records
  • SQS messages
  • Custom formats

Example:

type mySource struct{}

func (s *mySource) Name() string { return "my-source" }

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.HasFields("type", "payload")
}

func (s *mySource) Parse(raw []byte) (dispatch.Message, error) {
    var env struct {
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil {
        return dispatch.Message{}, err
    }
    return dispatch.Message{Key: env.Type, Payload: env.Payload}, nil
}

func SourceFunc

func SourceFunc(name string, disc Discriminator, parse func([]byte) (Message, error)) Source

SourceFunc creates a Source from a name, discriminator, and parse function. Use for simple sources that don't need a struct:

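r.AddSource(dispatch.SourceFunc(
    "legacy",
    dispatch.HasFields("type", "payload"),
    func(raw []byte) (dispatch.Message, error) {
        var env struct { // same envelope as the Source example
            Type    string          `json:"type"`
            Payload json.RawMessage `json:"payload"`
        }
        if err := json.Unmarshal(raw, &env); err != nil {
            return dispatch.Message{}, err
        }
        return dispatch.Message{Key: env.Type, Payload: env.Payload}, nil
    },
))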

type View

type View interface {
	// HasField returns true if the path exists in the message.
	HasField(path string) bool

	// GetString returns the string value at path, or false if not found
	// or not a string.
	GetString(path string) (string, bool)

	// GetBytes returns the raw bytes at path, or false if not found.
	// For JSON, this returns the raw JSON value (including quotes for strings).
	GetBytes(path string) ([]byte, bool)
}

View provides format-agnostic field access for discriminator matching.
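To make the three methods concrete, here is a standard-library-only View over JSON with simple dotted paths. It is a sketch for illustration: the package's JSONInspector uses gjson, whose path syntax is richer, and jsonView and lookup are hypothetical names. Note how GetBytes keeps the quotes around a JSON string, as described in the interface comment.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// jsonView answers field queries by walking nested JSON objects.
type jsonView struct{ raw []byte }

// lookup resolves a dotted path such as "detail.type" one object at a time.
func (v jsonView) lookup(path string) (json.RawMessage, bool) {
	cur := json.RawMessage(v.raw)
	for _, part := range strings.Split(path, ".") {
		var obj map[string]json.RawMessage
		if err := json.Unmarshal(cur, &obj); err != nil {
			return nil, false // not an object, so the path cannot continue
		}
		next, ok := obj[part]
		if !ok {
			return nil, false
		}
		cur = next
	}
	return cur, true
}

func (v jsonView) HasField(path string) bool {
	_, ok := v.lookup(path)
	return ok
}

// GetString succeeds only when the value at path is a JSON string.
func (v jsonView) GetString(path string) (string, bool) {
	raw, ok := v.lookup(path)
	if !ok || len(raw) == 0 || raw[0] != '"' {
		return "", false
	}
	var s string
	if err := json.Unmarshal(raw, &s); err != nil {
		return "", false
	}
	return s, true
}

// GetBytes returns the raw JSON value, quotes included for strings.
func (v jsonView) GetBytes(path string) ([]byte, bool) {
	raw, ok := v.lookup(path)
	return raw, ok
}

func main() {
	v := jsonView{raw: []byte(`{"detail":{"type":"user/created","attempt":1}}`)}

	s, ok := v.GetString("detail.type")
	fmt.Println(s, ok)

	_, ok = v.GetString("detail.attempt") // a number, not a string
	fmt.Println(ok)

	b, _ := v.GetBytes("detail.type")
	fmt.Println(string(b))
}
```

This version re-parses the document on every query, which is fine for a test helper but is exactly the cost the gjson-based inspector is meant to avoid.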
