diff options
| -rw-r--r-- | sql/pgq/expected/pgq_core.out | 18 | ||||
| -rw-r--r-- | sql/pgq/functions/pgq.find_tick_helper.sql | 12 | ||||
| -rw-r--r-- | sql/pgq/sql/pgq_core.sql | 3 |
3 files changed, 27 insertions, 6 deletions
diff --git a/sql/pgq/expected/pgq_core.out b/sql/pgq/expected/pgq_core.out index 31ecd02f..976726d2 100644 --- a/sql/pgq/expected/pgq_core.out +++ b/sql/pgq/expected/pgq_core.out @@ -173,6 +173,24 @@ select pgq.ticker(); 1 (1 row) +select * from pgq.next_batch_custom('myqueue', 'consumer', '1 hour', null, null); + batch_id | cur_tick_id | prev_tick_id | cur_tick_time | prev_tick_time | cur_tick_event_seq | prev_tick_event_seq +----------+-------------+--------------+---------------+----------------+--------------------+--------------------- + | | | | | | +(1 row) + +select * from pgq.next_batch_custom('myqueue', 'consumer', null, 10000, null); + batch_id | cur_tick_id | prev_tick_id | cur_tick_time | prev_tick_time | cur_tick_event_seq | prev_tick_event_seq +----------+-------------+--------------+---------------+----------------+--------------------+--------------------- + | | | | | | +(1 row) + +select * from pgq.next_batch_custom('myqueue', 'consumer', null, null, '10 minutes'); + batch_id | cur_tick_id | prev_tick_id | cur_tick_time | prev_tick_time | cur_tick_event_seq | prev_tick_event_seq +----------+-------------+--------------+---------------+----------------+--------------------+--------------------- + | | | | | | +(1 row) + select pgq.next_batch('myqueue', 'consumer'); next_batch ------------ diff --git a/sql/pgq/functions/pgq.find_tick_helper.sql b/sql/pgq/functions/pgq.find_tick_helper.sql index 77afcb42..c0177db3 100644 --- a/sql/pgq/functions/pgq.find_tick_helper.sql +++ b/sql/pgq/functions/pgq.find_tick_helper.sql @@ -21,10 +21,10 @@ declare ival interval; begin -- first, fetch last tick of the queue - select tick_id, tick_time, tick_seq into t + select tick_id, tick_time, tick_event_seq into t from pgq.tick where tick_queue = i_queue_id - and tick_id > i_tick_id + and tick_id > i_prev_tick_id order by tick_queue desc, tick_id desc limit 1; if not found then @@ -34,7 +34,7 @@ begin -- check if it is reasonably ok ok := true; if i_min_count is not null then - cnt = t.tick_seq - i_prev_tick_seq; + cnt = t.tick_event_seq - i_prev_tick_seq; if cnt < i_min_count then return; end if; @@ -54,18 +54,18 @@ begin -- if last tick too far away, do large scan if not ok then - select tick_id, tick_time, tick_seq into t + select tick_id, tick_time, tick_event_seq into t from pgq.tick where tick_queue = i_queue_id and tick_id > i_prev_tick_id - and (i_min_count is null or (tick_seq - i_prev_tick_seq) >= i_min_count) + and (i_min_count is null or (tick_event_seq - i_prev_tick_seq) >= i_min_count) and (i_min_interval is null or (tick_time - i_prev_tick_time) >= i_min_interval) order by tick_queue asc, tick_id asc limit 1; end if; next_tick_id := t.tick_id; next_tick_time := t.tick_time; - next_tick_seq := t.tick_seq; + next_tick_seq := t.tick_event_seq; return; end; $$ language plpgsql stable; diff --git a/sql/pgq/sql/pgq_core.sql b/sql/pgq/sql/pgq_core.sql index c4f4b4cd..50878c53 100644 --- a/sql/pgq/sql/pgq_core.sql +++ b/sql/pgq/sql/pgq_core.sql @@ -43,6 +43,9 @@ select pgq.insert_event('myqueue', 'r3', 'data'); select pgq.current_event_table('myqueue'); select pgq.ticker(); +select * from pgq.next_batch_custom('myqueue', 'consumer', '1 hour', null, null); +select * from pgq.next_batch_custom('myqueue', 'consumer', null, 10000, null); +select * from pgq.next_batch_custom('myqueue', 'consumer', null, null, '10 minutes'); select pgq.next_batch('myqueue', 'consumer'); select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 from pgq.get_batch_events(3); |
