amq

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2022 License: Apache-2.0 Imports: 8 Imported by: 4

README

AMQ定义

AMQ引擎定义,允许各系统之间通过AMQ进行消息异步交互,默认情况下系统未开启AMQ功能,如果本系统有需要使用需要在配置 文件 /base/amq中进行如下配置方可开启:

 {
    "enabled":"true", //是否开启AMQ服务
}

同时,业务系统对每个AMQ节点需要注册一个或多个{@link Processor}消息处理器,否则收到的消息由于无法找到对应的消息处理器而会被忽略,从而对业务造成影响,业务系统需要实现上述接口并在启动{@link Client}之前手动注册消息处理器到对应的客户端中。

另外,对于不同的AMQ节点需要在配置中心进行如下的配置,配置的key为:

/system/base/amq/{node}

其中{node}为节点的唯一标示,其数据格式可参看{@link AMQClient}的配置说明。

Client定义

提供给业务系统使用和AMQ进行交互的接口,允许业务系统发送消息到AMQ和处理从AMQ中收到的消息。每个AMQ客户端都需要指定一个唯一的标示以及初始化AMQ客户端所需要的配置参数(config),配置参数的格式如下:

 {
   "provider" : "rabbit", // AMQ的提供器
   "parameter" : {          // AMQ提供器的初始化参数,不同AMQ提供所需要的初始化参数不一样
     "key1" : "value1",
     "key2" : "value2"
   },
   "partitions" : 2         // 节点的分区数(默认1,可选配置)
 }

需要特别注意的是,每个系统实例化一个{@link Client}后,该实例会唯一的只监听使用该系统ID标示的一个队列,而这个队列的名称格式为:

sys_amq_{systemId}_{node}

其中{systemId}即为当前系统的唯一四位数数字标示,而{node}则标明该客户端连接到的AMQ节点。

获取当前系统对当前节点的分区配置(可选),如果配置了则只监听指定的分区,需要在/system/base/amq/{systemId}中按照如下格式配置:

 client.Start([]int{1,2,3})

温馨提示:

获取目标系统的队列名称可使用方法 client.BuildQueueName(systemId)来获取

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleAck

func HandleAck(msg *message.MsgPayload, listener provider.MessageListener) (*message.MsgPayload, error)

*

  • 处理收到的单向/双向事务消息的应答消息。 *
  • @param message
  • @param listener
  • @throws AMQException

func HandleNew

func HandleNew(msg *message.MsgPayload, listener provider.MessageListener) (*message.MsgPayload, error)

*

  • 处理收到的新消息(包括通知消息和事务消息)。 *
  • @param message
  • @param listener
  • @throws AMQException

Types

type Amq

type Amq struct {
	// contains filtered or unexported fields
}

*

  • AMQ引擎定义,允许各系统之间通过AMQ进行消息异步交互,默认情况下系统未开启AMQ功能,如果本系统有需要使用需要在配置
  • 文件<b>/base/amq</b>中进行如下配置方可开启:
  • <pre>
  • amq.enabled=true
  • </pre>
  • <p>
  • 同时,业务系统对每个AMQ节点需要注册一个或多个{@link Processor}消息处理器,否则收到的消息由于无法找到对应的
  • 消息处理器而会被忽略,从而对业务造成影响,业务系统需要实现上述接口并在启动{@link Client}之前手动注册消息处理
  • 器到对应的客户端中。
  • </p>
  • <p>
  • 另外,对于不同的AMQ节点需要在配置中心进行如下的配置,配置的key为:<b>/system/base/amq/{node}</b>,其中{node}
  • 为节点的唯一标示,其数据格式可参看{@link AMQClient}的配置说明。
  • </p>

func Engine

func Engine(conf configuration.Configuration, systemId string) (amq *Amq)

*

  • 获取企业消息总线(AMQ)管理引擎的唯一实例。
  • @param config
  • @return

func (*Amq) Clean

func (e *Amq) Clean()

*

  • 判断当前支持的AMQ节点中是否包含了指定的节点。 *
  • @param node
  • @return

func (*Amq) Client

func (e *Amq) Client(node node.Node) (*Client, error)

配置示例:create /system/base/amq/biz

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) AddProcessor

func (c *Client) AddProcessor(processors ...Processor)

*

  • 为当前客户端添加一个或多个消息处理器,需要确保该方法在{@link #start()}方法之前调用,否则系统会抛出异常。 *
  • @param processors

func (*Client) BuildQueueName

func (c *Client) BuildQueueName(systemId string) string

*

  • 使用当前客户端构建一个amq消息的目标队列名称,目标队列名称满足格式:sys_amq_{systemId}_{node},
  • 其中{systemId}为目标系统的四位数数字ID,{node}为目标系统监听的amq节点标示(参考{@link AMQNode}。 *
  • @param destSystemId 目标系统ID
  • @param destNode 目标AMQ节点
  • @return

func (*Client) BuildQueueNameByPartition

func (c *Client) BuildQueueNameByPartition(systemId string, partition int) string

*

  • 等同{@link #buildQueueName(string)}方法,唯一不同的是如果节点配置为了多分区则需要指定分区编号
  • 来依次构建每个分区的队列,分区编号从0开始。 *
  • @param destSystemId
  • @param destNode
  • @param partition
  • @return

func (*Client) Close

func (c *Client) Close()

*

  • 关闭所有的资源,该方法不会抛出任何异常。

func (*Client) IsMultiplePartition

func (c *Client) IsMultiplePartition() bool

*

  • 判断当前节点是否支持多分区。 *
  • @return

func (*Client) Send

func (c *Client) Send(msg interface{}) error

*

  • 发送新消息到AMQ中,这里是所有新消息的发送入口,如果发送失败则会抛出异常。请注意,消息的目标队列名称请使用
  • 方法 {@link #buildQueueName(long, String, int)} 来构建并设置,不满足格式的目标队列名称会导致消息发送失败。 *
  • @param message
  • @throws AMQException

func (*Client) Start

func (c *Client) Start(partitions []int) (closer func(), err error)

*

  • 在当前节点上启动进行本地系统的队列监听,该方法请务必在{@link #AddProcessor(...Processor)}方法之后调用,否则可能会导致部分消息
  • 由于没有对应的消息处理器而丢失。<font color="red">特别注意:如果收到的消息类型没有对应的消息处理器,系统只会简单的丢弃并打印告警信息!</font>
  • 同时注意,该方法只能被调用一次,如果多次调用则后续的调用会抛出异常。 *
  • @throws error

type ClientConfig

type ClientConfig struct {
	Provider   string            `json:"provider"`
	Parameter  map[string]string `json:"parameter"`
	Partitions int               `json:"partitions"` // 分区数量
}

type Config

type Config struct {
	// contains filtered or unexported fields
}

type Processor

type Processor interface {
	/**
	 * 获取要处理的消息类型,对应{@link AMQMessage}中的type字段。
	 *
	 * @return
	 */
	GetType() string

	/**
	 * 接收新消息并进行处理,如果新消息为事务消息并且需要返回处理结果则请返回处理结果,否则返回Null即可。
	 *
	 * @param message
	 * @throws AMQException
	 */
	OnReceived(msg interface{}) (*message.MsgBody, error)

	/**
	 * 单向/双向事务消息中的接收方应答消息的接收处理,对于单向事务消息处理完成后返回Null即可,对于双向事务消息则还需要
	 * 返回系统的处理结果给到接收方进行后续处理。
	 *
	 * @param msgId
	 * @param rsp
	 * @return
	 */
	OnRecipientAckReceived(msgId string, rsp *message.MsgBody) (*message.MsgBody, error)

	/**
	 * 双向事务消息中的发送方的确认应答消息,由接收方进行处理。
	 *
	 * @param msgId 事务消息的唯一ID
	 * @param rsp   发送方返回的应答消息
	 */
	OnSenderAckReceived(msgId string, rsp *message.MsgBody) error
}

*

  • AMQ消息的处理器接口定义,业务系统实现该接口后需要手动注册到{@link AMQClient}中去方可生效。

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL