Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Worker ¶
type Worker struct {
MineSweeper datastore.MineSweeper
Dispatcher pubsub.Dispatcher
MineInterval time.Duration
Logger *zap.Logger
}
Worker is the outbox worker which runs repeatedly until asked to stop.
Example (Rabbit_with_pg) ¶
package main
import (
"context"
"database/sql"
"os"
"os/signal"
"time"
"github.com/kamal-github/outbox"
"github.com/kamal-github/outbox/datastore"
"github.com/kamal-github/outbox/pubsub"
"go.uber.org/zap"
)
func main() {
ctx := context.Background()
// Setup log
logger, err := zap.NewProduction()
if err != nil {
panic(err)
}
// Connect to Postgres
dsName := "postgres://postgres:password@localhost:5432/test-outbox?sslmode=disable"
dbConn, err := connectToSQLDB("postgres", dsName)
if err != nil {
panic(err)
}
// Setup Postgres as Minesweeper
mineSweeper, err := datastore.NewPostgres(dbConn, "outbox", logger)
if err != nil {
panic(err)
}
defer mineSweeper.Close()
// Setup RabbitMQ as PubSub
dispatcher, err := pubsub.NewRabbitMQ("", mineSweeper, logger)
if err != nil {
panic(err)
}
defer dispatcher.Close()
// Graceful shutdown
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
workerDone := make(chan struct{})
// Run worker in a separate go routine.
go outbox.Worker{
MineSweeper: mineSweeper,
Dispatcher: dispatcher,
Logger: logger,
MineInterval: 2 * time.Second,
}.Start(ctx, workerDone)
<-sig
cancel()
<-workerDone
}
func connectToSQLDB(driver, dsName string) (*sql.DB, error) {
db, err := sql.Open(driver, dsName)
if err != nil {
return db, err
}
return db, nil
}
Example (Sqs_with_pg) ¶
package main
import (
"context"
"database/sql"
"os"
"os/signal"
"time"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/kamal-github/outbox"
"github.com/kamal-github/outbox/datastore"
"github.com/kamal-github/outbox/pubsub"
"go.uber.org/zap"
)
func main() {
ctx := context.Background()
// Setup log
logger, err := zap.NewProduction()
if err != nil {
panic(err)
}
// Connect to Postgres
dsName := "postgres://postgres:password@localhost:5432/test-outbox?sslmode=disable"
dbConn, err := connectToSQLDB("postgres", dsName)
if err != nil {
panic(err)
}
// Setup Postgres as Minesweeper
mineSweeper, err := datastore.NewPostgres(dbConn, "outbox", logger)
if err != nil {
panic(err)
}
defer mineSweeper.Close()
// Setup AWS session and SQS connection
awsSession := session.Must(session.NewSession())
sqsConn := sqs.New(awsSession)
// Setup SQS as PubSub
dispatcher, err := pubsub.NewSimpleQueueService(sqsConn, mineSweeper, logger)
if err != nil {
panic(err)
}
defer dispatcher.Close()
// Graceful shutdown
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
workerDone := make(chan struct{})
// Run worker in a separate go routine.
go outbox.Worker{
MineSweeper: mineSweeper,
Dispatcher: dispatcher,
Logger: logger,
MineInterval: 2 * time.Second,
}.Start(ctx, workerDone)
<-sig
cancel()
<-workerDone
}
func connectToSQLDB(driver, dsName string) (*sql.DB, error) {
db, err := sql.Open(driver, dsName)
if err != nil {
return db, err
}
return db, nil
}
func (Worker) Start ¶
Start starts the outbox worker and iterative looks for new outbox rows (ready to process) after each given MineInterval and publishes to one of the configured Messaging system.
When no ready to process message are found, it keep looking for new ones.
Exit as soon as ctx is cancelled.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package datastore maintains all the datastorage implementation.
|
Package datastore maintains all the datastorage implementation. |
|
Package event contains details related to event defined as Outbox Row in Datastore.
|
Package event contains details related to event defined as Outbox Row in Datastore. |
|
internal
|
|
|
Package pubsub contains various implementation for event dispatcher.
|
Package pubsub contains various implementation for event dispatcher. |
|
mocks
Package sweepermock is a generated GoMock package.
|
Package sweepermock is a generated GoMock package. |
|
Package test contains integration tests and related configuration.
|
Package test contains integration tests and related configuration. |
Click to show internal directories.
Click to hide internal directories.