diff options
-rw-r--r-- | sql/pgq/functions/pgq.ticker.sql | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/sql/pgq/functions/pgq.ticker.sql b/sql/pgq/functions/pgq.ticker.sql index 7e267acb..be985447 100644 --- a/sql/pgq/functions/pgq.ticker.sql +++ b/sql/pgq/functions/pgq.ticker.sql @@ -54,7 +54,6 @@ declare q record; state record; last2 record; - xmax2 int8; begin select queue_id, queue_tick_seq, queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag, @@ -78,7 +77,9 @@ begin -- load state from last tick select now() - tick_time as lag, q.event_seq - tick_event_seq as new_events, - tick_id, tick_time, txid_snapshot_xmax(tick_snapshot) as sxmax + tick_id, tick_time, tick_event_seq, + txid_snapshot_xmax(tick_snapshot) as sxmax, + txid_snapshot_xmin(tick_snapshot) as sxmin into state from pgq.tick where tick_queue = q.queue_id @@ -86,12 +87,15 @@ begin limit 1; if found then + if state.sxmin > txid_current() then + raise exception 'Invalid PgQ state: old xmin=%, old xmax=%, cur txid=%', + state.sxmin, state.sxmax, txid_current(); + end if; if state.new_events < 0 then - raise warning 'new_events < 0?'; + raise warning 'Negative new_events? old=% cur=%', state.tick_event_seq, q.event_seq; end if; - xmax2 := txid_snapshot_xmax(txid_current_snapshot()); - if state.sxmax > xmax2 then - raise exception 'Invalid PgQ state: old xmax=%, cur xmax=%', state.sxmax, xmax2; + if state.sxmax > txid_current() then + raise warning 'Dubious PgQ state: old xmax=%, cur txid=%', state.sxmax, txid_current(); end if; if state.new_events > 0 then |