summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/pgq/expected/pgq_core.out18
-rw-r--r--sql/pgq/functions/pgq.find_tick_helper.sql12
-rw-r--r--sql/pgq/sql/pgq_core.sql3
3 files changed, 27 insertions, 6 deletions
diff --git a/sql/pgq/expected/pgq_core.out b/sql/pgq/expected/pgq_core.out
index 31ecd02f..976726d2 100644
--- a/sql/pgq/expected/pgq_core.out
+++ b/sql/pgq/expected/pgq_core.out
@@ -173,6 +173,24 @@ select pgq.ticker();
1
(1 row)
+select * from pgq.next_batch_custom('myqueue', 'consumer', '1 hour', null, null);
+ batch_id | cur_tick_id | prev_tick_id | cur_tick_time | prev_tick_time | cur_tick_event_seq | prev_tick_event_seq
+----------+-------------+--------------+---------------+----------------+--------------------+---------------------
+ | | | | | |
+(1 row)
+
+select * from pgq.next_batch_custom('myqueue', 'consumer', null, 10000, null);
+ batch_id | cur_tick_id | prev_tick_id | cur_tick_time | prev_tick_time | cur_tick_event_seq | prev_tick_event_seq
+----------+-------------+--------------+---------------+----------------+--------------------+---------------------
+ | | | | | |
+(1 row)
+
+select * from pgq.next_batch_custom('myqueue', 'consumer', null, null, '10 minutes');
+ batch_id | cur_tick_id | prev_tick_id | cur_tick_time | prev_tick_time | cur_tick_event_seq | prev_tick_event_seq
+----------+-------------+--------------+---------------+----------------+--------------------+---------------------
+ | | | | | |
+(1 row)
+
select pgq.next_batch('myqueue', 'consumer');
next_batch
------------
diff --git a/sql/pgq/functions/pgq.find_tick_helper.sql b/sql/pgq/functions/pgq.find_tick_helper.sql
index 77afcb42..c0177db3 100644
--- a/sql/pgq/functions/pgq.find_tick_helper.sql
+++ b/sql/pgq/functions/pgq.find_tick_helper.sql
@@ -21,10 +21,10 @@ declare
ival interval;
begin
-- first, fetch last tick of the queue
- select tick_id, tick_time, tick_seq into t
+ select tick_id, tick_time, tick_event_seq into t
from pgq.tick
where tick_queue = i_queue_id
- and tick_id > i_tick_id
+ and tick_id > i_prev_tick_id
order by tick_queue desc, tick_id desc
limit 1;
if not found then
@@ -34,7 +34,7 @@ begin
-- check if it is reasonably ok
ok := true;
if i_min_count is not null then
- cnt = t.tick_seq - i_prev_tick_seq;
+ cnt = t.tick_event_seq - i_prev_tick_seq;
if cnt < i_min_count then
return;
end if;
@@ -54,18 +54,18 @@ begin
-- if last tick too far away, do large scan
if not ok then
- select tick_id, tick_time, tick_seq into t
+ select tick_id, tick_time, tick_event_seq into t
from pgq.tick
where tick_queue = i_queue_id
and tick_id > i_prev_tick_id
- and (i_min_count is null or (tick_seq - i_prev_tick_seq) >= i_min_count)
+ and (i_min_count is null or (tick_event_seq - i_prev_tick_seq) >= i_min_count)
and (i_min_interval is null or (tick_time - i_prev_tick_time) >= i_min_interval)
order by tick_queue asc, tick_id asc
limit 1;
end if;
next_tick_id := t.tick_id;
next_tick_time := t.tick_time;
- next_tick_seq := t.tick_seq;
+ next_tick_seq := t.tick_event_seq;
return;
end;
$$ language plpgsql stable;
diff --git a/sql/pgq/sql/pgq_core.sql b/sql/pgq/sql/pgq_core.sql
index c4f4b4cd..50878c53 100644
--- a/sql/pgq/sql/pgq_core.sql
+++ b/sql/pgq/sql/pgq_core.sql
@@ -43,6 +43,9 @@ select pgq.insert_event('myqueue', 'r3', 'data');
select pgq.current_event_table('myqueue');
select pgq.ticker();
+select * from pgq.next_batch_custom('myqueue', 'consumer', '1 hour', null, null);
+select * from pgq.next_batch_custom('myqueue', 'consumer', null, 10000, null);
+select * from pgq.next_batch_custom('myqueue', 'consumer', null, null, '10 minutes');
select pgq.next_batch('myqueue', 'consumer');
select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 from pgq.get_batch_events(3);