Documentation
¶
Overview ¶
Package goutils - common utility support package to be used by many projects
Index ¶
- func CreateBasicGCPPubSubClient(ctxt context.Context, projectID string) (*pubsub.Client, error)
- func DefineHTTPClient(parentCtxt context.Context, retryConfig HTTPClientRetryConfig, ...) (*resty.Client, error)
- func GetJWTTokenFromContext(ctx context.Context) (*jwt.Token, error)
- func ModifyLogMetadataByRestRequestParam(ctxt context.Context, theTags log.Fields)
- func TimeBoundedWaitGroupWait(wgCtxt context.Context, wg *sync.WaitGroup, timeout time.Duration) error
- type AsyncQueue
- type ClientCredOAuthTokenManagerParam
- type Component
- type Condition
- type ErrorDetail
- type ErrorNoDataAvailable
- type ErrorTimeout
- type ErrorUnexpectedType
- type HTTPClientAuthConfig
- type HTTPClientRetryConfig
- type HTTPClientTransportConfig
- type HTTPRequestLogLevel
- type HTTPRequestMetricHelper
- type HTTPRequestRetryParam
- type IntervalTimer
- type JWTCheckMiddleware
- type LogMetadataModifier
- type MessageBus
- type MessageTopic
- type MetricsCollector
- type OAuthTokenManager
- type OIDPClientParam
- type OpenIDProviderClient
- type PriorityQueueEntry
- type PubSubClient
- type PubSubMessageHandler
- type PubSubMetricHelper
- type PubSubRequestResponseClientParam
- type Queue
- type RRInboundRequestHandler
- type RRMessageParser
- type ReqRespMessage
- type ReqRespMessageHandler
- type ReqRespTimeoutHandler
- type RequestCallParam
- type RequestResponseClient
- type RequestResponseDriver
- func (d *RequestResponseDriver) InstallHandler(requestType reflect.Type, handler RRInboundRequestHandler)
- func (d *RequestResponseDriver) MakeRequest(ctxt context.Context, requestInstanceName string, targetID string, ...) ([]interface{}, error)
- func (d *RequestResponseDriver) ProcessInboundRequest(ctxt context.Context, msg ReqRespMessage) error
- type RestAPIBaseResponse
- type RestAPIHandler
- func (h RestAPIHandler) GetStdRESTErrorMsg(ctxt context.Context, respCode int, errMsg string, errDetail string) RestAPIBaseResponse
- func (h RestAPIHandler) GetStdRESTSuccessMsg(ctxt context.Context) RestAPIBaseResponse
- func (h RestAPIHandler) LoggingMiddleware(next http.HandlerFunc) http.HandlerFunc
- func (h RestAPIHandler) ReadRequestIDFromContext(ctxt context.Context) string
- func (h RestAPIHandler) WriteRESTResponse(w http.ResponseWriter, respCode int, resp interface{}, ...) error
- type RestRequestOAuthTokenKey
- type RestRequestParam
- type RestRequestParamKey
- type Sequencer
- type TaskProcessor
- type TaskProcessorMetricHelper
- type TaskProcessorSupportHandler
- type TimeoutHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateBasicGCPPubSubClient ¶ added in v0.4.0
CreateBasicGCPPubSubClient define a basic GCP PubSub client
@param ctxt context.Context - execution context @param projectID string - GCP project ID @returns new client
func DefineHTTPClient ¶ added in v0.6.1
func DefineHTTPClient( parentCtxt context.Context, retryConfig HTTPClientRetryConfig, authConfig *HTTPClientAuthConfig, transportConfig *HTTPClientTransportConfig, ) (*resty.Client, error)
DefineHTTPClient helper function to define a resty HTTP client
@param parentCtxt context.Context - caller context @param retryConfig HTTPClientRetryConfig - HTTP client retry config @param authConfig *HTTPClientAuthConfig - HTTP client auth config @param transportConfig *HTTPClientTransportConfig - HTTP client transport config @returns new resty client
func GetJWTTokenFromContext ¶ added in v0.10.0
GetJWTTokenFromContext parse out the JWT token recorded in the request context
func ModifyLogMetadataByRestRequestParam ¶
ModifyLogMetadataByRestRequestParam update log metadata with info from RestRequestParam
@param ctxt context.Context - a request context @param theTags log.Fields - a log metadata to update
func TimeBoundedWaitGroupWait ¶ added in v0.4.0
func TimeBoundedWaitGroupWait( wgCtxt context.Context, wg *sync.WaitGroup, timeout time.Duration, ) error
TimeBoundedWaitGroupWait is a wrapper around wait group wait with a time limit
@param wgCtxt context.Context - context associated with the wait group @param wg *sync.WaitGroup - the wait group to @param timeout time.Duration - wait timeout duration
Types ¶
type AsyncQueue ¶ added in v0.8.0
type AsyncQueue[V any] interface { /* Len get the current queue length @return current queue length */ Len() int /* Push enqueue data @generic V any - the data type being passed through the queue @param ctx context.Context - calling context @param data V - data to enqueue */ Push(ctx context.Context, data V) error /* Pop dequeue data. If caller choices to wait until data is available, the maximum duration for the wait is controlled by the context. The queue uses "Condition" to signal to any awaiting caller that data is available. To support that, the caller needs to supply wake up chan for use with "Condition". @generic V any - the data type being passed through the queue @param ctx context.Context - calling context @param blocking bool - whether to block until data is available @param newDataSignalFlag chan bool - wake up chan for use with "Condition" @return data from queue @error ErrorNoDataAvailable - queue is empty @error ErrorTimeout - timed out waiting for data */ Pop(ctx context.Context, blocking bool, newDataSignalFlag chan bool) (V, error) }
AsyncQueue asynchronous queue
func GetNewAsyncPriorityQueue ¶ added in v0.8.0
func GetNewAsyncPriorityQueue[V PriorityQueueEntry]( _ context.Context, instanceName string, logTags log.Fields, ) (AsyncQueue[V], error)
GetNewAsyncPriorityQueue define new asynchronous priority queue
@generic V PriorityQueueEntry - the data type being passed through the queue @param ctx context.Context - calling context @param instanceName string - queue instance name @param logTags log.Fields - metadata fields to include in the logs @returns new AsyncQueue instance
func GetNewAsyncQueue ¶ added in v0.8.0
func GetNewAsyncQueue[V any]( _ context.Context, instanceName string, logTags log.Fields, ) (AsyncQueue[V], error)
GetNewAsyncQueue define new asynchronous queue
@generic V any - the data type being passed through the queue @param ctx context.Context - calling context @param instanceName string - queue instance name @param logTags log.Fields - metadata fields to include in the logs @returns new AsyncQueue instance
type ClientCredOAuthTokenManagerParam ¶ added in v0.5.0
type ClientCredOAuthTokenManagerParam struct {
// IDPIssuerURL OpenID provider issuing URL
IDPIssuerURL string `validate:"required,url"`
// ClientID OAuth client ID
ClientID string `validate:"required"`
// ClientSecret OAuth client secret
ClientSecret string `validate:"required"`
// TargetAudience the token's target audience
TargetAudience *string
// LogTags metadata fields to include in the logs
LogTags log.Fields
// CustomLogModifiers additional log metadata modifiers to use
CustomLogModifiers []LogMetadataModifier
// TimeBuffer time buffer before a token expires to perform the token refresh / renew.
// This helps in situations where there is a time offset between the client and the
// server.
TimeBuffer time.Duration
// SupportTaskMetricsHelper metrics collection helper for the support tasks
SupportTaskMetricsHelper TaskProcessorMetricHelper
}
ClientCredOAuthTokenManagerParam configuration for client credential flow oauth token manager
type Component ¶
type Component struct {
// LogTags the Apex logging message metadata tags
LogTags log.Fields
// LogTagModifiers is the list of log metadata modifier callbacks
LogTagModifiers []LogMetadataModifier
}
Component is the base structure for all components
func (Component) GetLogTagsForContext ¶
GetLogTagsForContext creates a new Apex log.Fields metadata structure for a specific context
@param ctxt context.Context - a request context @return the new Apec log.Fields metadata
func (Component) NewLogTagsForContext ¶
NewLogTagsForContext generates a new deep-copied LogTags for an execution context
@return a new log.Fields
type Condition ¶ added in v0.8.0
type Condition interface {
// NotifyOne notify one waiter
NotifyOne() error
// NotifyAll notify all waiters
NotifyAll() error
/*
Wait caller will block and wait to be notified. The maximum duration for the wait is
controlled by the context.
The signaling channel is provided by the caller, and allows the same channel to be reused
for subsequent calls.
IMPORTANT: the signaling channel must be buffered
@param ctx context.Context - calling context
@param wakeUp chan bool - signaling channel
@error ErrorTimeout - timed out waiting for signal
*/
Wait(ctx context.Context, wakeUp chan bool) error
}
Condition variable which has similar behavior to the C++11 std::condition_variable
func GetNewCondition ¶ added in v0.8.0
func GetNewCondition() Condition
GetNewCondition get new condition variable.
This condition has similar behavior to the C++11 std::condition_variable.
@return new condition variable
type ErrorDetail ¶
type ErrorDetail struct {
// Code is the response code
Code int `json:"code" validate:"required"`
// Msg is an optional descriptive message
Msg string `json:"message,omitempty"`
// Detail is an optional descriptive message providing additional details on the error
Detail string `json:"detail,omitempty"`
}
ErrorDetail is the response detail in case of error
type ErrorNoDataAvailable ¶ added in v0.8.0
type ErrorNoDataAvailable struct{}
ErrorNoDataAvailable no data available error
func (ErrorNoDataAvailable) Error ¶ added in v0.8.0
func (ErrorNoDataAvailable) Error() string
Error implement error interface
type ErrorTimeout ¶ added in v0.8.0
type ErrorTimeout struct{}
ErrorTimeout operation timed out error
func (ErrorTimeout) Error ¶ added in v0.8.0
func (ErrorTimeout) Error() string
Error implement error interface
type ErrorUnexpectedType ¶ added in v0.8.0
ErrorUnexpectedType data has unexpected
func (ErrorUnexpectedType) Error ¶ added in v0.8.0
func (e ErrorUnexpectedType) Error() string
Error implement error interface
type HTTPClientAuthConfig ¶ added in v0.6.1
type HTTPClientAuthConfig struct {
// IssuerURL OpenID provider issuer URL
IssuerURL string `json:"issuer"`
// ClientID OAuth client ID
ClientID string `json:"client_id"`
// ClientSecret OAuth client secret
ClientSecret string `json:"client_secret"`
// TargetAudience target audience `aud` to acquire a token for
TargetAudience *string `json:"target_audience"`
// LogTags auth middleware log tags
LogTags log.Fields
}
HTTPClientAuthConfig HTTP client OAuth middleware configuration
Currently only support client-credential OAuth flow configuration
type HTTPClientRetryConfig ¶ added in v0.6.1
type HTTPClientRetryConfig struct {
// MaxAttempts max number of retry attempts
MaxAttempts int `json:"max_attempts"`
// InitWaitTime wait time before the first wait retry
InitWaitTime time.Duration `json:"initialWaitTimeInSec"`
// MaxWaitTime max wait time
MaxWaitTime time.Duration `json:"maxWaitTimeInSec"`
}
HTTPClientRetryConfig HTTP client config retry configuration
type HTTPClientTransportConfig ¶ added in v0.10.0
type HTTPClientTransportConfig struct {
// CustomCA if provided, is the custom CA to use for the TLS session.
CustomCA *string `json:"http_tls_ca,omitempty"`
}
HTTPClientTransportConfig HTTP client transport configuration
type HTTPRequestLogLevel ¶ added in v0.5.1
type HTTPRequestLogLevel string
HTTPRequestLogLevel HTTP request log level data type
const ( HTTPLogLevelWARN HTTPRequestLogLevel = "warn" HTTPLogLevelINFO HTTPRequestLogLevel = "info" HTTPLogLevelDEBUG HTTPRequestLogLevel = "debug" )
HTTP request log levels
type HTTPRequestMetricHelper ¶ added in v0.6.0
type HTTPRequestMetricHelper interface {
/*
RecordRequest record parameters regarding a request to the metrics
@param method string - HTTP request method
@param status int - HTTP response status
@param latency time.Duration - delay between request received, and response sent
@param respSize int64 - HTTP response size in bytes
*/
RecordRequest(method string, status int, latency time.Duration, respSize int64)
}
HTTPRequestMetricHelper HTTP request metric recording helper agent
type HTTPRequestRetryParam ¶ added in v0.5.0
type HTTPRequestRetryParam struct {
// MaxRetires maximum number of retries
MaxRetires int
// InitialWaitTime the initial retry wait time
InitialWaitTime time.Duration
// MaxWaitTime the max retry wait time
MaxWaitTime time.Duration
}
HTTPRequestRetryParam HTTP client request retry parameters
type IntervalTimer ¶
type IntervalTimer interface {
/*
Start starts timer with a specific timeout interval, and the callback to trigger on timeout.
If oneShort, cancel after first timeout.
@param interval time.Duration - timeout interval
@param handler TimeoutHandler - handler to trigger on timeout
@param oneShort bool - if true, timer stop after first activation
*/
Start(interval time.Duration, handler TimeoutHandler, oneShort bool) error
/*
Stop stops the timer
*/
Stop() error
}
IntervalTimer is a support interface for triggering events at specific intervals
func GetIntervalTimerInstance ¶
func GetIntervalTimerInstance( rootCtxt context.Context, wg *sync.WaitGroup, logTags log.Fields, ) (IntervalTimer, error)
GetIntervalTimerInstance get an implementation instance of IntervalTimer
@param rootCtxt context.Context - the base Context the timer will derive new runtime context from each time Start is called. @param wg *sync.WaitGroup - WaitGroup use by timer @param logTags log.Fields - log metadata fields @return an IntervalTimer instance
type JWTCheckMiddleware ¶ added in v0.10.0
type JWTCheckMiddleware struct {
Component
// contains filtered or unexported fields
}
JWTCheckMiddleware middleware for validating Oauth JWT tokens
func DefineJWTCheckMiddleware ¶ added in v0.10.0
func DefineJWTCheckMiddleware( providerClient OpenIDProviderClient, logTags log.Fields, ) JWTCheckMiddleware
DefineJWTCheckMiddleware define a new Oauth JWT validation middleware
@param providerClient OpenIDProviderClient - core client @param logTags log.Fields - metadata fields to include in the logs @returns new middleware
func (JWTCheckMiddleware) ParseAndValidateJWT ¶ added in v0.10.0
func (m JWTCheckMiddleware) ParseAndValidateJWT(next http.HandlerFunc) http.HandlerFunc
ParseAndValidateJWT is a support middleware to be used with Mux to parse and validate OAuth bearer token in the request.
@param next http.HandlerFunc - the core request handler function @return middleware http.HandlerFunc
type LogMetadataModifier ¶
LogMetadataModifier is the function signature of a callback to update log.Fields with additional key-value pairs.
type MessageBus ¶ added in v0.6.2
type MessageBus interface {
/*
CreateTopic create new message topic
@param ctxt context.Context - execution context
@param topicName string - topic name
@param topicLogTags log.Fields - metadata fields to include in the logs of the topic entity
@return new MessageTopic instance
*/
CreateTopic(
ctxt context.Context, topicName string, topicLogTags log.Fields,
) (MessageTopic, error)
/*
GetTopic fetch a message topic
@param ctxt context.Context - execution context
@param topicName string - topic name
@returns MessageTopic instance
*/
GetTopic(ctxt context.Context, topicName string) (MessageTopic, error)
/*
DeleteTopic delete a message topic
@param ctxt context.Context - execution context
@param topicName string - topic name
*/
DeleteTopic(ctxt context.Context, topicName string) error
}
MessageBus an application scoped local message bus
func GetNewMessageBusInstance ¶ added in v0.6.2
GetNewMessageBusInstance get message bus instance
@param parentCtxt context.Context - parent execution context @param logTags log.Fields - metadata fields to include in the logs @return new MessageBus instance
type MessageTopic ¶ added in v0.6.2
type MessageTopic interface {
/*
Publish publish a message on the topic in parallel.
@param ctxt context.Context - execution context
@param message interface{} - the message to send
@param blockFor time.Duration - how long to block for the publish to complete. If >0,
this is a non-blocking call; blocking call otherwise.
*/
Publish(ctxt context.Context, message interface{}, blockFor time.Duration) error
/*
CreateSubscription create a new topic subscription
@param ctxt context.Context - execution context
@param subscriber string - name of the subscription
@param bufferLen int - length of message buffer
@returns the channel to receive messages on
*/
CreateSubscription(
ctxt context.Context, subscriber string, bufferLen int,
) (chan interface{}, error)
/*
DeleteSubscription delete an existing topic subscription
@param ctxt context.Context - execution context
@param subscriber string - subscription to delete
*/
DeleteSubscription(ctxt context.Context, subscriber string) error
}
MessageTopic a message bus topic, responsible for managing its child subscriptions.
type MetricsCollector ¶ added in v0.6.0
type MetricsCollector interface {
/*
InstallApplicationMetrics install trackers for Golang application execution metrics
*/
InstallApplicationMetrics()
/*
InstallHTTPMetrics install trackers for HTTP request metrics collection. This will return
a helper agent to record the metrics.
@returns request metrics logging agent
*/
InstallHTTPMetrics() HTTPRequestMetricHelper
/*
InstallPubSubMetrics install trackers for PubSub messaging collection. This will return
a helper agent to record the metrics.
@return PubSub metrics logging agent
*/
InstallPubSubMetrics() PubSubMetricHelper
/*
InstallTaskProcessorMetrics install tracker for Task processor operations. This will return
a helper agent to record the metrics
@return Task process logging agent
*/
InstallTaskProcessorMetrics() TaskProcessorMetricHelper
/*
InstallCustomCounterVecMetrics install new custom `CounterVec` metrics
@param ctxt context.Context - execution context
@param metricsName string - metrics name
@param metricsHelpMessage string - metrics help message
@param metricsLabels []string - labels to support
@returns new `CounterVec` handle
*/
InstallCustomCounterVecMetrics(
ctxt context.Context, metricsName string, metricsHelpMessage string, metricsLabels []string,
) (*prometheus.CounterVec, error)
/*
InstallCustomGaugeVecMetrics install new custom `GaugeVec` metrics
@param ctxt context.Context - execution context
@param metricsName string - metrics name
@param metricsHelpMessage string - metrics help message
@param metricsLabels []string - labels to support
@returns new `GaugeVec` handle
*/
InstallCustomGaugeVecMetrics(
ctxt context.Context, metricsName string, metricsHelpMessage string, metricsLabels []string,
) (*prometheus.GaugeVec, error)
/*
ExposeCollectionEndpoint expose the Prometheus metric collection endpoint
@param outer *mux.Router - HTTP router to install endpoint on
@param metricsPath string - metrics endpoint path relative to the router provided
@param maxSupportedRequest int - max number of request the endpoint will support
*/
ExposeCollectionEndpoint(router *mux.Router, metricsPath string, maxSupportedRequest int)
}
MetricsCollector metrics collection support client
func GetNewMetricsCollector ¶ added in v0.6.0
func GetNewMetricsCollector( logTags log.Fields, customLogModifiers []LogMetadataModifier, ) (MetricsCollector, error)
GetNewMetricsCollector get metrics collection support client
@param logTags log.Fields - metadata fields to include in the logs @param customLogModifiers []LogMetadataModifier - additional log metadata modifiers to use @returns metric collection support client
type OAuthTokenManager ¶ added in v0.5.0
type OAuthTokenManager interface {
/*
GetToken fetch the current valid OAuth token
@param ctxt context.Context - the execution context
@param timestamp time.Time - the current timestamp
@returns the token
*/
GetToken(ctxt context.Context, timestamp time.Time) (string, error)
/*
Stop stop any support background tasks which were started
@param ctxt context.Context - execution context
*/
Stop(ctxt context.Context) error
}
OAuthTokenManager Oauth token manager handles fetching and refreshing of OAuth tokens
func GetNewClientCredOAuthTokenManager ¶ added in v0.5.0
func GetNewClientCredOAuthTokenManager( parentCtxt context.Context, httpClient *resty.Client, params ClientCredOAuthTokenManagerParam, ) (OAuthTokenManager, error)
GetNewClientCredOAuthTokenManager get client credential flow oauth token manager
@param parentCtxt context.Context - parent context @param httpClient *resty.Client - use this HTTP client to interact with the IDP @param params ClientCredOAuthTokenManagerParam - configuration for the token manager @returns new OAuthTokenManager instance
type OIDPClientParam ¶ added in v0.10.0
type OIDPClientParam struct {
// Issuer is the URL of the OpenID provider issuer URL
Issuer string `json:"issuer" validate:"required,url"`
// ClientID is the client ID to use during token introspection
ClientID *string `json:"client_id" validate:"omitempty"`
// ClientCred is the client credential to use during token introspection
ClientCred *string `json:"client_cred" validate:"omitempty"`
// RequestHostOverride if specified, use this as "Host" header when communicating with provider
RequestHostOverride *string `json:"host_override" validate:"omitempty"`
// TargetAudiences a set of audiences which are accepted
TargetAudiences []string `json:"target_audiences"`
// LogTags metadata fields to include in the logs
LogTags log.Fields
}
OIDPClientParam defines connection parameters to one OpenID provider
type OpenIDProviderClient ¶ added in v0.10.0
type OpenIDProviderClient interface {
/*
AssociatedPublicKey fetches the associated public based on "kid" value of a JWT token
@param token *jwt.Token - the JWT token to find the public key for
@return public key material
*/
AssociatedPublicKey(token *jwt.Token) (interface{}, error)
/*
ParseJWT parses a string into a JWT token object.
@param raw string - the original JWT string
@param claimStore jwt.Claims - the object to store the claims in
@return the parsed JWT token object
*/
ParseJWT(raw string, claimStore jwt.Claims) (*jwt.Token, error)
/*
CanIntrospect whether the client can perform introspection
@return whether the client can perform introspection
*/
CanIntrospect() bool
/*
IntrospectToken perform introspection for a token
@param ctxt context.Context - the operating context
@param token string - the token to introspect
@return whether token is still valid
*/
IntrospectToken(ctxt context.Context, token string) (bool, error)
}
OpenIDProviderClient a client to interact with an OpenID provider
func DefineOpenIDProviderClient ¶ added in v0.10.0
func DefineOpenIDProviderClient( params OIDPClientParam, httpClient *resty.Client, ) (OpenIDProviderClient, error)
DefineOpenIDProviderClient defines a new OpenID provider client
@param params OpenIDProviderConfig - OpenID provider client parameters @param httpClient *resty.Client - the HTTP client to use to communicate with the OpenID provider @return new client instance
type PriorityQueueEntry ¶ added in v0.8.0
type PriorityQueueEntry interface {
/*
HigherPriorityThan check whether priority of this object is higher than "right"
@param right PriorityQueueEntry - object to compare against
@returns whether this element has higher priority than "right"
*/
HigherPriorityThan(right PriorityQueueEntry) bool
}
PriorityQueueEntry a priority queue object
type PubSubClient ¶ added in v0.4.0
type PubSubClient interface {
/*
UpdateLocalTopicCache sync local topic cache with existing topics in project
@param ctxt context.Context - execution context
*/
UpdateLocalTopicCache(ctxt context.Context) error
/*
UpdateLocalSubscriptionCache sync local subscription cache with existing subscriptions in project
@param ctxt context.Context - execution context
*/
UpdateLocalSubscriptionCache(ctxt context.Context) error
/*
CreateTopic create PubSub topic
@param ctxt context.Context - execution context
@param topic string - topic name
@param config *pubsub.TopicConfig - optionally, provide config on the topic
*/
CreateTopic(ctxt context.Context, topic string, config *pubsub.TopicConfig) error
/*
DeleteTopic delete PubSub topic
@param ctxt context.Context - execution context
@param topic string - topic name
*/
DeleteTopic(ctxt context.Context, topic string) error
/*
GetTopic get the topic config for a topic
@param ctxt context.Context - execution context
@param topic string - topic name
@returns if topic is known, the topic config
*/
GetTopic(ctxt context.Context, topic string) (pubsub.TopicConfig, error)
/*
UpdateTopic update the topic config
@param ctxt context.Context - execution context
@param topic string - topic name
@param newConfig pubsub.TopicConfigToUpdate - the new config
*/
UpdateTopic(ctxt context.Context, topic string, newConfig pubsub.TopicConfigToUpdate) error
/*
CreateSubscription create PubSub subscription to attach to topic
@param ctxt context.Context - execution context
@param targetTopic string - target topic
@param subscription string - subscription name
@param config pubsub.SubscriptionConfig - subscription config
*/
CreateSubscription(
ctxt context.Context, targetTopic, subscription string, config pubsub.SubscriptionConfig,
) error
/*
DeleteSubscription delete PubSub subscription
@param ctxt context.Context - execution context
@param subscription string - subscription name
*/
DeleteSubscription(ctxt context.Context, subscription string) error
/*
GetSubscription get the subscription config for a subscription
@param ctxt context.Context - execution context
@param subscription string - subscription name
@returns if subscription is known, the subscription config
*/
GetSubscription(ctxt context.Context, subscription string) (pubsub.SubscriptionConfig, error)
/*
UpdateSubscription update the subscription config
@param ctxt context.Context - execution context
@param subscription string - subscription name
@param newConfig pubsub.SubscriptionConfigToUpdate - the new config
*/
UpdateSubscription(
ctxt context.Context, subscription string, newConfig pubsub.SubscriptionConfigToUpdate,
) error
/*
Publish publish a message to a topic
@param ctxt context.Context - execution context
@param topic string - topic name
@param message []byte - message content
@param metadata map[string]string - message metadata, which will be sent using attributes
@param blocking bool - whether the call is blocking until publish is complete
@returns when non-blocking, the async result object to check on publish status
*/
Publish(
ctxt context.Context, topic string, message []byte, metadata map[string]string, blocking bool,
) (*pubsub.PublishResult, error)
/*
Subscribe subscribe for message on a subscription
THIS CALL IS BLOCKING!!
@param ctxt context.Context - execution context
@param subscription string - subscription name
@param handler PubSubMessageHandler - RX message callback
*/
Subscribe(ctxt context.Context, subscription string, handler PubSubMessageHandler) error
/*
Close close and clean up the client
@param ctxt context.Context - execution context
*/
Close(ctxt context.Context) error
}
PubSubClient is a wrapper interface around the PubSub API with some ease-of-use features
func GetNewPubSubClientInstance ¶ added in v0.4.0
func GetNewPubSubClientInstance( client *pubsub.Client, logTags log.Fields, metricsHelper PubSubMetricHelper, ) (PubSubClient, error)
GetNewPubSubClientInstance get PubSub wrapper client
@param client *pubsub.Client - core PubSub client @param logTags log.Fields - metadata fields to include in the logs @param metricsHelper PubSubMetricHelper - metric collection helper agent @returns new PubSubClient instance
type PubSubMessageHandler ¶ added in v0.4.0
type PubSubMessageHandler func( ctxt context.Context, pubTimestamp time.Time, msg []byte, metadata map[string]string, ) error
PubSubMessageHandler callback to trigger when PubSub message received
type PubSubMetricHelper ¶ added in v0.6.0
type PubSubMetricHelper interface {
/*
RecordPublish record PubSub publish message
@param topic string - PubSub topic
@param successful bool - whether the operation was successful
@param payloadLen int64 - publish payload length
*/
RecordPublish(topic string, successful bool, payloadLen int64)
/*
RecordReceive record PubSub receive message
@param topic string - PubSub topic
@param successful bool - whether the operation was successful
@param payloadLen int64 - receive payload length
*/
RecordReceive(topic string, successful bool, payloadLen int64)
}
PubSubMetricHelper PubSub publish and receive metric recording helper agent
type PubSubRequestResponseClientParam ¶ added in v0.4.0
type PubSubRequestResponseClientParam struct {
// TargetID the ID to target to send a request (or response) to this client
TargetID string
// Name client instance name
Name string
// PSClient base pubsub client
PSClient PubSubClient
// MsgRetentionTTL PubSub message TTL, after which the message is purged.
MsgRetentionTTL time.Duration
// LogTags metadata fields to include in the logs
LogTags log.Fields
// CustomLogModifiers additional log metadata modifiers to use
CustomLogModifiers []LogMetadataModifier
// SupportWorkerCount number of support workers to spawn to process incoming messages
SupportWorkerCount int
// TimeoutEnforceInt interval between request timeout checks
TimeoutEnforceInt time.Duration
// SupportTaskMetricsHelper metrics collection helper for the support tasks
SupportTaskMetricsHelper TaskProcessorMetricHelper
}
PubSubRequestResponseClientParam configuration parameters of PubSub based RequestResponseClient
type Queue ¶ added in v0.8.0
type Queue[V any] interface { /* Len get the current queue length @return current queue length */ Len() int /* Push enqueue data @param data V - data to enqueue */ Push(data V) error /* Pop dequeue data @return data from queue @error ErrorNoDataAvailable - queue is empty */ Pop() (V, error) }
Queue queue
func GetNewPriorityQueue ¶ added in v0.8.0
func GetNewPriorityQueue[V PriorityQueueEntry]() Queue[V]
GetNewPriorityQueue define new priority queue
@generic V any - the data type being passed through the queue @returns new queue instance
func GetNewSimpleQueue ¶ added in v0.8.0
GetNewSimpleQueue define new simple queue
@generic V any - the data type being passed through the queue @returns new queue instance
type RRInboundRequestHandler ¶ added in v0.5.3
type RRInboundRequestHandler func( ctxt context.Context, request interface{}, origMsg ReqRespMessage, ) (interface{}, error)
RRInboundRequestHandler callback function to process a specific inbound request
type RRMessageParser ¶ added in v0.5.3
RRMessageParser callback function to parse request-response payload into a specific data type
type ReqRespMessage ¶ added in v0.4.0
type ReqRespMessage struct {
// RequestID request ID associated with this message
RequestID string `json:"request" validate:"required"`
// SenderID message sender ID
SenderID string `json:"sender" validate:"required"`
// TargetID message target ID
TargetID string `json:"target" validate:"required"`
// IsRequest whether the message is a request message
IsRequest bool `json:"is_request"`
// Timestamp message timestamp
Timestamp time.Time `json:"timestamp"`
// Metadata message metadata
Metadata map[string]string `json:"meta,omitempty"`
// Payload message payload
Payload []byte `json:"payload,omitempty"`
}
ReqRespMessage message structure passing through a request-response system
type ReqRespMessageHandler ¶ added in v0.4.0
type ReqRespMessageHandler func(ctxt context.Context, msg ReqRespMessage) error
ReqRespMessageHandler callback called when request-response message is available for processing
type ReqRespTimeoutHandler ¶ added in v0.4.0
ReqRespTimeoutHandler callback called when request timed out waiting for all responses
type RequestCallParam ¶ added in v0.4.0
type RequestCallParam struct {
// RespHandler response message handler callback
RespHandler ReqRespMessageHandler
// ExpectedResponsesCount the expected number of responses to receive. Once this many responses
// are received, the request is considered to be complete.
ExpectedResponsesCount int
// Blocking whether the request call is blocking
Blocking bool
// Timeout the request timeout if it has not received all responses after this duration
Timeout time.Duration
// TimeoutHandler request timeout handler callback
// TODO FIXME: implement request timeout handling
TimeoutHandler ReqRespTimeoutHandler
}
RequestCallParam contains the parameters of a request
type RequestResponseClient ¶ added in v0.4.0
type RequestResponseClient interface {
/*
SetInboundRequestHandler set the inbound request handler
@param ctxt context.Context - execution context
@param handler ReqRespMessageHandler - the handler to use
*/
SetInboundRequestHandler(ctxt context.Context, handler ReqRespMessageHandler) error
/*
Request make a new request
@param ctxt context.Context - execution context
@param targetID string - target ID this request is destined for
@param message []byte - request message payload
@param metadata map[string]string - request metadata
@param callParam RequestCallParam - request call parameters
@return request ID
*/
Request(
ctxt context.Context,
targetID string,
message []byte,
metadata map[string]string,
callParam RequestCallParam,
) (string, error)
/*
Respond respond to an inbound request
@param ctxt context.Context - execution context
@param originalReq ReqRespMessage - original request
@param message []byte - response message payload
@param metadata map[string]string - request metadata
@param blocking bool - whether the call is blocking
*/
Respond(
ctxt context.Context,
originalReq ReqRespMessage,
message []byte,
metadata map[string]string,
blocking bool,
) error
/*
Stop stop any support background tasks which were started
@param ctxt context.Context - execution context
*/
Stop(ctxt context.Context) error
}
RequestResponseClient is a request-response client built on provided messaging transport
Each client instance will respond to requests for a single request target ID
func GetNewPubSubRequestResponseClientInstance ¶ added in v0.4.0
func GetNewPubSubRequestResponseClientInstance( parentCtxt context.Context, params PubSubRequestResponseClientParam, ) (RequestResponseClient, error)
GetNewPubSubRequestResponseClientInstance get PubSub based RequestResponseClient
@param parentCtxt context.Context - parent context @param params PubSubRequestResponseClientParam - client config parameters @return new RequestResponseClient instance
type RequestResponseDriver ¶ added in v0.5.3
type RequestResponseDriver struct {
Component
Client RequestResponseClient
PayloadParser RRMessageParser
// contains filtered or unexported fields
}
RequestResponseDriver helper request-response driver class to simplify RR client usage
func (*RequestResponseDriver) InstallHandler ¶ added in v0.5.3
func (d *RequestResponseDriver) InstallHandler( requestType reflect.Type, handler RRInboundRequestHandler, )
InstallHandler install a handler for an inbound request
@param requestType reflect.Type - request message type @param handler InboundRequestHandler - request handler callback
func (*RequestResponseDriver) MakeRequest ¶ added in v0.5.3
func (d *RequestResponseDriver) MakeRequest( ctxt context.Context, requestInstanceName string, targetID string, requestMsg []byte, requestMeta map[string]string, callParam RequestCallParam, ) ([]interface{}, error)
MakeRequest wrapper function to marking an outbound request
@param ctxt context.Context - execution context @param requestInstanceName string - descriptive name for this request to identify it in logs @param targetID string - request target ID @param requestMsg []byte - request payload @param requestMeta map[string]string - request's associated metadata @param callParam RequestCallParam - request call parameters @returns response payload or payloads if multiple responses expected
func (*RequestResponseDriver) ProcessInboundRequest ¶ added in v0.5.3
func (d *RequestResponseDriver) ProcessInboundRequest( ctxt context.Context, msg ReqRespMessage, ) error
ProcessInboundRequest process inbound request
@param ctxt context.Context - execution context @param msg ReqRespMessage - raw request message
type RestAPIBaseResponse ¶
type RestAPIBaseResponse struct {
// Success indicates whether the request was successful
Success bool `json:"success" validate:"required"`
// RequestID gives the request ID to match against logs
RequestID string `json:"request_id" validate:"required"`
// Error are details in case of errors
Error *ErrorDetail `json:"error,omitempty"`
}
RestAPIBaseResponse standard REST API response
type RestAPIHandler ¶
type RestAPIHandler struct {
Component
// CallRequestIDHeaderField the HTTP header containing the request ID provided by the caller
CallRequestIDHeaderField *string
// DoNotLogHeaders marks the set of HTTP headers to not log
DoNotLogHeaders map[string]bool
// LogLevel configure the request logging level
LogLevel HTTPRequestLogLevel
// MetricsHelper HTTP request metric collection agent
MetricsHelper HTTPRequestMetricHelper
}
RestAPIHandler base REST API handler
func (RestAPIHandler) GetStdRESTErrorMsg ¶
func (h RestAPIHandler) GetStdRESTErrorMsg( ctxt context.Context, respCode int, errMsg string, errDetail string, ) RestAPIBaseResponse
GetStdRESTErrorMsg defines a standard error message
@param ctxt context.Context - a request context @param respCode int - the request response code @param errMsg string - the error message @param errDetail string - the details on the error @return the standard REST response
func (RestAPIHandler) GetStdRESTSuccessMsg ¶
func (h RestAPIHandler) GetStdRESTSuccessMsg(ctxt context.Context) RestAPIBaseResponse
GetStdRESTSuccessMsg defines a standard success message
@param ctxt context.Context - a request context @return the standard REST response
func (RestAPIHandler) LoggingMiddleware ¶
func (h RestAPIHandler) LoggingMiddleware(next http.HandlerFunc) http.HandlerFunc
LoggingMiddleware is a support middleware to be used with Mux to perform request logging
@param next http.HandlerFunc - the core request handler function @return middleware http.HandlerFunc
func (RestAPIHandler) ReadRequestIDFromContext ¶ added in v0.3.2
func (h RestAPIHandler) ReadRequestIDFromContext(ctxt context.Context) string
ReadRequestIDFromContext reads the request ID from the request context if available
@param ctxt context.Context - a request context @return if available, the request ID
func (RestAPIHandler) WriteRESTResponse ¶
func (h RestAPIHandler) WriteRESTResponse( w http.ResponseWriter, respCode int, resp interface{}, headers map[string]string, ) error
WriteRESTResponse helper function to write out the REST API response
@param w http.ResponseWriter - response writer
@param respCode int - the response code
@param resp interface{} - the response body
@param headers map[string]string - the response header
@return whether write succeeded
type RestRequestOAuthTokenKey ¶ added in v0.10.0
type RestRequestOAuthTokenKey struct{}
RestRequestOAuthTokenKey associated key for *jwt.Token when storing in request context
type RestRequestParam ¶
type RestRequestParam struct {
// ID is the request ID
ID string `json:"id"`
// Host is the request host
Host string `json:"host" validate:"required,fqdn"`
// URI is the request URI
URI string `json:"uri" validate:"required,uri"`
// Method is the request method
Method string `json:"method" validate:"required,oneof=GET HEAD PUT POST PATCH DELETE OPTIONS"`
// Referer is the request referer string
Referer string `json:"referer"`
// RemoteAddr is the request
RemoteAddr string `json:"remote_address"`
// Proto is the request HTTP proto string
Proto string `json:"http_proto"`
// ProtoMajor is the request HTTP proto major version
ProtoMajor int `json:"http_version_major"`
// ProtoMinor is the request HTTP proto minor version
ProtoMinor int `json:"http_version_minor"`
// RequestHeaders additional request headers
RequestHeaders http.Header
// Timestamp is when the request is first received
Timestamp time.Time
}
RestRequestParam is a helper object for logging a request's parameters into its context
type RestRequestParamKey ¶
type RestRequestParamKey struct{}
RestRequestParamKey associated key for RESTRequestParam when storing in request context
type Sequencer ¶
type Sequencer interface {
/*
NextValue returns the next value in the sequence
*/
NextValue() float64
}
Sequencer is a helper interface for returning a sequence of numbers
type TaskProcessor ¶
type TaskProcessor interface {
/*
Submit submits a new task parameter to be processed by a handler
@param ctx context.Context - calling context
@param newTaskParam interface{} - task-parameter
@return whether successful
*/
Submit(ctx context.Context, newTaskParam interface{}) error
/*
SetTaskExecutionMap update the mapping between task-parameter object and its associated
handler function.
The task-parameter object contains information need to execute a particular task. When
a user wants to execute a task, the user is submitting a task-parameter object via Submit.
The module finds the associated handler function and calls it with the task-parameter object.
@param newMap map[reflect.Type]TaskHandler - map of task handlers to various task-parameter
object types
@return whether successful
*/
SetTaskExecutionMap(newMap map[reflect.Type]TaskProcessorSupportHandler) error
/*
AddToTaskExecutionMap add new (task-parameter, handler function) mapping to the existing set.
@param parameterType reflect.Type - task-parameter object type
@param handler TaskHandler - task handler
@return whether successful
*/
AddToTaskExecutionMap(parameterType reflect.Type, handler TaskProcessorSupportHandler) error
/*
StartEventLoop starts one daemon thread for processing the submitted task-parameters
@param wg *sync.WaitGroup - wait group
@return whether successful
*/
StartEventLoop(wg *sync.WaitGroup) error
/*
StopEventLoop stops the daemon threads
@return whether successful
*/
StopEventLoop() error
}
TaskProcessor implements an event loop where tasks are processed by a daemon thread
func GetNewTaskProcessorInstance ¶
func GetNewTaskProcessorInstance( ctxt context.Context, instanceName string, taskBufferLen int, logTags log.Fields, metricsHelper TaskProcessorMetricHelper, ) (TaskProcessor, error)
GetNewTaskProcessorInstance get TaskProcessor
@param ctxt context.Context - parent context @param instanceName string - instance name @param taskBufferLen int - number of task-parameters to buffer @param logTags log.Fields - metadata fields to include in the logs @param metricsHelper TaskProcessorMetricHelper - metrics collections helper @return new TaskProcessor instance
type TaskProcessorMetricHelper ¶ added in v0.7.0
type TaskProcessorMetricHelper interface {
/*
RecordSubmit record task submission
@param instance string - task processor instance name
@param successful bool - whether the operation was successful
*/
RecordSubmit(instance string, successful bool)
/*
RecordSubmit record task processed
@param instance string - task processor instance name
*/
RecordProcessed(instance string)
}
TaskProcessorMetricHelper Task processor metric recording helper agent
type TaskProcessorSupportHandler ¶
type TaskProcessorSupportHandler func(taskParam interface{}) error
TaskProcessorSupportHandler is the function signature of callback used to process an user task
type TimeoutHandler ¶
type TimeoutHandler func() error
TimeoutHandler callback function signature called timer timeout