Make the 'fullcopy' mode of push_rows a little more explicit and cleaner.
authorGreg Sabino Mullane <greg@endpoint.com>
Mon, 24 Aug 2015 16:34:59 +0000 (12:34 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Mon, 24 Aug 2015 16:34:59 +0000 (12:34 -0400)
Bucardo.pm

index 33086258256e6271a70b8fa610f0d4b8276fd2d3..b9ea258ac909987106151f0af99b4ab52fc50481 100644 (file)
@@ -4239,7 +4239,7 @@ sub start_kid {
 
                                 ## For this table, copy all rows from source to target(s)
                                 $dmlcount{inserts} += $self->push_rows(
-                                    $deltabin{$dbname1}, $g, $sync, $sourcedb, \@pushdbs);
+                                    $deltabin{$dbname1}, $g, $sync, $sourcedb, \@pushdbs, 'copy');
 
                                 ## Store references to the list of changes in case custom code needs them
                                 $sync->{deltarows}{$S}{$T} = $deltabin{$dbname1};
@@ -4642,9 +4642,9 @@ sub start_kid {
 
                 ## For this table, copy all rows from source to target(s)
                 $dmlcount{inserts} += $dmlcount{I}{target}{$S}{$T} = $self->push_rows(
-                    'fullcopy', $g, $sync, $sourcex,
+                    {}, $g, $sync, $sourcex,
                     ## We need an array of database objects here:
-                    [ map { $sync->{db}{$_} } @dbs_copytarget ]);
+                    [ map { $sync->{db}{$_} } @dbs_copytarget ], 'fullcopy');
 
                 ## Add to our cross-table tally
                 $dmlcount{allinserts}{target} += $dmlcount{I}{target}{$S}{$T};
@@ -9566,38 +9566,25 @@ sub push_rows {
 
     ## Copy rows from one table to another
     ## Typically called after delete_rows()
-    ## Arguments: five
-    ## 1. Hash of rows to copy, where the keys are the primary keys (\0 joined if multi)
+    ## Arguments: six
+    ## 1. Hashref of rows to copy, where the keys are the primary keys (\0 joined if multi). Can be empty.
     ## 2. Table object
     ## 3. Sync object (may be empty if we are not associated with a sync)
     ## 4. Source database object
     ## 5. Target database object, or arrayref of the same
+    ## 6. Action mode - currently only 'copy' and 'fullcopy'
     ## Returns: number of rows copied
 
-    my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB) = @_;
+    my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB,$mode) = @_;
 
-    ## Build a list of all PK values to feed to IN clauses
-    ## This is an array in case we go over $chunksize
-    my @pkvals = [];
-
-    ## If fullcopy, $rows will be the scalar 'fullcopy' instead of a hashref
-    my $fullcopy = 0;
-    if (! ref $rows) {
-        if ($rows eq 'fullcopy') {
-            $fullcopy = 1;
-            $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
-        }
-        else {
-            die "Invalid rows passed to push_rows: $rows\n";
-        }
-        $rows = {};
-        $pkvals[0] = 'fullcopy';
+    if ($mode eq 'fullcopy') {
+        $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
     }
 
     ## This will be zero for fullcopy of course
     my $total_rows = keys %$rows;
 
-    if (!$total_rows and !$fullcopy) {
+    if (!$total_rows and $mode ne 'fullcopy') {
         return 0; ## Can happen on a truncation
     }
 
@@ -9612,14 +9599,18 @@ sub push_rows {
 
     my $numpks = $Table->{numpkcols};
 
-    ## As with delete, we may break this into more than one step
+    ## As with delete, we may break this into more than one statement
     ## Should only be a factor for very large numbers of keys
     my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
 
+    ## Build a list of all PK values to feed to IN clauses
+    ## This is an array in case we go over $chunksize
+    my @pkvals = [];
+
     ## If there is only one primary key, and a sane number of rows, we can use '= ANY(?)'
-    if (! $fullcopy) {
-        if ($numpks == 1 and $total_rows < $chunksize) {
-            $pkvals[0] = 'allrows';
+    if ($mode ne 'fullcopy') {
+        if ($numpks == 1 and $total_rows <= $chunksize) {
+            $mode = 'anyclause';
         }
         ## Otherwise, we push our completed SQL into bins
         else {
@@ -9645,7 +9636,6 @@ sub push_rows {
         $TargetDB = [$TargetDB];
     }
 
-
     ## Figure out the different SELECT clauses, and assign targets to them
     my %srccmd;
     for my $t (@$TargetDB ) {
@@ -9743,15 +9733,15 @@ sub push_rows {
 
             ## If we are doing a small batch of single primary keys, use ANY
             ## If we are doing fullcopy, leave out the WHERE clause completely
-            if (! ref $pk_values ) {
+            if ($mode eq 'fullcopy' or $mode eq 'anyclause') {
                 my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
                 $self->{sqlprefix},
                 $SELECT,
                 $source_tablename,
-                $pk_values eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
+                $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
                 $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
                 my $srcsth = $sourcedbh->prepare($srccmd);
-                $pk_values eq 'fullcopy' ? $srcsth->execute()
+                $mode eq 'fullcopy' ? $srcsth->execute()
                     : $srcsth->execute( [ keys %$rows ]);
             }
             else {
@@ -9915,7 +9905,7 @@ sub push_rows {
                 ##   normal action of a trigger and add a row to bucardo.track to indicate that
                 ##   it has already been replicated here.
                 my $d = $Sync->{db}{ $target->{name} };
-                if (!$fullcopy and $d->{does_makedelta}{$source_tablename} ) {
+                if ($mode ne 'fullcopy' and $d->{does_makedelta}{$source_tablename} ) {
 
                     $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
 
@@ -9928,7 +9918,7 @@ sub push_rows {
                             $baseq = '?,' x $numpks;
                             chop $baseq;
                         }
-                        my $number_values = ref $pk_values ? @$pk_values : keys %$rows;
+                        my $number_values = $mode eq 'copy' ? @$pk_values : keys %$rows;
                         my $placeholders = "($baseq)," x ($number_values / $numpks);
                         chop $placeholders;
 
@@ -9936,9 +9926,8 @@ sub push_rows {
                             $Table->{deltatable},
                                 $cols,
                                     $placeholders;
-                        $self->glog("GOT $placeholders leading to $SQL", LOG_DEBUG);
                         my $sth = $dbh->prepare($SQL);
-                        $sth->execute(ref $pk_values ? @$pk_values : (keys %$rows));
+                        $sth->execute($mode eq 'copy' ? @$pk_values : (keys %$rows));
                     }
 
                     # Make sure we track it - but only if this sync already acts as a source!