summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--configure.ac1
-rw-r--r--python/londiste/playback.py4
-rw-r--r--python/pgq/cascade/admin.py92
-rw-r--r--python/skytools/__init__.py2
-rw-r--r--python/skytools/parsing.py58
-rw-r--r--sql/londiste/functions/londiste.create_partition.sql1
-rw-r--r--sql/londiste/structure/grants.ini20
-rw-r--r--sql/pgq/structure/grants.ini10
-rw-r--r--sql/pgq_node/structure/grants.ini18
-rwxr-xr-xtests/londiste/regen.sh11
10 files changed, 187 insertions, 30 deletions
diff --git a/configure.ac b/configure.ac
index 2b0698d9..fbc36fe7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -145,6 +145,7 @@ dnl Postres headers on Solaris define incompat unsetenv without that
AC_CHECK_FUNCS(unsetenv)
dnl Optional use of libevent
+AC_SEARCH_LIBS(clock_gettime, rt)
AC_USUAL_LIBEVENT_OPT
dnl Needed on SmartOS (Solaris)
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 4f509dcf..4fa87014 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -277,6 +277,10 @@ class Replicator(CascadedWorker):
# target database
db = dbname=somedb host=127.0.0.1
+ # public connect string for target node, which other nodes use
+ # to access this one.
+ #public_node_location =
+
# how many tables can be copied in parallel
#parallel_copies = 1
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index ed44dad7..dd9215d8 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -29,9 +29,9 @@ command_usage = """\
%prog [options] INI CMD [subcmd args]
Node Initialization:
- create-root NAME PUBLIC_CONNSTR
- create-branch NAME PUBLIC_CONNSTR --provider=<public_connstr>
- create-leaf NAME PUBLIC_CONNSTR --provider=<public_connstr>
+ create-root NAME [PUBLIC_CONNSTR]
+ create-branch NAME [PUBLIC_CONNSTR] --provider=<public_connstr>
+ create-leaf NAME [PUBLIC_CONNSTR] --provider=<public_connstr>
Initializes node.
Node Administration:
@@ -141,24 +141,43 @@ class CascadeAdmin(skytools.AdminScript):
db = self.get_database("db")
self.install_code(db)
- def cmd_create_root(self, node_name, node_location):
- return self.create_node('root', node_name, node_location)
+ def cmd_create_root(self, node_name, *args):
+ return self.create_node('root', node_name, args)
- def cmd_create_branch(self, node_name, node_location):
- return self.create_node('branch', node_name, node_location)
+ def cmd_create_branch(self, node_name, *args):
+ return self.create_node('branch', node_name, args)
- def cmd_create_leaf(self, node_name, node_location):
- return self.create_node('leaf', node_name, node_location)
+ def cmd_create_leaf(self, node_name, *args):
+ return self.create_node('leaf', node_name, args)
- def create_node(self, node_type, node_name, node_location):
+ def create_node(self, node_type, node_name, args):
"""Generic node init."""
provider_loc = self.options.provider
if node_type not in ('root', 'branch', 'leaf'):
raise Exception('unknown node type')
+ # load public location
+ if len(args) > 1:
+ raise UsageError('Too many args, only public connect string allowed')
+ elif len(args) == 1:
+ node_location = args[0]
+ else:
+ node_location = self.cf.get('public_node_location', '')
+ if not node_location:
+ raise UsageError('Node public location must be given either in command line or config')
+
+ # check if sane
+ ok = 0
+ for k, v in skytools.parse_connect_string(node_location):
+ if k in ('host', 'service'):
+ ok = 1
+ break
+ if not ok:
+ raise UsageError('No host= in public connect string, bad idea')
+
# connect to database
- db = self.get_database("new_node", connstr = node_location)
+ db = self.get_database("db")
# check if code is installed
self.install_code(db)
@@ -170,6 +189,9 @@ class CascadeAdmin(skytools.AdminScript):
self.log.info("Node is already initialized as %s", info['node_type'])
return
+ # check if public connstr is sane
+ self.check_public_connstr(db, node_location)
+
self.log.info("Initializing node")
node_attrs = {}
@@ -257,6 +279,43 @@ class CascadeAdmin(skytools.AdminScript):
self.log.info("Done")
+ def check_public_connstr(self, db, pub_connstr):
+ """Look if public and local connect strings point to same db's.
+ """
+ pub_db = self.get_database("pub_db", connstr = pub_connstr)
+ curs1 = db.cursor()
+ curs2 = pub_db.cursor()
+ q = "select oid, datname, txid_current() as txid, txid_current_snapshot() as snap"\
+ " from pg_catalog.pg_database where datname = current_database()"
+ curs1.execute(q)
+ res1 = curs1.fetchone()
+ db.commit()
+
+ curs2.execute(q)
+ res2 = curs2.fetchone()
+ pub_db.commit()
+
+ curs1.execute(q)
+ res3 = curs1.fetchone()
+ db.commit()
+
+ self.close_database("pub_db")
+
+ failure = 0
+ if (res1['oid'], res1['datname']) != (res2['oid'], res2['datname']):
+ failure += 1
+
+ sn1 = skytools.Snapshot(res1['snap'])
+ tx = res2['txid']
+ sn2 = skytools.Snapshot(res3['snap'])
+ if sn1.contains(tx):
+ failure += 2
+ elif not sn2.contains(tx):
+ failure += 4
+
+ if failure:
+ raise UsageError("Public connect string points to different database than local connect string (fail=%d)" % failure)
+
def extra_init(self, node_type, node_db, provider_db):
"""Callback to do specific init."""
pass
@@ -363,12 +422,17 @@ class CascadeAdmin(skytools.AdminScript):
nodes = Queue.Queue()
# launch workers and wait
- n = max (min (members.qsize() >> 2, 100), 1)
- for i in range(n):
+ num_nodes = len(self.queue_info.member_map)
+ num_threads = max (min (num_nodes / 4, 100), 1)
+ tlist = []
+ for i in range(num_threads):
t = threading.Thread (target = self._cmd_status_worker, args = (members, nodes))
t.daemon = True
t.start()
- members.join()
+ tlist.append(t)
+ #members.join()
+ for t in tlist:
+ t.join()
while True:
try:
diff --git a/python/skytools/__init__.py b/python/skytools/__init__.py
index 8f2c52a3..048d41fb 100644
--- a/python/skytools/__init__.py
+++ b/python/skytools/__init__.py
@@ -47,7 +47,9 @@ _symbols = {
# skytools.parsing
'dedent': 'skytools.parsing:dedent',
'hsize_to_bytes': 'skytools.parsing:hsize_to_bytes',
+ 'merge_connect_string': 'skytools.parsing:merge_connect_string',
'parse_acl': 'skytools.parsing:parse_acl',
+ 'parse_connect_string': 'skytools.parsing:parse_connect_string',
'parse_logtriga_sql': 'skytools.parsing:parse_logtriga_sql',
'parse_pgarray': 'skytools.parsing:parse_pgarray',
'parse_sqltriga_sql': 'skytools.parsing:parse_sqltriga_sql',
diff --git a/python/skytools/parsing.py b/python/skytools/parsing.py
index decc7e7e..318b1bf9 100644
--- a/python/skytools/parsing.py
+++ b/python/skytools/parsing.py
@@ -7,7 +7,8 @@ import skytools
__all__ = [
"parse_pgarray", "parse_logtriga_sql", "parse_tabbed_table",
"parse_statements", 'sql_tokenizer', 'parse_sqltriga_sql',
- "parse_acl", "dedent", "hsize_to_bytes"]
+ "parse_acl", "dedent", "hsize_to_bytes",
+ "parse_connect_string", "merge_connect_string"]
_rc_listelem = re.compile(r'( [^,"}]+ | ["] ( [^"\\]+ | [\\]. )* ["] )', re.X)
@@ -445,6 +446,61 @@ def hsize_to_bytes (input):
bytes = int(m.group(1)) * 1024 ** units.index(m.group(2).upper())
return bytes
+#
+# Connect string parsing
+#
+
+_cstr_rx = r""" \s* (\w+) \s* = \s* ( ' ( \\.| [^'\\] )* ' | \S+ ) \s* """
+_cstr_unesc_rx = r"\\(.)"
+_cstr_badval_rx = r"[\s'\\]"
+_cstr_rc = None
+_cstr_unesc_rc = None
+_cstr_badval_rc = None
+
+def parse_connect_string(cstr):
+ r"""Parse Postgres connect string.
+
+ >>> parse_connect_string("host=foo")
+ [('host', 'foo')]
+ >>> parse_connect_string(r" host = foo password = ' f\\\o\'o ' ")
+ [('host', 'foo'), ('password', "' f\\o'o '")]
+ """
+ global _cstr_rc, _cstr_unesc_rc
+ if not _cstr_rc:
+ _cstr_rc = re.compile(_cstr_rx, re.X)
+ _cstr_unesc_rc = re.compile(_cstr_unesc_rx)
+ pos = 0
+ res = []
+ while pos < len(cstr):
+ m = _cstr_rc.match(cstr, pos)
+ if not m:
+ raise ValueError('Invalid connect string')
+ pos = m.end()
+ k = m.group(1)
+ v = m.group(2)
+ if v[0] == "'":
+ v = _cstr_unesc_rc.sub(r"\1", v)
+ res.append( (k,v) )
+ return res
+
+def merge_connect_string(cstr_arg_list):
+ """Put fragments back together.
+
+ >>> merge_connect_string([('host', 'ip'), ('pass', ''), ('x', ' ')])
+ "host=ip pass='' x=' '"
+ """
+ global _cstr_badval_rc
+ if not _cstr_badval_rc:
+ _cstr_badval_rc = re.compile(_cstr_badval_rx)
+
+ buf = []
+ for k, v in cstr_arg_list:
+ if not v or _cstr_badval_rc.search(v):
+ v = v.replace('\\', r'\\')
+ v = v.replace("'", r"\'")
+ v = "'" + v + "'"
+ buf.append("%s=%s" % (k, v))
+ return ' '.join(buf)
if __name__ == '__main__':
import doctest
diff --git a/sql/londiste/functions/londiste.create_partition.sql b/sql/londiste/functions/londiste.create_partition.sql
index 7528acdc..1a59e2ee 100644
--- a/sql/londiste/functions/londiste.create_partition.sql
+++ b/sql/londiste/functions/londiste.create_partition.sql
@@ -235,6 +235,7 @@ begin
if r_extra is not null then
sql := 'ALTER TABLE ' || fq_part || r_extra
|| quote_ident(r.rulename);
+ execute sql;
end if;
end loop;
diff --git a/sql/londiste/structure/grants.ini b/sql/londiste/structure/grants.ini
index df88dacc..ca2a3765 100644
--- a/sql/londiste/structure/grants.ini
+++ b/sql/londiste/structure/grants.ini
@@ -1,13 +1,13 @@
[GrantFu]
# roles that we maintain in this file
-roles = londiste_writer, londiste_reader, public
+roles = londiste_writer, londiste_reader, public, pgq_admin
[1.tables]
on.tables = londiste.table_info, londiste.seq_info, londiste.pending_fkeys, londiste.applied_execute
-londiste_writer = select, insert, update, delete
+pgq_admin = select, insert, update, delete
londiste_reader = select
# backwards compat, should be dropped?
@@ -25,10 +25,20 @@ londiste_reader = execute
londiste_writer = execute
-[3.local.node]
+[4.local.node]
on.functions = %(londiste_local_fns)s, %(londiste_internal_fns)s
londiste_writer = execute
+[5.seqs]
+londiste_writer = usage
+on.sequences =
+ londiste.table_info_nr_seq,
+ londiste.seq_info_nr_seq
+
+[6.maint]
+pgq_admin = execute
+on.functions = londiste.periodic_maintenance()
+
# define various groups of functions
[DEFAULT]
@@ -86,5 +96,7 @@ londiste_local_fns =
londiste.drop_table_triggers(text, text),
londiste.table_info_trigger(),
londiste.create_partition(text, text, text, text, timestamptz, text),
- londiste.drop_obsolete_partitions (text, interval, text)
+ londiste.drop_obsolete_partitions (text, interval, text),
+ londiste.create_trigger(text,text,text[],text,text)
+
diff --git a/sql/pgq/structure/grants.ini b/sql/pgq/structure/grants.ini
index 451695da..b83d27c5 100644
--- a/sql/pgq/structure/grants.ini
+++ b/sql/pgq/structure/grants.ini
@@ -24,16 +24,22 @@ on.tables =
pgq.queue,
pgq.tick,
pgq.subscription
+pgq_admin = select, insert, update, delete
pgq_reader = select
public = select
[5.event.tables]
-on.tables = pgq.event_template, pgq.retry_queue
+on.tables = pgq.event_template
pgq_reader = select
+pgq_admin = select, truncate
# drop public access to events
public =
+[6.retry.event]
+on.tables = pgq.retry_queue
+pgq_admin = select, insert, update, delete
+
#
# define various groups of functions
@@ -68,6 +74,7 @@ pgq_read_fns =
pgq.event_retry(bigint, bigint, timestamptz),
pgq.event_retry(bigint, bigint, integer),
pgq.batch_retry(bigint, integer),
+ pgq.force_tick(text),
pgq.finish_batch(bigint)
pgq_write_fns =
@@ -90,7 +97,6 @@ pgq_system_fns =
pgq.grant_perms(text),
pgq._grant_perms_from(text,text,text,text),
pgq.tune_storage(text),
- pgq.force_tick(text),
pgq.seq_setval(text, int8),
pgq.create_queue(text),
pgq.drop_queue(text, bool),
diff --git a/sql/pgq_node/structure/grants.ini b/sql/pgq_node/structure/grants.ini
index d1cc4558..ef095b6d 100644
--- a/sql/pgq_node/structure/grants.ini
+++ b/sql/pgq_node/structure/grants.ini
@@ -28,10 +28,24 @@ pgq_admin = execute
on.functions = %(pgq_node_admin_fns)s
pgq_admin = execute
+[5.tables]
+pgq_reader = select
+pgq_writer = select
+pgq_admin = select, insert, update, delete
+on.tables =
+ pgq_node.node_location,
+ pgq_node.node_info,
+ pgq_node.local_state,
+ pgq_node.subscriber_info
+
# define various groups of functions
[DEFAULT]
pgq_node_remote_fns =
+ pgq_node.register_location(text, text, text, boolean),
+ pgq_node.unregister_location(text, text),
+ pgq_node.get_consumer_info(text),
+ pgq_node.get_consumer_state(text, text),
pgq_node.get_queue_locations(text),
pgq_node.get_node_info(text),
pgq_node.get_subscriber_info(text),
@@ -49,8 +63,6 @@ pgq_node_admin_fns =
pgq_node.maint_watermark(text)
pgq_node_consumer_fns =
- pgq_node.get_consumer_info(text),
- pgq_node.get_consumer_state(text, text),
pgq_node.register_consumer(text, text, text, int8),
pgq_node.unregister_consumer(text, text),
pgq_node.change_consumer_provider(text, text, text),
@@ -60,8 +72,6 @@ pgq_node_consumer_fns =
pgq_node.set_consumer_error(text, text, text)
pgq_node_worker_fns =
- pgq_node.register_location(text, text, text, boolean),
- pgq_node.unregister_location(text, text),
pgq_node.create_node(text, text, text, text, text, bigint, text),
pgq_node.drop_node(text, text),
pgq_node.demote_root(text, int4, text),
diff --git a/tests/londiste/regen.sh b/tests/londiste/regen.sh
index b40ae4ec..031d29ad 100755
--- a/tests/londiste/regen.sh
+++ b/tests/londiste/regen.sh
@@ -33,6 +33,7 @@ cat > conf/londiste_$db.ini <<EOF
[londiste3]
job_name = londiste_$db
db = dbname=$db
+public_node_location = dbname=$db host=/tmp
queue_name = replika
logfile = log/%(job_name)s.log
pidfile = pid/%(job_name)s.pid
@@ -68,11 +69,11 @@ run cat conf/pgqd.ini
run cat conf/londiste_db1.ini
msg "Install londiste3 and initialize nodes"
-run londiste3 $v conf/londiste_db1.ini create-root node1 'dbname=db1'
-run londiste3 $v conf/londiste_db2.ini create-branch node2 'dbname=db2' --provider='dbname=db1'
-run londiste3 $v conf/londiste_db3.ini create-branch node3 'dbname=db3' --provider='dbname=db1'
-run londiste3 $v conf/londiste_db4.ini create-branch node4 'dbname=db4' --provider='dbname=db2' --sync-watermark=node4,node5
-run londiste3 $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db3' --sync-watermark=node4,node5
+run londiste3 $v conf/londiste_db1.ini create-root node1
+run londiste3 $v conf/londiste_db2.ini create-branch node2 --provider='dbname=db1'
+run londiste3 $v conf/londiste_db3.ini create-branch node3 --provider='dbname=db1'
+run londiste3 $v conf/londiste_db4.ini create-branch node4 --provider='dbname=db2' --sync-watermark=node4,node5
+run londiste3 $v conf/londiste_db5.ini create-branch node5 --provider='dbname=db3' --sync-watermark=node4,node5
msg "Run ticker"
run pgqd $v -d conf/pgqd.ini