sequin

v0.2.0

This package is not in the latest version of its module.
Published: Nov 5, 2024 License: MIT Imports: 12 Imported by: 2

README

sequin-go

A Go SDK for sending, receiving, and acknowledging messages in Sequin streams. For easy development and testing, it also comes with helpful methods for managing the lifecycle of streams and consumers.

See the docs on pkg.go.dev.

Installing

Install the library:

go get github.com/sequinstream/sequin-go

Initializing

You'll typically initialize a Sequin Client once in your application. Create a new file to initialize the Client in, and use it in other parts of your app:

package sequin

import (
    "os"
    "github.com/sequinstream/sequin-go"
)

var Client *sequin.Client

func init() {
    baseURL := os.Getenv("SEQUIN_URL")
    if baseURL == "" {
        baseURL = "http://localhost:7376"
    }

    Client = sequin.NewClient(baseURL)
}

If SEQUIN_URL is not set, the snippet above falls back to Sequin's default host and port for local development: http://localhost:7376.

Usage

You'll predominantly use sequin-go to send, receive, and acknowledge messages in Sequin streams:

package main

import (
    "fmt"
    "yourproject/sequin"
)

func main() {
    // Define your stream and consumer
    stream := "your-stream-name"
    consumer := "your-consumer-name"

    // Send a message
    res, err := sequin.Client.SendMessage(stream, "test.1", "Hello, Sequin!")
    if err != nil {
        fmt.Printf("Error sending message: %v\n", err)
        // Handle the error appropriately
    } else {
        fmt.Printf("Message sent successfully: %+v\n", res)
    }

    // Receive a message
    message, err := sequin.Client.ReceiveMessage(stream, consumer)
    if err != nil {
        fmt.Printf("Error receiving message: %v\n", err)
    } else if message == nil {
        fmt.Println("No messages available")
    } else {
        fmt.Printf("Received message: %+v\n", message)
        // Don't forget to acknowledge the message
        ackErr := sequin.Client.AckMessage(stream, consumer, message.AckID)
        if ackErr != nil {
            fmt.Printf("Error acking message: %v\n", ackErr)
        } else {
            fmt.Println("Message acked")
        }
    }
}

Testing

To adequately test Sequin, we recommend creating temporary streams and consumers in addition to testing sending and receiving messages. Here's an example using the testing package:

package sequin_test

import (
    "testing"
    "time"
    "github.com/sequinstream/sequin-go"
)

func TestSequinStreamAndConsumer(t *testing.T) {
    client := sequin.NewClient("http://localhost:7376")

    streamName := "test-stream-" + time.Now().Format("20060102150405")
    consumerName := "test-consumer-" + time.Now().Format("20060102150405")

    // Create a new stream
    stream, err := client.CreateStream(streamName, nil)
    if err != nil {
        t.Fatalf("Error creating stream: %v", err)
    }
    if stream.Name != streamName {
        t.Fatalf("Expected stream name %s, got %s", streamName, stream.Name)
    }

    // Create a consumer
    consumer, err := client.CreateConsumer(streamName, consumerName, "test.>", nil)
    if err != nil {
        t.Fatalf("Error creating consumer: %v", err)
    }
    if consumer.Name != consumerName {
        t.Fatalf("Expected consumer name %s, got %s", consumerName, consumer.Name)
    }

    // Send a message
    res, err := client.SendMessage(streamName, "test.1", "Hello, Sequin!")
    if err != nil {
        t.Fatalf("Error sending message: %v", err)
    }
    if res.Published != 1 {
        t.Fatalf("Expected 1 message published, got %d", res.Published)
    }

    // Receive and ack a message
    message, err := client.ReceiveMessage(streamName, consumerName)
    if err != nil {
        t.Fatalf("Error receiving message: %v", err)
    }
    if message == nil {
        t.Fatal("Expected to receive a message, got nil")
    }

    err = client.AckMessage(streamName, consumerName, message.AckID)
    if err != nil {
        t.Fatalf("Error acking message: %v", err)
    }

    // Delete the consumer
    err = client.DeleteConsumer(streamName, consumerName)
    if err != nil {
        t.Fatalf("Error deleting consumer: %v", err)
    }

    // Delete the stream
    err = client.DeleteStream(streamName)
    if err != nil {
        t.Fatalf("Error deleting stream: %v", err)
    }
}

This test creates a temporary stream and consumer, sends a message, receives and acknowledges it, and then cleans up by deleting the consumer and stream. You can expand on this basic test to cover more of your specific use cases and edge cases.
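
The example above derives names from a timestamp with one-second resolution, so two tests started in the same second could pick the same name. A minimal sketch of one alternative, assuming a random suffix is acceptable in stream and consumer names; `uniqueName` is a hypothetical helper and not part of sequin-go:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// uniqueName returns prefix followed by 16 random hex characters, so tests
// that start within the same second (or run in parallel) do not collide.
// Hypothetical helper, not part of sequin-go.
func uniqueName(prefix string) (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", fmt.Errorf("generating random suffix: %w", err)
	}
	return prefix + hex.EncodeToString(buf), nil
}
```

In a test, you could call `uniqueName("test-stream-")` and `uniqueName("test-consumer-")` in place of the two `time.Now().Format(...)` expressions.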

Documentation

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a Sequin client

func NewClient

func NewClient(token string, opts *ClientOptions) *Client

NewClient creates a new Sequin client

func (*Client) Ack

func (c *Client) Ack(ctx context.Context, consumerGroupID string, ackIDs []string) error

Ack acknowledges messages as processed

func (*Client) Nack

func (c *Client) Nack(ctx context.Context, consumerGroupID string, ackIDs []string) error

Nack negatively acknowledges messages, making them available for redelivery

func (*Client) Receive

func (c *Client) Receive(ctx context.Context, consumerGroupID string, params *ReceiveParams) ([]Message, error)

Receive fetches messages from a consumer
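
Receive, Ack, and Nack are typically used together: fetch a batch, process each message, then acknowledge the successes and negatively acknowledge the failures. The sketch below shows that flow against local stand-ins that mirror the signatures listed on this page. `settleBatch`, `stubClient`, and `demoSettle` are hypothetical names introduced only for illustration, and the batch size of 10 is an arbitrary choice.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
)

// Local stand-ins mirroring the types documented on this page.
type Message struct {
	AckID  string
	Record json.RawMessage
}

type ReceiveParams struct {
	BatchSize int `json:"batch_size,omitempty"`
	WaitFor   int `json:"wait_for,omitempty"` // milliseconds
}

type client interface {
	Receive(ctx context.Context, consumerGroupID string, params *ReceiveParams) ([]Message, error)
	Ack(ctx context.Context, consumerGroupID string, ackIDs []string) error
	Nack(ctx context.Context, consumerGroupID string, ackIDs []string) error
}

// settleBatch receives one batch, runs handle on every message, then acks
// the messages that succeeded and nacks the ones that failed.
func settleBatch(ctx context.Context, c client, group string, handle func(Message) error) (acked, nacked []string, err error) {
	msgs, err := c.Receive(ctx, group, &ReceiveParams{BatchSize: 10})
	if err != nil {
		return nil, nil, err
	}
	for _, m := range msgs {
		if handle(m) != nil {
			nacked = append(nacked, m.AckID)
		} else {
			acked = append(acked, m.AckID)
		}
	}
	if len(acked) > 0 {
		if err := c.Ack(ctx, group, acked); err != nil {
			return acked, nacked, err
		}
	}
	if len(nacked) > 0 {
		if err := c.Nack(ctx, group, nacked); err != nil {
			return acked, nacked, err
		}
	}
	return acked, nacked, nil
}

// stubClient is a hypothetical in-memory client used only to exercise settleBatch.
type stubClient struct {
	pending       []Message
	acked, nacked []string
}

func (s *stubClient) Receive(_ context.Context, _ string, _ *ReceiveParams) ([]Message, error) {
	out := s.pending
	s.pending = nil
	return out, nil
}

func (s *stubClient) Ack(_ context.Context, _ string, ids []string) error {
	s.acked = append(s.acked, ids...)
	return nil
}

func (s *stubClient) Nack(_ context.Context, _ string, ids []string) error {
	s.nacked = append(s.nacked, ids...)
	return nil
}

// demoSettle processes two messages, one of which the handler rejects, and
// reports how many IDs the stub saw acked and nacked.
func demoSettle() (acked, nacked int, err error) {
	c := &stubClient{pending: []Message{
		{AckID: "a1", Record: json.RawMessage(`{"ok":true}`)},
		{AckID: "a2", Record: json.RawMessage(`{"ok":false}`)},
	}}
	handle := func(m Message) error {
		if string(m.Record) == `{"ok":false}` {
			return errors.New("rejected")
		}
		return nil
	}
	_, _, err = settleBatch(context.Background(), c, "my-group", handle)
	return len(c.acked), len(c.nacked), err
}
```

Because `settleBatch` depends only on the three-method interface, a `*Client` from this package should be usable in place of the stub, assuming its methods match the signatures above.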

type ClientOptions

type ClientOptions struct {
	BaseURL    string        // API base URL, defaults to "https://api.sequinstream.com/api"
	HTTPClient *http.Client  // Custom HTTP client, optional
	Timeout    time.Duration // HTTP client timeout, defaults to 30s
}

ClientOptions configures the client behavior

type Message

type Message struct {
	AckID  string
	Record json.RawMessage
}

Message represents a single message with its acknowledgment ID
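
Record is a json.RawMessage, so decoding is left to the caller. A minimal sketch of decoding it into a typed value; `userRecord` and its fields are a hypothetical payload shape chosen for illustration, and `Message` here is a local stand-in for the type above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local stand-in mirroring the documented type.
type Message struct {
	AckID  string
	Record json.RawMessage
}

// userRecord is a hypothetical payload shape; real records depend on your data.
type userRecord struct {
	ID    int    `json:"id"`
	Email string `json:"email"`
}

// decodeUser unmarshals the raw record and includes the ack ID in any error
// so a failed message can be identified in logs.
func decodeUser(m Message) (userRecord, error) {
	var u userRecord
	if err := json.Unmarshal(m.Record, &u); err != nil {
		return userRecord{}, fmt.Errorf("decoding record for ack ID %s: %w", m.AckID, err)
	}
	return u, nil
}

// demoDecode decodes a sample message.
func demoDecode() (userRecord, error) {
	return decodeUser(Message{
		AckID:  "a1",
		Record: json.RawMessage(`{"id":7,"email":"ada@example.com"}`),
	})
}
```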

type PrefetchingOptions

type PrefetchingOptions struct {
	// BufferSize determines how many messages to prefetch.
	// Must be > 0.
	BufferSize int
}

PrefetchingOptions configures message prefetching behavior.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(client SequinClient, consumerGroup string, handler ProcessorFunc, opts ProcessorOptions) (*Processor, error)

func (*Processor) Run

func (p *Processor) Run(ctx context.Context) error

type ProcessorFunc

type ProcessorFunc func(context.Context, []Message) error

ProcessorFunc processes a batch of messages. It should return an error if processing fails.

If an error is returned, none of the messages in the batch will be acknowledged and they will be redelivered after the visibility timeout.
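
Because an error fails the whole batch, it can help to validate every message before performing any side effect. The sketch below illustrates that ordering with local stand-ins for `Message` and `ProcessorFunc`; `newBatchHandler`, the `apply` callback, and `demoHandler` are hypothetical names, not part of this package.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type Message struct {
	AckID  string
	Record json.RawMessage
}

type ProcessorFunc func(context.Context, []Message) error

// newBatchHandler returns a ProcessorFunc that checks every record first and
// calls apply only if the whole batch is valid, so a bad record fails the
// batch before any side effect happens.
func newBatchHandler(apply func([]json.RawMessage) error) ProcessorFunc {
	return func(ctx context.Context, msgs []Message) error {
		records := make([]json.RawMessage, 0, len(msgs))
		for _, m := range msgs {
			if !json.Valid(m.Record) {
				return fmt.Errorf("message %s: record is not valid JSON", m.AckID)
			}
			records = append(records, m.Record)
		}
		// Stop early if the caller has already cancelled the context.
		if err := ctx.Err(); err != nil {
			return err
		}
		return apply(records)
	}
}

// demoHandler runs one valid batch and one batch containing a broken record.
func demoHandler() (applied int, goodErr, badErr error) {
	h := newBatchHandler(func(rs []json.RawMessage) error {
		applied += len(rs)
		return nil
	})
	goodErr = h(context.Background(), []Message{
		{AckID: "a1", Record: json.RawMessage(`{"n":1}`)},
		{AckID: "a2", Record: json.RawMessage(`{"n":2}`)},
	})
	badErr = h(context.Background(), []Message{
		{AckID: "a3", Record: json.RawMessage(`{"n":3}`)},
		{AckID: "a4", Record: json.RawMessage(`{broken`)},
	})
	return applied, goodErr, badErr
}
```

In the second batch `apply` is never called, which matches the all-or-nothing behavior described above: the valid message `a3` is not applied and would be redelivered together with `a4`.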

type ProcessorOptions

type ProcessorOptions struct {
	// MaxBatchSize is the maximum number of messages to process in a single batch.
	// The processor will call ProcessorFunc with up to this many messages.
	// If zero, defaults to 1.
	MaxBatchSize int

	// FetchBatchSize is the number of messages to request from the server in a single call.
	// This can be larger than MaxBatchSize to improve throughput.
	// If zero, defaults to MaxBatchSize.
	FetchBatchSize int

	// MaxConcurrent is the maximum number of concurrent batch processors.
	// If zero, defaults to 1.
	MaxConcurrent int

	// Prefetching configures message prefetching behavior.
	// If nil, messages are processed immediately as they arrive.
	Prefetching *PrefetchingOptions

	// ErrorHandler is called when message processing fails.
	// If nil, errors are logged to stderr.
	ErrorHandler func(context.Context, []Message, error)
}

ProcessorOptions configures the behavior of a Processor.
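
The field comments above describe how zero values are interpreted. The sketch below restates those rules as code to make the resulting values easy to check. It is an illustration of the documented defaults, not the library's implementation: `normalize` is a hypothetical function, the structs are local stand-ins, and ErrorHandler is omitted for brevity.

```go
package main

import "errors"

// Local stand-ins mirroring the documented option types (ErrorHandler omitted).
type PrefetchingOptions struct {
	BufferSize int
}

type ProcessorOptions struct {
	MaxBatchSize   int
	FetchBatchSize int
	MaxConcurrent  int
	Prefetching    *PrefetchingOptions
}

// normalize applies the defaults described in the field comments:
// MaxBatchSize 0 becomes 1, FetchBatchSize 0 becomes MaxBatchSize,
// MaxConcurrent 0 becomes 1, and a non-nil Prefetching needs BufferSize > 0.
func normalize(o ProcessorOptions) (ProcessorOptions, error) {
	if o.MaxBatchSize == 0 {
		o.MaxBatchSize = 1
	}
	if o.FetchBatchSize == 0 {
		o.FetchBatchSize = o.MaxBatchSize
	}
	if o.MaxConcurrent == 0 {
		o.MaxConcurrent = 1
	}
	if o.Prefetching != nil && o.Prefetching.BufferSize <= 0 {
		return o, errors.New("prefetching buffer size must be > 0")
	}
	return o, nil
}
```

For example, under these rules `ProcessorOptions{MaxBatchSize: 5}` resolves to a fetch batch size of 5 and a single concurrent batch processor.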

type ReceiveParams

type ReceiveParams struct {
	BatchSize int `json:"batch_size,omitempty"`
	WaitFor   int `json:"wait_for,omitempty"` // milliseconds
}

ReceiveParams represents parameters for the receive request
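
Both fields carry `omitempty`, so zero values are left out of the encoded JSON, and WaitFor is expressed in milliseconds. The sketch below shows the resulting JSON and a conversion from time.Duration; `paramsFor`, `encode`, and `demoParams` are hypothetical helpers, and `ReceiveParams` is a local copy of the struct above.

```go
package main

import (
	"encoding/json"
	"time"
)

// ReceiveParams is a local copy of the documented struct.
type ReceiveParams struct {
	BatchSize int `json:"batch_size,omitempty"`
	WaitFor   int `json:"wait_for,omitempty"` // milliseconds
}

// paramsFor converts a wait duration to the millisecond integer WaitFor expects.
func paramsFor(batchSize int, wait time.Duration) ReceiveParams {
	return ReceiveParams{BatchSize: batchSize, WaitFor: int(wait.Milliseconds())}
}

// encode returns the JSON form of p as a string.
func encode(p ReceiveParams) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

// demoParams encodes one fully populated value and one zero value.
func demoParams() (full, empty string) {
	full, _ = encode(paramsFor(10, 2*time.Second))
	empty, _ = encode(ReceiveParams{})
	return full, empty
}
```

Here `full` is `{"batch_size":10,"wait_for":2000}` and `empty` is `{}`.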

type ReceiveResponse

type ReceiveResponse struct {
	Data []struct {
		AckID string `json:"ack_id"`
		Data  struct {
			Record json.RawMessage `json:"record"`
		} `json:"data"`
	} `json:"data"`
}

ReceiveResponse represents the response from the receive endpoint
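
The nested layout of ReceiveResponse corresponds to the flatter Message type. As a sketch of how one could map between the two, assuming a response body shaped like the struct tags above; `parseReceive` and `demoParse` are hypothetical, and the sample body is made up for illustration:

```go
package main

import "encoding/json"

// Local copies of the documented types.
type Message struct {
	AckID  string
	Record json.RawMessage
}

type ReceiveResponse struct {
	Data []struct {
		AckID string `json:"ack_id"`
		Data  struct {
			Record json.RawMessage `json:"record"`
		} `json:"data"`
	} `json:"data"`
}

// parseReceive decodes a receive response body and flattens each entry
// into a Message, keeping the record as raw JSON.
func parseReceive(body []byte) ([]Message, error) {
	var resp ReceiveResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	msgs := make([]Message, 0, len(resp.Data))
	for _, d := range resp.Data {
		msgs = append(msgs, Message{AckID: d.AckID, Record: d.Data.Record})
	}
	return msgs, nil
}

// demoParse parses a made-up body containing two messages.
func demoParse() ([]Message, error) {
	body := []byte(`{"data":[{"ack_id":"a1","data":{"record":{"id":1}}},{"ack_id":"a2","data":{"record":{"id":2}}}]}`)
	return parseReceive(body)
}
```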

type SequinClient

type SequinClient interface {
	Receive(ctx context.Context, consumerGroupID string, params *ReceiveParams) ([]Message, error)
	Ack(ctx context.Context, consumerGroupID string, ackIDs []string) error
	Nack(ctx context.Context, consumerGroupID string, ackIDs []string) error
}

SequinClient defines the interface for Sequin client operations
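
Since NewProcessor accepts a SequinClient rather than a concrete *Client, code built on it can be tested without a running Sequin instance. The sketch below is one possible in-memory fake, written against local copies of the documented types. `fakeClient` is hypothetical, and re-queueing a nacked message immediately is a simplification that ignores any visibility timeout.

```go
package main

import (
	"context"
	"encoding/json"
	"sync"
)

// Local copies of the documented types.
type Message struct {
	AckID  string
	Record json.RawMessage
}

type ReceiveParams struct {
	BatchSize int `json:"batch_size,omitempty"`
	WaitFor   int `json:"wait_for,omitempty"` // milliseconds
}

type SequinClient interface {
	Receive(ctx context.Context, consumerGroupID string, params *ReceiveParams) ([]Message, error)
	Ack(ctx context.Context, consumerGroupID string, ackIDs []string) error
	Nack(ctx context.Context, consumerGroupID string, ackIDs []string) error
}

// fakeClient is a hypothetical in-memory implementation for tests.
type fakeClient struct {
	mu       sync.Mutex
	ready    []Message          // waiting to be delivered
	inFlight map[string]Message // delivered, not yet acked or nacked
}

// Compile-time check that fakeClient satisfies the interface.
var _ SequinClient = (*fakeClient)(nil)

func newFakeClient(msgs ...Message) *fakeClient {
	return &fakeClient{ready: msgs, inFlight: make(map[string]Message)}
}

// Receive hands out up to BatchSize ready messages (all of them if unset).
func (f *fakeClient) Receive(_ context.Context, _ string, p *ReceiveParams) ([]Message, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	n := len(f.ready)
	if p != nil && p.BatchSize > 0 && p.BatchSize < n {
		n = p.BatchSize
	}
	out := append([]Message(nil), f.ready[:n]...)
	f.ready = f.ready[n:]
	for _, m := range out {
		f.inFlight[m.AckID] = m
	}
	return out, nil
}

// Ack drops the given messages from the in-flight set.
func (f *fakeClient) Ack(_ context.Context, _ string, ackIDs []string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, id := range ackIDs {
		delete(f.inFlight, id)
	}
	return nil
}

// Nack moves the given in-flight messages back to the ready queue.
func (f *fakeClient) Nack(_ context.Context, _ string, ackIDs []string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, id := range ackIDs {
		if m, ok := f.inFlight[id]; ok {
			delete(f.inFlight, id)
			f.ready = append(f.ready, m)
		}
	}
	return nil
}

// demoFake acks one message, nacks the other, and receives again.
func demoFake() (firstBatch, redelivered, stillInFlight int) {
	ctx := context.Background()
	f := newFakeClient(
		Message{AckID: "a1", Record: json.RawMessage(`{}`)},
		Message{AckID: "a2", Record: json.RawMessage(`{}`)},
	)
	batch, _ := f.Receive(ctx, "group", &ReceiveParams{BatchSize: 10})
	_ = f.Ack(ctx, "group", []string{batch[0].AckID})
	_ = f.Nack(ctx, "group", []string{batch[1].AckID})
	again, _ := f.Receive(ctx, "group", nil)
	return len(batch), len(again), len(f.inFlight)
}
```

In `demoFake`, the first receive returns both messages, and the second returns only the nacked one, which then remains in flight until it is acknowledged.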

Directories

Path
examples/audit_logging (module)
examples/simple (module)
