summaryrefslogtreecommitdiff
path: root/sql/pgq/structure
diff options
context:
space:
mode:
authorMarko Kreen2007-03-13 11:52:09 +0000
committerMarko Kreen2007-03-13 11:52:09 +0000
commit50abdba44a031ad40b1886f941479f203ca92039 (patch)
tree873e72d78cd48917b2907c4c63abf185649ebb54 /sql/pgq/structure
final public releaseskytools_2_1
Diffstat (limited to 'sql/pgq/structure')
-rw-r--r--sql/pgq/structure/func_internal.sql23
-rw-r--r--sql/pgq/structure/func_public.sql36
-rw-r--r--sql/pgq/structure/install.sql7
-rw-r--r--sql/pgq/structure/tables.sql217
-rw-r--r--sql/pgq/structure/triggers.sql8
-rw-r--r--sql/pgq/structure/types.sql47
6 files changed, 338 insertions, 0 deletions
diff --git a/sql/pgq/structure/func_internal.sql b/sql/pgq/structure/func_internal.sql
new file mode 100644
index 00000000..f84bb195
--- /dev/null
+++ b/sql/pgq/structure/func_internal.sql
@@ -0,0 +1,23 @@
+-- Section: Internal Functions
+
+-- Group: Low-level event handling
+
+\i functions/pgq.batch_event_sql.sql
+\i functions/pgq.batch_event_tables.sql
+\i functions/pgq.event_retry_raw.sql
+\i functions/pgq.insert_event_raw.sql
+
+-- Group: Ticker
+
+\i functions/pgq.ticker.sql
+
+-- Group: Periodic maintenence
+
+\i functions/pgq.maint_retry_events.sql
+\i functions/pgq.maint_rotate_tables.sql
+\i functions/pgq.maint_tables_to_vacuum.sql
+
+-- Group: Random utility functions
+
+\i functions/pgq.grant_perms.sql
+
diff --git a/sql/pgq/structure/func_public.sql b/sql/pgq/structure/func_public.sql
new file mode 100644
index 00000000..4440a22b
--- /dev/null
+++ b/sql/pgq/structure/func_public.sql
@@ -0,0 +1,36 @@
+-- Section: Public Functions
+
+-- Group: Queue creation
+
+\i functions/pgq.create_queue.sql
+\i functions/pgq.drop_queue.sql
+
+-- Group: Event publishing
+
+\i functions/pgq.insert_event.sql
+\i functions/pgq.current_event_table.sql
+
+-- Group: Subscribing to queue
+
+\i functions/pgq.register_consumer.sql
+\i functions/pgq.unregister_consumer.sql
+
+-- Group: Batch processing
+
+\i functions/pgq.next_batch.sql
+\i functions/pgq.get_batch_events.sql
+\i functions/pgq.event_failed.sql
+\i functions/pgq.event_retry.sql
+\i functions/pgq.finish_batch.sql
+
+-- Group: General info functions
+
+\i functions/pgq.get_queue_info.sql
+\i functions/pgq.get_consumer_info.sql
+\i functions/pgq.version.sql
+\i functions/pgq.get_batch_info.sql
+
+-- Group: Failed queue browsing
+
+\i functions/pgq.failed_queue.sql
+
diff --git a/sql/pgq/structure/install.sql b/sql/pgq/structure/install.sql
new file mode 100644
index 00000000..b3c77cb6
--- /dev/null
+++ b/sql/pgq/structure/install.sql
@@ -0,0 +1,7 @@
+
+\i structure/tables.sql
+\i structure/types.sql
+\i structure/func_internal.sql
+\i structure/func_public.sql
+\i structure/triggers.sql
+
diff --git a/sql/pgq/structure/tables.sql b/sql/pgq/structure/tables.sql
new file mode 100644
index 00000000..fc56cc81
--- /dev/null
+++ b/sql/pgq/structure/tables.sql
@@ -0,0 +1,217 @@
+-- ----------------------------------------------------------------------
+-- Section: Internal Tables
+--
+-- Map to Slony-I:
+-- sl_node - pgq.consumer
+-- sl_set - pgq.queue
+-- sl_subscriber + sl_confirm - pgq.subscription
+-- sl_event - pgq.tick
+-- sl_setsync - pgq_ext.completed_*
+-- sl_log_* - slony1 has per-cluster data tables,
+-- here we do redirection in pgq.queue
+-- to have per-queue data tables.
+-- ----------------------------------------------------------------------
+
+set client_min_messages = 'warning';
+
+-- drop schema if exists pgq cascade;
+create schema pgq;
+grant usage on schema pgq to public;
+
+-- ----------------------------------------------------------------------
+-- Table: pgq.consumer
+--
+-- Name to id lookup for consumers
+--
+-- Columns:
+-- co_id - consumer's id for internal usage
+-- co_name - consumer's id for external usage
+-- ----------------------------------------------------------------------
+create table pgq.consumer (
+ co_id serial,
+ co_name text not null default 'fooz',
+
+ constraint consumer_pkey primary key (co_id),
+ constraint consumer_name_uq UNIQUE (co_name)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: pgq.queue
+--
+-- Information about available queues
+--
+-- Columns:
+-- queue_id - queue id for internal usage
+-- queue_name - queue name visible outside
+-- queue_data - parent table for actual data tables
+-- queue_switch_step1 - tx when rotation happened
+-- queue_switch_step2 - tx after rotation was committed
+-- queue_switch_time - time when switch happened
+-- queue_ticker_max_count - batch should not contain more events
+-- queue_ticker_max_lag - events should not age more
+-- queue_ticker_idle_period - how often to tick when no events happen
+-- ----------------------------------------------------------------------
+create table pgq.queue (
+ queue_id serial,
+ queue_name text not null,
+
+ queue_ntables integer not null default 3,
+ queue_cur_table integer not null default 0,
+ queue_rotation_period interval not null default '2 hours',
+ queue_switch_step1 bigint not null default get_current_txid(),
+ queue_switch_step2 bigint default get_current_txid(),
+ queue_switch_time timestamptz not null default now(),
+
+ queue_external_ticker boolean not null default false,
+ queue_ticker_max_count integer not null default 500,
+ queue_ticker_max_lag interval not null default '3 seconds',
+ queue_ticker_idle_period interval not null default '1 minute',
+
+ queue_data_pfx text not null,
+ queue_event_seq text not null,
+ queue_tick_seq text not null,
+
+ constraint queue_pkey primary key (queue_id),
+ constraint queue_name_uq unique (queue_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq.tick
+--
+-- Snapshots for event batching
+--
+-- Columns:
+-- tick_queue - queue id whose tick it is
+-- tick_id - ticks id (per-queue)
+-- tick_time - time when tick happened
+-- tick_snapshot
+-- ----------------------------------------------------------------------
+create table pgq.tick (
+ tick_queue int4 not null,
+ tick_id bigint not null,
+ tick_time timestamptz not null default now(),
+ tick_snapshot txid_snapshot not null default get_current_snapshot(),
+
+ constraint tick_pkey primary key (tick_queue, tick_id),
+ constraint tick_queue_fkey foreign key (tick_queue)
+ references pgq.queue (queue_id)
+);
+
+-- ----------------------------------------------------------------------
+-- Sequence: pgq.batch_id_seq
+--
+-- Sequence for batch id's.
+-- ----------------------------------------------------------------------
+
+create sequence pgq.batch_id_seq;
+-- ----------------------------------------------------------------------
+-- Table: pgq.subscription
+--
+-- Consumer registration on a queue
+--
+-- Columns:
+--
+-- sub_id - subscription id for internal usage
+-- sub_queue - queue id
+-- sub_consumer - consumer's id
+-- sub_tick - last tick the consumer processed
+-- sub_batch - shortcut for queue_id/consumer_id/tick_id
+-- sub_next_tick -
+-- ----------------------------------------------------------------------
+create table pgq.subscription (
+ sub_id serial not null,
+ sub_queue int4 not null,
+ sub_consumer int4 not null,
+ sub_last_tick bigint not null,
+ sub_active timestamptz not null default now(),
+ sub_batch bigint,
+ sub_next_tick bigint,
+
+ constraint subscription_pkey primary key (sub_id),
+ constraint sub_queue_fkey foreign key (sub_queue)
+ references pgq.queue (queue_id),
+ constraint sub_consumer_fkey foreign key (sub_consumer)
+ references pgq.consumer (co_id)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: pgq.event_template
+--
+-- Parent table for all event tables
+--
+-- Columns:
+-- ev_id - event's id, supposed to be unique per queue
+-- ev_time - when the event was inserted
+-- ev_txid - transaction id which inserted the event
+-- ev_owner - subscription id that wanted to retry this
+-- ev_retry - how many times the event has been retried, NULL for new events
+-- ev_type - consumer/producer can specify what the data fields contain
+-- ev_data - data field
+-- ev_extra1 - extra data field
+-- ev_extra2 - extra data field
+-- ev_extra3 - extra data field
+-- ev_extra4 - extra data field
+-- ----------------------------------------------------------------------
+create table pgq.event_template (
+ ev_id bigint not null,
+ ev_time timestamptz not null,
+
+ ev_txid bigint not null default get_current_txid(),
+ ev_owner int4,
+ ev_retry int4,
+
+ ev_type text,
+ ev_data text,
+ ev_extra1 text,
+ ev_extra2 text,
+ ev_extra3 text,
+ ev_extra4 text
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq.retry_queue
+--
+-- Events to be retried
+--
+-- Columns:
+-- ev_retry_after - time when it should be re-inserted to main queue
+-- ----------------------------------------------------------------------
+create table pgq.retry_queue (
+ ev_retry_after timestamptz not null,
+
+ like pgq.event_template,
+
+ constraint rq_pkey primary key (ev_owner, ev_id),
+ constraint rq_owner_fkey foreign key (ev_owner)
+ references pgq.subscription (sub_id)
+);
+alter table pgq.retry_queue alter column ev_owner set not null;
+alter table pgq.retry_queue alter column ev_txid drop not null;
+create index rq_retry_idx on pgq.retry_queue (ev_retry_after);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq.failed_queue
+--
+-- Events whose processing failed
+--
+-- Columns:
+-- ev_failed_reason - consumer's excuse for not processing
+-- ev_failed_time - when it was tagged failed
+-- ----------------------------------------------------------------------
+create table pgq.failed_queue (
+ ev_failed_reason text,
+ ev_failed_time timestamptz not null,
+
+ -- all event fields
+ like pgq.event_template,
+
+ constraint fq_pkey primary key (ev_owner, ev_id),
+ constraint fq_owner_fkey foreign key (ev_owner)
+ references pgq.subscription (sub_id)
+);
+alter table pgq.failed_queue alter column ev_owner set not null;
+alter table pgq.failed_queue alter column ev_txid drop not null;
+
+
diff --git a/sql/pgq/structure/triggers.sql b/sql/pgq/structure/triggers.sql
new file mode 100644
index 00000000..e732347f
--- /dev/null
+++ b/sql/pgq/structure/triggers.sql
@@ -0,0 +1,8 @@
+
+-- Section: Public Triggers
+
+-- Group: Trigger Functions
+
+\i triggers/pgq.logutriga.sql
+\i triggers/pgq.sqltriga.sql
+
diff --git a/sql/pgq/structure/types.sql b/sql/pgq/structure/types.sql
new file mode 100644
index 00000000..c89ce500
--- /dev/null
+++ b/sql/pgq/structure/types.sql
@@ -0,0 +1,47 @@
+
+create type pgq.ret_queue_info as (
+ queue_name text,
+ queue_ntables integer,
+ queue_cur_table integer,
+ queue_rotation_period interval,
+ queue_switch_time timestamptz,
+ queue_external_ticker boolean,
+ queue_ticker_max_count integer,
+ queue_ticker_max_lag interval,
+ queue_ticker_idle_period interval,
+ ticker_lag interval
+);
+
+create type pgq.ret_consumer_info as (
+ queue_name text,
+ consumer_name text,
+ lag interval,
+ last_seen interval
+);
+
+create type pgq.ret_batch_info as (
+ queue_name text,
+ consumer_name text,
+ batch_start timestamptz,
+ batch_end timestamptz,
+ prev_tick_id bigint,
+ tick_id bigint,
+ lag interval
+);
+
+
+create type pgq.ret_batch_event as (
+ ev_id bigint,
+ ev_time timestamptz,
+
+ ev_txid bigint,
+ ev_retry int4,
+
+ ev_type text,
+ ev_data text,
+ ev_extra1 text,
+ ev_extra2 text,
+ ev_extra3 text,
+ ev_extra4 text
+);
+