cloudtasks

Version: v1.4.0
Published: Apr 10, 2026 License: MIT Imports: 27 Imported by: 0

README

Google Cloud Tasks integration with Cloud Run apps.

Install

go get github.com/altipla-consulting/cloudtasks

Usage

Declare queues

To set up the queue system, call the code that declares the queues from your application's main function and then register the HTTP handler:

func main() {
  if err := models.ConnectQueues(); err != nil {
    log.Fatal(err)
  }

  // Register the queue handlers.
  http.Handle(cloudtasks.Handler())
}

In the models/queues.go file:

var (
  QueueFoo cloudtasks.Queue
  QueueBar cloudtasks.Queue
)

func ConnectQueues() error {
  QueueFoo = cloudtasks.NewQueue("foo")
  QueueBar = cloudtasks.NewQueue("bar")

  return nil
}

Queues manage task consumption rates and the maximum number of concurrent tasks. These are pre-configured in Google Cloud.

A single queue can efficiently process multiple task types. Prefer one queue for tasks with similar rate requirements over creating several queues, because this minimizes runtime overhead.

Defining tasks

Define background tasks in the same package that executes them. By convention, task variables carry the Fn suffix (for example fooFn) to mark them as background functions. The first argument to cloudtasks.Func is a unique string used for debugging. Declaring two tasks with the same name triggers a panic.

var fooFn = cloudtasks.Func("foo", func(ctx context.Context, task *cloudtasks.Task) error {
  var arg int64
  if err := task.Read(&arg); err != nil {
    return errors.Trace(err)
  }

  // ... task content

  return nil
})

Declare tasks globally during program initialization, either as package-level variables or inside an init() function.
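The duplicate-name panic and the initialization-time declaration can be illustrated with a small stand-alone registry. This is only a sketch of the general pattern, not the library's implementation: register, registry and taskFn are hypothetical stand-ins written for this example.

```go
package main

import (
	"context"
	"fmt"
)

// taskFn is a simplified stand-in for a task function (not the library type).
type taskFn func(ctx context.Context) error

// registry maps each task key to its implementation.
var registry = map[string]taskFn{}

// register stores fn under key and panics if the key is already taken.
func register(key string, fn taskFn) taskFn {
	if _, ok := registry[key]; ok {
		panic(fmt.Sprintf("task %q declared twice", key))
	}
	registry[key] = fn
	return fn
}

// Package-level declarations run during program initialization, before main.
var fooFn = register("foo", func(ctx context.Context) error { return nil })

// registerPanics reports whether registering key triggers a panic.
func registerPanics(key string) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	register(key, func(ctx context.Context) error { return nil })
	return false
}

func main() {
	_ = fooFn(context.Background())
	fmt.Println(registerPanics("bar")) // false: first declaration of "bar"
	fmt.Println(registerPanics("foo")) // true: "foo" was declared at init time
}
```

Because a duplicate key fails as soon as the package is initialized, a naming collision shows up at startup rather than when the first task is delivered.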

Task invocation

Invoke tasks by calling the previously defined functions:

func FooHandler(...) {
  // ... other code

  var arg int64
  if err := fooFn.Call(r.Context(), models.QueueFoo, arg); err != nil {
    return errors.Trace(err)
  }

  // ... other code
}

Tasks accept any JSON-serializable value as their argument, from basic types such as numbers and strings to the more complex structures a task may need. Structs with private fields or methods are excluded.
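One reason to keep payload structs free of private fields: Go's encoding/json skips unexported fields, so their values never reach the receiving side. The sketch below assumes the payload goes through standard encoding/json semantics. resizeJob and roundTrip are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// resizeJob is a hypothetical payload. Only exported fields are serialized.
type resizeJob struct {
	ImageID int64 `json:"imageId"`
	Width   int   `json:"width"`
	token   string // unexported: encoding/json skips it
}

// roundTrip encodes the payload as a sender would and decodes it again as the
// receiving task would.
func roundTrip(in resizeJob) (resizeJob, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return resizeJob{}, err
	}
	var out resizeJob
	if err := json.Unmarshal(data, &out); err != nil {
		return resizeJob{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(resizeJob{ImageID: 42, Width: 800, token: "secret"})
	if err != nil {
		panic(err)
	}
	// The exported fields survive; token comes back as the empty string.
	fmt.Printf("%+v\n", out)
}
```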

External task invocation

To invoke tasks in external applications, use the SendExternal helper, which has retries and authorization built in:

func FooHandler(w http.ResponseWriter, r *http.Request) error {
  // ... other code

  task := &cloudtasks.ExternalTask{
    URL: "https://foo-service-9omj3qcv6b-ew.a.run.app/myurl",
    Payload: arg,
  }
  if err := models.QueueFoo.SendExternal(r.Context(), task); err != nil {
    return errors.Trace(err)
  }

  // ... other code
}

Upgrades

v0 -> v1

cloudtasks.NewQueue() now takes one fewer parameter. The first parameter, which held the Cloud Run URL, is now derived deterministically from the environment and should be removed from existing calls.

Contributing

You can open pull requests or create issues on GitHub. Any code you send should be formatted using make gofmt.

License

MIT License

Documentation

Variables

var ErrCannotSendTask = errors.New("cloudtasks: cannot send task")

ErrCannotSendTask is returned from a Send call when the task has been correctly prepared but cannot be sent to the remote service. It can give you an opportunity to call the underlying implementation directly instead.

Functions

func Handler

func Handler() (string, http.Handler)

Types

type ExternalTask added in v0.2.0

type ExternalTask struct {
	URL     string
	Payload any
	// contains filtered or unexported fields
}

ExternalTask should be filled with the data of the task to call in an external Cloud Run application.

func (*ExternalTask) LogValue added in v0.3.0

func (task *ExternalTask) LogValue() slog.Value

LogValue implements slog.LogValuer.

type Function

type Function struct {
	// contains filtered or unexported fields
}

Function is a stored task implementation.

func Func

func Func(key string, fn TaskFn) *Function

Func builds and registers a new task implementation.

func (*Function) Call

func (f *Function) Call(ctx context.Context, queue Queue, payload interface{}, opts ...TaskOption) error

Call builds a task invocation and directly sends it individually to the queue.

If you are going to send multiple tasks at the same time, it is more efficient to build all of them with Task() first and then send them in batches with queue.SendTasks(). When sending a single task, this function performs similarly to the batch method.

func (*Function) CallTask

func (f *Function) CallTask(ctx context.Context, queue Queue, payload interface{}, opts ...TaskOption) (*Task, error)

CallTask builds a task invocation, sends it and returns the built task.

func (*Function) Task

func (f *Function) Task(payload interface{}, opts ...TaskOption) (*Task, error)

Task builds a task invocation for the function without sending it. You can later send it in a batch with queue.SendTasks(), or use Call() instead to build and send in a single step.

The payload can be a proto.Message or any other value that can be serialized to JSON. The task function then reads it with task.Read().

func (*Function) TestCall added in v0.4.0

func (f *Function) TestCall(t *testing.T, payload interface{}) error

TestCall calls the handler directly, passing the payload as if it had arrived from the queue. It requires a testing argument to ensure it is only used in tests.

type Queue

type Queue interface {
	// Send a new task to the queue.
	Send(ctx context.Context, task *Task) error

	// SendExternal sends a new task to an external URL.
	SendExternal(ctx context.Context, task *ExternalTask) error
}

Queue abstracts any remote or local system that can execute a task.

func NewQueue

func NewQueue(name string, opts ...QueueOption) Queue

NewQueue initializes a new queue.

type QueueOption

type QueueOption func(*gcloudQueue)

QueueOption configures queues when creating them.

func WithForcedProductionMode added in v1.3.4

func WithForcedProductionMode() QueueOption

WithForcedProductionMode forces the queue to use the production mode. This is useful for testing purposes.

func WithHostname added in v1.1.1

func WithHostname(hostname string) QueueOption

WithHostname configures the queue for an application outside of Cloud Run. Pass only the hostname, without the protocol or trailing slash.

func WithProject added in v1.3.4

func WithProject(project, numericProject string) QueueOption

WithProject configures the project and numeric project ID for the queue. By default it will use the project and numeric project ID obtained from the metadata server.

func WithRegion

func WithRegion(region string) QueueOption

WithRegion configures a custom region for the queue. By default it will use the region of the Cloud Run service.

func WithServiceAccountEmail added in v1.3.4

func WithServiceAccountEmail(serviceAccountEmail string) QueueOption

WithServiceAccountEmail configures the service account email for the queue. By default it will use the default service account email obtained from the metadata server.

type Task

type Task struct {

	// Read only. The current retry count.
	Retries int64

	// Queue that received the task.
	Queue Queue
	// contains filtered or unexported fields
}

Task is a task sent to or received from a queue.

func (*Task) LogValue added in v0.3.0

func (task *Task) LogValue() slog.Value

LogValue implements slog.LogValuer.

func (*Task) Name

func (task *Task) Name() string

Name returns the name of the task. By default it is autogenerated.

func (*Task) Read

func (task *Task) Read(dest interface{}) error

Read unmarshals the task payload into the provided destination.

type TaskFn added in v0.4.4

type TaskFn func(ctx context.Context, task *Task) error

TaskFn should be implemented by any task function.

type TaskOption

type TaskOption func(task *Task)

TaskOption configures tasks when creating them.

func WithDelay added in v1.3.5

func WithDelay(delay time.Duration) TaskOption

WithDelay schedules the task to run after the given delay. This overrides the schedule time.

func WithName

func WithName(name string) TaskOption

WithName configures a custom name for the task. By default it will be autogenerated. A custom name can be problematic with tombstones (task names that can't be repeated) and concurrency controls, so assign it with care and read the Google Cloud Tasks documentation before using it.

func WithScheduleTime added in v1.3.5

func WithScheduleTime(time time.Time) TaskOption

WithScheduleTime schedules the task to run at the given time.
