pool

package module
v0.1.59999 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2026 License: MIT Imports: 7 Imported by: 0

README

Connection Pool

The pool package provides connection pooling for the go-noise library. It enables connection reuse for Noise protocol connections.

Features

  • Interface-Only Design: Uses net.Conn and net.Addr interfaces exclusively
  • Connection Lifecycle Management: Connections expire based on age and idle time
  • Thread-Safe Operations: All methods safe for concurrent use
  • TOCTOU-Safe Dialing: GetOrDial serializes dials per address to prevent duplicate sessions
  • Graceful Shutdown: Drain waits for in-flight connections before closing
  • Usage Statistics: Pool health and usage monitoring via Stats and Snapshot

Quick Start

package main

import (
    "context"
    "net"
    "time"
    
    "github.com/go-i2p/pool"
)

func main() {
    // Create a connection pool
    p := pool.NewConnPool(&pool.PoolConfig{
        MaxSize: 10,                // Max connections per address (0 = default 10; use Unbounded:true for no limit)
        MaxAge:  30 * time.Minute,  // Connection max lifetime
        MaxIdle: 5 * time.Minute,   // Max idle time before cleanup
    })
    defer p.Close()

    // Use GetOrDial to atomically get or create a connection
    conn, err := p.GetOrDial(context.Background(), "example.com:80", 
        func(ctx context.Context) (net.Conn, error) {
            return net.Dial("tcp", "example.com:80")
        })
    if err != nil {
        panic(err)
    }
    
    // Connection automatically returned to pool when closed
    defer conn.Close()
    
    // Use the connection...
    conn.Write([]byte("GET / HTTP/1.0\r\n\r\n"))
}

GetOrDial atomically retrieves an existing connection or dials a new one, serializing dials per address to prevent duplicate NTCP2 sessions:

conn, err := p.GetOrDial(ctx, "10.0.0.1:15555", func(ctx context.Context) (net.Conn, error) {
    // Dial and perform any handshake/protocol negotiation
    return net.DialTimeout("tcp", "10.0.0.1:15555", 5*time.Second)
})
if err != nil {
    return err
}
defer conn.Close() // returns to pool

Graceful Shutdown with Drain

Use Drain to wait for in-flight connections before closing the pool:

// Stop accepting new work first, then:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := p.Drain(ctx); err != nil {
    log.Printf("drain timed out: %v", err)
}
p.Close()

Configuration

  • MaxSize: Maximum connections per remote address (default: 10). A zero or negative value applies the default limit (10). To disable the per-address limit entirely, set Unbounded: true.
  • MaxTotal: Maximum total connections across all addresses (0 = unlimited)
  • MaxAge: Maximum connection lifetime (default: 30 minutes)
  • MaxIdle: Maximum idle time before cleanup (default: 5 minutes)
  • HealthCheck: Optional liveness probe called by Get() before returning a connection
  • ReadyCheck: Optional check called by Put() to verify handshake completion

Memory Growth Characteristics

The pool retains one *sync.Mutex per unique remote address ever passed to GetOrDial. These entries are never evicted — deleting them would reintroduce the TOCTOU race that serialized dialing is designed to prevent.

For a long-running I2P router expect approximately 24–48 bytes of retained heap per unique peer address dialed over the pool's lifetime:

Unique peers contacted Approximate retained memory
1,000 ~24–48 KB
10,000 ~240–480 KB
100,000 ~2.4–4.8 MB

If this growth is unacceptable, create a fresh ConnPool periodically and migrate callers to the new instance.

Thread Safety

All pool operations are thread-safe and support concurrent usage. The pool supports concurrent connections.

Documentation

Overview

Package pool provides a connection pool for reusing Noise-encrypted connections across multiple Dial operations, reducing handshake overhead for repeated peers.

Index

Constants

View Source
const DefaultCleanupInterval = time.Minute

DefaultCleanupInterval is the default interval between background cleanup runs when MaxIdle and MaxAge are not set or are large.

RATIONALE (AUDIT L-3): 1 minute provides a reasonable default for long-lived pools where connections may be idle for hours. More frequent cleanup (e.g., every 10s) provides minimal benefit for typical pool lifetimes and increases lock contention. Less frequent cleanup (e.g., 5 minutes) would delay removal of expired connections too long for interactive workloads.

View Source
const DefaultDrainPollInterval = 50 * time.Millisecond

DefaultDrainPollInterval is the default polling interval for Drain when waiting for in-use connections to be released.

RATIONALE (AUDIT L-3): 50ms balances responsiveness (drain completes within ~100ms of the last connection being released) against CPU overhead (20 checks per second per waiting Drain call). Shorter intervals (e.g., 10ms) would provide faster drain at the cost of 100 checks/second CPU load; longer intervals (e.g., 100ms) would reduce overhead but delay drain completion.

View Source
const DefaultMaxSize = 10

DefaultMaxSize is the default per-address connection limit applied when a caller supplies a PoolConfig without MaxSize and without setting Unbounded.

View Source
const MinCleanupInterval = time.Second

MinCleanupInterval is the minimum interval between background cleanup runs to prevent tight loops when MaxIdle/MaxAge are very small.

RATIONALE (AUDIT L-3): 1 second prevents excessive cleanup overhead when pool is configured with very short timeouts (e.g., MaxIdle=2s for testing). Even with 2-second timeouts, checking every second ensures expired connections are removed within 1 second of expiration, which is acceptable latency. Values below 1s (e.g., 100ms) would cause unnecessary CPU churn for little benefit.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config = PoolConfig

Config is an alias for PoolConfig following Go naming conventions. Prefer Config in new code; PoolConfig is retained for backward compatibility.

type ConnEntry

type ConnEntry = PooledConn

ConnEntry is an alias for PooledConn following Go naming conventions. Prefer ConnEntry in new code; PooledConn is retained for backward compatibility.

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

ConnPool manages a pool of reusable connections for performance optimization. It only uses interface types (net.Conn, net.Addr) for maximum compatibility.

Memory growth note: dialMu retains one *sync.Mutex per unique remote address ever passed to GetOrDial. These entries are never deleted because deletion would reintroduce the TOCTOU race that dialMu is designed to prevent. For a long-running I2P router that contacts many unique peers, expect roughly 24–48 bytes of retained heap per unique address dialed over the pool's lifetime. At 10,000 unique peers this is approximately 240–480 KB; at 100,000 peers it is approximately 2.4–4.8 MB. If per-address mutex retention is unacceptable for your workload, create a new ConnPool periodically and migrate callers to the new instance (AUDIT M-4).

func NewConnPool

func NewConnPool(config *PoolConfig) *ConnPool

NewConnPool creates a new connection pool with the given configuration. The caller's *PoolConfig is read but never modified; effective defaults are derived into local variables only (M-1).

func (*ConnPool) Close

func (p *ConnPool) Close() error

Close closes idle connections and prevents new connections from being added. In-use connections are closed when returned via Release() or Discard().

Callers should call Drain() before Close() if they want to wait for in-flight sessions to complete. If Drain() is called concurrently with or after Close(), it will still correctly observe in-use connections and wait for them to be returned.

func (*ConnPool) Drain

func (p *ConnPool) Drain(ctx context.Context) error

Drain waits for all in-use connections to be returned to the pool. It blocks until either all connections are idle (in_use == 0) or the provided context is cancelled. Use this during graceful shutdown to allow in-flight sessions to complete before calling Close().

Drain does not prevent new connections from being checked out; it only waits for the current in-use count to reach zero. Callers should stop accepting new work before calling Drain.

func (*ConnPool) Get

func (p *ConnPool) Get(remoteAddr string) PooledConnection

Get retrieves a connection from the pool for the given remote address. Returns nil if no suitable connection is available.

Get delegates to GetWithContext with a background context. Use GetWithContext directly if the caller has a deadline or cancellation requirement, so that a blocking HealthCheck can be interrupted.

func (*ConnPool) GetOrDial

func (p *ConnPool) GetOrDial(ctx context.Context, remoteAddr string, dial func(ctx context.Context) (net.Conn, error)) (PooledConnection, error)

GetOrDial atomically retrieves an idle connection for remoteAddr or, if none is available, calls dial to create a new one. The dial function is called outside the pool lock so it may perform blocking I/O (e.g., TCP connect + Noise handshake), but only one goroutine at a time will dial for a given remoteAddr. This prevents the TOCTOU race where multiple goroutines simultaneously discover an empty pool and each dial a fresh connection to the same NTCP2 router — which the NTCP2 spec considers a protocol error (§2.1: "only one active NTCP2 session per router").

The returned connection is wrapped in a PoolConnWrapper. If dial succeeds, the new connection is added to the pool and checked out in a single atomic step.

If ctx is cancelled before dial completes, GetOrDial returns ctx.Err().

LOCKING AND HEALTH CHECK: When the per-address dial lock (addrMu) is held and a re-check of the pool finds an existing connection, HealthCheck is executed while addrMu is still held. The lock is released immediately after Get() returns, so the HealthCheck itself runs within the lock window. Callers providing a HealthCheck that performs blocking I/O (e.g., a Noise ping round-trip) should be aware that concurrent GetOrDial calls for the same address will be serialised for the duration of each health check. For workloads with many concurrent callers per address and a slow HealthCheck, prefer GetWithContext() with a timeout to bound latency.

ADDRESSING: This function pools connections under the user-provided remoteAddr parameter, allowing logical addressing (e.g., "proxy:8080"). This differs from Put(), which uses conn.RemoteAddr().String() for physical addressing. The returned PoolConnWrapper's Close() method uses Release() with the correct logical address, maintaining consistency. Users should not mix GetOrDial with manual Put() calls on the same connection unless the addresses match exactly.

func (*ConnPool) GetWithContext

func (p *ConnPool) GetWithContext(ctx context.Context, remoteAddr string) PooledConnection

GetWithContext retrieves a connection from the pool for the given remote address, respecting context cancellation during health check execution. Returns nil if no suitable connection is available or if the context is cancelled.

This method is recommended over Get() for applications with request deadlines or timeout requirements, as it prevents indefinite blocking if the health check callback performs blocking I/O (AUDIT M-1 fix).

If ctx is cancelled before the health check completes, the candidate connection is returned to the pool and nil is returned.

func (*ConnPool) Put

func (p *ConnPool) Put(conn net.Conn) error

Put adds a connection to the pool for reuse.

Callers must only Put() connections whose Noise handshake has been completed. If a ReadyCheck callback is configured in PoolConfig, it is called before pooling; the connection is rejected (closed) if the check returns false. Without a ReadyCheck, it is the caller's responsibility to ensure the connection is in a usable state.

ADDRESSING (AUDIT L-1): Put() uses conn.RemoteAddr().String() as the pool key, which reflects the connection's physical address. This differs from GetOrDial(), which pools under the user-provided logical address parameter. WARNING: If you obtained the connection via GetOrDial with a logical address (e.g., "proxy:8080"), DO NOT call Put() manually — use the wrapper's Close() method instead, or the connection will be pooled under its physical address (e.g., "10.0.0.1:8080") and Get("proxy:8080") will fail to find it.

PERFORMANCE (AUDIT L-2): The RemoteAddr().String() method is called before acquiring the pool lock. If your net.Addr implementation's String() method is slow or blocking, this will delay connection pooling. Ensure RemoteAddr() returns quickly and returns a non-empty string.

func (*ConnPool) Release

func (p *ConnPool) Release(remoteAddr string, conn net.Conn) error

Release marks a connection as no longer in use, making it available for reuse. Returns an error if the pool is closed or the connection is not found.

When the pool is closed, Release closes the connection and returns a POOL_CLOSED-coded error regardless of whether the close succeeded or failed. This lets callers (e.g., PoolConnWrapper.Close) distinguish "pool was closed, connection already closed inside Release" from other release failures that require the caller to close the connection themselves (L-4, L-6).

If conn is a *PoolConnWrapper, the wrapper is marked closed so that a subsequent call to wrapper.Close() returns an ALREADY_CLOSED error instead of issuing a second release (preventing a double-release vulnerability).

func (*ConnPool) Remove

func (p *ConnPool) Remove(remoteAddr string, conn net.Conn) error

Remove closes a connection and permanently removes it from the pool. Use this when a connection is known to be broken.

Returns CONNECTION_NOT_FOUND if the connection was not in the pool for the given address (the connection is still closed in this case to avoid resource leaks). Returns nil on success.

func (*ConnPool) Snapshot

func (p *ConnPool) Snapshot() []ConnSnapshot

Snapshot returns a read-only metadata snapshot of all pooled connections for diagnostics, monitoring, or testing purposes.

Each returned ConnSnapshot is a value copy containing only metadata fields (Address, CreatedAt, LastUsedAt, IsInUse). No live net.Conn handle is reachable from the snapshot, so callers cannot accidentally corrupt pool state or trigger data races by interacting with the underlying connections.

func (*ConnPool) Stats

func (p *ConnPool) Stats() PoolStats

Stats returns a snapshot of current pool statistics. The returned PoolStats value is a fresh copy and can be safely read by the caller without affecting pool state (AUDIT M-6). Call Stats() again to get updated values.

type ConnSnapshot

type ConnSnapshot struct {
	// Address is the remote address string used as the pool key.
	Address string
	// CreatedAt is the time the connection was added to the pool.
	CreatedAt time.Time
	// LastUsedAt is the time the connection was last returned from Get().
	LastUsedAt time.Time
	// IsInUse reports whether the connection is currently checked out.
	IsInUse bool
}

ConnSnapshot is a read-only metadata copy of a pooled connection's state at a point in time. It carries no live resource handles and is safe to store, compare, and pass freely across goroutines without risk of corrupting pool state. Use Snapshot() to obtain values of this type.

type ConnWrapper

type ConnWrapper = PoolConnWrapper

ConnWrapper is an alias for PoolConnWrapper following Go naming conventions. Prefer ConnWrapper in new code; PoolConnWrapper is retained for backward compatibility.

type Pool

type Pool interface {
	// Get retrieves an idle connection for remoteAddr, or nil if none is available.
	Get(remoteAddr string) PooledConnection
	// GetWithContext retrieves an idle connection for remoteAddr, respecting
	// context cancellation during health check execution. This method is
	// recommended over Get() for applications with request deadlines or
	// timeout requirements (AUDIT L1 fix).
	GetWithContext(ctx context.Context, remoteAddr string) PooledConnection
	// Put returns a connection to the pool for reuse.
	Put(conn net.Conn) error
	// GetOrDial retrieves an idle connection for remoteAddr or dials a new one.
	GetOrDial(ctx context.Context, remoteAddr string, dial func(context.Context) (net.Conn, error)) (PooledConnection, error)
	// Drain waits for all in-use connections to be returned, up to the context deadline.
	Drain(ctx context.Context) error
	// Snapshot returns a read-only metadata snapshot of the current pool state.
	// The returned ConnSnapshot values carry no live net.Conn handles and are
	// safe to store and inspect without risk of corrupting pool state.
	Snapshot() []ConnSnapshot
	// Stats returns pool statistics (total connections, in-use count, etc.).
	Stats() PoolStats
	// Close closes the pool and all connections it holds.
	Close() error
}

Pool is the interface satisfied by *ConnPool. Callers that need to substitute a test double, an LRU pool, or a metrics-instrumented pool can depend on Pool instead of *ConnPool.

type PoolConfig

type PoolConfig struct {
	// MaxSize is the maximum number of connections per remote address.
	// A zero or negative value is treated as "apply the default limit"
	// (see DefaultMaxSize). To deliberately disable the per-address limit
	// (NOT recommended — it is a file-descriptor exhaustion vector), set
	// Unbounded to true.
	MaxSize int
	// MaxTotal is the maximum total number of connections across all addresses.
	// A zero value means no global limit is enforced.
	MaxTotal int
	// MaxAge is the maximum age of a connection before it is closed.
	MaxAge time.Duration
	// MaxIdle is the maximum idle time before a connection is closed.
	MaxIdle time.Duration
	// Unbounded, when true, disables the safe default for MaxSize. Callers
	// must opt in explicitly to unbounded per-address pools so that a
	// forgotten MaxSize does not silently allow FD exhaustion (AUDIT L-1).
	Unbounded bool
	// HealthCheck is an optional callback to probe connection liveness
	// before returning it from Get(). Return true if healthy.
	//
	// PANIC RECOVERY (AUDIT M-3): If the callback panics, the panic is
	// recovered and logged, and the connection is treated as unhealthy
	// (removed from pool and closed). The panic does not propagate to
	// the caller of Get(). Callers cannot distinguish between a panic
	// and a legitimate "false" return value.
	HealthCheck func(net.Conn) bool
	// ReadyCheck is an optional callback invoked by Put() to verify that a
	// connection is ready for reuse (e.g., that a Noise handshake has been
	// completed). Return true if the connection is ready to be pooled.
	// When nil, all connections are accepted by Put().
	//
	// For NTCP2 connections, the recommended check is:
	//   func(c net.Conn) bool {
	//       if nc, ok := c.(*noise.NoiseConn); ok {
	//           return nc.GetConnectionState() == internal.StateEstablished
	//       }
	//       return true
	//   }
	//
	// PANIC RECOVERY (AUDIT M-3): If the callback panics, the panic is
	// recovered and logged, and the connection is treated as not ready
	// (rejected from pool and closed). The panic does not propagate to
	// the caller of Put().
	ReadyCheck func(net.Conn) bool
}

PoolConfig configures a connection pool

type PoolConnWrapper

type PoolConnWrapper struct {
	net.Conn
	// contains filtered or unexported fields
}

PoolConnWrapper wraps a pooled connection to handle automatic release

func (*PoolConnWrapper) Close

func (w *PoolConnWrapper) Close() error

Close returns the connection to the pool instead of closing it. Returns an error on double-close or if the underlying connection close fails. If Release fails with a POOL_CLOSED error, the connection was already closed inside Release; Close returns that error without calling Close() again (L-4). For any other Release failure, the connection is closed defensively and the error from w.Conn.Close() is returned.

func (*PoolConnWrapper) Discard

func (w *PoolConnWrapper) Discard() error

Discard closes the underlying connection and permanently removes it from the pool. Use this when the connection is known to be broken.

type PoolStats

type PoolStats struct {
	// Total is the total number of connections (in-use + available).
	Total int
	// InUse is the number of connections currently checked out.
	InUse int
	// Available is the number of connections ready to be checked out.
	Available int
	// Addresses is the number of unique remote addresses in the pool.
	Addresses int
}

PoolStats holds a snapshot of pool statistics returned by Stats(). All fields are safe to read without holding any lock.

type PooledConn

type PooledConn struct {
	// contains filtered or unexported fields
}

PooledConn represents a connection in the pool with metadata. All fields are unexported to prevent callers from mutating pool state without holding the pool mutex. PooledConn is an internal bookkeeping type; use Snapshot() to get a safe, read-only ConnSnapshot for diagnostics.

func (*PooledConn) NetConn deprecated

func (p *PooledConn) NetConn() net.Conn

NetConn returns the underlying network connection.

Deprecated: NetConn is retained for internal pool use only. Snapshot() now returns []ConnSnapshot which carries no live net.Conn handle. Do not call Close(), Write(), or Read() on this value from external code.

type PooledConnection

type PooledConnection interface {
	net.Conn
	// Discard closes the connection and permanently removes it from the pool.
	// Use this when the connection is known to be broken.
	Discard() error
}

PooledConnection is a net.Conn that exposes Discard() for permanently removing a broken connection from the pool. All connections returned by Get, GetWithContext, and GetOrDial satisfy this interface, so callers who program against the Pool interface can call Discard() without a type assertion to *PoolConnWrapper.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL