Architecture
- You're debugging a behavior that doesn't match the docs and want to know what the wire actually does.
- You're considering writing a custom backend (Socket / Upgrader / Dialer) and want to understand the contracts.
- You're benchmarking and want to know where the bottlenecks live.
flowchart TB
subgraph Server
S[Server.start dispatch loop]
BR[broadcaster<br>atomic.Pointer]
SE[StackExchange<br>optional]
end
subgraph "Conn (per connection)"
RD[reader goroutine<br>startReader]
WR[Socket writer<br>serialized via mu]
WM[waitingMessages<br>map ch ↔ wait token]
end
subgraph "NSConn (per namespace)"
RM[rooms map]
EV[events map]
end
RD --> |decoded msg| HM[handleMessage]
HM --> |fire event| EV
HM --> |reply| WR
HM --> |signal Ask waiter| WM
S --> |broadcast| BR
BR --> |fan-out| WR
S <--> |publish/subscribe| SE
A neffos process has one Server, one dispatch goroutine (Server.start), and one reader goroutine per connection (Conn.startReader). Writes are serialized inside the underlying Socket via its mutex. Broadcasts use a lock-free atomic-pointer entry table.
sequenceDiagram
autonumber
participant Cl as Client
participant Up as Server.Upgrade
participant R as reader goroutine
participant DL as dispatch loop
participant H as OnConnect
Cl->>Up: HTTP Upgrade
Up->>R: spawn startReader
Up->>DL: s.connect <- c
Cl->>R: ackBinary ('M')
R-->>R: handleACK: wait readiness
Up->>H: s.OnConnect(c)
H-->>Up: nil or error
alt OnConnect error
Up->>R: readiness.unwait(err)
R-->>Cl: ackNotOKBinary + err text
R-->>R: exit (defers Close)
else
Up->>R: readiness.unwait(nil)
R-->>Cl: ackIDBinary + Conn.ID
Note over Cl,R: connection live
end
Why the ack handshake? The server cannot send the connection's ID until OnConnect has decided the connection is allowed. Client and server hold their messages in a per-connection queue until the ack completes; once acknowledged, the queue is drained.
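The hold-then-drain behaviour can be sketched roughly as follows. This is only an illustration of the idea, not the library's code: the ackQueue type, its fields, and the injected write function are all hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// ackQueue is a hypothetical type: it holds outgoing frames until the
// connection has been acknowledged, then writes straight through.
type ackQueue struct {
	mu      sync.Mutex
	acked   bool
	pending [][]byte
	write   func([]byte) // stand-in for the real socket write
}

// send writes immediately once acknowledged, otherwise it queues the frame.
func (q *ackQueue) send(b []byte) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.acked {
		q.pending = append(q.pending, b)
		return
	}
	q.write(b)
}

// acknowledge marks the handshake complete and drains queued frames in order.
func (q *ackQueue) acknowledge() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.acked = true
	for _, b := range q.pending {
		q.write(b)
	}
	q.pending = nil
}

func main() {
	q := &ackQueue{write: func(b []byte) { fmt.Printf("wrote %s\n", b) }}
	q.send([]byte("first"))  // queued, nothing written yet
	q.send([]byte("second")) // queued
	q.acknowledge()          // drains "first", then "second"
	q.send([]byte("third"))  // written immediately
}
```

Holding the lock while draining keeps frames sent during the drain from overtaking queued ones, at the cost of blocking senders for the duration of the drain.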
Every neffos frame is a single ;-separated tuple (or its binary equivalent):
<wait>;<namespace>;<room>;<event>;<isError>;<isNoOp>;<body>
- wait: the request/response token. Empty for fire-and-forget. Starts with $ if it originated client-side, # if it's a confirmation, plain otherwise.
- namespace / room / event: escaped via ; → @%!semicolon@%! so the field separator never collides with user data.
- isError / isNoOp: 1 or 0.
- body: everything after the sixth ; to the end of the frame. Binary frames keep the body as raw bytes.
Native (raw WebSocket) frames are sent unchanged when Message.IsNative is true and Message.wait is empty — useful for interop with non-neffos clients.
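A minimal sketch of encoding and decoding such a tuple, assuming only the layout and the escape rule described above. The frame type and the encode / decode helpers are hypothetical names, and the handling of native frames and wait-token prefixes is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// Hypothetical constants for illustration; only the separator and its
// replacement string are taken from the description above.
const (
	sep            = ";"
	sepReplacement = "@%!semicolon@%!"
	fieldCount     = 7
)

// frame mirrors the seven fields of the tuple.
type frame struct {
	Wait, Namespace, Room, Event string
	IsError, IsNoOp              bool
	Body                         []byte
}

func escape(s string) string   { return strings.ReplaceAll(s, sep, sepReplacement) }
func unescape(s string) string { return strings.ReplaceAll(s, sepReplacement, sep) }

func flag(b bool) []byte {
	if b {
		return []byte("1")
	}
	return []byte("0")
}

// encode joins the seven fields with ';'. Only namespace, room and event
// are escaped; the body is appended untouched.
func encode(f frame) []byte {
	return bytes.Join([][]byte{
		[]byte(f.Wait),
		[]byte(escape(f.Namespace)),
		[]byte(escape(f.Room)),
		[]byte(escape(f.Event)),
		flag(f.IsError),
		flag(f.IsNoOp),
		f.Body,
	}, []byte(sep))
}

// decode splits on the first six separators only, so the body may
// itself contain ';'.
func decode(b []byte) (frame, error) {
	parts := bytes.SplitN(b, []byte(sep), fieldCount)
	if len(parts) != fieldCount {
		return frame{}, fmt.Errorf("expected %d fields, got %d", fieldCount, len(parts))
	}
	return frame{
		Wait:      string(parts[0]),
		Namespace: unescape(string(parts[1])),
		Room:      unescape(string(parts[2])),
		Event:     unescape(string(parts[3])),
		IsError:   string(parts[4]) == "1",
		IsNoOp:    string(parts[5]) == "1",
		Body:      parts[6],
	}, nil
}

func main() {
	raw := encode(frame{Namespace: "chat", Room: "a;b", Event: "msg", Body: []byte("hi;there")})
	fmt.Printf("%s\n", raw)
	f, err := decode(raw)
	fmt.Println(f.Room, string(f.Body), err)
}
```

Limiting the split to seven parts is what lets the body carry arbitrary bytes, including the separator, without escaping.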
client server
------ ------
ws open
--- 'M' ------------------------> handleACK: wait readiness; OnConnect runs
<----------------- 'A' + ID --- server is acknowledged; client unwait(nil)
(queued frames drain on both sides)
If OnConnect returns a non-nil error: server sends 'H' + err.Error() instead of 'A' + ID; client's Dial rejects.
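On the client side, the reply to 'M' can be told apart by its first byte. The sketch below assumes exactly the three marker bytes shown in the diagrams; parseAckReply is a hypothetical helper, not a function exported by the library.

```go
package main

import (
	"errors"
	"fmt"
)

// Marker bytes as named in the diagrams above.
const (
	ackBinary      = 'M' // client -> server: request acknowledgement
	ackIDBinary    = 'A' // server -> client: accepted, connection ID follows
	ackNotOKBinary = 'H' // server -> client: rejected, error text follows
)

// parseAckReply is a hypothetical helper that interprets the server's
// answer to the ack request.
func parseAckReply(b []byte) (id string, err error) {
	if len(b) == 0 {
		return "", errors.New("empty ack reply")
	}
	switch b[0] {
	case ackIDBinary:
		return string(b[1:]), nil
	case ackNotOKBinary:
		return "", errors.New(string(b[1:]))
	default:
		return "", fmt.Errorf("unexpected ack marker %q", b[0])
	}
}

func main() {
	fmt.Println(parseAckReply([]byte("Aconn-42")))
	fmt.Println(parseAckReply([]byte("Hnot allowed")))
}
```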
There are three broadcast modes:
- Default (async). Broadcast calls broadcaster.broadcast(msgs). The broadcaster atomically swaps a fresh broadcastEntry into place and closes the previous entry's done channel. Each per-connection waitMessages goroutine wakes from the entry it was waiting on, reads that entry's messages, writes them to the connection, and loops back to wait on the new entry.
- SyncBroadcaster=true. Broadcast sends to s.broadcastMessages; the dispatch loop iterates connections in order. Strict order, no overlap, slower.
- StackExchange configured. Broadcast calls StackExchange.Publish. The broker fans out to all interested subscribers (including other neffos instances). The local broadcaster is bypassed.
The atomic.Pointer-based broadcaster lets the writes to broadcastEntry.messages happen-before the close(done), which happens-before any receiver's <-done. No mutex is held while a receiver is waiting.
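The swap-then-close pattern can be sketched as below, assuming Go 1.19+ for the generic atomic.Pointer. The entry type and its fields are hypothetical; in particular, the next link is an assumption added here so that a receiver cannot skip an entry when two broadcasts arrive back to back, and the real broadcastEntry may be laid out differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// entry is a hypothetical stand-in for broadcastEntry.
type entry struct {
	messages []string      // written before done is closed, read only after
	next     *entry        // assumption: link to the entry that replaced this one
	done     chan struct{} // closed once messages are published
}

type broadcaster struct {
	current atomic.Pointer[entry]
}

func newBroadcaster() *broadcaster {
	b := &broadcaster{}
	b.current.Store(&entry{done: make(chan struct{})})
	return b
}

// broadcast swaps a fresh entry into place, then publishes msgs on the
// previous entry and wakes everyone waiting on it.
func (b *broadcaster) broadcast(msgs ...string) {
	fresh := &entry{done: make(chan struct{})}
	prev := b.current.Swap(fresh)
	prev.messages = msgs
	prev.next = fresh
	close(prev.done) // the writes above happen-before any receive on done
}

// subscribe returns the entry a new receiver should wait on.
func (b *broadcaster) subscribe() *entry { return b.current.Load() }

// wait blocks until e is published, then returns its messages and the
// entry to wait on next. No lock is held while blocked.
func (e *entry) wait() ([]string, *entry) {
	<-e.done
	return e.messages, e.next
}

func main() {
	b := newBroadcaster()
	var ready, finished sync.WaitGroup
	for i := 0; i < 3; i++ {
		ready.Add(1)
		finished.Add(1)
		go func(id int) {
			defer finished.Done()
			e := b.subscribe()
			ready.Done()
			for round := 0; round < 2; round++ {
				var msgs []string
				msgs, e = e.wait()
				fmt.Println("receiver", id, "got", msgs)
			}
		}(i)
	}
	ready.Wait() // every receiver holds an entry before the first broadcast
	b.broadcast("hello")
	b.broadcast("world", "!")
	finished.Wait()
}
```

Because each Swap hands a distinct previous entry to exactly one caller, every entry is written and closed once, even when several goroutines broadcast concurrently.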
Ask requires a reply, so it needs to know where the reply comes from:
- The caller generates a unique wait token via genWait.
- The wait token is registered in s.waitingMessages[wait] = chan Message (buffered 1).
- A Broadcast carries msg.wait to all clients.
- Whichever client replies first sends a message with the matching wait; the server's handleMessage sees IsWait() and pushes to the buffered channel.
- Ask returns the reply (or ctx.Err() if the deadline fires first).
The channel is buffered with capacity 1 so a replier never blocks, even if the caller has already cancelled. A deferred delete removes the entry on both the reply path and the cancellation path.
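The steps above can be sketched as a small registry of wait tokens. Everything here is a simplified assumption: the message and server types are cut down to the bare minimum, the token is a plain counter rather than whatever genWait produces, and the non-blocking send in handleReply is one way to drop late replies, not necessarily the library's.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// message and server are hypothetical, minimal stand-ins.
type message struct {
	wait string
	body string
}

type server struct {
	mu              sync.Mutex
	waitingMessages map[string]chan message
	seq             uint64
}

// ask registers a wait token, sends the question, and blocks until a
// reply arrives or ctx is done.
func (s *server) ask(ctx context.Context, send func(message), body string) (message, error) {
	ch := make(chan message, 1) // capacity 1: the first replier never blocks

	s.mu.Lock()
	s.seq++
	wait := fmt.Sprintf("w%d", s.seq) // assumption: counter-based token
	s.waitingMessages[wait] = ch
	s.mu.Unlock()

	// Clean up on both the reply path and the cancellation path.
	defer func() {
		s.mu.Lock()
		delete(s.waitingMessages, wait)
		s.mu.Unlock()
	}()

	send(message{wait: wait, body: body})

	select {
	case reply := <-ch:
		return reply, nil
	case <-ctx.Done():
		return message{}, ctx.Err()
	}
}

// handleReply routes a reply to its waiter. It reports false when nobody
// is waiting for the token or a reply was already delivered.
func (s *server) handleReply(reply message) bool {
	s.mu.Lock()
	ch, ok := s.waitingMessages[reply.wait]
	s.mu.Unlock()
	if !ok {
		return false
	}
	select {
	case ch <- reply:
		return true
	default:
		return false
	}
}

// demo runs one ask against a fake client that answers "pong" and
// reports how many wait tokens are still registered afterwards.
func demo() (reply string, pending int, err error) {
	s := &server{waitingMessages: make(map[string]chan message)}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	send := func(m message) {
		go s.handleReply(message{wait: m.wait, body: "pong"})
	}
	m, err := s.ask(ctx, send, "ping")

	s.mu.Lock()
	pending = len(s.waitingMessages)
	s.mu.Unlock()
	return m.body, pending, err
}

func main() {
	fmt.Println(demo())
}
```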
A Conn owns a connectedNamespaces map[string]*NSConn, and each NSConn owns a rooms map[string]*Room. Each map is guarded by its owner's sync.RWMutex (one per Conn, one per NSConn). Joining or leaving a room is an Ask to the other side, followed by a local mutation under the lock.
forceLeaveAll(isLocal) is used during Close and DisconnectAll — it bypasses the remote handshake and fires only the local events. The replyRoom* functions handle incoming join/leave messages from the other side.
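A rough sketch of the room bookkeeping on one NSConn, under simplifying assumptions: the remote Ask is replaced by an injected askRemote function, the local events by an onLeave callback, and all type and method signatures are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// errRejected is a hypothetical error returned by the remote side.
var errRejected = errors.New("join rejected by remote side")

type room struct{ name string }

// nsConn is a cut-down stand-in for NSConn: a rooms map behind an RWMutex.
type nsConn struct {
	mu    sync.RWMutex
	rooms map[string]*room
}

// join asks the other side first, then mutates the map under the write lock.
func (ns *nsConn) join(name string, askRemote func(room string) error) error {
	if err := askRemote(name); err != nil {
		return err
	}
	ns.mu.Lock()
	defer ns.mu.Unlock()
	if _, ok := ns.rooms[name]; !ok {
		ns.rooms[name] = &room{name: name}
	}
	return nil
}

// roomNames takes only the read lock and returns a sorted snapshot.
func (ns *nsConn) roomNames() []string {
	ns.mu.RLock()
	defer ns.mu.RUnlock()
	names := make([]string, 0, len(ns.rooms))
	for n := range ns.rooms {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// forceLeaveAll skips the remote handshake entirely and only fires the
// local callback. onLeave runs under the lock, so it must not call back
// into ns.
func (ns *nsConn) forceLeaveAll(onLeave func(room string)) {
	ns.mu.Lock()
	defer ns.mu.Unlock()
	for name := range ns.rooms {
		delete(ns.rooms, name)
		onLeave(name)
	}
}

func main() {
	ns := &nsConn{rooms: make(map[string]*room)}
	accept := func(string) error { return nil }
	reject := func(string) error { return errRejected }

	fmt.Println(ns.join("lobby", accept), ns.join("dev", accept), ns.join("vip", reject))
	fmt.Println(ns.roomNames())
	ns.forceLeaveAll(func(r string) { fmt.Println("left", r) })
	fmt.Println(ns.roomNames())
}
```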
Two small synchronization primitives:
- process (per namespace, on a Conn): used by askConnect to block any concurrent tryNamespace for the same namespace until the connect handshake finishes. Idempotent Done, lock-free Wait after Done.
- waiterOnce (per Conn): one-shot signal for the "OnConnect has decided" state. Closed once with the outcome of OnConnect (nil or error). All subsequent wait() calls return immediately.
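A one-shot signal of the second kind can be built from a channel and sync.Once. This is a sketch of the general technique only; the field names and the constructor are assumptions, and the library's waiterOnce may be implemented differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRejected is a hypothetical OnConnect outcome used for the demo.
var errRejected = errors.New("rejected by OnConnect")

// waiterOnce is a one-shot signal that carries an error outcome.
type waiterOnce struct {
	once sync.Once
	done chan struct{}
	err  error
}

func newWaiterOnce() *waiterOnce {
	return &waiterOnce{done: make(chan struct{})}
}

// unwait records the outcome and releases every waiter. Only the first
// call has any effect.
func (w *waiterOnce) unwait(err error) {
	w.once.Do(func() {
		w.err = err   // written before close, so waiters observe it
		close(w.done) // wakes current waiters; later waits return at once
	})
}

// wait blocks until unwait has been called, then returns its outcome.
func (w *waiterOnce) wait() error {
	<-w.done
	return w.err
}

func main() {
	w := newWaiterOnce()

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Println("waiter", id, "released with:", w.wait())
		}(i)
	}

	w.unwait(errRejected)
	w.unwait(nil) // ignored: the outcome was already decided
	wg.Wait()
	fmt.Println("late wait returns immediately:", w.wait())
}
```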
- The ask method: caller-side view of the Ask protocol
- Broadcast: caller-side view of broadcasts
- Scale out: what changes when a StackExchange is configured
- Upgraders and dialers: the Socket interface this all sits on
Copyright © 2019-2023 Gerasimos Maropoulos.