diff options
| author | Marko Kreen | 2007-03-13 11:52:09 +0000 |
|---|---|---|
| committer | Marko Kreen | 2007-03-13 11:52:09 +0000 |
| commit | 50abdba44a031ad40b1886f941479f203ca92039 (patch) | |
| tree | 873e72d78cd48917b2907c4c63abf185649ebb54 /sql/pgq/structure | |
final public releaseskytools_2_1
Diffstat (limited to 'sql/pgq/structure')
| -rw-r--r-- | sql/pgq/structure/func_internal.sql | 23 | ||||
| -rw-r--r-- | sql/pgq/structure/func_public.sql | 36 | ||||
| -rw-r--r-- | sql/pgq/structure/install.sql | 7 | ||||
| -rw-r--r-- | sql/pgq/structure/tables.sql | 217 | ||||
| -rw-r--r-- | sql/pgq/structure/triggers.sql | 8 | ||||
| -rw-r--r-- | sql/pgq/structure/types.sql | 47 |
6 files changed, 338 insertions, 0 deletions
diff --git a/sql/pgq/structure/func_internal.sql b/sql/pgq/structure/func_internal.sql new file mode 100644 index 00000000..f84bb195 --- /dev/null +++ b/sql/pgq/structure/func_internal.sql @@ -0,0 +1,23 @@ +-- Section: Internal Functions + +-- Group: Low-level event handling + +\i functions/pgq.batch_event_sql.sql +\i functions/pgq.batch_event_tables.sql +\i functions/pgq.event_retry_raw.sql +\i functions/pgq.insert_event_raw.sql + +-- Group: Ticker + +\i functions/pgq.ticker.sql + +-- Group: Periodic maintenence + +\i functions/pgq.maint_retry_events.sql +\i functions/pgq.maint_rotate_tables.sql +\i functions/pgq.maint_tables_to_vacuum.sql + +-- Group: Random utility functions + +\i functions/pgq.grant_perms.sql + diff --git a/sql/pgq/structure/func_public.sql b/sql/pgq/structure/func_public.sql new file mode 100644 index 00000000..4440a22b --- /dev/null +++ b/sql/pgq/structure/func_public.sql @@ -0,0 +1,36 @@ +-- Section: Public Functions + +-- Group: Queue creation + +\i functions/pgq.create_queue.sql +\i functions/pgq.drop_queue.sql + +-- Group: Event publishing + +\i functions/pgq.insert_event.sql +\i functions/pgq.current_event_table.sql + +-- Group: Subscribing to queue + +\i functions/pgq.register_consumer.sql +\i functions/pgq.unregister_consumer.sql + +-- Group: Batch processing + +\i functions/pgq.next_batch.sql +\i functions/pgq.get_batch_events.sql +\i functions/pgq.event_failed.sql +\i functions/pgq.event_retry.sql +\i functions/pgq.finish_batch.sql + +-- Group: General info functions + +\i functions/pgq.get_queue_info.sql +\i functions/pgq.get_consumer_info.sql +\i functions/pgq.version.sql +\i functions/pgq.get_batch_info.sql + +-- Group: Failed queue browsing + +\i functions/pgq.failed_queue.sql + diff --git a/sql/pgq/structure/install.sql b/sql/pgq/structure/install.sql new file mode 100644 index 00000000..b3c77cb6 --- /dev/null +++ b/sql/pgq/structure/install.sql @@ -0,0 +1,7 @@ + +\i structure/tables.sql +\i structure/types.sql +\i structure/func_internal.sql +\i structure/func_public.sql +\i structure/triggers.sql + diff --git a/sql/pgq/structure/tables.sql b/sql/pgq/structure/tables.sql new file mode 100644 index 00000000..fc56cc81 --- /dev/null +++ b/sql/pgq/structure/tables.sql @@ -0,0 +1,217 @@ +-- ---------------------------------------------------------------------- +-- Section: Internal Tables +-- +-- Map to Slony-I: +-- sl_node - pgq.consumer +-- sl_set - pgq.queue +-- sl_subscriber + sl_confirm - pgq.subscription +-- sl_event - pgq.tick +-- sl_setsync - pgq_ext.completed_* +-- sl_log_* - slony1 has per-cluster data tables, +-- here we do redirection in pgq.queue +-- to have per-queue data tables. +-- ---------------------------------------------------------------------- + +set client_min_messages = 'warning'; + +-- drop schema if exists pgq cascade; +create schema pgq; +grant usage on schema pgq to public; + +-- ---------------------------------------------------------------------- +-- Table: pgq.consumer +-- +-- Name to id lookup for consumers +-- +-- Columns: +-- co_id - consumer's id for internal usage +-- co_name - consumer's id for external usage +-- ---------------------------------------------------------------------- +create table pgq.consumer ( + co_id serial, + co_name text not null default 'fooz', + + constraint consumer_pkey primary key (co_id), + constraint consumer_name_uq UNIQUE (co_name) +); + + +-- ---------------------------------------------------------------------- +-- Table: pgq.queue +-- +-- Information about available queues +-- +-- Columns: +-- queue_id - queue id for internal usage +-- queue_name - queue name visible outside +-- queue_data - parent table for actual data tables +-- queue_switch_step1 - tx when rotation happened +-- queue_switch_step2 - tx after rotation was committed +-- queue_switch_time - time when switch happened +-- queue_ticker_max_count - batch should not contain more events +-- queue_ticker_max_lag - events should not age more +-- queue_ticker_idle_period - how often to tick when no events happen +-- ---------------------------------------------------------------------- +create table pgq.queue ( + queue_id serial, + queue_name text not null, + + queue_ntables integer not null default 3, + queue_cur_table integer not null default 0, + queue_rotation_period interval not null default '2 hours', + queue_switch_step1 bigint not null default get_current_txid(), + queue_switch_step2 bigint default get_current_txid(), + queue_switch_time timestamptz not null default now(), + + queue_external_ticker boolean not null default false, + queue_ticker_max_count integer not null default 500, + queue_ticker_max_lag interval not null default '3 seconds', + queue_ticker_idle_period interval not null default '1 minute', + + queue_data_pfx text not null, + queue_event_seq text not null, + queue_tick_seq text not null, + + constraint queue_pkey primary key (queue_id), + constraint queue_name_uq unique (queue_name) +); + +-- ---------------------------------------------------------------------- +-- Table: pgq.tick +-- +-- Snapshots for event batching +-- +-- Columns: +-- tick_queue - queue id whose tick it is +-- tick_id - ticks id (per-queue) +-- tick_time - time when tick happened +-- tick_snapshot +-- ---------------------------------------------------------------------- +create table pgq.tick ( + tick_queue int4 not null, + tick_id bigint not null, + tick_time timestamptz not null default now(), + tick_snapshot txid_snapshot not null default get_current_snapshot(), + + constraint tick_pkey primary key (tick_queue, tick_id), + constraint tick_queue_fkey foreign key (tick_queue) + references pgq.queue (queue_id) +); + +-- ---------------------------------------------------------------------- +-- Sequence: pgq.batch_id_seq +-- +-- Sequence for batch id's. +-- ---------------------------------------------------------------------- + +create sequence pgq.batch_id_seq; +-- ---------------------------------------------------------------------- +-- Table: pgq.subscription +-- +-- Consumer registration on a queue +-- +-- Columns: +-- +-- sub_id - subscription id for internal usage +-- sub_queue - queue id +-- sub_consumer - consumer's id +-- sub_tick - last tick the consumer processed +-- sub_batch - shortcut for queue_id/consumer_id/tick_id +-- sub_next_tick - +-- ---------------------------------------------------------------------- +create table pgq.subscription ( + sub_id serial not null, + sub_queue int4 not null, + sub_consumer int4 not null, + sub_last_tick bigint not null, + sub_active timestamptz not null default now(), + sub_batch bigint, + sub_next_tick bigint, + + constraint subscription_pkey primary key (sub_id), + constraint sub_queue_fkey foreign key (sub_queue) + references pgq.queue (queue_id), + constraint sub_consumer_fkey foreign key (sub_consumer) + references pgq.consumer (co_id) +); + + +-- ---------------------------------------------------------------------- +-- Table: pgq.event_template +-- +-- Parent table for all event tables +-- +-- Columns: +-- ev_id - event's id, supposed to be unique per queue +-- ev_time - when the event was inserted +-- ev_txid - transaction id which inserted the event +-- ev_owner - subscription id that wanted to retry this +-- ev_retry - how many times the event has been retried, NULL for new events +-- ev_type - consumer/producer can specify what the data fields contain +-- ev_data - data field +-- ev_extra1 - extra data field +-- ev_extra2 - extra data field +-- ev_extra3 - extra data field +-- ev_extra4 - extra data field +-- ---------------------------------------------------------------------- +create table pgq.event_template ( + ev_id bigint not null, + ev_time timestamptz not null, + + ev_txid bigint not null default get_current_txid(), + ev_owner int4, + ev_retry int4, + + ev_type text, + ev_data text, + ev_extra1 text, + ev_extra2 text, + ev_extra3 text, + ev_extra4 text +); + +-- ---------------------------------------------------------------------- +-- Table: pgq.retry_queue +-- +-- Events to be retried +-- +-- Columns: +-- ev_retry_after - time when it should be re-inserted to main queue +-- ---------------------------------------------------------------------- +create table pgq.retry_queue ( + ev_retry_after timestamptz not null, + + like pgq.event_template, + + constraint rq_pkey primary key (ev_owner, ev_id), + constraint rq_owner_fkey foreign key (ev_owner) + references pgq.subscription (sub_id) +); +alter table pgq.retry_queue alter column ev_owner set not null; +alter table pgq.retry_queue alter column ev_txid drop not null; +create index rq_retry_idx on pgq.retry_queue (ev_retry_after); + +-- ---------------------------------------------------------------------- +-- Table: pgq.failed_queue +-- +-- Events whose processing failed +-- +-- Columns: +-- ev_failed_reason - consumer's excuse for not processing +-- ev_failed_time - when it was tagged failed +-- ---------------------------------------------------------------------- +create table pgq.failed_queue ( + ev_failed_reason text, + ev_failed_time timestamptz not null, + + -- all event fields + like pgq.event_template, + + constraint fq_pkey primary key (ev_owner, ev_id), + constraint fq_owner_fkey foreign key (ev_owner) + references pgq.subscription (sub_id) +); +alter table pgq.failed_queue alter column ev_owner set not null; +alter table pgq.failed_queue alter column ev_txid drop not null; + + diff --git a/sql/pgq/structure/triggers.sql b/sql/pgq/structure/triggers.sql new file mode 100644 index 00000000..e732347f --- /dev/null +++ b/sql/pgq/structure/triggers.sql @@ -0,0 +1,8 @@ + +-- Section: Public Triggers + +-- Group: Trigger Functions + +\i triggers/pgq.logutriga.sql +\i triggers/pgq.sqltriga.sql + diff --git a/sql/pgq/structure/types.sql b/sql/pgq/structure/types.sql new file mode 100644 index 00000000..c89ce500 --- /dev/null +++ b/sql/pgq/structure/types.sql @@ -0,0 +1,47 @@ + +create type pgq.ret_queue_info as ( + queue_name text, + queue_ntables integer, + queue_cur_table integer, + queue_rotation_period interval, + queue_switch_time timestamptz, + queue_external_ticker boolean, + queue_ticker_max_count integer, + queue_ticker_max_lag interval, + queue_ticker_idle_period interval, + ticker_lag interval +); + +create type pgq.ret_consumer_info as ( + queue_name text, + consumer_name text, + lag interval, + last_seen interval +); + +create type pgq.ret_batch_info as ( + queue_name text, + consumer_name text, + batch_start timestamptz, + batch_end timestamptz, + prev_tick_id bigint, + tick_id bigint, + lag interval +); + + +create type pgq.ret_batch_event as ( + ev_id bigint, + ev_time timestamptz, + + ev_txid bigint, + ev_retry int4, + + ev_type text, + ev_data text, + ev_extra1 text, + ev_extra2 text, + ev_extra3 text, + ev_extra4 text +); + |
