Documentation
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrFilteredOut = errors.New("filtered out")
ErrFilteredOut is returned by a processor when a token should be filtered out of the pipeline.
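As a rough illustration of how a sentinel error like this is typically handled, the sketch below drops filtered tokens instead of treating them as failures. It is not the package's implementation: `processToken` and `keep` are hypothetical helpers, `ErrFilteredOut` is redeclared locally so the example compiles on its own, and treating the error as "skip and continue" is an assumption about the intended behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ErrFilteredOut mirrors the package variable. It is redeclared here only so
// the sketch is self-contained.
var ErrFilteredOut = errors.New("filtered out")

// processToken is a hypothetical processing step: it filters out blank tokens
// and upper-cases everything else.
func processToken(tok string) (string, error) {
	if strings.TrimSpace(tok) == "" {
		return "", ErrFilteredOut
	}
	return strings.ToUpper(tok), nil
}

// keep runs processToken over tokens, skipping filtered tokens and stopping
// on any other error.
func keep(tokens []string) ([]string, error) {
	var out []string
	for _, tok := range tokens {
		res, err := processToken(tok)
		if errors.Is(err, ErrFilteredOut) {
			continue // assumed semantics: filtered tokens are dropped, not fatal
		}
		if err != nil {
			return nil, err
		}
		out = append(out, res)
	}
	return out, nil
}

func main() {
	out, err := keep([]string{"a", " ", "b"})
	fmt.Println(out, err)
}
```

Using `errors.Is` rather than `==` keeps the check working if a processor wraps the sentinel with extra context.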
Functions ¶
This section is empty.
Types ¶
type DataProcessor ¶ added in v0.2.0
DataProcessor is an interface for processing data.
type DataWriter ¶
DataWriter is an interface for writing data to a destination.
type Pipeline ¶
type Pipeline[T any] struct {
	// contains filtered or unexported fields
}
Pipeline runs a series of workers in parallel. All workers run at the same time in separate goroutines. Each stage of workers is connected to the next by a single channel whose buffer size equals the number of workers in the stage. The pipeline stops when a worker returns an error or all workers are done. Workers should not close the send channel; the pipeline stages handle that. Pipelines can be chained together by using them as workers.
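The description above can be sketched with plain channels and goroutines. This is an illustrative model only, not the package's code: `worker`, `mapWorker`, `runStages`, and `demo` are hypothetical names, and the error handling (first error wins, remaining workers are canceled) is an assumption. It shows the three points the description makes: every worker gets its own goroutine, each stage writes to one channel buffered to the stage's worker count, and the stage, not the worker, closes that channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a pipeline worker. It reads from in
// and writes to out, but never closes out.
type worker func(ctx context.Context, in <-chan int, out chan<- int) error

// mapWorker builds a worker that applies f to every value it receives.
func mapWorker(f func(int) int) worker {
	return func(ctx context.Context, in <-chan int, out chan<- int) error {
		for v := range in {
			select {
			case out <- f(v):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// runStages wires the stages together, runs every worker in its own
// goroutine, and returns the values leaving the last stage.
func runStages(ctx context.Context, input []int, stages ...[]worker) ([]int, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		mu       sync.Mutex
		firstErr error
	)
	fail := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
		cancel() // stop the remaining workers
	}

	src := make(chan int, len(input))
	for _, v := range input {
		src <- v
	}
	close(src)

	var in <-chan int = src
	for _, stage := range stages {
		out := make(chan int, len(stage)) // buffer sized to the stage's worker count
		var wg sync.WaitGroup
		for _, w := range stage {
			wg.Add(1)
			go func(w worker, in <-chan int) {
				defer wg.Done()
				if err := w(ctx, in, out); err != nil {
					fail(err)
				}
			}(w, in)
		}
		go func() {
			wg.Wait()
			close(out) // the stage closes the channel once all its workers return
		}()
		in = out
	}

	var results []int
	for v := range in {
		results = append(results, v)
	}
	mu.Lock()
	defer mu.Unlock()
	return results, firstErr
}

// demo runs two doubling workers in stage 1 and one incrementing worker in stage 2.
func demo() ([]int, error) {
	double := mapWorker(func(v int) int { return v * 2 })
	inc := mapWorker(func(v int) int { return v + 1 })
	return runStages(context.Background(), []int{1, 2, 3, 4},
		[]worker{double, double},
		[]worker{inc},
	)
}

func main() {
	got, err := demo()
	fmt.Println(len(got), err) // the order of got may vary between runs
}
```

Because the two workers in the first stage share one input channel, the order of results is not deterministic; only the set of values is.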
func NewPipeline ¶
NewPipeline creates a new Pipeline.
func (*Pipeline[T]) Run ¶
Run starts the pipeline. The pipeline stops when a worker returns an error, all workers are done, or the context is canceled.
func (*Pipeline[T]) SetReceiveChan ¶
func (dp *Pipeline[T]) SetReceiveChan(c <-chan T)
SetReceiveChan sets the receive channel for the pipeline. The receive channel is used as the input to the first stage. This satisfies the Worker interface so that pipelines can be chained together. Usually users will not need to call this method.
func (*Pipeline[T]) SetSendChan ¶
func (dp *Pipeline[T]) SetSendChan(c chan<- T)
SetSendChan sets the send channel for the pipeline. The send channel is used as the output from the last stage. This satisfies the Worker interface so that pipelines can be chained together. Usually users will not need to call this method.
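To illustrate why exposing SetReceiveChan and SetSendChan lets a composite act as a worker, here is a minimal sketch of the idea. It does not use the package's Pipeline type: `chain`, `newChain`, `addWorker`, and `run` are hypothetical, and the `Worker` interface is copied locally so the example compiles on its own. The composite forwards its receive channel to its first worker and its send channel to its last worker, so it can be nested inside another composite.

```go
package main

import (
	"context"
	"fmt"
)

// Worker is copied from the package's interface so the sketch is self-contained.
type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
}

// addWorker is a hypothetical Worker that adds n to every int it receives.
type addWorker struct {
	n   int
	in  <-chan int
	out chan<- int
}

func (w *addWorker) SetSendChan(c chan<- int)    { w.out = c }
func (w *addWorker) SetReceiveChan(c <-chan int) { w.in = c }
func (w *addWorker) Run(ctx context.Context) error {
	for v := range w.in {
		select {
		case w.out <- v + w.n:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// chain is a hypothetical composite: two workers joined by an internal channel.
type chain[T any] struct {
	first, last Worker[T]
	mid         chan T
}

func newChain[T any](first, last Worker[T]) *chain[T] {
	mid := make(chan T, 1)
	first.SetSendChan(mid)
	last.SetReceiveChan(mid)
	return &chain[T]{first: first, last: last, mid: mid}
}

// The outer channels are forwarded to the ends of the chain.
func (c *chain[T]) SetReceiveChan(ch <-chan T) { c.first.SetReceiveChan(ch) }
func (c *chain[T]) SetSendChan(ch chan<- T)    { c.last.SetSendChan(ch) }

func (c *chain[T]) Run(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	errc := make(chan error, 1)
	go func() {
		err := c.first.Run(ctx)
		close(c.mid) // the composite, not the worker, closes the internal channel
		errc <- err
	}()
	if err := c.last.Run(ctx); err != nil {
		cancel() // unblock the first worker before waiting for it
		<-errc
		return err
	}
	return <-errc
}

// run feeds input to w by hand and collects its output.
func run(w Worker[int], input []int) ([]int, error) {
	in := make(chan int, len(input))
	out := make(chan int, len(input))
	for _, v := range input {
		in <- v
	}
	close(in)
	w.SetReceiveChan(in)
	w.SetSendChan(out)
	err := w.Run(context.Background())
	close(out)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got, err
}

func main() {
	inner := newChain[int](&addWorker{n: 1}, &addWorker{n: 10})
	outer := newChain[int](inner, &addWorker{n: 100}) // a chain used as a worker
	fmt.Println(run(outer, []int{1, 2}))
}
```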
type ProcessorWorker ¶ added in v0.2.0
type ProcessorWorker[T any] struct {
	// contains filtered or unexported fields
}
ProcessorWorker implements the pipeline.Worker interface. It wraps a DataProcessor and uses it to process data.
func NewProcessorWorker ¶ added in v0.2.0
func NewProcessorWorker[T any](processor DataProcessor[T]) *ProcessorWorker[T]
NewProcessorWorker creates a new ProcessorWorker.
func (*ProcessorWorker[T]) Run ¶ added in v0.2.0
func (w *ProcessorWorker[T]) Run(ctx context.Context) error
Run starts the ProcessorWorker.
func (*ProcessorWorker[T]) SetReceiveChan ¶ added in v0.2.0
func (w *ProcessorWorker[T]) SetReceiveChan(c <-chan T)
SetReceiveChan sets the receive channel for the ProcessorWorker.
func (*ProcessorWorker[T]) SetSendChan ¶ added in v0.2.0
func (w *ProcessorWorker[T]) SetSendChan(c chan<- T)
SetSendChan sets the send channel for the ProcessorWorker.
type Worker ¶
type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
}
Worker is the interface for a pipeline item. Each worker has a receive channel and a send channel that connect it to the previous and next stages in the pipeline, respectively. The Run method starts the worker.
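A custom worker only needs the three methods of this interface. The sketch below is one possible shape, not code from the package: `upperWorker` and `runOne` are hypothetical, and the interface is redeclared locally so the example compiles on its own. `runOne` plays the role the pipeline would normally play by creating the channels and closing the send channel after Run returns.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Worker is copied from the package's interface so the sketch is self-contained.
type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
}

// upperWorker is a hypothetical Worker that upper-cases strings.
type upperWorker struct {
	in  <-chan string
	out chan<- string
}

func (w *upperWorker) SetSendChan(c chan<- string)    { w.out = c }
func (w *upperWorker) SetReceiveChan(c <-chan string) { w.in = c }

// Run forwards transformed values until the receive channel is closed or ctx
// is canceled. It never closes the send channel.
func (w *upperWorker) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case s, ok := <-w.in:
			if !ok {
				return nil // upstream is done
			}
			select {
			case w.out <- strings.ToUpper(s):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// runOne wires a single worker to fresh channels by hand and collects its output.
func runOne(w Worker[string], input []string) ([]string, error) {
	in := make(chan string, len(input))
	out := make(chan string, len(input))
	for _, s := range input {
		in <- s
	}
	close(in)
	w.SetReceiveChan(in)
	w.SetSendChan(out)
	err := w.Run(context.Background())
	close(out) // the caller, not the worker, closes the send channel
	var got []string
	for s := range out {
		got = append(got, s)
	}
	return got, err
}

func main() {
	got, err := runOne(&upperWorker{}, []string{"a", "b"})
	fmt.Println(got, err)
}
```

Selecting on `ctx.Done()` for both the receive and the send keeps the worker from blocking forever when the pipeline is canceled.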
func NewReadWorker ¶
NewReadWorker creates a new ReadWorker.
func NewWriteWorker ¶
func NewWriteWorker[T any](writer DataWriter[T], limiter *rate.Limiter) Worker[T]
NewWriteWorker creates a new Worker.