file

package module
v0.10.3 Latest
Published: Jun 3, 2025 License: Apache-2.0 Imports: 10 Imported by: 1

README

Conduit Connector File

The File plugin is one of Conduit's built-in plugins. It provides both a source and a destination File connector, so a file can serve as either the source or the destination of a Conduit pipeline.

How it works

The Source connector listens for changes appended to the source file and sends records with the changes. The Destination connector receives records and writes them to a file.

Source

The Source connector only requires a valid path. If the file doesn't exist yet, the connector still starts and waits until a file with the configured name appears; it then starts listening for changes and sending records.
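
The wait-then-read behaviour described above can be sketched with the standard library alone. This is an illustration, not the connector's actual implementation: `waitForFile`, `readAppended`, the polling interval, and the newline-delimited framing are all assumptions made for the example.

```go
package main

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// waitForFile polls until path exists, ctx ends, or Stat fails unexpectedly.
func waitForFile(ctx context.Context, path string, every time.Duration) error {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		if _, err := os.Stat(path); err == nil {
			return nil
		} else if !errors.Is(err, os.ErrNotExist) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// readAppended returns the complete lines after offset plus the offset to
// resume from. A trailing line without '\n' is left for the next call.
func readAppended(rs io.ReadSeeker, offset int64) ([]string, int64, error) {
	if _, err := rs.Seek(offset, io.SeekStart); err != nil {
		return nil, offset, err
	}
	var lines []string
	r := bufio.NewReader(rs)
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			return lines, offset, nil
		} else if err != nil {
			return lines, offset, err
		}
		offset += int64(len(line))
		lines = append(lines, strings.TrimSuffix(line, "\n"))
	}
}

// demo starts waiting before the file exists, then reads two rounds of appends.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "file-source")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "in.txt")

	written := make(chan error, 1)
	go func() { // stands in for another process creating the file a little later
		time.Sleep(20 * time.Millisecond)
		written <- os.WriteFile(path, []byte("a\nb\n"), 0o644)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := waitForFile(ctx, path, 5*time.Millisecond); err != nil {
		return nil, err
	}
	if err := <-written; err != nil {
		return nil, err
	}
	f, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND, 0)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	first, offset, err := readAppended(f, 0)
	if err != nil {
		return nil, err
	}
	if _, err := f.WriteString("c\n"); err != nil { // simulate a later append
		return nil, err
	}
	second, _, err := readAppended(f, offset)
	if err != nil {
		return nil, err
	}
	return append(first, second...), nil
}

func main() {
	fmt.Println(demo())
}
```

Remembering a byte offset and re-reading only from there is one simple way to pick up appended data. It also shows why edits or deletions earlier in the file would go unnoticed under such a scheme.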

Destination

The Destination connector creates the file if it doesn't exist and appends records to it as they are received.
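
The create-if-missing and append semantics can be illustrated with `os.OpenFile`. The sketch below is not the connector's code: `appendRecords` is a hypothetical helper, and writing one raw payload per line is an assumption (the real connector writes records in the configured output format).

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// appendRecords appends each payload as its own line, creating the file if
// needed, and reports how many payloads were fully written.
func appendRecords(path string, payloads [][]byte) (int, error) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	for i, p := range payloads {
		line := append(append([]byte{}, p...), '\n') // copy so p is not modified
		if _, err := f.Write(line); err != nil {
			return i, err
		}
	}
	return len(payloads), nil
}

// demo writes two batches to a path that does not exist yet and returns the
// resulting file content.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "file-destination")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "out.txt")

	if _, err := appendRecords(path, [][]byte{[]byte("one"), []byte("two")}); err != nil {
		return "", err
	}
	if _, err := appendRecords(path, [][]byte{[]byte("three")}); err != nil {
		return "", err
	}
	content, err := os.ReadFile(path)
	return string(content), err
}

func main() {
	content, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(content)
}
```

Returning the count of written payloads together with the error mirrors the shape of the `Write` method listed in the API section, which also returns `(int, error)`.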

Source Configuration Parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example
        plugin: "file"
        settings:
          # Path is the file path used by the connector to read/write records.
          # Type: string
          # Required: yes
          path: ""
          # Maximum delay before an incomplete batch is read from the source.
          # Type: duration
          # Required: no
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets read from the source.
          # Type: int
          # Required: no
          sdk.batch.size: "0"
          # Specifies whether to use a schema context name. If set to false, no
          # schema context name will be used, and schemas will be saved with the
          # subject name specified in the connector (not safe because of name
          # conflicts).
          # Type: bool
          # Required: no
          sdk.schema.context.enabled: "true"
          # Schema context name to be used. Used as a prefix for all schema
          # subject names. If empty, defaults to the connector ID.
          # Type: string
          # Required: no
          sdk.schema.context.name: ""
          # Whether to extract and encode the record key with a schema.
          # Type: bool
          # Required: no
          sdk.schema.extract.key.enabled: "false"
          # The subject of the key schema. If the record metadata contains the
          # field "opencdc.collection" it is prepended to the subject name and
          # separated with a dot.
          # Type: string
          # Required: no
          sdk.schema.extract.key.subject: "key"
          # Whether to extract and encode the record payload with a schema.
          # Type: bool
          # Required: no
          sdk.schema.extract.payload.enabled: "false"
          # The subject of the payload schema. If the record metadata contains
          # the field "opencdc.collection" it is prepended to the subject name
          # and separated with a dot.
          # Type: string
          # Required: no
          sdk.schema.extract.payload.subject: "payload"
          # The type of the payload schema.
          # Type: string
          # Required: no
          sdk.schema.extract.type: "avro"
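
The subject naming rule from the `sdk.schema.extract.key.subject` and `sdk.schema.extract.payload.subject` comments can be sketched as a small function. `subjectFor` is a hypothetical helper written for this example, not an SDK function, and treating an empty collection value as absent is an assumption.

```go
package main

import "fmt"

// collectionKey is the metadata field named in the parameter descriptions.
const collectionKey = "opencdc.collection"

// subjectFor prepends the record's collection, if present, to the configured
// subject, separated with a dot. Hypothetical helper for illustration only.
func subjectFor(subject string, metadata map[string]string) string {
	if collection, ok := metadata[collectionKey]; ok && collection != "" {
		return collection + "." + subject
	}
	return subject
}

func main() {
	fmt.Println(subjectFor("key", map[string]string{collectionKey: "users"}))
	fmt.Println(subjectFor("payload", nil))
}
```

With the default subjects, a record from a collection named `users` would get the key subject `users.key`, while a record without collection metadata keeps the plain `payload` subject.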

Destination Configuration Parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example
        plugin: "file"
        settings:
          # Path is the file path used by the connector to read/write records.
          # Type: string
          # Required: yes
          path: ""
          # Maximum delay before an incomplete batch is written to the
          # destination.
          # Type: duration
          # Required: no
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets written to the destination.
          # Type: int
          # Required: no
          sdk.batch.size: "0"
          # Allow bursts of at most X records (0 or less means that bursts are
          # not limited). Only takes effect if a rate limit per second is set.
          # Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
          # effective batch size will be equal to `sdk.rate.burst`.
          # Type: int
          # Required: no
          sdk.rate.burst: "0"
          # Maximum number of records written per second (0 means no rate
          # limit).
          # Type: float
          # Required: no
          sdk.rate.perSecond: "0"
          # The format of the output record. See the Conduit documentation for a
          # full list of supported formats
          # (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
          # Type: string
          # Required: no
          sdk.record.format: "opencdc/json"
          # Options to configure the chosen output record format. Options are
          # normally key=value pairs separated with comma (e.g.
          # opt1=val1,opt2=val2), except for the `template` record format, where
          # options are a Go template.
          # Type: string
          # Required: no
          sdk.record.format.options: ""
          # Whether to extract and decode the record key with a schema.
          # Type: bool
          # Required: no
          sdk.schema.extract.key.enabled: "true"
          # Whether to extract and decode the record payload with a schema.
          # Type: bool
          # Required: no
          sdk.schema.extract.payload.enabled: "true"
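
The interaction between `sdk.batch.size`, `sdk.rate.burst`, and `sdk.rate.perSecond` described in the comments above can be written out as a small function. `effectiveBatchSize` is a hypothetical name for this sketch and only restates the documented rule; it is not taken from the SDK.

```go
package main

import "fmt"

// effectiveBatchSize applies the documented rule: the burst limit only takes
// effect when a rate limit is set and the burst is positive, and in that case
// it caps the batch size. Hypothetical helper for illustration only.
func effectiveBatchSize(batchSize, rateBurst int, ratePerSecond float64) int {
	limited := ratePerSecond > 0 && rateBurst > 0
	if limited && batchSize > rateBurst {
		return rateBurst
	}
	return batchSize
}

func main() {
	fmt.Println(effectiveBatchSize(100, 10, 50)) // burst caps the batch
	fmt.Println(effectiveBatchSize(100, 10, 0))  // no rate limit, burst ignored
	fmt.Println(effectiveBatchSize(5, 10, 50))   // batch already below burst
}
```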

How to build it

Run make.

Testing

Run make test to run all the tests.

Limitations

  • The Source connector only detects appended changes to the file, so it doesn't detect deletes or edits.
  • The connectors can only access local files on the machine where Conduit is running, so a Conduit instance running on a server can't access a file on your local machine.
  • Currently, the connectors only work reliably with text files. Non-text files may work, but this is not guaranteed.

Documentation

Constants

const MetadataFilePath = "file.path"

Variables

var Connector = sdk.Connector{
	NewSpecification: sdk.YAMLSpecification(specs, version),
	NewSource:        NewSource,
	NewDestination:   NewDestination,
}

Functions

func NewDestination

func NewDestination() sdk.Destination

func NewSource

func NewSource() sdk.Source

Types

type Config added in v0.7.0

type Config struct {
	// Path is the file path used by the connector to read/write records.
	Path string `json:"path" validate:"required"`
}

func (Config) Validate added in v0.7.0

func (c Config) Validate() error
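
The `validate:"required"` tag on `Path` suggests that an empty path is rejected. What the real `Validate` checks is not shown on this page, so the following is only a stand-in: it redeclares a local `Config` type and assumes the check is a non-empty test, with a made-up error value `errPathRequired`.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the documented struct; the tags are kept for reference only
// and are not interpreted by this sketch.
type Config struct {
	Path string `json:"path" validate:"required"`
}

// errPathRequired is a hypothetical error value for this example.
var errPathRequired = errors.New(`config: "path" is required`)

// Validate is an assumed implementation that only enforces the required path.
func (c Config) Validate() error {
	if c.Path == "" {
		return errPathRequired
	}
	return nil
}

func main() {
	fmt.Println(Config{}.Validate())
	fmt.Println(Config{Path: "./example.txt"}.Validate())
}
```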

type Destination

type Destination struct {
	sdk.UnimplementedDestination
	// contains filtered or unexported fields
}

func (*Destination) Config added in v0.10.0

func (d *Destination) Config() sdk.DestinationConfig

func (*Destination) Open

func (d *Destination) Open(context.Context) error

func (*Destination) Teardown

func (d *Destination) Teardown(context.Context) error

func (*Destination) Write

func (d *Destination) Write(_ context.Context, recs []opencdc.Record) (int, error)

type DestinationConfig added in v0.7.0

type DestinationConfig struct {
	sdk.DefaultDestinationMiddleware

	Config // embed the global config
}

func (DestinationConfig) Validate added in v0.7.0

func (c DestinationConfig) Validate(context.Context) error

type Source

type Source struct {
	sdk.UnimplementedSource
	// contains filtered or unexported fields
}

func (*Source) Ack

func (*Source) Config added in v0.10.0

func (s *Source) Config() sdk.SourceConfig

func (*Source) Open

func (s *Source) Open(ctx context.Context, position opencdc.Position) error

func (*Source) Read

func (s *Source) Read(ctx context.Context) (opencdc.Record, error)

func (*Source) Teardown

func (s *Source) Teardown(context.Context) error

type SourceConfig added in v0.7.0

type SourceConfig struct {
	sdk.DefaultSourceMiddleware

	Config // embed the global config
}

func (SourceConfig) Validate added in v0.7.0

func (c SourceConfig) Validate(context.Context) error

Directories

Path: cmd/connector (command)
