diff options
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 25 |
1 files changed, 21 insertions, 4 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 77c018c7..b3b45d18 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -132,6 +132,10 @@ class Consumer(skytools.DBScript): # whether to stay behind queue top (postgres interval) #pgq_keep_lag = + + # in how many seconds to write keepalive stats for idle consumers + # this stats is used for detecting that consumer is still running + #keepalive_stats = 300 """ # by default, use cursor-based fetch @@ -157,6 +161,10 @@ class Consumer(skytools.DBScript): consumer_filter = None + keepalive_stats = None + # statistics: time spent waiting for events + idle_start = None + def __init__(self, service_name, db_name, args): """Initialize new consumer. @@ -191,6 +199,8 @@ class Consumer(skytools.DBScript): raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch") self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit) + self.idle_start = time.time() + def reload(self): skytools.DBScript.reload(self) @@ -209,6 +219,8 @@ class Consumer(skytools.DBScript): expr = "ev_extra1 in (%s)" % ','.join(tfilt) self.consumer_filter = expr + self.keepalive_stats = self.cf.getint("keepalive_stats", 300) + def startup(self): """Handle commands here. __init__ does not have error logging.""" if self.options.register: @@ -374,11 +386,16 @@ class Consumer(skytools.DBScript): [batch_id, ev_id, retry_time]) def stat_start(self): - self.stat_batch_start = time.time() + t = time.time() + self.stat_batch_start = t + if self.stat_batch_start - self.idle_start > self.keepalive_stats: + self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4)) + self.idle_start = t def stat_end(self, count): t = time.time() self.stat_put('count', count) - self.stat_put('duration', t - self.stat_batch_start) - - + self.stat_put('duration', round(t - self.stat_batch_start,4)) + if count > 0: # reset timer if we got some events + self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4)) + self.idle_start = t
\ No newline at end of file |