##
## This script should only be called via the 'bucardo' program
##
-## Copyright 2006-2015 Greg Sabino Mullane <greg@endpoint.com>
+## Copyright 2006-2016 Greg Sabino Mullane <greg@endpoint.com>
##
## Please visit http://bucardo.org for more information
use utf8;
use open qw( :std :utf8 );
-our $VERSION = '5.4.0';
+our $VERSION = '5.4.1';
use DBI 1.51; ## How Perl talks to databases
use DBD::Pg 2.0 qw( :async ); ## How Perl talks to Postgres databases
my $sequence_columns = join ',' => map { $_->[0] } @sequence_columns;
+## Default statement chunk size in case config does not have it
+my $default_statement_chunk_size = 10_000;
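+## (a statement_chunk_size entry in %config, when present, takes precedence)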
+
## Output messages per language
our %msg = (
'en' => {
## Also setup common attributes
my (@dbs, @dbs_source, @dbs_target, @dbs_delta, @dbs_fullcopy,
@dbs_connectable, @dbs_dbi, @dbs_write, @dbs_non_fullcopy,
- @dbs_postgres, @dbs_drizzle, @dbs_mongo, @dbs_mysql, @dbs_oracle,
+ @dbs_postgres, @dbs_drizzle, @dbs_firebird, @dbs_mongo, @dbs_mysql, @dbs_oracle,
@dbs_redis, @dbs_sqlite);
## Used to weed out all but one source if in onetimecopy mode
$d->{has_mysql_timestamp_issue} = 1;
}
+ ## Firebird
+ if ('firebird' eq $d->{dbtype}) {
+ push @dbs_firebird => $dbname;
+ $d->{does_sql} = 1;
+ $d->{does_truncate} = 1;
+ $d->{does_savepoints} = 1;
+ $d->{does_limit} = 1;
+ $d->{has_mysql_timestamp_issue} = 1;
+ }
+
## Oracle
if ('oracle' eq $d->{dbtype}) {
push @dbs_oracle => $dbname;
## Databases with Perl DBI support
if ($d->{dbtype} eq 'postgres'
or $d->{dbtype} eq 'drizzle'
+ or $d->{dbtype} eq 'firebird'
or $d->{dbtype} eq 'mariadb'
or $d->{dbtype} eq 'mysql'
or $d->{dbtype} eq 'oracle'
if ($d->{dbtype} eq 'mongo') {
my $collection = $d->{dbh}->get_collection($tname);
my $object = {
- sync => $syncname,
- status => 'failed',
- endtime => scalar gmtime,
+ '$set' => {
+     sync => $syncname,
+     status => 'failed',
+     endtime => scalar gmtime,
+ },
};
- $collection->update
+ $collection->update_one
(
{sync => $syncname},
$object,
if ($d->{dbtype} eq 'mongo') {
my $collection = $d->{dbh}->get_collection($tname);
my $object = {
- sync => $syncname,
- status => 'started',
- starttime => scalar gmtime,
+ '$set' => {
+     sync => $syncname,
+     status => 'started',
+     starttime => scalar gmtime,
+ },
};
- $collection->update
+ $collection->update_one
(
{sync => $syncname},
$object,
## For this table, delete all rows that may exist on the target(s)
$dmlcount{deletes} += $self->delete_rows(
- $deltabin{$dbname1}, $S, $T, $g, $sync, \@pushdbs);
+ $deltabin{$dbname1}, $g, $sync, \@pushdbs);
## For this table, copy all rows from source to target(s)
$dmlcount{inserts} += $self->push_rows(
- $deltabin{$dbname1}, $g, $sync, $sourcedb, \@pushdbs);
+ $deltabin{$dbname1}, $g, $sync, $sourcedb, \@pushdbs, 'copy');
## Store references to the list of changes in case custom code needs them
$sync->{deltarows}{$S}{$T} = $deltabin{$dbname1};
## For this table, copy all rows from source to target(s)
$dmlcount{inserts} += $dmlcount{I}{target}{$S}{$T} = $self->push_rows(
- 'fullcopy', $g, $sync, $sourcex,
+ {}, $g, $sync, $sourcex,
## We need an array of database objects here:
- [ map { $sync->{db}{$_} } @dbs_copytarget ]);
+ [ map { $sync->{db}{$_} } @dbs_copytarget ], 'fullcopy');
## Add to our cross-table tally
$dmlcount{allinserts}{target} += $dmlcount{I}{target}{$S}{$T};
if ($d->{dbtype} eq 'mongo') {
my $collection = $d->{dbh}->get_collection($tname);
my $object = {
- sync => $syncname,
- status => 'complete',
- endtime => scalar gmtime,
+ '$set' => {
+     sync => $syncname,
+     status => 'complete',
+     endtime => scalar gmtime,
+ },
};
- $collection->update
+ $collection->update_one
(
{sync => $syncname},
$object,
}
## For now, we simply require it
require MongoDB;
- my $conn = MongoDB::Connection->new($mongodsn); ## no critic
+ my $mongoURI = 'mongodb://';
+ if (exists $mongodsn->{dbuser}) {
+ my $pass = $mongodsn->{dbpass} || '';
+ $mongoURI .= "$mongodsn->{dbuser}:$pass\@";
+ }
+ $mongoURI .= $mongodsn->{dbhost} || 'localhost';
+ $mongoURI .= ":$mongodsn->{dbport}" if exists $mongodsn->{dbport};
+ my $conn = MongoDB->connect($mongoURI); ## no critic
$dbh = $conn->get_database($dbname);
my $backend = 0;
+ if (! $self->{show_mongodb_version}++) {
+ $self->glog("Perl module MongoDB loaded. Version $MongoDB::VERSION", LOG_NORMAL);
+ }
return $backend, $dbh;
}
+ elsif ('firebird' eq $dbtype) {
+ $dsn = "dbi:Firebird:db=$dbname";
+ }
elsif ('mysql' eq $dbtype or 'mariadb' eq $dbtype) {
$dsn = "dbi:mysql:database=$dbname";
}
## For now, we simply require it
require Redis;
$dbh = Redis->new(@dsn);
- my $backend = 0;
+ if (! $self->{show_redis_version}++) {
+ $self->glog("Perl module Redis loaded. Version $Redis::VERSION", LOG_NORMAL);
+ }
- return $backend, $dbh;
+ return 0, $dbh;
}
elsif ('sqlite' eq $dbtype) {
$dsn = "dbi:SQLite:dbname=$dbname";
## From here on out we are setting Postgres-specific items, so everyone else is done
if ($dbtype ne 'postgres') {
+ my $modname = "DBD::" . $dbh->{Driver}->{Name};
+ if (! $self->{"show_${modname}_version"}++) {
+ my $modver = $modname->VERSION;
+ $self->glog("Perl module $modname loaded. Version $modver", LOG_NORMAL);
+ }
return 0, $dbh;
}
sub delete_rows {
- ## Given a list of rows, delete them from a database
- ## Arguments: six
- ## 1. Hash of rows, where the key is \0 joined pkeys
- ## 2. Schema name
- ## 3. Table name
- ## 4. Goat object
- ## 5. Sync object
- ## 6. Target database object, or arrayref of the same
+ ## Given a list of rows, delete them from a table in one or more databases
+ ## Arguments: four
+ ## 1. Hashref of rows to delete, where the keys are the primary keys (\0 joined if multi).
+ ## 2. Table object
+ ## 3. Sync object
+ ## 4. Target database object (or an arrayref of the same)
## Returns: number of rows deleted
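+ ## e.g. $count = $self->delete_rows($rows, $Table, $Sync, [$Target1, $Target2]);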
- my ($self,$rows,$S,$T,$goat,$sync,$deldb) = @_;
-
- my $syncname = $sync->{name};
- my $pkcols = $goat->{pkeycols};
- my $pkcolsraw = $goat->{pkeycolsraw};
- my $numpks = $goat->{numpkcols};
+ my ($self,$rows,$Table,$Sync,$TargetDB) = @_;
## Have we already truncated this table? If yes, skip and reset the flag
- if (exists $goat->{truncatewinner}) {
+ if (exists $Table->{truncatewinner}) {
return 0;
}
+ my ($S,$T) = ($Table->{safeschema},$Table->{safetable});
+
+ my $syncname = $Sync->{name};
+ my $pkcols = $Table->{pkeycols};
+ my $pkcolsraw = $Table->{pkeycolsraw};
+
## Ensure the target database argument is always an array
- if (ref $deldb ne 'ARRAY') {
- $deldb = [$deldb];
+ if (ref $TargetDB ne 'ARRAY') {
+ $TargetDB = [$TargetDB];
}
## We may be going from one table to another - this is the mapping hash
- my $newname = $goat->{newname}{$self->{syncname}};
+ my $customname = $Table->{newname}{$syncname} || {};
## Are we truncating?
if (exists $self->{truncateinfo} and exists $self->{truncateinfo}{$S}{$T}) {
## Try and truncate each target
- for my $d (@$deldb) {
+ for my $Target (@$TargetDB) {
- my $type = $d->{dbtype};
+ my $target_tablename = $customname->{$Target->{name}};
- my $tname = $newname->{$d->{name}};
+ my $type = $Target->{dbtype};
## Postgres is a plain and simple TRUNCATE, with an async flag
## TRUNCATE CASCADE is not needed as everything should be in one
## sync (herd), and we have turned all FKs off
if ('postgres' eq $type) {
- my $tdbh = $d->{dbh};
- $tdbh->do("TRUNCATE table $tname", { pg_async => PG_ASYNC });
- $d->{async_active} = time;
- } ## end postgres database
+ $Target->{dbh}->do("$self->{sqlprefix}TRUNCATE table $target_tablename", { pg_async => PG_ASYNC });
+ $Target->{async_active} = time;
+ }
## For all other SQL databases, we simply truncate
- elsif ($d->{does_sql}) {
- $d->{dbh}->do("TRUNCATE TABLE $tname");
+ elsif ($Target->{does_sql}) {
+ $Target->{dbh}->do("$self->{sqlprefix}TRUNCATE TABLE $target_tablename");
}
## For MongoDB, we simply remove everything from the collection
## This keeps the indexes around (which is why we don't "drop")
elsif ('mongo' eq $type) {
- $self->{collection} = $d->{dbh}->get_collection($tname);
- $self->{collection}->remove({}, { safe => 1} );
- }
- ## For Redis, do nothing
- elsif ('redis' eq $type) {
+ $self->{collection} = $Target->{dbh}->get_collection($target_tablename);
+ $self->{collection}->remove({}, { safe => 1 } );
}
## For flatfiles, write out a basic truncate statement
elsif ($type =~ /flat/o) {
- printf {$d->{filehandle}} qq{TRUNCATE TABLE %S;\n\n},
- 'flatpg' eq $type ? $tname : $tname;
- $self->glog(qq{Appended to flatfile "$d->{filename}"}, LOG_VERBOSE);
+ printf {$Target->{filehandle}} qq{TRUNCATE TABLE $target_tablename;\n\n};
+ $self->glog(qq{Appended truncate command to flatfile "$Target->{filename}"}, LOG_VERBOSE);
+ }
+ elsif ('redis' eq $type) {
+ ## For Redis, do nothing
+ }
+ ## Safety valve:
+ else {
+ die qq{Do not know how to do truncate for type $type!\n};
}
- } ## end each database to be truncated
+ } ## end each target to be truncated
## Final cleanup for each target
- for my $d (@$deldb) {
- if ('postgres' eq $d->{dbtype}) {
- ## Wrap up all the async truncate call
- $d->{dbh}->pg_result();
- $d->{async_active} = 0;
+ for my $Target (@$TargetDB) {
+ if ('postgres' eq $Target->{dbtype}) {
+ ## Wait for the async truncate call to finish
+ $Target->{dbh}->pg_result();
+ $Target->{async_active} = 0;
}
}
+ ## We do not know how many rows were actually truncated
return 0;
} ## end truncation
- ## The number of items before we break it into a separate statement
- ## This is inexact, as we don't know how large each key is,
- ## but should be good enough as long as not set too high.
- ## For now, all targets have the same chunksize
- my $chunksize = $config{statement_chunk_size} || 8_000;
+ ## We may want to break the SQL into separate statements if there are lots of keys
+ my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
+
+ ## The number of primary keys this table has affects our SQL
+ my $numpks = $Table->{numpkcols};
## Setup our deletion SQL as needed
my %SQL;
- for my $t (@$deldb) {
+ for my $Target (@$TargetDB) {
- my $type = $t->{dbtype};
+ my $type = $Target->{dbtype};
## Track the number of rows actually deleted from this target
- $t->{deleted_rows} = 0;
+ $Target->{deleted_rows} = 0;
## Set to true when all rounds completed
- $t->{delete_complete} = 0;
+ $Target->{delete_complete} = 0;
## No special preparation for mongo or redis
next if $type =~ /mongo|redis/;
- ## Set the type of SQL we are using: IN vs ANY. Default is IN
- my $sqltype = 'IN';
+ ## The actual target table name: may differ from the source!
+ my $target_tablename = $customname->{$Target->{name}};
- ## Use of ANY is greatly preferred, but can only use if the
- ## underlying database supports it, and if we have a single column pk
- if ($t->{does_ANY_clause} and 1==$numpks) {
- $sqltype = 'ANY';
+ if ('firebird' eq $type) {
+ $Table->{pklist} =~ s/\"//g; ## not ideal: fix someday
+ $Table->{pklist} = uc $Table->{pklist};
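+ ## e.g. a pklist of "id","grp" becomes ID,GRP, matching Firebird's
+ ## folding of unquoted identifiers to upper case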
+ $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
}
- ## The actual target table name: may differ from the source!
- my $tname = $newname->{$t->{name}};
+ ## Set the type of SQL we are using: IN vs ANY. Default is IN
+ ## Use of ANY is greatly preferred, but can only use if the
+ ## underlying database supports it, and if we have a single column pk
+ my $sqltype = ($Target->{does_ANY_clause} and 1==$numpks) ? 'ANY' : 'IN';
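+ ## e.g. ANY: DELETE FROM t WHERE id = ANY(?)
+ ##      IN:  DELETE FROM t WHERE (id,grp) IN (('1','a'),('2','b'))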
## Internal counters to help us break queries into chunks if needed
my ($round, $roundtotal) = (0,0);
## Array to store each chunk of SQL
my @chunk;
## Optimization for a single primary key using ANY(?)
- if ('ANY' eq $sqltype and ! exists $SQL{ANY}{$tname}) {
- $SQL{ANY}{$tname} = "$self->{sqlprefix}DELETE FROM $tname WHERE $pkcols = ANY(?)";
+ if ('ANY' eq $sqltype and ! exists $SQL{ANY}{$target_tablename}) {
+ $SQL{ANY}{$target_tablename} = "$self->{sqlprefix}DELETE FROM $target_tablename WHERE $pkcols = ANY(?)";
for my $key (keys %$rows) {
push @{$chunk[$round]} => length $key ? ([split '\0', $key, -1]) : [''];
if (++$roundtotal >= $chunksize) {
$SQL{ANYargs} = \@chunk;
}
## Normal DELETE call with IN() clause
- elsif ('IN' eq $sqltype and ! exists $SQL{IN}{$tname}) {
- $SQL{IN}{$tname} = sprintf '%sDELETE FROM %s WHERE (%s) IN (',
+ elsif ('IN' eq $sqltype and ! exists $SQL{IN}{$target_tablename}) {
+ $SQL{IN}{$target_tablename} = sprintf '%sDELETE FROM %s WHERE (%s) IN (',
$self->{sqlprefix},
- $tname,
- $goat->{pklist};
+ $target_tablename,
+ $Table->{pklist};
my $inner;
- if ($t->{has_mysql_timestamp_issue}) {
+ if ($Target->{has_mysql_timestamp_issue}) {
for my $key (keys %$rows) {
$inner = length $key
? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; s/\+\d\d$//; qq{'$_'}; } split '\0', $key, -1)
## Cleanup
for (@chunk) {
chop;
- $_ = "$SQL{IN}{$tname} $_)";
+ $_ = "$SQL{IN}{$target_tablename} $_)";
}
- $SQL{IN}{$tname} = \@chunk;
+ $SQL{IN}{$target_tablename} = \@chunk;
}
- $t->{delete_rounds} = @chunk;
+ $Target->{delete_rounds} = @chunk;
## If we bypassed because of a cached version, use the cached delete_rounds too
if ('ANY' eq $sqltype) {
- if (exists $SQL{ANYrounds}{$tname}) {
- $t->{delete_rounds} = $SQL{ANYrounds}{$tname};
+ if (exists $SQL{ANYrounds}{$target_tablename}) {
+ $Target->{delete_rounds} = $SQL{ANYrounds}{$target_tablename};
}
else {
- $SQL{ANYrounds}{$tname} = $t->{delete_rounds};
+ $SQL{ANYrounds}{$target_tablename} = $Target->{delete_rounds};
}
}
elsif ('IN' eq $sqltype) {
- if (exists $SQL{INrounds}{$tname}) {
- $t->{delete_rounds} = $SQL{INrounds}{$tname};
+ if (exists $SQL{INrounds}{$target_tablename}) {
+ $Target->{delete_rounds} = $SQL{INrounds}{$target_tablename};
}
else {
- $SQL{INrounds}{$tname} = $t->{delete_rounds};
+ $SQL{INrounds}{$target_tablename} = $Target->{delete_rounds};
}
}
## Empty our internal tracking items that may have been set previously
- $t->{delete_round} = 0;
- delete $t->{delete_sth};
+ $Target->{delete_round} = 0;
+ delete $Target->{delete_sth};
- }
+ } ## end each Target
## Start the main deletion loop
## The idea is to be as efficient as possible by always having as many
$did_something = 0;
## Wrap up any async targets that have finished
- for my $t (@$deldb) {
- next if !$t->{async_active} or $t->{delete_complete};
- if ('postgres' eq $t->{dbtype}) {
- if ($t->{dbh}->pg_ready) {
+ for my $Target (@$TargetDB) {
+ next if ! $Target->{async_active} or $Target->{delete_complete};
+ if ('postgres' eq $Target->{dbtype}) {
+ if ($Target->{dbh}->pg_ready) {
## If this was a do(), we already have the number of rows
if (1 == $numpks) {
- $t->{deleted_rows} += $t->{dbh}->pg_result();
+ $Target->{deleted_rows} += $Target->{dbh}->pg_result();
}
else {
- $t->{dbh}->pg_result();
+ $Target->{dbh}->pg_result();
}
- $t->{async_active} = 0;
+ $Target->{async_active} = 0;
}
}
## Don't need to check for invalid types: happens on the kick off below
}
## Kick off all dormant async targets
- for my $t (@$deldb) {
+ for my $Target (@$TargetDB) {
## Skip if this target does not support async, or is in the middle of a query
- next if !$t->{does_async} or $t->{async_active} or $t->{delete_complete};
+ next if ! $Target->{does_async} or $Target->{async_active} or $Target->{delete_complete};
## The actual target name
- my $tname = $newname->{$t->{name}};
+ my $target_tablename = $customname->{$Target->{name}};
- if ('postgres' eq $t->{dbtype}) {
+ if ('postgres' eq $Target->{dbtype}) {
## Which chunk we are processing.
- $t->{delete_round}++;
- if ($t->{delete_round} > $t->{delete_rounds}) {
- $t->{delete_complete} = 1;
+ $Target->{delete_round}++;
+ if ($Target->{delete_round} > $Target->{delete_rounds}) {
+ $Target->{delete_complete} = 1;
next;
}
- my $dbname = $t->{name};
- $self->glog("Deleting from target $dbname.$tname (round $t->{delete_round} of $t->{delete_rounds})", LOG_DEBUG);
+ my $dbname = $Target->{name};
+ $self->glog("Deleting from target $dbname.$target_tablename (round $Target->{delete_round} of $Target->{delete_rounds})", LOG_DEBUG);
$did_something++;
## Single primary key, so delete using the ANY(?) format
if (1 == $numpks) {
## Use the or-equal so we only prepare this once
- $t->{delete_sth} ||= $t->{dbh}->prepare("$SQL{ANY}{$tname}", { pg_async => PG_ASYNC });
- $t->{delete_sth}->execute($SQL{ANYargs}->[$t->{delete_round}-1]);
+ $Target->{delete_sth} ||= $Target->{dbh}->prepare("$SQL{ANY}{$target_tablename}", { pg_async => PG_ASYNC });
+ $Target->{delete_sth}->execute($SQL{ANYargs}->[$Target->{delete_round}-1]);
}
## Multiple primary keys, so delete old school via IN ((x,y),(a,b))
else {
- my $pre = $t->{delete_rounds} > 1 ? "/* $t->{delete_round} of $t->{delete_rounds} */ " : '';
+ my $pre = $Target->{delete_rounds} > 1 ? "/* $Target->{delete_round} of $Target->{delete_rounds} */ " : '';
## The pg_direct tells DBD::Pg there are no placeholders, and to use PQexec directly
- $t->{deleted_rows} += $t->{dbh}->
- do($pre.$SQL{IN}{$tname}->[$t->{delete_round}-1], { pg_async => PG_ASYNC, pg_direct => 1 });
+ $Target->{deleted_rows} += $Target->{dbh}->
+ do($pre.$SQL{IN}{$target_tablename}->[$Target->{delete_round}-1], { pg_async => PG_ASYNC, pg_direct => 1 });
}
- $t->{async_active} = time;
+ $Target->{async_active} = time;
} ## end postgres
else {
- die qq{Do not know how to do async for type $t->{dbtype}!\n};
+ die qq{Do not know how to do async for type $Target->{dbtype}!\n};
}
} ## end all async targets
## Kick off a single non-async target
- for my $t (@$deldb) {
+ for my $Target (@$TargetDB) {
## Skip if this target is async, or has no more rounds
- next if $t->{does_async} or $t->{delete_complete};
+ next if $Target->{does_async} or $Target->{delete_complete};
$did_something++;
- my $type = $t->{dbtype};
+ my $type = $Target->{dbtype};
## The actual target name
- my $tname = $newname->{$t->{name}};
+ my $target_tablename = $customname->{$Target->{name}};
+
+ $self->glog("Deleting from target $target_tablename (type=$type)", LOG_DEBUG);
+
+ if ('firebird' eq $type) {
+ $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
+ }
if ('mongo' eq $type) {
## Grab the collection name and store it
- $self->{collection} = $t->{dbh}->get_collection($tname);
+ $self->{collection} = $Target->{dbh}->get_collection($target_tablename);
## Because we may have multi-column primary keys, and each key may need modifying,
## we have to put everything into an array of arrays.
## Binary PKs are easy: all we have to do is decode
## We can assume that binary PK means not a multi-column PK
- if ($goat->{hasbinarypkey}) {
+ if ($Table->{hasbinarypkey}) {
@{ $delkeys[0] } = map { decode_base64($_) } keys %$rows;
}
else {
## Grab what type this column is
## We need to map non-strings to correct types as best we can
- my $ctype = $goat->{columnhash}{$realpkname}{ftype};
+ my $ctype = $Table->{columnhash}{$realpkname}{ftype};
## For integers, we simply force to a Perlish int
if ($ctype =~ /smallint|integer|bigint/o) {
my @newarray = @{ $delkeys[0] }[$bottom..$top];
my $result = $self->{collection}->remove(
{$pkcolsraw => { '$in' => \@newarray }}, { safe => 1 });
- $t->{deleted_rows} += $result->{n};
+ $Target->{deleted_rows} += $result->{n};
}
else {
## For multi-column primary keys, we cannot use '$in', sadly.
my $result = $self->{collection}->remove(
{ '$and' => \@find }, { safe => 1 });
- $t->{deleted_rows} += $result->{n};
+ $Target->{deleted_rows} += $result->{n};
## We do not need to loop, as we just went 1 by 1 through the whole list
last MONGODEL;
redo MONGODEL;
}
- $self->glog("Mongo objects removed from $tname: $t->{deleted_rows}", LOG_VERBOSE);
+ $self->glog("Mongo objects removed from $target_tablename: $Target->{deleted_rows}", LOG_VERBOSE);
}
elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type
- or 'oracle' eq $type or 'sqlite' eq $type) {
- my $tdbh = $t->{dbh};
- for (@{ $SQL{IN}{$tname} }) {
- $t->{deleted_rows} += $tdbh->do($_);
+ or 'oracle' eq $type or 'sqlite' eq $type or 'firebird' eq $type) {
+ my $tdbh = $Target->{dbh};
+ for (@{ $SQL{IN}{$target_tablename} }) {
+ $Target->{deleted_rows} += $tdbh->do($_);
}
}
elsif ('redis' eq $type) {
## We need to remove the entire tablename:pkey:column for each column we know about
- my $cols = $goat->{cols};
+ my $cols = $Table->{cols};
for my $pk (keys %$rows) {
## If this is a multi-column primary key, change our null delimiter to a colon
- if ($goat->{numpkcols} > 1) {
+ if ($Table->{numpkcols} > 1) {
$pk =~ s{\0}{:}go;
}
- $t->{deleted_rows} += $t->{dbh}->del("$tname:$pk");
+ $Target->{deleted_rows} += $Target->{dbh}->del("$target_tablename:$pk");
}
}
elsif ($type =~ /flat/o) { ## same as flatpg for now
- for (@{ $SQL{IN}{$tname} }) {
- print {$t->{filehandle}} qq{$_;\n\n};
+ for (@{ $SQL{IN}{$target_tablename} }) {
+ print {$Target->{filehandle}} qq{$_;\n\n};
}
- $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+ $self->glog(qq{Appended to flatfile "$Target->{filename}"}, LOG_VERBOSE);
}
else {
die qq{No support for database type "$type" yet!};
}
- $t->{delete_complete} = 1;
+ $Target->{delete_complete} = 1;
## Only one target at a time, please: we need to check on the asyncs
last;
## If we did nothing this round, and there are no asyncs running, we are done.
## Otherwise, we will wait for the oldest async to finish
if (!$did_something) {
- if (! grep { $_->{async_active} } @$deldb) {
+ if (! grep { $_->{async_active} } @$TargetDB) {
$done = 1;
}
else {
## Since nothing else is going on, let's wait for the oldest async to finish
- my $t = ( sort { $a->{async_active} <=> $b->{async_active} } grep { $_->{async_active} } @$deldb)[0];
+ my $Target = ( sort { $a->{async_active} <=> $b->{async_active} } grep { $_->{async_active} } @$TargetDB)[0];
if (1 == $numpks) {
- $t->{deleted_rows} += $t->{dbh}->pg_result();
+ $Target->{deleted_rows} += $Target->{dbh}->pg_result();
}
else {
- $t->{dbh}->pg_result();
+ $Target->{dbh}->pg_result();
}
- $t->{async_active} = 0;
+ $Target->{async_active} = 0;
}
}
} ## end of main deletion loop
## Generate our final deletion counts
- my $count = 0;
- for my $t (@$deldb) {
+ my $rows_deleted = 0;
+
+ for my $Target (@$TargetDB) {
## We do not delete from certain types of targets
- next if $t->{dbtype} =~ /mongo|flat|redis/o;
+ next if $Target->{dbtype} =~ /mongo|flat|redis/o;
- my $tname = $newname->{$t->{name}};
+ my $target_tablename = $customname->{$Target->{name}};
- $count += $t->{deleted_rows};
- $self->glog(qq{Rows deleted from $t->{name}.$tname: $t->{deleted_rows}}, LOG_VERBOSE);
+ $rows_deleted += $Target->{deleted_rows};
+ $self->glog(qq{Rows deleted from $Target->{name}.$target_tablename: $Target->{deleted_rows}}, LOG_VERBOSE);
}
- return $count;
+ return $rows_deleted;
} ## end of delete_rows
sub push_rows {
- ## Copy rows from one database to another
- ## Arguments: five
- ## 1. Hash of rows, where the key is \0 joined pkeys
+ ## Copy rows from one table to others
+ ## Typically called after delete_rows()
+ ## Arguments: six
+ ## 1. Hashref of rows to copy, where the keys are the primary keys (\0 joined if multi). Can be empty.
## 2. Table object
- ## 3. Sync object
+ ## 3. Sync object (may be empty if we are not associated with a sync)
## 4. Source database object
- ## 5. Target database object, or arrayref of the same
- ## Returns: number of rows copied
+ ## 5. Target database object (or an arrayref of the same)
+ ## 6. Action mode - currently only 'copy' and 'fullcopy'
+ ## Returns: number of rows copied (to each target, not the total)
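+ ## e.g. $count = $self->push_rows({}, $Table, $Sync, $SourceDB, \@targets, 'fullcopy');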
- my ($self,$rows,$goat,$sync,$fromdb,$todb) = @_;
-
- my $tablename = "$goat->{safeschema}.$goat->{safetable}";
- my $fromdbh = $fromdb->{dbh};
- my $syncname = $sync->{name};
-
- my $pkcols = $goat->{pkeycols};
- my $numpks = $goat->{numpkcols};
-
- ## This may be a fullcopy. If it is, $rows will not be a hashref
- ## If it is fullcopy, flip it to a dummy hashref
- my $fullcopy = 0;
- if (! ref $rows) {
- if ($rows eq 'fullcopy') {
- $fullcopy = 1;
- $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
- }
- else {
- die "Invalid rows passed to push_rows: $rows\n";
- }
- $rows = {};
- }
+ my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB,$mode) = @_;
## This will be zero for fullcopy of course
- my $total = keys %$rows;
+ my $total_rows = keys %$rows;
- ## Total number of rows written
- $count = 0;
+ if (!$total_rows and $mode ne 'fullcopy') {
+ return 0; ## Can happen on a truncation
+ }
- my $newname = $goat->{newname}{$self->{syncname}};
+ my $numpks = $Table->{numpkcols};
- ## As with delete, we may break this into more than one step
- ## Should only be a factor for very large numbers of keys
- my $chunksize = $config{statement_chunk_size} || 10_000;
+ ## If there are a large number of rows (and we are not using ANY) break the statement up
+ my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
## Build a list of all PK values to feed to IN clauses
- my @pkvals;
- my $round = 0;
- my $roundtotal = 0;
- for my $key (keys %$rows) {
- my $inner = length $key
- ? (join ',' => map { s{\'}{''}go; s{\\}{\\\\}go; qq{'$_'}; } split '\0', $key, -1)
- : q{''};
- push @{ $pkvals[$round] ||= [] } => $numpks > 1 ? "($inner)" : $inner;
- if (++$roundtotal >= $chunksize) {
- $roundtotal = 0;
- $round++;
+ ## This is an array in case we go over $chunksize
+ my @pkvals = [];
+
+ ## If there is only one primary key, and a sane number of rows, we can use '= ANY(?)'
+ if ($mode ne 'fullcopy') {
+ if ($numpks == 1 and $total_rows <= $chunksize) {
+ $mode = 'anyclause';
}
- }
+ ## Otherwise, we split up the primary key values into bins
+ else {
+ my $pk_array_number = 0;
+ my $current_row = 1;
- ## Example: 1234, 221
- ## Example MCPK: ('1234','Don''t Stop','2008-01-01'),('221','foobar','2008-11-01')
+ ## Loop through each row and create the needed SQL fragment
+ for my $key (keys %$rows) {
- ## Allow for non-arrays by forcing to an array
- if (ref $todb ne 'ARRAY') {
- $todb = [$todb];
- }
+ push @{ $pkvals[$pk_array_number] ||= [] } => split '\0', $key, -1;
- ## This can happen if we truncated but had no delta activity
- return 0 if (! $pkvals[0] or ! length $pkvals[0]->[0] ) and ! $fullcopy;
+ ## Make sure our SQL statement doesn't grow too large
+ if (++$current_row > $chunksize) {
+ $current_row = 1;
+ $pk_array_number++;
+ }
+ }
+ }
+ }
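+ ## e.g. with two pkey columns, a NUL-joined key of 12 and 34 adds both
+ ## values to the current bin; each bin holds up to $chunksize rows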
- ## Get ready to export from the source
- ## This may have multiple versions depending on the customcols table
- my $newcols = $goat->{newcols}{$syncname} || {};
+ my $syncname = $Sync->{name} || '';
- ## Walk through and grab which SQL is needed for each target
- ## Cache this earlier on - controller?
+ ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
+ if (ref $TargetDB ne 'ARRAY') {
+ $TargetDB = [$TargetDB];
+ }
+ ## Figure out the different SELECT clauses, and assign targets to them
my %srccmd;
- for my $t (@$todb) {
+ for my $Target (@$TargetDB ) {
- ## The SELECT clause we use (may be empty)
- my $clause = $newcols->{$t->{name}};
+ ## The SELECT clause we use (usually an empty string unless customcols is being used)
+ my $select_clause = $Table->{newcols}{$syncname}{$Target->{name}} || '';
## Associate this target with this clause
- push @{$srccmd{$clause}} => $t;
+ push @{$srccmd{$select_clause}} => $Target;
}
- ## Loop through each source command and push it out to all targets
- ## that are associated with it
- for my $clause (sort keys %srccmd) {
+ ## We may want to change the target table based on the customname table
+ ## It is up to the caller to populate these, even if the syncname is ''
+ my $customname = $Table->{newname}{$syncname} || {};
+
+ ## Name of the table to copy. Only Postgres can be used as a source
+ my $source_tablename = "$Table->{safeschema}.$Table->{safetable}";
+ my $sourcedbh = $SourceDB->{dbh};
+
+ ## Actual number of source rows read and copied. May be less than $total_rows
+ my $source_rows_read = 0;
+
+ ## Loop through each select command and push it out to all targets that are associated with it
+ for my $select_clause (sort keys %srccmd) {
## Build the clause (cache) and kick it off
- my $SELECT = $clause || 'SELECT *';
+ my $SELECT = $select_clause || 'SELECT *';
- ## Prepare each target in turn
- for my $t (@{ $srccmd{$clause} }) {
+ ## Prepare each target that is using this select clause
+ for my $Target (@{ $srccmd{$select_clause} }) {
## Internal name of this target
- my $targetname = $t->{name};
+ my $targetname = $Target->{name};
- ## Name of the table we are pushing to on this target
- my $tname = $newname->{$targetname};
+ ## The actual target table name. Depends on dbtype and customname table entries
+ my $target_tablename = $customname->{$targetname};
## The columns we are pushing to, both as an arrayref and a CSV:
- my $cols = $goat->{tcolumns}{$SELECT};
- my $columnlist = $t->{does_sql} ?
- ('(' . (join ',', map { $t->{dbh}->quote_identifier($_) } @$cols) . ')')
+ my $cols = $Table->{tcolumns}{$SELECT};
+ my $columnlist = $Target->{does_sql} ?
+ ('(' . (join ',', map { $Target->{dbh}->quote_identifier($_) } @$cols) . ')')
: ('(' . (join ',', map { $_ } @$cols) . ')');
- my $type = $t->{dbtype};
+ my $type = $Target->{dbtype};
- ## Use columnlist below so we never have to worry about the order
- ## of the columns on the target
+ ## Using columnlist avoids worrying about the order of columns
if ('postgres' eq $type) {
- my $tgtcmd = "$self->{sqlprefix}COPY $tname$columnlist FROM STDIN";
- $t->{dbh}->do($tgtcmd);
+ my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN";
+ $Target->{dbh}->do($tgtcmd);
+ }
+ elsif ('firebird' eq $type) {
+ $columnlist =~ s/\"//g;
+ $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
+ my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
+ $tgtcmd .= '?,' x @$cols;
+ $tgtcmd =~ s/,$/)/o;
+ $Target->{sth} = $Target->{dbh}->prepare($tgtcmd);
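+ ## e.g. INSERT INTO "mytable"(ID,GRP) VALUES (?,?) (names illustrative)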
}
elsif ('flatpg' eq $type) {
- print {$t->{filehandle}} "COPY $tname$columnlist FROM STDIN;\n";
- $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+ print {$Target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
}
elsif ('flatsql' eq $type) {
- print {$t->{filehandle}} "INSERT INTO $tname$columnlist VALUES\n";
- $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+ print {$Target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
}
elsif ('mongo' eq $type) {
- $self->{collection} = $t->{dbh}->get_collection($tname);
+ $self->{collection} = $Target->{dbh}->get_collection($target_tablename);
}
elsif ('redis' eq $type) {
- ## No prep needed, other than to reset our count of changes
- $t->{redis} = 0;
- }
- elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
- my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+ ## No setup needed
}
- elsif ('oracle' eq $type) {
- my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+ elsif ('sqlite' eq $type or 'oracle' eq $type or
+ 'mysql' eq $type or 'mariadb' eq $type or 'drizzle' eq $type) {
+ my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
$tgtcmd .= '?,' x @$cols;
$tgtcmd =~ s/,$/)/o;
- $t->{sth} = $t->{dbh}->prepare($tgtcmd);
- }
- elsif ('sqlite' eq $type) {
- my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+ $Target->{sth} = $Target->{dbh}->prepare($tgtcmd);
}
else {
die qq{No support for database type "$type" yet!};
}
- } ## end preparing each target for this clause
+ if ($type =~ /flat/) {
+ $self->glog(qq{Appended to flatfile "$Target->{filename}"}, LOG_VERBOSE);
+ }
- ## Put dummy data into @pkvals if using fullcopy
- if ($fullcopy) {
- push @pkvals => ['fullcopy'];
- }
+ } ## end preparing each target for this select clause
my $loop = 1;
- my $pcount = @pkvals;
+ my $number_chunks = @pkvals;
## Loop through each chunk of primary keys to copy over
for my $pk_values (@pkvals) {
- my $pkvs = join ',' => @{ $pk_values };
- ## Message to prepend to the statement if chunking
- my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */ ";
- $loop++;
+ ## Start streaming rows from the source
+ my $pre = $number_chunks > 1 ? "/* $loop of $number_chunks */ " : '';
+ $self->glog(qq{${pre}Copying from $SourceDB->{name}.$source_tablename}, LOG_VERBOSE);
- ## Kick off the copy on the source
- my $fromname = $fromdb->{name};
- $self->glog(qq{${pre}Copying from $fromname.$tablename}, LOG_VERBOSE);
- my $srccmd = sprintf '%s%sCOPY (%s FROM %s %s) TO STDOUT%s',
- $pre,
- $self->{sqlprefix},
- $SELECT,
- $tablename,
- $fullcopy ? '' : " WHERE $pkcols IN ($pkvs)",
- $sync->{copyextra} ? " $sync->{copyextra}" : '';
- $fromdbh->do($srccmd);
+ ## If we are doing a small batch of single primary keys, use ANY
+ ## For a fullcopy mode, leave the WHERE clause out completely
+ if ($mode eq 'fullcopy' or $mode eq 'anyclause') {
+ my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
+ $self->{sqlprefix},
+ $SELECT,
+ $source_tablename,
+ $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
+ $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
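+ ## e.g. COPY (SELECT * FROM public.foo WHERE id = ANY(?)) TO STDOUT (illustrative)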
- my $buffer = '';
+ my $srcsth = $sourcedbh->prepare($srccmd);
+ $mode eq 'fullcopy' ? $srcsth->execute() : $srcsth->execute( [ keys %$rows ]);
+ }
+ else {
+ ## Create the proper number of placeholders
+ my $baseq = '?';
+ if ($numpks > 1) {
+ $baseq = '?,' x $numpks;
+ $baseq =~ s/(.+?).$/\($1\)/;
+ }
+ my $number_values = @$pk_values;
+ my $placeholders = "$baseq," x ($number_values / $numpks);
+ chop $placeholders;
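+ ## e.g. two pkey columns and two rows give placeholders of (?,?),(?,?)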
- ## Loop through all changed rows on the source, and push to the target(s)
- my $multirow = 0;
+ my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s',
+ $pre,
+ $self->{sqlprefix},
+ $SELECT,
+ $source_tablename,
+ $Table->{pkeycols},
+ $placeholders,
+ $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
- ## If in fullcopy mode, we don't know how many rows will get copied,
- ## so we count as we go along
- if ($fullcopy) {
- $total = 0;
+ my $srcsth = $sourcedbh->prepare($srccmd);
+ $srcsth->execute( @$pk_values );
}
## Loop through each row output from the source, storing it in $buffer
- while ($fromdbh->pg_getcopydata($buffer) >= 0) {
-
- $total++ if $fullcopy;
+ ## Future optimization: slurp in X rows at a time, then process them
+ my $buffer = '';
+ while ($sourcedbh->pg_getcopydata($buffer) >= 0) {
- ## For each target using this particular COPY statement
- for my $t (@{ $srccmd{$clause} }) {
+ $source_rows_read++;
- my $type = $t->{dbtype};
- my $cols = $goat->{tcolumns}{$SELECT};
- my $tname = $newname->{$t->{name}};
+ ## For each target using this particular SELECT clause
+ for my $Target (@{ $srccmd{$select_clause} }) {
- chomp $buffer;
+ my $type = $Target->{dbtype};
## For Postgres, we simply do COPY to COPY
if ('postgres' eq $type) {
- $t->{dbh}->pg_putcopydata("$buffer\n");
+ $Target->{dbh}->pg_putcopydata($buffer);
}
## For flat files destined for Postgres, just do a tab-delimited dump
elsif ('flatpg' eq $type) {
- print {$t->{filehandle}} "$buffer\n";
+ print {$Target->{filehandle}} $buffer;
}
## For other flat files, make a standard VALUES list
elsif ('flatsql' eq $type) {
- if ($multirow++) {
- print {$t->{filehandle}} ",\n";
+ chomp $buffer;
+ if ($source_rows_read > 1) {
+ print {$Target->{filehandle}} ",\n";
}
- print {$t->{filehandle}} '(' .
- (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
+ print {$Target->{filehandle}} '(' .
+ (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
}
## For Mongo, do some mongomagic
elsif ('mongo' eq $type) {
+
## Have to map these values back to their names
+ chomp $buffer;
my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
+
## Our object consists of the primary keys, plus all other fields
my $object = {};
- for my $cname (@{ $cols }) {
+ for my $cname (@{ $targetcols }) {
$object->{$cname} = shift @cols;
}
## Coerce non-strings into different objects
if (!defined($object->{$key})) {
delete $object->{$key};
}
- elsif ($goat->{columnhash}{$key}{ftype} =~ /smallint|integer|bigint/o) {
+ elsif ($Table->{columnhash}{$key}{ftype} =~ /smallint|integer|bigint/o) {
$object->{$key} = int $object->{$key};
}
- elsif ($goat->{columnhash}{$key}{ftype} eq 'boolean') {
+ elsif ($Table->{columnhash}{$key}{ftype} eq 'boolean') {
if (defined $object->{$key}) {
$object->{$key} = $object->{$key} eq 't' ? true : false;
}
}
- elsif ($goat->{columnhash}{$key}{ftype} =~ /real|double|numeric/o) {
+ elsif ($Table->{columnhash}{$key}{ftype} =~ /real|double|numeric/o) {
$object->{$key} = strtod($object->{$key});
}
}
$self->{collection}->insert($object, { safe => 1 });
}
- ## For MySQL, MariaDB, Drizzle, Oracle, and SQLite, do some basic INSERTs
- elsif ('mysql' eq $type
- or 'mariadb' eq $type
- or 'drizzle' eq $type
- or 'oracle' eq $type
- or 'sqlite' eq $type) {
- my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
- for my $cindex (0..@cols) {
- next unless defined $cols[$cindex];
- if ($goat->{columnhash}{$cols->[$cindex]}{ftype} eq 'boolean') {
- # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
- $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
- }
- }
- $count += $t->{sth}->execute(@cols);
- }
elsif ('redis' eq $type) {
+
## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue"
+ chomp $buffer;
my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
my @pkey;
- for (1 .. $goat->{numpkcols}) {
+ for (1 .. $Table->{numpkcols}) {
push @pkey => shift @colvals;
}
my $pkeyval = join ':' => @pkey;
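+ ## e.g. a row with pkey 123 in table foo becomes the Redis hash "foo:123"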
## Build a list of non-null key/value pairs to set in the hash
my @add;
- $i = $goat->{numpkcols} - 1;
+ $i = $Table->{numpkcols} - 1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
for my $val (@colvals) {
$i++;
next if ! defined $val;
- push @add, $cols->[$i], $val;
+ push @add, $targetcols->[$i], $val;
}
- $t->{dbh}->hmset("$tname:$pkeyval", @add);
- $count++;
- $t->{redis}++;
+ my $target_tablename = $customname->{$Target->{name}};
+ $Target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
+ }
+ ## For SQLite, MySQL, MariaDB, Firebird, Drizzle, and Oracle, do some basic INSERTs
+ elsif ('sqlite' eq $type
+ or 'oracle' eq $type
+ or 'mysql' eq $type
+ or 'mariadb' eq $type
+ or 'drizzle' eq $type
+ or 'firebird' eq $type) {
+
+ chomp $buffer;
+ my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
+ for my $cindex (0..$#cols) {
+ next unless defined $cols[$cindex];
+ if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
+ # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
+ $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
+ }
+ }
+ $Target->{sth}->execute(@cols);
+ }
+ ## Safety valve:
+ else {
+ die qq{No support for database type "$type" yet!};
}
} ## end each target
} ## end each row pulled from the source
- } ## end each pklist
+ $loop++;
+
+ } ## end each chunk of primary keys
## Workaround for DBD::Pg bug
## Once we require a minimum version of 2.18.1 or better, we can remove this!
- if ($self->{dbdpgversion} < 21801) {
- $fromdbh->do('SELECT 1');
+ if ($SourceDB->{dbtype} eq 'postgres' and $self->{dbdpgversion} < 21801) {
+ $sourcedbh->do('SELECT 1');
}
## Perform final cleanups for each target
- for my $t (@{ $srccmd{$clause} }) {
+ for my $Target (@{ $srccmd{$select_clause} }) {
- my $type = $t->{dbtype};
+ my $target_tablename = $customname->{$Target->{name}};
- my $tname = $newname->{$t->{name}};
+ my $type = $Target->{dbtype};
+
+ my $tname = $Target->{name};
+
+ $self->glog(qq{Rows copied to ($type) $tname.$target_tablename: $source_rows_read}, LOG_VERBOSE);
if ('postgres' eq $type) {
- my $dbh = $t->{dbh};
+ my $dbh = $Target->{dbh};
$dbh->pg_putcopyend();
## Same bug as above
if ($self->{dbdpgversion} < 21801) {
$dbh->do('SELECT 1');
}
- $self->glog(qq{Rows copied to $t->{name}.$tname: $total}, LOG_VERBOSE);
- $count += $total;
## If this table is set to makedelta, add rows to bucardo.delta to simulate the
## normal action of a trigger and add a row to bucardo.track to indicate that
## it has already been replicated here.
- my $d = $sync->{db}{ $t->{name} };
- if (!$fullcopy and $d->{does_makedelta}{$tablename}) {
- $self->glog("Using makedelta to populate delta and track tables for $t->{name}.$tname", LOG_VERBOSE);
- my $vals;
- if ($numpks == 1) {
- $vals = join ',', map { "($_)" } map { @{ $_ } } @pkvals;
- }
- else {
- $vals = join ',', map { @{ $_ } } @pkvals;
+ my $d = $Sync->{db}{$tname};
+ if ($mode ne 'fullcopy' and $d->{does_makedelta}{$source_tablename} ) {
+
+ $self->glog("Using makedelta to populate delta and track tables for $tname.$target_tablename", LOG_VERBOSE);
+
+ my $cols = join ',' => @{ $Table->{qpkey} };
+
+ ## We use the original list, not what may have actually got copied!
+ for my $pk_values (@pkvals) {
+
+ ## Generate the correct number of placeholders
+ my $baseq = '?';
+ if ($numpks > 1) {
+ $baseq = '?,' x $numpks;
+ chop $baseq;
+ }
+ my $number_values = $mode eq 'copy' ? @$pk_values : keys %$rows;
+ my $placeholders = "($baseq)," x ($number_values / $numpks);
+ chop $placeholders;
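+ ## e.g. two rows of a two-column pkey give VALUES (?,?),(?,?)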
+
+ my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s',
+ $Table->{deltatable},
+ $cols,
+ $placeholders;
+
+ my $sth = $dbh->prepare($SQL);
+ $sth->execute($mode eq 'copy' ? @$pk_values : (keys %$rows));
}
- my $cols = join ',' => @{ $goat->{qpkey} };
- $dbh->do(qq{
- INSERT INTO bucardo.$goat->{deltatable} ($cols)
- VALUES $vals
- });
# Make sure we track it - but only if this sync already acts as a source!
- if ($t->{role} eq 'source') {
+ if ($Target->{role} eq 'source') {
$dbh->do(qq{
- INSERT INTO bucardo.$goat->{tracktable}
+ INSERT INTO bucardo.$Table->{tracktable}
VALUES (NOW(), ?)
}, undef, $d->{DBGROUPNAME});
}
## We want to send a kick signal to other syncs that are using this table
## However, we do not want to kick unless they are set to autokick and active
-
+ ## This works even if we do not have a real sync, as $syncname will be ''
$self->glog('Signalling other syncs that this table has changed', LOG_DEBUG);
- ## Cache this
- if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) {
- #$SQL = 'SELECT sync FROM bucardo.bucardo_delta_names WHERE sync <> ? AND tablename = ?';
+ if (! exists $self->{kick_othersyncs}{$syncname}{$tname}{$target_tablename}) {
$SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?';
$sth = $self->{masterdbh}->prepare($SQL);
- $sth->execute($goat->{schemaname}, $goat->{tablename}, $syncname, 'active');
- $self->{kick_othersyncs}{$syncname}{$tname} = $sth->fetchall_arrayref();
+ $sth->execute($Table->{schemaname}, $Table->{tablename}, $syncname, 'active');
+ $self->{kick_othersyncs}{$syncname}{$tname}{$target_tablename} = $sth->fetchall_arrayref();
}
- for my $row (@{ $self->{kick_othersyncs}{$syncname}{$tname} }) {
+ ## For each sync returned from the query above, send a kick request
+ for my $row (@{ $self->{kick_othersyncs}{$syncname}{$tname}{$target_tablename} }) {
my $othersync = $row->[0];
$self->db_notify($dbh, "kick_sync_$othersync", 0, '', 1);
}
}
}
elsif ('flatpg' eq $type) {
- print {$t->{filehandle}} "\\\.\n\n";
+ print {$Target->{filehandle}} "\\\.\n\n";
}
elsif ('flatsql' eq $type) {
- print {$t->{filehandle}} ";\n\n";
+ print {$Target->{filehandle}} ";\n\n";
}
- elsif ('redis' eq $type) {
- $self->glog(qq{Rows copied to Redis $t->{name}.$tname:<pkeyvalue>: $t->{redis}}, LOG_VERBOSE);
+ else {
+ ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird, redis
}
- }
+
+ } ## end each Target
} ## end of each clause in the source command list
- return $count;
+ return $source_rows_read;
} ## end of push_rows
=head1 VERSION
-This document describes version 5.4.0 of Bucardo
+This document describes version 5.4.1 of Bucardo
=head1 WEBSITE
=head1 LICENSE AND COPYRIGHT
-Copyright (c) 2005-2015 Greg Sabino Mullane <greg@endpoint.com>.
+Copyright (c) 2005-2016 Greg Sabino Mullane <greg@endpoint.com>.
This software is free to use: see the LICENSE file for details.