More cleanup of push_rows().
authorGreg Sabino Mullane <greg@endpoint.com>
Sun, 30 Aug 2015 03:06:07 +0000 (23:06 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Sun, 30 Aug 2015 03:06:07 +0000 (23:06 -0400)
Now at about minimum desired code quality. :)

Bucardo.pm

index f368076a7f9c2f52330512b6c9200c0b42bff950..8ca860a8ba4e46b342bada910e54f0b35f6d6d94 100644 (file)
@@ -9234,7 +9234,7 @@ sub delete_rows {
         if ('firebird' eq $type) {
             $goat->{pklist} =~ s/\"//g; ## not ideal: fix someday
             $goat->{pklist} = uc $goat->{pklist};
-                       $tname = qq{"$tname"} if $tname !~ /"/;
+            $tname = qq{"$tname"} if $tname !~ /"/;
         }
 
         ## Internal counters to help us break queries into chunks if needed
@@ -9405,11 +9405,11 @@ sub delete_rows {
             ## The actual target name
             my $tname = $newname->{$t->{name}};
 
-                       $self->glog("Deleting from target $tname (type=$type)", LOG_DEBUG);
+            $self->glog("Deleting from target $tname (type=$type)", LOG_DEBUG);
 
-                       if ('firebird' eq $type) {
-                               $tname = qq{"$tname"} if $tname !~ /"/;
-                       }
+            if ('firebird' eq $type) {
+                $tname = qq{"$tname"} if $tname !~ /"/;
+            }
 
             if ('mongo' eq $type) {
 
@@ -9600,23 +9600,19 @@ sub delete_rows {
 
 sub push_rows {
 
-    ## Copy rows from one table to another
+    ## Copy rows from one table to others
     ## Typically called after delete_rows()
     ## Arguments: six
     ## 1. Hashref of rows to copy, where the keys are the primary keys (\0 joined if multi). Can be empty.
     ## 2. Table object
     ## 3. Sync object (may be empty if we are not associated with a sync)
     ## 4. Source database object
-    ## 5. Target database object, or arrayref of the same
+    ## 5. Target database object (or an arrayref of the same)
     ## 6. Action mode - currently only 'copy' and 'fullcopy'
     ## Returns: number of rows copied
 
     my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB,$mode) = @_;
 
-    if ($mode eq 'fullcopy') {
-        $self->glog('Setting push_rows to fullcopy mode', LOG_DEBUG);
-    }
-
     ## This will be zero for fullcopy of course
     my $total_rows = keys %$rows;
 
@@ -9624,19 +9620,9 @@ sub push_rows {
         return 0; ## Can happen on a truncation
     }
 
-    my $syncname = $Sync->{name} || '';
-
-    ## We may want to change the target table based on the customname table
-    ## It is up to the caller to populate these, even if the syncname is ''
-    my $customname = $Table->{newname}{$syncname} || {};
-
-    ## We may want to change the SELECT based on the customcols table
-    my $customcols = $Table->{newcols}{$syncname} || {};
-
     my $numpks = $Table->{numpkcols};
 
-    ## As with delete, we may break this into more than one statement
-    ## Should only be a factor for very large numbers of keys
+    ## If there are a large number of rows (and we are not using ANY) break the statement up
     my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
 
     ## Build a list of all PK values to feed to IN clauses
@@ -9648,25 +9634,27 @@ sub push_rows {
         if ($numpks == 1 and $total_rows <= $chunksize) {
             $mode = 'anyclause';
         }
-        ## Otherwise, we push our completed SQL into bins
+        ## Otherwise, we split up the primary key values into bins
         else {
-            my $pkrounds = 1;
+            my $pk_array_number = 0;
             my $current_row = 1;
 
             ## Loop through each row and create the needed SQL fragment
             for my $key (keys %$rows) {
 
-                push @{ $pkvals[$pkrounds-1] ||= [] } => split '\0', $key, -1;
+                push @{ $pkvals[$pk_array_number] ||= [] } => split '\0', $key, -1;
 
                 ## Make sure our SQL statement doesn't grow too large
                 if (++$current_row > $chunksize) {
                     $current_row = 1;
-                    $pkrounds++;
+                    $pk_array_number++;
                 }
             }
         }
     }
 
+    my $syncname = $Sync->{name} || '';
+
     ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
     if (ref $TargetDB ne 'ARRAY') {
         $TargetDB = [$TargetDB];
@@ -9674,20 +9662,25 @@ sub push_rows {
 
     ## Figure out the different SELECT clauses, and assign targets to them
     my %srccmd;
-    for my $t (@$TargetDB ) {
+    for my $Target (@$TargetDB ) {
 
         ## The SELECT clause we use (usually an empty string unless customcols is being used)
-        my $select_clause = $customcols->{$t->{name}} || '';
+        my $select_clause = $Table->{newcols}{$syncname}{$Target->{name}} || '';
 
         ## Associate this target with this clause
-        push @{$srccmd{$select_clause}} => $t;
+        push @{$srccmd{$select_clause}} => $Target;
     }
 
+    ## We may want to change the target table based on the customname table
+    ## It is up to the caller to populate these, even if the syncname is ''
+    my $customname = $Table->{newname}{$syncname} || {};
+
+     ## Name of the table to copy. Only Postgres can be used as a source
     my $source_tablename = "$Table->{safeschema}.$Table->{safetable}";
     my $sourcedbh = $SourceDB->{dbh};
 
-    ## The total number of source rows returned
-    my $total_source_rows = 0;
+    ## Actual number of source rows read and copied. May be less than $total_rows
+    my $source_rows_read = 0;
 
     ## Loop through each select command and push it out to all targets that are associated with it
     for my $select_clause (sort keys %srccmd) {
@@ -9696,99 +9689,90 @@ sub push_rows {
         my $SELECT = $select_clause || 'SELECT *';
 
         ## Prepare each target that is using this select clause
-        for my $target (@{ $srccmd{$select_clause} }) {
+        for my $Target (@{ $srccmd{$select_clause} }) {
 
             ## Internal name of this target
-            my $targetname = $target->{name};
+            my $targetname = $Target->{name};
 
             ## The actual target table name. Depends on dbtype and customname table entries
             my $target_tablename = $customname->{$targetname};
 
             ## The columns we are pushing to, both as an arrayref and a CSV:
             my $cols = $Table->{tcolumns}{$SELECT};
-            my $columnlist = $target->{does_sql} ?
-                ('(' . (join ',', map { $target->{dbh}->quote_identifier($_) } @$cols) . ')')
+            my $columnlist = $Target->{does_sql} ?
+                ('(' . (join ',', map { $Target->{dbh}->quote_identifier($_) } @$cols) . ')')
               : ('(' . (join ',', map { $_ } @$cols) . ')');
 
-            my $type = $target->{dbtype};
+            my $type = $Target->{dbtype};
 
-            ## Use columnlist below so we never have to worry about the order
-            ## of the columns on the target
+            ## Using columnlist avoids worrying about the order of columns
 
             if ('postgres' eq $type) {
                 my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN";
-                $target->{dbh}->do($tgtcmd);
+                $Target->{dbh}->do($tgtcmd);
+            }
+            elsif ('firebird' eq $type) {
+                $columnlist =~ s/\"//g;
+                $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
+                $tgtcmd .= '?,' x @$cols;
+                $tgtcmd =~ s/,$/)/o;
+                $Target->{sth} = $Target->{dbh}->prepare($tgtcmd);
             }
             elsif ('flatpg' eq $type) {
-                print {$target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
-                $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
+                print {$Target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
             }
             elsif ('flatsql' eq $type) {
-                print {$target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
-                $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
+                print {$Target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
             }
             elsif ('mongo' eq $type) {
-                $self->{collection} = $target->{dbh}->get_collection($target_tablename);
+                $self->{collection} = $Target->{dbh}->get_collection($target_tablename);
             }
             elsif ('redis' eq $type) {
                 ## No setup needed
             }
-            elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
+            elsif ('sqlite' eq $type or 'oracle' eq $type or
+                   'mysql' eq $type or 'mariadb' eq $type or 'drizzle' eq $type) {
                 my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
-            }
-            elsif ('firebird' eq $type) {
-                               $columnlist =~ s/\"//g;
-                               $target_tablename = qq{"$target_tablename"} if $target_tablename !~ /"/;
-                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
-                $tgtcmd .= '?,' x @$cols;
-                $tgtcmd =~ s/,$/)/o;
-                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
-            }
-            elsif ('oracle' eq $type) {
-                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
-                $tgtcmd .= '?,' x @$cols;
-                $tgtcmd =~ s/,$/)/o;
-                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
-            }
-            elsif ('sqlite' eq $type) {
-                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
-                $tgtcmd .= '?,' x @$cols;
-                $tgtcmd =~ s/,$/)/o;
-                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
+                $Target->{sth} = $Target->{dbh}->prepare($tgtcmd);
             }
             else {
                 die qq{No support for database type "$type" yet!};
             }
 
+            if ($type =~ /flat/) {
+                $self->glog(qq{Appended to flatfile "$Target->{filename}"}, LOG_VERBOSE);
+            }
+
         } ## end preparing each target for this select clause
 
         my $loop = 1;
-        my $pcount = @pkvals;
-        my $fromname = $SourceDB->{name};
+        my $number_chunks = @pkvals;
 
         ## Loop through each chunk of primary keys to copy over
         for my $pk_values (@pkvals) {
 
             ## Start streaming rows from the source
-            $self->glog(qq{Copying from $fromname.$source_tablename}, LOG_VERBOSE);
+            my $pre = $number_chunks > 1 ? "/* $loop of $number_chunks */ " : '';
+            $self->glog(qq{${pre}Copying from $SourceDB->{name}.$source_tablename}, LOG_VERBOSE);
 
             ## If we are doing a small batch of single primary keys, use ANY
-            ## If we are doing fullcopy, leave out the WHERE clause completely
+            ## For a fullcopy mode, leave the WHERE clause out completely
             if ($mode eq 'fullcopy' or $mode eq 'anyclause') {
                 my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
-                $self->{sqlprefix},
-                $SELECT,
-                $source_tablename,
-                $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
-                $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+                    $self->{sqlprefix},
+                    $SELECT,
+                    $source_tablename,
+                    $mode eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
+                    $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+
                 my $srcsth = $sourcedbh->prepare($srccmd);
-                $mode eq 'fullcopy' ? $srcsth->execute()
-                    : $srcsth->execute( [ keys %$rows ]);
+                $mode eq 'fullcopy' ? $srcsth->execute() : $srcsth->execute( [ keys %$rows ]);
             }
             else {
+                ## Create the proper number of placeholders
                 my $baseq = '?';
                 if ($numpks > 1) {
                     $baseq = '?,' x $numpks;
@@ -9797,53 +9781,52 @@ sub push_rows {
                 my $number_values = @$pk_values;
                 my $placeholders = "$baseq," x ($number_values / $numpks);
                 chop $placeholders;
+
                 my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s',
-                    "/* $loop of $pcount */ ",
+                    $pre,
                     $self->{sqlprefix},
                     $SELECT,
                     $source_tablename,
                     $Table->{pkeycols},
                     $placeholders,
                     $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+
                 my $srcsth = $sourcedbh->prepare($srccmd);
                 $srcsth->execute( @$pk_values );
-                $loop++;
             }
 
-            ## How many rows we read from the source database this chunk
-            my $source_rows = 0;
-
             ## Loop through each row output from the source, storing it in $buffer
-            ## Future optimzation: slurp in X rows at a time, then process them
+            ## Future optimization: slurp in X rows at a time, then process them
             my $buffer = '';
             while ($sourcedbh->pg_getcopydata($buffer) >= 0) {
 
-                $source_rows++;
+                $source_rows_read++;
 
                 ## For each target using this particular SELECT clause
-                for my $target (@{ $srccmd{$select_clause} }) {
+                for my $Target (@{ $srccmd{$select_clause} }) {
 
-                    my $type = $target->{dbtype};
+                    my $type = $Target->{dbtype};
 
                     ## For Postgres, we simply do COPY to COPY
                     if ('postgres' eq $type) {
-                        $target->{dbh}->pg_putcopydata($buffer);
+                        $Target->{dbh}->pg_putcopydata($buffer);
                     }
                     ## For flat files destined for Postgres, just do a tab-delimited dump
                     elsif ('flatpg' eq $type) {
-                        print {$target->{filehandle}} $buffer;
+                        print {$Target->{filehandle}} $buffer;
                     }
                     ## For other flat files, make a standard VALUES list
                     elsif ('flatsql' eq $type) {
                         chomp $buffer;
-                        if ($source_rows > 1) {
-                            print {$target->{filehandle}} ",\n";
+                        if ($source_rows_read > 1) {
+                            print {$Target->{filehandle}} ",\n";
                         }
-                        print {$target->{filehandle}} '(' .
+                        print {$Target->{filehandle}} '(' .
                              (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
                     }
                     ## For Mongo, do some mongomagic
                     elsif ('mongo' eq $type) {
+
                         ## Have to map these values back to their names
                         chomp $buffer;
                         my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
@@ -9875,26 +9858,8 @@ sub push_rows {
                         }
                         $self->{collection}->insert($object, { safe => 1 });
                     }
-                    ## For MySQL, MariaDB, Firebird, Drizzle, Oracle, and SQLite, do some basic INSERTs
-                    elsif ('mysql' eq $type
-                            or 'mariadb' eq $type
-                            or 'drizzle' eq $type
-                            or 'firebird' eq $type
-                            or 'oracle' eq $type
-                            or 'sqlite' eq $type) {
-                        chomp $buffer;
-                        my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
-                        my $targetcols = $Table->{tcolumns}{$SELECT};
-                        for my $cindex (0..@cols) {
-                            next unless defined $cols[$cindex];
-                            if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
-                                # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
-                                $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
-                            }
-                        }
-                        $target->{sth}->execute(@cols);
-                    }
                     elsif ('redis' eq $type) {
+
                         ## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue"
                         chomp $buffer;
                         my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
@@ -9913,17 +9878,41 @@ sub push_rows {
                             push @add, $targetcols->[$i], $val;
                         }
 
-                        my $target_tablename = $customname->{$target->{name}};
-                        $target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
+                        my $target_tablename = $customname->{$Target->{name}};
+                        $Target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
+                    }
+                    ## For SQLite, MySQL, MariaDB, Firebird, Drizzle, and Oracle, do some basic INSERTs
+                    elsif ('sqlite' eq $type
+                            or 'oracle' eq $type
+                            or 'mysql' eq $type
+                            or 'mariadb' eq $type
+                            or 'drizzle' eq $type
+                            or 'firebird' eq $type) {
+
+                        chomp $buffer;
+                        my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+                        my $targetcols = $Table->{tcolumns}{$SELECT};
+                        for my $cindex (0..@cols) {
+                            next unless defined $cols[$cindex];
+                            if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
+                                # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
+                                $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
+                            }
+                        }
+                        $Target->{sth}->execute(@cols);
+                    }
+                    ## Safety valve:
+                    else {
+                        die qq{No support for database type "$type" yet!};
                     }
 
                 } ## end each target
 
             } ## end each row pulled from the source
 
-            $total_source_rows += $source_rows;
+            $loop++;
 
-        } ## end each pklist
+        } ## end each chunk of primary keys
 
         ## Workaround for DBD::Pg bug
         ## Once we require a minimum version of 2.18.1 or better, we can remove this!
@@ -9932,32 +9921,35 @@ sub push_rows {
         }
 
         ## Perform final cleanups for each target
-        for my $target (@{ $srccmd{$select_clause} }) {
+        for my $Target (@{ $srccmd{$select_clause} }) {
+
+            my $target_tablename = $customname->{$Target->{name}};
 
-            my $type = $target->{dbtype};
+            my $type = $Target->{dbtype};
 
-            my $tname = $customname->{$target->{name}};
+            $self->glog(qq{Rows copied to ($type) $Target->{name}.$target_tablename: $source_rows_read}, LOG_VERBOSE);
 
             if ('postgres' eq $type) {
-                my $dbh = $target->{dbh};
+                my $dbh = $Target->{dbh};
                 $dbh->pg_putcopyend();
                 ## Same bug as above
                 if ($self->{dbdpgversion} < 21801) {
                     $dbh->do('SELECT 1');
                 }
-                $self->glog(qq{Rows copied to $target->{name}.$tname: $total_rows}, LOG_VERBOSE);
                 ## If this table is set to makedelta, add rows to bucardo.delta to simulate the
                 ##   normal action of a trigger and add a row to bucardo.track to indicate that
                 ##   it has already been replicated here.
-                my $d = $Sync->{db}{ $target->{name} };
+                my $d = $Sync->{db}{ $Target->{name} };
                 if ($mode ne 'fullcopy' and $d->{does_makedelta}{$source_tablename} ) {
 
-                    $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
+                    $self->glog("Using makedelta to populate delta and track tables for $Target->{name}.$target_tablename", LOG_VERBOSE);
 
                     my $cols = join ',' => @{ $Table->{qpkey} };
 
+                    ## We use the original list, not what may have actually got copied!
                     for my $pk_values (@pkvals) {
 
+                        ## Generate the correct number of placeholders
                         my $baseq = '?';
                         if ($numpks > 1) {
                             $baseq = '?,' x $numpks;
@@ -9969,14 +9961,15 @@ sub push_rows {
 
                         my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s',
                             $Table->{deltatable},
-                                $cols,
-                                    $placeholders;
+                            $cols,
+                            $placeholders;
+
                         my $sth = $dbh->prepare($SQL);
                         $sth->execute($mode eq 'copy' ? @$pk_values : (keys %$rows));
                     }
 
                     # Make sure we track it - but only if this sync already acts as a source!
-                    if ($target->{role} eq 'source') {
+                    if ($Target->{role} eq 'source') {
                         $dbh->do(qq{
                             INSERT INTO bucardo.$Table->{tracktable}
                             VALUES (NOW(), ?)
@@ -9987,35 +9980,34 @@ sub push_rows {
                     ## However, we do not want to kick unless they are set to autokick and active
                     ## This works even if we do not have a real syncs, as $syncname will be ''
                     $self->glog('Signalling other syncs that this table has changed', LOG_DEBUG);
-                    if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) {
+                    if (! exists $self->{kick_othersyncs}{$syncname}{$target_tablename}) {
                         $SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?';
                         $sth = $self->{masterdbh}->prepare($SQL);
                         $sth->execute($Table->{schemaname}, $Table->{tablename}, $syncname, 'active');
-                        $self->{kick_othersyncs}{$syncname}{$tname} = $sth->fetchall_arrayref();
+                        $self->{kick_othersyncs}{$syncname}{$target_tablename} = $sth->fetchall_arrayref();
                     }
-                    for my $row (@{ $self->{kick_othersyncs}{$syncname}{$tname} }) {
+                    ## For each sync returned from the query above, send a kick request
+                    for my $row (@{ $self->{kick_othersyncs}{$syncname}{$target_tablename} }) {
                         my $othersync = $row->[0];
                         $self->db_notify($dbh, "kick_sync_$othersync", 0, '', 1);
                     }
                 }
             }
             elsif ('flatpg' eq $type) {
-                print {$target->{filehandle}} "\\\.\n\n";
+                print {$Target->{filehandle}} "\\\.\n\n";
             }
             elsif ('flatsql' eq $type) {
-                print {$target->{filehandle}} ";\n\n";
-            }
-            elsif ('redis' eq $type) {
-                $self->glog(qq{Rows copied to Redis $target->{name}.$tname:<pkeyvalue>: $total_source_rows}, LOG_VERBOSE);
+                print {$Target->{filehandle}} ";\n\n";
             }
             else {
-                ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird
+                ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle, firebird, redis
             }
-        }
+
+        } ## end each Target
 
     } ## end of each clause in the source command list
 
-    return $total_source_rows;
+    return $source_rows_read;
 
 } ## end of push_rows