summaryrefslogtreecommitdiff
path: root/python/pgq/consumer.py
diff options
context:
space:
mode:
authorAsko Oja2012-09-19 11:59:00 +0000
committerAsko Oja2012-09-19 11:59:00 +0000
commit6da151f9f0f5e76617a9fd30ca0670a7fb0c414c (patch)
tree6ea53ea3a6b1f6434a0f7c870ed44f272f107d53 /python/pgq/consumer.py
parent9679435f22217cc1bb6b1a2a9df436abf831f957 (diff)
Added keepalive_stats parameter for consumer that controls how often idle consumer logs idle time. Also added rounding to milliseconds for logged durations
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--python/pgq/consumer.py19
1 files changed, 15 insertions, 4 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 8ec9a18a..96d2180b 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -132,6 +132,10 @@ class Consumer(skytools.DBScript):
# whether to stay behind queue top (postgres interval)
#pgq_keep_lag =
+
+ # in how many seconds to write keepalive stats for idle consumers
+ # this stats is used for detecting that consumer is still running
+ #keepalive_stats = 61
"""
# by default, use cursor-based fetch
@@ -157,8 +161,9 @@ class Consumer(skytools.DBScript):
consumer_filter = None
+ keepalive_stats = None
# statistics: time spent waiting for events
- idle_start = None
+ idle_start = None
def __init__(self, service_name, db_name, args):
"""Initialize new consumer.
@@ -214,6 +219,8 @@ class Consumer(skytools.DBScript):
expr = "ev_extra1 in (%s)" % ','.join(tfilt)
self.consumer_filter = expr
+ self.keepalive_stats = self.cf.getint("keepalive_stats", 61)
+
def startup(self):
"""Handle commands here. __init__ does not have error logging."""
if self.options.register:
@@ -379,12 +386,16 @@ class Consumer(skytools.DBScript):
[batch_id, ev_id, retry_time])
def stat_start(self):
- self.stat_batch_start = time.time()
+ t = time.time()
+ self.stat_batch_start = t
+ if self.stat_batch_start - self.idle_start > self.keepalive_stats:
+ self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
+ self.idle_start = t
def stat_end(self, count):
t = time.time()
self.stat_put('count', count)
- self.stat_put('duration', t - self.stat_batch_start)
+ self.stat_put('duration', round(t - self.stat_batch_start,4))
if count > 0: # reset timer if we got some events
- self.stat_put('idle', self.stat_batch_start - self.idle_start)
+ self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
self.idle_start = t \ No newline at end of file