aznet

package module
v0.0.0-...-1bb45b4
Published: Mar 19, 2026 License: MIT Imports: 34 Imported by: 0

README

aznet

Network abstraction over Azure Storage

aznet implements the standard Go net.Conn interface using Azure Storage services as the transport layer.

What is aznet?

aznet provides a TCP-like networking abstraction over Azure Storage, allowing you to use familiar socket programming patterns with cloud storage as the underlying transport. Any code that works with net.Conn can work with aznet.

Drop-in Replacement Example

Swap standard TCP for Azure Storage with a single line:

// Before: Standard TCP
ln, _ := net.Listen("tcp", ":8080")

// After: aznet using Azure Blob Storage
ln, _ := aznet.Listen("azblob", "https://account:key@account.blob.core.windows.net/")
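Because aznet hands back plain net.Listener and net.Conn values, handlers written against those interfaces should need no changes. The sketch below is a transport-agnostic echo handler exercised over an in-memory net.Pipe connection instead of Azure Storage; echo and roundTrip are illustrative names, not part of aznet.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echo copies everything it reads from conn back to conn until EOF.
// It depends only on the net.Conn interface, not on a concrete transport.
func echo(conn net.Conn) error {
	defer conn.Close()
	_, err := io.Copy(conn, conn)
	return err
}

// roundTrip sends msg through an in-memory connection served by echo
// and returns what comes back.
func roundTrip(msg string) (string, error) {
	client, server := net.Pipe()
	go echo(server)
	defer client.Close()

	if _, err := client.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(client, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	got, err := roundTrip("ping")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

The same echo function could be passed a connection returned by a listener's Accept, whichever transport created it.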

Documentation

Comprehensive documentation is available in the docs/ directory and rendered via Starlight:

cd docs
pnpm install
pnpm dev

Requirements

  • Go 1.25 or later.
  • An Azure Storage Account (or Azurite for local development).

Installation

go get github.com/atsika/aznet

Contributing

Contributions are welcome! Please review the existing code structure and driver implementations first.

License

MIT License - see LICENSE file for details.

Credits


Built with ❤️ by Atsika

Documentation

Constants

const (
	// MsgTypeData is for application data.
	MsgTypeData byte = 0x00
	// MsgTypePing is for keep-alive heartbeats.
	MsgTypePing byte = 0x01
	// MsgTypeFin is for graceful close.
	MsgTypeFin byte = 0x02
	// MsgTypeRotate is for rotation notifications.
	MsgTypeRotate byte = 0x03
)
const (
	// Default endpoint names for connection bootstrap
	DefaultHandshakeEndpoint = "handshake"
	DefaultTokenEndpoint     = "token"

	// DefaultReqPrefix is the default prefix for request channels.
	DefaultReqPrefix = "req"
	// DefaultResPrefix is the default prefix for response channels.
	DefaultResPrefix = "res"

	// DefaultSASExpiry is the default authorization expiry time.
	DefaultSASExpiry = 24 * time.Hour

	// DefaultFastPoll is the polling interval used during activity.
	// Adaptive polling backs off exponentially from FastPoll to DataPoll.
	DefaultFastPoll = 10 * time.Millisecond
	// DefaultDataPoll is the steady-state polling interval for idle connections.
	// At 500ms this produces ~7,200 read API calls per hour per connection.
	// Tune via WithDataPoll() to balance latency vs cost.
	DefaultDataPoll = 500 * time.Millisecond
	// DefaultAcceptPoll is the polling interval for listeners accepting incoming connections.
	DefaultAcceptPoll = 1 * time.Second
	// DefaultPingInterval is the interval between keep-alive heartbeats.
	DefaultPingInterval = 30 * time.Second

	// DefaultConnectTimeout is the maximum duration the client waits for connection acknowledgment.
	DefaultConnectTimeout = 30 * time.Second
	// DefaultIdleTimeout is the idle timeout before considering a peer dead.
	DefaultIdleTimeout = 5 * time.Minute
)
const FrameHeaderSize = 4 + 1 // 4 bytes length + 1 byte type
const MaxBlobBlockSize = 4 * 1024 * 1024

MaxBlobBlockSize is the maximum size of a single block in an Append Blob (4 MiB).

const MaxBlocksPerBlob = 50000

MaxBlocksPerBlob is the maximum number of blocks per append blob.

const MaxQueueTextMessageSize = 64 * 1024

MaxQueueTextMessageSize is the maximum raw data size stored in a queue message (64 KiB).

const MaxTableBinaryPropertySize = 64 * 1024

MaxTableBinaryPropertySize is the maximum size (64 KiB) for a single Edm.Binary property.

const MaxTableProperties = 15

MaxTableProperties is the number of binary properties we use to store a single large entity.

const NoiseOverhead = 4 + 16

NoiseOverhead is the encryption overhead: 4 bytes length prefix + 16 bytes AES-GCM tag.

Variables

var (
	// ErrUnsupportedScheme is returned when no registered driver exists for the requested URL scheme.
	ErrUnsupportedScheme = errors.New("unsupported scheme")
	// ErrClientCreationFailed is returned when an Azure service client cannot be created.
	ErrClientCreationFailed = errors.New("client creation failed")
	// ErrSASGenerationFailed is returned when a SAS token cannot be generated.
	ErrSASGenerationFailed = errors.New("failed to generate SAS token")
	// ErrMissingSAS is returned when required SAS tokens are missing from the URL.
	ErrMissingSAS = errors.New("missing handshake or token SAS in URL")
	// ErrInvalidSASEncoding is returned when a SAS token is not properly URL-encoded.
	ErrInvalidSASEncoding = errors.New("invalid SAS encoding")
	// ErrDecodeTokenFailed is returned when the JSON token payload cannot be decoded.
	ErrDecodeTokenFailed = errors.New("failed to decode token payload")
	// ErrWriteBufferFailed is returned when data cannot be written to an internal buffer.
	ErrWriteBufferFailed = errors.New("failed to write data to buffer")
	// ErrHandshakeExchangeFailed is returned when the initial handshake message cannot be sent or received.
	ErrHandshakeExchangeFailed = errors.New("failed to exchange handshake")
	// ErrInvalidConfig is returned when the provided options result in an invalid configuration.
	ErrInvalidConfig = errors.New("invalid configuration")
	// ErrNoData is returned when no data is available to read.
	ErrNoData = errors.New("no data available")
)
var (
	// ErrHandshakeFailed is returned when the Noise handshake fails.
	ErrHandshakeFailed = errors.New("handshake failed")
	// ErrHandshakeIncomplete is returned when the handshake is not complete.
	ErrHandshakeIncomplete = errors.New("handshake not complete")
	// ErrDecryptionFailed is returned when received data cannot be decrypted.
	ErrDecryptionFailed = errors.New("decryption failed")
	// ErrEncryptionFailed is returned when data cannot be encrypted.
	ErrEncryptionFailed = errors.New("encryption failed")
	// ErrNoiseInitFailed is returned when the Noise protocol state cannot be initialized.
	ErrNoiseInitFailed = errors.New("noise handshake initialization failed")
	// ErrNoiseMsgFailed is returned when a Noise handshake message cannot be created.
	ErrNoiseMsgFailed = errors.New("handshake message creation failed")
)

Functions

func BuildFrame

func BuildFrame(writeBuf *bytes.Buffer, f Frame)

BuildFrame writes a framed message to the write buffer. Frame format: [4 bytes: length][1 byte: type][N bytes: payload]. The caller must ensure writeBuf is protected from concurrent access.

func Dial

func Dial(network, address string, opts ...Option) (net.Conn, error)

Dial is analogous to net.Dial. It takes a network type (e.g. "azblob") and an address (e.g. "https://account.blob.core.windows.net/?handshake=...").

func GetFactories

func GetFactories() []string

GetFactories returns a list of registered factory names.

func Listen

func Listen(network, address string, opts ...Option) (net.Listener, error)

Listen is analogous to net.Listen. It takes a network type (e.g. "azblob") and an address (e.g. "account.blob.core.windows.net").

func RegisterFactory

func RegisterFactory(scheme string, factory Factory)

RegisterFactory registers a factory for the given scheme (e.g., "azblob").

func UnregisterFactory

func UnregisterFactory(scheme string)

UnregisterFactory removes the factory registration.
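A scheme-to-factory registry of this shape is commonly a mutex-guarded map. The sketch below shows the pattern with lowercase stand-ins (registry, register, unregister, names) and a simplified factory type; it is not the package's code, and the "example" scheme is made up for the demonstration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// factory is a hypothetical, simplified stand-in for aznet.Factory.
type factory func() string

// registry maps URL schemes to factories and is safe for concurrent use.
type registry struct {
	mu        sync.RWMutex
	factories map[string]factory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]factory)}
}

// register adds or replaces the factory for scheme.
func (r *registry) register(scheme string, f factory) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[scheme] = f
}

// unregister removes the factory for scheme, if any.
func (r *registry) unregister(scheme string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.factories, scheme)
}

// names returns the registered schemes in sorted order.
func (r *registry) names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.factories))
	for s := range r.factories {
		out = append(out, s)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := newRegistry()
	r.register("azblob", func() string { return "blob driver" })
	r.register("example", func() string { return "example driver" })
	fmt.Println(r.names())
	r.unregister("example")
	fmt.Println(r.names())
}
```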

Types

type AdaptivePoll

type AdaptivePoll struct {
	Cur    time.Duration
	Fast   time.Duration
	Steady time.Duration
	// contains filtered or unexported fields
}

AdaptivePoll implements an exponential back-off sleep utility. Call Reset() after any activity to return to the fast interval.

func NewAdaptivePoll

func NewAdaptivePoll(fast, steady time.Duration) *AdaptivePoll

NewAdaptivePoll builds a poller initialized to the fast interval.

func (*AdaptivePoll) Reset

func (p *AdaptivePoll) Reset()

Reset moves the current interval back to the fast value.

func (*AdaptivePoll) Sleep

func (p *AdaptivePoll) Sleep()

Sleep waits for the current interval and then backs off exponentially up to Steady.

type Buffers

type Buffers struct {
	Enc   []byte // Encryption scratch space
	Dec   []byte // Decryption scratch space
	Read  bytes.Buffer
	Write bytes.Buffer
	Noise bytes.Buffer
}

Buffers encapsulates the internal bytes.Buffer instances used by a connection.

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config holds runtime settings for a connection or listener. The zero value yields sane defaults via defaultConfig(). Users should modify it through functional options.

func (*Config) SASTimes

func (c *Config) SASTimes() (start, end time.Time)

SASTimes returns the start and end times for a SAS token based on config.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is sane and valid.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn implements net.Conn.

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) CloseWrite

func (c *Conn) CloseWrite() error

CloseWrite shuts down the writing side of the connection. It sends a FIN frame to the peer to indicate that no more data will be sent.

func (*Conn) GetMetrics

func (c *Conn) GetMetrics() Metrics

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

func (*Conn) MTU

func (c *Conn) MTU() int

MTU returns the maximum number of application bytes that can fit in a single transport frame for the current connection.
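One plausible way to arrive at such a figure is to subtract the fixed per-frame costs from the transport's raw capacity. The sketch below is an assumption: this page does not show how MTU() is computed, and a driver may subtract further encoding overhead. The constants mirror FrameHeaderSize and NoiseOverhead; mtu is a hypothetical helper.

```go
package main

import "fmt"

const (
	frameHeaderSize = 4 + 1  // mirrors FrameHeaderSize
	noiseOverhead   = 4 + 16 // mirrors NoiseOverhead
)

// mtu estimates the application bytes that fit in one transport write.
// Assumption: capacity minus encryption overhead minus one frame header.
func mtu(maxRawSize int) int {
	n := maxRawSize - noiseOverhead - frameHeaderSize
	if n < 0 {
		return 0
	}
	return n
}

func main() {
	fmt.Println(mtu(4 * 1024 * 1024)) // raw capacity equal to MaxBlobBlockSize
	fmt.Println(mtu(64 * 1024))       // raw capacity equal to MaxQueueTextMessageSize
}
```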

func (*Conn) Read

func (c *Conn) Read(p []byte) (int, error)

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

func (*Conn) Write

func (c *Conn) Write(p []byte) (int, error)

type DefaultMetrics

type DefaultMetrics struct {
	// contains filtered or unexported fields
}

DefaultMetrics implements the Metrics interface with atomic counters.

func NewDefaultMetrics

func NewDefaultMetrics() *DefaultMetrics

NewDefaultMetrics creates a new DefaultMetrics instance.

func (*DefaultMetrics) GetBytesReceived

func (m *DefaultMetrics) GetBytesReceived() int64

func (*DefaultMetrics) GetBytesSent

func (m *DefaultMetrics) GetBytesSent() int64

func (*DefaultMetrics) GetDeleteTransactionCount

func (m *DefaultMetrics) GetDeleteTransactionCount() int64

func (*DefaultMetrics) GetListTransactionCount

func (m *DefaultMetrics) GetListTransactionCount() int64

func (*DefaultMetrics) GetReadTransactionCount

func (m *DefaultMetrics) GetReadTransactionCount() int64

func (*DefaultMetrics) GetWriteTransactionCount

func (m *DefaultMetrics) GetWriteTransactionCount() int64

func (*DefaultMetrics) IncrementBytesReceived

func (m *DefaultMetrics) IncrementBytesReceived(n int64)

func (*DefaultMetrics) IncrementBytesSent

func (m *DefaultMetrics) IncrementBytesSent(n int64)

func (*DefaultMetrics) IncrementDeleteTransaction

func (m *DefaultMetrics) IncrementDeleteTransaction()

func (*DefaultMetrics) IncrementListTransaction

func (m *DefaultMetrics) IncrementListTransaction()

func (*DefaultMetrics) IncrementReadTransaction

func (m *DefaultMetrics) IncrementReadTransaction()

func (*DefaultMetrics) IncrementWriteTransaction

func (m *DefaultMetrics) IncrementWriteTransaction()
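Counters backed by sync/atomic can be updated from many goroutines without a mutex. The sketch below shows the idea with a reduced, hypothetical counters type covering two of the Metrics methods; it is not the DefaultMetrics source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters is a reduced stand-in for an atomic metrics implementation.
type counters struct {
	bytesSent atomic.Int64
	writes    atomic.Int64
}

func (c *counters) IncrementBytesSent(n int64)      { c.bytesSent.Add(n) }
func (c *counters) IncrementWriteTransaction()      { c.writes.Add(1) }
func (c *counters) GetBytesSent() int64             { return c.bytesSent.Load() }
func (c *counters) GetWriteTransactionCount() int64 { return c.writes.Load() }

// record simulates concurrent writers, each performing perWriter writes of size bytes.
func record(c *counters, writers, perWriter int, size int64) {
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWriter; j++ {
				c.IncrementWriteTransaction()
				c.IncrementBytesSent(size)
			}
		}()
	}
	wg.Wait()
}

func main() {
	c := &counters{}
	record(c, 8, 100, 10)
	fmt.Println(c.GetWriteTransactionCount(), c.GetBytesSent())
}
```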

type Driver

type Driver interface {
	// Handshake operations
	PostHandshake(ctx context.Context, connID string, data []byte) error
	GetHandshakes(ctx context.Context) ([]Handshake, error)
	DeleteHandshake(ctx context.Context, id string) error

	// Token exchange operations
	PostToken(ctx context.Context, connID string, data []byte) error
	GetToken(ctx context.Context, connID string) ([]byte, error)
	DeleteToken(ctx context.Context, connID string) error

	// Session lifecycle
	CreateSession(ctx context.Context, connID string) (SessionTokens, error)
	CreateBootstrapTokens() (hSAS, tSAS string, err error)

	// NewTransport creates a data transporter for an established session.
	NewTransport(ctx context.Context, connID string, tokens SessionTokens, isInitiator bool) (Transport, error)

	// CleanupBootstrap removes shared bootstrap resources (handshake/token endpoints).
	CleanupBootstrap(ctx context.Context) error
	// CleanupSession removes per-connection resources (req/res channels).
	CleanupSession(ctx context.Context, connID string) error
}

Driver defines how a storage backend handles connection bootstrap (handshake and token exchange), session creation, transport construction, and cleanup.

type Endpoint

type Endpoint struct {
	URL     *url.URL
	Account string
	Key     string
	IsAzure bool
}

Endpoint represents an aznet endpoint.

func NewEndpoint

func NewEndpoint(u *url.URL) *Endpoint

NewEndpoint creates a new Endpoint from a URL.

func (*Endpoint) BuildConnURL

func (e *Endpoint) BuildConnURL(cfg *Config, handshakeSAS, tokenSAS string) string

BuildConnURL constructs the final aznet connection URL with base64-encoded SAS tokens.

func (*Endpoint) JoinURL

func (e *Endpoint) JoinURL(resource, sas string) string

JoinURL joins the base service URL with a resource name and optional SAS token.

func (*Endpoint) ParseSAS

func (e *Endpoint) ParseSAS(cfg *Config) (string, string, error)

ParseSAS parses the handshake and token SAS tokens from the URL query. It returns the decoded SAS strings (without leading '?').
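The encode/decode round trip can be sketched with net/url and encoding/base64. Several details here are assumptions: the query parameter names are taken from the default endpoint names ("handshake", "token"), and URL-safe unpadded base64 is a guess at the encoding variant. buildConnURL and parseSAS are hypothetical free functions, and the SAS values are placeholders.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/url"
	"strings"
)

// buildConnURL attaches two SAS strings to base as base64 query parameters.
// Assumptions: parameter names and the RawURLEncoding variant.
func buildConnURL(base, handshakeSAS, tokenSAS string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("handshake", base64.RawURLEncoding.EncodeToString([]byte(handshakeSAS)))
	q.Set("token", base64.RawURLEncoding.EncodeToString([]byte(tokenSAS)))
	u.RawQuery = q.Encode()
	return u.String(), nil
}

// parseSAS reverses buildConnURL and strips any leading '?'.
func parseSAS(raw string) (handshakeSAS, tokenSAS string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	q := u.Query()
	decode := func(key string) (string, error) {
		v := q.Get(key)
		if v == "" {
			return "", fmt.Errorf("missing %s SAS", key)
		}
		b, err := base64.RawURLEncoding.DecodeString(v)
		if err != nil {
			return "", fmt.Errorf("invalid %s SAS encoding: %w", key, err)
		}
		return strings.TrimPrefix(string(b), "?"), nil
	}
	if handshakeSAS, err = decode("handshake"); err != nil {
		return "", "", err
	}
	if tokenSAS, err = decode("token"); err != nil {
		return "", "", err
	}
	return handshakeSAS, tokenSAS, nil
}

func main() {
	u, err := buildConnURL("https://account.blob.core.windows.net/", "?sv=1&sig=a", "sv=1&sig=b")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	h, tk, err := parseSAS(u)
	fmt.Println(h, tk, err)
}
```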

func (*Endpoint) ServiceURL

func (e *Endpoint) ServiceURL() string

ServiceURL returns the base URL for the Azure Storage service.

type Factory

type Factory interface {
	// NewDriver creates a Driver for the given endpoint and config.
	NewDriver(ep *Endpoint, cfg *Config) (Driver, error)
}

Factory is an interface for creating a Driver implementation.

type Frame

type Frame struct {
	Payload []byte
	Length  uint32
	Type    byte
}

Frame represents a single message unit.

type Handshake

type Handshake struct {
	ID      string // handshake identifier (used for cleanup)
	Payload []byte // The raw Noise handshake message
}

Handshake represents a discovered connection request.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener implements net.Listener.

func (*Listener) Accept

func (l *Listener) Accept() (net.Conn, error)

func (*Listener) Addr

func (l *Listener) Addr() net.Addr

func (*Listener) Close

func (l *Listener) Close() error

func (*Listener) ConnectionString

func (l *Listener) ConnectionString() (string, error)

ConnectionString returns the connection string for this listener.

type Metrics

type Metrics interface {
	IncrementWriteTransaction()
	IncrementReadTransaction()
	IncrementListTransaction()
	IncrementDeleteTransaction()
	IncrementBytesSent(n int64)
	IncrementBytesReceived(n int64)

	GetWriteTransactionCount() int64
	GetReadTransactionCount() int64
	GetListTransactionCount() int64
	GetDeleteTransactionCount() int64
	GetBytesSent() int64
	GetBytesReceived() int64
}

Metrics is an interface for tracking connection statistics. Drivers call Increment* and collectors read via Get*.

func GetMetrics

func GetMetrics(c net.Conn) Metrics

GetMetrics returns the metrics from a connection if it supports metrics tracking. It returns nil if the connection doesn't support metrics.

type Noise

type Noise struct {
	// contains filtered or unexported fields
}

Noise encapsulates the Noise Protocol handshake state and cipher suite.

func NewNoiseClient

func NewNoiseClient() (*Noise, error)

NewNoiseClient creates a new Noise Protocol handshake as the initiator (client). It uses the NN pattern (no static keys, anonymous connection).

func NewNoiseServer

func NewNoiseServer() (*Noise, error)

NewNoiseServer creates a new Noise Protocol handshake as the responder (server). It uses the NN pattern (no static keys, anonymous connection).

func (*Noise) DecryptData

func (nh *Noise) DecryptData(dst, ciphertext []byte) ([]byte, error)

DecryptData decrypts application data using the established session cipher.

func (*Noise) EncryptData

func (nh *Noise) EncryptData(dst, plaintext []byte) ([]byte, error)

EncryptData encrypts application data using the established session cipher.

func (*Noise) GetCipherStates

func (nh *Noise) GetCipherStates() (send, recv *noise.CipherState, err error)

GetCipherStates returns the established cipher states for encrypting/decrypting data. send is for sending, recv is for receiving.

func (*Noise) IsComplete

func (nh *Noise) IsComplete() bool

IsComplete returns true if the handshake is complete and session keys are established.

func (*Noise) IsInitiator

func (nh *Noise) IsInitiator() bool

IsInitiator returns true if this side of the handshake is the initiator.

func (*Noise) ReadMessage

func (nh *Noise) ReadMessage(msg []byte) ([]byte, error)

ReadMessage processes a handshake message from the peer, decrypting the payload. It returns the decrypted payload.

func (*Noise) SealData

func (nh *Noise) SealData(dst, plaintext []byte) ([]byte, error)

SealData encrypts plaintext and prepends a 4-byte big-endian length. It uses the provided dst buffer if it has enough capacity.

func (*Noise) UnsealData

func (nh *Noise) UnsealData(dst, data []byte) (plaintext, remaining []byte, err error)

UnsealData attempts to extract and decrypt a Noise chunk from data. It decrypts into dst and returns the plaintext, the remaining data, and an error.
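The length-prefix part of this scheme can be shown on its own. The sketch below handles only the 4-byte big-endian prefix and treats each chunk as opaque bytes; encryption is deliberately omitted, and sealChunk/unsealChunk are hypothetical names. Whether the real prefix counts the authentication tag is not stated here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// sealChunk appends a 4-byte big-endian length followed by chunk to dst.
// The chunk is treated as opaque bytes; no encryption happens here.
func sealChunk(dst, chunk []byte) []byte {
	dst = binary.BigEndian.AppendUint32(dst, uint32(len(chunk)))
	return append(dst, chunk...)
}

// unsealChunk extracts the first complete chunk from data.
// ok is false when data does not yet hold a complete chunk.
func unsealChunk(data []byte) (chunk, remaining []byte, ok bool) {
	if len(data) < 4 {
		return nil, data, false
	}
	n := binary.BigEndian.Uint32(data[:4])
	if uint64(len(data)-4) < uint64(n) {
		return nil, data, false
	}
	end := 4 + int(n)
	return data[4:end], data[end:], true
}

func main() {
	stream := sealChunk(nil, []byte("abc"))
	stream = sealChunk(stream, []byte("de"))

	// Drain every complete chunk from the stream.
	for {
		chunk, rest, ok := unsealChunk(stream)
		if !ok {
			break
		}
		fmt.Printf("%q\n", chunk)
		stream = rest
	}
}
```

Returning the remaining bytes lets a reader call the function repeatedly on a buffer that may hold several chunks or a partial one.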

func (*Noise) WriteMessage

func (nh *Noise) WriteMessage(payload []byte) ([]byte, error)

WriteMessage creates the next handshake message, encrypting the payload. It returns the message to send to the peer.

type Option

type Option func(*Config)

Option defines a functional option for Listen/Dial.

func WithAcceptPoll

func WithAcceptPoll(d time.Duration) Option

WithAcceptPoll sets how frequently the listener scans for new connections.

func WithConnectTimeout

func WithConnectTimeout(d time.Duration) Option

WithConnectTimeout sets the maximum duration the client waits for the listener to acknowledge a dialed connection. Zero or negative disables the timeout.

func WithContext

func WithContext(ctx context.Context) Option

WithContext sets the base context for all network/SDK calls initiated by Listen/Dial. Useful for cancellation or shared tracing.

func WithDataPoll

func WithDataPoll(d time.Duration) Option

WithDataPoll sets how often established connections poll for data.

func WithEndpoints

func WithEndpoints(handshake, token string) Option

WithEndpoints allows overriding the default handshake and token endpoints used during the connection bootstrap phase.

func WithFastPoll

func WithFastPoll(d time.Duration) Option

WithFastPoll sets the polling interval used when data is actively flowing.

func WithIdleTimeout

func WithIdleTimeout(d time.Duration) Option

WithIdleTimeout sets the grace period after which background janitors purge half-closed connections that never completed a FIN handshake. Zero disables automatic cleanup.

func WithMetrics

func WithMetrics(metrics Metrics) Option

WithMetrics sets a custom metrics implementation for tracking connection statistics. If not provided, a default implementation with atomic counters will be used.

func WithPing

func WithPing(d time.Duration) Option

WithPing sets the keep-alive heartbeat cadence. Zero disables keep-alive.

func WithPrefixes

func WithPrefixes(reqPrefix, resPrefix string) Option

WithPrefixes sets the prefixes that drivers use when creating per-connection request/response artefacts (e.g. blobs or queues).

func WithSASExpiry

func WithSASExpiry(d time.Duration) Option

WithSASExpiry sets the validity period for SAS tokens. A token cannot be revoked once generated, so keep the expiry as short as is practical.

type Rotator

type Rotator interface {
	ShouldRotate() bool
	RotateTX(ctx context.Context) error
	RotateRX() error
}

Rotator is optionally implemented by transports that need resource rotation (e.g., append blobs have a 50,000-block limit). Core handles rotation signaling automatically when this interface is satisfied.
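The rotation trigger can be illustrated with a block counter. The sketch below is hypothetical: appendWriter, its fields, and the choice to rotate exactly at the limit are assumptions, and the real interface also involves RotateTX/RotateRX signaling that is not modeled here.

```go
package main

import "fmt"

const maxBlocksPerBlob = 50000 // mirrors MaxBlocksPerBlob

// appendWriter is a hypothetical writer that tracks blocks in the current blob.
type appendWriter struct {
	blocks     int // blocks written to the current blob
	generation int // number of rotations performed so far
}

// ShouldRotate reports whether the current blob has no room for another block.
func (w *appendWriter) ShouldRotate() bool {
	return w.blocks >= maxBlocksPerBlob
}

// rotate switches to a fresh blob and resets the block counter.
func (w *appendWriter) rotate() {
	w.generation++
	w.blocks = 0
}

// appendBlock rotates first if needed, then counts one more block.
func (w *appendWriter) appendBlock() {
	if w.ShouldRotate() {
		w.rotate()
	}
	w.blocks++
}

func main() {
	w := &appendWriter{}
	for i := 0; i < maxBlocksPerBlob+1; i++ {
		w.appendBlock()
	}
	fmt.Println(w.generation, w.blocks)
}
```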

type ServiceAddr

type ServiceAddr struct {
	Net      string // driver name (e.g. "azblob")
	Endpoint string // base service URL
	Resource string // resource identifier (container/queue/table + sub-resource)
}

ServiceAddr is a reusable net.Addr implementation for all drivers.

func (ServiceAddr) Network

func (a ServiceAddr) Network() string

func (ServiceAddr) String

func (a ServiceAddr) String() string

type SessionTokens

type SessionTokens struct {
	Req string `json:"req"`
	Res string `json:"res"`
}

SessionTokens represents the session-specific tokens/SAS exchanged after handshake.

type Transport

type Transport interface {
	// WriteRaw sends raw bytes to the peer.
	WriteRaw(ctx context.Context, data io.ReadSeeker) error
	// ReadRaw attempts to read raw bytes from the peer.
	ReadRaw(ctx context.Context) (io.ReadCloser, error)
	// Close terminates the transport.
	Close() error
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
	// MaxRawSize returns the maximum raw capacity of the transport in bytes.
	MaxRawSize() int
}

Transport is the raw byte-exchange interface implemented by drivers.

Directories

Path
cmd/azurl (command)
examples/echo/client (command)
examples/echo/server (command)
examples/metrics/client (command)
examples/metrics/server (command)
