antarys

package module
v0.0.0-...-5042f98 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2025 License: Apache-2.0 Imports: 38 Imported by: 0

README

Antarys

Antarys

Antarys

A hackable vector database for on-demand scaling

License

High-Performance Embeddable Vector Database. [WIP]

Shards

Each collection is split into shards, and everything belonging to a shard lives in a single ShardedCollectionData:

type ShardedCollectionData struct {
    Vectors      map[string][]float32  // Your actual vectors
    Metadata     map[string]any        // embedding metadata you give us
    HNSWGraph    *hnswGraph           // The search index

    // async channels
    pendingInserts   chan *pendingInsert
    hnswUpdateChan   chan *hnswUpdate
}

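Each shard's HNSW graph records, for every node, its connections to neighboring nodes on each level. Searches start at EntryPoint, and IDToNodeID maps your string IDs to internal node IDs: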

type hnswGraph struct {
    Nodes      map[uint32]*hnswNode  // Map of node ID to actual node
    EntryPoint uint32                // Where to start searches
    IDToNodeID map[string]uint32     // Your ID -> our internal ID
}

type hnswNode struct {
    ID          string
    VectorID    string
    Connections map[int][]uint32      // level -> list of connected nodes
}

Async Insertion

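Here's where it gets interesting. When you insert a vector:

  1. We immediately store it in the Vectors map (and its metadata in Metadata) under the shard lock
  2. We try to queue an HNSW update without blocking
  3. We return success to you right away. The vector only becomes reachable through HNSW search once the background worker has processed the update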
func (shard *ShardedCollectionData) processInsertLockless(insert *pendingInsert) error {
    // Store the vector right away
    shard.Mutex.Lock()
    shard.Vectors[insert.id] = insert.vector
    shard.Metadata[insert.id] = insert.metadata
    shard.Mutex.Unlock()

    // Try to queue the HNSW update
    select {
    case shard.hnswUpdateChan <- &hnswUpdate{...}:
        // Great, it's queued
    default:
        // channel full
    }

    return nil
}

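A background worker drains the update channel and applies the updates to the HNSW graph in batches. It flushes as soon as 100 updates have accumulated, or every 10 ms if fewer are pending: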

func (shard *ShardedCollectionData) asyncHNSWWorker() {
    updateBuffer := make([]*hnswUpdate, 0, 100)
    ticker := time.NewTicker(10 * time.Millisecond)

    for {
        select {
        case update := <-shard.hnswUpdateChan:
            updateBuffer = append(updateBuffer, update)

            // Process in batches of 100
            if len(updateBuffer) >= 100 {
                shard.processBatchHNSWUpdates(updateBuffer)
                updateBuffer = updateBuffer[:0]
            }

        case <-ticker.C:
            // Or every 10ms, whichever comes first
            if len(updateBuffer) > 0 {
                shard.processBatchHNSWUpdates(updateBuffer)
                updateBuffer = updateBuffer[:0]
            }
        }
    }
}

Current Limitations

This approach has known limitations. If hnswUpdateChan is full, the non-blocking send falls through to the default case and the HNSW update is silently dropped. The vector is then stored in Vectors but never added to the graph, so HNSW searches can miss it and search quality degrades. In addition, very heavy concurrent reading and writing can crash the process with a goroutine dump.

The Future: Contiguous Arrays

The current design relies heavily on pointers and maps, so every search hops between scattered heap allocations. That pointer chasing causes frequent cache misses and hurts performance.

The plan is to switch to a structure-of-arrays layout: flatten the graph into a few contiguous arrays indexed by node ID.

Instead of This

// Current: nodes scattered in memory
type hnswNode struct {
    ID          string
    Connections map[int][]uint32  // Pointer to another map
}

nodes := map[uint32]*hnswNode{  // Pointers everywhere
    1: &hnswNode{...},
    2: &hnswNode{...},
}

We'll Do This

type Graph struct {
    // All vectors in one big chunk
    Vectors     [][]float32    // [nodeID][dimension]

    // All edges flattened
    Edges       []uint32       // [1,2,5,3,7,9,...]
    EdgeStart   []uint32       // [0,3,6,...] - where each node's edges begin
    EdgeCount   []uint16       // [3,3,2,...] - how many edges each node has

    // Node info
    Levels      []uint8        // [2,1,3,...] - max level for each node
    EntryPoint  uint32
}

Why This Is Better

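Cache Friendly: Instead of chasing pointers through maps, we read from flat arrays. The second loop below shows the best case, a purely sequential scan over every vector. An HNSW traversal still jumps between nodes, but each step becomes a plain array index instead of a map lookup followed by a pointer dereference.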

// Current: pointer hopping
for _, neighbor := range node.Connections[level] {
    neighborNode := graph.Nodes[neighbor]  // Cache miss
    distance := similarity(query, neighborNode.Vector)  // Another cache miss
}

// Future: array walking
for i := uint32(0); i < graph.NodeCount; i++ {
    vector := graph.Vectors[i]           // Sequential access
    distance := similarity(query, vector) // Cache hit
}

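SIMD Friendly: With contiguous storage, a SIMD-accelerated kernel can compare the query against several vectors in a single call at run time. The sketch below assumes len(nodeIDs) is a multiple of 8; a real loop must handle the remaining tail separately: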

// Process 8 vectors simultaneously
for i := 0; i < len(nodeIDs); i += 8 {
    batch := nodeIDs[i:i+8]
    similarities := dotProductBatch8(query, graph.Vectors, batch)
}

Copy-on-Write for Consistency

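To fix the consistency issues, we'll use immutable snapshots. Searches read an immutable Graph snapshot that is loaded atomically, while inserts go into a separate write buffer. When the buffer gets big, it is merged into a new snapshot, which is swapped in atomically.

The sketch below is simplified. A real implementation needs:

  • a typed atomic pointer such as atomic.Pointer[Graph], rather than atomic.LoadPointer on a plain *Graph field
  • a guarantee that only one rebuild runs at a time
  • a fresh buffer swapped in before merging, instead of clearing the merged one; otherwise inserts that arrive between the merge and Clear are lost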

type Database struct {
    currentSnapshot  *Graph    // Read from this
    mutableBuffer    *WriteBuffer // Write to this
}

// Reads are always consistent
func (db *Database) Search(query []float32) []Result {
    snapshot := atomic.LoadPointer(&db.currentSnapshot)
    return searchGraph(snapshot, query)
}

// Writes accumulate in buffer
func (db *Database) Insert(vector []float32) error {
    db.buffer.Add(vector)

    // Rebuild when buffer gets big
    if db.buffer.Size() > threshold {
        go db.rebuildSnapshot()
    }
}

// Atomic swap when ready
func (db *Database) rebuildSnapshot() {
    newGraph := merge(db.currentSnapshot, db.buffer)
    atomic.StorePointer(&db.currentSnapshot, newGraph)
    db.buffer.Clear()
}

Memory Layout Example

Say you have 3 vectors with these connections:

  • Node 0: connects to [1, 2]
  • Node 1: connects to [0, 2]
  • Node 2: connects to [0, 1]

Current storage:

Node0 -> {connections: map[0:[1,2]]} -> malloc'd somewhere
Node1 -> {connections: map[0:[0,2]]} -> malloc'd somewhere else
Node2 -> {connections: map[0:[0,1]]} -> malloc'd somewhere else

Array storage:

Vectors:   [v0_data, v1_data, v2_data]        // Sequential
Edges:     [1, 2, 0, 2, 0, 1]                // All edges flattened
EdgeStart: [0, 2, 4]                         // Node 0 starts at 0, Node 1 at 2, etc
EdgeCount: [2, 2, 2]                         // Each node has 2 edges

Planned Features

The following features are in development:

  • Contiguous Arrays Architecture
  • Filtered search and payload indexing (HTTP API)
  • Distributed architecture with coordination
  • gRPC support for high-performance RPC
  • Comprehensive observability and monitoring
  • Advanced resource management
  • Hybrid and multi-modal search capabilities
  • Multi-tenancy and security features
  • Enhanced query optimization
  • Advanced collection management
  • Backup and recovery improvements

Documentation

Overview

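Parts of this package are taken from https://github.com/Anush008/fastembed-go (presumably the embedding support: FlagEmbedding, the BGE model definitions, and the ONNX Runtime download/validation helpers).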

Index

Constants

View Source
const (
	DefaultDimension           = 1536
	DefaultMaxResults          = 100
	DefaultSimilarityThreshold = 0.0
	DefaultCommitInterval      = 30 * time.Second
	DefaultDataDirName         = "data"
	DefaultCacheSize           = 5000
	DefaultQueryConcurrency    = 4
	DefaultHNSWM               = 16
	DefaultHNSWEfConstruction  = 200
	DefaultHNSWEfSearch        = 64
	MagicHeader                = "ANTARYS"
	FormatVersion              = 1
	DefaultShards              = 8
	DefaultBatchSize           = 1000
	MinAccuracyCandidates      = 64
	MaxHNSWLevels              = 12
)
View Source
const ANTARYS_TITLE = `` /* 184-byte string literal not displayed */
View Source
const VERSION string = "0.2.0"

Variables

View Source
var ModelsByIndex = map[int]string{
	1: "fast-bge-base-en",
	2: "fast-bge-base-en-v1.5",
	3: "fast-bge-small-en",
	4: "fast-bge-small-en-v1.5",
	5: "fast-bge-small-zh-v1.5",
}
View Source
var ValidChecksumFiles map[string]string = map[string]string{
	"libonnxruntime.1.22.0.dylib": "db045368293215c9d22aa7b8c983d688b3ae9ca1da3f64ffbe01ba7df31c3355",
	"libonnxruntime_arm64.so":     "0afd69a0ae38c5099fd0e8604dda398ac43dee67cd9c6394b5142b19e82528de",
	"libonnxruntime_x64.so":       "3da6146e14e7b8aaec625dde11d6114c7457c87a5f93d744897da8781e35c673",
}

Functions

func AntarysTitle

func AntarysTitle()

func BatchCosineSimilarity

func BatchCosineSimilarity(queries [][]float32, targets [][]float32) [][]float32

func BatchNormalizeVectors

func BatchNormalizeVectors(vectors [][]float32)

func FileExists

func FileExists(path string) bool

func GetOptimalVector

func GetOptimalVector(size int) []float32

func GetSIMDWidth

func GetSIMDWidth() int

func HasModel

func HasModel(model EmbeddingModel, cacheDir string) bool

func HasOnnxRuntime

func HasOnnxRuntime(runtime OnnxRuntime, cacheDir string) bool

func IsModelValid

func IsModelValid(id string, cacheDir string) bool

func IsOnnxValid

func IsOnnxValid(cacheDir string) bool

func LoadDynamicOnnx

func LoadDynamicOnnx() (string, error)

func MatrixVectorMultiply

func MatrixVectorMultiply(matrix [][]float32, vector []float32) []float32

func NormaliseVector

func NormaliseVector(vector []float32)

func NormaliseVectorLockless

func NormaliseVectorLockless(vector []float32)

func NormaliseVectors

func NormaliseVectors(vectors [][]float32)

func ResetMathStats

func ResetMathStats()

func RetrieveModel

func RetrieveModel(model EmbeddingModel, cacheDir string, showDownloadProgress bool) (string, error)

func RetrieveRuntime

func RetrieveRuntime(runtime OnnxRuntime, cacheDir string, showDownloadProgress bool) (string, error)

func TestPrecisionAccuracy

func TestPrecisionAccuracy() map[string]float64

func ValidateChecksum

func ValidateChecksum(path string, expected string) bool

func ValidateOnnxAndModel

func ValidateOnnxAndModel(modelID string, cacheDir string) error

Types

type AntarysDB

type AntarysDB struct {
	Collections map[string]*Collection
	Mutex       sync.RWMutex
	// contains filtered or unexported fields
}

func NewAntarysDB

func NewAntarysDB(dataDir string) (*AntarysDB, error)

func NewAntarysDBWithConfig

func NewAntarysDBWithConfig(config Config) (*AntarysDB, error)

func (*AntarysDB) BatchInsert

func (db *AntarysDB) BatchInsert(collection string, batch []*BatchInsert) error

func (*AntarysDB) BatchInsertInCollection

func (db *AntarysDB) BatchInsertInCollection(collection string, records []struct {
	ID       string
	Vector   []float32
	Metadata any
}) error

func (*AntarysDB) BuildHNSWIndex

func (db *AntarysDB) BuildHNSWIndex(collectionName string, options IndexingOptions) error

func (*AntarysDB) BuildIndex

func (db *AntarysDB) BuildIndex(collectionName string, options IndexingOptions) error

func (*AntarysDB) Close

func (db *AntarysDB) Close() error

func (*AntarysDB) Commit

func (db *AntarysDB) Commit() error

func (*AntarysDB) Count

func (db *AntarysDB) Count() int64

func (*AntarysDB) CountInCollection

func (db *AntarysDB) CountInCollection(collection string) int64

func (*AntarysDB) CreateCollection

func (db *AntarysDB) CreateCollection(name string, dimensions int) error

func (*AntarysDB) CreateCollectionWithOptions

func (db *AntarysDB) CreateCollectionWithOptions(name string, dimensions int, options IndexingOptions) error

func (*AntarysDB) Delete

func (db *AntarysDB) Delete(id string) error

func (*AntarysDB) DeleteCollection

func (db *AntarysDB) DeleteCollection(name string) error

func (*AntarysDB) DeleteFromCollection

func (db *AntarysDB) DeleteFromCollection(collection string, id string) error

func (*AntarysDB) Dump

func (db *AntarysDB) Dump(outputDir string) error

func (*AntarysDB) ExportToJSON

func (db *AntarysDB) ExportToJSON(collectionName string, outputFile string) error

func (*AntarysDB) Get

func (db *AntarysDB) Get(id string) (VectorRecord, error)

func (*AntarysDB) GetCollectionDimensions

func (db *AntarysDB) GetCollectionDimensions(collectionName string) (int, error)

func (*AntarysDB) GetFromCollection

func (db *AntarysDB) GetFromCollection(collection, id string) (VectorRecord, error)

func (*AntarysDB) GetStats

func (db *AntarysDB) GetStats() map[string]interface{}

func (*AntarysDB) InsertInCollection

func (db *AntarysDB) InsertInCollection(collection string, id string, vector []float32, metadata any) error

func (*AntarysDB) ListCollections

func (db *AntarysDB) ListCollections() []string

func (*AntarysDB) ListIDs

func (db *AntarysDB) ListIDs(collectionName string) ([]string, error)

func (*AntarysDB) Load

func (db *AntarysDB) Load() error

func (*AntarysDB) LoadFromJSON

func (db *AntarysDB) LoadFromJSON(collectionName string, inputFile string) error

func (*AntarysDB) PurgeCache

func (db *AntarysDB) PurgeCache()

func (*AntarysDB) RebuildIndex

func (db *AntarysDB) RebuildIndex(collectionName string, options IndexingOptions) error

func (*AntarysDB) Restore

func (db *AntarysDB) Restore(inputDir string) error

func (*AntarysDB) SearchByMetadata

func (db *AntarysDB) SearchByMetadata(filter func(any) bool, opts *SearchOptions) ([]VectorRecord, error)

func (*AntarysDB) SearchByMetadataInCollection

func (db *AntarysDB) SearchByMetadataInCollection(collection string, filter func(any) bool, opts *SearchOptions) ([]VectorRecord, error)

func (*AntarysDB) SearchInCollection

func (db *AntarysDB) SearchInCollection(collectionName string, query []float32, opts *SearchOptions) ([]SearchResult, error)

func (*AntarysDB) StartAutoCommit

func (db *AntarysDB) StartAutoCommit(interval time.Duration)

func (*AntarysDB) UpdateMetadata

func (db *AntarysDB) UpdateMetadata(collection, id string, metadata any) error

type BatchInsert

type BatchInsert struct {
	Id       string
	Vector   []float32
	Metadata any
	Created  time.Time
	Updated  time.Time
}

type Collection

type Collection struct {
	Name         string
	Dimensions   int `json:"dimensions"`
	VectorCount  int64
	Shards       []*ShardedCollectionData
	ShardMask    uint32
	Initialized  bool
	IndexOptions IndexingOptions
}

type Config

type Config struct {
	DataDir          string          `json:"data_dir"`
	CommitInterval   time.Duration   `json:"commit_interval"`
	CacheSize        int             `json:"cache_size"`
	QueryConcurrency int             `json:"query_concurrency"`
	IndexingOptions  IndexingOptions `json:"indexing_options"`
	EncryptionKey    []byte          `json:"-"`
	Compression      bool            `json:"compression"`
	EnableProfiling  bool            `json:"enable_profiling"`
}

type DBStats

type DBStats struct {
	Queries            int64
	CacheHits          int64
	InsertedVectors    int64
	TotalSearchTime    int64
	TotalInsertTime    int64
	LastCommitDuration time.Duration
	// contains filtered or unexported fields
}

type EmbeddingModel

type EmbeddingModel string
const (
	BGEBaseEN     EmbeddingModel = "fast-bge-base-en"
	BGEBaseENV15  EmbeddingModel = "fast-bge-base-en-v1.5"
	BGESmallEN    EmbeddingModel = "fast-bge-small-en"
	BGESmallENV15 EmbeddingModel = "fast-bge-small-en-v1.5"
	BGESmallZH    EmbeddingModel = "fast-bge-small-zh-v1.5"
)

type FlagEmbedding

type FlagEmbedding struct {
	// contains filtered or unexported fields
}

func NewFlagEmbedding

func NewFlagEmbedding(options *InitOptions) (*FlagEmbedding, error)

func (*FlagEmbedding) Destroy

func (f *FlagEmbedding) Destroy() error

func (*FlagEmbedding) Embed

func (f *FlagEmbedding) Embed(input []string, batchSize int) ([]([]float32), error)

func (*FlagEmbedding) PassageEmbed

func (f *FlagEmbedding) PassageEmbed(input []string, batchSize int) ([]([]float32), error)

func (*FlagEmbedding) QueryEmbed

func (f *FlagEmbedding) QueryEmbed(input string) ([]float32, error)

type IndexingOptions

type IndexingOptions struct {
	UseHNSW              bool               `json:"use_hnsw"`
	M                    int                `json:"m"`
	N                    int                `json:"n"`
	EfConstruction       int                `json:"ef_construction"`
	Quantization         QuantizationMethod `json:"quantization"`
	NumShards            int                `json:"num_shards"`
	ParallelConstruction bool               `json:"parallel_construction"`
	EdgesPerCentroid     int                `json:"edges_per_centroid"`
	UsePQ                bool               `json:"use_pq"`
	NumPQSubspaces       int                `json:"num_pq_subspaces"`
	NumPQClusters        int                `json:"num_pq_clusters"`
	SubRegionPortion     float32            `json:"sub_region_portion"`
	VLQMaxResults        int                `json:"vlq_max_results"`
}

type InitOptions

type InitOptions struct {
	Model                EmbeddingModel
	ExecutionProviders   []string
	MaxLength            int
	CacheDir             string
	ShowDownloadProgress *bool
}

type LRUCache

type LRUCache struct {
	// contains filtered or unexported fields
}

func (*LRUCache) Get

func (c *LRUCache) Get(key uint64) (float32, bool)

func (*LRUCache) Put

func (c *LRUCache) Put(key uint64, value float32)

type MathStats

type MathStats struct {
	TotalOperations    int64
	FastPathOps        int64
	BlasOperations     int64
	PrecisionFallbacks int64
	VectorOperations   int64
}

func GetMathStats

func GetMathStats() MathStats

type MemoryoptimisedVector

type MemoryoptimisedVector struct {
	Data       []float32
	Dimensions int
}

type ModelInfo

type ModelInfo struct {
	Model       EmbeddingModel
	Dim         int
	Description string
}

func ListSupportedModels

func ListSupportedModels() []ModelInfo

type OnnxRuntime

type OnnxRuntime string
const (
	ORT_MAC_UNIVERSAL OnnxRuntime = "libonnxruntime.1.22.0.dylib"
)

type QuantizationMethod

type QuantizationMethod int
const (
	QuantizationNone       QuantizationMethod = 0
	QuantizationScalar8Bit QuantizationMethod = 1
	QuantizationPQ         QuantizationMethod = 2
	QuantizationOPQ        QuantizationMethod = 3
	QuantizationResidualPQ QuantizationMethod = 4
	DefaultQuantization                       = QuantizationNone
)

type QuantizedVector

type QuantizedVector struct {
	Data       []byte
	Dimensions int
	Scale      float32
	Offset     float32
}

type QueryRequest

type QueryRequest struct {
	Query    []float32
	Options  *SearchOptions
	ResultCh chan<- []SearchResult
	ErrorCh  chan<- error
}

type SearchMethod

type SearchMethod int
const (
	SearchMethodBruteForce SearchMethod = iota
	SearchMethodHNSW
	SearchMethodVLQADC
)

type SearchOptions

type SearchOptions struct {
	MaxResults          int                `json:"max_results"`
	SimilarityThreshold float32            `json:"similarity_threshold"`
	MetadataFilter      func(any) bool     `json:"-"`
	SortByCreated       bool               `json:"sort_by_created"`
	SortByUpdated       bool               `json:"sort_by_updated"`
	Quantization        QuantizationMethod `json:"quantization"`
	UseANN              bool               `json:"use_ann"`
	EfSearch            int                `json:"ef_search"`
	BFSMode             bool               `json:"bfs_mode"`
	GreedyMode          bool               `json:"greedy_mode"`
	SearchContext       context.Context    `json:"-"`
}

type SearchResult

type SearchResult struct {
	Record   VectorRecord `json:"record"`
	Score    float32      `json:"score"`
	Distance float32      `json:"distance"`
}

type ShardedCollectionData

type ShardedCollectionData struct {
	Vectors      map[string][]float32
	Quantized    map[string]QuantizedVector
	Metadata     map[string]any
	Created      map[string]time.Time
	Updated      map[string]time.Time
	HNSWGraph    *hnswGraph
	Mutex        sync.RWMutex
	PQCentroids  [][]float32
	PQCodes      map[string][]byte
	NumSubspaces int

	AsyncWorkerStop  chan struct{}
	AsyncWorkerGroup sync.WaitGroup
	// contains filtered or unexported fields
}

func (*ShardedCollectionData) StopAsyncWorkers

func (shard *ShardedCollectionData) StopAsyncWorkers()

type VectorRecord

type VectorRecord struct {
	ID         string    `json:"id"`
	Vector     []float32 `json:"vector"`
	Metadata   any       `json:"metadata,omitempty"`
	Created    time.Time `json:"created"`
	Updated    time.Time `json:"updated"`
	Collection string    `json:"collection"`
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(size int) *WorkerPool

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(job func())

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL