if ('firebird' eq $type) {
$goat->{pklist} =~ s/\"//g; ## not ideal: fix someday
$goat->{pklist} = uc $goat->{pklist};
- $tname = qq{"$tname"} if $tname !~ /"/;
+ $tname = qq{"$tname"} if $tname !~ /"/;
}
## Internal counters to help us break queries into chunks if needed
## The actual target name
my $tname = $newname->{$t->{name}};
- $self->glog("Deleting from target $tname (type=$type)", LOG_DEBUG);
+ $self->glog("Deleting from target $tname (type=$type)", LOG_DEBUG);
- if ('firebird' eq $type) {
- $tname = qq{"$tname"} if $tname !~ /"/;
- }
+ if ('firebird' eq $type) {
+ $tname = qq{"$tname"} if $tname !~ /"/;
+ }
if ('mongo' eq $type) {
sub push_rows {
- ## Copy rows from one table to another
+ ## Copy rows from one table to others
## Typically called after delete_rows()
## Arguments: six
## 1. Hashref of rows to copy, where the keys are the primary keys (\0 joined if multi). Can be empty.
## 2. Table object
## 3. Sync object (may be empty if we are not associated with a sync)
## 4. Source database object
- ## 5. Target database object, or arrayref of the same
+ ## 5. Target database object (or an arrayref of the same)
## 6. Action mode - currently only 'copy' and 'fullcopy'
## Returns: number of rows copied
my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB,$mode) = @_;
- if ($mode eq 'fullcopy') {
- $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
- }
-
## This will be zero for fullcopy of course
my $total_rows = keys %$rows;
return 0; ## Can happen on a truncation
}
- my $syncname = $Sync->{name} || '';
-
- ## We may want to change the target table based on the customname table
- ## It is up to the caller to populate these, even if the syncname is ''
- my $customname = $Table->{newname}{$syncname} || {};
-
- ## We may want to change the SELECT based on the customcols table
- my $customcols = $Table->{newcols}{$syncname} || {};
-
my $numpks = $Table->{numpkcols};
- ## As with delete, we may break this into more than one statement
- ## Should only be a factor for very large numbers of keys
+ ## If there are a large number of rows (and we are not using ANY) break the statement up
my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
## Build a list of all PK values to feed to IN clauses
if ($numpks == 1 and $total_rows <= $chunksize) {
$mode = 'anyclause';
}
- ## Otherwise, we push our completed SQL into bins
+ ## Otherwise, we split up the primary key values into bins
else {
- my $pkrounds = 1;
+ my $pk_array_number = 0;
my $current_row = 1;
## Loop through each row and create the needed SQL fragment
for my $key (keys %$rows) {
- push @{ $pkvals[$pkrounds-1] ||= [] } => split '\0', $key, -1;
+ push @{ $pkvals[$pk_array_number] ||= [] } => split '\0', $key, -1;
## Make sure our SQL statement doesn't grow too large
if (++$current_row > $chunksize) {
$current_row = 1;
- $pkrounds++;
+ $pk_array_number++;
}
}
}
}
+ my $syncname = $Sync->{name} || '';
+
## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
if (ref $TargetDB ne 'ARRAY') {
$TargetDB = [$TargetDB];
## Figure out the different SELECT clauses, and assign targets to them
my %srccmd;
- for my $t (@$TargetDB ) {
+ for my $Target (@$TargetDB ) {
## The SELECT clause we use (usually an empty string unless customcols is being used)
- my $select_clause = $customcols->{$t->{name}} || '';
+ my $select_clause = $Table->{newcols}{$syncname}{$Target->{name}} || '';
## Associate this target with this clause
- push @{$srccmd{$select_clause}} => $t;
+ push @{$srccmd{$select_clause}} => $Target;
}
+ ## We may want to change the target table based on the customname table
+ ## It is up to the caller to populate these, even if the syncname is ''
+ my $customname = $Table->{newname}{$syncname} || {};
+
+ ## Name of the table to copy. Only Postgres can be used as a source
my $source_tablename = "$Table->{safeschema}.$Table->{safetable}";
my $sourcedbh = $SourceDB->{dbh};
- ## The total number of source rows returned
- my $total_source_rows = 0;
+ ## Actual number of source rows read and copied. May be less than $total_rows
+ my $source_rows_read = 0;
## Loop through each select command and push it out to all targets that are associated with it
for my $select_clause (sort keys %srccmd) {
my $SELECT = $select_clause || 'SELECT *';
## Prepare each target that is using this select clause
- for my $target (@{ $srccmd{$select_clause} }) {
+ for my $Target (@{ $srccmd{$select_clause} }) {
## Internal name of this target
- my $targetname = $target->{name};
+ my $targetname = $Target->{name};
## The actual target table name. Depends on dbtype and customname table entries
my $target_tablename = $customname->{$targetname};
## The columns we are pushing to, both as an arrayref and a CSV:
my $cols = $Table->{tcolumns}{$SELECT};
- my $columnlist = $target->{does_sql} ?
- ('(' . (join ',', map { $target->{dbh}->quote_identifier($_) } @$cols) . ')')
+ my $columnlist = $Target->{does_sql} ?
+ ('(' . (join ',', map { $Target->{dbh}->quote_identifier($_) } @$cols) . ')')
: ('(' . (join ',', map { $_ } @$cols) . ')');
- my $type = $target->{dbtype};
+ my $type = $Target->{dbtype};
- ## Use columnlist below so we never have to worry about the order
- ## of the columns on the target
+ ## Using columnlist avoids worrying about the order of columns
if ('postgres' eq $type) {
my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN";
- $target->{dbh}->do($tgtcmd);
+ $Target->{dbh}->do($tgtcmd);
+ }
+ elsif ('firebird' eq $type) {
+ $columnlist =~ s/\"//g;
+ $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
+ my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
+ $tgtcmd .= '?,' x @$cols;
+ $tgtcmd =~ s/,$/)/o;
+ $Target->{sth} = $Target->{dbh}->prepare($tgtcmd);
}
elsif ('flatpg' eq $type) {
- print {$target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
- $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
+ print {$Target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
}
elsif ('flatsql' eq $type) {
- print {$target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
- $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
+ print {$Target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
}
elsif ('mongo' eq $type) {
- $self->{collection} = $target->{dbh}->get_collection($target_tablename);
+ $self->{collection} = $Target->{dbh}->get_collection($target_tablename);
}
elsif ('redis' eq $type) {
## No setup needed
}
- elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
+ elsif ('sqlite' eq $type or 'oracle' eq $type or
+ 'mysql' eq $type or 'mariadb' eq $type or 'drizzle' eq $type) {
my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
$tgtcmd .= '?,' x @$cols;
$tgtcmd =~ s/,$/)/o;
- $target->{sth} = $target->{dbh}->prepare($tgtcmd);
- }
- elsif ('firebird' eq $type) {
- $columnlist =~ s/\"//g;
- $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
- my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $target->{sth} = $target->{dbh}->prepare($tgtcmd);
- }
- elsif ('oracle' eq $type) {
- my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $target->{sth} = $target->{dbh}->prepare($tgtcmd);
- }
- elsif ('sqlite' eq $type) {
- my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $target->{sth} = $target->{dbh}->prepare($tgtcmd);
+ $Target->{sth} = $Target->{dbh}->prepare($tgtcmd);
}
else {
die qq{No support for database type "$type" yet!};
}
+ if ($type =~ /flat/) {
+ $self->glog(qq{Appended to flatfile "$Target->{filename}"}, LOG_VERBOSE);
+ }
+
} ## end preparing each target for this select clause
my $loop = 1;
- my $pcount = @pkvals;
- my $fromname = $SourceDB->{name};
+ my $number_chunks = @pkvals;
## Loop through each chunk of primary keys to copy over
for my $pk_values (@pkvals) {
## Start streaming rows from the source
- $self->glog(qq{Copying from $fromname.$source_tablename}, LOG_VERBOSE);
+ my $pre = $number_chunks > 1 ? "/* $loop of $number_chunks */ " : '';
+ $self->glog(qq{${pre}Copying from $SourceDB->{name}.$source_tablename}, LOG_VERBOSE);
## If we are doing a small batch of single primary keys, use ANY
- ## If we are doing fullcopy, leave out the WHERE clause completely
+ ## For a fullcopy mode, leave the WHERE clause out completely
if ($mode eq 'fullcopy' or $mode eq 'anyclause') {
my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
- $self->{sqlprefix},
- $SELECT,
- $source_tablename,
- $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
- $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+ $self->{sqlprefix},
+ $SELECT,
+ $source_tablename,
+ $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
+ $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+
my $srcsth = $sourcedbh->prepare($srccmd);
- $mode eq 'fullcopy' ? $srcsth->execute()
- : $srcsth->execute( [ keys %$rows ]);
+ $mode eq 'fullcopy' ? $srcsth->execute() : $srcsth->execute( [ keys %$rows ]);
}
else {
+ ## Create the proper number of placeholders
my $baseq = '?';
if ($numpks > 1) {
$baseq = '?,' x $numpks;
my $number_values = @$pk_values;
my $placeholders = "$baseq," x ($number_values / $numpks);
chop $placeholders;
+
my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s',
- "/* $loop of $pcount */ ",
+ $pre,
$self->{sqlprefix},
$SELECT,
$source_tablename,
$Table->{pkeycols},
$placeholders,
$Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+
my $srcsth = $sourcedbh->prepare($srccmd);
$srcsth->execute( @$pk_values );
- $loop++;
}
- ## How many rows we read from the source database this chunk
- my $source_rows = 0;
-
## Loop through each row output from the source, storing it in $buffer
- ## Future optimzation: slurp in X rows at a time, then process them
+ ## Future optimization: slurp in X rows at a time, then process them
my $buffer = '';
while ($sourcedbh->pg_getcopydata($buffer) >= 0) {
- $source_rows++;
+ $source_rows_read++;
## For each target using this particular SELECT clause
- for my $target (@{ $srccmd{$select_clause} }) {
+ for my $Target (@{ $srccmd{$select_clause} }) {
- my $type = $target->{dbtype};
+ my $type = $Target->{dbtype};
## For Postgres, we simply do COPY to COPY
if ('postgres' eq $type) {
- $target->{dbh}->pg_putcopydata($buffer);
+ $Target->{dbh}->pg_putcopydata($buffer);
}
## For flat files destined for Postgres, just do a tab-delimited dump
elsif ('flatpg' eq $type) {
- print {$target->{filehandle}} $buffer;
+ print {$Target->{filehandle}} $buffer;
}
## For other flat files, make a standard VALUES list
elsif ('flatsql' eq $type) {
chomp $buffer;
- if ($source_rows > 1) {
- print {$target->{filehandle}} ",\n";
+ if ($source_rows_read > 1) {
+ print {$Target->{filehandle}} ",\n";
}
- print {$target->{filehandle}} '(' .
+ print {$Target->{filehandle}} '(' .
(join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
}
## For Mongo, do some mongomagic
elsif ('mongo' eq $type) {
+
## Have to map these values back to their names
chomp $buffer;
my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
}
$self->{collection}->insert($object, { safe => 1 });
}
- ## For MySQL, MariaDB, Firebird, Drizzle, Oracle, and SQLite, do some basic INSERTs
- elsif ('mysql' eq $type
- or 'mariadb' eq $type
- or 'drizzle' eq $type
- or 'firebird' eq $type
- or 'oracle' eq $type
- or 'sqlite' eq $type) {
- chomp $buffer;
- my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
- my $targetcols = $Table->{tcolumns}{$SELECT};
- for my $cindex (0..@cols) {
- next unless defined $cols[$cindex];
- if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
- # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
- $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
- }
- }
- $target->{sth}->execute(@cols);
- }
elsif ('redis' eq $type) {
+
## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue"
chomp $buffer;
my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
push @add, $targetcols->[$i], $val;
}
- my $target_tablename = $customname->{$target->{name}};
- $target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
+ my $target_tablename = $customname->{$Target->{name}};
+ $Target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
+ }
+ ## For SQLite, MySQL, MariaDB, Firebird, Drizzle, and Oracle, do some basic INSERTs
+ elsif ('sqlite' eq $type
+ or 'oracle' eq $type
+ or 'mysql' eq $type
+ or 'mariadb' eq $type
+ or 'drizzle' eq $type
+ or 'firebird' eq $type) {
+
+ chomp $buffer;
+ my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
+ for my $cindex (0..@cols) {
+ next unless defined $cols[$cindex];
+ if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
+ # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
+ $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
+ }
+ }
+ $Target->{sth}->execute(@cols);
+ }
+ ## Safety valve:
+ else {
+ die qq{No support for database type "$type" yet!};
}
} ## end each target
} ## end each row pulled from the source
- $total_source_rows += $source_rows;
+ $loop++;
- } ## end each pklist
+ } ## end each chunk of primary keys
## Workaround for DBD::Pg bug
## Once we require a minimum version of 2.18.1 or better, we can remove this!
}
## Perform final cleanups for each target
- for my $target (@{ $srccmd{$select_clause} }) {
+ for my $Target (@{ $srccmd{$select_clause} }) {
+
+ my $target_tablename = $customname->{$Target->{name}};
- my $type = $target->{dbtype};
+ my $type = $Target->{dbtype};
- my $tname = $customname->{$target->{name}};
+ $self->glog(qq{Rows copied to ($type) $Target->{name}.$target_tablename: $source_rows_read}, LOG_VERBOSE);
if ('postgres' eq $type) {
- my $dbh = $target->{dbh};
+ my $dbh = $Target->{dbh};
$dbh->pg_putcopyend();
## Same bug as above
if ($self->{dbdpgversion} < 21801) {
$dbh->do('SELECT 1');
}
- $self->glog(qq{Rows copied to $target->{name}.$tname: $total_rows}, LOG_VERBOSE);
## If this table is set to makedelta, add rows to bucardo.delta to simulate the
## normal action of a trigger and add a row to bucardo.track to indicate that
## it has already been replicated here.
- my $d = $Sync->{db}{ $target->{name} };
+ my $d = $Sync->{db}{ $Target->{name} };
if ($mode ne 'fullcopy' and $d->{does_makedelta}{$source_tablename} ) {
- $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
+ $self->glog("Using makedelta to populate delta and track tables for $Target->{name}.$target_tablename", LOG_VERBOSE);
my $cols = join ',' => @{ $Table->{qpkey} };
+ ## We use the original list, not what may have actually got copied!
for my $pk_values (@pkvals) {
+ ## Generate the correct number of placeholders
my $baseq = '?';
if ($numpks > 1) {
$baseq = '?,' x $numpks;
my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s',
$Table->{deltatable},
- $cols,
- $placeholders;
+ $cols,
+ $placeholders;
+
my $sth = $dbh->prepare($SQL);
$sth->execute($mode eq 'copy' ? @$pk_values : (keys %$rows));
}
# Make sure we track it - but only if this sync already acts as a source!
- if ($target->{role} eq 'source') {
+ if ($Target->{role} eq 'source') {
$dbh->do(qq{
INSERT INTO bucardo.$Table->{tracktable}
VALUES (NOW(), ?)
## However, we do not want to kick unless they are set to autokick and active
## This works even if we do not have a real sync, as $syncname will be ''
$self->glog('Signalling other syncs that this table has changed', LOG_DEBUG);
- if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) {
+ if (! exists $self->{kick_othersyncs}{$syncname}{$target_tablename}) {
$SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?';
$sth = $self->{masterdbh}->prepare($SQL);
$sth->execute($Table->{schemaname}, $Table->{tablename}, $syncname, 'active');
- $self->{kick_othersyncs}{$syncname}{$tname} = $sth->fetchall_arrayref();
+ $self->{kick_othersyncs}{$syncname}{$target_tablename} = $sth->fetchall_arrayref();
}
- for my $row (@{ $self->{kick_othersyncs}{$syncname}{$tname} }) {
+ ## For each sync returned from the query above, send a kick request
+ for my $row (@{ $self->{kick_othersyncs}{$syncname}{$target_tablename} }) {
my $othersync = $row->[0];
$self->db_notify($dbh, "kick_sync_$othersync", 0, '', 1);
}
}
}
elsif ('flatpg' eq $type) {
- print {$target->{filehandle}} "\\\.\n\n";
+ print {$Target->{filehandle}} "\\\.\n\n";
}
elsif ('flatsql' eq $type) {
- print {$target->{filehandle}} ";\n\n";
- }
- elsif ('redis' eq $type) {
- $self->glog(qq{Rows copied to Redis $target->{name}.$tname:<pkeyvalue>: $total_source_rows}, LOG_VERBOSE);
+ print {$Target->{filehandle}} ";\n\n";
}
else {
- ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird
+ ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird, redis
}
- }
+
+ } ## end each Target
} ## end of each clause in the source command list
- return $total_source_rows;
+ return $source_rows_read;
} ## end of push_rows