From e5f0d3f133bdcef3f5e0968cdec735a0a7ff89b0 Mon Sep 17 00:00:00 2001 From: Konstantin Voykov Date: Sun, 1 Mar 2026 18:54:17 +0300 Subject: [PATCH 1/2] metrics: victoria: go.mod issues --- metrics/victoria/go.mod | 22 +++++++-------------- metrics/victoria/go.sum | 44 ++++++++++------------------------------- 2 files changed, 17 insertions(+), 49 deletions(-) diff --git a/metrics/victoria/go.mod b/metrics/victoria/go.mod index 5856cb3..1ea626d 100644 --- a/metrics/victoria/go.mod +++ b/metrics/victoria/go.mod @@ -1,23 +1,15 @@ module github.com/koykov/queue/metrics/victoria -go 1.24 +go 1.22 -require ( - github.com/VictoriaMetrics/metrics v1.40.2 - github.com/prometheus/client_golang v1.23.2 -) +require github.com/koykov/vmchain v0.0.0-20260214151654-90e37c6be8fc require ( - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.66.1 // indirect - github.com/prometheus/procfs v0.16.1 // indirect + github.com/VictoriaMetrics/metrics v1.40.2 // indirect + github.com/koykov/byteconv v1.0.1 // indirect + github.com/koykov/indirect v1.0.1 // indirect + github.com/koykov/x2bytes v1.0.4 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect - go.yaml.in/yaml/v2 v2.4.2 // indirect - golang.org/x/sys v0.35.0 // indirect - google.golang.org/protobuf v1.36.8 // indirect + golang.org/x/sys v0.17.0 // indirect ) diff --git a/metrics/victoria/go.sum b/metrics/victoria/go.sum index 602af06..0a7f01f 100644 --- a/metrics/victoria/go.sum +++ b/metrics/victoria/go.sum @@ -1,48 +1,24 @@ github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac= github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/koykov/byteconv v1.0.1 h1:5Yb6++P+HnipDW/V9rHXR7CyS0ZFncspT4isvt65fFA= +github.com/koykov/byteconv v1.0.1/go.mod h1:viZknv/akQJrXOQS3bZu2U7TE+gjA03LFQpSsRExZR4= +github.com/koykov/indirect v1.0.1 h1:1veVipIWBeklFHMvzuwhL82X5eDaJzN+hPeVGRvu22Y= +github.com/koykov/indirect v1.0.1/go.mod h1:2qWC0hrIHIexlKaqPA0VWEa0s2V/qxxNJv7XPncnh2I= +github.com/koykov/vmchain v0.0.0-20260214151654-90e37c6be8fc h1:oFSqZjtZFDnbLnL7WTXAqEcWNf87R+R5cryNapBPuDI= +github.com/koykov/vmchain v0.0.0-20260214151654-90e37c6be8fc/go.mod h1:Ey4rbJOs7KL1/P0TIxyFStu3mL92m5sQxe82T81w5ow= +github.com/koykov/x2bytes v1.0.4 h1:aRTi/QHz3BbiIfW+BIKW7V8EmdIV1g3kwLjTHTJz6Xg= +github.com/koykov/x2bytes v1.0.4/go.mod h1:0fbvyQAm3RAiTOE/NT0Dg3ZXL9EQiYpyrp5wZyoz11Q= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= -github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= -github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= -github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= -github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= -github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= -github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= -go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= -google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From a512307249e110c7fcf67d31743ac330fb08a725 Mon Sep 17 00:00:00 2001 From: Konstantin Voykov Date: Tue, 14 Apr 2026 22:30:57 +0300 Subject: [PATCH 2/2] metrics: add queue execution of a single job duration --- dummy.go | 1 + metrics.go | 2 ++ metrics/prometheus/writer.go | 15 ++++++++++++--- metrics/victoria/writer.go | 5 +++++ worker.go | 5 ++++- 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/dummy.go b/dummy.go index 7403017..bb5d37d 100644 --- a/dummy.go +++ b/dummy.go @@ -18,6 +18,7 @@ func (DummyMetrics) QueueRetry(_ time.Duration) {} func (DummyMetrics) QueueLeak(_ string) {} func (DummyMetrics) QueueDeadline() {} func (DummyMetrics) QueueLost() {} +func (DummyMetrics) QueueExec(_ time.Duration) {} func (DummyMetrics) SubqPut(_ string) {} func (DummyMetrics) SubqPull(_ string) {} func (DummyMetrics) SubqLeak(_ string) {} diff --git a/metrics.go b/metrics.go index a96819d..7eb24b7 100644 --- a/metrics.go +++ b/metrics.go @@ -31,6 +31,8 @@ type MetricsWriter interface { QueueDeadline() // QueueLost registers lost items missed queue and DLQ. QueueLost() + // QueueExec registers how long queue executes a job. + QueueExec(spent time.Duration) // SubqPut registers income of new item to the sub-queue. SubqPut(subq string) diff --git a/metrics/prometheus/writer.go b/metrics/prometheus/writer.go index 61ed3d5..c9c83ac 100644 --- a/metrics/prometheus/writer.go +++ b/metrics/prometheus/writer.go @@ -19,6 +19,7 @@ type Writer interface { QueueLeak(direction string) QueueDeadline() QueueLost() + QueueExec(spent time.Duration) SubqPut(subq string) SubqPull(subq string) SubqLeak(subq string) @@ -35,8 +36,7 @@ var ( promQueueIn, promQueueOut, promQueueRetry, promQueueLeak, promQueueDeadline, promQueueLost, promSubqIn, promSubqOut, promSubqLeak *prometheus.CounterVec - promWorkerWait *prometheus.HistogramVec - promRetryDelay *prometheus.HistogramVec + promWorkerWait, promRetryDelay, promQueueExec *prometheus.HistogramVec ) func init() { @@ -94,6 +94,11 @@ func init() { Help: "How long worker waits between retry attempts.", Buckets: buckets, }, []string{"queue"}) + promQueueExec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "queue_exec", + Help: "How long queue executes the job.", + Buckets: buckets, + }, []string{"queue"}) promSubqSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "queue_subq_size", @@ -114,7 +119,7 @@ func init() { prometheus.MustRegister(promWorkerIdle, promWorkerActive, promWorkerSleep, promQueueSize, promQueueIn, promQueueOut, promQueueRetry, promQueueLeak, promQueueLost, promQueueDeadline, - promWorkerWait, promRetryDelay, + promWorkerWait, promRetryDelay, promQueueExec, promSubqSize, promSubqIn, promSubqOut, promSubqLeak) } @@ -215,6 +220,10 @@ func (w writer) QueueLost() { promQueueSize.WithLabelValues(w.name).Dec() } +func (w writer) QueueExec(spent time.Duration) { + promQueueExec.WithLabelValues(w.name).Observe(float64(spent.Nanoseconds() / int64(w.prec))) +} + func (w writer) SubqPut(subq string) { promSubqIn.WithLabelValues(w.name, subq).Inc() promSubqSize.WithLabelValues(w.name, subq).Inc() diff --git a/metrics/victoria/writer.go b/metrics/victoria/writer.go index 334ddf5..602d207 100644 --- a/metrics/victoria/writer.go +++ b/metrics/victoria/writer.go @@ -19,6 +19,7 @@ type Writer interface { QueueLeak(direction string) QueueDeadline() QueueLost() + QueueExec(spent time.Duration) SubqPut(subq string) SubqPull(subq string) SubqLeak(subq string) @@ -111,6 +112,10 @@ func (w writer) QueueLost() { vmchain.Gauge("queue_size", nil).WithLabel("queue", w.name).Dec() } +func (w writer) QueueExec(spent time.Duration) { + vmchain.Histogram("queue_exec").WithLabel("queue", w.name).Update(float64(spent.Nanoseconds() / int64(w.prec))) +} + func (w writer) SubqPut(subq string) { vmchain.Counter("queue_subq_in").WithLabel("queue", w.name).WithLabel("subq", subq).Inc() vmchain.Gauge("queue_subq_size", nil).WithLabel("queue", w.name).WithLabel("subq", subq).Inc() diff --git a/worker.go b/worker.go index 20840c2..bea1690 100644 --- a/worker.go +++ b/worker.go @@ -138,7 +138,10 @@ func (w *worker) await(queue *Queue) { } // Forward itm to dequeuer. - if err := w.proc.Do(itm.payload); err != nil { + now := w.config.Clock.Now() + err := w.proc.Do(itm.payload) + w.mw().QueueExec(w.config.Clock.Now().Sub(now)) + if err != nil { // Processing failed. if itm.retries < w.c().MaxRetries { // Try to retry processing if possible.