diff options
-rw-r--r-- | doc/walmgr3.txt | 13 | ||||
-rw-r--r-- | python/pgq/consumer.py | 34 | ||||
-rw-r--r-- | python/pgq/event.py | 28 | ||||
-rw-r--r-- | python/skytools/sockutil.py | 11 | ||||
-rwxr-xr-x | python/walmgr.py | 251 | ||||
-rwxr-xr-x | scripts/simple_local_consumer.py | 6 | ||||
-rw-r--r-- | sql/pgq/triggers/stringutil.c | 2 |
7 files changed, 279 insertions, 66 deletions
diff --git a/doc/walmgr3.txt b/doc/walmgr3.txt index 257ff2db..5ea572c1 100644 --- a/doc/walmgr3.txt +++ b/doc/walmgr3.txt @@ -88,6 +88,12 @@ listed below. Remove .pgpass entry, which was used for streaming replication (used in Slave) + --synch-standby='synchronous_standby_names':: + Do the same thing as command synch-standby, but walmgr ini file is not used. + This option can be used when walmgr ini is not available. It tries to guess + the postgres config location, --pgdata option may also be needed. + (used in Master) + == DAEMON OPTIONS == -r, --reload:: @@ -151,6 +157,13 @@ Pauses WAL playback. Continues previously paused WAL playback. +=== createslave === + +Creates backup from Master database using streaming replication. +Also creates recovery.conf and starts slave standby. +Backup is created with pg_basebackup and pg_receivexlog (available in 9.2 and +up). + == COMMON COMMANDS == === listbackups === diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 10cb0909..d109f749 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -4,17 +4,47 @@ """ from pgq.baseconsumer import BaseConsumer, BaseBatchWalker -from pgq.event import * +from pgq.event import Event __all__ = ['Consumer'] +# Event status codes +EV_UNTAGGED = -1 +EV_RETRY = 0 +EV_DONE = 1 + + +class RetriableEvent(Event): + """Event which can be retryed + + Consumer is supposed to tag them after processing. + """ + + __slots__ = ('_status', ) + + def __init__(self, queue_name, row): + super(RetriableEvent, self).__init__(self, queue_name, row) + self._status = EV_DONE + + def tag_done(self): + self._status = EV_DONE + + def get_status(self): + return self._status + + def tag_retry(self, retry_time = 60): + self._status = EV_RETRY + self.retry_time = retry_time + + class RetriableWalkerEvent(RetriableEvent): """Redirects status flags to RetriableBatchWalker. That way event data can be gc'd immediately and tag_done() events don't need to be remembered. """ + __slots__ = ('_walker', ) def __init__(self, walker, queue, row): Event.__init__(self, queue, row) self._walker = walker @@ -61,7 +91,7 @@ class Consumer(BaseConsumer): _batch_walker_class = RetriableBatchWalker def _make_event(self, queue_name, row): - return RetriableWalkerEvent(self, queue_name, row) + return RetriableEvent(queue_name, row) def _flush_retry(self, curs, batch_id, list): """Tag retry events.""" diff --git a/python/pgq/event.py b/python/pgq/event.py index b083ba63..22e648a1 100644 --- a/python/pgq/event.py +++ b/python/pgq/event.py @@ -2,12 +2,7 @@ """PgQ event container. """ -__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event', 'RetriableEvent'] - -# Event status codes -EV_UNTAGGED = -1 -EV_RETRY = 0 -EV_DONE = 1 +__all__ = ['Event'] _fldmap = { 'ev_id': 'ev_id', @@ -67,24 +62,3 @@ class Event(object): return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % ( self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4) -class RetriableEvent(Event): - """Event which can be retryed - - Consumer is supposed to tag them after processing. - """ - - __slots__ = ('_status', ) - - def __init__(self, queue_name, row): - super(RetriableEvent, self).__init__(self, queue_name, row) - self._status = EV_DONE - - def tag_done(self): - self._status = EV_DONE - - def get_status(self): - return self._status - - def tag_retry(self, retry_time = 60): - self._status = EV_RETRY - self.retry_time = retry_time diff --git a/python/skytools/sockutil.py b/python/skytools/sockutil.py index f950f5a0..dbcd021b 100644 --- a/python/skytools/sockutil.py +++ b/python/skytools/sockutil.py @@ -37,10 +37,13 @@ def set_tcp_keepalive(fd, keepalive = True, if not hasattr(socket, 'SO_KEEPALIVE') or not hasattr(socket, 'fromfd'): return - # get numeric fd and cast to socket - if hasattr(fd, 'fileno'): - fd = fd.fileno() - s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + # need socket object + if isinstance(fd, socket.SocketType): + s = fd + else: + if hasattr(fd, 'fileno'): + fd = fd.fileno() + s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) # skip if unix socket if type(s.getsockname()) != type(()): diff --git a/python/walmgr.py b/python/walmgr.py index 5f7292ee..2918b067 100755 --- a/python/walmgr.py +++ b/python/walmgr.py @@ -16,6 +16,7 @@ Slave commands: boot Stop playback, accept queries pause Just wait, don't play WAL-s continue Start playing WAL-s again + createslave Create streaming replication slave Common commands: init Create configuration files, set up ssh keys. @@ -231,26 +232,6 @@ class Pgpass: f.writelines(self.contents) f.close() - def pgpass_fields_from_conninfo(self,conninfo): - """Extract host,user and port from primary-conninfo""" - m = re.match("^.*\s*host=\s*([^\s]+)\s*.*$", conninfo) - if m: - host = m.group(1) - else: - host = 'localhost' - m = re.match("^.*\s*user=\s*([^\s]+)\s*.*$", conninfo) - if m: - user = m.group(1) - else: - user = os.environ['USER'] - m = re.match("^.*\s*port=\s*([^\s]+)\s*.*$", conninfo) - if m: - port = m.group(1) - else: - port = '5432' - - return host,port,user - class PostgresConfiguration: """Postgres configuration manipulation""" @@ -385,6 +366,8 @@ class WalMgr(skytools.DBScript): help = "slave: connect string for streaming replication master") p.add_option("", "--init-slave", action="store_true", dest="init_slave", help = "Initialize slave walmgr.", default=False) + p.add_option("", "--synch-standby", action="store", dest="synchronous_standby_names", default=None, + help = "master: do the same thing as command synch-standby, but do not use INI file") return p def load_config(self): @@ -477,6 +460,8 @@ class WalMgr(skytools.DBScript): self.cmd = 'init_master' elif self.options.init_slave: self.cmd = 'init_slave' + elif self.options.synchronous_standby_names is not None: + self.cmd = "synch-standby" else: usage(1) @@ -502,6 +487,7 @@ class WalMgr(skytools.DBScript): 'pause': self.slave_pause, 'continue': self.slave_continue, 'boot': self.slave_boot, + 'createslave': self.slave_createslave, 'cleanup': self.walmgr_cleanup, 'synch-standby': self.master_synch_standby, 'xlock': self.slave_lock_backups_exit, @@ -663,6 +649,33 @@ class WalMgr(skytools.DBScript): self.pg_stop_backup() sys.exit(1) + def parse_conninfo(self,conninfo): + """Extract host,user and port from primary-conninfo""" + m = re.match("^.*\s*host\s*=\s*([^\s]+)\s*.*$", conninfo) + if m: + host = m.group(1) + else: + host = 'localhost' + m = re.match("^.*\s*user\s*=\s*([^\s]+)\s*.*$", conninfo) + if m: + user = m.group(1) + else: + user = os.environ['USER'] + m = re.match("^.*\s*port\s*=\s*([^\s]+)\s*.*$", conninfo) + if m: + port = m.group(1) + else: + port = '5432' + + m = re.match("^.*\s*sslmode\s*=\s*([^\s]+)\s*.*$", conninfo) + if m: + sslmode = m.group(1) + else: + sslmode = None + + return host,port,user,sslmode + + def get_last_complete(self): """Get the name of last xarchived segment.""" @@ -762,7 +775,7 @@ class WalMgr(skytools.DBScript): primary_conninfo = self.cf.get("primary_conninfo", "") if self.options.remove_password and primary_conninfo and not self.not_really: pg = Pgpass('~/.pgpass') - host, port, user = pg.pgpass_fields_from_conninfo(primary_conninfo) + host, port, user, _ = self.parse_conninfo(primary_conninfo) if pg.remove_user(host, port, user): self.log.info("Removing line from .pgpass") pg.write() @@ -775,13 +788,25 @@ class WalMgr(skytools.DBScript): def master_synch_standby(self): """Manage synchronous_standby_names parameter""" - if len(self.args) < 1: - die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES") + if self.options.synchronous_standby_names is None: + if len(self.args) < 1: + die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES") - names = self.args[0] - cf = PostgresConfiguration(self, self.cf.getfile("master_config")) + names = self.args[0] + self.assert_is_master(True) + else: + # as synchronous_standby_names is available since 9.1 + # we can override DEFAULT_PG_VERSION + global DEFAULT_PG_VERSION + DEFAULT_PG_VERSION = "9.1" - self.assert_is_master(True) + self.guess_locations() + self.override_cf_option('master_config', self.postgres_conf) + self.override_cf_option('master_data', self.pgdata) + self.override_cf_option('master_db', 'dbname=template1') + names = self.options.synchronous_standby_names + + cf = PostgresConfiguration(self, self.cf.getfile("master_config")) # list of slaves db = self.get_database("master_db") @@ -791,12 +816,14 @@ class WalMgr(skytools.DBScript): self.close_database("master_db") if names.strip() == "": - cf.set_synchronous_standby_names("") + if not self.not_really: + cf.set_synchronous_standby_names("") return if names.strip() == "*": if slave_names: - cf.set_synchronous_standby_names(names) + if not self.not_really: + cf.set_synchronous_standby_names(names) return else: die(1,"At least one slave must be available when enabling synchronous mode") @@ -812,7 +839,7 @@ class WalMgr(skytools.DBScript): if not slave_found: die(1,"At least one slave must be available from new list when enabling synchronous mode") - else: + elif not self.not_really: cf.set_synchronous_standby_names(names) def master_configure_archiving(self, enable_archiving, can_restart): @@ -1202,7 +1229,7 @@ primary_conninfo = %(primary_conninfo)s pwd = open(self.options.add_password).readline().rstrip('\n\r') pg = Pgpass('~/.pgpass') - host, port, user = pg.pgpass_fields_from_conninfo(self.options.primary_conninfo) + host, port, user, _ = self.parse_conninfo(self.options.primary_conninfo) pg.ensure_user(host, port, user, pwd) pg.write() @@ -1783,6 +1810,7 @@ STOP TIME: %(stop_time)s pausefile = os.path.join(srcdir, "PAUSE") stopfile = os.path.join(srcdir, "STOP") prgrfile = os.path.join(srcdir, "PROGRESS") + prxlogfile = os.path.join(srcdir,"PG_RECEIVEXLOG") srcfile = os.path.join(srcdir, srcname) partfile = os.path.join(partdir, srcname) @@ -1791,6 +1819,11 @@ STOP TIME: %(stop_time)s primary_conninfo = self.cf.get("primary_conninfo", "") if primary_conninfo and not os.path.isfile(srcfile): self.log.info("%s: not found (ignored)", srcname) + + # remove PG_RECEIVEXLOG file if it's present + if os.path.isfile(prxlogfile): + os.remove(prxlogfile) + sys.exit(1) # assume that postgres has processed the WAL file and is @@ -1858,7 +1891,7 @@ STOP TIME: %(stop_time)s self.stat_add('count', 1) self.send_stats() - def restore_database(self): + def restore_database(self, restore_config=True): """Restore the database from backup If setname is specified, the contents of that backup set directory are @@ -2063,7 +2096,8 @@ STOP TIME: %(stop_time)s # attempt to restore configuration. Note that we cannot # postpone this to boot time, as the configuration is needed # to start postmaster. - self.slave_restore_config() + if restore_config: + self.slave_restore_config() # run database in recovery mode self.log.info("Starting postmaster: %s", start_cmd) @@ -2119,6 +2153,159 @@ STOP TIME: %(stop_time)s open(stopfile, "w").write("1") self.log.info("Stopping recovery mode") + def slave_createslave(self): + self.assert_is_master(False) + + errors = False + xlog_dir = self.cf.getfile("completed_wals") + full_dir = self.cf.getfile("full_backup") + prxloglock = os.path.join(xlog_dir,"PG_RECEIVEXLOG") + pg_receivexlog = os.path.join(self.cf.getfile("slave_bin"), "pg_receivexlog") + pg_basebackup = os.path.join(self.cf.getfile("slave_bin"), "pg_basebackup") + + # check if pg_receivexlog is available + if not os.access(pg_receivexlog, os.X_OK): + die(1, "pg_receivexlog not available") + + # check if pg_receivexlog is already running + if os.path.isfile(prxloglock): + pidstring = open(prxloglock,"r").read() + try: + pid =int(pidstring) + try: + os.kill(pid, 0) + except OSError, e: + if e.errno == errno.EPERM: + self.log.fatal("Found pg_receivexlog lock file %s, pid %d in use", prxloglock, pid) + sys.exit(1) + elif e.errno == errno.ESRCH: + self.log.info("Ignoring stale pg_receivexlog lock file") + if not self.not_really: + os.remove(prxloglock) + else: + self.log.fatal("pg_receivexlog is already running in %s, pid %d", xlog_dir, pid) + sys.exit(1) + except ValueError: + self.log.fatal("pg_receivexlog lock file %s does not contain a pid: %s", prxloglock, pidstring) + sys.exit(1) + + # create directories + self.walmgr_setup() + + # ensure that backup destination is 0700 + if not self.not_really: + os.chmod(full_dir,0700) + + self.args = [str(os.getpid())] + if self.slave_lock_backups() != 0: + self.log.fatal("Cannot obtain backup lock.") + sys.exit(1) + + # get host and user from primary_conninfo + primary_conninfo = self.cf.get("primary_conninfo", "") + if not primary_conninfo: + die(1, "primary_conninfo missing") + host, port, user, sslmode = self.parse_conninfo(primary_conninfo) + + # change sslmode for pg_receivexlog and pg_basebackup + envssl=None + if sslmode: + envssl={"PGSSLMODE": sslmode} + + try: + # determine postgres version, we cannot use pg_control version number since + # 9.0 and 9.1 are using the same number in controlfile + pg_ver = "" + try: + cmdline = [os.path.join(self.cf.getfile("slave_bin"), "postgres"),'-V'] + process = subprocess.Popen(cmdline, stdout=subprocess.PIPE) + output = process.communicate() + pg_ver = output[0].split()[2] + self.log.debug("PostgreSQL version: %s" % pg_ver) + except: + pass + + # create pg_receivexlog process + cmdline = [pg_receivexlog,'-D', xlog_dir, '-h', host, '-U', user, '-p', port, '-w'] + self.log.info("Starting pg_receivexlog") + + if not self.not_really: + p_rxlog = subprocess.Popen(cmdline,env=envssl) + + # create pg_receivexlog lock file + open(prxloglock, "w").write(str(p_rxlog.pid)) + + # leave error checking for pg_basebackup + # if pg_basebackup command fails then pg_receivexlog is not working either + + # start backup + self.log.info("Starting pg_basebackup") + cmdline = [pg_basebackup, '-D', full_dir, '-h', host, '-U', user, '-p', port, '-w'] + if not self.not_really: + p_basebackup = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=envssl) + output = p_basebackup.communicate() + res = p_basebackup.returncode + + if res != 0: + raise Exception("exec failed, res=%d (%r), %s" % (res, cmdline, output[1])) + + # fix skipped ssl symlinks (only relevant for 9.1) + if pg_ver.startswith('9.1.'): + for line in output[1].splitlines(): + m = re.match('WARNING: skipping special file "\./server\.(crt|key)"', line) + if m: + # create symlinks + if m.group(1) == 'crt': + os.symlink('/etc/ssl/certs/ssl-cert-snakeoil.pem', + os.path.join(full_dir,'server.crt')) + elif m.group(1) == 'key': + os.symlink('/etc/ssl/private/ssl-cert-snakeoil.key', + os.path.join(full_dir,'server.key')) + + self.log.info("pg_basebackup finished successfully") + + # restore + self.args = [] + self.restore_database(False) + + # wait for recovery + while os.path.isfile(prxloglock) and not self.not_really: + time.sleep(5) + + except Exception, e: + self.log.error(e) + errors = True + + finally: + # stop pg_receivexlog + try: + if not self.not_really: + os.kill(p_rxlog.pid, signal.SIGTERM) + self.log.info("pg_receivelog stopped") + except Exception, det: + self.log.warning("Failed to stop pg_receivexlog: %s", det) + + # cleanup + if os.path.isfile(prxloglock): + os.remove(prxloglock) + + if not self.not_really: + for f in os.listdir(xlog_dir): + if f.endswith('.partial'): + self.log.debug("Removing %s", os.path.join(xlog_dir,f)) + os.remove(os.path.join(xlog_dir,f)) + + if not self.not_really and os.path.isdir(full_dir): + shutil.rmtree(full_dir) + + self.slave_resume_backups() + + if not errors: + self.log.info("Streaming replication standby created successfully") + else: + self.log.error("Failed to create streaming replication standby") + sys.exit(1) + def slave_pause(self, waitcomplete=0): """Pause the WAL apply, wait until last file applied if needed""" diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py index 1c8f97dd..87c65868 100755 --- a/scripts/simple_local_consumer.py +++ b/scripts/simple_local_consumer.py @@ -13,6 +13,10 @@ Config:: # query to call dst_query = select * from somefunc(%%(pgq.ev_data)s); + + ## Use table_filter where possible instead of this ## + # filter for events (SQL fragment) + consumer_filter = ev_extra1 = 'public.mytable1' """ @@ -30,6 +34,8 @@ class SimpleLocalConsumer(pgq.LocalConsumer): def reload(self): super(SimpleLocalConsumer, self).reload() self.dst_query = self.cf.get("dst_query") + if self.cf.get("consumer_filter", ""): + self.consumer_filter = self.cf.get("consumer_filter", "") def process_local_event(self, db, batch_id, ev): curs = self.get_database('dst_db', autocommit = 1).cursor() diff --git a/sql/pgq/triggers/stringutil.c b/sql/pgq/triggers/stringutil.c index ebbdfe6b..ae67a77d 100644 --- a/sql/pgq/triggers/stringutil.c +++ b/sql/pgq/triggers/stringutil.c @@ -98,7 +98,7 @@ static int pgq_urlencode(char *dst, const uint8 *src, int srclen) } else if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') - || c == '_' || c == '.') { + || c == '_' || c == '.' || c == '-') { *p++ = c; } else { *p++ = '%'; |