mtmux

package module
v0.3.3
Published: Dec 23, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

Multiple Tunnels Multiplexing (mtmux)

mtmux is a Go library that multiplexes multiple logical streams over multiple TCP tunnels. It distributes each stream's traffic across several connections, which can improve throughput and reliability compared with a single connection.

graph LR
    S1[Stream 1]
    S2[Stream 2]
    S3[Stream 3]
    S4[Stream 4]
    S5[Stream 5]
    S6[Stream 6]

    Sp1[Stream 1']
    Sp2[Stream 2']
    Sp3[Stream 3']
    Sp4[Stream 4']
    Sp5[Stream 5']
    Sp6[Stream 6']

    A((A))
    B((B))

    subgraph Middle[Multiplexer]
        L1[Link 1]
        L2[Link 2]
        L3[Link 3]
    end

    S1 --> A
    S2 --> A
    S3 --> A
    S4 --> A
    S5 --> A
    S6 --> A

    A --> L1
    A --> L2
    A --> L3

    L1 --> B
    L2 --> B
    L3 --> B

    B --> Sp1
    B --> Sp2
    B --> Sp3
    B --> Sp4
    B --> Sp5
    B --> Sp6

    %% Define styles: all text is black
    classDef stream fill:#e6f3ff,stroke:#3399ff,color:black;
    classDef link fill:#fff2e6,stroke:#ff9933,color:black;
    classDef point fill:#f0f0f0,stroke:#666,stroke-width:2px,color:black;
    classDef subgraphLabel color:black;

    %% Apply styles
    class S1,S2,S3,S4,S5,S6,Sp1,Sp2,Sp3,Sp4,Sp5,Sp6 stream
    class L1,L2,L3 link
    class A,B point

    %% Force subgraph titles to black as well (supported by some renderers)
    class Left,Right,Middle subgraphLabel
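
One way to read the diagram: each stream's data is cut into indexed chunks, the chunks travel over whichever link is chosen, and the receiving side puts them back in order per stream. The following is a minimal, standard-library-only sketch of that idea, not mtmux's implementation. The chunk type, the round-robin link choice, and the function names distribute and reassemble are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// chunk is a hypothetical unit of stream data tagged with its position.
type chunk struct {
	streamID string
	index    uint64
	data     []byte
}

// distribute assigns chunks to links in round-robin order.
// Round-robin is an assumption; a real multiplexer may pick links differently.
func distribute(chunks []chunk, links int) [][]chunk {
	out := make([][]chunk, links)
	for i, c := range chunks {
		out[i%links] = append(out[i%links], c)
	}
	return out
}

// reassemble gathers the chunks of one stream from all links and restores
// their original order by sorting on index.
func reassemble(perLink [][]chunk, streamID string) string {
	var got []chunk
	for _, link := range perLink {
		for _, c := range link {
			if c.streamID == streamID {
				got = append(got, c)
			}
		}
	}
	sort.Slice(got, func(i, j int) bool { return got[i].index < got[j].index })
	var s []byte
	for _, c := range got {
		s = append(s, c.data...)
	}
	return string(s)
}

func main() {
	chunks := []chunk{
		{"1", 0, []byte("he")}, {"2", 0, []byte("wo")},
		{"1", 1, []byte("ll")}, {"2", 1, []byte("rl")},
		{"1", 2, []byte("o")}, {"2", 2, []byte("d")},
	}
	perLink := distribute(chunks, 3)
	fmt.Println(reassemble(perLink, "1"), reassemble(perLink, "2")) // hello world
}
```

Because chunks of one stream arrive over different links, the receiver cannot rely on arrival order and needs the index to rebuild the byte sequence.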

Install

go get github.com/flben233/mtmux

Example

Use as tunnel

Server side:

mtmux -m server -p 12345 -t 8

Client side:

Connect to the server at 127.0.0.1:12345, listen on local port 15200, and forward traffic to the server-side target 127.0.0.1:5201.

mtmux -m client -p 15200 -s 127.0.0.1:12345 -d 127.0.0.1:5201 -t 8

Use as library

Server:

// Get a default configuration
cfg := mtmux.DefaultConfig()

// Listen for incoming connections. This will use the port range from 12345 to 12345 + Tunnels - 1
ln, _ := mtmux.Listen("tcp", "127.0.0.1:12345", int(cfg.Tunnels))
defer ln.Close()
bundle, _ := ln.Accept()

// Create a new server session
session, _ := mtmux.Server(bundle, cfg)
session.Start(context.Background())
defer session.Close()

// Accept a new stream
stream, _ := session.AcceptStream()

// Write and read some data
stream.Write([]byte("hello"))
buf := make([]byte, 1024)
n, _ := stream.Read(buf)

fmt.Println(string(buf[:n]))
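
The comment on Listen says it uses the port range from 12345 to 12345 + Tunnels - 1. To make that range concrete, here is a small sketch that only computes the addresses involved. The helper tunnelAddrs is hypothetical and not part of mtmux; it assumes one consecutive port per tunnel, as the comment describes.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// tunnelAddrs is a hypothetical helper (not part of mtmux) that lists the
// addresses base, base+1, ..., base+tunnels-1 for a given "host:port".
func tunnelAddrs(addr string, tunnels int) ([]string, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return nil, err
	}
	if tunnels < 1 || port < 1 || port+tunnels-1 > 65535 {
		return nil, fmt.Errorf("port range %d..%d is invalid", port, port+tunnels-1)
	}
	addrs := make([]string, 0, tunnels)
	for i := 0; i < tunnels; i++ {
		addrs = append(addrs, net.JoinHostPort(host, strconv.Itoa(port+i)))
	}
	return addrs, nil
}

func main() {
	addrs, err := tunnelAddrs("127.0.0.1:12345", 4)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// [127.0.0.1:12345 127.0.0.1:12346 127.0.0.1:12347 127.0.0.1:12348]
	fmt.Println(addrs)
}
```

Under this reading, a server with 8 tunnels on base port 12345 would need ports 12345 through 12352 to be free and reachable.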

Client:

// Get a default configuration
cfg := mtmux.DefaultConfig()

// Dial to the server
bundle, _ := mtmux.Dial("tcp", "127.0.0.1:12345", int(cfg.Tunnels))

// Create a new client session
session, _ := mtmux.Client(bundle, cfg)
session.Start(context.Background())
defer session.Close()

// Open a new stream
stream, _ := session.OpenStream()

// Read the server's data, then write a reply
buf := make([]byte, 1024)
n, _ := stream.Read(buf)
_, _ = stream.Write([]byte("world"))

fmt.Println(string(buf[:n]))

Configuration

  • Tunnels: Number of TCP tunnels to use for multiplexing.
  • KeepAliveInterval: Interval for sending keep-alive messages.
  • KeepAliveTimeout: Timeout duration for keep-alive messages.
  • Timeout: Timeout duration for waiting for missing data frames.
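
As an illustration of how these four fields fit together, the sketch below fills in a configuration and runs a basic sanity check on it. The local Config type only mirrors the fields of mtmux.Config so that the example compiles on its own; real code would start from mtmux.DefaultConfig() and override fields. The validate function and all values shown are assumptions for illustration, not library behavior or library defaults.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the fields of mtmux.Config so this sketch compiles alone.
type Config struct {
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
	Tunnels           int32
	Timeout           time.Duration
}

// validate is a hypothetical sanity check, not something mtmux provides.
func validate(c Config) error {
	switch {
	case c.Tunnels < 1:
		return errors.New("Tunnels must be at least 1")
	case c.KeepAliveInterval <= 0 || c.KeepAliveTimeout <= 0 || c.Timeout <= 0:
		return errors.New("durations must be positive")
	}
	return nil
}

func main() {
	cfg := Config{ // illustrative values, not the library defaults
		KeepAliveInterval: 10 * time.Second,
		KeepAliveTimeout:  30 * time.Second,
		Tunnels:           8,
		Timeout:           5 * time.Second,
	}
	fmt.Println(validate(cfg))      // <nil>
	fmt.Println(validate(Config{})) // Tunnels must be at least 1
}
```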

Documentation

Constants

const (
	SESSION_BUFFER_SIZE      = STREAM_BUFFER_SIZE + 1024 // Extra 1KB for session header
	CONTROL_STREAM_ID        = "0"
	CONTROL_TYPE_OPEN_STREAM = "OPEN_STREAM"
	CONTROL_STREAM_CONFIRMED = "STREAM_CONFIRMED"
)
const STREAM_BUFFER_SIZE = 32 * 1024 // 32KB buffer size
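
For reference, the two buffer constants work out to 32768 and 33792 bytes. The sketch below reproduces that arithmetic and, as an illustration only, estimates how many pieces a payload would be cut into if each piece carried at most STREAM_BUFFER_SIZE bytes. That per-piece limit and the helper framesFor are assumptions; the documentation above does not state how the library sizes its frames.

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	STREAM_BUFFER_SIZE  = 32 * 1024                 // 32768 bytes
	SESSION_BUFFER_SIZE = STREAM_BUFFER_SIZE + 1024 // 33792 bytes
)

// framesFor is a hypothetical helper: the number of pieces needed for n bytes
// if each piece holds at most STREAM_BUFFER_SIZE bytes (ceiling division).
func framesFor(n int) int {
	return (n + STREAM_BUFFER_SIZE - 1) / STREAM_BUFFER_SIZE
}

func main() {
	fmt.Println(STREAM_BUFFER_SIZE, SESSION_BUFFER_SIZE) // 32768 33792
	fmt.Println(framesFor(100000))                       // 4
}
```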

Variables

This section is empty.

Functions

func ConnWrapper

func ConnWrapper(conns []net.Conn, wrapper func(conn net.Conn) net.Conn) []net.Conn
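
ConnWrapper has no description, but its signature suggests that it applies wrapper to each connection and returns the wrapped slice. The sketch below shows that pattern with a local function, wrapAll, and a hypothetical byte-counting wrapper. It is an assumption about the intended use, not the library's implementation.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync/atomic"
)

// countingConn is a hypothetical wrapper that counts bytes written.
type countingConn struct {
	net.Conn
	written *atomic.Int64
}

func (c countingConn) Write(p []byte) (int, error) {
	n, err := c.Conn.Write(p)
	c.written.Add(int64(n))
	return n, err
}

// wrapAll applies wrapper to every connection and returns the results.
// It shows what a function with ConnWrapper's signature plausibly does.
func wrapAll(conns []net.Conn, wrapper func(net.Conn) net.Conn) []net.Conn {
	out := make([]net.Conn, len(conns))
	for i, c := range conns {
		out[i] = wrapper(c)
	}
	return out
}

// demo writes 5 bytes through one wrapped in-memory connection and returns
// the byte count observed by the wrapper.
func demo() int64 {
	var total atomic.Int64
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	wrapped := wrapAll([]net.Conn{a}, func(c net.Conn) net.Conn {
		return countingConn{Conn: c, written: &total}
	})

	done := make(chan struct{})
	go func() { // net.Pipe is unbuffered, so the peer must read concurrently
		defer close(done)
		io.ReadFull(b, make([]byte, 5))
	}()
	wrapped[0].Write([]byte("hello"))
	<-done
	return total.Load()
}

func main() {
	fmt.Println(demo()) // 5
}
```

With the real package, the same shape of wrapper function could presumably be passed to ConnWrapper on the slice returned by Dial or Accept, for example to add encryption or metrics to every tunnel before creating a session.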

func Debug added in v0.2.1

func Debug(v ...interface{})

Leveled convenience helpers

func Debugf added in v0.2.1

func Debugf(format string, v ...interface{})

func Dial

func Dial(network, addr string, tunnels int) ([]net.Conn, error)

func Error added in v0.2.1

func Error(v ...interface{})

func Errorf added in v0.2.1

func Errorf(format string, v ...interface{})

func Info added in v0.2.1

func Info(v ...interface{})

func Infof added in v0.2.1

func Infof(format string, v ...interface{})

func RemoveElement

func RemoveElement(slice []net.Conn, index int) []net.Conn

func SetLevel added in v0.2.1

func SetLevel(l Level)

SetLevel sets the global minimum logging level. Messages below this level will be ignored by the default logger. If a custom logger is set it is up to that implementation to respect levels.

func SetLogger added in v0.2.1

func SetLogger(l Logger)

SetLogger sets the package-wide logger. Passing nil resets to default.

func SetNoOutputLogger added in v0.2.1

func SetNoOutputLogger()

SetNoOutputLogger sets a logger that discards all output.

func WaitWithContext

func WaitWithContext(wg *sync.WaitGroup, ctx context.Context) error
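
WaitWithContext is undocumented, but the signature matches a common pattern: wait for a sync.WaitGroup, and give up early if the context is cancelled. The function waitOrCancel below is a sketch of that pattern under that assumption; the library's actual return values and behavior may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrCancel returns nil once wg is done, or ctx.Err() if ctx ends first.
// The parameter order mirrors the WaitWithContext signature.
func waitOrCancel(wg *sync.WaitGroup, ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		wg.Wait() // this goroutine lives until the WaitGroup is released
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs one wait that completes and one that hits a context timeout.
func demo() (finished, timedOut error) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond)
	}()
	finished = waitOrCancel(&wg, context.Background())

	var stuck sync.WaitGroup
	stuck.Add(1) // not released during the wait, so the context must win
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	timedOut = waitOrCancel(&stuck, ctx)
	stuck.Done() // let the helper goroutine exit
	return finished, timedOut
}

func main() {
	finished, timedOut := demo()
	fmt.Println(finished) // <nil>
	fmt.Println(timedOut) // context deadline exceeded
}
```

One consequence of this pattern is that the helper goroutine keeps running after a timeout until the WaitGroup is eventually released.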

func Warn added in v0.2.1

func Warn(v ...interface{})

func Warnf added in v0.2.1

func Warnf(format string, v ...interface{})

Types

type Config

type Config struct {
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
	Tunnels           int32
	Timeout           time.Duration
}

func DefaultConfig

func DefaultConfig() *Config

type ControlMsg

type ControlMsg struct {
	Type string
	Data string
}

type Frame

type Frame struct {
	StreamID string
	// Data      []byte
	DataLen   uint64
	DataIndex uint64
	IsEOF     bool // Signals that the sender has closed the write end
}

type Level added in v0.2.1

type Level int

Level represents a logging level.

const (
	LevelDebug Level = iota
	LevelInfo
	LevelWarn
	LevelError
	LevelOff
)

type Listener

type Listener struct {
	Host      string
	Port      uint64
	Tunnels   int
	Listeners []net.Listener
}

func Listen

func Listen(network, addr string, tunnels int) (*Listener, error)

func ListenWithCustomFunc

func ListenWithCustomFunc(addr string, tunnels int, listenFunc func(addr string) (net.Listener, error)) (*Listener, error)

func (*Listener) Accept

func (l *Listener) Accept() ([]net.Conn, error)

func (*Listener) Addr

func (l *Listener) Addr() net.Addr

func (*Listener) Close

func (l *Listener) Close() error

type Logger added in v0.2.1

type Logger interface {
	Debug(v ...interface{})
	Info(v ...interface{})
	Warn(v ...interface{})
	Error(v ...interface{})

	Debugf(format string, v ...interface{})
	Infof(format string, v ...interface{})
	Warnf(format string, v ...interface{})
	Errorf(format string, v ...interface{})
}

Logger is the interface users can implement to provide custom logging. It includes leveled methods and formatters.
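
A custom logger might look like the following sketch. Level and Logger are redeclared locally, mirroring the declarations on this page, so that the example compiles on its own; real code would implement mtmux.Logger and pass the value to SetLogger. The writerLogger type and its output format are hypothetical. It filters by level itself, since SetLevel leaves level handling to custom loggers.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// Level and Logger mirror the documented declarations.
type Level int

const (
	LevelDebug Level = iota
	LevelInfo
	LevelWarn
	LevelError
	LevelOff
)

type Logger interface {
	Debug(v ...interface{})
	Info(v ...interface{})
	Warn(v ...interface{})
	Error(v ...interface{})

	Debugf(format string, v ...interface{})
	Infof(format string, v ...interface{})
	Warnf(format string, v ...interface{})
	Errorf(format string, v ...interface{})
}

// writerLogger is a hypothetical Logger that writes to any io.Writer and
// drops messages below min.
type writerLogger struct {
	w   io.Writer
	min Level
}

func (l *writerLogger) log(lv Level, tag, msg string) {
	if lv >= l.min {
		fmt.Fprintf(l.w, "[%s] %s\n", tag, msg)
	}
}

func (l *writerLogger) Debug(v ...interface{}) { l.log(LevelDebug, "DEBUG", fmt.Sprint(v...)) }
func (l *writerLogger) Info(v ...interface{})  { l.log(LevelInfo, "INFO", fmt.Sprint(v...)) }
func (l *writerLogger) Warn(v ...interface{})  { l.log(LevelWarn, "WARN", fmt.Sprint(v...)) }
func (l *writerLogger) Error(v ...interface{}) { l.log(LevelError, "ERROR", fmt.Sprint(v...)) }

func (l *writerLogger) Debugf(f string, v ...interface{}) { l.log(LevelDebug, "DEBUG", fmt.Sprintf(f, v...)) }
func (l *writerLogger) Infof(f string, v ...interface{})  { l.log(LevelInfo, "INFO", fmt.Sprintf(f, v...)) }
func (l *writerLogger) Warnf(f string, v ...interface{})  { l.log(LevelWarn, "WARN", fmt.Sprintf(f, v...)) }
func (l *writerLogger) Errorf(f string, v ...interface{}) { l.log(LevelError, "ERROR", fmt.Sprintf(f, v...)) }

// Compile-time check that writerLogger satisfies Logger.
var _ Logger = (*writerLogger)(nil)

// render logs two messages at the given minimum level and returns the output.
func render(min Level) string {
	var sb strings.Builder
	var lg Logger = &writerLogger{w: &sb, min: min}
	lg.Debug("dropped when min is above LevelDebug")
	lg.Warnf("tunnel %d is slow", 3)
	return sb.String()
}

func main() {
	os.Stdout.WriteString(render(LevelWarn)) // [WARN] tunnel 3 is slow
}
```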

type SafeBuffer added in v0.3.0

type SafeBuffer struct {
	// contains filtered or unexported fields
}

func (*SafeBuffer) Len added in v0.3.0

func (sb *SafeBuffer) Len() int

func (*SafeBuffer) Read added in v0.3.0

func (sb *SafeBuffer) Read(p []byte) (n int, err error)

func (*SafeBuffer) ReadWhenNotEmpty added in v0.3.0

func (sb *SafeBuffer) ReadWhenNotEmpty(p []byte) (n int, err error)

func (*SafeBuffer) Reset added in v0.3.2

func (sb *SafeBuffer) Reset()

func (*SafeBuffer) Write added in v0.3.0

func (sb *SafeBuffer) Write(p []byte) (n int, err error)

type Session

type Session struct {
	ID         string
	LocalAddr  net.Addr
	RemoteAddr net.Addr
	// contains filtered or unexported fields
}

func Client

func Client(conns []net.Conn, config *Config) (*Session, error)

func Server

func Server(conns []net.Conn, config *Config) (*Session, error)

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

func (*Session) Close

func (s *Session) Close() error

func (*Session) IsClosed

func (s *Session) IsClosed() bool

func (*Session) NumStreams

func (s *Session) NumStreams() int

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

func (*Session) Start

func (s *Session) Start(ctx context.Context)

type Stream

type Stream struct {
	ID           string
	ReadBuf      *SafeBuffer
	WriteBuf     chan []byte
	ReadIndex    atomic.Uint64 // Received data index
	WriteIndex   atomic.Uint64 // Sent data index
	UnorderedBuf map[uint64][]byte
	Finalize     func(StreamID string)
	// contains filtered or unexported fields
}

func NewStream added in v0.1.6

func NewStream(streamID string, finalize func(string)) *Stream

NewStream creates a new stream with the given streamID.

func (*Stream) Close

func (s *Stream) Close() error

func (*Stream) CloseRead

func (s *Stream) CloseRead() error

In most cases, CloseRead is called when EOF is received from the remote side.

func (*Stream) CloseWrite

func (s *Stream) CloseWrite() error

CloseWrite closes the write end of the stream and can be used to signal EOF to the remote side. If EOF from the remote side is never received, you must call CloseRead yourself to prevent a resource leak (for example, by setting a timeout and then closing the stream).
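
The timeout approach mentioned for CloseWrite could be sketched as follows. The halfCloser interface lists only the methods the sketch needs, which match the Stream method signatures on this page. The finish helper and the silentPeer test double are hypothetical. The sketch also assumes that calling CloseRead unblocks a pending Read, which this page does not confirm for mtmux streams.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// halfCloser lists the methods this sketch needs.
type halfCloser interface {
	io.Reader
	CloseWrite() error
	CloseRead() error
}

// finish signals EOF to the peer, then waits up to timeout for the peer's
// EOF. If it does not arrive in time, CloseRead is called to release resources.
func finish(s halfCloser, timeout time.Duration) error {
	if err := s.CloseWrite(); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() {
		_, err := io.Copy(io.Discard, s) // returns nil once EOF is reached
		done <- err
	}()
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return s.CloseRead()
	}
}

// silentPeer is a test double whose remote side never sends EOF.
type silentPeer struct {
	readClosed chan struct{}
}

func (p *silentPeer) Read(b []byte) (int, error) {
	<-p.readClosed // block until CloseRead is called
	return 0, io.EOF
}
func (p *silentPeer) CloseWrite() error { return nil }
func (p *silentPeer) CloseRead() error  { close(p.readClosed); return nil }

// demo reports whether finish fell back to CloseRead after the timeout.
func demo() bool {
	p := &silentPeer{readClosed: make(chan struct{})}
	if err := finish(p, 20*time.Millisecond); err != nil {
		return false
	}
	select {
	case <-p.readClosed:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(demo()) // true
}
```

The read runs in its own goroutine because the stream deadline methods are documented as not implemented, so a blocked Read cannot be bounded with SetReadDeadline.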

func (*Stream) Deliver

func (s *Stream) Deliver(data []byte, idx uint64, isEOF bool) error

Do not modify data after passing it to Deliver.
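
The warning on Deliver makes sense if the stream keeps the slice it is given instead of copying it, for instance while out-of-order data waits in a map such as UnorderedBuf. The sketch below is a hypothetical reorder buffer that shows both in-order release and the aliasing hazard. It treats idx as a sequence number starting at 0, which is an assumption, and it is not the library's Deliver.

```go
package main

import (
	"bytes"
	"fmt"
)

// reorder is a hypothetical in-order delivery buffer.
type reorder struct {
	next    uint64            // next index expected
	pending map[uint64][]byte // out-of-order data, stored without copying
	out     bytes.Buffer      // data released in order
}

func newReorder() *reorder {
	return &reorder{pending: make(map[uint64][]byte)}
}

// deliver stores data under idx and releases every consecutive piece that
// is now available. The slice is retained as-is, so callers must not reuse it.
func (r *reorder) deliver(data []byte, idx uint64) {
	if idx < r.next {
		return // already released; ignore duplicates
	}
	r.pending[idx] = data
	for {
		d, ok := r.pending[r.next]
		if !ok {
			return
		}
		r.out.Write(d)
		delete(r.pending, r.next)
		r.next++
	}
}

func main() {
	r := newReorder()
	r.deliver([]byte("lo"), 1)  // held back: index 0 has not arrived
	r.deliver([]byte("hel"), 0) // releases index 0, then index 1
	fmt.Println(r.out.String()) // hello

	// Aliasing hazard: modifying a delivered slice corrupts pending data.
	bad := newReorder()
	b := []byte("lo")
	bad.deliver(b, 1)
	b[0] = 'X'
	bad.deliver([]byte("hel"), 0)
	fmt.Println(bad.out.String()) // helXo
}
```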

func (*Stream) IsClosed added in v0.1.6

func (s *Stream) IsClosed() bool

func (*Stream) IsReadClosed added in v0.1.6

func (s *Stream) IsReadClosed() bool

func (*Stream) IsWriteClosed added in v0.1.6

func (s *Stream) IsWriteClosed() bool

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr returns the local network address, if known.

func (*Stream) PutBuffer added in v0.1.7

func (s *Stream) PutBuffer(buf []byte)

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr returns the remote network address, if known.

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines associated with the stream. Currently not implemented for mtmux streams.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls. Currently not implemented for mtmux streams.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls. Currently not implemented for mtmux streams.

func (*Stream) Write

func (s *Stream) Write(b []byte) (n int, err error)
