quacfka

package module
v0.5.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2025 License: Apache-2.0 Imports: 28 Imported by: 1

README

Quacfka 🏹🦆

Go Reference

Go library to stream Kafka protobuf messages to DuckDB. Uses generics. Use your protobuf message as a type parameter to autogenerate an Arrow schema, provide a protobuf unmarshaling func, and stream data into DuckDB with a very high throughput.

Features

Arrow schema generation from a protobuf message type parameter
  • Converts a proto.Message into an Apache Arrow schema
    • Supports nested types
Configurable loggers
  • Set Debug, Error, and benchmark loggers

🚀 Install

Using Quacfka is easy. First, use go get to install the latest version of the library.

go get -u github.com/loicalleyne/quacfka@latest

💡 Usage

You can import quacfka using:

import "github.com/loicalleyne/quacfka"

Create a new Orchestrator, configure the Kafka client, processing and DuckDB, then Run(). Kafka client can be configured with a slice of franz-go/pkg/kgo.Opt or SASL user/pass auth.

	import q "github.com/loicalleyne/quacfka"
	/* 
	Options:
	- create with DuckDB file rotation
	- create with a CustomArrow function to munge the Arrow record in-flight and insert it to another DuckDB table
	- create with a Normalizer to create a normalized Arrow Record you can write to in your proto decode function
	- run without Kafka
	- run without protobuf message decoding
	- run without duckdb
	 */
	normFields := []string{"id", "site.id", "timestamp.seconds", "stores[0].gallery.deals.id"}
	normAliases := []string{"id", "site", "event_time", "deal"}
    o, err := q.NewOrchestrator[*your.CustomProtoMessageType](q.WithFileRotateThresholdMB(5000), q.WithCustomArrows([]q.CustomArrow{{CustomFunc: flattenNestedForAgg, DestinationTable: "test"}}),q.WithNormalizer(normFields, normAliases, false))
	if err != nil {
		panic(err)
	}
	defer o.Close()
	q.SetDebugLogger(log.Printf)
	q.SetErrorLogger(log.Printf)
	q.SetFatalLogger(log.Fatalf)
	q.SetBenchmarkLogger(log.Printf)
    k := o.NewKafkaConfig()
	k.ClientCount.Store(int32(*kafkaRoutines))
	k.MsgChanCap = 122880 * 5
	k.ConsumerGroup = os.Getenv("CONSUMER_GROUP")
	k.Seeds = append(k.Seeds, os.Getenv("KAFKA_SEED"))
	k.User = os.Getenv("KAFKA_USER")
	k.Password = os.Getenv("KAFKA_PW")
	k.Munger = messageMunger
	k.Topic = "kafka.topic01"
    // Tune record channel capacity, row group size, number of processing routines, set custom unmarshal func
	err = o.ConfigureProcessor(*duckRoutines*3, 1, *routines, customProtoUnmarshal)
	if err != nil {
		log.Println(err)
		panic(err)
	}
	var driverPath string
	switch runtime.GOOS {
	case "darwin":
		driverPath = "/usr/local/lib/libduckdb.so.dylib"
	case "linux":
		driverPath = "/usr/local/lib/libduckdb.so"
	case "windows":
		h, _ := os.UserHomeDir()
		driverPath = h + "\\Downloads\\libduckdb-windows-amd64\\duckdb.dll"
	default:
	}
	err = o.ConfigureDuck(q.WithPathPrefix("duck"), q.WithDriverPath(driverPath), q.WithDestinationTable("mytable"), q.WithDuckConnections(*duckRoutines))
	if err != nil {
		panic(err)
	}
	// Use MockKafka to generate random data for your custom proto to simulate consuming the protobuf from Kafka
	// wg.Add(1)
	// go o.MockKafka(ctxT, &wg, &rr.BidRequestEvent{Id: "1233242423243"})
	wg.Add(1)
	go o.Run(ctxT, &wg)
	// Get chan string of closed, rotated DuckDB files
	duckFiles := o.DuckPaths()
	...
	// Query duckdb files to aggregate, activate alerts, etc...
	...
	wg.Wait()
	// Check for processing errors
	if o.Error() != nil {
		log.Println(err)
	}
	// Print pipeline metrics
	log.Printf("%v\n", o.Report())
...
func customProtoUnmarshal(m []byte, s any) error {
	newMessage := rr.BidRequestEventFromVTPool()
	err := newMessage.UnmarshalVTUnsafe(m)
	if err != nil {
		return err
	}
	// Assert s to `*bufarrow.Schema[*your.CustomProtoMessageType]`
	// Populate the Normalizer Arrow Record with flattened data
	rb := s.(*bufarrow.Schema[*rr.BidRequestEvent]).NormalizerBuilder()
	if rb != nil {
		b := rb.Fields()
		if b != nil {
			id := newMessage.GetId()
			site := newMessage.GetSite().GetId()
			timestampSeconds := newMessage.GetTimestamp().GetSeconds()
			if len(newMessage.GetStores()[0].GetGallery().GetDeals()) == 0 {
				b[0].(*array.StringBuilder).Append(id)
				b[1].(*array.StringBuilder).Append(site)
				b[2].(*array.Int64Builder).Append(timestampSeconds)
				b[3].(*array.StringBuilder).AppendNull()
			}
			for i := 0; i < len(newMessage.GetImp()[0].GetPmp().GetDeals()); i++ {
				b[0].(*array.StringBuilder).Append(id)
				b[1].(*array.StringBuilder).Append(site)
				b[2].(*array.Int64Builder).Append(timestampSeconds)
				b[3].(*array.StringBuilder).Append(newMessage.GetImp()[0].GetPmp().GetDeals()[i].GetId())
			}
		}

	// Assert s to `*bufarrow.Schema[*your.CustomProtoMessageType]`
	s.(*bufarrow.Schema[*your.CustomProtoMessageType]).Append(newMessage)
	newMessage.ReturnToVTPool()
	return nil
}

// Custom protobuf wire format bytes munger
// Confluent Java client adds magic bytes at beginning of message which will cause
// protobuf decoding to fail if not removed
func messageMunger(m []byte) []byte {
	return m[6:]
}

// Custom Arrow function to build a new Arrow Record from the main processing output Record 
func flattenNestedForAgg(ctx context.Context, dest string, record arrow.Record) arrow.Record {
	...
	return mungedRecord
}
// {
//   "num_cpu": 60,
//   "runtime_os": "linux",
//   "kafka_clients": 5,
//   "kafka_queue_cap": 983040,
//   "processor_routines": 32,
//   "arrow_queue_cap": 4,
//   "duckdb_threshold_mb": 4200,
//   "duckdb_connections": 24,
//   "normalizer_fields": 10,
//   "start_time": "2025-02-24T21:06:23Z",
//   "end_time": "2025-02-24T21:11:23Z",
//   "records": "123_686_901.00",
//   "norm_records": "122_212_452.00",
//   "data_transferred": "146.53 GB",
//   "duration": "4m59.585s",
//   "records_per_second": "398_271.90",
//   "total_rows_per_second": "806_210.41",
//   "transfer_rate": "500.86 MB/second",
//   "duckdb_files": 9,
//   "duckdb_files_MB": 38429,
//   "file_avg_duration": "33.579s"
// }

Generate random data to emulate the Kafka topic

	wg.Add(1)
	// Instantiate a sample proto.Message to provide a description,
	// random data will be generated for all fields.
	go o.MockKafka(ctxT, &wg, &your.CustomProtoMessageType{Id: "1233242423243"})
	wg.Add(1)
	// WithFileRotateThresholdMB specifies a file rotation threshold target in MB (not very accurate yet)
	go o.Run(ctxT, &wg, q.WithoutKafka(), q.WithFileRotateThresholdMB(250))
	wg.Wait()

💫 Show your support

Give a ⭐️ if this project helped you! Feedback and PRs welcome.

Licence

Quacfka is released under the Apache 2.0 license. See LICENCE

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWaitGroupIsNil                = errors.New("*sync.waitgroup is nil")
	ErrMessageChanCapacityZero       = errors.New("chan []byte must have capacity > 0")
	ErrRecordChanCapacityZero        = errors.New("chan []arrow.record must have capacity > 0")
	ErrProcessingFuncIsNil           = errors.New("func([]byte, *bufa.Schema[T]) error is nil")
	ErrProcessingRoutineCountInvalid = errors.New("deserializer routine count not > 0")
	ErrRowGroupSizeMultiplier        = errors.New("row group size multiplier not > 0")
)
View Source
var (
	ErrMissingDuckDBConfig = errors.New("missing duckdb configuration")
)

Functions

func SetBenchmarkLogger

func SetBenchmarkLogger(logger Logger)

func SetDebugLogger

func SetDebugLogger(logger Logger)

func SetErrorLogger

func SetErrorLogger(logger Logger)

func SetFatalLogger

func SetFatalLogger(logger Logger)

func WithMessageCutConfluencePrefix added in v0.5.1

func WithMessageCutConfluencePrefix(m []byte) []byte

WithMessageCutConfluencePrefix removes 6 bytes that Confluence producer adds for schema registry metadata.

Types

type Cardinality added in v0.5.10

type Cardinality protoreflect.Cardinality

Cardinality determines whether a field is optional, required, or repeated.

const (
	Optional Cardinality = 1 // appears zero or one times
	Required Cardinality = 2 // appears exactly one time; invalid with Proto3
	Repeated Cardinality = 3 // appears zero or more times
)

Constants as defined by the google.protobuf.Cardinality enumeration.

func (*Cardinality) Get added in v0.5.10

func (c *Cardinality) Get() bufa.Cardinality

type CustomArrow added in v0.4.0

type CustomArrow struct {
	CustomFunc       func(context.Context, string, arrow.Record) arrow.Record
	DestinationTable string
}

type CustomField added in v0.5.10

type CustomField struct {
	Name             string
	Type             FieldType
	FieldCardinality Cardinality
	IsPacked         bool
}

type DuckOption

type DuckOption func(duckConfig)

func WithDestinationTable

func WithDestinationTable(p string) DuckOption

func WithDriverPath

func WithDriverPath(p string) DuckOption

func WithDuckConnections

func WithDuckConnections(p int) DuckOption

func WithDuckRunner added in v0.5.1

func WithDuckRunner(p *DuckRunner) DuckOption

Run queries in series after writes to db file are completed before rotating to next db file. Usage: exec := new(q.DuckRunner) exec.AddQueries(queries, false) exec.SetFunc(RunAggs) exec.SetDeleteOnDone(true) err = o.ConfigureDuck(q.WithPathPrefix("reqlog"), q.WithDriverPath(driverPath), q.WithDestinationTable("req"), q.WithDuckConnections(24), q.WithDuckRunner(exec))

func WithPath

func WithPath(p string) DuckOption

func WithPathPrefix

func WithPathPrefix(p string) DuckOption

type DuckRunner added in v0.5.1

type DuckRunner struct {
	// contains filtered or unexported fields
}

func (*DuckRunner) AddQueries added in v0.5.1

func (d *DuckRunner) AddQueries(queries []string, exec bool)

AddQueries adds queries to a DuckRunner and sets whether the runner should use RunExec instead of RunFunc to run the queries (RunExec should be used when no results from queries are expected).

func (*DuckRunner) Err added in v0.5.1

func (d *DuckRunner) Err() error

func (*DuckRunner) GetDB added in v0.5.1

func (d *DuckRunner) GetDB() *couac.Quacker

func (*DuckRunner) IsDeleteDBOnDone added in v0.5.1

func (d *DuckRunner) IsDeleteDBOnDone() bool

func (*DuckRunner) IsExec added in v0.5.1

func (d *DuckRunner) IsExec() bool

func (*DuckRunner) Path added in v0.5.1

func (d *DuckRunner) Path() string

func (*DuckRunner) Queries added in v0.5.1

func (d *DuckRunner) Queries() []string

func (*DuckRunner) Run added in v0.5.1

func (d *DuckRunner) Run(ctx context.Context) error

Run runs the defined queries, if exec is set to true does not expect queries to return any results otherwise will use query function to coordinate queries. Exec set to true is meant for running queries that aggregate to another table as well as EXPORT/COPY TO statements.

func (*DuckRunner) SetDeleteOnDone added in v0.5.1

func (d *DuckRunner) SetDeleteOnDone(b bool)

func (*DuckRunner) SetErr added in v0.5.1

func (d *DuckRunner) SetErr(err error)

func (*DuckRunner) SetFunc added in v0.5.1

func (d *DuckRunner) SetFunc(f func(*DuckRunner) error)

func (*DuckRunner) SetPath added in v0.5.1

func (d *DuckRunner) SetPath(p string)

type FieldType added in v0.5.10

type FieldType fieldType
const (
	BOOL    FieldType = "bool"
	BYTES   FieldType = "[]byte"
	STRING  FieldType = "string"
	INT64   FieldType = "int64"
	FLOAT64 FieldType = "float64"
)

func (*FieldType) Get added in v0.5.10

func (t *FieldType) Get() bufa.FieldType

type KafkaClientConf

type KafkaClientConf[T proto.Message] struct {

	// franz-go/pkg/kgo.Opt configurations. If any are set, these will override all
	//  of the subsequently listed client settings.
	ClientConf []kgo.Opt
	// Number of Kafka clients to open. Default value is 1. If using more than one
	// use of a consumer group is recommended.
	ClientCount atomic.Int32
	// Consumer group
	ConsumerGroup string
	// Instance prefix
	InstancePrefix string
	// Message channel capacity, must be greater than 0. Default capacity is 122880.
	MsgChanCap int
	// MsgTimeAppend sets whether to append the Kafka message timestamp as an 8 byte uint64
	// at end of message bytes. It is the deserializing's function responsibility
	// to truncate these prior to reading the protobuf message.
	// Use `time.Milli(int64(binary.LittleEndian.Uint64(b)))` to retrieve the timestamp.
	MsgTimeAppend bool
	// Function to munge message bytes prior to deserialization.
	// As an example, Confluent java client adds 6 magic bytes at
	// beginning of message data for use with Schema Registry which must
	// be removed from the message prior to deserialization.
	Munger func([]byte) []byte
	// Kafka TLS dialer
	TlsDialer *tls.Dialer
	// Kafka topic to consume
	Topic string
	// Kafka seed brokers
	Seeds []string
	// SASL Auth User
	User string
	// SASL Auth password
	Password string
	// contains filtered or unexported fields
}

type Logger

type Logger func(string, ...any)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

type MetricsReport

type MetricsReport struct {
	NumCPU             int    `json:"num_cpu"`
	RuntimeOS          string `json:"runtime_os"`
	KafkaClientCount   int    `json:"kafka_clients"`
	KafkaQueueCap      int    `json:"kafka_queue_cap"`
	ProcessorCount     int    `json:"processor_routines"`
	ArrowQueueCap      int    `json:"arrow_queue_cap"`
	DuckDBThresholdMB  int    `json:"duckdb_threshold_mb"`
	DuckConnCount      int    `json:"duckdb_connections,omitzero"`
	CustomArrows       *int   `json:"custom_arrows,omitempty"`
	NormalizerFields   *int   `json:"normalizer_fields,omitempty"`
	StartTime          string `json:"start_time"`
	EndTime            string `json:"end_time,omitzero"`
	Records            string `json:"records"`
	NormRecords        string `json:"norm_records"`
	DataTransferred    string `json:"data_transferred"`
	Duration           string `json:"duration"`
	RecordsPerSec      string `json:"records_per_second"`
	TotalRecordsPerSec string `json:"total_rows_per_second"`
	TransferRate       string `json:"transfer_rate"`
	OutputFiles        int64  `json:"duckdb_files"`
	OutputFilesMB      int64  `json:"duckdb_files_MB"`
	AvgDurationPerFile string `json:"file_avg_duration,omitempty"`
}

type Opt

type Opt struct {
	// contains filtered or unexported fields
}

type Option

type Option func(config)

func WithCustomArrows added in v0.4.0

func WithCustomArrows(p []CustomArrow) Option

func WithCustomFields added in v0.5.10

func WithCustomFields(p []CustomField) Option

func WithDuckPathsChan added in v0.5.2

func WithDuckPathsChan(s int) Option

func WithFileRotateThresholdDurationSeconds added in v0.5.12

func WithFileRotateThresholdDurationSeconds(p int) Option

WithFileRotateThresholdDurationSeconds sets the database file rotation duration threshold in seconds. Minimum duration is 60 seconds

func WithFileRotateThresholdMB

func WithFileRotateThresholdMB(p int64) Option

WithFileRotateThresholdMB sets the database file rotation size. Minimum rotation threshold is 100MB.

func WithNormalizer added in v0.5.0

func WithNormalizer(fields, aliases []string, failOnRangeError bool) Option

WithNormalizer configures the scalars to add to a flat Arrow Record suitable for efficient aggregation. Protobuf data with nested messages converted to Arrow records is not only slower to insert into duckdb, running aggregation queries on nested data is much slower(by orders of magnitude). Fields should be specified by their path (field names separated by a period ie. 'field1.field2.field3'). The Arrow field types of the selected fields will be used to build the new schema. If coaslescing data between multiple fields of the same type, specify only one of the paths. List fields should have an index to retrieve specified, otherwise defaults to all elements; ranges are not yet implemented.

func WithoutDuck

func WithoutDuck() Option

func WithoutDuckIngestRaw added in v0.5.1

func WithoutDuckIngestRaw() Option

func WithoutKafka

func WithoutKafka() Option

func WithoutProcessing

func WithoutProcessing() Option

type Orchestrator

type Orchestrator[T proto.Message] struct {
	Metrics *Metrics
	// contains filtered or unexported fields
}

func NewOrchestrator

func NewOrchestrator[T proto.Message](opts ...Option) (*Orchestrator[T], error)

func (*Orchestrator[T]) ArrowQueueCapacity added in v0.5.0

func (o *Orchestrator[T]) ArrowQueueCapacity() int

func (*Orchestrator[T]) BenchmarksReport

func (o *Orchestrator[T]) BenchmarksReport() string

BenchmarksReport generates an indented live benchmark report with throughput rates sampled from the last 30 seconds.

func (*Orchestrator[T]) BenchmarksUnformatedReport added in v0.5.7

func (o *Orchestrator[T]) BenchmarksUnformatedReport() string

BenchmarksUnformatedReport generates an unindented live benchmark report with throughput rates sampled from the last 30 seconds.

func (*Orchestrator[T]) Close

func (o *Orchestrator[T]) Close()

Close closes the DuckDB database, if open.

func (*Orchestrator[T]) ConfigureDuck

func (o *Orchestrator[T]) ConfigureDuck(opts ...DuckOption) error

ConfigureDuck initializes DuckDB connection settings.

func (*Orchestrator[T]) ConfigureProcessor

func (o *Orchestrator[T]) ConfigureProcessor(rChanCap, rowGroupSizeMultiplier, routineCount int, unmarshalFunc func([]byte, any) error) error

func (*Orchestrator[T]) CurrentDBSize added in v0.4.0

func (o *Orchestrator[T]) CurrentDBSize() int64

func (*Orchestrator[T]) DuckConnCount

func (o *Orchestrator[T]) DuckConnCount() int

func (*Orchestrator[T]) DuckIngest

func (o *Orchestrator[T]) DuckIngest(ctx context.Context, w *sync.WaitGroup)

DuckIngest reads records from the record channel and ingests the Records into a single DuckDB database. Ingestion ends when the Record channel is closed and empty.

func (*Orchestrator[T]) DuckIngestWithRotate

func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.WaitGroup)

DuckIngestWithRotate reads records from the record channel and ingests the Records into rotating DuckDB files, rotating at the file size threshold defined. Ingestion ends when the Record channel is closed and empty.

func (*Orchestrator[T]) DuckPaths

func (o *Orchestrator[T]) DuckPaths() chan string

func (*Orchestrator[T]) Error

func (o *Orchestrator[T]) Error() error

func (*Orchestrator[T]) IsClosed added in v0.5.8

func (o *Orchestrator[T]) IsClosed() bool

IsClosed returns whether DuckDB database is open or not.

func (*Orchestrator[T]) KafkaClientCount

func (o *Orchestrator[T]) KafkaClientCount() int

func (*Orchestrator[T]) KafkaQueueCapacity added in v0.5.0

func (o *Orchestrator[T]) KafkaQueueCapacity() int

func (*Orchestrator[T]) MessageChan

func (o *Orchestrator[T]) MessageChan() chan []byte

func (*Orchestrator[T]) MessageChanClose added in v0.5.8

func (o *Orchestrator[T]) MessageChanClose()

func (*Orchestrator[T]) MessageChanSend added in v0.5.8

func (o *Orchestrator[T]) MessageChanSend(m []byte)

func (*Orchestrator[T]) MockKafka added in v0.3.0

func (o *Orchestrator[T]) MockKafka(ctx context.Context, w *sync.WaitGroup, p T)

MockKafka produces protobuf messages with random data in each field to the message channel. An initialized message with at least one field containing data must be passed as an argument for generation to work. Usage: wg.Add(1) go o.MockKafka(ctxT, &wg, &gen.RequestEvent{Id: "1233242423243"})

func (*Orchestrator[T]) MsgProcessorsCount

func (o *Orchestrator[T]) MsgProcessorsCount() int

func (*Orchestrator[T]) NewKafkaConfig

func (o *Orchestrator[T]) NewKafkaConfig() *KafkaClientConf[T]

func (*Orchestrator[T]) NewMetrics

func (o *Orchestrator[T]) NewMetrics()

func (*Orchestrator[T]) ProcessMessages

func (o *Orchestrator[T]) ProcessMessages(ctx context.Context, wg *sync.WaitGroup)

ProcessMessages creates a pool of deserializer goroutines

func (*Orchestrator[T]) RecordChan

func (o *Orchestrator[T]) RecordChan() chan Record

func (*Orchestrator[T]) RecordChanClose added in v0.5.8

func (o *Orchestrator[T]) RecordChanClose()

func (*Orchestrator[T]) RecordChanSend added in v0.5.8

func (o *Orchestrator[T]) RecordChanSend(r Record)

func (*Orchestrator[T]) Report

func (o *Orchestrator[T]) Report() string

Report generates a summary of the collected Metrics

func (*Orchestrator[T]) ReportJSONL

func (o *Orchestrator[T]) ReportJSONL() string

Report generates a summary of the collected Metrics

func (*Orchestrator[T]) ResetMetrics added in v0.5.0

func (o *Orchestrator[T]) ResetMetrics()

ResetMetrics resets all metrics to zero.

func (*Orchestrator[T]) RestartDuck added in v0.5.8

func (o *Orchestrator[T]) RestartDuck() error

Reopen DuckDB database using an existing DuckDB configuration. Returns an error if ConfigureDuck has not been previously run or if Orchestrator has not been closed.

func (*Orchestrator[T]) Run

func (o *Orchestrator[T]) Run(ctx context.Context, wg *sync.WaitGroup)

func (*Orchestrator[T]) Schema

func (o *Orchestrator[T]) Schema() *bufa.Schema[T]

func (*Orchestrator[T]) StartMetrics

func (o *Orchestrator[T]) StartMetrics()

func (*Orchestrator[T]) UpdateMetrics

func (o *Orchestrator[T]) UpdateMetrics()

UpdateMetrics calculates the total duration, throughput, and throughput in bytes.

type Record added in v0.5.0

type Record struct {
	Raw  arrow.Record
	Norm arrow.Record
}

type TypedMetricsReport

type TypedMetricsReport struct {
	NumCPU             int     `json:"num_cpu"`
	RuntimeOS          string  `json:"runtime_os"`
	KafkaClientCount   int     `json:"kafka_clients"`
	KafkaQueueCap      int     `json:"kafka_queue_cap"`
	ProcessorCount     int     `json:"processor_routines"`
	ArrowQueueCap      int     `json:"arrow_queue_cap"`
	DuckDBThresholdMB  int     `json:"duckdb_threshold_mb"`
	DuckConnCount      int     `json:"duckdb_connections,omitzero"`
	CustomArrows       *int    `json:"custom_arrows,omitempty"`
	NormalizerFields   *int    `json:"normalizer_fields,omitempty"`
	StartTime          string  `json:"start_time"`
	EndTime            string  `json:"end_time"`
	Records            int64   `json:"records"`
	NormRecords        int64   `json:"norm_records"`
	DataTransferred    int64   `json:"bytes_transferred"`
	Duration           int64   `json:"duration_nano"`
	RecordsPerSec      float64 `json:"records_per_second"`
	TotalRecordsPerSec float64 `json:"total_rows_per_second"`
	TransferRate       string  `json:"transfer_rate"`
	OutputFiles        int64   `json:"duckdb_files"`
	OutputFilesMB      int64   `json:"duckdb_files_MB"`
	AvgDurationPerFile float64 `json:"file_avg_duration,omitzero"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL