summaryrefslogtreecommitdiff
path: root/python/walmgr.py
diff options
context:
space:
mode:
authorTarvi Pillessaar2012-04-13 08:57:22 +0000
committerTarvi Pillessaar2012-05-05 18:41:47 +0000
commit5681a1ee3754588bc85c3d8db0335cb868c0cc5f (patch)
tree2acc9e886a07196578f399a44ef0c9ca7e14fb4c /python/walmgr.py
parente97eef4e12abf91f8fa4182560a77af735875bfb (diff)
walmgr: add option for init-slave to add password from file to .pgpass
walmgr: add command synch-standby
Diffstat (limited to 'python/walmgr.py')
-rwxr-xr-xpython/walmgr.py199
1 files changed, 177 insertions, 22 deletions
diff --git a/python/walmgr.py b/python/walmgr.py
index f990bdef..aa97607d 100755
--- a/python/walmgr.py
+++ b/python/walmgr.py
@@ -10,6 +10,7 @@ Master commands:
syncdaemon Daemon mode for regular syncing
stop Stop archiving - de-configure PostgreSQL
periodic Run periodic command if configured.
+ synch-standby Manage synchronous streaming replication.
Slave commands:
boot Stop playback, accept queries
@@ -183,6 +184,74 @@ class BackupLabel:
if m:
self.label_string = m.group(1)
+class Pgpass:
+ """Manipulate pgpass contents"""
+
+ def __init__(self, passfile):
+ """Load .pgpass contents"""
+ self.passfile = os.path.expanduser(passfile)
+ self.contents = []
+
+ if os.path.isfile(self.passfile):
+ self.contents = open(self.passfile).readlines()
+
+ def split_pgpass_line(selg, pgline):
+ """Parses pgpass line, returns dict"""
+ try:
+ (host, port, db, user, pwd) = pgline.rstrip('\n\r').split(":")
+ return {'host': host, 'port': port, 'db': db, 'user': user, 'pwd': pwd}
+ except ValueError:
+ return None
+
+ def ensure_user(self, host, port, user, pwd):
+ """Ensure that line for streaming replication exists in .pgpass"""
+ self.remove_user(host, port, user)
+ self.contents.insert(0, '%s:%s:%s:%s:%s\n' % (host, port, 'replication', user, pwd))
+
+ def remove_user(self, host, port, user):
+ """Remove all matching lines from .pgpass"""
+
+ new_contents = []
+ found = False
+ for l in self.contents:
+ p = self.split_pgpass_line(l)
+ if p and p['host'] == host and p['port'] == port and p['user'] == user and p['db'] == 'replication':
+ found = True
+ continue
+
+ new_contents.append(l)
+
+ self.contents = new_contents
+ return found
+
+ def write(self):
+ """Write contents back to file"""
+ f = open(self.passfile,'w')
+ os.chmod(self.passfile, 0600)
+ f.writelines(self.contents)
+ f.close()
+
+ def pgpass_fields_from_conninfo(self,conninfo):
+ """Extract host,user and port from primary-conninfo"""
+ m = re.match("^.*\s*host=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ host = m.group(1)
+ else:
+ host = 'localhost'
+ m = re.match("^.*\s*user=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ user = m.group(1)
+ else:
+ user = os.environ['USER']
+ m = re.match("^.*\s*port=\s*([^\s]+)\s*.*$", conninfo)
+ if m:
+ port = m.group(1)
+ else:
+ port = '5432'
+
+ return host,port,user
+
+
class PostgresConfiguration:
"""Postgres configuration manipulation"""
@@ -209,7 +278,7 @@ class PostgresConfiguration:
def synchronous_standby_names(self):
"""Return value for specified parameter"""
# see if explicitly set
- m = re.search("^\s*synchronous_standby_names\s*=\s*'?([a-zA-Z01]+)'?\s*#?.*$", self.cf_buf, re.M | re.I)
+ m = re.search("^\s*synchronous_standby_names\s*=\s*'([^']*)'\s*#?.*$", self.cf_buf, re.M | re.I)
if m:
return m.group(1)
# also, it could be commented out as initdb leaves it
@@ -273,6 +342,20 @@ class PostgresConfiguration:
# polite method does not work, as usually not enough perms for it
open(self.cf_file, "w").write(self.cf_buf)
+ def set_synchronous_standby_names(self,param_value):
+ """Helper function to change synchronous_standby_names and signal postmaster"""
+
+ self.log.info("Changing synchronous_standby_names from '%s' to '%s'" % (self.synchronous_standby_names(),param_value))
+ cf_params = dict()
+ cf_params['synchronous_standby_names'] = param_value
+ self.modify(cf_params)
+ self.write()
+
+ data_dir=self.walmgr.cf.getfile("master_data")
+ self.log.info("Sending SIGHUP to postmaster")
+ self.walmgr.signal_postmaster(data_dir, signal.SIGHUP)
+
+
class WalMgr(skytools.DBScript):
def init_optparse(self, parser=None):
@@ -294,6 +377,10 @@ class WalMgr(skytools.DBScript):
help = "slave: add public key to authorized_hosts", default=False)
p.add_option("", "--ssh-remove-key", action="store", dest="ssh_remove_key",
help = "slave: remove master key from authorized_hosts", default=False)
+ p.add_option("", "--add-password", action="store", dest="add_password",
+ help = "slave: add password from file to .pgpass. Additional fields will be extracted from primary-conninfo", default=False)
+ p.add_option("", "--remove-password", action="store_true", dest="remove_password",
+ help = "slave: remove previously added line from .pgpass", default=False)
p.add_option("", "--primary-conninfo", action="store", dest="primary_conninfo", default=None,
help = "slave: connect string for streaming replication master")
p.add_option("", "--init-slave", action="store_true", dest="init_slave",
@@ -402,27 +489,28 @@ class WalMgr(skytools.DBScript):
self.pidfile = None
cmdtab = {
- 'init_master': self.walmgr_init_master,
- 'init_slave': self.walmgr_init_slave,
- 'setup': self.walmgr_setup,
- 'stop': self.master_stop,
- 'backup': self.run_backup,
- 'listbackups': self.list_backups,
- 'restore': self.restore_database,
- 'periodic': self.master_periodic,
- 'sync': self.master_sync,
- 'syncdaemon': self.master_syncdaemon,
- 'pause': self.slave_pause,
- 'continue': self.slave_continue,
- 'boot': self.slave_boot,
- 'cleanup': self.walmgr_cleanup,
- 'xlock': self.slave_lock_backups_exit,
- 'xrelease': self.slave_resume_backups,
- 'xrotate': self.slave_rotate_backups,
- 'xpurgewals': self.slave_purge_wals,
- 'xarchive': self.master_xarchive,
- 'xrestore': self.xrestore,
- 'xpartialsync': self.slave_append_partial,
+ 'init_master': self.walmgr_init_master,
+ 'init_slave': self.walmgr_init_slave,
+ 'setup': self.walmgr_setup,
+ 'stop': self.master_stop,
+ 'backup': self.run_backup,
+ 'listbackups': self.list_backups,
+ 'restore': self.restore_database,
+ 'periodic': self.master_periodic,
+ 'sync': self.master_sync,
+ 'syncdaemon': self.master_syncdaemon,
+ 'pause': self.slave_pause,
+ 'continue': self.slave_continue,
+ 'boot': self.slave_boot,
+ 'cleanup': self.walmgr_cleanup,
+ 'synch-standby': self.master_synch_standby,
+ 'xlock': self.slave_lock_backups_exit,
+ 'xrelease': self.slave_resume_backups,
+ 'xrotate': self.slave_rotate_backups,
+ 'xpurgewals': self.slave_purge_wals,
+ 'xarchive': self.master_xarchive,
+ 'xrestore': self.xrestore,
+ 'xpartialsync': self.slave_append_partial,
}
if not cmdtab.has_key(self.cmd):
@@ -670,11 +758,63 @@ class WalMgr(skytools.DBScript):
else:
self.log.debug("authorized_keys:\n%s" % keys)
+ # remove password from .pgpass
+ primary_conninfo = self.cf.get("primary_conninfo", "")
+ if self.options.remove_password and primary_conninfo and not self.not_really:
+ pg = Pgpass('~/.pgpass')
+ host, port, user = pg.pgpass_fields_from_conninfo(primary_conninfo)
+ if pg.remove_user(host, port, user):
+ self.log.info("Removing line from .pgpass")
+ pg.write()
+
# get rid of the configuration file, both master and slave
self.log.info("Removing config file: %s" % self.cfgfile)
if not self.not_really:
os.remove(self.cfgfile)
+ def master_synch_standby(self):
+ """Manage synchronous_standby_names parameter"""
+
+ if len(self.args) < 1:
+ die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES")
+
+ names = self.args[0]
+ cf = PostgresConfiguration(self, self.cf.getfile("master_config"))
+
+ self.assert_is_master(True)
+
+ # list of slaves
+ db = self.get_database("master_db")
+ cur = db.cursor()
+ cur.execute("select application_name from pg_stat_replication")
+ slave_names = [slave[0] for slave in cur.fetchall()]
+ self.close_database("master_db")
+
+ if names.strip() == "":
+ cf.set_synchronous_standby_names("")
+ return
+
+ if names.strip() == "*":
+ if slave_names:
+ cf.set_synchronous_standby_names(names)
+ return
+ else:
+ die(1,"At least one slave must be available when enabling synchronous mode")
+
+ # ensure that at least one slave is available from new parameter value
+ slave_found = None
+ for new_synch_slave in re.findall(r"[^\s,]+",names):
+ if new_synch_slave not in slave_names:
+ self.log.warning("No slave available with name %s" % new_synch_slave)
+ else:
+ slave_found = True
+ break
+
+ if not slave_found:
+ die(1,"At least one slave must be available from new list when enabling synchronous mode")
+ else:
+ cf.set_synchronous_standby_names(names)
+
def master_configure_archiving(self, enable_archiving, can_restart):
"""Turn the archiving on or off"""
@@ -1055,6 +1195,21 @@ primary_conninfo = %(primary_conninfo)s
af.write(master_pubkey)
af.close()
+ if self.options.add_password and self.options.primary_conninfo:
+ # add password to pgpass
+
+ self.log.debug("Reading password from file %s" % self.options.add_password)
+ pwd = open(self.options.add_password).readline().rstrip('\n\r')
+
+ pg = Pgpass('~/.pgpass')
+ host, port, user = pg.pgpass_fields_from_conninfo(self.options.primary_conninfo)
+ pg.ensure_user(host, port, user, pwd)
+ pg.write()
+
+ self.log.info("Added password from %s to .pgpass" % self.options.add_password)
+
+
+
def walmgr_setup(self):
if self.is_master:
self.log.info("Configuring WAL archiving")