pipeline

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrFilteredOut = errors.New("filtered out")

ErrFilteredOut is returned by a processor when a token should be filtered out of the pipeline

Functions

This section is empty.

Types

type DataProcessor added in v0.2.0

type DataProcessor[T any] interface {
	Process(T) (T, error)
}

DataProcessor is an interface for processing data

type DataWriter

type DataWriter[T any] interface {
	Write(T) (n int, err error)
	Close() (err error)
}

DataWriter is an interface for writing data to a destination.

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline runs a series of workers in parallel. All workers run at the same time in separate goroutines. Each stage of workers are connected by a single channel that is the size of the number of workers in the stage. The pipeline stops when a worker returns an error or all workers are done. workers should not close the send channel, the pipeline stages handle that. Pipelines can be chained together by using them as workers.

func NewPipeline

func NewPipeline[T any](isSynced bool, workGroups ...[]Worker[T]) (*Pipeline[T], error)

NewPipeline creates a new DataPipeline.

func (*Pipeline[T]) Run

func (dp *Pipeline[T]) Run(ctx context.Context) error

Run starts the pipeline. The pipeline stops when a worker returns an error, all workers are done, or the context is canceled.

func (*Pipeline[T]) SetReceiveChan

func (dp *Pipeline[T]) SetReceiveChan(c <-chan T)

SetReceiveChan sets the receive channel for the pipeline. The receive channel is used as the input to the first stage. This satisfies the Worker interface so that pipelines can be chained together. Usually users will not need to call this method.

func (*Pipeline[T]) SetSendChan

func (dp *Pipeline[T]) SetSendChan(c chan<- T)

SetSendChan sets the send channel for the pipeline. The send channel is used as the output from the last stage. This satisfies the Worker interface so that pipelines can be chained together. Usually users will not need to call this method.

type ProcessorWorker added in v0.2.0

type ProcessorWorker[T any] struct {
	// contains filtered or unexported fields
}

ProcessorWorker implements the pipeline.Worker interface It wraps a DataProcessor and processes data with it

func NewProcessorWorker added in v0.2.0

func NewProcessorWorker[T any](processor DataProcessor[T]) *ProcessorWorker[T]

NewProcessorWorker creates a new ProcessorWorker

func (*ProcessorWorker[T]) Run added in v0.2.0

func (w *ProcessorWorker[T]) Run(ctx context.Context) error

Run starts the ProcessorWorker

func (*ProcessorWorker[T]) SetReceiveChan added in v0.2.0

func (w *ProcessorWorker[T]) SetReceiveChan(c <-chan T)

SetReceiveChan sets receive channel for the ProcessorWorker

func (*ProcessorWorker[T]) SetSendChan added in v0.2.0

func (w *ProcessorWorker[T]) SetSendChan(c chan<- T)

SetSendChan sets the send channel for the ProcessorWorker

type Worker

type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
}

Worker is an interface for a pipeline item Each worker has a send and receive channel that connects it to the previous and next stage in the pipeline The Run method starts the worker

func NewReadWorker

func NewReadWorker[T any](reader dataReader[T]) Worker[T]

NewReadWorker creates a new ReadWorker.

func NewWriteWorker

func NewWriteWorker[T any](writer DataWriter[T], limiter *rate.Limiter) Worker[T]

NewWriteWorker creates a new Worker.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL