watercc

package module
v0.0.0-...-f47d1cb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2022 License: MIT Imports: 6 Imported by: 0

README

watercc

Вспомогательная библиотека для работы с сообщениями, передающимися через RabbitMQ сервер.

Является обвязкой над библиотекой watermill и дополнительно реализует поверх неё RPC сервер и клиент.

Смотри примеры использования).

Documentation

Overview

Package watercc является вспомогательной библиотекой для перехода от RabbitMQ к более универсальному сервису, поддерживающими разные серверы коммуникации.

Кроме изменения интерфейса библиотека так же предлагает готовые объекты, реализующие не только приём и отправку сообщений, но и RPC сервер и клиент, построенные поверх них.

Example
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"github.com/peterdev80/watercc"

	"github.com/ThreeDotsLabs/watermill/message"
)

// amqpURI описывает адрес для подключения к RabbitMQ.
var amqpURI = "amqp://guest:guest@localhost:5672/"

func main() {
	// инициализируем подписку на сообщения
	subscriber, err := watercc.NewSubscriber(amqpURI, "a1.a2.a3", false)
	if err != nil {
		panic(err)
	}

	// инициализируем обработку сообщений
	router, err := watercc.NewRouter()
	if err != nil {
		panic(err)
	}

	// регистрируем обработчик сообщений
	router.AddNoPublisherHandler(
		"my_handler", // уникальное название обработчика сообщений
		"new_topic",  // название темы для чтения сообщений (совпадает с темой в примере публикации)
		subscriber,   // подписка на приём сообщений
		// обработчик входящих сообщений, публикующий их в консоль
		func(msg *message.Message) error {
			fmt.Printf("-> [%s] %q\n", msg.UUID, msg.Payload)

			return nil
		},
	)

	// инициализируем контекст для обработки входящих запросов
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*6)
	defer cancel()

	// запускаем обработку входящих запросов в отдельном потоке
	go func() {
		if err := router.Run(ctx); err != nil {
			panic(err)
		}
	}()

	runtime.Gosched() // позволяем запуститься потоку

	// инициализируем новый публикатор сообщений
	publisher, err := watercc.NewPublisher(amqpURI, "a1.a2.a3", false)
	if err != nil {
		panic(err)
	}
	defer publisher.Close()

	// публикуем тестовые сообщения с небольшой задержкой
	for messageCounter := 1; messageCounter < 5; messageCounter++ {
		// задержка перед публикацией
		time.Sleep(time.Second * 1)

		// формируем сообщение
		id := fmt.Sprintf("msg:%02d", messageCounter)            // генерируем идентификатор сообщения
		body := fmt.Sprintf("Hello, number %d!", messageCounter) // формируем тест сообщения
		msg := message.NewMessage(id, []byte(body))              // инициализируем сообщение

		// и публикуем его с использованием темы "new_topic"
		if err := publisher.Publish("new_topic", msg); err != nil {
			panic(err)
		}
	}

	<-ctx.Done() // ожидаем завершения обработки

}
Output:
-> [msg:01] "Hello, number 1!"
-> [msg:02] "Hello, number 2!"
-> [msg:03] "Hello, number 3!"
-> [msg:04] "Hello, number 4!"

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewPublisher

func NewPublisher(amqpURI string, topic string, durrable bool) (message.Publisher, error)

NewPublisher сгенерировать издателя.

func NewRouter

func NewRouter() (*message.Router, error)

NewRouter возвращает новый инициализированный message.Router для обработки запросов. Это вспомогательная функция для вызова message.NewRouter с настройками по умолчанию и стандартным логом.

func NewSubscriber

func NewSubscriber(amqpURI string, topic string, durrable bool) (message.Subscriber, error)

NewSubscriber возвращает инициализированного подписчика для получения сообщений на обработку.

func SetLogger

func SetLogger(l watermill.LoggerAdapter)

SetLogger устанавливает новую систему логов для всей библиотеки.

Не является потокобезопасным, поэтому рекомендуется производить изменения лога в самом начале работы с библиотекой.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client для отсылки запросов на удалённый сервер и получения ответа на них.

func NewClient

func NewClient(amqpURI, topic string) (*Client, error)

NewClient возвращает нового инициализированного клиента для отсылки запросов и получения ответов.

func (*Client) Close

func (rpc *Client) Close() error

Close завершает приём ответов и завершает работу клиента.

func (*Client) Send

func (rpc *Client) Send(ctx context.Context, command string, data []byte) (*message.Message, error)

Send отправляет запрос на сервер и возвращает ответ на него. В качестве параметров передаются название команды и бинарные данные с содержимым запроса.

type Handler

type Handler = func(*message.Message) (*message.Message, error)

Handler описывает формат функции для обработки сообщений.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server используется как асинхронный обработчик входящих команд и генерации ответов на них.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/peterdev80/watercc"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
)

// amqpURI описывает адрес для подключения к RabbitMQ.
var amqpURI = "amqp://guest:guest@localhost:5672/"

func main() {
	// инициализируем RPC сервер с именем "rpc_command"
	server, err := watercc.NewServer(amqpURI, "rpc_command")
	if err != nil {
		panic(err)
	}
	defer server.Close()

	// регистрируем для него обработчик команды
	handler := func(m *message.Message) (*message.Message, error) {
		str := strings.ToUpper(string(m.Payload))

		return message.NewMessage(watermill.NewUUID(), []byte(str)), nil
	}
	server.Register("UpperCase", handler)

	// инициализируем клиента для работы с сервером "rpc_command"
	client, err := watercc.NewClient(amqpURI, "rpc_command")
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// отсылаем команду на сервер и ожидаем получение ответа
	resp, err := client.Send(context.Background(), "UpperCase", []byte("Hello world"))
	if err != nil {
		panic(err)
	}

	fmt.Println(string(resp.Payload))

}
Output:
HELLO WORLD

func NewServer

func NewServer(amqpURI, topic string) (*Server, error)

NewServer возвращает инициализированный серверный обработчик RPC.

func (*Server) Close

func (rpc *Server) Close() error

Close останавливает обработку входящих сообщений и закрывает соединение с сервером.

func (*Server) Register

func (rpc *Server) Register(command string, handler Handler)

Register регистрирует новый именованный обработчик.

Название команды может быть пустым. В этом случае она регистрируется как команда по умолчанию.

func (*Server) Unregister

func (rpc *Server) Unregister(command string)

Unregister удаляет обработчик с заданным именем. Если такая команда не зарегистрирована, то она игнорируется.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL