summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/pgq/functions/pgq.ticker.sql16
1 files changed, 10 insertions, 6 deletions
diff --git a/sql/pgq/functions/pgq.ticker.sql b/sql/pgq/functions/pgq.ticker.sql
index 7e267acb..be985447 100644
--- a/sql/pgq/functions/pgq.ticker.sql
+++ b/sql/pgq/functions/pgq.ticker.sql
@@ -54,7 +54,6 @@ declare
q record;
state record;
last2 record;
- xmax2 int8;
begin
select queue_id, queue_tick_seq, queue_external_ticker,
queue_ticker_max_count, queue_ticker_max_lag,
@@ -78,7 +77,9 @@ begin
-- load state from last tick
select now() - tick_time as lag,
q.event_seq - tick_event_seq as new_events,
- tick_id, tick_time, txid_snapshot_xmax(tick_snapshot) as sxmax
+ tick_id, tick_time, tick_event_seq,
+ txid_snapshot_xmax(tick_snapshot) as sxmax,
+ txid_snapshot_xmin(tick_snapshot) as sxmin
into state
from pgq.tick
where tick_queue = q.queue_id
@@ -86,12 +87,15 @@ begin
limit 1;
if found then
+ if state.sxmin > txid_current() then
+ raise exception 'Invalid PgQ state: old xmin=%, old xmax=%, cur txid=%',
+ state.sxmin, state.sxmax, txid_current();
+ end if;
if state.new_events < 0 then
- raise warning 'new_events < 0?';
+ raise warning 'Negative new_events? old=% cur=%', state.tick_event_seq, q.event_seq;
end if;
- xmax2 := txid_snapshot_xmax(txid_current_snapshot());
- if state.sxmax > xmax2 then
- raise exception 'Invalid PgQ state: old xmax=%, cur xmax=%', state.sxmax, xmax2;
+ if state.sxmax > txid_current() then
+ raise warning 'Dubious PgQ state: old xmax=%, cur txid=%', state.sxmax, txid_current();
end if;
if state.new_events > 0 then