summaryrefslogtreecommitdiff
path: root/python/pgq/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--python/pgq/consumer.py25
1 files changed, 21 insertions, 4 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 77c018c7..b3b45d18 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -132,6 +132,10 @@ class Consumer(skytools.DBScript):
# whether to stay behind queue top (postgres interval)
#pgq_keep_lag =
+
+ # in how many seconds to write keepalive stats for idle consumers
+ # this stats is used for detecting that consumer is still running
+ #keepalive_stats = 300
"""
# by default, use cursor-based fetch
@@ -157,6 +161,10 @@ class Consumer(skytools.DBScript):
consumer_filter = None
+ keepalive_stats = None
+ # statistics: time spent waiting for events
+ idle_start = None
+
def __init__(self, service_name, db_name, args):
"""Initialize new consumer.
@@ -191,6 +199,8 @@ class Consumer(skytools.DBScript):
raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")
self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit)
+ self.idle_start = time.time()
+
def reload(self):
skytools.DBScript.reload(self)
@@ -209,6 +219,8 @@ class Consumer(skytools.DBScript):
expr = "ev_extra1 in (%s)" % ','.join(tfilt)
self.consumer_filter = expr
+ self.keepalive_stats = self.cf.getint("keepalive_stats", 300)
+
def startup(self):
"""Handle commands here. __init__ does not have error logging."""
if self.options.register:
@@ -374,11 +386,16 @@ class Consumer(skytools.DBScript):
[batch_id, ev_id, retry_time])
def stat_start(self):
- self.stat_batch_start = time.time()
+ t = time.time()
+ self.stat_batch_start = t
+ if self.stat_batch_start - self.idle_start > self.keepalive_stats:
+ self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
+ self.idle_start = t
def stat_end(self, count):
t = time.time()
self.stat_put('count', count)
- self.stat_put('duration', t - self.stat_batch_start)
-
-
+ self.stat_put('duration', round(t - self.stat_batch_start,4))
+ if count > 0: # reset timer if we got some events
+ self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
+ self.idle_start = t \ No newline at end of file