author     martinko    2013-08-09 14:03:34 +0000
committer  martinko    2013-08-09 14:03:34 +0000
commit     49ac8090682a5d76b577eeade3e29c9e4d83650c (patch)
tree       0bd7141f24ccaa9cdaf2120006263bfe47d4f36a
parent     aae5c5b1dde3e456189cdff0fa1cd7b5cb369aa8 (diff)
parent     344d063d4e61bdd382d9e1977964fa1fe6363991 (diff)
Merge branch 'release/skytools_3_1_5' (skytools_3_1_5)
-rw-r--r--  INSTALL                                                                       2
-rw-r--r--  NEWS                                                                         53
-rw-r--r--  configure.ac                                                                  3
-rw-r--r--  debian/changelog                                                              6
-rw-r--r--  doc/londiste3.txt                                                             3
-rw-r--r--  doc/skytools3.txt                                                            33
-rw-r--r--  doc/walmgr3.txt                                                              13
-rw-r--r--  python/londiste/handler.py                                                   18
-rw-r--r--  python/londiste/handlers/__init__.py                                          2
-rw-r--r--  python/londiste/handlers/dispatch.py                                         59
-rw-r--r--  python/londiste/handlers/shard.py (renamed from python/londiste/handlers/part.py)  73
-rw-r--r--  python/londiste/playback.py                                                   4
-rw-r--r--  python/londiste/repair.py                                                    27
-rw-r--r--  python/londiste/setup.py                                                     93
-rw-r--r--  python/londiste/util.py                                                      22
-rw-r--r--  python/pgq/baseconsumer.py                                                    5
-rw-r--r--  python/pgq/consumer.py                                                       35
-rw-r--r--  python/pgq/event.py                                                          28
-rw-r--r--  python/skytools/gzlog.py                                                      7
-rwxr-xr-x  python/skytools/querybuilder.py                                               9
-rw-r--r--  python/skytools/scripting.py                                                  5
-rw-r--r--  python/skytools/skylog.py                                                    62
-rw-r--r--  python/skytools/sockutil.py                                                  11
-rw-r--r--  python/skytools/timeutil.py                                                  18
-rw-r--r--  python/skytools/tnetstrings.py                                              115
-rwxr-xr-x  python/walmgr.py                                                            251
-rw-r--r--  scripts/data_maintainer.py                                                    4
-rwxr-xr-x  scripts/simple_local_consumer.py                                              6
-rw-r--r--  sql/pgq/triggers/stringutil.c                                                 2
-rw-r--r--  sql/pgq_node/functions/pgq_node.get_node_info.sql                             3
30 files changed, 755 insertions, 217 deletions
diff --git a/INSTALL b/INSTALL
index f5772337..0ba65c22 100644
--- a/INSTALL
+++ b/INSTALL
@@ -52,7 +52,7 @@ suitable for installation. The following additional packages are needed
to build the debian package:
devscripts autotools-dev python-all-dev python-support xmlto asciidoc
- libpq-dev postgresql-server-dev-all
+ libevent-dev libpq-dev postgresql-server-dev-all
Then build:
diff --git a/NEWS b/NEWS
index 6f8d020a..943b59db 100644
--- a/NEWS
+++ b/NEWS
@@ -1,5 +1,49 @@
-2012-04-17 - SkyTools 3.1.4 - "Boldly Going Nowhere"
+2013-07-31 - SkyTools 3.1.5 - "Caution, Blind Man Driving"
+
+ = Features =
+
+ * walmgr: new command createslave and new option --synch-standby
+ * londiste: Dispatch handler extended to support sharding
+ * skytools.skylog: added UdpTNetStringsHandler
+
+ = Minor features =
+
+ * londiste add: --skip-non-existing option
+ * londiste add-table: --skip-non-existing ignores tables not on provider
+ * londiste add-table: --find-copy-node working with --create now
+ * londiste resync: supports --find-copy-node and --copy-node options now
+ * londiste resync: now checks if table is available on provider
+ * londiste --wait-sync: show progress in absolute numbers
+ * londiste.handlers.dispatch: added switch to ignore events aiming at obsolete (dropped) partitions
+ * querybuilder: show list of missing arguments on KeyError
+ * scripts/simple_local_consumer.py: added consumer_filter option
+ * skytools.sockutil.set_tcp_keepalive: accept socket object directly
+
+ = Fixes =
+
+ * londiste copy: fix data filtering when parallel_copies > 1
+ * londiste.playback: support multiple -v options for workers
+ * londiste.repair: made less verbose
+ * pgq.Consumer: non-lazy loading should use RetriableEvent
+ * pgq.logutriga: do not quote '-'
+ * grantfu: 2-pass processing
+
+ = Cleanups =
+
+ * londiste: inform about no tables passed on cmd line (thus nothing to do)
+ * doc: mention config option in create-* commands
+ * parse_pgarray: allow None
+ * londiste status: make node name easier to select
+ * scripts/data_maintainer.py: added suffix "3" to service name
+ * skytools.timeutil: make tests more robust
+ * londiste: changed handler argument 'key' to 'hash_key'
+ * londiste: moved hash_key logic from dispatch to part handler
+ * londiste.handlers: renamed handler 'part' to 'shard'
+ * pgq: Consumer class also need _make_event, for _load_batch_events_old()
+ * pgq: move RetriableEvent to consumer.py
+
+2013-04-17 - SkyTools 3.1.4 - "Boldly Going Nowhere"
= Features =
@@ -42,9 +86,9 @@
* Refactor Consumer code.
* Remove: pgqadm and related code, its unmaintained
- * weeping change to postpone log string formatting
+ * Sweeping change to postpone log string formatting
* docs: copy-condition removed from londiste3 man page
- * Varios logging cleanups
+ * Various logging cleanups
* Londiste: fix trigger generation when extra params that are add-table specific are used (introduced in commit 364ade9)
* londiste: quote queue name in trigger args
* londiste: actually execute the ENABLE/DISABLE RULE query in londiste.create_partition
@@ -106,7 +150,7 @@
* Fix Londiste compare and repair to accept copy-condition from handler (Asko Oja)
* londiste: don't filter EXECUTE or TRUNCATE events on merge node
* qadmin: make "install pgq" also install pgq_coop
- * Fix 2.1 to 3.0 upgade script, rename to pgq.upgrade_2.1_to_3.0.sql
+ * Fix 2.1 to 3.0 upgrade script, rename to pgq.upgrade_2.1_to_3.0.sql
* Reorg of Londiste tests (Asko Oja)
* Socket library autoconf (Tony Arkles)
* pgq: quote internal table names
@@ -320,4 +364,3 @@
* Londiste: handlers
* QAdmin
* pgqd
-
diff --git a/configure.ac b/configure.ac
index f3829de0..b40b58d5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,6 +1,6 @@
dnl Process this file with autoconf to produce a configure script.
-AC_INIT(skytools, 3.1.4)
+AC_INIT(skytools, 3.1.5)
AC_CONFIG_SRCDIR(python/londiste.py)
AC_CONFIG_HEADER(lib/usual/config.h)
AC_PREREQ([2.59])
@@ -157,4 +157,3 @@ AC_USUAL_CASSERT
dnl Write result
AC_CONFIG_FILES([config.mak])
AC_OUTPUT
-
diff --git a/debian/changelog b/debian/changelog
index 3025f329..b073d363 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+skytools3 (3.1.5) experimental; urgency=low
+
+ * v3.1.5
+
+ -- martinko <gamato@users.sf.net> Wed, 31 Jul 2013 14:15:16 +0200
+
skytools3 (3.1.4) experimental; urgency=low
* v3.1.4
diff --git a/doc/londiste3.txt b/doc/londiste3.txt
index c0954a58..19b1f7c4 100644
--- a/doc/londiste3.txt
+++ b/doc/londiste3.txt
@@ -319,6 +319,9 @@ Do full copy of the table, again.
--max-parallel-copy='max_parallel_copy'::
Max number of parallel copy processes.
+ --skip-non-existing::
+ Skip objects that do not exist.
+
Trigger creation flags (default: AIUDL):
- I - ON INSERT
diff --git a/doc/skytools3.txt b/doc/skytools3.txt
index 70a95f79..0d346e4a 100644
--- a/doc/skytools3.txt
+++ b/doc/skytools3.txt
@@ -11,9 +11,9 @@ Keep old design from Skytools 2
- No pushing with LISTEN/NOTIFY is used for data transport.
- Administrative work happens in separate process.
- Can go down anytime, without affecting anything else.
-* Relaxed attitude about tables
- - Tables can be added/removed any time.
- - Inital data sync happens table-by-table, no attempt is made to keep
+* Relaxed attitude about tables.
+ - Tables can be added/removed at any time.
+ - Initial data sync happens table-by-table, no attempt is made to keep
consistent picture between tables during initial copy.
New features in Skytools 3
@@ -26,19 +26,19 @@ New features in Skytools 3
- For terminology and technical details see here: set.notes.txt.
* New Londiste features:
- - Parallel copy - during inital sync several tables can be
- copied at the same time. In 2.x the copy already happened in separate
- process, making it parallel was just a matter of tuning launching/syncing logic.
+ - Parallel copy - during initial sync several tables can be copied
+ at the same time. In 2.x the copy already happened in separate process,
+ making it parallel was just a matter of tuning launching/syncing logic.
- - EXECUTE command, to run random SQL script on all nodes. The script is executed
- in single a TX on root, and inserted as an event into the queue in the same TX.
- The goal is to emulate DDL AFTER TRIGGER that way.
- Londiste itself does no locking and no coordination between nodes. The assumption
- is that the DDL commands themselves do enough locking. If more locking is needed
- is can be added to script.
+ - EXECUTE command, to run random SQL script on all nodes. The script is
+ executed in single TX on root, and inserted as an event into the queue
+ in the same TX. The goal is to emulate DDL AFTER TRIGGER that way.
+ Londiste itself does no locking and no coordination between nodes.
+ The assumption is that the DDL commands themselves do enough locking.
+ If more locking is needed is can be added to script.
- Automatic table or sequence creation by importing the structure
- from provider node. Activeted with --create switch for add-table, add-seq.
+ from provider node. Activated with --create switch for add-table, add-seq.
By default *everything* is copied, including Londiste own triggers.
The basic idea is that the triggers may be customized and that way
we avoid the need to keep track of trigger customizations.
@@ -58,8 +58,8 @@ New features in Skytools 3
- Target table can use different name (--dest-table)
-* New interactive admin console - qadmin. Because long command lines are not very
- user-friendly, this is an experiment on interactive console with
+* New interactive admin console - qadmin. Because long command lines are
+ not very user-friendly, this is an experiment on interactive console with
heavy emphasis on tab-completion.
* New multi-database ticker: `pgqd`. It is possible to set up one process that
@@ -95,7 +95,7 @@ Minor improvements
* Skytools 3 modules are parallel installable with Skytools 2.
Solved via loader module (like http://faq.pygtk.org/index.py?req=all#2.4[pygtk]).
-
+
import pkgloader
pkgloader.require('skytools', '3.0')
import skytools
@@ -105,4 +105,3 @@ Further reading
---------------
* http://skytools.projects.postgresql.org/skytools-3.0/[Documentation] for skytools3.
-
diff --git a/doc/walmgr3.txt b/doc/walmgr3.txt
index 257ff2db..5ea572c1 100644
--- a/doc/walmgr3.txt
+++ b/doc/walmgr3.txt
@@ -88,6 +88,12 @@ listed below.
Remove .pgpass entry, which was used for streaming replication
(used in Slave)
+ --synch-standby='synchronous_standby_names'::
+ Do the same thing as command synch-standby, but walmgr ini file is not used.
+ This option can be used when walmgr ini is not available. It tries to guess
+ the postgres config location, --pgdata option may also be needed.
+ (used in Master)
+
== DAEMON OPTIONS ==
-r, --reload::
@@ -151,6 +157,13 @@ Pauses WAL playback.
Continues previously paused WAL playback.
+=== createslave ===
+
+Creates backup from Master database using streaming replication.
+Also creates recovery.conf and starts slave standby.
+Backup is created with pg_basebackup and pg_receivexlog (available in 9.2 and
+up).
+
== COMMON COMMANDS ==
=== listbackups ===
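
The createslave implementation further down in this commit drives pg_receivexlog and pg_basebackup as child processes; a minimal Python sketch of that sequence, with placeholder directories and connection values:

    import subprocess

    # placeholder values; walmgr derives these from its config and primary_conninfo
    xlog_dir, full_dir = "/var/lib/walmgr/logs.complete", "/var/lib/walmgr/data.master"
    host, port, user = "master.example.com", "5432", "replication"

    # stream WAL continuously into the archive directory
    receiver = subprocess.Popen(["pg_receivexlog", "-D", xlog_dir,
                                 "-h", host, "-U", user, "-p", port, "-w"])

    # take the base backup; the WAL streamed above keeps it consistent
    subprocess.check_call(["pg_basebackup", "-D", full_dir,
                           "-h", host, "-U", user, "-p", port, "-w"])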
diff --git a/python/londiste/handler.py b/python/londiste/handler.py
index 51fa603a..287ad546 100644
--- a/python/londiste/handler.py
+++ b/python/londiste/handler.py
@@ -145,7 +145,7 @@ class BaseHandler:
def real_copy(self, src_tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
- copyed
+ copied
"""
condition = self.get_copy_condition(src_curs, dst_curs)
return skytools.full_copy(src_tablename, src_curs, dst_curs,
@@ -178,9 +178,9 @@ class TableHandler(BaseHandler):
enc = args.get('encoding')
if enc:
- self.enc = EncodingValidator(self.log, enc)
+ self.encoding_validator = EncodingValidator(self.log, enc)
else:
- self.enc = None
+ self.encoding_validator = None
def process_event(self, ev, sql_queue_func, arg):
row = self.parse_row_data(ev)
@@ -212,13 +212,13 @@ class TableHandler(BaseHandler):
if len(ev.type) == 1:
if not self.allow_sql_event:
raise Exception('SQL events not supported by this handler')
- if self.enc:
- return self.enc.validate_string(ev.data, self.table_name)
+ if self.encoding_validator:
+ return self.encoding_validator.validate_string(ev.data, self.table_name)
return ev.data
else:
row = skytools.db_urldecode(ev.data)
- if self.enc:
- return self.enc.validate_dict(row, self.table_name)
+ if self.encoding_validator:
+ return self.encoding_validator.validate_dict(row, self.table_name)
return row
def real_copy(self, src_tablename, src_curs, dst_curs, column_list):
@@ -226,9 +226,9 @@ class TableHandler(BaseHandler):
copied
"""
- if self.enc:
+ if self.encoding_validator:
def _write_hook(obj, data):
- return self.enc.validate_copy(data, column_list, src_tablename)
+ return self.encoding_validator.validate_copy(data, column_list, src_tablename)
else:
_write_hook = None
condition = self.get_copy_condition(src_curs, dst_curs)
diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py
index 92705462..b6b39100 100644
--- a/python/londiste/handlers/__init__.py
+++ b/python/londiste/handlers/__init__.py
@@ -5,7 +5,7 @@ import sys
DEFAULT_HANDLERS = [
'londiste.handlers.qtable',
'londiste.handlers.applyfn',
- 'londiste.handlers.part',
+ 'londiste.handlers.shard',
'londiste.handlers.multimaster',
'londiste.handlers.vtable',
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 758034c7..0b02edcd 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -138,6 +138,10 @@ post_part:
retention_period:
how long to keep partitions around. examples: '3 months', '1 year'
+ignore_old_events:
+ * 0 - handle all events in the same way (default)
+ * 1 - ignore events coming for obsolete partitions
+
encoding:
name of destination encoding. handler replaces all invalid encoding symbols
and logs them as warnings
@@ -153,17 +157,20 @@ creating or coping initial data to destination table. --expect-sync and
--skip-truncate should be used and --create switch is to be avoided.
"""
-import sys
-import datetime
import codecs
+import datetime
import re
+import sys
+from functools import partial
+
import skytools
-from londiste.handler import BaseHandler, EncodingValidator
from skytools import quote_ident, quote_fqident, UsageError
from skytools.dbstruct import *
from skytools.utf8 import safe_utf8_decode
-from functools import partial
+
+from londiste.handler import EncodingValidator
from londiste.handlers import handler_args, update
+from londiste.handlers.shard import ShardHandler
__all__ = ['Dispatcher']
@@ -618,7 +625,7 @@ ROW_HANDLERS = {'plain': RowHandler,
#------------------------------------------------------------------------------
-class Dispatcher(BaseHandler):
+class Dispatcher (ShardHandler):
"""Partitioned loader.
Splits events into partitions, if requested.
Then applies them without further processing.
@@ -630,10 +637,11 @@ class Dispatcher(BaseHandler):
# compat for dest-table
dest_table = args.get('table', dest_table)
- BaseHandler.__init__(self, table_name, args, dest_table)
+ ShardHandler.__init__(self, table_name, args, dest_table)
# show args
self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
+ self.ignored_tables = set()
self.batch_info = None
self.dst_curs = None
self.pkeys = None
@@ -641,11 +649,6 @@ class Dispatcher(BaseHandler):
self.conf = self.get_config()
hdlr_cls = ROW_HANDLERS[self.conf.row_mode]
self.row_handler = hdlr_cls(self.log)
- if self.conf.encoding:
- self.encoding_validator = EncodingValidator(self.log,
- self.conf.encoding)
- else:
- self.encoding_validator = None
def _parse_args_from_doc (self):
doc = __doc__
@@ -688,6 +691,7 @@ class Dispatcher(BaseHandler):
conf.post_part = self.args.get('post_part')
conf.part_func = self.args.get('part_func', PART_FUNC_NEW)
conf.retention_period = self.args.get('retention_period')
+ conf.ignore_old_events = self.get_arg('ignore_old_events', [0, 1], 0)
# set row mode and event types to process
conf.row_mode = self.get_arg('row_mode', ROW_MODES)
event_types = self.args.get('event_types', '*')
@@ -717,8 +721,6 @@ class Dispatcher(BaseHandler):
conf.field_map[tmp[0]] = tmp[0]
else:
conf.field_map[tmp[0]] = tmp[1]
- # encoding validator
- conf.encoding = self.args.get('encoding')
return conf
def get_arg(self, name, value_list, default = None):
@@ -728,17 +730,20 @@ class Dispatcher(BaseHandler):
raise Exception('Bad argument %s value %r' % (name, val))
return val
+ def _validate_hash_key(self):
+ pass # no need for hash key when not sharding
+
def reset(self):
"""Called before starting to process a batch.
Should clean any pending data."""
- BaseHandler.reset(self)
+ ShardHandler.reset(self)
def prepare_batch(self, batch_info, dst_curs):
"""Called on first event for this table in current batch."""
if self.conf.table_mode != 'ignore':
self.batch_info = batch_info
self.dst_curs = dst_curs
- #BaseHandler.prepare_batch(self, batch_info, dst_curs)
+ ShardHandler.prepare_batch(self, batch_info, dst_curs)
def filter_data(self, data):
"""Process with fields skip and map"""
@@ -763,7 +768,7 @@ class Dispatcher(BaseHandler):
pkeys = [fmap[p] for p in pkeys if p in fmap]
return pkeys
- def process_event(self, ev, sql_queue_func, arg):
+ def _process_event(self, ev, sql_queue_func, arg):
"""Process a event.
Event should be added to sql_queue or executed directly.
"""
@@ -781,6 +786,7 @@ class Dispatcher(BaseHandler):
raise Exception('Unknown event type: %s' % ev.ev_type)
# process only operations specified
if not op in self.conf.event_types:
+ #self.log.debug('dispatch.process_event: ignored event type')
return
self.log.debug('dispatch.process_event: %s/%s', ev.ev_type, ev.ev_data)
if self.pkeys is None:
@@ -789,22 +795,25 @@ class Dispatcher(BaseHandler):
# prepare split table when needed
if self.conf.table_mode == 'part':
dst, part_time = self.split_format(ev, data)
+ if dst in self.ignored_tables:
+ return
if dst not in self.row_handler.table_map:
self.check_part(dst, part_time)
+ if dst in self.ignored_tables:
+ return
else:
dst = self.dest_table
if dst not in self.row_handler.table_map:
self.row_handler.add_table(dst, LOADERS[self.conf.load_mode],
- self.pkeys, self.conf)
+ self.pkeys, self.conf)
self.row_handler.process(dst, op, data)
- #BaseHandler.process_event(self, ev, sql_queue_func, arg)
def finish_batch(self, batch_info, dst_curs):
"""Called when batch finishes."""
if self.conf.table_mode != 'ignore':
self.row_handler.flush(dst_curs)
- #BaseHandler.finish_batch(self, batch_info, dst_curs)
+ #ShardHandler.finish_batch(self, batch_info, dst_curs)
def get_part_name(self):
# if custom part name template given, use it
@@ -902,6 +911,8 @@ class Dispatcher(BaseHandler):
if self.conf.retention_period:
self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
+ if self.conf.ignore_old_events and not skytools.exists_table(curs, dst):
+ self.ignored_tables.add(dst) # must have been just dropped
def drop_obsolete_partitions (self, parent_table, retention_period, partition_period):
""" Drop obsolete partitions of partition-by-date parent table.
@@ -918,12 +929,17 @@ class Dispatcher(BaseHandler):
if res:
self.log.info("Dropped tables: %s", ", ".join(res))
+ def get_copy_condition(self, src_curs, dst_curs):
+ """ Prepare where condition for copy and replay filtering.
+ """
+ return ShardHandler.get_copy_condition(self, src_curs, dst_curs)
+
def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copied
"""
_src_cols = _dst_cols = column_list
- condition = ''
+ condition = self.get_copy_condition (src_curs, dst_curs)
if self.conf.skip_fields:
_src_cols = [col for col in column_list
@@ -940,7 +956,8 @@ class Dispatcher(BaseHandler):
else:
_write_hook = None
- return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols, condition,
+ return skytools.full_copy(tablename, src_curs, dst_curs,
+ _src_cols, condition,
dst_tablename = self.dest_table,
dst_column_list = _dst_cols,
write_hook = _write_hook)
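
With ignore_old_events=1 the handler remembers partitions that no longer exist (typically dropped by retention_period) and silently skips events addressed to them. A minimal sketch of that decision, reusing the names from the diff above:

    def should_skip_event(dst, ignored_tables, ignore_old_events, exists_table):
        """Skip events whose target partition has already been dropped."""
        if dst in ignored_tables:
            return True
        if ignore_old_events and not exists_table(dst):
            ignored_tables.add(dst)   # partition was dropped, remember it
            return True
        return False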
diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/shard.py
index 247256e4..329c6cf4 100644
--- a/python/londiste/handlers/part.py
+++ b/python/londiste/handlers/shard.py
@@ -2,6 +2,7 @@
Parameters:
key=COLUMN: column name to use for hashing
+ hash_key=COLUMN: column name to use for hashing (overrides 'key' parameter)
hashfunc=NAME: function to use for hashing (default: partconf.get_hash_raw)
hashexpr=EXPR: full expression to use for hashing (deprecated)
encoding=ENC: validate and fix incoming data (only utf8 supported atm)
@@ -23,77 +24,89 @@ Local config:
import skytools
from londiste.handler import TableHandler
-__all__ = ['PartHandler']
+__all__ = ['ShardHandler', 'PartHandler']
-class PartHandler(TableHandler):
+class ShardHandler (TableHandler):
__doc__ = __doc__
- handler_name = 'part'
+ handler_name = 'shard'
DEFAULT_HASHFUNC = "partconf.get_hash_raw"
DEFAULT_HASHEXPR = "%s(%s)"
def __init__(self, table_name, args, dest_table):
TableHandler.__init__(self, table_name, args, dest_table)
- self.max_part = None # max part number
- self.local_part = None # part number of local node
+ self.hash_mask = None # aka max part number (atm)
+ self.shard_nr = None # part number of local node
# primary key columns
- self.key = args.get('key')
- if self.key is None:
- raise Exception('Specify key field as key argument')
+ self.hash_key = args.get('hash_key', args.get('key'))
+ self._validate_hash_key()
# hash function & full expression
hashfunc = args.get('hashfunc', self.DEFAULT_HASHFUNC)
self.hashexpr = self.DEFAULT_HASHEXPR % (
skytools.quote_fqident(hashfunc),
- skytools.quote_ident(self.key))
+ skytools.quote_ident(self.hash_key or ''))
self.hashexpr = args.get('hashexpr', self.hashexpr)
+ def _validate_hash_key(self):
+ if self.hash_key is None:
+ raise Exception('Specify hash key field as hash_key argument')
+
def reset(self):
"""Forget config info."""
- self.max_part = None
- self.local_part = None
+ self.hash_mask = None
+ self.shard_nr = None
TableHandler.reset(self)
def add(self, trigger_arg_list):
"""Let trigger put hash into extra3"""
-
arg = "ev_extra3='hash='||%s" % self.hashexpr
trigger_arg_list.append(arg)
TableHandler.add(self, trigger_arg_list)
def prepare_batch(self, batch_info, dst_curs):
"""Called on first event for this table in current batch."""
- if not self.max_part:
- self.load_part_info(dst_curs)
+ if self.hash_key is not None:
+ if not self.hash_mask:
+ self.load_shard_info(dst_curs)
TableHandler.prepare_batch(self, batch_info, dst_curs)
def process_event(self, ev, sql_queue_func, arg):
- """Filter event by hash in extra3, apply only local part."""
- if ev.extra3:
+ """Filter event by hash in extra3, apply only if for local shard."""
+ if ev.extra3 and self.hash_key is not None:
meta = skytools.db_urldecode(ev.extra3)
- self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
- int(meta['hash']), self.max_part, self.local_part)
- if (int(meta['hash']) & self.max_part) != self.local_part:
- self.log.debug('part.process_event: not my event')
+ self.log.debug('shard.process_event: hash=%i, hash_mask=%i, shard_nr=%i',
+ int(meta['hash']), self.hash_mask, self.shard_nr)
+ if (int(meta['hash']) & self.hash_mask) != self.shard_nr:
+ self.log.debug('shard.process_event: not my event')
return
- self.log.debug('part.process_event: my event, processing')
+ self._process_event(ev, sql_queue_func, arg)
+
+ def _process_event(self, ev, sql_queue_func, arg):
+ self.log.debug('shard.process_event: my event, processing')
TableHandler.process_event(self, ev, sql_queue_func, arg)
def get_copy_condition(self, src_curs, dst_curs):
"""Prepare the where condition for copy and replay filtering"""
- self.load_part_info(dst_curs)
- w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
- self.log.debug('part: copy_condition=%s', w)
+ if self.hash_key is None:
+ return TableHandler.get_copy_condition(self, src_curs, dst_curs)
+ self.load_shard_info(dst_curs)
+ w = "(%s & %d) = %d" % (self.hashexpr, self.hash_mask, self.shard_nr)
+ self.log.debug('shard: copy_condition=%r', w)
return w
- def load_part_info(self, curs):
- """Load slot info from database."""
+ def load_shard_info(self, curs):
+ """Load part/slot info from database."""
q = "select part_nr, max_part from partconf.conf"
curs.execute(q)
- self.local_part, self.max_part = curs.fetchone()
- if self.local_part is None or self.max_part is None:
- raise Exception('Error loading part info')
+ self.shard_nr, self.hash_mask = curs.fetchone()
+ if self.shard_nr is None or self.hash_mask is None:
+ raise Exception('Error loading shard info')
+
+class PartHandler (ShardHandler):
+ __doc__ = "Deprecated compat name for shard handler.\n" + __doc__.split('\n',1)[1]
+ handler_name = 'part'
# register handler class
-__londiste_handlers__ = [PartHandler]
+__londiste_handlers__ = [ShardHandler, PartHandler]
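
The shard rule itself is a plain bitmask test: an event is applied locally only when its hash ANDed with hash_mask (the former max_part) equals the local shard_nr. A minimal sketch of the filter:

    def is_local_event(ev_hash, hash_mask, shard_nr):
        """Mirrors ShardHandler.process_event: keep only events for the local shard."""
        return (ev_hash & hash_mask) == shard_nr

    # e.g. with 4 shards, hash_mask = 3 and shard_nr is 0..3
    assert is_local_event(ev_hash=10, hash_mask=3, shard_nr=2)       # 10 & 3 == 2
    assert not is_local_event(ev_hash=10, hash_mask=3, shard_nr=1)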
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 73b6d298..8d0b2a94 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -865,7 +865,7 @@ class Replicator(CascadedWorker):
return None
def launch_copy(self, tbl_stat):
- """Run paraller worker for copy."""
+ """Run parallel worker for copy."""
self.log.info("Launching copy process")
script = sys.argv[0]
conf = self.cf.filename
@@ -875,7 +875,7 @@ class Replicator(CascadedWorker):
if self.options.quiet:
cmd.append('-q')
if self.options.verbose:
- cmd.append('-v')
+ cmd += ['-v'] * self.options.verbose
# let existing copy finish and clean its pidfile,
# otherwise new copy will exit immediately.
diff --git a/python/londiste/repair.py b/python/londiste/repair.py
index 46ad067b..3e2e8b16 100644
--- a/python/londiste/repair.py
+++ b/python/londiste/repair.py
@@ -1,8 +1,7 @@
"""Repair data on subscriber.
-Walks tables by primary key and searcher
-missing inserts/updates/deletes.
+Walks tables by primary key and searches for missing inserts/updates/deletes.
"""
import sys, os, skytools, subprocess
@@ -34,7 +33,7 @@ class Repairer(Syncer):
return p
def process_sync(self, t1, t2, src_db, dst_db):
- """Actual comparision."""
+ """Actual comparison."""
apply_db = None
@@ -58,6 +57,8 @@ class Repairer(Syncer):
dump_src = dst_tbl + ".src"
dump_dst = dst_tbl + ".dst"
+ dump_src_sorted = dump_src + ".sorted"
+ dump_dst_sorted = dump_dst + ".sorted"
dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
src_where = dst_where
@@ -70,18 +71,20 @@ class Repairer(Syncer):
dst_db.commit()
self.log.info("Sorting src table: %s", dump_src)
- self.do_sort(dump_src, dump_src + '.sorted')
+ self.do_sort(dump_src, dump_src_sorted)
self.log.info("Sorting dst table: %s", dump_dst)
- self.do_sort(dump_dst, dump_dst + '.sorted')
+ self.do_sort(dump_dst, dump_dst_sorted)
- self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted")
+ self.dump_compare(dst_tbl, dump_src_sorted, dump_dst_sorted)
os.unlink(dump_src)
os.unlink(dump_dst)
- os.unlink(dump_src + ".sorted")
- os.unlink(dump_dst + ".sorted")
+ os.unlink(dump_src_sorted)
+ os.unlink(dump_dst_sorted)
def do_sort(self, src, dst):
+ """ Sort contents of src file, write them to dst file. """
+
p = subprocess.Popen(["sort", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
s_ver = p.communicate()[0]
del p
@@ -153,7 +156,9 @@ class Repairer(Syncer):
return row
def dump_compare(self, tbl, src_fn, dst_fn):
- """Dump + compare single table."""
+ """ Compare two table dumps, create sql file to fix target table
+ or apply changes to target table directly.
+ """
self.log.info("Comparing dumps: %s", tbl)
self.cnt_insert = 0
self.cnt_update = 0
@@ -248,7 +253,7 @@ class Repairer(Syncer):
def show_fix(self, tbl, q, desc):
"""Print/write/apply repair sql."""
- self.log.info("missed %s: %s", desc, q)
+ self.log.debug("missed %s: %s", desc, q)
if self.apply_curs:
self.apply_curs.execute(q)
else:
@@ -262,7 +267,7 @@ class Repairer(Syncer):
list.append(s)
def addcmp(self, list, f, v):
- """Add quoted comparision."""
+ """Add quoted comparison."""
if v is None:
s = "%s is null" % f
else:
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 204850b6..20d3f5c0 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -7,6 +7,7 @@ import sys, os, re, skytools
from pgq.cascade.admin import CascadeAdmin
from londiste.exec_attrs import ExecAttrs
+from londiste.util import find_copy_source
import londiste.handler
@@ -139,36 +140,45 @@ class LondisteSetup(CascadeAdmin):
needs_tbl = self.handler_needs_table()
args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
+ # pick proper create flags
+ if self.options.create_full:
+ create_flags = skytools.T_ALL
+ elif self.options.create:
+ create_flags = skytools.T_TABLE | skytools.T_PKEY
+ else:
+ create_flags = 0
+
+ # search for usable copy node if requested & needed
+ if (self.options.find_copy_node and create_flags != 0
+ and needs_tbl and not self.is_root()):
+ src_name, src_loc, _ = find_copy_source(self, self.queue_name, args, None, self.provider_location)
+ self.options.copy_node = src_name
+ self.close_database('provider_db')
+ src_db = self.get_provider_db()
+ src_curs = src_db.cursor()
+ src_tbls = self.fetch_set_tables(src_curs)
+ src_db.commit()
+
# dont check for exist/not here (root handling)
if not self.is_root() and not self.options.expect_sync and not self.options.find_copy_node:
problems = False
for tbl in args:
tbl = skytools.fq_name(tbl)
if (tbl in src_tbls) and not src_tbls[tbl]['local']:
- self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
- problems = True
+ if self.options.skip_non_existing:
+ self.log.warning("Table %s does not exist on provider", tbl)
+ else:
+ self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
+ problems = True
if problems:
self.log.error("Problems, canceling operation")
sys.exit(1)
- # pick proper create flags
- if self.options.create_full:
- create_flags = skytools.T_ALL
- elif self.options.create:
- create_flags = skytools.T_TABLE | skytools.T_PKEY
- else:
- create_flags = 0
-
# sanity check
if self.options.dest_table and len(args) > 1:
self.log.error("--dest-table can be given only for single table")
sys.exit(1)
- # not implemented
- if self.options.find_copy_node and create_flags != 0:
- self.log.error("--find-copy-node does not work with --create")
- sys.exit(1)
-
# seems ok
for tbl in args:
self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
@@ -445,6 +455,42 @@ class LondisteSetup(CascadeAdmin):
"""Reload data from provider node."""
db = self.get_database('db')
args = self.expand_arg_list(db, 'r', True, args)
+
+ if not self.options.find_copy_node:
+ self.load_local_info()
+ src_db = self.get_provider_db()
+ src_curs = src_db.cursor()
+ src_tbls = self.fetch_set_tables(src_curs)
+ src_db.commit()
+
+ problems = 0
+ for tbl in args:
+ tbl = skytools.fq_name(tbl)
+ if tbl not in src_tbls or not src_tbls[tbl]['local']:
+ self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
+ problems += 1
+ if problems > 0:
+ self.log.error("Problems, cancelling operation")
+ sys.exit(1)
+
+ if self.options.find_copy_node or self.options.copy_node:
+ q = "select table_name, table_attrs from londiste.get_table_list(%s) where local"
+ cur = db.cursor()
+ cur.execute(q, [self.set_name])
+ for row in cur.fetchall():
+ if row['table_name'] not in args:
+ continue
+ attrs = skytools.db_urldecode (row['table_attrs'] or '')
+
+ if self.options.find_copy_node:
+ attrs['copy_node'] = '?'
+ elif self.options.copy_node:
+ attrs['copy_node'] = self.options.copy_node
+
+ attrs = skytools.db_urlencode (attrs)
+ q = "select * from londiste.local_set_table_attrs (%s, %s, %s)"
+ self.exec_cmd(db, q, [self.set_name, row['table_name'], attrs])
+
q = "select * from londiste.local_set_table_state(%s, %s, null, null)"
self.exec_cmd_many(db, q, [self.set_name], args)
@@ -530,9 +576,8 @@ class LondisteSetup(CascadeAdmin):
db.commit()
def get_provider_db(self):
-
- # use custom node for copy
if self.options.copy_node:
+ # use custom node for copy
source_node = self.options.copy_node
m = self.queue_info.get_member(source_node)
if not m:
@@ -546,6 +591,7 @@ class LondisteSetup(CascadeAdmin):
q = 'select * from pgq_node.get_node_info(%s)'
res = self.exec_cmd(db, q, [self.queue_name], quiet = True)
self.provider_location = res[0]['provider_location']
+
return self.get_database('provider_db', connstr = self.provider_location, profile = 'remote')
def expand_arg_list(self, db, kind, existing, args, needs_tbl=True):
@@ -586,6 +632,9 @@ class LondisteSetup(CascadeAdmin):
res = self.solve_globbing(args, lst_exists, map_exists, map_missing, allow_nonexist)
else:
res = self.solve_globbing(args, lst_missing, map_missing, map_exists, allow_nonexist)
+
+ if not res:
+ self.log.info("what to do ?")
return res
def solve_globbing(self, args, full_list, full_map, reverse_map, allow_nonexist):
@@ -657,18 +706,19 @@ class LondisteSetup(CascadeAdmin):
dst_curs = dst_db.cursor()
partial = {}
- done_pos = 1
startup_info = 0
while 1:
dst_curs.execute(q, [self.queue_name])
rows = dst_curs.fetchall()
dst_db.commit()
+ total_count = 0
cur_count = 0
done_list = []
for row in rows:
if not row['local']:
continue
+ total_count += 1
tbl = row['table_name']
if row['merge_state'] != 'ok':
partial[tbl] = 0
@@ -678,13 +728,14 @@ class LondisteSetup(CascadeAdmin):
partial[tbl] = 1
done_list.append(tbl)
+ done_count = total_count - cur_count
+
if not startup_info:
- self.log.info("%d table(s) to copy", len(partial))
+ self.log.info("%d/%d table(s) to copy", cur_count, total_count)
startup_info = 1
for done in done_list:
- self.log.info("%s: finished (%d/%d)", done, done_pos, len(partial))
- done_pos += 1
+ self.log.info("%s: finished (%d/%d)", done, done_count, total_count)
if cur_count == 0:
break
diff --git a/python/londiste/util.py b/python/londiste/util.py
index cba18f62..07ff9407 100644
--- a/python/londiste/util.py
+++ b/python/londiste/util.py
@@ -18,7 +18,7 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
@param script: DbScript
@param queue_name: name of the cascaded queue
- @param copy_table_name: name of the table
+ @param copy_table_name: name of the table (or list of names)
@param node_name: target node name
@param node_location: target node location
@returns (node_name, node_location, downstream_worker_name) of source node
@@ -27,6 +27,11 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
# None means no steps upwards were taken, so local consumer is worker
worker_name = None
+ if isinstance(copy_table_name, str):
+ need = set([copy_table_name])
+ else:
+ need = set(copy_table_name)
+
while 1:
src_db = script.get_database('_source_db', connstr = node_location, autocommit = 1, profile = 'remote')
src_curs = src_db.cursor()
@@ -39,12 +44,12 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
script.log.info("Checking if %s can be used for copy", info['node_name'])
- q = "select table_name, local, table_attrs from londiste.get_table_list(%s) where table_name = %s"
- src_curs.execute(q, [queue_name, copy_table_name])
- got = False
+ q = "select table_name, local, table_attrs from londiste.get_table_list(%s)"
+ src_curs.execute(q, [queue_name])
+ got = set()
for row in src_curs.fetchall():
tbl = row['table_name']
- if tbl != copy_table_name:
+ if tbl not in need:
continue
if not row['local']:
script.log.debug("Problem: %s is not local", tbl)
@@ -53,14 +58,15 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
script.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
continue
script.log.debug("Good: %s is usable", tbl)
- got = True
- break
+ got.add(tbl)
script.close_database('_source_db')
- if got:
+ if got == need:
script.log.info("Node %s seems good source, using it", info['node_name'])
return node_name, node_location, worker_name
+ else:
+ script.log.info("Node %s does not have all tables", info['node_name'])
if info['node_type'] == 'root':
raise skytools.UsageError("Found root and no source found")
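
find_copy_source now accepts a single table name or a list and walks upwards until it finds a node that has every requested table locally; a hedged usage sketch (queue, table and location names are illustrative):

    from londiste.util import find_copy_source

    # 'script' is the running admin script instance; other values are examples
    node_name, node_location, worker_name = find_copy_source(
        script, 'replika_queue',
        ['public.orders', 'public.customers'],   # all of these must be usable on the node
        None, provider_location)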
diff --git a/python/pgq/baseconsumer.py b/python/pgq/baseconsumer.py
index 9159106e..3ea1c6c6 100644
--- a/python/pgq/baseconsumer.py
+++ b/python/pgq/baseconsumer.py
@@ -285,6 +285,9 @@ class BaseConsumer(skytools.DBScript):
def _launch_process_batch(self, db, batch_id, list):
self.process_batch(db, batch_id, list)
+ def _make_event(self, queue_name, row):
+ return Event(queue_name, row)
+
def _load_batch_events_old(self, curs, batch_id):
"""Fetch all events for this batch."""
@@ -298,7 +301,7 @@ class BaseConsumer(skytools.DBScript):
# map them to python objects
ev_list = []
for r in rows:
- ev = Event(self.queue_name, r)
+ ev = self._make_event(self.queue_name, r)
ev_list.append(ev)
return ev_list
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 294519d8..d109f749 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -4,17 +4,47 @@
"""
from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
-from pgq.event import *
+from pgq.event import Event
__all__ = ['Consumer']
+# Event status codes
+EV_UNTAGGED = -1
+EV_RETRY = 0
+EV_DONE = 1
+
+
+class RetriableEvent(Event):
+ """Event which can be retryed
+
+ Consumer is supposed to tag them after processing.
+ """
+
+ __slots__ = ('_status', )
+
+ def __init__(self, queue_name, row):
+ super(RetriableEvent, self).__init__(self, queue_name, row)
+ self._status = EV_DONE
+
+ def tag_done(self):
+ self._status = EV_DONE
+
+ def get_status(self):
+ return self._status
+
+ def tag_retry(self, retry_time = 60):
+ self._status = EV_RETRY
+ self.retry_time = retry_time
+
+
class RetriableWalkerEvent(RetriableEvent):
"""Redirects status flags to RetriableBatchWalker.
That way event data can be gc'd immediately and
tag_done() events don't need to be remembered.
"""
+ __slots__ = ('_walker', )
def __init__(self, walker, queue, row):
Event.__init__(self, queue, row)
self._walker = walker
@@ -60,6 +90,9 @@ class Consumer(BaseConsumer):
_batch_walker_class = RetriableBatchWalker
+ def _make_event(self, queue_name, row):
+ return RetriableEvent(queue_name, row)
+
def _flush_retry(self, curs, batch_id, list):
"""Tag retry events."""
diff --git a/python/pgq/event.py b/python/pgq/event.py
index b083ba63..22e648a1 100644
--- a/python/pgq/event.py
+++ b/python/pgq/event.py
@@ -2,12 +2,7 @@
"""PgQ event container.
"""
-__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event', 'RetriableEvent']
-
-# Event status codes
-EV_UNTAGGED = -1
-EV_RETRY = 0
-EV_DONE = 1
+__all__ = ['Event']
_fldmap = {
'ev_id': 'ev_id',
@@ -67,24 +62,3 @@ class Event(object):
return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % (
self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4)
-class RetriableEvent(Event):
- """Event which can be retryed
-
- Consumer is supposed to tag them after processing.
- """
-
- __slots__ = ('_status', )
-
- def __init__(self, queue_name, row):
- super(RetriableEvent, self).__init__(self, queue_name, row)
- self._status = EV_DONE
-
- def tag_done(self):
- self._status = EV_DONE
-
- def get_status(self):
- return self._status
-
- def tag_retry(self, retry_time = 60):
- self._status = EV_RETRY
- self.retry_time = retry_time
diff --git a/python/skytools/gzlog.py b/python/skytools/gzlog.py
index 558e2813..0db40fc3 100644
--- a/python/skytools/gzlog.py
+++ b/python/skytools/gzlog.py
@@ -1,8 +1,8 @@
"""Atomic append of gzipped data.
-The point is - if several gzip streams are concated, they
-are read back as one whose stream.
+The point is - if several gzip streams are concatenated,
+they are read back as one whole stream.
"""
import gzip
@@ -22,7 +22,7 @@ def gzip_append(filename, data, level = 6):
g.write(data)
g.close()
zdata = buf.getvalue()
-
+
# append, safely
f = open(filename, "a+", 0)
f.seek(0, 2)
@@ -36,4 +36,3 @@ def gzip_append(filename, data, level = 6):
f.truncate()
f.close()
raise ex
-
diff --git a/python/skytools/querybuilder.py b/python/skytools/querybuilder.py
index c2eead2d..9930daab 100755
--- a/python/skytools/querybuilder.py
+++ b/python/skytools/querybuilder.py
@@ -319,8 +319,11 @@ class PLPyQuery:
arg_list = [arg_dict.get(k) for k in self.arg_map]
return plpy.execute(self.plan, arg_list)
except KeyError:
- plpy.error("Missing argument: QUERY: %s ARGS: %s VALUES: %s" % (
- repr(self.sql), repr(self.arg_map), repr(arg_dict)))
+ need = set(self.arg_map)
+ got = set(arg_dict.keys())
+ missing = list(need.difference(got))
+ plpy.error("Missing arguments: [%s] QUERY: %s" % (
+ ','.join(missing), repr(self.sql)))
def __repr__(self):
return 'PLPyQuery<%s>' % self.sql
@@ -341,7 +344,7 @@ def plpy_exec(gd, sql, args, all_keys_required = True):
>>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3', 'arg2': '4'})
DBG: plpy.execute(('PLAN', 'select $1, $2, $3', ['text', 'int4', 'text']), ['3', '4', '3'])
>>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3'})
- DBG: plpy.error("Missing argument: QUERY: 'select {arg1}, {arg2:int4}, {arg1}' ARGS: ['arg1', 'arg2', 'arg1'] VALUES: {'arg1': '3'}")
+ DBG: plpy.error("Missing arguments: [arg2] QUERY: 'select {arg1}, {arg2:int4}, {arg1}'")
>>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3'}, False)
DBG: plpy.execute(('PLAN', 'select $1, $2, $3', ['text', 'int4', 'text']), ['3', None, '3'])
"""
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index a5e82663..840f3cf4 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -588,7 +588,10 @@ class BaseScript(object):
self.reset()
sys.exit(1)
except Exception, d:
- self.send_stats()
+ try: # this may fail too
+ self.send_stats()
+ except:
+ pass
emsg = str(d).rstrip()
self.reset()
self.exception_hook(d, emsg)
diff --git a/python/skytools/skylog.py b/python/skytools/skylog.py
index 0279d7d0..da8ccb91 100644
--- a/python/skytools/skylog.py
+++ b/python/skytools/skylog.py
@@ -1,11 +1,22 @@
"""Our log handlers for Python's logging package.
"""
-import os, time, socket
-import logging, logging.handlers
+import logging
+import logging.handlers
+import os
+import socket
+import time
import skytools
+# use fast implementation if available, otherwise fall back to reference one
+try:
+ import tnetstring as tnetstrings
+ tnetstrings.parse = tnetstrings.pop
+except ImportError:
+ import skytools.tnetstrings as tnetstrings
+ tnetstrings.dumps = tnetstrings.dump
+
__all__ = ['getLogger']
# add TRACE level
@@ -17,10 +28,15 @@ logging.addLevelName(TRACE, 'TRACE')
_service_name = 'unknown_svc'
_job_name = 'unknown_job'
_hostname = socket.gethostname()
+try:
+ _hostaddr = socket.gethostbyname(_hostname)
+except:
+ _hostaddr = "0.0.0.0"
_log_extra = {
'job_name': _job_name,
'service_name': _service_name,
'hostname': _hostname,
+ 'hostaddr': _hostaddr,
}
def set_service_name(service_name, job_name):
"""Set info about current script."""
@@ -64,6 +80,7 @@ class EasyRotatingFileHandler(logging.handlers.RotatingFileHandler):
fn = os.path.expanduser(filename)
logging.handlers.RotatingFileHandler.__init__(self, fn, maxBytes=maxBytes, backupCount=backupCount)
+
# send JSON message over UDP
class UdpLogServerHandler(logging.handlers.DatagramHandler):
"""Sends log records over UDP to logserver in JSON format."""
@@ -98,10 +115,7 @@ class UdpLogServerHandler(logging.handlers.DatagramHandler):
msg = msg[:self.MAXMSG]
txt_level = self._level_map.get(record.levelno, "ERROR")
hostname = _hostname
- try:
- hostaddr = socket.gethostbyname(hostname)
- except:
- hostaddr = "0.0.0.0"
+ hostaddr = _hostaddr
jobname = _job_name
svcname = _service_name
pkt = self._log_template % (time.time()*1000, txt_level, skytools.quote_json(msg),
@@ -114,6 +128,40 @@ class UdpLogServerHandler(logging.handlers.DatagramHandler):
sock.sendto(s, (self.host, self.port))
sock.close()
+
+# send TNetStrings message over UDP
+class UdpTNetStringsHandler(logging.handlers.DatagramHandler):
+ """ Sends log records in TNetStrings format over UDP. """
+
+ # LogRecord fields to send
+ send_fields = [
+ 'created', 'exc_text', 'levelname', 'levelno', 'message', 'msecs', 'name',
+ 'hostaddr', 'hostname', 'job_name', 'service_name']
+
+ _udp_reset = 0
+
+ def makePickle(self, record):
+ """ Create message in TNetStrings format.
+ """
+ msg = {}
+ self.format(record) # render 'message' attribute and others
+ for k in self.send_fields:
+ msg[k] = record.__dict__[k]
+ tnetstr = tnetstrings.dumps(msg)
+ return tnetstr
+
+ def send(self, s):
+ """ Cache socket for a moment, then recreate it.
+ """
+ now = time.time()
+ if now - 1 > self._udp_reset:
+ if self.sock:
+ self.sock.close()
+ self.sock = self.makeSocket()
+ self._udp_reset = now
+ self.sock.sendto(s, (self.host, self.port))
+
+
class LogDBHandler(logging.handlers.SocketHandler):
"""Sends log records into PostgreSQL server.
@@ -234,6 +282,7 @@ class LogDBHandler(logging.handlers.SocketHandler):
query = "select * from log.add(%s, %s, %s)"
logcur.execute(query, [type, service, msg])
+
# fix unicode bug in SysLogHandler
class SysLogHandler(logging.handlers.SysLogHandler):
"""Fixes unicode bug in logging.handlers.SysLogHandler."""
@@ -301,6 +350,7 @@ class SysLogHostnameHandler(SysLogHandler):
msg)
return msg
+
try:
from logging import LoggerAdapter
except ImportError:
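
UdpTNetStringsHandler is a plain DatagramHandler, so it only needs the collector's host and port; it serializes the hostaddr/hostname/job_name/service_name fields that skylog's own getLogger injects into records. A minimal sketch with a plain logger and placeholder addresses, passing those fields via extra=:

    import logging
    from skytools.skylog import UdpTNetStringsHandler

    log = logging.getLogger('my_job')
    log.setLevel(logging.INFO)
    log.addHandler(UdpTNetStringsHandler('127.0.0.1', 6666))   # collector address is a placeholder

    log.info("copy finished", extra = {'hostname': 'db1', 'hostaddr': '10.0.0.1',
                                       'job_name': 'my_job', 'service_name': 'londiste'})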
diff --git a/python/skytools/sockutil.py b/python/skytools/sockutil.py
index f950f5a0..dbcd021b 100644
--- a/python/skytools/sockutil.py
+++ b/python/skytools/sockutil.py
@@ -37,10 +37,13 @@ def set_tcp_keepalive(fd, keepalive = True,
if not hasattr(socket, 'SO_KEEPALIVE') or not hasattr(socket, 'fromfd'):
return
- # get numeric fd and cast to socket
- if hasattr(fd, 'fileno'):
- fd = fd.fileno()
- s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ # need socket object
+ if isinstance(fd, socket.SocketType):
+ s = fd
+ else:
+ if hasattr(fd, 'fileno'):
+ fd = fd.fileno()
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
# skip if unix socket
if type(s.getsockname()) != type(()):
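
set_tcp_keepalive now accepts a connected socket object directly, in addition to the numeric file descriptor it took before; a short sketch with a placeholder host:

    import socket
    from skytools.sockutil import set_tcp_keepalive

    sock = socket.create_connection(('db.example.com', 5432))   # placeholder host
    set_tcp_keepalive(sock)            # socket object is accepted directly now
    set_tcp_keepalive(sock.fileno())   # the old fd form still works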
diff --git a/python/skytools/timeutil.py b/python/skytools/timeutil.py
index a29e050c..2ea63082 100644
--- a/python/skytools/timeutil.py
+++ b/python/skytools/timeutil.py
@@ -134,18 +134,24 @@ def datetime_to_timestamp(dt, local_time=True):
Returns seconds since epoch as float.
- >>> datetime_to_timestamp(parse_iso_timestamp("2005-06-01 15:00:59.33 +02"))
- 1117630859.33
- >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.33, UTC))
- 1117630859.33
- >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.33))
- 1117630859.33
+ >>> datetime_to_timestamp(parse_iso_timestamp("2005-06-01 15:00:59.5 +02"))
+ 1117630859.5
+ >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.5, UTC))
+ 1117630859.5
+ >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.5))
+ 1117630859.5
>>> now = datetime.utcnow()
>>> now2 = datetime.utcfromtimestamp(datetime_to_timestamp(now, False))
+ >>> abs(now2.microsecond - now.microsecond) < 100
+ True
+ >>> now2 = now2.replace(microsecond = now.microsecond)
>>> now == now2
True
>>> now = datetime.now()
>>> now2 = datetime.fromtimestamp(datetime_to_timestamp(now))
+ >>> abs(now2.microsecond - now.microsecond) < 100
+ True
+ >>> now2 = now2.replace(microsecond = now.microsecond)
>>> now == now2
True
"""
diff --git a/python/skytools/tnetstrings.py b/python/skytools/tnetstrings.py
new file mode 100644
index 00000000..afacc09e
--- /dev/null
+++ b/python/skytools/tnetstrings.py
@@ -0,0 +1,115 @@
+# Note this implementation is more strict than necessary to demonstrate
+# minimum restrictions on types allowed in dictionaries.
+
+def dump(data):
+ if type(data) is long or type(data) is int:
+ out = str(data)
+ return '%d:%s#' % (len(out), out)
+ elif type(data) is float:
+ out = '%f' % data
+ return '%d:%s^' % (len(out), out)
+ elif type(data) is str:
+ return '%d:' % len(data) + data + ','
+ elif type(data) is dict:
+ return dump_dict(data)
+ elif type(data) is list:
+ return dump_list(data)
+ elif data == None:
+ return '0:~'
+ elif type(data) is bool:
+ out = repr(data).lower()
+ return '%d:%s!' % (len(out), out)
+ else:
+ assert False, "Can't serialize stuff that's %s." % type(data)
+
+
+def parse(data):
+ payload, payload_type, remain = parse_payload(data)
+
+ if payload_type == '#':
+ value = int(payload)
+ elif payload_type == '}':
+ value = parse_dict(payload)
+ elif payload_type == ']':
+ value = parse_list(payload)
+ elif payload_type == '!':
+ value = payload == 'true'
+ elif payload_type == '^':
+ value = float(payload)
+ elif payload_type == '~':
+ assert len(payload) == 0, "Payload must be 0 length for null."
+ value = None
+ elif payload_type == ',':
+ value = payload
+ else:
+ assert False, "Invalid payload type: %r" % payload_type
+
+ return value, remain
+
+def parse_payload(data):
+ assert data, "Invalid data to parse, it's empty."
+ length, extra = data.split(':', 1)
+ length = int(length)
+
+ payload, extra = extra[:length], extra[length:]
+ assert extra, "No payload type: %r, %r" % (payload, extra)
+ payload_type, remain = extra[0], extra[1:]
+
+ assert len(payload) == length, "Data is wrong length %d vs %d" % (length, len(payload))
+ return payload, payload_type, remain
+
+def parse_list(data):
+ if len(data) == 0: return []
+
+ result = []
+ value, extra = parse(data)
+ result.append(value)
+
+ while extra:
+ value, extra = parse(extra)
+ result.append(value)
+
+ return result
+
+def parse_pair(data):
+ key, extra = parse(data)
+ assert extra, "Unbalanced dictionary store."
+ value, extra = parse(extra)
+
+ return key, value, extra
+
+def parse_dict(data):
+ if len(data) == 0: return {}
+
+ key, value, extra = parse_pair(data)
+ assert type(key) is str, "Keys can only be strings."
+
+ result = {key: value}
+
+ while extra:
+ key, value, extra = parse_pair(extra)
+ result[key] = value
+
+ return result
+
+
+
+def dump_dict(data):
+ result = []
+ for k,v in data.items():
+ result.append(dump(str(k)))
+ result.append(dump(v))
+
+ payload = ''.join(result)
+ return '%d:' % len(payload) + payload + '}'
+
+
+def dump_list(data):
+ result = []
+ for i in data:
+ result.append(dump(i))
+
+ payload = ''.join(result)
+ return '%d:' % len(payload) + payload + ']'
+
+
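
A quick round-trip through the reference implementation above (parse returns the decoded value plus any unconsumed remainder):

    from skytools.tnetstrings import dump, parse

    payload = dump({'name': 'londiste', 'levelno': 20, 'msecs': 12.5, 'exc_text': None})
    value, remain = parse(payload)
    assert value == {'name': 'londiste', 'levelno': 20, 'msecs': 12.5, 'exc_text': None}
    assert remain == ''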
diff --git a/python/walmgr.py b/python/walmgr.py
index 5f7292ee..2918b067 100755
--- a/python/walmgr.py
+++ b/python/walmgr.py
@@ -16,6 +16,7 @@ Slave commands:
boot Stop playback, accept queries
pause Just wait, don't play WAL-s
continue Start playing WAL-s again
+ createslave Create streaming replication slave
Common commands:
init Create configuration files, set up ssh keys.
@@ -231,26 +232,6 @@ class Pgpass:
f.writelines(self.contents)
f.close()
- def pgpass_fields_from_conninfo(self,conninfo):
- """Extract host,user and port from primary-conninfo"""
- m = re.match("^.*\s*host=\s*([^\s]+)\s*.*$", conninfo)
- if m:
- host = m.group(1)
- else:
- host = 'localhost'
- m = re.match("^.*\s*user=\s*([^\s]+)\s*.*$", conninfo)
- if m:
- user = m.group(1)
- else:
- user = os.environ['USER']
- m = re.match("^.*\s*port=\s*([^\s]+)\s*.*$", conninfo)
- if m:
- port = m.group(1)
- else:
- port = '5432'
-
- return host,port,user
-
class PostgresConfiguration:
"""Postgres configuration manipulation"""
@@ -385,6 +366,8 @@ class WalMgr(skytools.DBScript):
help = "slave: connect string for streaming replication master")
p.add_option("", "--init-slave", action="store_true", dest="init_slave",
help = "Initialize slave walmgr.", default=False)
+ p.add_option("", "--synch-standby", action="store", dest="synchronous_standby_names", default=None,
+ help = "master: do the same thing as command synch-standby, but do not use INI file")
return p
def load_config(self):
@@ -477,6 +460,8 @@ class WalMgr(skytools.DBScript):
self.cmd = 'init_master'
elif self.options.init_slave:
self.cmd = 'init_slave'
+ elif self.options.synchronous_standby_names is not None:
+ self.cmd = "synch-standby"
else:
usage(1)
@@ -502,6 +487,7 @@ class WalMgr(skytools.DBScript):
'pause': self.slave_pause,
'continue': self.slave_continue,
'boot': self.slave_boot,
+ 'createslave': self.slave_createslave,
'cleanup': self.walmgr_cleanup,
'synch-standby': self.master_synch_standby,
'xlock': self.slave_lock_backups_exit,
@@ -663,6 +649,33 @@ class WalMgr(skytools.DBScript):
self.pg_stop_backup()
sys.exit(1)
+ def parse_conninfo(self,conninfo):
+ """Extract host,user and port from primary-conninfo"""
+ m = re.match("^.*\s*host\s*=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ host = m.group(1)
+ else:
+ host = 'localhost'
+ m = re.match("^.*\s*user\s*=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ user = m.group(1)
+ else:
+ user = os.environ['USER']
+ m = re.match("^.*\s*port\s*=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ port = m.group(1)
+ else:
+ port = '5432'
+
+ m = re.match("^.*\s*sslmode\s*=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ sslmode = m.group(1)
+ else:
+ sslmode = None
+
+ return host,port,user,sslmode
+
+
def get_last_complete(self):
"""Get the name of last xarchived segment."""
@@ -762,7 +775,7 @@ class WalMgr(skytools.DBScript):
primary_conninfo = self.cf.get("primary_conninfo", "")
if self.options.remove_password and primary_conninfo and not self.not_really:
pg = Pgpass('~/.pgpass')
- host, port, user = pg.pgpass_fields_from_conninfo(primary_conninfo)
+ host, port, user, _ = self.parse_conninfo(primary_conninfo)
if pg.remove_user(host, port, user):
self.log.info("Removing line from .pgpass")
pg.write()
@@ -775,13 +788,25 @@ class WalMgr(skytools.DBScript):
def master_synch_standby(self):
"""Manage synchronous_standby_names parameter"""
- if len(self.args) < 1:
- die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES")
+ if self.options.synchronous_standby_names is None:
+ if len(self.args) < 1:
+ die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES")
- names = self.args[0]
- cf = PostgresConfiguration(self, self.cf.getfile("master_config"))
+ names = self.args[0]
+ self.assert_is_master(True)
+ else:
+ # as synchronous_standby_names is available since 9.1
+ # we can override DEFAULT_PG_VERSION
+ global DEFAULT_PG_VERSION
+ DEFAULT_PG_VERSION = "9.1"
- self.assert_is_master(True)
+ self.guess_locations()
+ self.override_cf_option('master_config', self.postgres_conf)
+ self.override_cf_option('master_data', self.pgdata)
+ self.override_cf_option('master_db', 'dbname=template1')
+ names = self.options.synchronous_standby_names
+
+ cf = PostgresConfiguration(self, self.cf.getfile("master_config"))
# list of slaves
db = self.get_database("master_db")
@@ -791,12 +816,14 @@ class WalMgr(skytools.DBScript):
self.close_database("master_db")
if names.strip() == "":
- cf.set_synchronous_standby_names("")
+ if not self.not_really:
+ cf.set_synchronous_standby_names("")
return
if names.strip() == "*":
if slave_names:
- cf.set_synchronous_standby_names(names)
+ if not self.not_really:
+ cf.set_synchronous_standby_names(names)
return
else:
die(1,"At least one slave must be available when enabling synchronous mode")
@@ -812,7 +839,7 @@ class WalMgr(skytools.DBScript):
if not slave_found:
die(1,"At least one slave must be available from new list when enabling synchronous mode")
- else:
+ elif not self.not_really:
cf.set_synchronous_standby_names(names)
def master_configure_archiving(self, enable_archiving, can_restart):
@@ -1202,7 +1229,7 @@ primary_conninfo = %(primary_conninfo)s
pwd = open(self.options.add_password).readline().rstrip('\n\r')
pg = Pgpass('~/.pgpass')
- host, port, user = pg.pgpass_fields_from_conninfo(self.options.primary_conninfo)
+ host, port, user, _ = self.parse_conninfo(self.options.primary_conninfo)
pg.ensure_user(host, port, user, pwd)
pg.write()
@@ -1783,6 +1810,7 @@ STOP TIME: %(stop_time)s
pausefile = os.path.join(srcdir, "PAUSE")
stopfile = os.path.join(srcdir, "STOP")
prgrfile = os.path.join(srcdir, "PROGRESS")
+ prxlogfile = os.path.join(srcdir,"PG_RECEIVEXLOG")
srcfile = os.path.join(srcdir, srcname)
partfile = os.path.join(partdir, srcname)
@@ -1791,6 +1819,11 @@ STOP TIME: %(stop_time)s
primary_conninfo = self.cf.get("primary_conninfo", "")
if primary_conninfo and not os.path.isfile(srcfile):
self.log.info("%s: not found (ignored)", srcname)
+
+ # remove PG_RECEIVEXLOG file if it's present
+ if os.path.isfile(prxlogfile):
+ os.remove(prxlogfile)
+
sys.exit(1)
# assume that postgres has processed the WAL file and is
@@ -1858,7 +1891,7 @@ STOP TIME: %(stop_time)s
self.stat_add('count', 1)
self.send_stats()
- def restore_database(self):
+ def restore_database(self, restore_config=True):
"""Restore the database from backup
If setname is specified, the contents of that backup set directory are
@@ -2063,7 +2096,8 @@ STOP TIME: %(stop_time)s
# attempt to restore configuration. Note that we cannot
# postpone this to boot time, as the configuration is needed
# to start postmaster.
- self.slave_restore_config()
+ if restore_config:
+ self.slave_restore_config()
# run database in recovery mode
self.log.info("Starting postmaster: %s", start_cmd)
@@ -2119,6 +2153,159 @@ STOP TIME: %(stop_time)s
open(stopfile, "w").write("1")
self.log.info("Stopping recovery mode")
+ def slave_createslave(self):
+ self.assert_is_master(False)
+
+ errors = False
+ xlog_dir = self.cf.getfile("completed_wals")
+ full_dir = self.cf.getfile("full_backup")
+ prxloglock = os.path.join(xlog_dir,"PG_RECEIVEXLOG")
+ pg_receivexlog = os.path.join(self.cf.getfile("slave_bin"), "pg_receivexlog")
+ pg_basebackup = os.path.join(self.cf.getfile("slave_bin"), "pg_basebackup")
+
+ # check if pg_receivexlog is available
+ if not os.access(pg_receivexlog, os.X_OK):
+ die(1, "pg_receivexlog not available")
+
+ # check if pg_receivexlog is already running
+ if os.path.isfile(prxloglock):
+ pidstring = open(prxloglock,"r").read()
+ try:
+                pid = int(pidstring)
+ try:
+ os.kill(pid, 0)
+ except OSError, e:
+ if e.errno == errno.EPERM:
+ self.log.fatal("Found pg_receivexlog lock file %s, pid %d in use", prxloglock, pid)
+ sys.exit(1)
+ elif e.errno == errno.ESRCH:
+ self.log.info("Ignoring stale pg_receivexlog lock file")
+ if not self.not_really:
+ os.remove(prxloglock)
+ else:
+ self.log.fatal("pg_receivexlog is already running in %s, pid %d", xlog_dir, pid)
+ sys.exit(1)
+ except ValueError:
+ self.log.fatal("pg_receivexlog lock file %s does not contain a pid: %s", prxloglock, pidstring)
+ sys.exit(1)
+
+ # create directories
+ self.walmgr_setup()
+
+ # ensure that backup destination is 0700
+ if not self.not_really:
+ os.chmod(full_dir,0700)
+
+ self.args = [str(os.getpid())]
+ if self.slave_lock_backups() != 0:
+ self.log.fatal("Cannot obtain backup lock.")
+ sys.exit(1)
+
+ # get host and user from primary_conninfo
+ primary_conninfo = self.cf.get("primary_conninfo", "")
+ if not primary_conninfo:
+ die(1, "primary_conninfo missing")
+ host, port, user, sslmode = self.parse_conninfo(primary_conninfo)
+
+ # change sslmode for pg_receivexlog and pg_basebackup
+ envssl=None
+ if sslmode:
+ envssl={"PGSSLMODE": sslmode}
+
+ try:
+            # determine postgres version; we cannot use the pg_control version
+            # number since 9.0 and 9.1 use the same number in the control file
+ pg_ver = ""
+ try:
+ cmdline = [os.path.join(self.cf.getfile("slave_bin"), "postgres"),'-V']
+ process = subprocess.Popen(cmdline, stdout=subprocess.PIPE)
+ output = process.communicate()
+ pg_ver = output[0].split()[2]
+                self.log.debug("PostgreSQL version: %s", pg_ver)
+ except:
+ pass
+
+ # create pg_receivexlog process
+ cmdline = [pg_receivexlog,'-D', xlog_dir, '-h', host, '-U', user, '-p', port, '-w']
+ self.log.info("Starting pg_receivexlog")
+
+ if not self.not_really:
+ p_rxlog = subprocess.Popen(cmdline,env=envssl)
+
+ # create pg_receivexlog lock file
+ open(prxloglock, "w").write(str(p_rxlog.pid))
+
+ # leave error checking for pg_basebackup
+ # if pg_basebackup command fails then pg_receivexlog is not working either
+
+ # start backup
+ self.log.info("Starting pg_basebackup")
+ cmdline = [pg_basebackup, '-D', full_dir, '-h', host, '-U', user, '-p', port, '-w']
+ if not self.not_really:
+ p_basebackup = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=envssl)
+ output = p_basebackup.communicate()
+ res = p_basebackup.returncode
+
+ if res != 0:
+ raise Exception("exec failed, res=%d (%r), %s" % (res, cmdline, output[1]))
+
+ # fix skipped ssl symlinks (only relevant for 9.1)
+ if pg_ver.startswith('9.1.'):
+ for line in output[1].splitlines():
+ m = re.match('WARNING: skipping special file "\./server\.(crt|key)"', line)
+ if m:
+ # create symlinks
+ if m.group(1) == 'crt':
+ os.symlink('/etc/ssl/certs/ssl-cert-snakeoil.pem',
+ os.path.join(full_dir,'server.crt'))
+ elif m.group(1) == 'key':
+ os.symlink('/etc/ssl/private/ssl-cert-snakeoil.key',
+ os.path.join(full_dir,'server.key'))
+
+ self.log.info("pg_basebackup finished successfully")
+
+ # restore
+ self.args = []
+ self.restore_database(False)
+
+ # wait for recovery
+ while os.path.isfile(prxloglock) and not self.not_really:
+ time.sleep(5)
+
+ except Exception, e:
+ self.log.error(e)
+ errors = True
+
+ finally:
+ # stop pg_receivexlog
+ try:
+ if not self.not_really:
+ os.kill(p_rxlog.pid, signal.SIGTERM)
+                self.log.info("pg_receivexlog stopped")
+ except Exception, det:
+ self.log.warning("Failed to stop pg_receivexlog: %s", det)
+
+ # cleanup
+ if os.path.isfile(prxloglock):
+ os.remove(prxloglock)
+
+ if not self.not_really:
+ for f in os.listdir(xlog_dir):
+ if f.endswith('.partial'):
+ self.log.debug("Removing %s", os.path.join(xlog_dir,f))
+ os.remove(os.path.join(xlog_dir,f))
+
+ if not self.not_really and os.path.isdir(full_dir):
+ shutil.rmtree(full_dir)
+
+ self.slave_resume_backups()
+
+ if not errors:
+ self.log.info("Streaming replication standby created successfully")
+ else:
+ self.log.error("Failed to create streaming replication standby")
+ sys.exit(1)
+
def slave_pause(self, waitcomplete=0):
"""Pause the WAL apply, wait until last file applied if needed"""
diff --git a/scripts/data_maintainer.py b/scripts/data_maintainer.py
index 0c2c48b0..5bd8cd87 100644
--- a/scripts/data_maintainer.py
+++ b/scripts/data_maintainer.py
@@ -7,7 +7,7 @@ either one by one or in batches.
Config template::
- [data_maintainer]
+ [data_maintainer3]
job_name = dm_remove_expired_services
dbread = dbname=sourcedb_test
@@ -81,7 +81,7 @@ class DataMaintainer (skytools.DBScript):
loop_delay = -1
def __init__(self, args):
- super(DataMaintainer, self).__init__("data_maintainer", args)
+ super(DataMaintainer, self).__init__("data_maintainer3", args)
# query for fetching the PK-s of the data set to be maintained
self.sql_pk = self.cf.get("sql_get_pk_list")
diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py
index 1c8f97dd..87c65868 100755
--- a/scripts/simple_local_consumer.py
+++ b/scripts/simple_local_consumer.py
@@ -13,6 +13,10 @@ Config::
# query to call
dst_query = select * from somefunc(%%(pgq.ev_data)s);
+
+ ## Use table_filter where possible instead of this ##
+ # filter for events (SQL fragment)
+ consumer_filter = ev_extra1 = 'public.mytable1'
"""
@@ -30,6 +34,8 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
def reload(self):
super(SimpleLocalConsumer, self).reload()
self.dst_query = self.cf.get("dst_query")
+ if self.cf.get("consumer_filter", ""):
+ self.consumer_filter = self.cf.get("consumer_filter", "")
def process_local_event(self, db, batch_id, ev):
curs = self.get_database('dst_db', autocommit = 1).cursor()
diff --git a/sql/pgq/triggers/stringutil.c b/sql/pgq/triggers/stringutil.c
index ebbdfe6b..ae67a77d 100644
--- a/sql/pgq/triggers/stringutil.c
+++ b/sql/pgq/triggers/stringutil.c
@@ -98,7 +98,7 @@ static int pgq_urlencode(char *dst, const uint8 *src, int srclen)
} else if ((c >= '0' && c <= '9')
|| (c >= 'A' && c <= 'Z')
|| (c >= 'a' && c <= 'z')
- || c == '_' || c == '.') {
+ || c == '_' || c == '.' || c == '-') {
*p++ = c;
} else {
*p++ = '%';
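
The one-character change above widens the set of bytes pgq_urlencode() passes through
unescaped to [0-9A-Za-z_.-], so '-' is no longer percent-encoded in urlencoded trigger
payloads. A Python rendering of the resulting rule, for reference (hex-digit case
follows the C helper and is shown lowercase here as an assumption):

    # Python rendering of the encoder's rule after this change (sketch).
    def pgq_urlencode(s):
        safe = set("0123456789"
                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                   "abcdefghijklmnopqrstuvwxyz"
                   "_.-")
        return "".join(c if c in safe else "%%%02x" % ord(c) for c in s)

    # pgq_urlencode("a-b c") -> 'a-b%20c'
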
diff --git a/sql/pgq_node/functions/pgq_node.get_node_info.sql b/sql/pgq_node/functions/pgq_node.get_node_info.sql
index a11dbfc3..04c4d889 100644
--- a/sql/pgq_node/functions/pgq_node.get_node_info.sql
+++ b/sql/pgq_node/functions/pgq_node.get_node_info.sql
@@ -38,11 +38,12 @@ create or replace function pgq_node.get_node_info(
-- provider_node - provider node name
-- provider_location - provider connect string
-- combined_queue - queue name for target set
--- combined_type - node type of target setA
+-- combined_type - node type of target set
-- worker_name - consumer name that maintains this node
-- worker_paused - is worker paused
-- worker_uptodate - is worker seen the changes
-- worker_last_tick - last committed tick_id by worker
+-- node_attrs - urlencoded dict of random attrs for worker (eg. sync_watermark)
-- ----------------------------------------------------------------------
declare
sql text;