## For this table, copy all rows from source to target(s)
$dmlcount{inserts} += $self->push_rows(
- $deltabin{$dbname1}, $g, $sync, $sourcedb, \@pushdbs);
+ $deltabin{$dbname1}, $g, $sync, $sourcedb, \@pushdbs, 'copy');
## Store references to the list of changes in case custom code needs them
$sync->{deltarows}{$S}{$T} = $deltabin{$dbname1};
## For this table, copy all rows from source to target(s)
$dmlcount{inserts} += $dmlcount{I}{target}{$S}{$T} = $self->push_rows(
- 'fullcopy', $g, $sync, $sourcex,
+ {}, $g, $sync, $sourcex,
## We need an array of database objects here:
- [ map { $sync->{db}{$_} } @dbs_copytarget ]);
+ [ map { $sync->{db}{$_} } @dbs_copytarget ], 'fullcopy');
## Add to our cross-table tally
$dmlcount{allinserts}{target} += $dmlcount{I}{target}{$S}{$T};
## Copy rows from one table to another
## Typically called after delete_rows()
- ## Arguments: five
- ## 1. Hash of rows to copy, where the keys are the primary keys (\0 joined if multi)
+ ## Arguments: six
+ ## 1. Hashref of rows to copy, where the keys are the primary keys (\0 joined if multi). Can be empty.
## 2. Table object
## 3. Sync object (may be empty if we are not associated with a sync)
## 4. Source database object
## 5. Target database object, or arrayref of the same
+ ## 6. Action mode - currently only 'copy' and 'fullcopy'
## Returns: number of rows copied
- my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB) = @_;
+ my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB,$mode) = @_;
- ## Build a list of all PK values to feed to IN clauses
- ## This is an array in case we go over $chunksize
- my @pkvals = [];
-
- ## If fullcopy, $rows will be the scalar 'fullcopy' instead of a hashref
- my $fullcopy = 0;
- if (! ref $rows) {
- if ($rows eq 'fullcopy') {
- $fullcopy = 1;
- $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
- }
- else {
- die "Invalid rows passed to push_rows: $rows\n";
- }
- $rows = {};
- $pkvals[0] = 'fullcopy';
+ if ($mode eq 'fullcopy') {
+ $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
}
## This will be zero for fullcopy of course
my $total_rows = keys %$rows;
- if (!$total_rows and !$fullcopy) {
+ if (!$total_rows and $mode ne 'fullcopy') {
return 0; ## Can happen on a truncation
}
my $numpks = $Table->{numpkcols};
- ## As with delete, we may break this into more than one step
+ ## As with delete, we may break this into more than one statement
## Should only be a factor for very large numbers of keys
my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
+ ## Build a list of all PK values to feed to IN clauses
+ ## This is an array in case we go over $chunksize
+ my @pkvals = [];
+
## If there is only one primary key, and a sane number of rows, we can use '= ANY(?)'
- if (! $fullcopy) {
- if ($numpks == 1 and $total_rows < $chunksize) {
- $pkvals[0] = 'allrows';
+ if ($mode ne 'fullcopy') {
+ if ($numpks == 1 and $total_rows <= $chunksize) {
+ $mode = 'anyclause';
}
## Otherwise, we push our completed SQL into bins
else {
$TargetDB = [$TargetDB];
}
-
## Figure out the different SELECT clauses, and assign targets to them
my %srccmd;
for my $t (@$TargetDB ) {
## If we are doing a small batch of single primary keys, use ANY
## If we are doing fullcopy, leave out the WHERE clause completely
- if (! ref $pk_values ) {
+ if ($mode eq 'fullcopy' or $mode eq 'anyclause') {
my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
$self->{sqlprefix},
$SELECT,
$source_tablename,
- $pk_values eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
+ $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
$Sync->{copyextra} ? " $Sync->{copyextra}" : '';
my $srcsth = $sourcedbh->prepare($srccmd);
- $pk_values eq 'fullcopy' ? $srcsth->execute()
+ $mode eq 'fullcopy' ? $srcsth->execute()
: $srcsth->execute( [ keys %$rows ]);
}
else {
## normal action of a trigger and add a row to bucardo.track to indicate that
## it has already been replicated here.
my $d = $Sync->{db}{ $target->{name} };
- if (!$fullcopy and $d->{does_makedelta}{$source_tablename} ) {
+ if ($mode ne 'fullcopy' and $d->{does_makedelta}{$source_tablename} ) {
$self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
$baseq = '?,' x $numpks;
chop $baseq;
}
- my $number_values = ref $pk_values ? @$pk_values : keys %$rows;
+ my $number_values = $mode eq 'copy' ? @$pk_values : keys %$rows;
my $placeholders = "($baseq)," x ($number_values / $numpks);
chop $placeholders;
$Table->{deltatable},
$cols,
$placeholders;
- $self->glog("GOT $placeholders leading to $SQL", LOG_DEBUG);
my $sth = $dbh->prepare($SQL);
- $sth->execute(ref $pk_values ? @$pk_values : (keys %$rows));
+ $sth->execute($mode eq 'copy' ? @$pk_values : (keys %$rows));
}
# Make sure we track it - but only if this sync already acts as a source!