streaming

package module
v0.1.59999 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2026 License: MIT Imports: 19 Imported by: 3

README

go-streaming

I2P streaming protocol implementation in Go. Provides TCP-like reliable, ordered, bidirectional streams over the I2P anonymous network using go-i2cp.

Features

Implemented
  • Three-way handshake (SYN/SYN-ACK/ACK)
  • Data transfer with sequence tracking and selective ACK
  • CLOSE handshake for clean shutdown
  • net.Conn interface (Read, Write, Close, SetDeadlines)
  • Packet-based flow control (6-packet initial window, up to 128 packets)
  • Exponential backoff retransmission
  • Out-of-order packet buffering
  • Ping/Pong (ECHO) for RTT measurement
  • StreamManager for automatic packet routing from I2CP callbacks
  • Connection multiplexing on single I2CP session
  • MTU negotiation (1730 bytes default, 1812 for ECIES)
Planned
  • Full RST (connection reset) support
  • Bulk vs. interactive connection profiles
Limitations

No half-close: I2P streaming requires bidirectional CLOSE. Both sides must send CLOSE to fully terminate a connection. When you call Close():

  1. Sends a CLOSE packet
  2. Waits for the peer's CLOSE acknowledgment
  3. Cleans up resources

Use SetWriteDeadline or SetDeadline if you need to timeout during close.

Usage

Server
import (
    "context"
    "io"
    go_i2cp "github.com/go-i2p/go-i2cp"
    streaming "github.com/go-i2p/go-streaming"
)

func main() {
    client := go_i2cp.NewClient(&go_i2cp.ClientCallBacks{})
    client.Connect(context.Background())
    defer client.Close()
    
    manager, _ := streaming.NewStreamManager(client)
    manager.StartSession(context.Background())
    defer manager.Close()
    
    listener, _ := streaming.ListenWithManager(manager, 8080, 1730)
    defer listener.Close()
    
    for {
        conn, _ := listener.Accept()
        go io.Copy(conn, conn)
    }
}
Client
import (
    "context"
    go_i2cp "github.com/go-i2p/go-i2cp"
    streaming "github.com/go-i2p/go-streaming"
)

func main() {
    client := go_i2cp.NewClient(&go_i2cp.ClientCallBacks{})
    client.Connect(context.Background())
    defer client.Close()
    
    manager, _ := streaming.NewStreamManager(client)
    manager.StartSession(context.Background())
    defer manager.Close()
    
    crypto := go_i2cp.NewCrypto()
    dest, _ := go_i2cp.NewDestinationFromBase64(destB64, crypto)
    
    conn, _ := streaming.DialWithManager(manager, dest, 0, 8080)
    defer conn.Close()
    
    conn.Write([]byte("Hello, I2P!"))
    buf := make([]byte, 1024)
    n, _ := conn.Read(buf)
}

See examples/ for complete working examples.

Logging

Uses github.com/go-i2p/logger. Logging is disabled by default.

Variable Values Description
DEBUG_I2P debug, warn, error Log level
WARNFAIL_I2P true Exit on warnings
DEBUG_I2P=debug go run ./examples/echo/server/

Testing

go test -v ./...        # All tests
go test -race -v ./...  # With race detector
go test -cover ./...    # With coverage

System tests (requires running I2P router):

go test -tags=system -v -timeout=5m

Containerized system tests (predictable router + test runner, no host ports):

./scripts/container/test-with-router.sh

This runs tests in a router-tests container against an i2p-router container with no published ports.

Documentation

License

MIT - See LICENSE

Documentation

Overview

Package streaming provides TCP-like reliable streams over I2P.

Package streaming implements the I2P streaming protocol, providing TCP-like reliable, ordered, bidirectional streams over the I2P anonymous network.

This is an MVP implementation focusing on correctness over performance. It uses github.com/go-i2p/go-i2cp for I2CP transport and implements the I2P streaming packet format using standard library encoding.

Architecture:

  • Each I2CP message carries one TCP-like packet (not fragmented at I2CP layer)
  • Default MTU is 1730 bytes payload (fits in 2x 1KB I2NP tunnel messages)
  • ECIES connections can use 1812 bytes MTU (lower overhead)
  • Windowing uses packet count, not byte count per I2P streaming spec

Package streaming provides TCP-like reliable streams over I2P.

Index

Constants

View Source
const (
	// FlagSYN (bit 0): SYNCHRONIZE - Similar to TCP SYN.
	// Set in the initial packet and in the first response.
	// FROM_INCLUDED and SIGNATURE_INCLUDED must also be set.
	FlagSYN uint16 = 1 << 0

	// FlagCLOSE (bit 1): Similar to TCP FIN.
	// If the response to a SYNCHRONIZE fits in a single message,
	// the response will contain both SYNCHRONIZE and CLOSE.
	// SIGNATURE_INCLUDED must also be set.
	FlagCLOSE uint16 = 1 << 1

	// FlagRESET (bit 2): Abnormal close.
	// SIGNATURE_INCLUDED must also be set.
	FlagRESET uint16 = 1 << 2

	// FlagSignatureIncluded (bit 3): Signature is present in option data.
	// Currently sent only with SYNCHRONIZE, CLOSE, and RESET, where it is required,
	// and with ECHO, where it is required for a ping.
	FlagSignatureIncluded uint16 = 1 << 3

	// FlagSignatureRequested (bit 4): Unused.
	// Requests every packet in the other direction to have SIGNATURE_INCLUDED.
	FlagSignatureRequested uint16 = 1 << 4

	// FlagFromIncluded (bit 5): FROM destination is present in option data.
	// Currently sent only with SYNCHRONIZE, where it is required,
	// and with ECHO, where it is required for a ping.
	FlagFromIncluded uint16 = 1 << 5

	// FlagDelayRequested (bit 6): Optional delay field is present.
	// 2 bytes indicating how many milliseconds the sender wants the recipient
	// to wait before sending any more data. A value greater than 60000 indicates choking.
	FlagDelayRequested uint16 = 1 << 6

	// FlagMaxPacketSizeIncluded (bit 7): MTU is present in option data.
	// The maximum length of the payload. Sent with SYNCHRONIZE.
	FlagMaxPacketSizeIncluded uint16 = 1 << 7

	// FlagProfileInteractive (bit 8): Unused or ignored.
	// The interactive profile is unimplemented.
	FlagProfileInteractive uint16 = 1 << 8

	// FlagECHO (bit 9): Ping/pong packet.
	// If set, most other options are ignored.
	// Ping: sendStreamId > 0, requires SIGNATURE_INCLUDED and FROM_INCLUDED
	// Pong: sendStreamId = 0, receiveStreamId echoes ping's sendStreamId
	FlagECHO uint16 = 1 << 9

	// FlagNoACK (bit 10): Tells the recipient to ignore the ackThrough field.
	// Currently set in the initial SYN packet, otherwise the ackThrough field is always valid.
	FlagNoACK uint16 = 1 << 10

	// FlagOfflineSignature (bit 11): Offline signature (LS2) is present.
	// Contains the offline signature section from LS2.
	// FROM_INCLUDED must also be set.
	FlagOfflineSignature uint16 = 1 << 11
)

Packet flags per I2P streaming specification. Bit order: 15....0 (15 is MSB) per https://geti2p.net/spec/streaming

View Source
const (
	// SignatureTypeDSA_SHA1 is the original I2P signature type (legacy, pre-0.9.12)
	SignatureTypeDSA_SHA1 = 0
	// SignatureTypeECDSA_SHA256_P256 uses ECDSA with P-256 curve
	SignatureTypeECDSA_SHA256_P256 = 1
	// SignatureTypeECDSA_SHA384_P384 uses ECDSA with P-384 curve
	SignatureTypeECDSA_SHA384_P384 = 2
	// SignatureTypeECDSA_SHA512_P521 uses ECDSA with P-521 curve
	SignatureTypeECDSA_SHA512_P521 = 3
	// SignatureTypeRSA_SHA256_2048 uses RSA with 2048-bit key
	SignatureTypeRSA_SHA256_2048 = 4
	// SignatureTypeRSA_SHA384_3072 uses RSA with 3072-bit key
	SignatureTypeRSA_SHA384_3072 = 5
	// SignatureTypeRSA_SHA512_4096 uses RSA with 4096-bit key
	SignatureTypeRSA_SHA512_4096 = 6
	// SignatureTypeEd25519 is the modern I2P signature type (default since 0.9.15)
	SignatureTypeEd25519 = 7
	// SignatureTypeEd25519ph is Ed25519 with pre-hashing
	SignatureTypeEd25519ph = 8
)

I2P Signature Type Constants These match the I2P specification for signature types in destination certificates. Reference: https://geti2p.net/spec/common-structures#certificate

View Source
const (
	// DestinationPublicKeySize is the size of the ElGamal encryption key (256 bytes)
	DestinationPublicKeySize = 256
	// DestinationSigningKeyPadding is the legacy DSA signing key size used for padding (128 bytes)
	DestinationSigningKeyPadding = 128
)

I2P Destination Structure Constants

View Source
const (
	// DefaultMTU is the default maximum transmission unit in bytes (payload only).
	// Set to 1730 to fit in 2x 1KB I2NP tunnel messages.
	DefaultMTU = 1730

	// ECIESMTU is the recommended MTU for ECIES-X25519 connections.
	// Lower overhead allows 1812 bytes payload.
	ECIESMTU = 1812

	// MinMTU is the minimum MTU that must be supported per spec.
	MinMTU = 512

	// DefaultWindowSize is the initial window size in packets (not bytes).
	// Per I2P streaming spec, start with 6 packets if no control block data available.
	DefaultWindowSize = 6

	// MaxWindowSize is the maximum window size in packets.
	// Per I2P streaming spec, maximum is 128 packets.
	MaxWindowSize = 128

	// DefaultConnectTimeout is the default timeout for Dial operations.
	DefaultConnectTimeout = 60 * time.Second

	// DefaultHandshakeTimeout is the timeout for waiting for SYN-ACK response.
	DefaultHandshakeTimeout = 30 * time.Second

	// DefaultInactivityTimeout is the default inactivity timeout per I2P streaming spec.
	// Per spec: i2p.streaming.inactivityTimeout = 90*1000 (90 seconds).
	// When no data is sent or received for this duration, a keepalive action is taken.
	DefaultInactivityTimeout = 90 * time.Second

	// InactivityCheckInterval is how often the inactivity timer checks for timeout.
	InactivityCheckInterval = 10 * time.Second

	// MinRTO is the minimum retransmission timeout.
	// Per I2P streaming spec: MIN_RESEND_DELAY = 100ms.
	MinRTO = 100 * time.Millisecond

	// MaxRTO is the maximum retransmission timeout.
	// Per I2P streaming spec: MAX_RESEND_DELAY = 45 seconds.
	MaxRTO = 45 * time.Second

	// FastRetransmitThreshold is the number of NACKs required before fast retransmit.
	// Per I2P streaming spec: "Two NACKs of a packet is a request for a 'fast retransmit'"
	FastRetransmitThreshold = 2

	// MaxRetransmissions is the maximum number of times a packet can be retransmitted.
	// Per I2P streaming spec: i2p.streaming.maxResends default = 8.
	MaxRetransmissions = 8

	// PersistProbeInterval is the interval between probe packets when choked.
	// Per I2P streaming spec: "The choked endpoint should start a 'persist timer'
	// to control the probing" to compensate for possible lost unchoke packets.
	// Using 5 seconds as a reasonable interval (less than typical chokedUntil but often enough).
	PersistProbeInterval = 5 * time.Second

	// MinDelayChoke is the minimum OptionalDelay value that indicates choking.
	// Per I2P streaming spec: "Optional delay values greater than 60000 indicate choking."
	// Java I2P reference: Packet.java MIN_DELAY_CHOKE = 60001
	MinDelayChoke uint16 = 60001

	// SendDelayChoke is the OptionalDelay value to send when signaling choke.
	// Java I2P reference: Packet.java SEND_DELAY_CHOKE = 61000
	SendDelayChoke uint16 = 61000

	// MaxDelayNormal is the maximum OptionalDelay value for normal (non-choke) delay requests.
	// Values 0-60000 indicate an advisory delay in milliseconds.
	MaxDelayNormal uint16 = 60000
)

MTU constants per I2P streaming specification

View Source
const MaxNACKs = 255

MaxNACKs is the maximum number of NACKs that can be stored and sent per packet. Per I2P streaming specification, NACK count is limited to 255.

View Source
const MaxPingPayloadSize = 32

MaxPingPayloadSize is the maximum payload size for ping/pong packets. Per I2P streaming spec: "The payload in the ping, up to a maximum of 32 bytes, is returned in the pong."

Variables

This section is empty.

Functions

func CleanupTestConnections

func CleanupTestConnections()

CleanupTestConnections closes the shared test connection. Call this from TestMain if needed.

func GenerateTestISN

func GenerateTestISN() uint32

GenerateTestISN generates a fixed ISN for testing. Exported so tests can use it for assertions.

func ParseHashList

func ParseHashList(list string) []string

ParseHashList parses a comma or space-separated list of hashes. Per I2P spec: "Comma- or space-separated list of Base64 peer Hashes"

func PeerDestination added in v0.1.52

func PeerDestination(addr net.Addr) (*go_i2cp.Destination, bool)

PeerDestination extracts an I2P destination from a net.Addr. Returns false if the address is not an I2PAddr or no destination is available.

func PeerDestinationBase64 added in v0.1.52

func PeerDestinationBase64(addr net.Addr) (string, bool)

PeerDestinationBase64 extracts a peer destination Base64 from a net.Addr. Returns false if the address is not an I2PAddr or no destination is available.

func RequireI2CPSession

func RequireI2CPSession(t *testing.T) *go_i2cp.Session

RequireI2CPSession returns a shared I2CP session for tests that need just a session object (not a full StreamManager connection setup). This is useful for unit tests that construct StreamConn manually.

func SignPacket

func SignPacket(pkt *Packet, keyPair *go_i2cp.Ed25519KeyPair) error

SignPacket signs a packet with the given Ed25519 signing key pair. The signature covers the entire marshaled packet with the signature field zeroed.

Requirements:

  • packet.FromDestination must be set (required for signature length calculation)
  • packet.Flags must have FlagSignatureIncluded set
  • keyPair must match the destination's public key

Process:

  1. Marshal the packet (signature field will be zeros or reserved space)
  2. Zero out the signature bytes in the marshaled data
  3. Sign the modified data
  4. Update packet.Signature with the signature

Returns error if marshalling fails or signing fails.

func VerifyOfflineSignature

func VerifyOfflineSignature(offsig *OfflineSig, dest *go_i2cp.Destination, crypto *go_i2cp.Crypto) error

VerifyOfflineSignature verifies an offline signature (LS2) by checking that:

  1. The signature has not expired
  2. The destination's signing key properly signed the transient key

Offline signatures allow a destination to delegate signing authority to a transient key with an expiration time. This is used in LeaseSet2 (LS2) destinations for key rotation and improved security.

The signed data format per I2P specification:

  • Expires: 4 bytes (Unix timestamp)
  • TransientSigType: 2 bytes (signature type)
  • TransientPublicKey: variable length (based on signature type)

Requirements:

  • offsig must not be nil
  • dest must not be nil
  • crypto must not be nil for key operations

Returns error if:

  • Signature has expired (current time > offsig.Expires)
  • Signature verification fails
  • Input parameters are invalid

func VerifyPacketSignature

func VerifyPacketSignature(pkt *Packet, crypto *go_i2cp.Crypto) error

VerifyPacketSignature verifies a packet's signature using the public key from the packet's FromDestination field.

Requirements:

  • packet.FromDestination must be set
  • packet.Signature must be set
  • packet.Flags must have FlagSignatureIncluded set

Process:

  1. Extract signing public key from FromDestination
  2. Marshal packet with signature field zeroed
  3. Verify signature against the marshaled data

Returns error if verification fails or prerequisites are not met.

Types

type AccessDeniedError

type AccessDeniedError struct {
	Reason string
}

AccessDeniedError is returned when a connection is rejected due to access list.

func (*AccessDeniedError) Error

func (e *AccessDeniedError) Error() string

type AccessListConfig

type AccessListConfig struct {
	// Mode specifies how the access list is used
	Mode AccessListMode

	// Hashes contains the list of destination hashes (Base64 encoded or raw 32-byte hashes)
	// Per spec: "Comma- or space-separated list of Base64 peer Hashes"
	Hashes []string

	// DisableRejectLogging disables log warnings when connections are rejected
	DisableRejectLogging bool
}

AccessListConfig configures destination-based access filtering. This implements the i2cp.accessList, i2cp.enableAccessList, and i2cp.enableBlackList options from the I2P streaming specification.

func DefaultAccessListConfig

func DefaultAccessListConfig() *AccessListConfig

DefaultAccessListConfig returns the default (disabled) configuration. Per I2P spec, access list filtering is disabled by default.

type AccessListMode

type AccessListMode int

AccessListMode specifies how the access list is used.

const (
	// AccessListModeDisabled means no access list filtering (default)
	AccessListModeDisabled AccessListMode = iota
	// AccessListModeWhitelist allows only listed destinations
	AccessListModeWhitelist
	// AccessListModeBlacklist blocks listed destinations
	AccessListModeBlacklist
)

type ConnState

type ConnState int

ConnState represents the current state of a streaming connection. Follows I2P streaming protocol state machine.

const (
	// StateInit is the initial state before any handshake
	StateInit ConnState = iota
	// StateSynSent indicates SYN sent, waiting for SYN-ACK
	StateSynSent
	// StateSynRcvd indicates SYN received, SYN-ACK sent, waiting for ACK
	StateSynRcvd
	// StateEstablished indicates connection is established and ready for data
	StateEstablished
	// StateCloseWait indicates CLOSE received, waiting to send CLOSE response
	StateCloseWait
	// StateClosing indicates CLOSE sent, waiting for CLOSE response
	StateClosing
	// StateClosed indicates connection is fully closed
	StateClosed
)

func (ConnState) String

func (s ConnState) String() string

String returns a human-readable representation of the connection state.

type ConnectionLimitsConfig

type ConnectionLimitsConfig struct {
	// MaxConcurrentStreams is the total limit for incoming and outgoing streams combined.
	// 0 or negative means unlimited.
	MaxConcurrentStreams int

	// Per-peer incoming connection limits
	MaxConnsPerMinute int // Max incoming connections per minute from a single peer
	MaxConnsPerHour   int // Max incoming connections per hour from a single peer
	MaxConnsPerDay    int // Max incoming connections per day from a single peer

	// Total incoming connection limits (all peers combined)
	MaxTotalConnsPerMinute int
	MaxTotalConnsPerHour   int
	MaxTotalConnsPerDay    int

	// LimitAction specifies what to do when limits are exceeded
	LimitAction LimitAction

	// DisableRejectLogging disables log warnings when connections are rejected
	DisableRejectLogging bool
}

ConnectionLimitsConfig configures connection rate limiting. All limit values of 0 mean disabled (unlimited). This implements the i2p.streaming.* connection limiting options from the spec.

func DefaultConnectionLimitsConfig

func DefaultConnectionLimitsConfig() *ConnectionLimitsConfig

DefaultConnectionLimitsConfig returns the default (unlimited) configuration. Per I2P spec, all limits are disabled by default.

type I2PAddr

type I2PAddr struct {
	// contains filtered or unexported fields
}

I2PAddr implements the net.Addr interface for I2P destinations. This allows StreamConn to be used anywhere net.Conn is expected.

func (*I2PAddr) Base32 added in v0.1.52

func (a *I2PAddr) Base32() string

Base32 returns the Base32 representation of the destination. Returns an empty string if no destination is available.

func (*I2PAddr) Base64 added in v0.1.52

func (a *I2PAddr) Base64() string

Base64 returns the Base64 representation of the destination. Returns an empty string if no destination is available.

func (*I2PAddr) Destination added in v0.1.52

func (a *I2PAddr) Destination() *go_i2cp.Destination

Destination returns the underlying I2P destination. Returns nil if no destination is available.

func (*I2PAddr) Network

func (a *I2PAddr) Network() string

Network returns the network type ("i2p"). Implements net.Addr interface.

func (*I2PAddr) String

func (a *I2PAddr) String() string

String returns a string representation of the I2P address. For now, returns "destination:port" format. Implements net.Addr interface.

type LimitAction

type LimitAction int

LimitAction specifies what action to take when connection limits are exceeded.

const (
	// LimitActionReset sends a RESET packet to the peer (default)
	LimitActionReset LimitAction = iota
	// LimitActionDrop silently drops the connection without response
	LimitActionDrop
	// LimitActionHTTP sends an HTTP 429 response before closing
	LimitActionHTTP
)

type MessageStats

type MessageStats struct {
	TotalSent         uint64 // Total messages sent
	TotalDelivered    uint64 // Messages with success status
	TotalFailed       uint64 // Messages with failure status
	TotalRetried      uint64 // Messages retried after failure
	TotalExpired      uint64 // Messages expired without status
	AvgDeliveryTimeMs int64  // Average delivery time in milliseconds
	LastDeliveryMs    int64  // Last delivery time in milliseconds
}

MessageStats tracks message delivery statistics.

type OfflineSig

type OfflineSig struct {
	Expires            uint32 // Timestamp (seconds since epoch)
	TransientSigType   uint16 // Signature type of transient key
	TransientPublicKey []byte // Variable length based on type
	DestSignature      []byte // Signature by destination key
}

OfflineSig represents an I2P LS2 offline signature block. Offline signatures allow a destination to delegate signing authority to a transient key, which is useful for LeaseSet2 (LS2) destinations where the signing key may be offline.

Structure per I2P specification:

  • Expires: Unix timestamp (4 bytes) when the offline signature expires
  • TransientSigType: Signature type of the transient key (2 bytes)
  • TransientPublicKey: Public key for the transient signing key (variable length)
  • DestSignature: Signature by the destination's signing key (variable length)

The destination signs the transient key to prove it authorized the delegation.

type Packet

type Packet struct {
	// Required fields (always present)
	SendStreamID uint32 // Stream ID from sender's perspective
	RecvStreamID uint32 // Stream ID from receiver's perspective
	SequenceNum  uint32 // Packet sequence number
	AckThrough   uint32 // Highest sequence number acknowledged
	Flags        uint16 // Packet flags (SYN, ACK, CLOSE, etc.)

	// Optional fields (presence indicated by flags or special values)
	NACKs         []uint32 // Negative acknowledgments for selective ACK or replay prevention (destination hash in SYN)
	OptionalDelay uint16   // Optional delay in ms (0-60000 = delay, >60000 = choked)
	ResendDelay   uint8    // Resend delay hint (changed from uint16 per spec)
	MaxPacketSize uint16   // MTU - maximum payload size in bytes (sent with SYN)

	// Authentication fields (presence indicated by FlagFromIncluded and FlagSignatureIncluded)
	FromDestination  *go_i2cp.Destination // Source destination when FlagFromIncluded is set (387+ bytes)
	Signature        []byte               // Packet signature when FlagSignatureIncluded is set (variable length based on key type)
	OfflineSignature *OfflineSig          // Offline signature (LS2) when FlagOfflineSignature is set

	// Payload
	Payload []byte
}

Packet represents an I2P streaming protocol packet. Per spec, packets are variable length with optional fields.

Design rationale:

  • Implements I2P streaming packet format per specification
  • Uses standard library encoding/binary for serialization
  • Keeps packet format simple for MVP (no advanced options initially)

func (*Packet) Marshal

func (p *Packet) Marshal() ([]byte, error)

Marshal serializes the Packet into bytes per I2P streaming protocol format.

func (*Packet) Unmarshal

func (p *Packet) Unmarshal(data []byte) error

Unmarshal parses bytes into a Packet per I2P streaming protocol format.

type PingConfig

type PingConfig struct {
	// AnswerPings controls whether to respond to incoming ping packets.
	// Per spec: "Streaming may be configured to disable sending pongs with
	// the configuration i2p.streaming.answerPings=false."
	// Default: true
	AnswerPings bool

	// PingTimeout is the maximum time to wait for a pong response.
	// Default: 30 seconds
	PingTimeout time.Duration
}

PingConfig holds configuration options for ping operations.

func DefaultPingConfig

func DefaultPingConfig() *PingConfig

DefaultPingConfig returns the default ping configuration.

type PingResult

type PingResult struct {
	// RTT is the round-trip time of the ping.
	RTT time.Duration
	// Payload is the echoed payload from the pong (up to 32 bytes).
	Payload []byte
	// Err is any error that occurred during the ping.
	Err error
}

PingResult represents the result of a ping operation.

type ProfileConfig

type ProfileConfig struct {
	// Profile specifies the traffic pattern hint.
	// Default: ProfileBulk (1)
	Profile StreamProfile
}

ProfileConfig holds profile-related configuration for streaming connections. Per I2P spec: i2p.streaming.profile option.

func DefaultProfileConfig

func DefaultProfileConfig() ProfileConfig

DefaultProfileConfig returns the default profile configuration. Per spec, the default is bulk (optimize for bandwidth).

type StreamConn

type StreamConn struct {
	// contains filtered or unexported fields
}

StreamConn represents a single bidirectional stream over I2CP. Implements net.Conn interface.

Design decisions:

  • Single sync.Mutex for simplicity (can optimize later with RWMutex)
  • Simple []byte buffers initially (no fancy circular buffers yet)
  • Fixed window size for MVP (dynamic sizing in post-MVP)
  • Packet-based windowing per I2P spec (not byte-based like TCP)

func CreateTestStreamConn

func CreateTestStreamConn(t *testing.T) *StreamConn

CreateTestStreamConn creates a StreamConn for testing with a real I2CP session. This replaces the old createTestConnection that used session: nil.

func Dial

func Dial(session *go_i2cp.Session, dest *go_i2cp.Destination, localPort, remotePort uint16) (*StreamConn, error)

Dial initiates a connection to the specified I2P destination. This implements the client side of the three-way handshake:

  1. Send SYN packet with our MTU and random ISN
  2. Wait for SYN-ACK response
  3. Send ACK to complete handshake

The connection includes MTU negotiation - both peers advertise their maximum supported MTU and use the minimum of the two.

MVP implementation uses simple polling with time.Sleep() rather than sophisticated channel-based state management. This can be optimized later.

Parameters:

  • session: Active I2CP session for sending/receiving messages
  • dest: Remote I2P destination to connect to
  • localPort: Source port for this connection (0 for automatic)
  • remotePort: Destination port on remote peer

Returns the established connection or an error if handshake fails.

func DialWithMTU

func DialWithMTU(session *go_i2cp.Session, dest *go_i2cp.Destination, localPort, remotePort uint16, mtu int, timeout time.Duration) (*StreamConn, error)

DialWithMTU is like Dial but allows specifying a custom MTU and timeout. Use ECIESMTU (1812) for ECIES-X25519 connections for better efficiency.

func DialWithManager

func DialWithManager(manager *StreamManager, dest *go_i2cp.Destination, localPort, remotePort uint16) (*StreamConn, error)

DialWithManager initiates a connection using a StreamManager for packet routing. This is the recommended way to create outgoing connections as it integrates with I2CP callbacks. The manager will route incoming packets to this connection automatically.

func (*StreamConn) Close

func (s *StreamConn) Close() error

Close closes the connection and releases resources. Implements the io.Closer and net.Conn interfaces.

Phase 5: Implements proper CLOSE handshake per I2P streaming spec. CLOSE is distinct from FIN - it's a bidirectional handshake.

Half-Close Behavior: Unlike TCP, I2P streaming does NOT support true half-close semantics. Per the I2P streaming specification: "The connection is not closed until the peer responds with the CLOSE flag."

When Close() is called:

  1. A CLOSE packet is sent to the peer
  2. The connection state transitions to StateClosing
  3. Local resources are cleaned up immediately (TCB saved to cache)

Note: This implementation does not wait for the peer's CLOSE acknowledgment before returning. This is intentional for simplicity, but means:

  • Data in flight may not be delivered
  • Use SetWriteDeadline before Close() if delivery is critical
  • Consider sending all data and waiting for ACKs before closing

Java I2P reference: ConnectionPacketHandler.java "this is fine, half-close" indicates that half-close was a "major bug before 0.9.9" where packets were dropped and resets sent. The current go-streaming behavior is correct.

func (*StreamConn) GetInactivityTimeout

func (s *StreamConn) GetInactivityTimeout() time.Duration

GetInactivityTimeout returns the current inactivity timeout setting.

func (*StreamConn) LocalAddr

func (s *StreamConn) LocalAddr() net.Addr

LocalAddr returns the local network address. Implements net.Conn interface.

func (*StreamConn) Profile

func (s *StreamConn) Profile() StreamProfile

Profile returns the local stream profile hint. Returns ProfileBulk (1) for bulk transfer or ProfileInteractive (2) for interactive.

func (*StreamConn) Read

func (s *StreamConn) Read(buf []byte) (int, error)
  • Use circbuf.Bytes() and manual management (no built-in Read method)
  • No timeout support yet (Phase 5+)

Returns the number of bytes read or an error. Read receives data from the connection. Implements the io.Reader and net.Conn interfaces.

This method blocks until data is available in the receive buffer. The receive buffer is populated by the receiveLoop() goroutine which processes incoming packets.

Phase 5 additions:

  • Returns io.EOF when connection is closed (per net.Conn spec)
  • Respects read deadlines

Returns the number of bytes read or an error.

func (*StreamConn) RemoteAddr

func (s *StreamConn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address. Implements net.Conn interface.

func (*StreamConn) RemoteProfile

func (s *StreamConn) RemoteProfile() StreamProfile

RemoteProfile returns the remote peer's stream profile hint as received in SYN. Returns ProfileBulk (1) for bulk transfer or ProfileInteractive (2) for interactive.

func (*StreamConn) SetDeadline

func (s *StreamConn) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines. Implements net.Conn interface.

A zero value for t means I/O operations will not time out.

func (*StreamConn) SetInactivityTimeout

func (s *StreamConn) SetInactivityTimeout(timeout time.Duration)

SetInactivityTimeout sets the inactivity timeout for the connection. Set to 0 to disable inactivity detection. Default is 90 seconds per I2P streaming spec.

func (*StreamConn) SetReadDeadline

func (s *StreamConn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls. Implements net.Conn interface.

A zero value for t means Read will not time out.

func (*StreamConn) SetWriteDeadline

func (s *StreamConn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls. Implements net.Conn interface.

A zero value for t means Write will not time out.

func (*StreamConn) Write

func (s *StreamConn) Write(data []byte) (int, error)

Write sends data over the connection. Implements the io.Writer interface.

This method handles MTU-aware chunking, splitting large writes into multiple packets that fit within the negotiated MTU. Each packet is sent with proper sequence numbers and flow control.

MVP implementation:

  • Simple chunking based on MTU
  • Increment sequence number for each packet
  • Window-based flow control: blocks when in-flight packets >= cwnd
  • Choke handling: waits using condition variable (no race condition)

Returns the number of bytes written or an error.

type StreamListener

type StreamListener struct {
	// contains filtered or unexported fields
}

StreamListener listens for and accepts incoming streaming connections. Minimal implementation for MVP.

func Listen

func Listen(session *go_i2cp.Session, localPort uint16) (*StreamListener, error)

Listen creates a StreamListener that accepts incoming connections on the specified port. This sets up the infrastructure for the server side of the handshake.

The listener will wait for incoming SYN packets and complete the three-way handshake:

  1. Receive SYN packet (via StreamManager callback)
  2. Send SYN-ACK response
  3. Wait for ACK to complete (handled by connection)

This function creates a StreamManager internally to handle I2CP callbacks. The manager routes incoming packets to this listener based on port.

Parameters:

  • session: Active I2CP session for sending/receiving messages
  • localPort: Port to listen on (0 for automatic)

Returns a listener ready to accept connections.

IMPORTANT: The caller must call StartProcessingIO() on the returned listener to begin receiving messages from I2CP. Without this, no packets will be received.

func ListenWithMTU

func ListenWithMTU(session *go_i2cp.Session, localPort uint16, mtu int) (*StreamListener, error)

ListenWithMTU is like Listen but allows specifying a custom MTU. Use ECIESMTU (1812) for ECIES-X25519 connections.

func ListenWithManager

func ListenWithManager(manager *StreamManager, localPort uint16, mtu int) (*StreamListener, error)

ListenWithManager creates a StreamListener that uses a StreamManager for packet routing. This is the recommended way to create listeners as it integrates with I2CP callbacks.

The manager handles:

  • Registering SessionCallbacks with I2CP
  • Routing incoming SYN packets to this listener
  • Managing multiple listeners/connections on one session

Use this instead of Listen() when you want automatic packet routing from I2CP.

func (*StreamListener) Accept

func (l *StreamListener) Accept() (net.Conn, error)

Accept waits for and returns the next incoming connection. This blocks until a connection is available or the listener is closed.

MVP implementation uses simple channel-based approach. The actual SYN packet processing will be implemented in Phase 4.

func (*StreamListener) Addr

func (l *StreamListener) Addr() net.Addr

Addr returns the listener's network address. Implements net.Listener interface.

func (*StreamListener) Close

func (l *StreamListener) Close() error

Close stops the listener and rejects new connections.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages I2CP session integration and routes incoming packets to the appropriate streaming connections. This is the bridge between I2CP transport and the streaming protocol layer.

Architecture:

  • Registers SessionCallbacks with I2CP session
  • Routes incoming messages (protocol 6) to connections
  • Manages connection multiplexing by port
  • Handles new incoming connections for listeners
  • Handles ping/pong (ECHO) packets per I2P streaming spec

Why a manager is needed:

  • I2CP delivers messages via callbacks, not polling
  • Multiple connections share a single I2P session
  • Need to route packets to correct connection by port
  • Server needs to accept new connections from SYN packets

func CreateSecondI2CPSession

func CreateSecondI2CPSession(t *testing.T) *StreamManager

CreateSecondI2CPSession creates a separate I2CP session for client/server tests. This is needed because loopback to the same destination may not work reliably. Returns a new StreamManager with its own session and destination.

func NewStreamManager

func NewStreamManager(client *go_i2cp.Client) (*StreamManager, error)

NewStreamManager creates a new stream manager for the given I2CP client. This sets up the SessionCallbacks and starts the packet processor.

The manager handles:

  • Receiving I2CP messages via callbacks
  • Routing packets to appropriate connections/listeners
  • Creating I2CP session with proper callbacks

Call StartSession() after creating to initialize the I2CP session.

func NewStreamManagerFromSession

func NewStreamManagerFromSession(client *go_i2cp.Client, session *go_i2cp.Session) (*StreamManager, error)

NewStreamManagerFromSession creates a stream manager that wraps an existing I2CP session. This is useful when integrating with systems that manage their own I2CP sessions, such as SAM bridge implementations.

IMPORTANT: The session must have been created with callbacks that forward to this StreamManager. Use GetSessionCallbacksForManager() to get the appropriate callbacks before creating the session, or ensure your callbacks invoke the StreamManager's handler methods.

Example usage with pre-created session:

// Create manager first (without session)
sm := streaming.NewStreamManagerBase(client)

// Get callbacks that forward to the manager
callbacks := sm.GetSessionCallbacks()

// Create session with those callbacks
session := go_i2cp.NewSession(client, callbacks)

// Attach the session to the manager
sm.SetSession(session)

Or use the convenience constructor:

// If session was created with compatible callbacks
sm, err := streaming.NewStreamManagerFromSession(client, existingSession)

Note: When using an existing session, StartSession() should NOT be called as the session is assumed to already be initialized. The manager will immediately mark itself as ready.

func (*StreamManager) AccessFilter

func (sm *StreamManager) AccessFilter() *accessFilter

AccessFilter returns the access filter for this manager. This is used internally by listeners to check access permissions.

func (*StreamManager) ActiveStreams

func (sm *StreamManager) ActiveStreams() int

ActiveStreams returns the current number of active streams.

func (*StreamManager) AddToAccessList

func (sm *StreamManager) AddToAccessList(hash string)

AddToAccessList adds a destination hash to the access list. The hash should be the base64 representation of the destination hash.

func (*StreamManager) CleanupTCBCache

func (sm *StreamManager) CleanupTCBCache() int

CleanupTCBCache removes expired entries from the TCB cache. This can be called periodically to prevent memory growth. Returns the number of entries removed.

func (*StreamManager) Close

func (sm *StreamManager) Close() error

Close shuts down the stream manager and all connections. This stops the packet processor and closes the I2CP session.

func (*StreamManager) ConnectionLimiter

func (sm *StreamManager) ConnectionLimiter() *connectionLimiter

ConnectionLimiter returns the connection limiter for this manager. This is used internally by listeners to check limits.

func (*StreamManager) Destination

func (sm *StreamManager) Destination() *go_i2cp.Destination

Destination returns the local I2P destination for this session. This is the address that remote peers use to connect to us.

func (*StreamManager) EnableTCBCache

func (sm *StreamManager) EnableTCBCache(enabled bool)

EnableTCBCache enables or disables TCB cache sharing. When disabled, connections will not use cached RTT/window values.

func (*StreamManager) GetAccessFilter

func (sm *StreamManager) GetAccessFilter() *AccessListConfig

GetAccessFilter returns a copy of the current access filter configuration.

func (*StreamManager) GetConnectionLimits

func (sm *StreamManager) GetConnectionLimits() *ConnectionLimitsConfig

GetConnectionLimits returns a copy of the current connection limits configuration.

func (*StreamManager) GetMessageStats

func (sm *StreamManager) GetMessageStats() MessageStats

GetMessageStats returns the current message delivery statistics.

func (*StreamManager) GetPingConfig

func (sm *StreamManager) GetPingConfig() *PingConfig

GetPingConfig returns a copy of the current ping configuration.

func (*StreamManager) GetProfile

func (sm *StreamManager) GetProfile() StreamProfile

GetProfile returns the current streaming profile.

func (*StreamManager) GetProfileConfig

func (sm *StreamManager) GetProfileConfig() ProfileConfig

GetProfileConfig returns the current streaming profile configuration. Per I2P spec: i2p.streaming.profile option.

func (*StreamManager) GetSessionCallbacks

func (sm *StreamManager) GetSessionCallbacks() go_i2cp.SessionCallbacks

GetSessionCallbacks returns the SessionCallbacks that should be used when creating an I2CP session that will be used with this StreamManager.

This is useful for SAM bridge integration or other scenarios where the session needs to be created externally but must route messages through the StreamManager.

Example:

sm := streaming.NewStreamManagerBase(client)
callbacks := sm.GetSessionCallbacks()
// Optionally wrap or extend callbacks here
session := go_i2cp.NewSession(client, callbacks)
sm.SetSession(session)

func (*StreamManager) GetStreamProfile

func (sm *StreamManager) GetStreamProfile() StreamProfile

GetStreamProfile returns the configured stream profile hint. Returns ProfileBulk (1) for bulk transfer or ProfileInteractive (2) for interactive.

func (*StreamManager) GetTCBCacheConfig

func (sm *StreamManager) GetTCBCacheConfig() TCBCacheConfig

GetTCBCacheConfig returns the current TCB cache configuration.

func (*StreamManager) LookupDestination

func (sm *StreamManager) LookupDestination(ctx context.Context, hostname string) (*go_i2cp.Destination, error)

LookupDestination performs a destination lookup and waits for the result. This is a convenience method that handles the request tracking and result retrieval.

func (*StreamManager) MessageTracker

func (sm *StreamManager) MessageTracker() *messageStatusTracker

MessageTracker returns the message status tracker for this manager. This is used internally by connections to track outgoing messages.

func (*StreamManager) Ping

func (sm *StreamManager) Ping(ctx context.Context, dest *go_i2cp.Destination, payload []byte) *PingResult

Ping sends a ping packet to the specified destination and waits for a pong response. This implements the I2P streaming ping/pong mechanism per specification.

Per spec:

  • A ping packet has ECHO, SIGNATURE_INCLUDED, and FROM_INCLUDED flags set
  • The sendStreamId must be greater than zero
  • Payload up to 32 bytes is echoed back in the pong

Example:

result := manager.Ping(ctx, destination, []byte("hello"))
if result.Err != nil {
    log.WithError(result.Err).Error("ping failed")
} else {
    log.WithField("rtt", result.RTT).Info("ping successful")
}

func (*StreamManager) RegisterConnection

func (sm *StreamManager) RegisterConnection(localPort, remotePort uint16, conn *StreamConn)

RegisterConnection registers a connection for incoming packet routing.

func (*StreamManager) RegisterListener

func (sm *StreamManager) RegisterListener(port uint16, listener *StreamListener)

RegisterListener registers a listener to receive incoming connections. When SYN packets arrive for this port, they'll be routed to the listener.

func (*StreamManager) RemoveFromAccessList

func (sm *StreamManager) RemoveFromAccessList(hash string)

RemoveFromAccessList removes a destination hash from the access list.

func (*StreamManager) Session

func (sm *StreamManager) Session() *go_i2cp.Session

Session returns the underlying I2CP session. This is needed for creating StreamListener and StreamConn instances.

func (*StreamManager) SetAccessFilter

func (sm *StreamManager) SetAccessFilter(config *AccessListConfig)

SetAccessFilter configures the access list filtering for this manager. Pass nil to use default (disabled) configuration.

func (*StreamManager) SetAccessListEnabled

func (sm *StreamManager) SetAccessListEnabled(enabled bool, mode AccessListMode)

SetAccessListEnabled enables or disables access list filtering.

func (*StreamManager) SetAccessListMode

func (sm *StreamManager) SetAccessListMode(mode AccessListMode)

SetAccessListMode sets the access list mode (whitelist or blacklist).

func (*StreamManager) SetConnectionLimits

func (sm *StreamManager) SetConnectionLimits(config *ConnectionLimitsConfig)

SetConnectionLimits updates the connection rate limiting configuration. Use this to protect against connection flooding attacks.

Example:

manager.SetConnectionLimits(&streaming.ConnectionLimitsConfig{
    MaxConcurrentStreams: 100,
    MaxConnsPerMinute:    10,
    LimitAction:          streaming.LimitActionReset,
})

func (*StreamManager) SetPingConfig

func (sm *StreamManager) SetPingConfig(config *PingConfig)

SetPingConfig updates the ping configuration. Use this to enable/disable answering pings or adjust timeout.

func (*StreamManager) SetProfile

func (sm *StreamManager) SetProfile(profile StreamProfile)

SetProfile is a convenience method to set the streaming profile. Valid values are ProfileBulk (1) and ProfileInteractive (2).

func (*StreamManager) SetProfileConfig

func (sm *StreamManager) SetProfileConfig(config ProfileConfig)

SetProfileConfig updates the streaming profile configuration. This affects new connections only; existing connections retain their profile. Per I2P spec: i2p.streaming.profile option.

func (*StreamManager) SetSession

func (sm *StreamManager) SetSession(session *go_i2cp.Session) error

SetSession sets the I2CP session for this StreamManager. This is used when the session is created externally and needs to be attached to an existing StreamManager.

IMPORTANT: The session should have been created with callbacks from GetSessionCallbacks() to ensure proper message routing.

If the packet processor is not already running, this method starts it. After calling SetSession, do NOT call StartSession().

func (*StreamManager) SetStreamProfile

func (sm *StreamManager) SetStreamProfile(profile StreamProfile)

SetStreamProfile sets the stream profile hint for new connections. ProfileBulk (1) is optimized for large data transfers. ProfileInteractive (2) is optimized for low-latency exchanges.

func (*StreamManager) SetTCBCacheConfig

func (sm *StreamManager) SetTCBCacheConfig(config TCBCacheConfig)

SetTCBCacheConfig updates the TCB cache configuration. Changes affect new cache entries and lookups immediately.

func (*StreamManager) StartSession

func (sm *StreamManager) StartSession(ctx context.Context) error

StartSession initializes the I2CP session and waits for it to be ready. This must be called after creating the manager before any Listen/Dial operations.

The session creation process:

  1. Sends CreateSession message to I2P router
  2. Waits for SessionCreated response
  3. Receives destination address from router
  4. Signals sessionReady when complete

Note: Some I2P router configurations may not send SessionCreated responses. In this case, we proceed after a timeout as the session may still be usable.

func (*StreamManager) TCBCache

func (sm *StreamManager) TCBCache() *tcbCache

TCBCache returns the TCB cache instance for direct access. The TCB cache implements RFC 2140 control block sharing, storing RTT, RTT variance, and window size estimates per remote peer.

func (*StreamManager) UnregisterConnection

func (sm *StreamManager) UnregisterConnection(localPort, remotePort uint16)

UnregisterConnection removes a connection registration.

func (*StreamManager) UnregisterListener

func (sm *StreamManager) UnregisterListener(port uint16)

UnregisterListener removes a listener registration.

type StreamProfile

type StreamProfile int

StreamProfile represents the streaming profile hint per I2P spec. The profile is a hint to the streaming library about expected traffic patterns. Per spec: "Optimization strategies, if any, are implementation-dependent."

Note: As of API 0.9.64, Java I2P ignores this value. The PROFILE_INTERACTIVE flag is defined in the protocol but "not implemented in any known router." This implementation accepts the configuration for completeness but does not change behavior based on profile selection.

const (
	// ProfileBulk (1) optimizes for high bandwidth, possibly at the expense of latency.
	// This is the default profile per I2P streaming specification.
	ProfileBulk StreamProfile = 1

	// ProfileInteractive (2) optimizes for low latency, possibly at the expense
	// of bandwidth or efficiency. When set, the PROFILE_INTERACTIVE flag (bit 8)
	// is included in SYN packets to hint to the remote peer.
	ProfileInteractive StreamProfile = 2
)

func (StreamProfile) IsValid

func (p StreamProfile) IsValid() bool

IsValid returns true if the profile is a valid value per I2P spec.

func (StreamProfile) String

func (p StreamProfile) String() string

String returns a human-readable name for the profile.

type TCBCacheConfig

type TCBCacheConfig struct {
	// RTTDampening controls how much to dampen RTT when sharing (0.0-1.0)
	// Cached RTT is multiplied by this factor when applied to new connections.
	// Default: 0.75 per I2P streaming spec
	RTTDampening float64

	// RTTDevDampening controls how much to dampen RTT variance (0.0-1.0)
	// Default: 0.75 per I2P streaming spec
	RTTDevDampening float64

	// WindowDampening controls how much to dampen window size (0.0-1.0)
	// Default: 0.75 per I2P streaming spec
	WindowDampening float64

	// EntryTTL is how long cache entries remain valid after last update
	// Default: 5 minutes per I2P spec "expires after a few minutes"
	EntryTTL time.Duration

	// Enabled controls whether TCB sharing is active
	// Default: true
	Enabled bool
}

TCBCacheConfig holds configuration for TCB cache behavior. Dampening factors control how much cached values influence new connections. Per I2P spec defaults: all dampening factors = 0.75

func DefaultTCBCacheConfig

func DefaultTCBCacheConfig() TCBCacheConfig

DefaultTCBCacheConfig returns the default TCB cache configuration per I2P spec.

type TCBData

type TCBData struct {
	RTT         time.Duration
	RTTVariance time.Duration
	WindowSize  uint32
	FromCache   bool
}

TCBData holds the control block parameters that can be applied to a new connection.

type TestI2CPConnection

type TestI2CPConnection struct {
	Client  *go_i2cp.Client
	Manager *StreamManager
	// contains filtered or unexported fields
}

TestI2CPConnection holds a shared I2CP connection for tests. This ensures all tests use a real I2P router connection.

func RequireI2CP

func RequireI2CP(t *testing.T) *TestI2CPConnection

RequireI2CP returns a shared I2CP connection for tests. It connects to the I2P router at localhost:7654. Tests will fail if no I2P router is available.

Directories

Path Synopsis
examples
echo/client command
Echo client example for go-i2p/go-streaming
Echo client example for go-i2p/go-streaming
echo/server command
Echo server example for go-i2p/go-streaming
Echo server example for go-i2p/go-streaming
scripts
container command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL