avpipeline

package module
v0.0.0-...-8b3942e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2025 License: CC0-1.0 Imports: 15 Imported by: 10

README

About

This package aims to unlock libav capabilities for building dynamic audio/video processing pipelines.

For example:

	ctx, cancelFn := context.WithCancel(ctx)
	defer cancelFn()

	// input node

	logger.Debugf(ctx, "opening '%s' as the input...", fromURL)
	input, err := processor.NewInputFromURL(ctx, fromURL, secret.New(""), kernel.InputConfig{})
	assert(ctx, err == nil, err)
	defer input.Close(ctx)
	inputNode := node.New(input)

	// output node

	logger.Debugf(ctx, "opening '%s' as the output...", toURL)
	output, err := processor.NewOutputFromURL(
		ctx,
		toURL, secret.New(""),
		kernel.OutputConfig{},
	)
	assert(ctx, err == nil, err)
	defer output.Close(ctx)
	outputNode := node.New(output)

	// transcoder node

	hwDevName := codec.HardwareDeviceName(*hwDeviceName)
	transcoder, err := processor.NewTranscoder(
		ctx,
		codec.NewNaiveDecoderFactory(ctx, &codec.NaiveDecoderFactoryParams{
			HardwareDeviceName: hwDeviceName,
		}),
		codec.NewNaiveEncoderFactory(ctx, &codec.NaiveEncoderFactoryParams{
			VideoCodec:         *videoCodec,
			AudioCodec:         codec.CodecNameCopy,
			HardwareDeviceType: 0,
			HardwareDeviceName: hwDeviceName,
			VideoOptions:       types.DictionaryItems{{Key: "bf", Value: "0"}}.ToAstiav(),
		}),
		nil,
	)
	assert(ctx, err == nil, err)
	defer transcoder.Close(ctx)
	logger.Debugf(ctx, "initialized a transcoder to %s (hwdev:%s)...", *videoCodec, hwDeviceName)
	transcodingNode := node.New(transcoder)

	// route nodes: input -> transcoder -> output

	inputNode.AddPushPacketsTo(transcodingNode)
	transcodingNode.AddPushPacketsTo(outputNode)
	logger.Debugf(ctx, "resulting pipeline: %s", inputNode.String())
	logger.Debugf(ctx, "resulting pipeline (for graphviz):\n%s\n", inputNode.DotString(false))

	// start

	errCh := make(chan node.Error, 10)
	observability.Go(ctx, func() {
		defer cancelFn()
		avpipeline.Serve(ctx, avpipeline.ServeConfig{
			EachNode: node.ServeConfig{
				FrameDrop: *frameDrop,
			},
		}, errCh, inputNode)
	})

	// observe

	statusTicker := time.NewTicker(time.Second)
	defer statusTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			logger.Infof(ctx, "finished")
			return
		case err, ok := <-errCh:
			if !ok {
				return
			}
			if errors.Is(err.Err, context.Canceled) {
				continue
			}
			if errors.Is(err.Err, io.EOF) {
				continue
			}
			if err.Err != nil {
				logger.Fatal(ctx, err)
				return
			}
		case <-statusTicker.C:
			inputStats := inputNode.GetStats()
			inputStatsJSON, err := json.Marshal(inputStats.FramesWrote)
			assert(ctx, err == nil, err)

			outputStats := outputNode.GetStats()
			outputStatsJSON, err := json.Marshal(outputStats.FramesRead)
			assert(ctx, err == nil, err)

			fmt.Printf("input:%s -> output:%s\n", inputStatsJSON, outputStatsJSON)
		}
	}

Examples

  • avd uses avpipeline to implement a streaming server (as an alternative to mediamtx).
  • ffstream uses avpipeline to implement a CLI that can serve as a drop-in replacement for ffmpeg in some livestreaming use cases. It supports changing the bitrate dynamically and enabling a passthrough mode (which disables transcoding).

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Drain

func Drain(
	ctx context.Context,
	setBlockInput *bool,
	nodes ...node.Abstract,
) (_err error)

func FindNodeByObjectID

func FindNodeByObjectID(
	ctx context.Context,
	objID globaltypes.ObjectID,
	nodes ...node.Abstract,
) (_ret node.Abstract, _err error)

func IsDrained

func IsDrained(
	ctx context.Context,
	nodes ...node.Abstract,
) (_ret bool)

func LogLevelFromAstiav

func LogLevelFromAstiav(level astiav.LogLevel) logger.Level

func LogLevelToAstiav

func LogLevelToAstiav(level logger.Level) astiav.LogLevel

func NextLayer

func NextLayer[T node.Abstract](
	ctx context.Context,
	nodes ...T,
) ([]node.Abstract, error)

func NotifyAboutPacketSources

func NotifyAboutPacketSources[T node.Abstract](
	ctx context.Context,
	packetSource packet.Source,
	nodes ...T,
) (_err error)

func Serve

func Serve[T node.Abstract](
	ctx context.Context,
	serveConfig ServeConfig,
	errCh chan<- node.Error,
	nodes ...T,
)

func SetBlockInput

func SetBlockInput(
	ctx context.Context,
	blocked bool,
	nodes ...node.Abstract,
) error

func Traverse

func Traverse[T node.Abstract](
	ctx context.Context,
	callback func(ctx context.Context, parent node.Abstract, item reflect.Type, node node.Abstract) error,
	nodes ...T,
) (_err error)

func WaitForDrain

func WaitForDrain(
	ctx context.Context,
	nodes ...node.Abstract,
) (_err error)

Types

type ErrNotFound

type ErrNotFound struct{}

func (ErrNotFound) Error

func (e ErrNotFound) Error() string

type ErrTraverseStop

type ErrTraverseStop struct{}

func (ErrTraverseStop) Error

func (ErrTraverseStop) Error() string

type ServeConfig

type ServeConfig struct {
	EachNode             node.ServeConfig
	NodeTreeFilter       condition.Condition
	NodeFilter           condition.Condition
	AutoServeNewBranches bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL