Documentation
¶
Index ¶
- Constants
- Variables
- func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error)
- func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)
- func ConstructRecMessage(entries *[]ConsumeMessageEntry)
- func ContainsSpecialChar(input string) (result bool)
- func ParseError(resp ErrorResponse, resource string) (err error)
- type AckMessageErrorEntry
- type AckMessageErrorResponse
- type AliyunMQAckMsgDecoder
- type AliyunMQClient
- func (p *AliyunMQClient) GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
- func (p *AliyunMQClient) GetCredential() Credential
- func (p *AliyunMQClient) GetEndpoint() string
- func (p *AliyunMQClient) GetProducer(instanceId string, topicName string) MQProducer
- func (p *AliyunMQClient) GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
- func (p *AliyunMQClient) Send(decoder MQDecoder, method Method, headers map[string]string, ...) (statusCode int, err error)
- func (p *AliyunMQClient) Send0(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)
- type AliyunMQConsumer
- func (p *AliyunMQConsumer) AckMessage(receiptHandles []string) (err error)
- func (p *AliyunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliyunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliyunMQConsumer) Consumer() string
- func (p *AliyunMQConsumer) InstanceId() string
- func (p *AliyunMQConsumer) MessageTag() string
- func (p *AliyunMQConsumer) TopicName() string
- type AliyunMQDecoder
- type AliyunMQProducer
- type AliyunMQTransProducer
- func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error)
- func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, ...)
- func (p *AliyunMQTransProducer) GroupId() string
- func (p *AliyunMQTransProducer) InstanceId() string
- func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
- func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error)
- func (p *AliyunMQTransProducer) TopicName() string
- type ConsumeMessageEntry
- type ConsumeMessageResponse
- type Credential
- type ErrAckItem
- type ErrorResponse
- type MQClient
- type MQConsumer
- type MQCredential
- type MQDecoder
- type MQProducer
- type MQTransProducer
- type MessageResponse
- type Method
- type PublishMessageRequest
- type PublishMessageResponse
- type ReceiptHandles
Constants ¶
View Source
const ( ClientName = "mq-go-sdk/1.0.3(fasthttp)" ClientVersion = "2015-06-06" DefaultTimeout int64 = 35 )
View Source
const ( GET Method = "GET" POST = "POST" DELETE = "DELETE" )
View Source
const ( AUTHORIZATION = "Authorization" CONTENT_TYPE = "Content-Type" CONTENT_MD5 = "Content-MD5" MQ_VERSION = "x-mq-version" HOST = "Host" DATE = "Date" KEEP_ALIVE = "Keep-Alive" SECURITY_TOKEN = "security-token" )
View Source
const ( ALIYUN_MQ_ERR_NS = "MQ" ALIYUN_MQ_ERR_TEMPSTR = "" /* 133-byte string literal not displayed */ )
View Source
const ( StartDeliverTime = "__STARTDELIVERTIME" TransCheckImmunityTime = "__TransCheckT" Keys = "KEYS" SHARDING = "__SHARDINGKEY" )
Variables ¶
View Source
var ( ErrSignMessageFailed = errors.TN(ALIYUN_MQ_ERR_NS, 1, "sign message failed, {{.err}}") ErrMarshalMessageFailed = errors.TN(ALIYUN_MQ_ERR_NS, 2, "marshal message filed, {{.err}}") ErrGeneralAuthHeaderFailed = errors.TN(ALIYUN_MQ_ERR_NS, 3, "general auth header failed, {{.err}}") ErrMessageProperty = errors.TN(ALIYUN_MQ_ERR_NS, 4, "message property can not contains:\" ' < > & : |, {{.err}}") ErrSendRequestFailed = errors.TN(ALIYUN_MQ_ERR_NS, 5, "send request failed, {{.err}}") ErrUnmarshalErrorResponseFailed = errors.TN(ALIYUN_MQ_ERR_NS, 7, "unmarshal error response failed, {{.err}}, ResponseBody: {{.resp}}") ErrUnmarshalResponseFailed = errors.TN(ALIYUN_MQ_ERR_NS, 8, "unmarshal response failed, {{.err}}") ErrMqServer = errors.TN(ALIYUN_MQ_ERR_NS, 101, ALIYUN_MQ_ERR_TEMPSTR) ErrAckMessage = errors.TN(ALIYUN_MQ_ERR_NS, 102, "aliyun_mq ack message error,code: {{.Code}}, message: {{.Message}}, receiptHandle: {{.ReceiptHandle}}, requestId: {{.RequestId}}") )
View Source
var (
DefaultNumOfMessages int32 = 16
)
Functions ¶
func CommitOrRollback ¶
func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error)
func ConstructPubMessage ¶
func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error)
func ConstructRecMessage ¶
func ConstructRecMessage(entries *[]ConsumeMessageEntry)
func ContainsSpecialChar ¶
func ParseError ¶
func ParseError(resp ErrorResponse, resource string) (err error)
Types ¶
type AckMessageErrorEntry ¶
type AckMessageErrorEntry struct {
XMLName xml.Name `xml:"Error" json:"-"`
// Ack消息出错的错误码
ErrorCode string `xml:"ErrorCode" json:"error_code"`
// Ack消息出错的错误描述
ErrorMessage string `xml:"ErrorMessage" json:"error_messages"`
// Ack消息出错的消息句柄
ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"`
}
type AckMessageErrorResponse ¶
type AckMessageErrorResponse struct {
XMLName xml.Name `xml:"Errors" json:"-"`
FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"`
}
type AliyunMQAckMsgDecoder ¶
type AliyunMQAckMsgDecoder struct {
// contains filtered or unexported fields
}
func (*AliyunMQAckMsgDecoder) Decode ¶
func (p *AliyunMQAckMsgDecoder) Decode(reader io.Reader, v interface{}) (err error)
func (*AliyunMQAckMsgDecoder) DecodeError ¶
type AliyunMQClient ¶
type AliyunMQClient struct {
// contains filtered or unexported fields
}
func (*AliyunMQClient) GetConsumer ¶
func (p *AliyunMQClient) GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
func (*AliyunMQClient) GetCredential ¶
func (p *AliyunMQClient) GetCredential() Credential
func (*AliyunMQClient) GetEndpoint ¶
func (p *AliyunMQClient) GetEndpoint() string
func (*AliyunMQClient) GetProducer ¶
func (p *AliyunMQClient) GetProducer(instanceId string, topicName string) MQProducer
func (*AliyunMQClient) GetTransProducer ¶
func (p *AliyunMQClient) GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
type AliyunMQConsumer ¶
type AliyunMQConsumer struct {
// contains filtered or unexported fields
}
func (*AliyunMQConsumer) AckMessage ¶
func (p *AliyunMQConsumer) AckMessage(receiptHandles []string) (err error)
func (*AliyunMQConsumer) ConsumeMessage ¶
func (p *AliyunMQConsumer) ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
func (*AliyunMQConsumer) ConsumeMessageOrderly ¶
func (p *AliyunMQConsumer) ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
func (*AliyunMQConsumer) Consumer ¶
func (p *AliyunMQConsumer) Consumer() string
func (*AliyunMQConsumer) InstanceId ¶
func (p *AliyunMQConsumer) InstanceId() string
func (*AliyunMQConsumer) MessageTag ¶
func (p *AliyunMQConsumer) MessageTag() string
func (*AliyunMQConsumer) TopicName ¶
func (p *AliyunMQConsumer) TopicName() string
type AliyunMQDecoder ¶
type AliyunMQDecoder struct {
}
func (*AliyunMQDecoder) Decode ¶
func (p *AliyunMQDecoder) Decode(reader io.Reader, v interface{}) (err error)
func (*AliyunMQDecoder) DecodeError ¶
type AliyunMQProducer ¶
type AliyunMQProducer struct {
// contains filtered or unexported fields
}
func (*AliyunMQProducer) InstanceId ¶
func (p *AliyunMQProducer) InstanceId() string
func (*AliyunMQProducer) PublishMessage ¶
func (p *AliyunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
func (*AliyunMQProducer) TopicName ¶
func (p *AliyunMQProducer) TopicName() string
type AliyunMQTransProducer ¶
type AliyunMQTransProducer struct {
// contains filtered or unexported fields
}
func (*AliyunMQTransProducer) Commit ¶
func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error)
func (*AliyunMQTransProducer) ConsumeHalfMessage ¶
func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
func (*AliyunMQTransProducer) GroupId ¶
func (p *AliyunMQTransProducer) GroupId() string
func (*AliyunMQTransProducer) InstanceId ¶
func (p *AliyunMQTransProducer) InstanceId() string
func (*AliyunMQTransProducer) PublishMessage ¶
func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
func (*AliyunMQTransProducer) Rollback ¶
func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error)
func (*AliyunMQTransProducer) TopicName ¶
func (p *AliyunMQTransProducer) TopicName() string
type ConsumeMessageEntry ¶
type ConsumeMessageEntry struct {
MessageResponse
// 消息ID
MessageId string `xml:"MessageId" json:"message_id"`
// 消息句柄
ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
// 消息体MD5
MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
// 消息体
MessageBody string `xml:"MessageBody" json:"message_body"`
// 消息发送时间
PublishTime int64 `xml:"PublishTime" json:"publish_time"`
// 下次消费消息的时间(如果这次消费的消息没有Ack)
NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"`
// 第一次消费的时间,此值对于顺序消费没有意义
FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"`
// 消费次数
ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"`
// 消息标签
MessageTag string `xml:"MessageTag" json:"message_tag"`
// 消息属性
Properties map[string]string `xml:"-" json:"-"`
// 序列化属性请勿使用
PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"`
// 消息KEY
MessageKey string `xml:"-" json:"-"`
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
StartDeliverTime int64 `xml:"-" json:"-"`
// 顺序消息分区Key
ShardingKey string `xml:"-" json:"-"`
// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息
TransCheckImmunityTime int `xml:"-" json:"-"`
}
type ConsumeMessageResponse ¶
type ConsumeMessageResponse struct {
XMLName xml.Name `xml:"Messages" json:"-"`
Messages []ConsumeMessageEntry `xml:"Message" json:"messages"`
}
type Credential ¶
type ErrAckItem ¶
type ErrorResponse ¶
type ErrorResponse struct {
XMLName xml.Name `xml:"Error" json:"-"`
// 错误码
Code string `xml:"Code,omitempty" json:"code,omitempty"`
// 错误描述
Message string `xml:"Message,omitempty" json:"message,omitempty"`
// 请求ID
RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
// 请求HOST
HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}
type MQClient ¶
type MQClient interface {
GetProducer(instanceId string, topicName string) MQProducer
GetTransProducer(instanceId string, topicName string, groupId string) MQTransProducer
GetConsumer(instanceId string, topicName string, consumer string, messageTag string) MQConsumer
GetCredential() Credential
GetEndpoint() string
}
func NewAliyunMQClient ¶
type MQConsumer ¶
type MQConsumer interface {
// 主题名字
TopicName() string
// 实例ID,可空
InstanceId() string
// 消费者的名字
Consumer() string
// 消费消息过滤的标签
MessageTag() string
// 消费消息,如果该条消息没有被 {AckMessage} 确认消费成功,即在NextConsumeTime时会再次消费到该条消息
ConsumeMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
// 顺序消费消息,获取的消息可能是多个分区但是一个分区内消息一定是顺序的,对于一个分区的消息必须要全部ACK成功才能消费下一批消息
// 否则在NextConsumeTime后会再次消费到相同的消息
ConsumeMessageOrderly(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
// 确认消息消费成功
AckMessage(receiptHandles []string) (err error)
}
MQ的消息消费者
type MQCredential ¶
type MQCredential struct {
// contains filtered or unexported fields
}
func NewMQCredential ¶
func NewMQCredential(accessKeyId string, accessKeySecret string, securityToken string) *MQCredential
func (*MQCredential) AccessKeyId ¶
func (p *MQCredential) AccessKeyId() string
func (*MQCredential) AccessKeySecret ¶
func (p *MQCredential) AccessKeySecret() string
func (*MQCredential) SecurityToken ¶
func (p *MQCredential) SecurityToken() string
type MQDecoder ¶
type MQDecoder interface {
Decode(reader io.Reader, v interface{}) (err error)
DecodeError(bodyBytes []byte, resource string, requestId string) (decodedError error, err error)
}
func NewAliyunMQAckMsgDecoder ¶
func NewAliyunMQAckMsgDecoder() MQDecoder
func NewAliyunMQDecoder ¶
func NewAliyunMQDecoder() MQDecoder
type MQProducer ¶
type MQProducer interface {
// 主题名字
TopicName() string
// 实例ID,可空
InstanceId() string
// 发送消息
PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
}
MQ的消息生产者
type MQTransProducer ¶
type MQTransProducer interface {
// 主题名字
TopicName() string
// 实例ID,可空
InstanceId() string
// GroupId,非空
GroupId() string
// 发送消息
PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
// 消费事务半消息
ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
// 提交事务消息
Commit(receiptHandle string) (err error)
// 回滚事务消息
Rollback(receiptHandle string) (err error)
}
MQ的事务消息生产者
type MessageResponse ¶
type MessageResponse struct {
XMLName xml.Name `xml:"Message" json:"-"`
// 错误码
Code string `xml:"Code,omitempty" json:"code,omitempty"`
// 错误描述
Message string `xml:"Message,omitempty" json:"message,omitempty"`
// 请求ID
RequestId string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
// 请求HOST
HostId string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}
type PublishMessageRequest ¶
type PublishMessageRequest struct {
XMLName xml.Name `xml:"Message" json:"-"`
// 消息体
MessageBody string `xml:"MessageBody" json:"message_body"`
// 消息标签
MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"`
// 消息属性
Properties map[string]string `xml:"-" json:"-"`
// 消息KEY
MessageKey string `xml:"-" json:"-"`
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
StartDeliverTime int64 `xml:"-" json:"-"`
// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息, 10~300s
TransCheckImmunityTime int `xml:"-" json:"-"`
// 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。全局顺序消息,该字段可以设置为任意非空字符串。
ShardingKey string `xml:"-" json:"-"`
// contains filtered or unexported fields
}
type PublishMessageResponse ¶
type PublishMessageResponse struct {
MessageResponse
// 消息ID
MessageId string `xml:"MessageId" json:"message_id"`
// 消息体MD5
MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
// 消息句柄只有事务消息才有
ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
}
type ReceiptHandles ¶
Click to show internal directories.
Click to hide internal directories.