pg_backup_archiver.c
1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
22#include "postgres_fe.h"
23
24#include <ctype.h>
25#include <fcntl.h>
26#include <unistd.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#ifdef WIN32
30#include <io.h>
31#endif
32
33#include "catalog/pg_class_d.h"
34#include "common/string.h"
35#include "compress_io.h"
36#include "dumputils.h"
37#include "fe_utils/string_utils.h"
38#include "lib/binaryheap.h"
39#include "lib/stringinfo.h"
40#include "libpq/libpq-fs.h"
41#include "parallel.h"
42#include "pg_backup_archiver.h"
43#include "pg_backup_db.h"
44#include "pg_backup_utils.h"
45
46#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48
49#define TOC_PREFIX_NONE ""
50#define TOC_PREFIX_DATA "Data for "
51#define TOC_PREFIX_STATS "Statistics for "
52
53static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
54 const pg_compress_specification compression_spec,
56 SetupWorkerPtrType setupWorkerPtr,
58static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
59static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
60static char *sanitize_line(const char *str, bool want_hyphen);
62static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
63static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
64static void _becomeUser(ArchiveHandle *AH, const char *user);
65static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
66static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
67static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
68static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
70 TocEntry *te);
71static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
74static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
76static bool _tocEntryIsACL(TocEntry *te);
80static void buildTocEntryArrays(ArchiveHandle *AH);
81static void _moveBefore(TocEntry *pos, TocEntry *te);
83
84static int RestoringToDB(ArchiveHandle *AH);
85static void dump_lo_buf(ArchiveHandle *AH);
86static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
87static void SetOutput(ArchiveHandle *AH, const char *filename,
88 const pg_compress_specification compression_spec, bool append_data);
90static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
91
92static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
94 TocEntry *pending_list);
96 ParallelState *pstate,
97 TocEntry *pending_list);
99 TocEntry *pending_list);
100static void pending_list_header_init(TocEntry *l);
101static void pending_list_append(TocEntry *l, TocEntry *te);
102static void pending_list_remove(TocEntry *te);
103static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
104static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
105static void move_to_ready_heap(TocEntry *pending_list,
106 binaryheap *ready_heap,
107 RestorePass pass);
108static TocEntry *pop_next_work_item(binaryheap *ready_heap,
109 ParallelState *pstate);
110static void mark_dump_job_done(ArchiveHandle *AH,
111 TocEntry *te,
112 int status,
113 void *callback_data);
115 TocEntry *te,
116 int status,
117 void *callback_data);
118static void fix_dependencies(ArchiveHandle *AH);
119static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
122static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
123 binaryheap *ready_heap);
124static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
126
127static void StrictNamesCheck(RestoreOptions *ropt);
128
129
130/*
131 * Allocate a new DumpOptions block containing all default values.
132 */
133DumpOptions *
134NewDumpOptions(void)
135{
136 DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
137
138 InitDumpOptions(opts);
139 return opts;
140}
141
142/*
143 * Initialize a DumpOptions struct to all default values
144 */
145void
146InitDumpOptions(DumpOptions *opts)
147{
148 memset(opts, 0, sizeof(DumpOptions));
149 /* set any fields that shouldn't default to zeroes */
150 opts->include_everything = true;
151 opts->cparams.promptPassword = TRI_DEFAULT;
152 opts->dumpSections = DUMP_UNSECTIONED;
153 opts->dumpSchema = true;
154 opts->dumpData = true;
155 opts->dumpStatistics = true;
156}
157
158/*
159 * Create a freshly allocated DumpOptions with options equivalent to those
160 * found in the given RestoreOptions.
161 */
162DumpOptions *
163dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
164{
165 DumpOptions *dopt = NewDumpOptions();
166
167 /* this is the inverse of what's at the end of pg_dump.c's main() */
168 dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
169 dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
170 dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
171 dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
173 dopt->outputClean = ropt->dropSchema;
174 dopt->dumpData = ropt->dumpData;
175 dopt->dumpSchema = ropt->dumpSchema;
176 dopt->dumpSections = ropt->dumpSections;
177 dopt->dumpStatistics = ropt->dumpStatistics;
178 dopt->if_exists = ropt->if_exists;
179 dopt->column_inserts = ropt->column_inserts;
180 dopt->aclsSkip = ropt->aclsSkip;
181 dopt->outputSuperuser = ropt->superuser;
182 dopt->outputCreateDB = ropt->createDB;
183 dopt->outputNoOwner = ropt->noOwner;
184 dopt->outputNoTableAm = ropt->noTableAm;
185 dopt->outputNoTablespaces = ropt->noTablespace;
187 dopt->use_setsessauth = ropt->use_setsessauth;
189 dopt->dump_inserts = ropt->dump_inserts;
190 dopt->no_comments = ropt->no_comments;
191 dopt->no_policies = ropt->no_policies;
192 dopt->no_publications = ropt->no_publications;
195 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
198 dopt->sequence_data = ropt->sequence_data;
199
200 return dopt;
201}
202
203
204/*
205 * Wrapper functions.
206 *
207 * The objective is to make writing new formats and dumpers as simple
208 * as possible, if necessary at the expense of extra function calls etc.
209 *
210 */
211
212/*
213 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
214 * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
215 * setup doesn't need to know anything much, so it's defined here.
216 */
217static void
218setupRestoreWorker(Archive *AHX)
219{
220 ArchiveHandle *AH = (ArchiveHandle *) AHX;
221
222 AH->ReopenPtr(AH);
223}
224
225
226/* Create a new archive */
227/* Public */
228Archive *
229CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
230 const pg_compress_specification compression_spec,
231 bool dosync, ArchiveMode mode,
234
235{
236 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
238
239 return (Archive *) AH;
240}
241
242/* Open an existing archive */
243/* Public */
244Archive *
245OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
246{
247 ArchiveHandle *AH;
248 pg_compress_specification compression_spec = {0};
249
250 compression_spec.algorithm = PG_COMPRESSION_NONE;
251 AH = _allocAH(FileSpec, fmt, compression_spec, true,
254
255 return (Archive *) AH;
256}
257
258/* Public */
259void
260CloseArchive(Archive *AHX)
261{
262 ArchiveHandle *AH = (ArchiveHandle *) AHX;
263
264 AH->ClosePtr(AH);
265
266 /* Close the output */
267 errno = 0;
268 if (!EndCompressFileHandle(AH->OF))
269 pg_fatal("could not close output file: %m");
270}
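/*
 * Illustrative sketch (not part of the upstream file): how the public
 * open/close entry points above are typically combined by a client such as
 * pg_restore.  The file name is hypothetical.
 *
 *     Archive *AHX = OpenArchive("db.dump", archUnknown);  // format auto-detected
 *     RestoreOptions *ropt = NewRestoreOptions();
 *     SetArchiveOptions(AHX, NULL, ropt);     // dump options synthesized from ropt
 *     ProcessArchiveRestoreOptions(AHX);      // mark which TOC entries are wanted
 *     PrintTOCSummary(AHX);                   // e.g. the "-l" listing
 *     CloseArchive(AHX);
 */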
271
272/* Public */
273void
274SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
275{
276 /* Caller can omit dump options, in which case we synthesize them */
277 if (dopt == NULL && ropt != NULL)
279
280 /* Save options for later access */
281 AH->dopt = dopt;
282 AH->ropt = ropt;
283}
284
285/* Public */
286void
287ProcessArchiveRestoreOptions(Archive *AHX)
288{
289 ArchiveHandle *AH = (ArchiveHandle *) AHX;
290 RestoreOptions *ropt = AH->public.ropt;
291 TocEntry *te;
292 teSection curSection;
293
294 /* Decide which TOC entries will be dumped/restored, and mark them */
295 curSection = SECTION_PRE_DATA;
296 for (te = AH->toc->next; te != AH->toc; te = te->next)
297 {
298 /*
299 * When writing an archive, we also take this opportunity to check
300 * that we have generated the entries in a sane order that respects
301 * the section divisions. When reading, don't complain, since buggy
302 * old versions of pg_dump might generate out-of-order archives.
303 */
304 if (AH->mode != archModeRead)
305 {
306 switch (te->section)
307 {
308 case SECTION_NONE:
309 /* ok to be anywhere */
310 break;
311 case SECTION_PRE_DATA:
312 if (curSection != SECTION_PRE_DATA)
313 pg_log_warning("archive items not in correct section order");
314 break;
315 case SECTION_DATA:
316 if (curSection == SECTION_POST_DATA)
317 pg_log_warning("archive items not in correct section order");
318 break;
320 /* ok no matter which section we were in */
321 break;
322 default:
323 pg_fatal("unexpected section code %d",
324 (int) te->section);
325 break;
326 }
327 }
328
329 if (te->section != SECTION_NONE)
330 curSection = te->section;
331
332 te->reqs = _tocEntryRequired(te, curSection, AH);
333 }
334
335 /* Enforce strict names checking */
336 if (ropt->strict_names)
337 StrictNamesCheck(ropt);
338}
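/*
 * Illustrative note (not part of the upstream file): the section-order check
 * above accepts entries laid out as
 *
 *     SECTION_PRE_DATA ... SECTION_DATA ... SECTION_POST_DATA
 *
 * with SECTION_NONE entries permitted anywhere.  For example, a SECTION_DATA
 * entry encountered after the scan has already reached SECTION_POST_DATA
 * draws the "archive items not in correct section order" warning when
 * writing an archive.
 */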
339
340/*
341 * RestoreArchive
342 *
343 * If append_data is set, then append data into file as we are restoring dump
344 * of multiple databases which was taken by pg_dumpall.
345 */
346void
348{
349 ArchiveHandle *AH = (ArchiveHandle *) AHX;
350 RestoreOptions *ropt = AH->public.ropt;
351 bool parallel_mode;
352 TocEntry *te;
354
356
357 /*
358 * If we're going to do parallel restore, there are some restrictions.
359 */
360 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
361 if (parallel_mode)
362 {
363 /* We haven't got round to making this work for all archive formats */
364 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
365 pg_fatal("parallel restore is not supported with this archive file format");
366
367 /* Doesn't work if the archive represents dependencies as OIDs */
368 if (AH->version < K_VERS_1_8)
369 pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
370
371 /*
372 * It also won't work if we can't reopen the input file, so let's
373 * try that immediately.
374 */
375 AH->ReopenPtr(AH);
376 }
377
378 /*
379 * Make sure we won't need (de)compression we haven't got
380 */
381 if (AH->PrintTocDataPtr != NULL)
382 {
383 for (te = AH->toc->next; te != AH->toc; te = te->next)
384 {
385 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
386 {
388
389 if (errmsg)
390 pg_fatal("cannot restore from compressed archive (%s)",
391 errmsg);
392 else
393 break;
394 }
395 }
396 }
397
398 /*
399 * Prepare index arrays, so we can assume we have them throughout restore.
400 * It's possible we already did this, though.
401 */
402 if (AH->tocsByDumpId == NULL)
404
405 /*
406 * If we're using a DB connection, then connect it.
407 */
408 if (ropt->useDB)
409 {
410 pg_log_info("connecting to database for restore");
411 if (AH->version < K_VERS_1_3)
412 pg_fatal("direct database connections are not supported in pre-1.3 archives");
413
414 /*
415 * We don't want to guess at whether the dump will successfully
416 * restore; allow the attempt regardless of the version of the restore
417 * target.
418 */
419 AHX->minRemoteVersion = 0;
420 AHX->maxRemoteVersion = 9999999;
421
422 ConnectDatabaseAhx(AHX, &ropt->cparams, false);
423
424 /*
425 * If we're talking to the DB directly, don't send comments since they
426 * obscure SQL when displaying errors
427 */
428 AH->noTocComments = 1;
429 }
430
431 /*
432 * Work out if we have an implied schema-less restore. This can happen if
433 * the dump excluded the schema or the user has used a toc list to exclude
434 * all of the schema data. All we do is look for schema entries - if none
435 * are found then we unset the dumpSchema flag.
436 *
437 * We could scan for wanted TABLE entries, but that is not the same as
438 * data-only. At this stage, it seems unnecessary (6-Mar-2001).
439 */
440 if (ropt->dumpSchema)
441 {
442 bool no_schema_found = true;
443
444 for (te = AH->toc->next; te != AH->toc; te = te->next)
445 {
446 if ((te->reqs & REQ_SCHEMA) != 0)
447 {
448 no_schema_found = false;
449 break;
450 }
451 }
452 if (no_schema_found)
453 {
454 ropt->dumpSchema = false;
455 pg_log_info("implied no-schema restore");
456 }
457 }
458
459 /*
460 * Setup the output file if necessary.
461 */
462 sav = SaveOutput(AH);
465
466 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
467
468 if (AH->archiveRemoteVersion)
469 ahprintf(AH, "-- Dumped from database version %s\n",
471 if (AH->archiveDumpVersion)
472 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
474
475 ahprintf(AH, "\n");
476
477 if (AH->public.verbose)
478 dumpTimestamp(AH, "Started on", AH->createDate);
479
480 if (ropt->single_txn)
481 {
482 if (AH->connection)
483 StartTransaction(AHX);
484 else
485 ahprintf(AH, "BEGIN;\n\n");
486 }
487
488 /*
489 * Establish important parameter values right away.
490 */
492
494
495 /*
496 * Drop the items at the start, in reverse order
497 */
498 if (ropt->dropSchema)
499 {
500 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
501 {
502 AH->currentTE = te;
503
504 /*
505 * In createDB mode, issue a DROP *only* for the database as a
506 * whole. Issuing drops against anything else would be wrong,
507 * because at this point we're connected to the wrong database.
508 * (The DATABASE PROPERTIES entry, if any, should be treated like
509 * the DATABASE entry.)
510 */
511 if (ropt->createDB)
512 {
513 if (strcmp(te->desc, "DATABASE") != 0 &&
514 strcmp(te->desc, "DATABASE PROPERTIES") != 0)
515 continue;
516 }
517
518 /* Otherwise, drop anything that's selected and has a dropStmt */
519 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
520 {
521 bool not_allowed_in_txn = false;
522
523 pg_log_info("dropping %s %s", te->desc, te->tag);
524
525 /*
526 * In --transaction-size mode, we have to temporarily exit our
527 * transaction block to drop objects that can't be dropped
528 * within a transaction.
529 */
530 if (ropt->txn_size > 0)
531 {
532 if (strcmp(te->desc, "DATABASE") == 0 ||
533 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
534 {
535 not_allowed_in_txn = true;
536 if (AH->connection)
537 CommitTransaction(AHX);
538 else
539 ahprintf(AH, "COMMIT;\n");
540 }
541 }
542
543 /* Select owner and schema as necessary */
544 _becomeOwner(AH, te);
545 _selectOutputSchema(AH, te->namespace);
546
547 /*
548 * Now emit the DROP command, if the object has one. Note we
549 * don't necessarily emit it verbatim; at this point we add an
550 * appropriate IF EXISTS clause, if the user requested it.
551 */
552 if (strcmp(te->desc, "BLOB METADATA") == 0)
553 {
554 /* We must generate the per-blob commands */
555 if (ropt->if_exists)
556 IssueCommandPerBlob(AH, te,
557 "SELECT pg_catalog.lo_unlink(oid) "
558 "FROM pg_catalog.pg_largeobject_metadata "
559 "WHERE oid = '", "'");
560 else
561 IssueCommandPerBlob(AH, te,
562 "SELECT pg_catalog.lo_unlink('",
563 "')");
564 }
565 else if (*te->dropStmt != '\0')
566 {
567 if (!ropt->if_exists ||
568 strncmp(te->dropStmt, "--", 2) == 0)
569 {
570 /*
571 * Without --if-exists, or if it's just a comment (as
572 * happens for the public schema), print the dropStmt
573 * as-is.
574 */
575 ahprintf(AH, "%s", te->dropStmt);
576 }
577 else
578 {
579 /*
580 * Inject an appropriate spelling of "if exists". For
581 * old-style large objects, we have a routine that
582 * knows how to do it, without depending on
583 * te->dropStmt; use that. For other objects we need
584 * to parse the command.
585 */
586 if (strcmp(te->desc, "BLOB") == 0)
587 {
589 }
590 else
591 {
592 char *dropStmt = pg_strdup(te->dropStmt);
593 char *dropStmtOrig = dropStmt;
595
596 /*
597 * Need to inject IF EXISTS clause after ALTER
598 * TABLE part in ALTER TABLE .. DROP statement
599 */
600 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
601 {
603 "ALTER TABLE IF EXISTS");
604 dropStmt = dropStmt + 11;
605 }
606
607 /*
608 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
609 * not support the IF EXISTS clause, and therefore
610 * we simply emit the original command for DEFAULT
611 * objects (modulo the adjustment made above).
612 *
613 * Likewise, don't mess with DATABASE PROPERTIES.
614 *
615 * If we used CREATE OR REPLACE VIEW as a means of
616 * quasi-dropping an ON SELECT rule, that should
617 * be emitted unchanged as well.
618 *
619 * For other object types, we need to extract the
620 * first part of the DROP which includes the
621 * object type. Most of the time this matches
622 * te->desc, so search for that; however for the
623 * different kinds of CONSTRAINTs, we know to
624 * search for hardcoded "DROP CONSTRAINT" instead.
625 */
626 if (strcmp(te->desc, "DEFAULT") == 0 ||
627 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
628 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
629 appendPQExpBufferStr(ftStmt, dropStmt);
630 else
631 {
632 char buffer[40];
633 char *mark;
634
635 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
636 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
637 strcmp(te->desc, "FK CONSTRAINT") == 0)
638 strcpy(buffer, "DROP CONSTRAINT");
639 else
640 snprintf(buffer, sizeof(buffer), "DROP %s",
641 te->desc);
642
643 mark = strstr(dropStmt, buffer);
644
645 if (mark)
646 {
647 *mark = '\0';
648 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
649 dropStmt, buffer,
650 mark + strlen(buffer));
651 }
652 else
653 {
654 /* complain and emit unmodified command */
655 pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
656 dropStmtOrig);
657 appendPQExpBufferStr(ftStmt, dropStmt);
658 }
659 }
660
661 ahprintf(AH, "%s", ftStmt->data);
662
663 destroyPQExpBuffer(ftStmt);
664 pg_free(dropStmtOrig);
665 }
666 }
667 }
668
669 /*
670 * In --transaction-size mode, re-establish the transaction
671 * block if needed; otherwise, commit after every N drops.
672 */
673 if (ropt->txn_size > 0)
674 {
675 if (not_allowed_in_txn)
676 {
677 if (AH->connection)
678 StartTransaction(AHX);
679 else
680 ahprintf(AH, "BEGIN;\n");
681 AH->txnCount = 0;
682 }
683 else if (++AH->txnCount >= ropt->txn_size)
684 {
685 if (AH->connection)
686 {
687 CommitTransaction(AHX);
688 StartTransaction(AHX);
689 }
690 else
691 ahprintf(AH, "COMMIT;\nBEGIN;\n");
692 AH->txnCount = 0;
693 }
694 }
695 }
696 }
697
698 /*
699 * _selectOutputSchema may have set currSchema to reflect the effect
700 * of a "SET search_path" command it emitted. However, by now we may
701 * have dropped that schema; or it might not have existed in the first
702 * place. In either case the effective value of search_path will not
703 * be what we think. Forcibly reset currSchema so that we will
704 * re-establish the search_path setting when needed (after creating
705 * the schema).
706 *
707 * If we treated users as pg_dump'able objects then we'd need to reset
708 * currUser here too.
709 */
710 free(AH->currSchema);
711 AH->currSchema = NULL;
712 }
713
714 if (parallel_mode)
715 {
716 /*
717 * In parallel mode, turn control over to the parallel-restore logic.
718 */
719 ParallelState *pstate;
720 TocEntry pending_list;
721
722 /* The archive format module may need some setup for this */
725
726 pending_list_header_init(&pending_list);
727
728 /* This runs PRE_DATA items and then disconnects from the database */
729 restore_toc_entries_prefork(AH, &pending_list);
730 Assert(AH->connection == NULL);
731
732 /* ParallelBackupStart() will actually fork the processes */
733 pstate = ParallelBackupStart(AH);
734 restore_toc_entries_parallel(AH, pstate, &pending_list);
735 ParallelBackupEnd(AH, pstate);
736
737 /* reconnect the leader and see if we missed something */
738 restore_toc_entries_postfork(AH, &pending_list);
739 Assert(AH->connection != NULL);
740 }
741 else
742 {
743 /*
744 * In serial mode, process everything in three phases: normal items,
745 * then ACLs, then post-ACL items. We might be able to skip one or
746 * both extra phases in some cases, eg data-only restores.
747 */
748 bool haveACL = false;
749 bool havePostACL = false;
750
751 for (te = AH->toc->next; te != AH->toc; te = te->next)
752 {
753 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
754 continue; /* ignore if not to be dumped at all */
755
756 switch (_tocEntryRestorePass(te))
757 {
759 (void) restore_toc_entry(AH, te, false);
760 break;
761 case RESTORE_PASS_ACL:
762 haveACL = true;
763 break;
765 havePostACL = true;
766 break;
767 }
768 }
769
770 if (haveACL)
771 {
772 for (te = AH->toc->next; te != AH->toc; te = te->next)
773 {
774 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
776 (void) restore_toc_entry(AH, te, false);
777 }
778 }
779
780 if (havePostACL)
781 {
782 for (te = AH->toc->next; te != AH->toc; te = te->next)
783 {
784 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
786 (void) restore_toc_entry(AH, te, false);
787 }
788 }
789 }
790
791 /*
792 * Close out any persistent transaction we may have. While these two
793 * cases are started in different places, we can end both cases here.
794 */
795 if (ropt->single_txn || ropt->txn_size > 0)
796 {
797 if (AH->connection)
798 CommitTransaction(AHX);
799 else
800 ahprintf(AH, "COMMIT;\n\n");
801 }
802
803 if (AH->public.verbose)
804 dumpTimestamp(AH, "Completed on", time(NULL));
805
806 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
807
808 /*
809 * Clean up & we're done.
810 */
812
814 RestoreOutput(AH, sav);
815
816 if (ropt->useDB)
817 DisconnectDatabase(&AH->public);
818}
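/*
 * Illustrative sketch (hypothetical statements, not part of the upstream
 * file): what the --if-exists rewriting in the drop loop above produces.
 *
 *     dropStmt: DROP TABLE public.foo;
 *     emitted:  DROP TABLE IF EXISTS public.foo;
 *
 *     dropStmt: ALTER TABLE ONLY public.foo DROP CONSTRAINT foo_pkey;
 *     emitted:  ALTER TABLE IF EXISTS ONLY public.foo DROP CONSTRAINT IF EXISTS foo_pkey;
 *
 * "IF EXISTS" is spliced in after "ALTER TABLE" (if present) and after the
 * object-type keywords located with strstr().
 */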
819
820/*
821 * Restore a single TOC item. Used in both parallel and non-parallel restore;
822 * is_parallel is true if we are in a worker child process.
823 *
824 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
825 * the parallel parent has to make the corresponding status update.
826 */
827static int
828restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
829{
830 RestoreOptions *ropt = AH->public.ropt;
831 int status = WORKER_OK;
832 int reqs;
833 bool defnDumped;
834
835 AH->currentTE = te;
836
837 /* Dump any relevant dump warnings to stderr */
838 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
839 {
840 if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
841 pg_log_warning("warning from original dump file: %s", te->defn);
842 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
843 pg_log_warning("warning from original dump file: %s", te->copyStmt);
844 }
845
846 /* Work out what, if anything, we want from this entry */
847 reqs = te->reqs;
848
849 defnDumped = false;
850
851 /*
852 * If it has a schema component that we want, then process that
853 */
854 if ((reqs & REQ_SCHEMA) != 0)
855 {
856 bool object_is_db = false;
857
858 /*
859 * In --transaction-size mode, must exit our transaction block to
860 * create a database or set its properties.
861 */
862 if (strcmp(te->desc, "DATABASE") == 0 ||
863 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
864 {
865 object_is_db = true;
866 if (ropt->txn_size > 0)
867 {
868 if (AH->connection)
869 CommitTransaction(&AH->public);
870 else
871 ahprintf(AH, "COMMIT;\n\n");
872 }
873 }
874
875 /* Show namespace in log message if available */
876 if (te->namespace)
877 pg_log_info("creating %s \"%s.%s\"",
878 te->desc, te->namespace, te->tag);
879 else
880 pg_log_info("creating %s \"%s\"",
881 te->desc, te->tag);
882
884 defnDumped = true;
885
886 if (strcmp(te->desc, "TABLE") == 0)
887 {
888 if (AH->lastErrorTE == te)
889 {
890 /*
891 * We failed to create the table. If
892 * --no-data-for-failed-tables was given, mark the
893 * corresponding TABLE DATA to be ignored.
894 *
895 * In the parallel case this must be done in the parent, so we
896 * just set the return value.
897 */
898 if (ropt->noDataForFailedTables)
899 {
900 if (is_parallel)
901 status = WORKER_INHIBIT_DATA;
902 else
904 }
905 }
906 else
907 {
908 /*
909 * We created the table successfully. Mark the corresponding
910 * TABLE DATA for possible truncation.
911 *
912 * In the parallel case this must be done in the parent, so we
913 * just set the return value.
914 */
915 if (is_parallel)
916 status = WORKER_CREATE_DONE;
917 else
918 mark_create_done(AH, te);
919 }
920 }
921
922 /*
923 * If we created a DB, connect to it. Also, if we changed DB
924 * properties, reconnect to ensure that relevant GUC settings are
925 * applied to our session. (That also restarts the transaction block
926 * in --transaction-size mode.)
927 */
928 if (object_is_db)
929 {
930 pg_log_info("connecting to new database \"%s\"", te->tag);
931 _reconnectToDB(AH, te->tag);
932 }
933 }
934
935 /*
936 * If it has a data component that we want, then process that
937 */
938 if ((reqs & REQ_DATA) != 0)
939 {
940 /*
941 * hadDumper will be set if there is a genuine data component for this
942 * node. Otherwise, we need to check the defn field for statements
943 * that need to be executed in data-only restores.
944 */
945 if (te->hadDumper)
946 {
947 /*
948 * If we can output the data, then restore it.
949 */
950 if (AH->PrintTocDataPtr != NULL)
951 {
953
954 if (strcmp(te->desc, "BLOBS") == 0 ||
955 strcmp(te->desc, "BLOB COMMENTS") == 0)
956 {
957 pg_log_info("processing %s", te->desc);
958
959 _selectOutputSchema(AH, "pg_catalog");
960
961 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
962 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
964
965 AH->PrintTocDataPtr(AH, te);
966
968 }
969 else
970 {
971 bool use_truncate;
972
974
975 /* Select owner and schema as necessary */
976 _becomeOwner(AH, te);
977 _selectOutputSchema(AH, te->namespace);
978
979 pg_log_info("processing data for table \"%s.%s\"",
980 te->namespace, te->tag);
981
982 /*
983 * In parallel restore, if we created the table earlier in
984 * this run (so that we know it is empty) and we are not
985 * restoring a load-via-partition-root data item then we
986 * wrap the COPY in a transaction and precede it with a
987 * TRUNCATE. If wal_level is set to minimal this prevents
988 * WAL-logging the COPY. This obtains a speedup similar
989 * to that from using single_txn mode in non-parallel
990 * restores.
991 *
992 * We mustn't do this for load-via-partition-root cases
993 * because some data might get moved across partition
994 * boundaries, risking deadlock and/or loss of previously
995 * loaded data. (We assume that all partitions of a
996 * partitioned table will be treated the same way.)
997 */
998 use_truncate = is_parallel && te->created &&
1000
1001 if (use_truncate)
1002 {
1003 /*
1004 * Parallel restore is always talking directly to a
1005 * server, so no need to see if we should issue BEGIN.
1006 */
1008
1009 /*
1010 * Issue TRUNCATE with ONLY so that child tables are
1011 * not wiped.
1012 */
1013 ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1014 fmtQualifiedId(te->namespace, te->tag));
1015 }
1016
1017 /*
1018 * If we have a copy statement, use it.
1019 */
1020 if (te->copyStmt && strlen(te->copyStmt) > 0)
1021 {
1022 ahprintf(AH, "%s", te->copyStmt);
1024 }
1025 else
1027
1028 AH->PrintTocDataPtr(AH, te);
1029
1030 /*
1031 * Terminate COPY if needed.
1032 */
1033 if (AH->outputKind == OUTPUT_COPYDATA &&
1034 RestoringToDB(AH))
1035 EndDBCopyMode(&AH->public, te->tag);
1037
1038 /* close out the transaction started above */
1039 if (use_truncate)
1041
1043 }
1044 }
1045 }
1046 else if (!defnDumped)
1047 {
1048 /* If we haven't already dumped the defn part, do so now */
1049 pg_log_info("executing %s %s", te->desc, te->tag);
1051 }
1052 }
1053
1054 /*
1055 * If it has a statistics component that we want, then process that
1056 */
1057 if ((reqs & REQ_STATS) != 0)
1059
1060 /*
1061 * If we emitted anything for this TOC entry, that counts as one action
1062 * against the transaction-size limit. Commit if it's time to.
1063 */
1064 if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1065 {
1066 if (++AH->txnCount >= ropt->txn_size)
1067 {
1068 if (AH->connection)
1069 {
1070 CommitTransaction(&AH->public);
1071 StartTransaction(&AH->public);
1072 }
1073 else
1074 ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1075 AH->txnCount = 0;
1076 }
1077 }
1078
1079 if (AH->public.n_errors > 0 && status == WORKER_OK)
1080 status = WORKER_IGNORED_ERRORS;
1081
1082 return status;
1083}
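/*
 * Illustrative sketch (hypothetical table, not part of the upstream file):
 * the sequence a parallel worker effectively executes for a table-data item
 * when the table was created earlier in the same run and is not loaded via
 * the partition root:
 *
 *     BEGIN;
 *     TRUNCATE TABLE ONLY public.foo;
 *     COPY public.foo (a, b) FROM stdin;
 *     ...
 *     \.
 *     COMMIT;
 *
 * The TRUNCATE inside the transaction lets the COPY skip WAL when wal_level
 * is minimal, similar to the benefit of --single-transaction in serial mode.
 */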
1084
1085/*
1086 * Allocate a new RestoreOptions block.
1087 * This is mainly so we can initialize it, but also for future expansion.
1088 */
1089RestoreOptions *
1090NewRestoreOptions(void)
1091{
1092 RestoreOptions *opts;
1093
1094 opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1095
1096 /* set any fields that shouldn't default to zeroes */
1097 opts->format = archUnknown;
1098 opts->cparams.promptPassword = TRI_DEFAULT;
1099 opts->dumpSections = DUMP_UNSECTIONED;
1100 opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1101 opts->compression_spec.level = 0;
1102 opts->dumpSchema = true;
1103 opts->dumpData = true;
1104 opts->dumpStatistics = true;
1105
1106 return opts;
1107}
1108
1109static void
1110_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1111{
1112 RestoreOptions *ropt = AH->public.ropt;
1113
1114 /* This hack is only needed in a data-only restore */
1115 if (ropt->dumpSchema || !ropt->disable_triggers)
1116 return;
1117
1118 pg_log_info("disabling triggers for %s", te->tag);
1119
1120 /*
1121 * Become superuser if possible, since only superusers can disable
1122 * constraint triggers. If -S was not given, assume the initial user
1123 * identity is a superuser. (XXX would it be better to become the
1124 * table owner?)
1125 */
1126 _becomeUser(AH, ropt->superuser);
1127
1128 /*
1129 * Disable them.
1130 */
1131 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1132 fmtQualifiedId(te->namespace, te->tag));
1133}
1134
1135static void
1136_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1137{
1138 RestoreOptions *ropt = AH->public.ropt;
1139
1140 /* This hack is only needed in a data-only restore */
1141 if (ropt->dumpSchema || !ropt->disable_triggers)
1142 return;
1143
1144 pg_log_info("enabling triggers for %s", te->tag);
1145
1146 /*
1147 * Become superuser if possible, since only superusers can enable or
1148 * disable constraint triggers. If -S was not given, assume the initial
1149 * user identity is a superuser. (XXX would it be better to become the
1150 * table owner?)
1151 */
1152 _becomeUser(AH, ropt->superuser);
1153
1154 /*
1155 * Enable them.
1156 */
1157 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1158 fmtQualifiedId(te->namespace, te->tag));
1159}
1160
1161/*
1162 * Detect whether a TABLE DATA TOC item is performing "load via partition
1163 * root", that is the target table is an ancestor partition rather than the
1164 * table the TOC item is nominally for.
1165 *
1166 * In newer archive files this can be detected by checking for a special
1167 * comment placed in te->defn. In older files we have to fall back to seeing
1168 * if the COPY statement targets the named table or some other one. This
1169 * will not work for data dumped as INSERT commands, so we could give a false
1170 * negative in that case; fortunately, that's a rarely-used option.
1171 */
1172static bool
1173is_load_via_partition_root(TocEntry *te)
1174{
1175 if (te->defn &&
1176 strncmp(te->defn, "-- load via partition root ", 27) == 0)
1177 return true;
1178 if (te->copyStmt && *te->copyStmt)
1179 {
1180 PQExpBuffer copyStmt = createPQExpBuffer();
1181 bool result;
1182
1183 /*
1184 * Build the initial part of the COPY as it would appear if the
1185 * nominal target table is the actual target. If we see anything
1186 * else, it must be a load-via-partition-root case.
1187 */
1188 appendPQExpBuffer(copyStmt, "COPY %s ",
1189 fmtQualifiedId(te->namespace, te->tag));
1190 result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1191 destroyPQExpBuffer(copyStmt);
1192 return result;
1193 }
1194 /* Assume it's not load-via-partition-root */
1195 return false;
1196}
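/*
 * Illustrative inputs for the detection above (hypothetical names, not part
 * of the upstream file), for a TABLE DATA entry with namespace "public" and
 * tag "part_1":
 *
 *     te->defn     = "-- load via partition root public.parent\n..."
 *                    -> detected via the marker comment (newer archives)
 *
 *     te->copyStmt = "COPY public.parent (a, b) FROM stdin;\n"
 *                    -> detected because the target differs from the
 *                       expected prefix "COPY public.part_1 "
 */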
1197
1198/*
1199 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1200 */
1201
1202/* Public */
1203void
1204WriteData(Archive *AHX, const void *data, size_t dLen)
1205{
1206 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1207
1208 if (!AH->currToc)
1209 pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1210
1211 AH->WriteDataPtr(AH, data, dLen);
1212}
1213
1214/*
1215 * Create a new TOC entry. The TOC was originally designed as a table of
1216 * contents, but is now the repository for all metadata; the name has stuck.
1217 *
1218 * The new entry is added to the Archive's TOC list. Most callers can ignore
1219 * the result value because nothing else need be done, but a few want to
1220 * manipulate the TOC entry further.
1221 */
1222
1223/* Public */
1224TocEntry *
1225ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1226 ArchiveOpts *opts)
1227{
1228 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1229 TocEntry *newToc;
1230
1231 newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1232
1233 AH->tocCount++;
1234 if (dumpId > AH->maxDumpId)
1235 AH->maxDumpId = dumpId;
1236
1237 newToc->prev = AH->toc->prev;
1238 newToc->next = AH->toc;
1239 AH->toc->prev->next = newToc;
1240 AH->toc->prev = newToc;
1241
1242 newToc->catalogId = catalogId;
1243 newToc->dumpId = dumpId;
1244 newToc->section = opts->section;
1245
1246 newToc->tag = pg_strdup(opts->tag);
1247 newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1248 newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1249 newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1250 newToc->relkind = opts->relkind;
1251 newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1252 newToc->desc = pg_strdup(opts->description);
1253 newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1254 newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1255 newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1256
1257 if (opts->nDeps > 0)
1258 {
1259 newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1260 memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1261 newToc->nDeps = opts->nDeps;
1262 }
1263 else
1264 {
1265 newToc->dependencies = NULL;
1266 newToc->nDeps = 0;
1267 }
1268
1269 newToc->dataDumper = opts->dumpFn;
1270 newToc->dataDumperArg = opts->dumpArg;
1271 newToc->hadDumper = opts->dumpFn ? true : false;
1272
1273 newToc->defnDumper = opts->defnFn;
1274 newToc->defnDumperArg = opts->defnArg;
1275
1276 newToc->formatData = NULL;
1277 newToc->dataLength = 0;
1278
1279 if (AH->ArchiveEntryPtr != NULL)
1280 AH->ArchiveEntryPtr(AH, newToc);
1281
1282 return newToc;
1283}
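/*
 * Illustrative sketch of a typical caller in pg_dump.c (not part of this
 * file; the object and the use of the ARCHIVE_OPTS() helper from
 * pg_backup_archiver.h are assumptions for the example):
 *
 *     ArchiveEntry(fout, nilCatalogId, createDumpId(),
 *                  ARCHIVE_OPTS(.tag = "foo",
 *                               .namespace = "public",
 *                               .owner = "postgres",
 *                               .description = "TABLE",
 *                               .section = SECTION_PRE_DATA,
 *                               .createStmt = "CREATE TABLE public.foo ();\n",
 *                               .dropStmt = "DROP TABLE public.foo;\n"));
 */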
1284
1285/* Public */
1286void
1287PrintTOCSummary(Archive *AHX)
1288{
1289 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1290 RestoreOptions *ropt = AH->public.ropt;
1291 TocEntry *te;
1292 pg_compress_specification out_compression_spec = {0};
1293 teSection curSection;
1294 CompressFileHandle *sav;
1295 const char *fmtName;
1296 char stamp_str[64];
1297
1298 /* TOC is always uncompressed */
1299 out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1300
1301 sav = SaveOutput(AH);
1302 if (ropt->filename)
1303 SetOutput(AH, ropt->filename, out_compression_spec, false);
1304
1305 if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1306 localtime(&AH->createDate)) == 0)
1307 strcpy(stamp_str, "[unknown]");
1308
1309 ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1310 ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1311 sanitize_line(AH->archdbname, false),
1312 AH->tocCount,
1314
1315 switch (AH->format)
1316 {
1317 case archCustom:
1318 fmtName = "CUSTOM";
1319 break;
1320 case archDirectory:
1321 fmtName = "DIRECTORY";
1322 break;
1323 case archTar:
1324 fmtName = "TAR";
1325 break;
1326 default:
1327 fmtName = "UNKNOWN";
1328 }
1329
1330 ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1332 ahprintf(AH, "; Format: %s\n", fmtName);
1333 ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1334 ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1335 if (AH->archiveRemoteVersion)
1336 ahprintf(AH, "; Dumped from database version: %s\n",
1338 if (AH->archiveDumpVersion)
1339 ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1340 AH->archiveDumpVersion);
1341
1342 ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1343
1344 curSection = SECTION_PRE_DATA;
1345 for (te = AH->toc->next; te != AH->toc; te = te->next)
1346 {
1347 /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1348 if (te->section != SECTION_NONE)
1349 curSection = te->section;
1350 te->reqs = _tocEntryRequired(te, curSection, AH);
1351 /* Now, should we print it? */
1352 if (ropt->verbose ||
1353 (te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
1354 {
1355 char *sanitized_name;
1356 char *sanitized_schema;
1357 char *sanitized_owner;
1358
1359 /*
1360 */
1361 sanitized_name = sanitize_line(te->tag, false);
1362 sanitized_schema = sanitize_line(te->namespace, true);
1363 sanitized_owner = sanitize_line(te->owner, false);
1364
1365 ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1367 te->desc, sanitized_schema, sanitized_name,
1368 sanitized_owner);
1369
1370 free(sanitized_name);
1371 free(sanitized_schema);
1372 free(sanitized_owner);
1373 }
1374 if (ropt->verbose && te->nDeps > 0)
1375 {
1376 int i;
1377
1378 ahprintf(AH, ";\tdepends on:");
1379 for (i = 0; i < te->nDeps; i++)
1380 ahprintf(AH, " %d", te->dependencies[i]);
1381 ahprintf(AH, "\n");
1382 }
1383 }
1384
1385 /* Enforce strict names checking */
1386 if (ropt->strict_names)
1387 StrictNamesCheck(ropt);
1388
1389 if (ropt->filename)
1390 RestoreOutput(AH, sav);
1391}
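/*
 * Illustrative output of the summary above, as produced by "pg_restore -l"
 * (hypothetical entries, not part of the upstream file):
 *
 *     ;
 *     ; Archive created at 2024-01-01 12:00:00 UTC
 *     ; dbname: testdb
 *     ; TOC Entries: 42
 *     ; Compression: none
 *     ; ...
 *     ;
 *     ; Selected TOC Entries:
 *     ;
 *     215; 1259 16386 TABLE public foo postgres
 *     3581; 0 16386 TABLE DATA public foo postgres
 */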
1392
1393/***********
1394 * Large Object Archival
1395 ***********/
1396
1397/* Called by a dumper to signal start of a LO */
1398int
1399StartLO(Archive *AHX, Oid oid)
1400{
1401 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1402
1403 if (!AH->StartLOPtr)
1404 pg_fatal("large-object output not supported in chosen format");
1405
1406 AH->StartLOPtr(AH, AH->currToc, oid);
1407
1408 return 1;
1409}
1410
1411/* Called by a dumper to signal end of a LO */
1412int
1413EndLO(Archive *AHX, Oid oid)
1414{
1415 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1416
1417 if (AH->EndLOPtr)
1418 AH->EndLOPtr(AH, AH->currToc, oid);
1419
1420 return 1;
1421}
1422
1423/**********
1424 * Large Object Restoration
1425 **********/
1426
1427/*
1428 * Called by a format handler before a group of LOs is restored
1429 */
1430void
1431StartRestoreLOs(ArchiveHandle *AH)
1432{
1433 RestoreOptions *ropt = AH->public.ropt;
1434
1435 /*
1436 * LOs must be restored within a transaction block, since we need the LO
1437 * handle to stay open while we write it. Establish a transaction unless
1438 * there's one being used globally.
1439 */
1440 if (!(ropt->single_txn || ropt->txn_size > 0))
1441 {
1442 if (AH->connection)
1443 StartTransaction(&AH->public);
1444 else
1445 ahprintf(AH, "BEGIN;\n\n");
1446 }
1447
1448 AH->loCount = 0;
1449}
1450
1451/*
1452 * Called by a format handler after a group of LOs is restored
1453 */
1454void
1455EndRestoreLOs(ArchiveHandle *AH)
1456{
1457 RestoreOptions *ropt = AH->public.ropt;
1458
1459 if (!(ropt->single_txn || ropt->txn_size > 0))
1460 {
1461 if (AH->connection)
1462 CommitTransaction(&AH->public);
1463 else
1464 ahprintf(AH, "COMMIT;\n\n");
1465 }
1466
1467 pg_log_info(ngettext("restored %d large object",
1468 "restored %d large objects",
1469 AH->loCount),
1470 AH->loCount);
1471}
1472
1473
1474/*
1475 * Called by a format handler to initiate restoration of a LO
1476 */
1477void
1478StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1479{
1480 bool old_lo_style = (AH->version < K_VERS_1_12);
1481 Oid loOid;
1482
1483 AH->loCount++;
1484
1485 /* Initialize the LO Buffer */
1486 if (AH->lo_buf == NULL)
1487 {
1488 /* First time through (in this process) so allocate the buffer */
1489 AH->lo_buf_size = LOBBUFSIZE;
1491 }
1492 AH->lo_buf_used = 0;
1493
1494 pg_log_info("restoring large object with OID %u", oid);
1495
1496 /* With an old archive we must do drop and create logic here */
1497 if (old_lo_style && drop)
1498 DropLOIfExists(AH, oid);
1499
1500 if (AH->connection)
1501 {
1502 if (old_lo_style)
1503 {
1504 loOid = lo_create(AH->connection, oid);
1505 if (loOid == 0 || loOid != oid)
1506 pg_fatal("could not create large object %u: %s",
1507 oid, PQerrorMessage(AH->connection));
1508 }
1509 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1510 if (AH->loFd == -1)
1511 pg_fatal("could not open large object %u: %s",
1512 oid, PQerrorMessage(AH->connection));
1513 }
1514 else
1515 {
1516 if (old_lo_style)
1517 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1518 oid, INV_WRITE);
1519 else
1520 ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1521 oid, INV_WRITE);
1522 }
1523
1524 AH->writingLO = true;
1525}
1526
1527void
1528EndRestoreLO(ArchiveHandle *AH, Oid oid)
1529{
1530 if (AH->lo_buf_used > 0)
1531 {
1532 /* Write remaining bytes from the LO buffer */
1533 dump_lo_buf(AH);
1534 }
1535
1536 AH->writingLO = false;
1537
1538 if (AH->connection)
1539 {
1540 lo_close(AH->connection, AH->loFd);
1541 AH->loFd = -1;
1542 }
1543 else
1544 {
1545 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1546 }
1547}
1548
1549/***********
1550 * Sorting and Reordering
1551 ***********/
1552
1553void
1554SortTocFromFile(Archive *AHX)
1555{
1556 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1557 RestoreOptions *ropt = AH->public.ropt;
1558 FILE *fh;
1559 StringInfoData linebuf;
1560
1561 /* Allocate space for the 'wanted' array, and init it */
1562 ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1563
1564 /* Setup the file */
1565 fh = fopen(ropt->tocFile, PG_BINARY_R);
1566 if (!fh)
1567 pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1568
1569 initStringInfo(&linebuf);
1570
1571 while (pg_get_line_buf(fh, &linebuf))
1572 {
1573 char *cmnt;
1574 char *endptr;
1575 DumpId id;
1576 TocEntry *te;
1577
1578 /* Truncate line at comment, if any */
1579 cmnt = strchr(linebuf.data, ';');
1580 if (cmnt != NULL)
1581 {
1582 cmnt[0] = '\0';
1583 linebuf.len = cmnt - linebuf.data;
1584 }
1585
1586 /* Ignore if all blank */
1587 if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1588 continue;
1589
1590 /* Get an ID, check it's valid and not already seen */
1591 id = strtol(linebuf.data, &endptr, 10);
1592 if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1593 ropt->idWanted[id - 1])
1594 {
1595 pg_log_warning("line ignored: %s", linebuf.data);
1596 continue;
1597 }
1598
1599 /* Find TOC entry */
1600 te = getTocEntryByDumpId(AH, id);
1601 if (!te)
1602 pg_fatal("could not find entry for ID %d",
1603 id);
1604
1605 /* Mark it wanted */
1606 ropt->idWanted[id - 1] = true;
1607
1608 /*
1609 * Move each item to the end of the list as it is selected, so that
1610 * they are placed in the desired order. Any unwanted items will end
1611 * up at the front of the list, which may seem unintuitive but it's
1612 * what we need. In an ordinary serial restore that makes no
1613 * difference, but in a parallel restore we need to mark unrestored
1614 * items' dependencies as satisfied before we start examining
1615 * restorable items. Otherwise they could have surprising
1616 * side-effects on the order in which restorable items actually get
1617 * restored.
1618 */
1619 _moveBefore(AH->toc, te);
1620 }
1621
1622 pg_free(linebuf.data);
1623
1624 if (fclose(fh) != 0)
1625 pg_fatal("could not close TOC file: %m");
1626}
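/*
 * Illustrative TOC list file accepted by the routine above (typically an
 * edited copy of "pg_restore -l" output; hypothetical entries, not part of
 * the upstream file):
 *
 *     ; text after a semicolon is ignored
 *     215; 1259 16386 TABLE public foo postgres
 *     ;216; 1259 16387 TABLE public bar postgres    <- commented out: skipped
 *     3581; 0 16386 TABLE DATA public foo postgres
 *
 * Only the leading dump ID is significant; selected entries are restored in
 * the order listed.
 */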
1627
1628/**********************
1629 * Convenience functions that look like standard IO functions
1630 * for writing data when in dump mode.
1631 **********************/
1632
1633/* Public */
1634void
1635archputs(const char *s, Archive *AH)
1636{
1637 WriteData(AH, s, strlen(s));
1638}
1639
1640/* Public */
1641int
1642archprintf(Archive *AH, const char *fmt,...)
1643{
1644 int save_errno = errno;
1645 char *p;
1646 size_t len = 128; /* initial assumption about buffer size */
1647 size_t cnt;
1648
1649 for (;;)
1650 {
1651 va_list args;
1652
1653 /* Allocate work buffer. */
1654 p = (char *) pg_malloc(len);
1655
1656 /* Try to format the data. */
1657 errno = save_errno;
1658 va_start(args, fmt);
1659 cnt = pvsnprintf(p, len, fmt, args);
1660 va_end(args);
1661
1662 if (cnt < len)
1663 break; /* success */
1664
1665 /* Release buffer and loop around to try again with larger len. */
1666 free(p);
1667 len = cnt;
1668 }
1669
1670 WriteData(AH, p, cnt);
1671 free(p);
1672 return (int) cnt;
1673}
1674
1675
1676/*******************************
1677 * Stuff below here should be 'private' to the archiver routines
1678 *******************************/
1679
1680static void
1681SetOutput(ArchiveHandle *AH, const char *filename,
1682 const pg_compress_specification compression_spec,
1683 bool append_data)
1684{
1685 CompressFileHandle *CFH;
1686 const char *mode;
1687 int fn = -1;
1688
1689 if (filename)
1690 {
1691 if (strcmp(filename, "-") == 0)
1692 fn = fileno(stdout);
1693 }
1694 else if (AH->FH)
1695 fn = fileno(AH->FH);
1696 else if (AH->fSpec)
1697 {
1698 filename = AH->fSpec;
1699 }
1700 else
1701 fn = fileno(stdout);
1702
1703 if (append_data || AH->mode == archModeAppend)
1704 mode = PG_BINARY_A;
1705 else
1706 mode = PG_BINARY_W;
1707
1708 CFH = InitCompressFileHandle(compression_spec);
1709
1710 if (!CFH->open_func(filename, fn, mode, CFH))
1711 {
1712 if (filename)
1713 pg_fatal("could not open output file \"%s\": %m", filename);
1714 else
1715 pg_fatal("could not open output file: %m");
1716 }
1717
1718 AH->OF = CFH;
1719}
1720
1721static CompressFileHandle *
1722SaveOutput(ArchiveHandle *AH)
1723{
1724 return (CompressFileHandle *) AH->OF;
1725}
1726
1727static void
1728RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1729{
1730 errno = 0;
1731 if (!EndCompressFileHandle(AH->OF))
1732 pg_fatal("could not close output file: %m");
1733
1734 AH->OF = savedOutput;
1735}
1736
1737
1738
1739/*
1740 * Print formatted text to the output file (usually stdout).
1741 */
1742int
1743ahprintf(ArchiveHandle *AH, const char *fmt,...)
1744{
1745 int save_errno = errno;
1746 char *p;
1747 size_t len = 128; /* initial assumption about buffer size */
1748 size_t cnt;
1749
1750 for (;;)
1751 {
1752 va_list args;
1753
1754 /* Allocate work buffer. */
1755 p = (char *) pg_malloc(len);
1756
1757 /* Try to format the data. */
1758 errno = save_errno;
1759 va_start(args, fmt);
1760 cnt = pvsnprintf(p, len, fmt, args);
1761 va_end(args);
1762
1763 if (cnt < len)
1764 break; /* success */
1765
1766 /* Release buffer and loop around to try again with larger len. */
1767 free(p);
1768 len = cnt;
1769 }
1770
1771 ahwrite(p, 1, cnt, AH);
1772 free(p);
1773 return (int) cnt;
1774}
1775
1776/*
1777 * Single place for logic which says 'We are restoring to a direct DB connection'.
1778 */
1779static int
1780RestoringToDB(ArchiveHandle *AH)
1781{
1782 RestoreOptions *ropt = AH->public.ropt;
1783
1784 return (ropt && ropt->useDB && AH->connection);
1785}
1786
1787/*
1788 * Dump the current contents of the LO data buffer while writing a LO
1789 */
1790static void
1791dump_lo_buf(ArchiveHandle *AH)
1792{
1793 if (AH->connection)
1794 {
1795 int res;
1796
1797 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1798 pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1799 "wrote %zu bytes of large object data (result = %d)",
1800 AH->lo_buf_used),
1801 AH->lo_buf_used, res);
1802 /* We assume there are no short writes, only errors */
1803 if (res != AH->lo_buf_used)
1804 warn_or_exit_horribly(AH, "could not write to large object: %s",
1806 }
1807 else
1808 {
1809 PQExpBuffer buf = createPQExpBuffer();
1810
1811 appendByteaLiteralAHX(buf,
1812 (const unsigned char *) AH->lo_buf,
1813 AH->lo_buf_used,
1814 AH);
1815
1816 /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1817 AH->writingLO = false;
1818 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1819 AH->writingLO = true;
1820
1821 destroyPQExpBuffer(buf);
1822 }
1823 AH->lo_buf_used = 0;
1824}
1825
1826
1827/*
1828 * Write buffer to the output file (usually stdout). This is used for
1829 * outputting 'restore' scripts etc. It is even possible for an archive
1830 * format to create a custom output routine to 'fake' a restore if it
1831 * wants to generate a script (see TAR output).
1832 */
1833void
1834ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1835{
1836 int bytes_written = 0;
1837
1838 if (AH->writingLO)
1839 {
1840 size_t remaining = size * nmemb;
1841
1842 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1843 {
1844 size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1845
1846 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1847 ptr = (const char *) ptr + avail;
1848 remaining -= avail;
1849 AH->lo_buf_used += avail;
1850 dump_lo_buf(AH);
1851 }
1852
1853 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1854 AH->lo_buf_used += remaining;
1855
1856 bytes_written = size * nmemb;
1857 }
1858 else if (AH->CustomOutPtr)
1859 bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1860
1861 /*
1862 * If we're doing a restore, it's direct to the DB, and we're
1863 * connected, then send it to the DB.
1864 */
1865 else if (RestoringToDB(AH))
1866 bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1867 else
1868 {
1870
1871 if (CFH->write_func(ptr, size * nmemb, CFH))
1872 bytes_written = size * nmemb;
1873 }
1874
1875 if (bytes_written != size * nmemb)
1877}
1878
1879/* on some error, we may decide to go on... */
1880void
1881warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1882{
1883 va_list ap;
1884
1885 switch (AH->stage)
1886 {
1887
1888 case STAGE_NONE:
1889 /* Do nothing special */
1890 break;
1891
1892 case STAGE_INITIALIZING:
1893 if (AH->stage != AH->lastErrorStage)
1894 pg_log_info("while INITIALIZING:");
1895 break;
1896
1897 case STAGE_PROCESSING:
1898 if (AH->stage != AH->lastErrorStage)
1899 pg_log_info("while PROCESSING TOC:");
1900 break;
1901
1902 case STAGE_FINALIZING:
1903 if (AH->stage != AH->lastErrorStage)
1904 pg_log_info("while FINALIZING:");
1905 break;
1906 }
1907 if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1908 {
1909 pg_log_info("from TOC entry %d; %u %u %s %s %s",
1910 AH->currentTE->dumpId,
1912 AH->currentTE->catalogId.oid,
1913 AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1914 AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1915 AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1916 }
1917 AH->lastErrorStage = AH->stage;
1918 AH->lastErrorTE = AH->currentTE;
1919
1920 va_start(ap, fmt);
1922 va_end(ap);
1923
1924 if (AH->public.exit_on_error)
1925 exit_nicely(1);
1926 else
1927 AH->public.n_errors++;
1928}
1929
1930#ifdef NOT_USED
1931
1932static void
1933_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1934{
1935 /* Unlink te from list */
1936 te->prev->next = te->next;
1937 te->next->prev = te->prev;
1938
1939 /* and insert it after "pos" */
1940 te->prev = pos;
1941 te->next = pos->next;
1942 pos->next->prev = te;
1943 pos->next = te;
1944}
1945#endif
1946
1947static void
1948_moveBefore(TocEntry *pos, TocEntry *te)
1949{
1950 /* Unlink te from list */
1951 te->prev->next = te->next;
1952 te->next->prev = te->prev;
1953
1954 /* and insert it before "pos" */
1955 te->prev = pos->prev;
1956 te->next = pos;
1957 pos->prev->next = te;
1958 pos->prev = te;
1959}
1960
1961/*
1962 * Build index arrays for the TOC list
1963 *
1964 * This should be invoked only after we have created or read in all the TOC
1965 * items.
1966 *
1967 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1968 * array entries run only up to maxDumpId. We might see dependency dump IDs
1969 * beyond that (if the dump was partial); so always check the array bound
1970 * before trying to touch an array entry.
1971 */
1972static void
1973buildTocEntryArrays(ArchiveHandle *AH)
1974{
1975 DumpId maxDumpId = AH->maxDumpId;
1976 TocEntry *te;
1977
1978 AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1979 AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1980
1981 for (te = AH->toc->next; te != AH->toc; te = te->next)
1982 {
1983 /* this check is purely paranoia, maxDumpId should be correct */
1984 if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1985 pg_fatal("bad dumpId");
1986
1987 /* tocsByDumpId indexes all TOCs by their dump ID */
1988 AH->tocsByDumpId[te->dumpId] = te;
1989
1990 /*
1991 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1992 * TOC entry that has a DATA item. We compute this by reversing the
1993 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1994 * just one dependency and it is the TABLE item.
1995 */
1996 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1997 {
1998 DumpId tableId = te->dependencies[0];
1999
2000 /*
2001 * The TABLE item might not have been in the archive, if this was
2002 * a data-only dump; but its dump ID should be less than its data
2003 * item's dump ID, so there should be a place for it in the array.
2004 */
2005 if (tableId <= 0 || tableId > maxDumpId)
2006 pg_fatal("bad table dumpId for TABLE DATA item");
2007
2008 AH->tableDataId[tableId] = te->dumpId;
2009 }
2010 }
2011}
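/*
 * Illustrative use of the index arrays built above (hypothetical dump IDs,
 * not part of the upstream file):
 *
 *     TocEntry *te = AH->tocsByDumpId[42];            // entry with dump ID 42
 *     DumpId dataId = AH->tableDataId[tableDumpId];   // its TABLE DATA item,
 *                                                     // or 0 if there is none
 *
 * Both arrays are indexed by dump ID, so slot 0 is unused and indexes must be
 * checked against AH->maxDumpId before use.
 */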
2012
2013TocEntry *
2014getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
2015{
2016 /* build index arrays if we didn't already */
2017 if (AH->tocsByDumpId == NULL)
2019
2020 if (id > 0 && id <= AH->maxDumpId)
2021 return AH->tocsByDumpId[id];
2022
2023 return NULL;
2024}
2025
2026int
2027TocIDRequired(ArchiveHandle *AH, DumpId id)
2028{
2029 TocEntry *te = getTocEntryByDumpId(AH, id);
2030
2031 if (!te)
2032 return 0;
2033
2034 return te->reqs;
2035}
2036
2037size_t
2038WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2039{
2040 int off;
2041
2042 /* Save the flag */
2043 AH->WriteBytePtr(AH, wasSet);
2044
2045 /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2046 for (off = 0; off < sizeof(pgoff_t); off++)
2047 {
2048 AH->WriteBytePtr(AH, o & 0xFF);
2049 o >>= 8;
2050 }
2051 return sizeof(pgoff_t) + 1;
2052}
2053
2054int
2055ReadOffset(ArchiveHandle *AH, pgoff_t *o)
2056{
2057 int i;
2058 int off;
2059 int offsetFlg;
2060
2061 /* Initialize to zero */
2062 *o = 0;
2063
2064 /* Check for old version */
2065 if (AH->version < K_VERS_1_7)
2066 {
2067 /* Prior versions wrote offsets using WriteInt */
2068 i = ReadInt(AH);
2069 /* -1 means not set */
2070 if (i < 0)
2071 return K_OFFSET_POS_NOT_SET;
2072 else if (i == 0)
2073 return K_OFFSET_NO_DATA;
2074
2075 /* Cast to pgoff_t because it was written as an int. */
2076 *o = (pgoff_t) i;
2077 return K_OFFSET_POS_SET;
2078 }
2079
2080 /*
2081 * Read the flag indicating the state of the data pointer. Check if valid
2082 * and die if not.
2083 *
2084 * This used to be handled by a negative or zero pointer, now we use an
2085 * extra byte specifically for the state.
2086 */
2087 offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2088
2089 switch (offsetFlg)
2090 {
2092 case K_OFFSET_NO_DATA:
2093 case K_OFFSET_POS_SET:
2094
2095 break;
2096
2097 default:
2098 pg_fatal("unexpected data offset flag %d", offsetFlg);
2099 }
2100
2101 /*
2102 * Read the bytes
2103 */
2104 for (off = 0; off < AH->offSize; off++)
2105 {
2106 if (off < sizeof(pgoff_t))
2107 *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2108 else
2109 {
2110 if (AH->ReadBytePtr(AH) != 0)
2111 pg_fatal("file offset in dump file is too large");
2112 }
2113 }
2114
2115 return offsetFlg;
2116}
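/*
 * Illustrative byte layout shared by WriteOffset() and ReadOffset() above
 * (hypothetical values, not part of the upstream file).  With an 8-byte
 * pgoff_t, offset 0x01020304 stored with flag K_OFFSET_POS_SET is written as
 *
 *     <flag> 0x04 0x03 0x02 0x01 0x00 0x00 0x00 0x00
 *
 * i.e. one status byte followed by the offset least-significant byte first,
 * which keeps archives portable across machines of different endianness.
 */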
2117
2118size_t
2119WriteInt(ArchiveHandle *AH, int i)
2120{
2121 int b;
2122
2123 /*
2124 * This is a bit yucky, but to keep the binary format independent of the
2125 * machine's integer representation, we write a sign byte followed by the
2126 * magnitude, least-significant byte first. If you change this, don't
2127 * forget to bump the file version number, and modify ReadInt to read the
2128 * new format AS WELL AS the old formats.
2129 */
2130
2131 /* SIGN byte */
2132 if (i < 0)
2133 {
2134 AH->WriteBytePtr(AH, 1);
2135 i = -i;
2136 }
2137 else
2138 AH->WriteBytePtr(AH, 0);
2139
2140 for (b = 0; b < AH->intSize; b++)
2141 {
2142 AH->WriteBytePtr(AH, i & 0xFF);
2143 i >>= 8;
2144 }
2145
2146 return AH->intSize + 1;
2147}
2148
2149int
2150ReadInt(ArchiveHandle *AH)
2151{
2152 int res = 0;
2153 int bv,
2154 b;
2155 int sign = 0; /* Default positive */
2156 int bitShift = 0;
2157
2158 if (AH->version > K_VERS_1_0)
2159 /* Read a sign byte */
2160 sign = AH->ReadBytePtr(AH);
2161
2162 for (b = 0; b < AH->intSize; b++)
2163 {
2164 bv = AH->ReadBytePtr(AH) & 0xFF;
2165 if (bv != 0)
2166 res = res + (bv << bitShift);
2167 bitShift += 8;
2168 }
2169
2170 if (sign)
2171 res = -res;
2172
2173 return res;
2174}
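/*
 * Illustrative encoding used by WriteInt() and ReadInt() above (hypothetical
 * value, not part of the upstream file).  With AH->intSize == 4, the value
 * -300 is written as
 *
 *     0x01 0x2C 0x01 0x00 0x00
 *
 * i.e. a sign byte (1 = negative) followed by the magnitude 300 = 0x012C,
 * least-significant byte first.
 */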
2175
2176size_t
2177WriteStr(ArchiveHandle *AH, const char *c)
2178{
2179 size_t res;
2180
2181 if (c)
2182 {
2183 int len = strlen(c);
2184
2185 res = WriteInt(AH, len);
2186 AH->WriteBufPtr(AH, c, len);
2187 res += len;
2188 }
2189 else
2190 res = WriteInt(AH, -1);
2191
2192 return res;
2193}
2194
2195char *
2196ReadStr(ArchiveHandle *AH)
2197{
2198 char *buf;
2199 int l;
2200
2201 l = ReadInt(AH);
2202 if (l < 0)
2203 buf = NULL;
2204 else
2205 {
2206 buf = (char *) pg_malloc(l + 1);
2207 AH->ReadBufPtr(AH, buf, l);
2208
2209 buf[l] = '\0';
2210 }
2211
2212 return buf;
2213}
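/*
 * Illustrative encoding used by WriteStr() and ReadStr() above (hypothetical
 * value, not part of the upstream file).  A string is stored as
 * WriteInt(strlen(s)) followed by the bytes with no terminating NUL, so "abc"
 * becomes
 *
 *     0x00 0x03 0x00 0x00 0x00 'a' 'b' 'c'
 *
 * while a NULL pointer is stored as WriteInt(-1) and read back as NULL.
 */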
2214
2215static bool
2216_fileExistsInDirectory(const char *dir, const char *filename)
2217{
2218 struct stat st;
2219 char buf[MAXPGPATH];
2220
2221 if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2222 pg_fatal("directory name too long: \"%s\"", dir);
2223
2224 return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2225}
2226
2227static int
2228_discoverArchiveFormat(ArchiveHandle *AH)
2229{
2230 FILE *fh;
2231 char sig[6]; /* More than enough */
2232 size_t cnt;
2233 int wantClose = 0;
2234
2235 pg_log_debug("attempting to ascertain archive format");
2236
2237 free(AH->lookahead);
2238
2239 AH->readHeader = 0;
2240 AH->lookaheadSize = 512;
2241 AH->lookahead = pg_malloc0(512);
2242 AH->lookaheadLen = 0;
2243 AH->lookaheadPos = 0;
2244
2245 if (AH->fSpec)
2246 {
2247 struct stat st;
2248
2249 wantClose = 1;
2250
2251 /*
2252 * Check if the specified archive is a directory. If so, check if
2253 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2254 */
2255 if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2256 {
2257 AH->format = archDirectory;
2258 if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2259 return AH->format;
2260#ifdef HAVE_LIBZ
2261 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2262 return AH->format;
2263#endif
2264#ifdef USE_LZ4
2265 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2266 return AH->format;
2267#endif
2268#ifdef USE_ZSTD
2269 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2270 return AH->format;
2271#endif
2272 pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2273 AH->fSpec);
2274 fh = NULL; /* keep compiler quiet */
2275 }
2276 else
2277 {
2278 fh = fopen(AH->fSpec, PG_BINARY_R);
2279 if (!fh)
2280 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2281 }
2282 }
2283 else
2284 {
2285 fh = stdin;
2286 if (!fh)
2287 pg_fatal("could not open input file: %m");
2288 }
2289
2290 if ((cnt = fread(sig, 1, 5, fh)) != 5)
2291 {
2292 if (ferror(fh))
2293 pg_fatal("could not read input file: %m");
2294 else
2295 pg_fatal("input file is too short (read %lu, expected 5)",
2296 (unsigned long) cnt);
2297 }
2298
2299 /* Save it, just in case we need it later */
2300 memcpy(&AH->lookahead[0], sig, 5);
2301 AH->lookaheadLen = 5;
2302
2303 if (strncmp(sig, "PGDMP", 5) == 0)
2304 {
2305 /* It's custom format, stop here */
2306 AH->format = archCustom;
2307 AH->readHeader = 1;
2308 }
2309 else
2310 {
2311 /*
2312 * *Maybe* we have a tar archive format file or a text dump ... So,
2313 * read first 512 byte header...
2314 */
2315 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2316 /* read failure is checked below */
2317 AH->lookaheadLen += cnt;
2318
2319 if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2320 (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2321 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2322 {
2323 /*
2324 * Looks like it's probably a text format dump, so suggest they
2325 * try psql.
2326 */
2327 pg_fatal("input file appears to be a text format dump. Please use psql.");
2328 }
2329
2330 if (AH->lookaheadLen != 512)
2331 {
2332 if (feof(fh))
2333 pg_fatal("input file does not appear to be a valid archive (too short?)");
2334 else
2335 READ_ERROR_EXIT(fh);
2336 }
2337
2338 if (!isValidTarHeader(AH->lookahead))
2339 pg_fatal("input file does not appear to be a valid archive");
2340
2341 AH->format = archTar;
2342 }
2343
2344 /* Close the file if we opened it */
2345 if (wantClose)
2346 {
2347 if (fclose(fh) != 0)
2348 pg_fatal("could not close input file: %m");
2349 /* Forget lookahead, since we'll re-read header after re-opening */
2350 AH->readHeader = 0;
2351 AH->lookaheadLen = 0;
2352 }
2353
2354 return AH->format;
2355}
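/*
 * In short, the detection logic above works as follows: a directory
 * containing toc.dat (possibly with a .gz, .lz4 or .zst suffix) is taken
 * as the directory format; a file beginning with the "PGDMP" magic is the
 * custom format; a file beginning with the plain-text dump header is
 * rejected with a hint to use psql; anything else must carry a valid
 * 512-byte tar header or the input is rejected as not an archive.
 */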
2356
2357
2358/*
2359 * Allocate an archive handle
2360 */
2361static ArchiveHandle *
2362_allocAH(const char *FileSpec, const ArchiveFormat fmt,
2363 const pg_compress_specification compression_spec,
2364 bool dosync, ArchiveMode mode,
2365 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
2366{
2367 ArchiveHandle *AH;
2368 CompressFileHandle *CFH;
2369 pg_compress_specification out_compress_spec = {0};
2370
2371 pg_log_debug("allocating AH for %s, format %d",
2372 FileSpec ? FileSpec : "(stdio)", fmt);
2373
2374 AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2375
2376 AH->version = K_VERS_SELF;
2377
2378 /* initialize for backwards compatible string processing */
2379 AH->public.encoding = 0; /* PG_SQL_ASCII */
2380 AH->public.std_strings = false;
2381
2382 /* sql error handling */
2383 AH->public.exit_on_error = true;
2384 AH->public.n_errors = 0;
2385
2386 AH->archiveDumpVersion = PG_VERSION;
2387
2388 AH->createDate = time(NULL);
2389
2390 AH->intSize = sizeof(int);
2391 AH->offSize = sizeof(pgoff_t);
2392 if (FileSpec)
2393 {
2394 AH->fSpec = pg_strdup(FileSpec);
2395
2396 /*
2397 * Not used; maybe later....
2398 *
2399 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2400 * i--) if (AH->workDir[i-1] == '/')
2401 */
2402 }
2403 else
2404 AH->fSpec = NULL;
2405
2406 AH->currUser = NULL; /* unknown */
2407 AH->currSchema = NULL; /* ditto */
2408 AH->currTablespace = NULL; /* ditto */
2409 AH->currTableAm = NULL; /* ditto */
2410
2411 AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2412
2413 AH->toc->next = AH->toc;
2414 AH->toc->prev = AH->toc;
2415
2416 AH->mode = mode;
2417 AH->compression_spec = compression_spec;
2418 AH->dosync = dosync;
2419 AH->sync_method = sync_method;
2420
2421 memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2422
2423 /* Open stdout with no compression for AH output handle */
2424 out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2425 CFH = InitCompressFileHandle(out_compress_spec);
2426 if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2427 pg_fatal("could not open stdout for appending: %m");
2428 AH->OF = CFH;
2429
2430 /*
2431 * On Windows, we need to use binary mode to read/write non-text files,
2432 * which include all archive formats as well as compressed plain text.
2433 * Force stdin/stdout into binary mode if that is what we are using.
2434 */
2435#ifdef WIN32
2436 if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2437 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2438 {
2439 if (mode == archModeWrite)
2440 _setmode(fileno(stdout), O_BINARY);
2441 else
2442 _setmode(fileno(stdin), O_BINARY);
2443 }
2444#endif
2445
2446 AH->SetupWorkerPtr = setupWorkerPtr;
2447
2448 if (fmt == archUnknown)
2449 AH->format = _discoverArchiveFormat(AH);
2450 else
2451 AH->format = fmt;
2452
2453 switch (AH->format)
2454 {
2455 case archCustom:
2456 InitArchiveFmt_Custom(AH);
2457 break;
2458
2459 case archNull:
2460 InitArchiveFmt_Null(AH);
2461 break;
2462
2463 case archDirectory:
2464 InitArchiveFmt_Directory(AH);
2465 break;
2466
2467 case archTar:
2468 InitArchiveFmt_Tar(AH);
2469 break;
2470
2471 default:
2472 pg_fatal("unrecognized file format \"%d\"", AH->format);
2473 }
2474
2475 return AH;
2476}
2477
2478/*
2479 * Write out all data (tables & LOs)
2480 */
2481void
2482WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2483{
2484 TocEntry *te;
2485
2486 if (pstate && pstate->numWorkers > 1)
2487 {
2488 /*
2489 * In parallel mode, this code runs in the leader process. We
2490 * construct an array of candidate TEs, then sort it into decreasing
2491 * size order, then dispatch each TE to a data-transfer worker. By
2492 * dumping larger tables first, we avoid getting into a situation
2493 * where we're down to one job and it's big, losing parallelism.
2494 */
2495 TocEntry **tes;
2496 int ntes;
2497
2498 tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2499 ntes = 0;
2500 for (te = AH->toc->next; te != AH->toc; te = te->next)
2501 {
2502 /* Consider only TEs with dataDumper functions ... */
2503 if (!te->dataDumper)
2504 continue;
2505 /* ... and ignore ones not enabled for dump */
2506 if ((te->reqs & REQ_DATA) == 0)
2507 continue;
2508
2509 tes[ntes++] = te;
2510 }
2511
2512 if (ntes > 1)
2513 qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2514
2515 for (int i = 0; i < ntes; i++)
2516 DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2517 mark_dump_job_done, NULL);
2518
2519 pg_free(tes);
2520
2521 /* Now wait for workers to finish. */
2522 WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2523 }
2524 else
2525 {
2526 /* Non-parallel mode: just dump all candidate TEs sequentially. */
2527 for (te = AH->toc->next; te != AH->toc; te = te->next)
2528 {
2529 /* Must have same filter conditions as above */
2530 if (!te->dataDumper)
2531 continue;
2532 if ((te->reqs & REQ_DATA) == 0)
2533 continue;
2534
2535 WriteDataChunksForTocEntry(AH, te);
2536 }
2537 }
2538}
2539
2540
2541/*
2542 * Callback function that's invoked in the leader process after a step has
2543 * been parallel dumped.
2544 *
2545 * We don't need to do anything except check for worker failure.
2546 */
2547static void
2548mark_dump_job_done(ArchiveHandle *AH,
2549 TocEntry *te,
2550 int status,
2551 void *callback_data)
2552{
2553 pg_log_info("finished item %d %s %s",
2554 te->dumpId, te->desc, te->tag);
2555
2556 if (status != 0)
2557 pg_fatal("worker process failed: exit code %d",
2558 status);
2559}
2560
2561
2562void
2563WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2564{
2565 StartDataPtrType startPtr;
2566 EndDataPtrType endPtr;
2567
2568 AH->currToc = te;
2569
2570 if (strcmp(te->desc, "BLOBS") == 0)
2571 {
2572 startPtr = AH->StartLOsPtr;
2573 endPtr = AH->EndLOsPtr;
2574 }
2575 else
2576 {
2577 startPtr = AH->StartDataPtr;
2578 endPtr = AH->EndDataPtr;
2579 }
2580
2581 if (startPtr != NULL)
2582 (*startPtr) (AH, te);
2583
2584 /*
2585 * The user-provided DataDumper routine needs to call AH->WriteData
2586 */
2587 te->dataDumper((Archive *) AH, te->dataDumperArg);
2588
2589 if (endPtr != NULL)
2590 (*endPtr) (AH, te);
2591
2592 AH->currToc = NULL;
2593}
2594
2595void
2596WriteToc(ArchiveHandle *AH)
2597{
2598 TocEntry *te;
2599 char workbuf[32];
2600 int tocCount;
2601 int i;
2602
2603 /* count entries that will actually be dumped */
2604 tocCount = 0;
2605 for (te = AH->toc->next; te != AH->toc; te = te->next)
2606 {
2607 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
2608 tocCount++;
2609 }
2610
2611 /* printf("%d TOC Entries to save\n", tocCount); */
2612
2613 WriteInt(AH, tocCount);
2614
2615 for (te = AH->toc->next; te != AH->toc; te = te->next)
2616 {
2617 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
2618 continue;
2619
2620 WriteInt(AH, te->dumpId);
2621 WriteInt(AH, te->dataDumper ? 1 : 0);
2622
2623 /* OID is recorded as a string for historical reasons */
2624 sprintf(workbuf, "%u", te->catalogId.tableoid);
2625 WriteStr(AH, workbuf);
2626 sprintf(workbuf, "%u", te->catalogId.oid);
2627 WriteStr(AH, workbuf);
2628
2629 WriteStr(AH, te->tag);
2630 WriteStr(AH, te->desc);
2631 WriteInt(AH, te->section);
2632
2633 if (te->defnLen)
2634 {
2635 /*
2636 * defnLen should only be set for custom format's second call to
2637 * WriteToc(), which rewrites the TOC in place to update data
2638 * offsets. Instead of calling the defnDumper a second time
2639 * (which could involve re-executing queries), just skip writing
2640 * the entry. While regenerating the definition should
2641 * theoretically produce the same result as before, it's expensive
2642 * and feels risky.
2643 *
2644 * The custom format only calls WriteToc() a second time if
2645 * fseeko() is usable (see _CloseArchive() in pg_backup_custom.c),
2646 * so we can safely use it without checking. For other formats,
2647 * we fail because one of our assumptions must no longer hold
2648 * true.
2649 *
2650 * XXX This is a layering violation, but the alternative is an
2651 * awkward and complicated callback infrastructure for this
2652 * special case. This might be worth revisiting in the future.
2653 */
2654 if (AH->format != archCustom)
2655 pg_fatal("unexpected TOC entry in WriteToc(): %d %s %s",
2656 te->dumpId, te->desc, te->tag);
2657
2658 if (fseeko(AH->FH, te->defnLen, SEEK_CUR) != 0)
2659 pg_fatal("error during file seek: %m");
2660 }
2661 else if (te->defnDumper)
2662 {
2663 char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
2664
2665 te->defnLen = WriteStr(AH, defn);
2666 pg_free(defn);
2667 }
2668 else
2669 WriteStr(AH, te->defn);
2670
2671 WriteStr(AH, te->dropStmt);
2672 WriteStr(AH, te->copyStmt);
2673 WriteStr(AH, te->namespace);
2674 WriteStr(AH, te->tablespace);
2675 WriteStr(AH, te->tableam);
2676 WriteInt(AH, te->relkind);
2677 WriteStr(AH, te->owner);
2678 WriteStr(AH, "false");
2679
2680 /* Dump list of dependencies */
2681 for (i = 0; i < te->nDeps; i++)
2682 {
2683 sprintf(workbuf, "%d", te->dependencies[i]);
2684 WriteStr(AH, workbuf);
2685 }
2686 WriteStr(AH, NULL); /* Terminate List */
2687
2688 if (AH->WriteExtraTocPtr)
2689 AH->WriteExtraTocPtr(AH, te);
2690 }
2691}
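/*
 * For reference, each TOC entry written above (and read back by ReadToc)
 * is laid out as: dump ID, had-dumper flag, table OID and object OID as
 * strings, tag, description, section, definition, drop statement, copy
 * statement, namespace, tablespace, table AM, relkind, owner, a literal
 * "false" (the former WITH OIDS flag), a NULL-terminated list of
 * dependency IDs, and finally any format-specific extra data.
 */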
2692
2693void
2694ReadToc(ArchiveHandle *AH)
2695{
2696 int i;
2697 char *tmp;
2698 DumpId *deps;
2699 int depIdx;
2700 int depSize;
2701 TocEntry *te;
2702 bool is_supported;
2703
2704 AH->tocCount = ReadInt(AH);
2705 AH->maxDumpId = 0;
2706
2707 for (i = 0; i < AH->tocCount; i++)
2708 {
2709 te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2710 te->dumpId = ReadInt(AH);
2711
2712 if (te->dumpId > AH->maxDumpId)
2713 AH->maxDumpId = te->dumpId;
2714
2715 /* Sanity check */
2716 if (te->dumpId <= 0)
2717 pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2718 te->dumpId);
2719
2720 te->hadDumper = ReadInt(AH);
2721
2722 if (AH->version >= K_VERS_1_8)
2723 {
2724 tmp = ReadStr(AH);
2725 sscanf(tmp, "%u", &te->catalogId.tableoid);
2726 free(tmp);
2727 }
2728 else
2729 te->catalogId.tableoid = InvalidOid;
2730 tmp = ReadStr(AH);
2731 sscanf(tmp, "%u", &te->catalogId.oid);
2732 free(tmp);
2733
2734 te->tag = ReadStr(AH);
2735 te->desc = ReadStr(AH);
2736
2737 if (AH->version >= K_VERS_1_11)
2738 {
2739 te->section = ReadInt(AH);
2740 }
2741 else
2742 {
2743 /*
2744 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2745 * the entries into sections. This list need not cover entry
2746 * types added later than 8.4.
2747 */
2748 if (strcmp(te->desc, "COMMENT") == 0 ||
2749 strcmp(te->desc, "ACL") == 0 ||
2750 strcmp(te->desc, "ACL LANGUAGE") == 0)
2751 te->section = SECTION_NONE;
2752 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2753 strcmp(te->desc, "BLOBS") == 0 ||
2754 strcmp(te->desc, "BLOB COMMENTS") == 0)
2755 te->section = SECTION_DATA;
2756 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2757 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2758 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2759 strcmp(te->desc, "INDEX") == 0 ||
2760 strcmp(te->desc, "RULE") == 0 ||
2761 strcmp(te->desc, "TRIGGER") == 0)
2762 te->section = SECTION_POST_DATA;
2763 else
2764 te->section = SECTION_PRE_DATA;
2765 }
2766
2767 te->defn = ReadStr(AH);
2768 te->dropStmt = ReadStr(AH);
2769
2770 if (AH->version >= K_VERS_1_3)
2771 te->copyStmt = ReadStr(AH);
2772
2773 if (AH->version >= K_VERS_1_6)
2774 te->namespace = ReadStr(AH);
2775
2776 if (AH->version >= K_VERS_1_10)
2777 te->tablespace = ReadStr(AH);
2778
2779 if (AH->version >= K_VERS_1_14)
2780 te->tableam = ReadStr(AH);
2781
2782 if (AH->version >= K_VERS_1_16)
2783 te->relkind = ReadInt(AH);
2784
2785 te->owner = ReadStr(AH);
2786 is_supported = true;
2787 if (AH->version < K_VERS_1_9)
2788 is_supported = false;
2789 else
2790 {
2791 tmp = ReadStr(AH);
2792
2793 if (strcmp(tmp, "true") == 0)
2794 is_supported = false;
2795
2796 free(tmp);
2797 }
2798
2799 if (!is_supported)
2800 pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2801
2802 /* Read TOC entry dependencies */
2803 if (AH->version >= K_VERS_1_5)
2804 {
2805 depSize = 100;
2806 deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2807 depIdx = 0;
2808 for (;;)
2809 {
2810 tmp = ReadStr(AH);
2811 if (!tmp)
2812 break; /* end of list */
2813 if (depIdx >= depSize)
2814 {
2815 depSize *= 2;
2816 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2817 }
2818 sscanf(tmp, "%d", &deps[depIdx]);
2819 free(tmp);
2820 depIdx++;
2821 }
2822
2823 if (depIdx > 0) /* We have a non-null entry */
2824 {
2825 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2826 te->dependencies = deps;
2827 te->nDeps = depIdx;
2828 }
2829 else
2830 {
2831 free(deps);
2832 te->dependencies = NULL;
2833 te->nDeps = 0;
2834 }
2835 }
2836 else
2837 {
2838 te->dependencies = NULL;
2839 te->nDeps = 0;
2840 }
2841 te->dataLength = 0;
2842
2843 if (AH->ReadExtraTocPtr)
2844 AH->ReadExtraTocPtr(AH, te);
2845
2846 pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2847 i, te->dumpId, te->desc, te->tag);
2848
2849 /* link completed entry into TOC circular list */
2850 te->prev = AH->toc->prev;
2851 AH->toc->prev->next = te;
2852 AH->toc->prev = te;
2853 te->next = AH->toc;
2854
2855 /* special processing immediately upon read for some items */
2856 if (strcmp(te->desc, "ENCODING") == 0)
2857 processEncodingEntry(AH, te);
2858 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2859 processStdStringsEntry(AH, te);
2860 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2861 processSearchPathEntry(AH, te);
2862 }
2863}
2864
2865static void
2866processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2867{
2868 /* te->defn should have the form SET client_encoding = 'foo'; */
2869 char *defn = pg_strdup(te->defn);
2870 char *ptr1;
2871 char *ptr2 = NULL;
2872 int encoding;
2873
2874 ptr1 = strchr(defn, '\'');
2875 if (ptr1)
2876 ptr2 = strchr(++ptr1, '\'');
2877 if (ptr2)
2878 {
2879 *ptr2 = '\0';
2880 encoding = pg_char_to_encoding(ptr1);
2881 if (encoding < 0)
2882 pg_fatal("unrecognized encoding \"%s\"",
2883 ptr1);
2884 AH->public.encoding = encoding;
2885 setFmtEncoding(encoding);
2886 }
2887 else
2888 pg_fatal("invalid ENCODING item: %s",
2889 te->defn);
2890
2891 free(defn);
2892}
2893
2894static void
2895processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2896{
2897 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2898 char *ptr1;
2899
2900 ptr1 = strchr(te->defn, '\'');
2901 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2902 AH->public.std_strings = true;
2903 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2904 AH->public.std_strings = false;
2905 else
2906 pg_fatal("invalid STDSTRINGS item: %s",
2907 te->defn);
2908}
2909
2910static void
2911processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2912{
2913 /*
2914 * te->defn should contain a command to set search_path. We just copy it
2915 * verbatim for use later.
2916 */
2917 AH->public.searchpath = pg_strdup(te->defn);
2918}
2919
2920static void
2921StrictNamesCheck(RestoreOptions *ropt)
2922{
2923 const char *missing_name;
2924
2925 Assert(ropt->strict_names);
2926
2927 if (ropt->schemaNames.head != NULL)
2928 {
2929 missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2930 if (missing_name != NULL)
2931 pg_fatal("schema \"%s\" not found", missing_name);
2932 }
2933
2934 if (ropt->tableNames.head != NULL)
2935 {
2936 missing_name = simple_string_list_not_touched(&ropt->tableNames);
2937 if (missing_name != NULL)
2938 pg_fatal("table \"%s\" not found", missing_name);
2939 }
2940
2941 if (ropt->indexNames.head != NULL)
2942 {
2943 missing_name = simple_string_list_not_touched(&ropt->indexNames);
2944 if (missing_name != NULL)
2945 pg_fatal("index \"%s\" not found", missing_name);
2946 }
2947
2948 if (ropt->functionNames.head != NULL)
2949 {
2950 missing_name = simple_string_list_not_touched(&ropt->functionNames);
2951 if (missing_name != NULL)
2952 pg_fatal("function \"%s\" not found", missing_name);
2953 }
2954
2955 if (ropt->triggerNames.head != NULL)
2956 {
2957 missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2958 if (missing_name != NULL)
2959 pg_fatal("trigger \"%s\" not found", missing_name);
2960 }
2961}
2962
2963/*
2964 * Determine whether we want to restore this TOC entry.
2965 *
2966 * Returns 0 if entry should be skipped, or some combination of the
2967 * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
2968 * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
2969 * special entry.
2970 */
2971static int
2972_tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2973{
2974 int res = REQ_SCHEMA | REQ_DATA;
2975 RestoreOptions *ropt = AH->public.ropt;
2976
2977 /* These items are treated specially */
2978 if (strcmp(te->desc, "ENCODING") == 0 ||
2979 strcmp(te->desc, "STDSTRINGS") == 0 ||
2980 strcmp(te->desc, "SEARCHPATH") == 0)
2981 return REQ_SPECIAL;
2982
2983 if (strcmp(te->desc, "STATISTICS DATA") == 0)
2984 {
2985 if (!ropt->dumpStatistics)
2986 return 0;
2987
2988 res = REQ_STATS;
2989 }
2990
2991 /*
2992 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2993 * restored in createDB mode, and not restored otherwise, independently of
2994 * all else.
2995 */
2996 if (strcmp(te->desc, "DATABASE") == 0 ||
2997 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2998 {
2999 if (ropt->createDB)
3000 return REQ_SCHEMA;
3001 else
3002 return 0;
3003 }
3004
3005 /*
3006 * Process exclusions that affect certain classes of TOC entries.
3007 */
3008
3009 /* If it's an ACL, maybe ignore it */
3010 if (ropt->aclsSkip && _tocEntryIsACL(te))
3011 return 0;
3012
3013 /* If it's a comment, maybe ignore it */
3014 if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
3015 return 0;
3016
3017 /* If it's a policy, maybe ignore it */
3018 if (ropt->no_policies &&
3019 (strcmp(te->desc, "POLICY") == 0 ||
3020 strcmp(te->desc, "ROW SECURITY") == 0))
3021 return 0;
3022
3023 /*
3024 * If it's a publication or a table part of a publication, maybe ignore
3025 * it.
3026 */
3027 if (ropt->no_publications &&
3028 (strcmp(te->desc, "PUBLICATION") == 0 ||
3029 strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
3030 strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
3031 return 0;
3032
3033 /* If it's a security label, maybe ignore it */
3034 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
3035 return 0;
3036
3037 /* If it's a subscription, maybe ignore it */
3038 if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
3039 return 0;
3040
3041 /* Ignore it if section is not to be dumped/restored */
3042 switch (curSection)
3043 {
3044 case SECTION_PRE_DATA:
3045 if (!(ropt->dumpSections & DUMP_PRE_DATA))
3046 return 0;
3047 break;
3048 case SECTION_DATA:
3049 if (!(ropt->dumpSections & DUMP_DATA))
3050 return 0;
3051 break;
3052 case SECTION_POST_DATA:
3053 if (!(ropt->dumpSections & DUMP_POST_DATA))
3054 return 0;
3055 break;
3056 default:
3057 /* shouldn't get here, really, but ignore it */
3058 return 0;
3059 }
3060
3061 /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3062 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3063 return 0;
3064
3065 /*
3066 * Check options for selective dump/restore.
3067 */
3068 if (strcmp(te->desc, "ACL") == 0 ||
3069 strcmp(te->desc, "COMMENT") == 0 ||
3070 strcmp(te->desc, "STATISTICS DATA") == 0 ||
3071 strcmp(te->desc, "SECURITY LABEL") == 0)
3072 {
3073 /* Database properties react to createDB, not selectivity options. */
3074 if (strncmp(te->tag, "DATABASE ", 9) == 0)
3075 {
3076 if (!ropt->createDB)
3077 return 0;
3078 }
3079 else if (ropt->schemaNames.head != NULL ||
3080 ropt->schemaExcludeNames.head != NULL ||
3081 ropt->selTypes)
3082 {
3083 /*
3084 * In a selective dump/restore, we want to restore these dependent
3085 * TOC entry types only if their parent object is being restored.
3086 * Without selectivity options, we let through everything in the
3087 * archive. Note there may be such entries with no parent, eg
3088 * non-default ACLs for built-in objects. Also, we make
3089 * per-column ACLs additionally depend on the table's ACL if any
3090 * to ensure correct restore order, so those dependencies should
3091 * be ignored in this check.
3092 *
3093 * This code depends on the parent having been marked already,
3094 * which should be the case; if it isn't, perhaps due to
3095 * SortTocFromFile rearrangement, skipping the dependent entry
3096 * seems prudent anyway.
3097 *
3098 * Ideally we'd handle, eg, table CHECK constraints this way too.
3099 * But it's hard to tell which of their dependencies is the one to
3100 * consult.
3101 */
3102 bool dumpthis = false;
3103
3104 for (int i = 0; i < te->nDeps; i++)
3105 {
3106 TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3107
3108 if (!pte)
3109 continue; /* probably shouldn't happen */
3110 if (strcmp(pte->desc, "ACL") == 0)
3111 continue; /* ignore dependency on another ACL */
3112 if (pte->reqs == 0)
3113 continue; /* this object isn't marked, so ignore it */
3114 /* Found a parent to be dumped, so we want to dump this too */
3115 dumpthis = true;
3116 break;
3117 }
3118 if (!dumpthis)
3119 return 0;
3120 }
3121 }
3122 else
3123 {
3124 /* Apply selective-restore rules for standalone TOC entries. */
3125 if (ropt->schemaNames.head != NULL)
3126 {
3127 /* If no namespace is specified, it means all. */
3128 if (!te->namespace)
3129 return 0;
3130 if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3131 return 0;
3132 }
3133
3134 if (ropt->schemaExcludeNames.head != NULL &&
3135 te->namespace &&
3136 simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3137 return 0;
3138
3139 if (ropt->selTypes)
3140 {
3141 if (strcmp(te->desc, "TABLE") == 0 ||
3142 strcmp(te->desc, "TABLE DATA") == 0 ||
3143 strcmp(te->desc, "VIEW") == 0 ||
3144 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3145 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3146 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3147 strcmp(te->desc, "SEQUENCE") == 0 ||
3148 strcmp(te->desc, "SEQUENCE SET") == 0)
3149 {
3150 if (!ropt->selTable)
3151 return 0;
3152 if (ropt->tableNames.head != NULL &&
3153 !simple_string_list_member(&ropt->tableNames, te->tag))
3154 return 0;
3155 }
3156 else if (strcmp(te->desc, "INDEX") == 0)
3157 {
3158 if (!ropt->selIndex)
3159 return 0;
3160 if (ropt->indexNames.head != NULL &&
3161 !simple_string_list_member(&ropt->indexNames, te->tag))
3162 return 0;
3163 }
3164 else if (strcmp(te->desc, "FUNCTION") == 0 ||
3165 strcmp(te->desc, "AGGREGATE") == 0 ||
3166 strcmp(te->desc, "PROCEDURE") == 0)
3167 {
3168 if (!ropt->selFunction)
3169 return 0;
3170 if (ropt->functionNames.head != NULL &&
3171 !simple_string_list_member(&ropt->functionNames, te->tag))
3172 return 0;
3173 }
3174 else if (strcmp(te->desc, "TRIGGER") == 0)
3175 {
3176 if (!ropt->selTrigger)
3177 return 0;
3178 if (ropt->triggerNames.head != NULL &&
3179 !simple_string_list_member(&ropt->triggerNames, te->tag))
3180 return 0;
3181 }
3182 else
3183 return 0;
3184 }
3185 }
3186
3187
3188 /*
3189 * Determine whether the TOC entry contains schema and/or data components,
3190 * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3191 * it's both schema and data. Otherwise it's probably schema-only, but
3192 * there are exceptions.
3193 */
3194 if (!te->hadDumper)
3195 {
3196 /*
3197 * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3198 * is considered a data entry. We don't need to check for BLOBS or
3199 * old-style BLOB COMMENTS entries, because they will have hadDumper =
3200 * true ... but we do need to check new-style BLOB ACLs, comments,
3201 * etc.
3202 */
3203 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3204 strcmp(te->desc, "BLOB") == 0 ||
3205 strcmp(te->desc, "BLOB METADATA") == 0 ||
3206 (strcmp(te->desc, "ACL") == 0 &&
3207 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3208 (strcmp(te->desc, "COMMENT") == 0 &&
3209 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3210 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3211 strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3212 res = res & REQ_DATA;
3213 else
3214 res = res & ~REQ_DATA;
3215 }
3216
3217 /*
3218 * If there's no definition command, there's no schema component. Treat
3219 * "load via partition root" comments as not schema.
3220 */
3221 if (!te->defn || !te->defn[0] ||
3222 strncmp(te->defn, "-- load via partition root ", 27) == 0)
3223 res = res & ~REQ_SCHEMA;
3224
3225 /*
3226 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3227 * always ignore it.
3228 */
3229 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3230 return 0;
3231
3232 /* Mask it if we don't want data */
3233 if (!ropt->dumpData)
3234 {
3235 /*
3236 * The sequence_data option overrides dumpData for SEQUENCE SET.
3237 *
3238 * In binary-upgrade mode, even with dumpData unset, we do not mask
3239 * out large objects. (Only large object definitions, comments and
3240 * other metadata should be generated in binary-upgrade mode, not the
3241 * actual data, but that need not concern us here.)
3242 */
3243 if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3244 !(ropt->binary_upgrade &&
3245 (strcmp(te->desc, "BLOB") == 0 ||
3246 strcmp(te->desc, "BLOB METADATA") == 0 ||
3247 (strcmp(te->desc, "ACL") == 0 &&
3248 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3249 (strcmp(te->desc, "COMMENT") == 0 &&
3250 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3251 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3252 strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3253 res = res & (REQ_SCHEMA | REQ_STATS);
3254 }
3255
3256 /* Mask it if we don't want schema */
3257 if (!ropt->dumpSchema)
3258 res = res & (REQ_DATA | REQ_STATS);
3259
3260 return res;
3261}
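/*
 * Example of how the REQ bits above combine: a TABLE DATA entry starts as
 * REQ_SCHEMA | REQ_DATA, loses REQ_SCHEMA because its defn is empty, and
 * loses REQ_DATA too (yielding 0) when dumpData is off and neither the
 * sequence_data nor the binary-upgrade large-object exceptions apply.  A
 * STATISTICS DATA entry ends up as REQ_STATS or 0, depending on
 * dumpStatistics, the section being restored, and the selectivity checks.
 */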
3262
3263/*
3264 * Identify which pass we should restore this TOC entry in.
3265 *
3266 * See notes with the RestorePass typedef in pg_backup_archiver.h.
3267 */
3268static RestorePass
3269_tocEntryRestorePass(TocEntry *te)
3270{
3271 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3272 if (strcmp(te->desc, "ACL") == 0 ||
3273 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3274 strcmp(te->desc, "DEFAULT ACL") == 0)
3275 return RESTORE_PASS_ACL;
3276 if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3277 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3278 return RESTORE_PASS_POST_ACL;
3279
3280 /*
3281 * Comments need to be emitted in the same pass as their parent objects.
3282 * ACLs haven't got comments, and neither do matview data objects, but
3283 * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3284 * we'd need yet another weird special case.)
3285 */
3286 if (strcmp(te->desc, "COMMENT") == 0 &&
3287 strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3288 return RESTORE_PASS_POST_ACL;
3289
3290 /*
3291 * If statistics data is dependent on materialized view data, it must be
3292 * deferred to RESTORE_PASS_POST_ACL. Those entries are already marked as
3293 * SECTION_POST_DATA, and some other stats entries (e.g., index stats)
3294 * will also be marked as SECTION_POST_DATA. Additionally, our lookahead
3295 * code in fetchAttributeStats() assumes that we dump all statistics data
3296 * entries in TOC order. To ensure this assumption holds, we move all
3297 * statistics data entries in SECTION_POST_DATA to RESTORE_PASS_POST_ACL.
3298 */
3299 if (strcmp(te->desc, "STATISTICS DATA") == 0 &&
3300 te->section == SECTION_POST_DATA)
3301 return RESTORE_PASS_POST_ACL;
3302
3303 /* All else can be handled in the main pass. */
3304 return RESTORE_PASS_MAIN;
3305}
3306
3307/*
3308 * Identify TOC entries that are ACLs.
3309 *
3310 * Note: it seems worth duplicating some code here to avoid a hard-wired
3311 * assumption that these are exactly the same entries that we restore during
3312 * the RESTORE_PASS_ACL phase.
3313 */
3314static bool
3315_tocEntryIsACL(TocEntry *te)
3316{
3317 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3318 if (strcmp(te->desc, "ACL") == 0 ||
3319 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3320 strcmp(te->desc, "DEFAULT ACL") == 0)
3321 return true;
3322 return false;
3323}
3324
3325/*
3326 * Issue SET commands for parameters that we want to have set the same way
3327 * at all times during execution of a restore script.
3328 */
3329static void
3330_doSetFixedOutputState(ArchiveHandle *AH)
3331{
3332 RestoreOptions *ropt = AH->public.ropt;
3333
3334 /*
3335 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3336 */
3337 ahprintf(AH, "SET statement_timeout = 0;\n");
3338 ahprintf(AH, "SET lock_timeout = 0;\n");
3339 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3340 ahprintf(AH, "SET transaction_timeout = 0;\n");
3341
3342 /* Select the correct character set encoding */
3343 ahprintf(AH, "SET client_encoding = '%s';\n",
3344 pg_encoding_to_char(AH->public.encoding));
3345
3346 /* Select the correct string literal syntax */
3347 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3348 AH->public.std_strings ? "on" : "off");
3349
3350 /* Select the role to be used during restore */
3351 if (ropt && ropt->use_role)
3352 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3353
3354 /* Select the dump-time search_path */
3355 if (AH->public.searchpath)
3356 ahprintf(AH, "%s", AH->public.searchpath);
3357
3358 /* Make sure function checking is disabled */
3359 ahprintf(AH, "SET check_function_bodies = false;\n");
3360
3361 /* Ensure that all valid XML data will be accepted */
3362 ahprintf(AH, "SET xmloption = content;\n");
3363
3364 /* Avoid annoying notices etc */
3365 ahprintf(AH, "SET client_min_messages = warning;\n");
3366 if (!AH->public.std_strings)
3367 ahprintf(AH, "SET escape_string_warning = off;\n");
3368
3369 /* Adjust row-security state */
3370 if (ropt && ropt->enable_row_security)
3371 ahprintf(AH, "SET row_security = on;\n");
3372 else
3373 ahprintf(AH, "SET row_security = off;\n");
3374
3375 /*
3376 * In --transaction-size mode, we should always be in a transaction when
3377 * we begin to restore objects.
3378 */
3379 if (ropt && ropt->txn_size > 0)
3380 {
3381 if (AH->connection)
3382 StartTransaction(&AH->public);
3383 else
3384 ahprintf(AH, "\nBEGIN;\n");
3385 AH->txnCount = 0;
3386 }
3387
3388 ahprintf(AH, "\n");
3389}
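/*
 * In a plain-text restore the block above emits a preamble along these
 * lines (the exact set of commands varies with the archive and options):
 *
 *     SET statement_timeout = 0;
 *     SET lock_timeout = 0;
 *     SET idle_in_transaction_session_timeout = 0;
 *     SET transaction_timeout = 0;
 *     SET client_encoding = 'UTF8';
 *     SET standard_conforming_strings = on;
 *     SET check_function_bodies = false;
 *     SET xmloption = content;
 *     SET client_min_messages = warning;
 *     SET row_security = off;
 */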
3390
3391/*
3392 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3393 * for updating state if appropriate. If user is NULL or an empty string,
3394 * the specification DEFAULT will be used.
3395 */
3396static void
3397_doSetSessionAuth(ArchiveHandle *AH, const char *user)
3398{
3399 PQExpBuffer cmd = createPQExpBuffer();
3400
3401 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3402
3403 /*
3404 * SQL requires a string literal here. Might as well be correct.
3405 */
3406 if (user && *user)
3407 appendStringLiteralAHX(cmd, user, AH);
3408 else
3409 appendPQExpBufferStr(cmd, "DEFAULT");
3410 appendPQExpBufferChar(cmd, ';');
3411
3412 if (RestoringToDB(AH))
3413 {
3414 PGresult *res;
3415
3416 res = PQexec(AH->connection, cmd->data);
3417
3418 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3419 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3420 pg_fatal("could not set session user to \"%s\": %s",
3421 user, PQerrorMessage(AH->connection));
3422
3423 PQclear(res);
3424 }
3425 else
3426 ahprintf(AH, "%s\n\n", cmd->data);
3427
3428 destroyPQExpBuffer(cmd);
3429}
3430
3431
3432/*
3433 * Issue the commands to connect to the specified database.
3434 *
3435 * If we're currently restoring right into a database, this will
3436 * actually establish a connection. Otherwise it puts a \connect into
3437 * the script output.
3438 */
3439static void
3440_reconnectToDB(ArchiveHandle *AH, const char *dbname)
3441{
3442 if (RestoringToDB(AH))
3443 ReconnectToServer(AH, dbname);
3444 else
3445 {
3446 PQExpBufferData connectbuf;
3447
3448 initPQExpBuffer(&connectbuf);
3449 appendPsqlMetaConnect(&connectbuf, dbname);
3450 ahprintf(AH, "%s\n", connectbuf.data);
3451 termPQExpBuffer(&connectbuf);
3452 }
3453
3454 /*
3455 * NOTE: currUser keeps track of what the imaginary session user in our
3456 * script is. It's now effectively reset to the original userID.
3457 */
3458 free(AH->currUser);
3459 AH->currUser = NULL;
3460
3461 /* don't assume we still know the output schema, tablespace, etc either */
3462 free(AH->currSchema);
3463 AH->currSchema = NULL;
3464
3465 free(AH->currTableAm);
3466 AH->currTableAm = NULL;
3467
3468 free(AH->currTablespace);
3469 AH->currTablespace = NULL;
3470
3471 /* re-establish fixed state */
3472 _doSetFixedOutputState(AH);
3473}
3474
3475/*
3476 * Become the specified user, and update state to avoid redundant commands
3477 *
3478 * NULL or empty argument is taken to mean restoring the session default
3479 */
3480static void
3481_becomeUser(ArchiveHandle *AH, const char *user)
3482{
3483 if (!user)
3484 user = ""; /* avoid null pointers */
3485
3486 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3487 return; /* no need to do anything */
3488
3489 _doSetSessionAuth(AH, user);
3490
3491 /*
3492 * NOTE: currUser keeps track of what the imaginary session user in our
3493 * script is
3494 */
3495 free(AH->currUser);
3496 AH->currUser = pg_strdup(user);
3497}
3498
3499/*
3500 * Become the owner of the given TOC entry object. If
3501 * changes in ownership are not allowed, this doesn't do anything.
3502 */
3503static void
3504_becomeOwner(ArchiveHandle *AH, TocEntry *te)
3505{
3506 RestoreOptions *ropt = AH->public.ropt;
3507
3508 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3509 return;
3510
3511 _becomeUser(AH, te->owner);
3512}
3513
3514
3515/*
3516 * Issue the commands to select the specified schema as the current schema
3517 * in the target database.
3518 */
3519static void
3520_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3521{
3522 PQExpBuffer qry;
3523
3524 /*
3525 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3526 * that search_path rather than switching to entry-specific paths.
3527 * Otherwise, it's an old archive that will not restore correctly unless
3528 * we set the search_path as it's expecting.
3529 */
3530 if (AH->public.searchpath)
3531 return;
3532
3533 if (!schemaName || *schemaName == '\0' ||
3534 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3535 return; /* no need to do anything */
3536
3537 qry = createPQExpBuffer();
3538
3539 appendPQExpBuffer(qry, "SET search_path = %s",
3540 fmtId(schemaName));
3541 if (strcmp(schemaName, "pg_catalog") != 0)
3542 appendPQExpBufferStr(qry, ", pg_catalog");
3543
3544 if (RestoringToDB(AH))
3545 {
3546 PGresult *res;
3547
3548 res = PQexec(AH->connection, qry->data);
3549
3550 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3551 warn_or_exit_horribly(AH,
3552 "could not set \"search_path\" to \"%s\": %s",
3553 schemaName, PQerrorMessage(AH->connection));
3554
3555 PQclear(res);
3556 }
3557 else
3558 ahprintf(AH, "%s;\n\n", qry->data);
3559
3560 free(AH->currSchema);
3561 AH->currSchema = pg_strdup(schemaName);
3562
3563 destroyPQExpBuffer(qry);
3564}
3565
3566/*
3567 * Issue the commands to select the specified tablespace as the current one
3568 * in the target database.
3569 */
3570static void
3571_selectTablespace(ArchiveHandle *AH, const char *tablespace)
3572{
3573 RestoreOptions *ropt = AH->public.ropt;
3574 PQExpBuffer qry;
3575 const char *want,
3576 *have;
3577
3578 /* do nothing in --no-tablespaces mode */
3579 if (ropt->noTablespace)
3580 return;
3581
3582 have = AH->currTablespace;
3583 want = tablespace;
3584
3585 /* no need to do anything for non-tablespace object */
3586 if (!want)
3587 return;
3588
3589 if (have && strcmp(want, have) == 0)
3590 return; /* no need to do anything */
3591
3592 qry = createPQExpBuffer();
3593
3594 if (strcmp(want, "") == 0)
3595 {
3596 /* We want the tablespace to be the database's default */
3597 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3598 }
3599 else
3600 {
3601 /* We want an explicit tablespace */
3602 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3603 }
3604
3605 if (RestoringToDB(AH))
3606 {
3607 PGresult *res;
3608
3609 res = PQexec(AH->connection, qry->data);
3610
3611 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3612 warn_or_exit_horribly(AH,
3613 "could not set \"default_tablespace\" to %s: %s",
3614 fmtId(want), PQerrorMessage(AH->connection));
3615
3616 PQclear(res);
3617 }
3618 else
3619 ahprintf(AH, "%s;\n\n", qry->data);
3620
3621 free(AH->currTablespace);
3622 AH->currTablespace = pg_strdup(want);
3623
3624 destroyPQExpBuffer(qry);
3625}
3626
3627/*
3628 * Set the proper default_table_access_method value for the table.
3629 */
3630static void
3631_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3632{
3633 RestoreOptions *ropt = AH->public.ropt;
3634 PQExpBuffer cmd;
3635 const char *want,
3636 *have;
3637
3638 /* do nothing in --no-table-access-method mode */
3639 if (ropt->noTableAm)
3640 return;
3641
3642 have = AH->currTableAm;
3643 want = tableam;
3644
3645 if (!want)
3646 return;
3647
3648 if (have && strcmp(want, have) == 0)
3649 return;
3650
3651 cmd = createPQExpBuffer();
3652 appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3653
3654 if (RestoringToDB(AH))
3655 {
3656 PGresult *res;
3657
3658 res = PQexec(AH->connection, cmd->data);
3659
3660 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3661 warn_or_exit_horribly(AH,
3662 "could not set \"default_table_access_method\": %s",
3663 PQerrorMessage(AH->connection));
3664
3665 PQclear(res);
3666 }
3667 else
3668 ahprintf(AH, "%s\n\n", cmd->data);
3669
3670 destroyPQExpBuffer(cmd);
3671
3672 free(AH->currTableAm);
3673 AH->currTableAm = pg_strdup(want);
3674}
3675
3676/*
3677 * Set the proper default table access method for a table without storage.
3678 * Currently, this is required only for partitioned tables with a table AM.
3679 */
3680static void
3681_printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3682{
3683 RestoreOptions *ropt = AH->public.ropt;
3684 const char *tableam = te->tableam;
3685 PQExpBuffer cmd;
3686
3687 /* do nothing in --no-table-access-method mode */
3688 if (ropt->noTableAm)
3689 return;
3690
3691 if (!tableam)
3692 return;
3693
3694 Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3695
3696 cmd = createPQExpBuffer();
3697
3698 appendPQExpBufferStr(cmd, "ALTER TABLE ");
3699 appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3700 appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3701 fmtId(tableam));
3702
3703 if (RestoringToDB(AH))
3704 {
3705 PGresult *res;
3706
3707 res = PQexec(AH->connection, cmd->data);
3708
3709 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3710 warn_or_exit_horribly(AH,
3711 "could not alter table access method: %s",
3712 PQerrorMessage(AH->connection));
3713 PQclear(res);
3714 }
3715 else
3716 ahprintf(AH, "%s\n\n", cmd->data);
3717
3718 destroyPQExpBuffer(cmd);
3719}
3720
3721/*
3722 * Extract an object description for a TOC entry, and append it to buf.
3723 *
3724 * This is used for ALTER ... OWNER TO.
3725 *
3726 * If the object type has no owner, do nothing.
3727 */
3728static void
3729_getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3730{
3731 const char *type = te->desc;
3732
3733 /* objects that don't require special decoration */
3734 if (strcmp(type, "COLLATION") == 0 ||
3735 strcmp(type, "CONVERSION") == 0 ||
3736 strcmp(type, "DOMAIN") == 0 ||
3737 strcmp(type, "FOREIGN TABLE") == 0 ||
3738 strcmp(type, "MATERIALIZED VIEW") == 0 ||
3739 strcmp(type, "SEQUENCE") == 0 ||
3740 strcmp(type, "STATISTICS") == 0 ||
3741 strcmp(type, "TABLE") == 0 ||
3742 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3743 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3744 strcmp(type, "TYPE") == 0 ||
3745 strcmp(type, "VIEW") == 0 ||
3746 /* non-schema-specified objects */
3747 strcmp(type, "DATABASE") == 0 ||
3748 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3749 strcmp(type, "SCHEMA") == 0 ||
3750 strcmp(type, "EVENT TRIGGER") == 0 ||
3751 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3752 strcmp(type, "SERVER") == 0 ||
3753 strcmp(type, "PUBLICATION") == 0 ||
3754 strcmp(type, "SUBSCRIPTION") == 0)
3755 {
3756 appendPQExpBuffer(buf, "%s ", type);
3757 if (te->namespace && *te->namespace)
3758 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3759 appendPQExpBufferStr(buf, fmtId(te->tag));
3760 }
3761 /* LOs just have a name, but it's numeric so must not use fmtId */
3762 else if (strcmp(type, "BLOB") == 0)
3763 {
3764 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3765 }
3766
3767 /*
3768 * These object types require additional decoration. Fortunately, the
3769 * information needed is exactly what's in the DROP command.
3770 */
3771 else if (strcmp(type, "AGGREGATE") == 0 ||
3772 strcmp(type, "FUNCTION") == 0 ||
3773 strcmp(type, "OPERATOR") == 0 ||
3774 strcmp(type, "OPERATOR CLASS") == 0 ||
3775 strcmp(type, "OPERATOR FAMILY") == 0 ||
3776 strcmp(type, "PROCEDURE") == 0)
3777 {
3778 /* Chop "DROP " off the front and make a modifiable copy */
3779 char *first = pg_strdup(te->dropStmt + 5);
3780 char *last;
3781
3782 /* point to last character in string */
3783 last = first + strlen(first) - 1;
3784
3785 /* Strip off any ';' or '\n' at the end */
3786 while (last >= first && (*last == '\n' || *last == ';'))
3787 last--;
3788 *(last + 1) = '\0';
3789
3790 appendPQExpBufferStr(buf, first);
3791
3792 free(first);
3793 return;
3794 }
3795 /* these object types don't have separate owners */
3796 else if (strcmp(type, "CAST") == 0 ||
3797 strcmp(type, "CHECK CONSTRAINT") == 0 ||
3798 strcmp(type, "CONSTRAINT") == 0 ||
3799 strcmp(type, "DATABASE PROPERTIES") == 0 ||
3800 strcmp(type, "DEFAULT") == 0 ||
3801 strcmp(type, "FK CONSTRAINT") == 0 ||
3802 strcmp(type, "INDEX") == 0 ||
3803 strcmp(type, "RULE") == 0 ||
3804 strcmp(type, "TRIGGER") == 0 ||
3805 strcmp(type, "ROW SECURITY") == 0 ||
3806 strcmp(type, "POLICY") == 0 ||
3807 strcmp(type, "USER MAPPING") == 0)
3808 {
3809 /* do nothing */
3810 }
3811 else
3812 pg_fatal("don't know how to set owner for object type \"%s\"", type);
3813}
3814
3815/*
3816 * Emit the SQL commands to create the object represented by a TOC entry
3817 *
3818 * This now also includes issuing an ALTER OWNER command to restore the
3819 * object's ownership, if wanted. But note that the object's permissions
3820 * will remain at default, until the matching ACL TOC entry is restored.
3821 */
3822static void
3823_printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
3824{
3825 RestoreOptions *ropt = AH->public.ropt;
3826
3827 /*
3828 * Select owner, schema, tablespace and default AM as necessary. The
3829 * default access method for partitioned tables is handled after
3830 * generating the object definition, as it requires an ALTER command
3831 * rather than SET.
3832 */
3833 _becomeOwner(AH, te);
3834 _selectOutputSchema(AH, te->namespace);
3835 _selectTablespace(AH, te->tablespace);
3836 if (te->relkind != RELKIND_PARTITIONED_TABLE)
3837 _selectTableAccessMethod(AH, te->tableam);
3838
3839 /* Emit header comment for item */
3840 if (!AH->noTocComments)
3841 {
3842 char *sanitized_name;
3843 char *sanitized_schema;
3844 char *sanitized_owner;
3845
3846 ahprintf(AH, "--\n");
3847 if (AH->public.verbose)
3848 {
3849 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3850 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3851 if (te->nDeps > 0)
3852 {
3853 int i;
3854
3855 ahprintf(AH, "-- Dependencies:");
3856 for (i = 0; i < te->nDeps; i++)
3857 ahprintf(AH, " %d", te->dependencies[i]);
3858 ahprintf(AH, "\n");
3859 }
3860 }
3861
3862 sanitized_name = sanitize_line(te->tag, false);
3863 sanitized_schema = sanitize_line(te->namespace, true);
3864 sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3865
3866 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3867 pfx, sanitized_name, te->desc, sanitized_schema,
3868 sanitized_owner);
3869
3870 free(sanitized_name);
3871 free(sanitized_schema);
3872 free(sanitized_owner);
3873
3874 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3875 {
3876 char *sanitized_tablespace;
3877
3878 sanitized_tablespace = sanitize_line(te->tablespace, false);
3879 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3880 free(sanitized_tablespace);
3881 }
3882 ahprintf(AH, "\n");
3883
3884 if (AH->PrintExtraTocPtr != NULL)
3885 AH->PrintExtraTocPtr(AH, te);
3886 ahprintf(AH, "--\n\n");
3887 }
3888
3889 /*
3890 * Actually print the definition. Normally we can just print the defn
3891 * string if any, but we have four special cases:
3892 *
3893 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3894 * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3895 * "public" that is a comment. We have to do this when --no-owner mode is
3896 * selected. This is ugly, but I see no other good way ...
3897 *
3898 * 2. BLOB METADATA entries need special processing since their defn
3899 * strings are just lists of OIDs, not complete SQL commands.
3900 *
3901 * 3. ACL LARGE OBJECTS entries need special processing because they
3902 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3903 * apply to each large object listed in the associated BLOB METADATA.
3904 *
3905 * 4. Entries with a defnDumper need to call it to generate the
3906 * definition. This is primarily intended to provide a way to save memory
3907 * for objects that would otherwise need a lot of it (e.g., statistics
3908 * data).
3909 */
3910 if (ropt->noOwner &&
3911 strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3912 {
3913 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3914 }
3915 else if (strcmp(te->desc, "BLOB METADATA") == 0)
3916 {
3917 IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3918 }
3919 else if (strcmp(te->desc, "ACL") == 0 &&
3920 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3921 {
3922 IssueACLPerBlob(AH, te);
3923 }
3924 else if (te->defnLen && AH->format != archTar)
3925 {
3926 /*
3927 * If defnLen is set, the defnDumper has already been called for this
3928 * TOC entry. We don't normally expect a defnDumper to be called for
3929 * a TOC entry a second time in _printTocEntry(), but there's an
3930 * exception. The tar format first calls WriteToc(), which scans the
3931 * entire TOC, and then it later calls RestoreArchive() to generate
3932 * restore.sql, which scans the TOC again. There doesn't appear to be
3933 * a good way to prevent a second defnDumper call in this case without
3934 * storing the definition in memory, which defeats the purpose. This
3935 * second defnDumper invocation should generate the same output as the
3936 * first, but even if it doesn't, the worst-case scenario is that
3937 * restore.sql might have different statistics data than the archive.
3938 *
3939 * In all other cases, encountering a TOC entry a second time in
3940 * _printTocEntry() is unexpected, so we fail because one of our
3941 * assumptions must no longer hold true.
3942 *
3943 * XXX This is a layering violation, but the alternative is an awkward
3944 * and complicated callback infrastructure for this special case. This
3945 * might be worth revisiting in the future.
3946 */
3947 pg_fatal("unexpected TOC entry in _printTocEntry(): %d %s %s",
3948 te->dumpId, te->desc, te->tag);
3949 }
3950 else if (te->defnDumper)
3951 {
3952 char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
3953
3954 te->defnLen = ahprintf(AH, "%s\n\n", defn);
3955 pg_free(defn);
3956 }
3957 else if (te->defn && strlen(te->defn) > 0)
3958 {
3959 ahprintf(AH, "%s\n\n", te->defn);
3960
3961 /*
3962 * If the defn string contains multiple SQL commands, txn_size mode
3963 * should count it as N actions not one. But rather than build a full
3964 * SQL parser, approximate this by counting semicolons. One case
3965 * where that tends to be badly fooled is function definitions, so
3966 * ignore them. (restore_toc_entry will count one action anyway.)
3967 */
3968 if (ropt->txn_size > 0 &&
3969 strcmp(te->desc, "FUNCTION") != 0 &&
3970 strcmp(te->desc, "PROCEDURE") != 0)
3971 {
3972 const char *p = te->defn;
3973 int nsemis = 0;
3974
3975 while ((p = strchr(p, ';')) != NULL)
3976 {
3977 nsemis++;
3978 p++;
3979 }
3980 if (nsemis > 1)
3981 AH->txnCount += nsemis - 1;
3982 }
3983 }
3984
3985 /*
3986 * If we aren't using SET SESSION AUTH to determine ownership, we must
3987 * instead issue an ALTER OWNER command. Schema "public" is special; when
3988 * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3989 * when using SET SESSION for all other objects. We assume that anything
3990 * without a DROP command is not a separately ownable object.
3991 */
3992 if (!ropt->noOwner &&
3993 (!ropt->use_setsessauth ||
3994 (strcmp(te->desc, "SCHEMA") == 0 &&
3995 strncmp(te->defn, "--", 2) == 0)) &&
3996 te->owner && strlen(te->owner) > 0 &&
3997 te->dropStmt && strlen(te->dropStmt) > 0)
3998 {
3999 if (strcmp(te->desc, "BLOB METADATA") == 0)
4000 {
4001 /* BLOB METADATA needs special code to handle multiple LOs */
4002 char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
4003
4004 IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
4005 pg_free(cmdEnd);
4006 }
4007 else
4008 {
4009 /* For all other cases, we can use _getObjectDescription */
4010 PQExpBufferData temp;
4011
4012 initPQExpBuffer(&temp);
4013 _getObjectDescription(&temp, te);
4014
4015 /*
4016 * If _getObjectDescription() didn't fill the buffer, then there
4017 * is no owner.
4018 */
4019 if (temp.data[0])
4020 ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
4021 temp.data, fmtId(te->owner));
4022 termPQExpBuffer(&temp);
4023 }
4024 }
4025
4026 /*
4027 * Select a partitioned table's default AM, once the table definition has
4028 * been generated.
4029 */
4030 if (te->relkind == RELKIND_PARTITIONED_TABLE)
4032
4033 /*
4034 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
4035 * commands, so we can no longer assume we know the current auth setting.
4036 */
4037 if (_tocEntryIsACL(te))
4038 {
4039 free(AH->currUser);
4040 AH->currUser = NULL;
4041 }
4042}
4043
4044/*
4045 * Sanitize a string to be included in an SQL comment or TOC listing, by
4046 * replacing any newlines with spaces. This ensures each logical output line
4047 * is in fact one physical output line, to prevent corruption of the dump
4048 * (which could, in the worst case, present an SQL injection vulnerability
4049 * if someone were to incautiously load a dump containing objects with
4050 * maliciously crafted names).
4051 *
4052 * The result is a freshly malloc'd string. If the input string is NULL,
4053 * return a malloc'ed empty string, unless want_hyphen, in which case return a
4054 * malloc'ed hyphen.
4055 *
4056 * Note that we currently don't bother to quote names, meaning that the name
4057 * fields aren't automatically parseable. "pg_restore -L" doesn't care because
4058 * it only examines the dumpId field, but someday we might want to try harder.
4059 */
4060static char *
4061sanitize_line(const char *str, bool want_hyphen)
4062{
4063 char *result;
4064 char *s;
4065
4066 if (!str)
4067 return pg_strdup(want_hyphen ? "-" : "");
4068
4069 result = pg_strdup(str);
4070
4071 for (s = result; *s != '\0'; s++)
4072 {
4073 if (*s == '\n' || *s == '\r')
4074 *s = ' ';
4075 }
4076
4077 return result;
4078}
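/*
 * For example, a hypothetical table named "users\nDROP TABLE x" shows up
 * in the TOC comment as "users DROP TABLE x": the embedded newline is
 * flattened to a space, so the malicious text cannot begin a new line of
 * the script.
 */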
4079
4080/*
4081 * Write the file header for a custom-format archive
4082 */
4083void
4084WriteHead(ArchiveHandle *AH)
4085{
4086 struct tm crtm;
4087
4088 AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
4089 AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
4090 AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
4091 AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
4092 AH->WriteBytePtr(AH, AH->intSize);
4093 AH->WriteBytePtr(AH, AH->offSize);
4094 AH->WriteBytePtr(AH, AH->format);
4095 AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
4096 crtm = *localtime(&AH->createDate);
4097 WriteInt(AH, crtm.tm_sec);
4098 WriteInt(AH, crtm.tm_min);
4099 WriteInt(AH, crtm.tm_hour);
4100 WriteInt(AH, crtm.tm_mday);
4101 WriteInt(AH, crtm.tm_mon);
4102 WriteInt(AH, crtm.tm_year);
4103 WriteInt(AH, crtm.tm_isdst);
4104 WriteStr(AH, PQdb(AH->connection));
4105 WriteStr(AH, AH->public.remoteVersionStr);
4106 WriteStr(AH, PG_VERSION);
4107}
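/*
 * Sketch of the fixed header written above: the 5-byte "PGDMP" magic,
 * three version bytes (major, minor, revision), intSize, offSize, the
 * format code and the compression algorithm byte, followed by the seven
 * creation-timestamp fields as WriteInt values and the database name and
 * version strings.
 */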
4108
4109void
4110ReadHead(ArchiveHandle *AH)
4111{
4112 char *errmsg;
4113 char vmaj,
4114 vmin,
4115 vrev;
4116 int fmt;
4117
4118 /*
4119 * If we haven't already read the header, do so.
4120 *
4121 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
4122 * way to unify the cases?
4123 */
4124 if (!AH->readHeader)
4125 {
4126 char tmpMag[7];
4127
4128 AH->ReadBufPtr(AH, tmpMag, 5);
4129
4130 if (strncmp(tmpMag, "PGDMP", 5) != 0)
4131 pg_fatal("did not find magic string in file header");
4132 }
4133
4134 vmaj = AH->ReadBytePtr(AH);
4135 vmin = AH->ReadBytePtr(AH);
4136
4137 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4138 vrev = AH->ReadBytePtr(AH);
4139 else
4140 vrev = 0;
4141
4142 AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4143
4144 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4145 pg_fatal("unsupported version (%d.%d) in file header",
4146 vmaj, vmin);
4147
4148 AH->intSize = AH->ReadBytePtr(AH);
4149 if (AH->intSize > 32)
4150 pg_fatal("sanity check on integer size (%lu) failed",
4151 (unsigned long) AH->intSize);
4152
4153 if (AH->intSize > sizeof(int))
4154 pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4155
4156 if (AH->version >= K_VERS_1_7)
4157 AH->offSize = AH->ReadBytePtr(AH);
4158 else
4159 AH->offSize = AH->intSize;
4160
4161 fmt = AH->ReadBytePtr(AH);
4162
4163 if (AH->format != fmt)
4164 pg_fatal("expected format (%d) differs from format found in file (%d)",
4165 AH->format, fmt);
4166
4167 if (AH->version >= K_VERS_1_15)
4168 AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4169 else if (AH->version >= K_VERS_1_2)
4170 {
4171 /* Guess the compression method based on the level */
4172 if (AH->version < K_VERS_1_4)
4173 AH->compression_spec.level = AH->ReadBytePtr(AH);
4174 else
4175 AH->compression_spec.level = ReadInt(AH);
4176
4177 if (AH->compression_spec.level != 0)
4178 AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4179 }
4180 else
4181 AH->compression_spec.algorithm = PG_COMPRESSION_NONE;
4182
4183 errmsg = supports_compression(AH->compression_spec);
4184 if (errmsg)
4185 {
4186 pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4187 errmsg);
4188 pg_free(errmsg);
4189 }
4190
4191 if (AH->version >= K_VERS_1_4)
4192 {
4193 struct tm crtm;
4194
4195 crtm.tm_sec = ReadInt(AH);
4196 crtm.tm_min = ReadInt(AH);
4197 crtm.tm_hour = ReadInt(AH);
4198 crtm.tm_mday = ReadInt(AH);
4199 crtm.tm_mon = ReadInt(AH);
4200 crtm.tm_year = ReadInt(AH);
4201 crtm.tm_isdst = ReadInt(AH);
4202
4203 /*
4204 * Newer versions of glibc have mktime() report failure if tm_isdst is
4205 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4206 * TZ=UTC. This is problematic when restoring an archive under a
4207 * different timezone setting. If we get a failure, try again with
4208 * tm_isdst set to -1 ("don't know").
4209 *
4210 * XXX with or without this hack, we reconstruct createDate
4211 * incorrectly when the prevailing timezone is different from
4212 * pg_dump's. Next time we bump the archive version, we should flush
4213 * this representation and store a plain seconds-since-the-Epoch
4214 * timestamp instead.
4215 */
4216 AH->createDate = mktime(&crtm);
4217 if (AH->createDate == (time_t) -1)
4218 {
4219 crtm.tm_isdst = -1;
4220 AH->createDate = mktime(&crtm);
4221 if (AH->createDate == (time_t) -1)
4222 pg_log_warning("invalid creation date in header");
4223 }
4224 }
4225
4226 if (AH->version >= K_VERS_1_4)
4227 {
4228 AH->archdbname = ReadStr(AH);
4229 }
4230
4231 if (AH->version >= K_VERS_1_10)
4232 {
4233 AH->archiveRemoteVersion = ReadStr(AH);
4234 AH->archiveDumpVersion = ReadStr(AH);
4235 }
4236}
4237
4238
4239/*
4240 * checkSeek
4241 * check to see if ftell/fseek can be performed.
4242 */
4243bool
4244checkSeek(FILE *fp)
4245{
4246 pgoff_t tpos;
4247
4248 /* Check that ftello works on this file */
4249 tpos = ftello(fp);
4250 if (tpos < 0)
4251 return false;
4252
4253 /*
4254 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4255 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4256 * successful no-op even on files that are otherwise unseekable.
4257 */
4258 if (fseeko(fp, tpos, SEEK_SET) != 0)
4259 return false;
4260
4261 return true;
4262}
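
/*
 * Illustrative sketch, not part of the original archiver code: a typical
 * caller probes a newly opened archive once with checkSeek() and remembers
 * the answer, so that offset-based access can be skipped when the input is
 * unseekable (for example, a pipe).  The context struct and helper below are
 * hypothetical and exist only to show that usage pattern.
 */
#ifdef NOT_USED
typedef struct IllustrativeSeekCtx
{
	FILE	   *FH;				/* archive file, already opened */
	bool		hasSeek;		/* result of the one-time probe */
} IllustrativeSeekCtx;

static void
illustrative_probe_seek(IllustrativeSeekCtx *ctx)
{
	ctx->hasSeek = checkSeek(ctx->FH);
	if (!ctx->hasSeek)
		pg_log_warning("archive is not seekable, reading sequentially");
}
#endif							/* NOT_USED */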
4263
4264
4265/*
4266 * dumpTimestamp
4267 */
4268static void
4269dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4270{
4271 char buf[64];
4272
4273 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4274 ahprintf(AH, "-- %s %s\n\n", msg, buf);
4275}
4276
4277/*
4278 * Main engine for parallel restore.
4279 *
4280 * Parallel restore is done in three phases. In this first phase,
4281 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4282 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4283 * PRE_DATA items other than ACLs.) Entries we can't process now are
4284 * added to the pending_list for later phases to deal with.
4285 */
4286 static void
4287 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4288{
4289 bool skipped_some;
4290 TocEntry *next_work_item;
4291
4292 pg_log_debug("entering restore_toc_entries_prefork");
4293
4294 /* Adjust dependency information */
4295 fix_dependencies(AH);
4296
4297 /*
4298 * Do all the early stuff in a single connection in the parent. There's no
4299 * great point in running it in parallel, in fact it will actually run
4300 * faster in a single connection because we avoid all the connection and
4301 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4302 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4303 * not risk trying to process them out-of-order.
4304 *
4305 * Stuff that we can't do immediately gets added to the pending_list.
4306 * Note: we don't yet filter out entries that aren't going to be restored.
4307 * They might participate in dependency chains connecting entries that
4308 * should be restored, so we treat them as live until we actually process
4309 * them.
4310 *
4311 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4312 * before DATA items, and all DATA items before POST_DATA items. That is
4313 * not certain to be true in older archives, though, and in any case use
4314 * of a list file would destroy that ordering (cf. SortTocFromFile). So
4315 * this loop cannot assume that it holds.
4316 */
4317 AH->restorePass = RESTORE_PASS_MAIN;
4318 skipped_some = false;
4319 for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4320 {
4321 bool do_now = true;
4322
4323 if (next_work_item->section != SECTION_PRE_DATA)
4324 {
4325 /* DATA and POST_DATA items are just ignored for now */
4326 if (next_work_item->section == SECTION_DATA ||
4327 next_work_item->section == SECTION_POST_DATA)
4328 {
4329 do_now = false;
4330 skipped_some = true;
4331 }
4332 else
4333 {
4334 /*
4335 * SECTION_NONE items, such as comments, can be processed now
4336 * if we are still in the PRE_DATA part of the archive. Once
4337 * we've skipped any items, we have to consider whether the
4338 * comment's dependencies are satisfied, so skip it for now.
4339 */
4340 if (skipped_some)
4341 do_now = false;
4342 }
4343 }
4344
4345 /*
4346 * Also skip items that need to be forced into later passes. We need
4347 * not set skipped_some in this case, since by assumption no main-pass
4348 * items could depend on these.
4349 */
4350 if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4351 do_now = false;
4352
4353 if (do_now)
4354 {
4355 /* OK, restore the item and update its dependencies */
4356 pg_log_info("processing item %d %s %s",
4357 next_work_item->dumpId,
4358 next_work_item->desc, next_work_item->tag);
4359
4360 (void) restore_toc_entry(AH, next_work_item, false);
4361
4362 /* Reduce dependencies, but don't move anything to ready_heap */
4363 reduce_dependencies(AH, next_work_item, NULL);
4364 }
4365 else
4366 {
4367 /* Nope, so add it to pending_list */
4368 pending_list_append(pending_list, next_work_item);
4369 }
4370 }
4371
4372 /*
4373 * In --transaction-size mode, we must commit the open transaction before
4374 * dropping the database connection. This also ensures that child workers
4375 * can see the objects we've created so far.
4376 */
4377 if (AH->public.ropt->txn_size > 0)
4378 CommitTransaction(&AH->public);
4379
4380 /*
4381 * Now close parent connection in prep for parallel steps. We do this
4382 * mainly to ensure that we don't exceed the specified number of parallel
4383 * connections.
4384 */
4385 DisconnectDatabase(&AH->public);
4386
4387 /* blow away any transient state from the old connection */
4388 free(AH->currUser);
4389 AH->currUser = NULL;
4390 free(AH->currSchema);
4391 AH->currSchema = NULL;
4392 free(AH->currTablespace);
4393 AH->currTablespace = NULL;
4394 free(AH->currTableAm);
4395 AH->currTableAm = NULL;
4396}
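
/*
 * Illustrative sketch, not part of the original archiver code: the three
 * phases documented above are driven from the serial restore path roughly as
 * outlined below.  This is a simplified assumption about the real call site
 * (which lives in RestoreArchive and also handles the non-parallel case and
 * error cleanup), shown only to make the phase structure explicit.
 */
#ifdef NOT_USED
static void
illustrative_parallel_restore_driver(ArchiveHandle *AH)
{
	ParallelState *pstate;
	TocEntry	pending_list;

	/* Phase 1: serial pre-data work over the parent's connection */
	pending_list_header_init(&pending_list);
	restore_toc_entries_prefork(AH, &pending_list);

	/* Phase 2: launch workers and dispatch the remaining TOC entries */
	pstate = ParallelBackupStart(AH);
	restore_toc_entries_parallel(AH, pstate, &pending_list);
	ParallelBackupEnd(AH, pstate);

	/* Phase 3: reconnect the parent and mop up anything still pending */
	restore_toc_entries_postfork(AH, &pending_list);
}
#endif							/* NOT_USED */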
4397
4398/*
4399 * Main engine for parallel restore.
4400 *
4401 * Parallel restore is done in three phases. In this second phase,
4402 * we process entries by dispatching them to parallel worker children
4403 * (processes on Unix, threads on Windows), each of which connects
4404 * separately to the database. Inter-entry dependencies are respected,
4405 * and so is the RestorePass multi-pass structure. When we can no longer
4406 * make any entries ready to process, we exit. Normally, there will be
4407 * nothing left to do; but if there is, the third phase will mop up.
4408 */
4409 static void
4410 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4411 TocEntry *pending_list)
4412{
4413 binaryheap *ready_heap;
4414 TocEntry *next_work_item;
4415
4416 pg_log_debug("entering restore_toc_entries_parallel");
4417
4418 /* Set up ready_heap with enough room for all known TocEntrys */
4419 ready_heap = binaryheap_allocate(AH->tocCount,
4420 TocEntrySizeCompareBinaryheap,
4421 NULL);
4422
4423 /*
4424 * The pending_list contains all items that we need to restore. Move all
4425 * items that are available to process immediately into the ready_heap.
4426 * After this setup, the pending list is everything that needs to be done
4427 * but is blocked by one or more dependencies, while the ready heap
4428 * contains items that have no remaining dependencies and are OK to
4429 * process in the current restore pass.
4430 */
4431 AH->restorePass = RESTORE_PASS_MAIN;
4432 move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4433
4434 /*
4435 * main parent loop
4436 *
4437 * Keep going until there is no worker still running AND there is no work
4438 * left to be done. Note invariant: at top of loop, there should always
4439 * be at least one worker available to dispatch a job to.
4440 */
4441 pg_log_info("entering main parallel loop");
4442
4443 for (;;)
4444 {
4445 /* Look for an item ready to be dispatched to a worker */
4446 next_work_item = pop_next_work_item(ready_heap, pstate);
4447 if (next_work_item != NULL)
4448 {
4449 /* If not to be restored, don't waste time launching a worker */
4450 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
4451 {
4452 pg_log_info("skipping item %d %s %s",
4453 next_work_item->dumpId,
4454 next_work_item->desc, next_work_item->tag);
4455 /* Update its dependencies as though we'd completed it */
4456 reduce_dependencies(AH, next_work_item, ready_heap);
4457 /* Loop around to see if anything else can be dispatched */
4458 continue;
4459 }
4460
4461 pg_log_info("launching item %d %s %s",
4462 next_work_item->dumpId,
4463 next_work_item->desc, next_work_item->tag);
4464
4465 /* Dispatch to some worker */
4466 DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4467 mark_restore_job_done, ready_heap);
4468 }
4469 else if (IsEveryWorkerIdle(pstate))
4470 {
4471 /*
4472 * Nothing is ready and no worker is running, so we're done with
4473 * the current pass or maybe with the whole process.
4474 */
4475 if (AH->restorePass == RESTORE_PASS_LAST)
4476 break; /* No more parallel processing is possible */
4477
4478 /* Advance to next restore pass */
4479 AH->restorePass++;
4480 /* That probably allows some stuff to be made ready */
4481 move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4482 /* Loop around to see if anything's now ready */
4483 continue;
4484 }
4485 else
4486 {
4487 /*
4488 * We have nothing ready, but at least one child is working, so
4489 * wait for some subjob to finish.
4490 */
4491 }
4492
4493 /*
4494 * Before dispatching another job, check to see if anything has
4495 * finished. We should check every time through the loop so as to
4496 * reduce dependencies as soon as possible. If we were unable to
4497 * dispatch any job this time through, wait until some worker finishes
4498 * (and, hopefully, unblocks some pending item). If we did dispatch
4499 * something, continue as soon as there's at least one idle worker.
4500 * Note that in either case, there's guaranteed to be at least one
4501 * idle worker when we return to the top of the loop. This ensures we
4502 * won't block inside DispatchJobForTocEntry, which would be
4503 * undesirable: we'd rather postpone dispatching until we see what's
4504 * been unblocked by finished jobs.
4505 */
4506 WaitForWorkers(AH, pstate,
4507 next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4508 }
4509
4510 /* There should now be nothing in ready_heap. */
4511 Assert(binaryheap_empty(ready_heap));
4512
4513 binaryheap_free(ready_heap);
4514
4515 pg_log_info("finished main parallel loop");
4516}
4517
4518/*
4519 * Main engine for parallel restore.
4520 *
4521 * Parallel restore is done in three phases. In this third phase,
4522 * we mop up any remaining TOC entries by processing them serially.
4523 * This phase normally should have nothing to do, but if we've somehow
4524 * gotten stuck due to circular dependencies or some such, this provides
4525 * at least some chance of completing the restore successfully.
4526 */
4527 static void
4528 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4529{
4530 RestoreOptions *ropt = AH->public.ropt;
4531 TocEntry *te;
4532
4533 pg_log_debug("entering restore_toc_entries_postfork");
4534
4535 /*
4536 * Now reconnect the single parent connection.
4537 */
4538 ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
4539
4540 /* re-establish fixed state */
4541 _doSetFixedOutputState(AH);
4542
4543 /*
4544 * Make sure there is no work left due to, say, circular dependencies, or
4545 * some other pathological condition. If so, do it in the single parent
4546 * connection. We don't sweat about RestorePass ordering; it's likely we
4547 * already violated that.
4548 */
4549 for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4550 {
4551 pg_log_info("processing missed item %d %s %s",
4552 te->dumpId, te->desc, te->tag);
4553 (void) restore_toc_entry(AH, te, false);
4554 }
4555}
4556
4557/*
4558 * Check if te1 has an exclusive lock requirement for an item that te2 also
4559 * requires, whether or not te2's requirement is for an exclusive lock.
4560 */
4561 static bool
4562 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4563{
4564 int j,
4565 k;
4566
4567 for (j = 0; j < te1->nLockDeps; j++)
4568 {
4569 for (k = 0; k < te2->nDeps; k++)
4570 {
4571 if (te1->lockDeps[j] == te2->dependencies[k])
4572 return true;
4573 }
4574 }
4575 return false;
4576}
4577
4578
4579/*
4580 * Initialize the header of the pending-items list.
4581 *
4582 * This is a circular list with a dummy TocEntry as header, just like the
4583 * main TOC list; but we use separate list links so that an entry can be in
4584 * the main TOC list as well as in the pending list.
4585 */
4586 static void
4587 pending_list_header_init(TocEntry *l)
4588{
4589 l->pending_prev = l->pending_next = l;
4590}
4591
4592/* Append te to the end of the pending-list headed by l */
4593 static void
4594 pending_list_append(TocEntry *l, TocEntry *te)
4595{
4596 te->pending_prev = l->pending_prev;
4597 l->pending_prev->pending_next = te;
4598 l->pending_prev = te;
4599 te->pending_next = l;
4600}
4601
4602/* Remove te from the pending-list */
4603 static void
4604 pending_list_remove(TocEntry *te)
4605 {
4606 te->pending_prev->pending_next = te->pending_next;
4607 te->pending_next->pending_prev = te->pending_prev;
4608 te->pending_prev = NULL;
4609 te->pending_next = NULL;
4610}
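
/*
 * Illustrative sketch, not part of the original archiver code: because the
 * list header is a dummy entry, an empty pending list is one whose
 * pending_next points back at the header itself, and walking the list stops
 * when we come back around to the header.  The hypothetical helper below
 * spells out that convention; note that removing entries while iterating
 * requires saving the next link first, as move_to_ready_heap() does.
 */
#ifdef NOT_USED
static int
illustrative_pending_list_length(TocEntry *l)
{
	int			count = 0;
	TocEntry   *te;

	for (te = l->pending_next; te != l; te = te->pending_next)
		count++;
	return count;
}
#endif							/* NOT_USED */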
4611
4612
4613/* qsort comparator for sorting TocEntries by dataLength */
4614static int
4615TocEntrySizeCompareQsort(const void *p1, const void *p2)
4616{
4617 const TocEntry *te1 = *(const TocEntry *const *) p1;
4618 const TocEntry *te2 = *(const TocEntry *const *) p2;
4619
4620 /* Sort by decreasing dataLength */
4621 if (te1->dataLength > te2->dataLength)
4622 return -1;
4623 if (te1->dataLength < te2->dataLength)
4624 return 1;
4625
4626 /* For equal dataLengths, sort by dumpId, just to be stable */
4627 if (te1->dumpId < te2->dumpId)
4628 return -1;
4629 if (te1->dumpId > te2->dumpId)
4630 return 1;
4631
4632 return 0;
4633}
4634
4635/* binaryheap comparator for sorting TocEntries by dataLength */
4636static int
4637TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
4638{
4639 /* return opposite of qsort comparator for max-heap */
4640 return -TocEntrySizeCompareQsort(&p1, &p2);
4641}
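
/*
 * Illustrative sketch, not part of the original archiver code: the qsort
 * comparator above arranges an array of TocEntry pointers so that the entry
 * with the largest dataLength comes first, and negating it makes binaryheap
 * (a max-heap with respect to its comparator) hand back the largest pending
 * item first.  The hypothetical helper below shows the qsort side; the heap
 * side is visible in restore_toc_entries_parallel() above.
 */
#ifdef NOT_USED
static void
illustrative_sort_by_data_length(TocEntry **tes, int ntes)
{
	/* Largest dataLength first; ties broken by dumpId for a stable order */
	qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
}
#endif							/* NOT_USED */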
4642
4643
4644/*
4645 * Move all immediately-ready items from pending_list to ready_heap.
4646 *
4647 * Items are considered ready if they have no remaining dependencies and
4648 * they belong in the current restore pass. (See also reduce_dependencies,
4649 * which applies the same logic one-at-a-time.)
4650 */
4651 static void
4652 move_to_ready_heap(TocEntry *pending_list,
4653 binaryheap *ready_heap,
4654 RestorePass pass)
4655{
4656 TocEntry *te;
4657 TocEntry *next_te;
4658
4659 for (te = pending_list->pending_next; te != pending_list; te = next_te)
4660 {
4661 /* must save list link before possibly removing te from list */
4662 next_te = te->pending_next;
4663
4664 if (te->depCount == 0 &&
4665 _tocEntryRestorePass(te) == pass)
4666 {
4667 /* Remove it from pending_list ... */
4668 pending_list_remove(te);
4669 /* ... and add to ready_heap */
4670 binaryheap_add(ready_heap, te);
4671 }
4672 }
4673}
4674
4675/*
4676 * Find the next work item (if any) that is capable of being run now,
4677 * and remove it from the ready_heap.
4678 *
4679 * Returns the item, or NULL if nothing is runnable.
4680 *
4681 * To qualify, the item must have no remaining dependencies
4682 * and no requirements for locks that are incompatible with
4683 * items currently running. Items in the ready_heap are known to have
4684 * no remaining dependencies, but we have to check for lock conflicts.
4685 */
4686 static TocEntry *
4687 pop_next_work_item(binaryheap *ready_heap,
4688 ParallelState *pstate)
4689{
4690 /*
4691 * Search the ready_heap until we find a suitable item. Note that we do a
4692 * sequential scan through the heap nodes, so even though we will first
4693 * try to choose the highest-priority item, we might end up picking
4694 * something with a much lower priority. However, we expect that we will
4695 * typically be able to pick one of the first few items, which should
4696 * usually have a relatively high priority.
4697 */
4698 for (int i = 0; i < binaryheap_size(ready_heap); i++)
4699 {
4700 TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4701 bool conflicts = false;
4702
4703 /*
4704 * Check to see if the item would need exclusive lock on something
4705 * that a currently running item also needs lock on, or vice versa. If
4706 * so, we don't want to schedule them together.
4707 */
4708 for (int k = 0; k < pstate->numWorkers; k++)
4709 {
4710 TocEntry *running_te = pstate->te[k];
4711
4712 if (running_te == NULL)
4713 continue;
4714 if (has_lock_conflicts(te, running_te) ||
4715 has_lock_conflicts(running_te, te))
4716 {
4717 conflicts = true;
4718 break;
4719 }
4720 }
4721
4722 if (conflicts)
4723 continue;
4724
4725 /* passed all tests, so this item can run */
4726 binaryheap_remove_node(ready_heap, i);
4727 return te;
4728 }
4729
4730 pg_log_debug("no item ready");
4731 return NULL;
4732}
4733
4734
4735/*
4736 * Restore a single TOC item in parallel with others
4737 *
4738 * this is run in the worker, i.e. in a thread (Windows) or a separate process
4739 * (everything else). A worker process executes several such work items during
4740 * a parallel backup or restore. Once we terminate here and report back that
4741 * our work is finished, the leader process will assign us a new work item.
4742 */
4743 int
4744 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4745{
4746 int status;
4747
4748 Assert(AH->connection != NULL);
4749
4750 /* Count only errors associated with this TOC entry */
4751 AH->public.n_errors = 0;
4752
4753 /* Restore the TOC item */
4754 status = restore_toc_entry(AH, te, true);
4755
4756 return status;
4757}
4758
4759
4760/*
4761 * Callback function that's invoked in the leader process after a step has
4762 * been parallel restored.
4763 *
4764 * Update status and reduce the dependency count of any dependent items.
4765 */
4766 static void
4767 mark_restore_job_done(ArchiveHandle *AH,
4768 TocEntry *te,
4769 int status,
4770 void *callback_data)
4771{
4772 binaryheap *ready_heap = (binaryheap *) callback_data;
4773
4774 pg_log_info("finished item %d %s %s",
4775 te->dumpId, te->desc, te->tag);
4776
4777 if (status == WORKER_CREATE_DONE)
4778 mark_create_done(AH, te);
4779 else if (status == WORKER_INHIBIT_DATA)
4780 {
4781 inhibit_data_for_failed_table(AH, te);
4782 AH->public.n_errors++;
4783 }
4784 else if (status == WORKER_IGNORED_ERRORS)
4785 AH->public.n_errors++;
4786 else if (status != 0)
4787 pg_fatal("worker process failed: exit code %d",
4788 status);
4789
4790 reduce_dependencies(AH, te, ready_heap);
4791}
4792
4793
4794/*
4795 * Process the dependency information into a form useful for parallel restore.
4796 *
4797 * This function takes care of fixing up some missing or badly designed
4798 * dependencies, and then prepares subsidiary data structures that will be
4799 * used in the main parallel-restore logic, including:
4800 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4801 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4802 * dependencies for each TOC entry.
4803 *
4804 * We also identify locking dependencies so that we can avoid trying to
4805 * schedule conflicting items at the same time.
4806 */
4807 static void
4808 fix_dependencies(ArchiveHandle *AH)
4809{
4810 TocEntry *te;
4811 int i;
4812
4813 /*
4814 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4815 * items are marked as not being in any parallel-processing list.
4816 */
4817 for (te = AH->toc->next; te != AH->toc; te = te->next)
4818 {
4819 te->depCount = te->nDeps;
4820 te->revDeps = NULL;
4821 te->nRevDeps = 0;
4822 te->pending_prev = NULL;
4823 te->pending_next = NULL;
4824 }
4825
4826 /*
4827 * POST_DATA items that are shown as depending on a table need to be
4828 * re-pointed to depend on that table's data, instead. This ensures they
4829 * won't get scheduled until the data has been loaded.
4830 */
4831 repoint_table_dependencies(AH);
4832
4833 /*
4834 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4835 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4836 * one BLOB COMMENTS in such files.)
4837 */
4838 if (AH->version < K_VERS_1_11)
4839 {
4840 for (te = AH->toc->next; te != AH->toc; te = te->next)
4841 {
4842 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4843 {
4844 TocEntry *te2;
4845
4846 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4847 {
4848 if (strcmp(te2->desc, "BLOBS") == 0)
4849 {
4850 te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4851 te->dependencies[0] = te2->dumpId;
4852 te->nDeps++;
4853 te->depCount++;
4854 break;
4855 }
4856 }
4857 break;
4858 }
4859 }
4860 }
4861
4862 /*
4863 * At this point we start to build the revDeps reverse-dependency arrays,
4864 * so all changes of dependencies must be complete.
4865 */
4866
4867 /*
4868 * Count the incoming dependencies for each item. Also, it is possible
4869 * that the dependencies list items that are not in the archive at all
4870 * (that should not happen in 9.2 and later, but is highly likely in older
4871 * archives). Subtract such items from the depCounts.
4872 */
4873 for (te = AH->toc->next; te != AH->toc; te = te->next)
4874 {
4875 for (i = 0; i < te->nDeps; i++)
4876 {
4877 DumpId depid = te->dependencies[i];
4878
4879 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4880 AH->tocsByDumpId[depid]->nRevDeps++;
4881 else
4882 te->depCount--;
4883 }
4884 }
4885
4886 /*
4887 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4888 * it as a counter below.
4889 */
4890 for (te = AH->toc->next; te != AH->toc; te = te->next)
4891 {
4892 if (te->nRevDeps > 0)
4893 te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4894 te->nRevDeps = 0;
4895 }
4896
4897 /*
4898 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4899 * better agree with the loops above.
4900 */
4901 for (te = AH->toc->next; te != AH->toc; te = te->next)
4902 {
4903 for (i = 0; i < te->nDeps; i++)
4904 {
4905 DumpId depid = te->dependencies[i];
4906
4907 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4908 {
4909 TocEntry *otherte = AH->tocsByDumpId[depid];
4910
4911 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4912 }
4913 }
4914 }
4915
4916 /*
4917 * Lastly, work out the locking dependencies.
4918 */
4919 for (te = AH->toc->next; te != AH->toc; te = te->next)
4920 {
4921 te->lockDeps = NULL;
4922 te->nLockDeps = 0;
4923 identify_locking_dependencies(AH, te);
4924 }
4925}
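
/*
 * Illustrative sketch, not part of the original archiver code: after
 * fix_dependencies() finishes, each entry's depCount equals the number of
 * its forward dependencies that refer to entries actually present in the
 * archive, and the revDeps arrays are the mirror image of those same edges.
 * The hypothetical checker below expresses that invariant.
 */
#ifdef NOT_USED
static void
illustrative_check_dependency_arrays(ArchiveHandle *AH)
{
	TocEntry   *te;
	int			fwd = 0;
	int			rev = 0;

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		int			live = 0;

		for (int i = 0; i < te->nDeps; i++)
		{
			DumpId		depid = te->dependencies[i];

			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
				live++;
		}
		Assert(te->depCount == live);
		fwd += live;
		rev += te->nRevDeps;
	}
	Assert(fwd == rev);
}
#endif							/* NOT_USED */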
4926
4927/*
4928 * Change dependencies on table items to depend on table data items instead,
4929 * but only in POST_DATA items.
4930 *
4931 * Also, for any item having such dependency(s), set its dataLength to the
4932 * largest dataLength of the table data items it depends on. This ensures
4933 * that parallel restore will prioritize larger jobs (index builds, FK
4934 * constraint checks, etc) over smaller ones, avoiding situations where we
4935 * end a restore with only one active job working on a large table.
4936 */
4937 static void
4938 repoint_table_dependencies(ArchiveHandle *AH)
4939{
4940 TocEntry *te;
4941 int i;
4942 DumpId olddep;
4943
4944 for (te = AH->toc->next; te != AH->toc; te = te->next)
4945 {
4946 if (te->section != SECTION_POST_DATA)
4947 continue;
4948 for (i = 0; i < te->nDeps; i++)
4949 {
4950 olddep = te->dependencies[i];
4951 if (olddep <= AH->maxDumpId &&
4952 AH->tableDataId[olddep] != 0)
4953 {
4954 DumpId tabledataid = AH->tableDataId[olddep];
4955 TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4956
4957 te->dependencies[i] = tabledataid;
4958 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4959 pg_log_debug("transferring dependency %d -> %d to %d",
4960 te->dumpId, olddep, tabledataid);
4961 }
4962 }
4963 }
4964}
4965
4966/*
4967 * Identify which objects we'll need exclusive lock on in order to restore
4968 * the given TOC entry (*other* than the one identified by the TOC entry
4969 * itself). Record their dump IDs in the entry's lockDeps[] array.
4970 */
4971 static void
4972 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4973{
4974 DumpId *lockids;
4975 int nlockids;
4976 int i;
4977
4978 /*
4979 * We only care about this for POST_DATA items. PRE_DATA items are not
4980 * run in parallel, and DATA items are all independent by assumption.
4981 */
4982 if (te->section != SECTION_POST_DATA)
4983 return;
4984
4985 /* Quick exit if no dependencies at all */
4986 if (te->nDeps == 0)
4987 return;
4988
4989 /*
4990 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4991 * and hence require exclusive lock. However, we know that CREATE INDEX
4992 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4993 * category too ... but today is not that day.)
4994 */
4995 if (strcmp(te->desc, "INDEX") == 0)
4996 return;
4997
4998 /*
4999 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
5000 * item listed among its dependencies. Originally all of these would have
5001 * been TABLE items, but repoint_table_dependencies would have repointed
5002 * them to the TABLE DATA items if those are present (which they might not
5003 * be, eg in a schema-only dump). Note that all of the entries we are
5004 * processing here are POST_DATA; otherwise there might be a significant
5005 * difference between a dependency on a table and a dependency on its
5006 * data, so that closer analysis would be needed here.
5007 */
5008 lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
5009 nlockids = 0;
5010 for (i = 0; i < te->nDeps; i++)
5011 {
5012 DumpId depid = te->dependencies[i];
5013
5014 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
5015 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
5016 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
5017 lockids[nlockids++] = depid;
5018 }
5019
5020 if (nlockids == 0)
5021 {
5022 free(lockids);
5023 return;
5024 }
5025
5026 te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
5027 te->nLockDeps = nlockids;
5028}
5029
5030/*
5031 * Remove the specified TOC entry from the depCounts of items that depend on
5032 * it, thereby possibly making them ready-to-run. Any pending item that
5033 * becomes ready should be moved to the ready_heap, if that's provided.
5034 */
5035 static void
5036 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
5037 binaryheap *ready_heap)
5038{
5039 int i;
5040
5041 pg_log_debug("reducing dependencies for %d", te->dumpId);
5042
5043 for (i = 0; i < te->nRevDeps; i++)
5044 {
5045 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
5046
5047 Assert(otherte->depCount > 0);
5048 otherte->depCount--;
5049
5050 /*
5051 * It's ready if it has no remaining dependencies, and it belongs in
5052 * the current restore pass, and it is currently a member of the
5053 * pending list (that check is needed to prevent double restore in
5054 * some cases where a list-file forces out-of-order restoring).
5055 * However, if ready_heap == NULL then caller doesn't want any list
5056 * memberships changed.
5057 */
5058 if (otherte->depCount == 0 &&
5059 _tocEntryRestorePass(otherte) == AH->restorePass &&
5060 otherte->pending_prev != NULL &&
5061 ready_heap != NULL)
5062 {
5063 /* Remove it from pending list ... */
5064 pending_list_remove(otherte);
5065 /* ... and add to ready_heap */
5066 binaryheap_add(ready_heap, otherte);
5067 }
5068 }
5069}
5070
5071/*
5072 * Set the created flag on the DATA member corresponding to the given
5073 * TABLE member
5074 */
5075 static void
5076 mark_create_done(ArchiveHandle *AH, TocEntry *te)
5077{
5078 if (AH->tableDataId[te->dumpId] != 0)
5079 {
5080 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5081
5082 ted->created = true;
5083 }
5084}
5085
5086/*
5087 * Mark the DATA member corresponding to the given TABLE member
5088 * as not wanted
5089 */
5090 static void
5091 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
5092{
5093 pg_log_info("table \"%s\" could not be created, will not restore its data",
5094 te->tag);
5095
5096 if (AH->tableDataId[te->dumpId] != 0)
5097 {
5098 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5099
5100 ted->reqs = 0;
5101 }
5102}
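
/*
 * Illustrative sketch, not part of the original archiver code: both helpers
 * above rely on the tableDataId[] map built by buildTocEntryArrays(), which
 * translates a TABLE entry's dumpId into the dumpId of its TABLE DATA entry
 * (or 0 if the archive has no data for that table).  The hypothetical
 * accessor below spells that lookup out.
 */
#ifdef NOT_USED
static TocEntry *
illustrative_table_data_entry(ArchiveHandle *AH, TocEntry *table_te)
{
	DumpId		dataid = AH->tableDataId[table_te->dumpId];

	return (dataid != 0) ? AH->tocsByDumpId[dataid] : NULL;
}
#endif							/* NOT_USED */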
5103
5104/*
5105 * Clone and de-clone routines used in parallel restoration.
5106 *
5107 * Enough of the structure is cloned to ensure that there is no
5108 * conflict between different threads each with their own clone.
5109 */
5110 ArchiveHandle *
5111 CloneArchive(ArchiveHandle *AH)
5112{
5113 ArchiveHandle *clone;
5114
5115 /* Make a "flat" copy */
5116 clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
5117 memcpy(clone, AH, sizeof(ArchiveHandle));
5118
5119 /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5120 clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
5121 memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5122
5123 /* Handle format-independent fields */
5124 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5125
5126 /* The clone will have its own connection, so disregard connection state */
5127 clone->connection = NULL;
5128 clone->connCancel = NULL;
5129 clone->currUser = NULL;
5130 clone->currSchema = NULL;
5131 clone->currTableAm = NULL;
5132 clone->currTablespace = NULL;
5133
5134 /* savedPassword must be local in case we change it while connecting */
5135 if (clone->savedPassword)
5136 clone->savedPassword = pg_strdup(clone->savedPassword);
5137
5138 /* clone has its own error count, too */
5139 clone->public.n_errors = 0;
5140
5141 /* clones should not share lo_buf */
5142 clone->lo_buf = NULL;
5143
5144 /*
5145 * Clone connections disregard --transaction-size; they must commit after
5146 * each command so that the results are immediately visible to other
5147 * workers.
5148 */
5149 clone->public.ropt->txn_size = 0;
5150
5151 /*
5152 * Connect our new clone object to the database, using the same connection
5153 * parameters used for the original connection.
5154 */
5155 ConnectDatabaseAhx((Archive *) clone, &clone->public.ropt->cparams, true);
5156
5157 /* re-establish fixed state */
5158 if (AH->mode == archModeRead)
5159 _doSetFixedOutputState(clone);
5160 /* in write case, setupDumpWorker will fix up connection state */
5161
5162 /* Let the format-specific code have a chance too */
5163 clone->ClonePtr(clone);
5164
5165 Assert(clone->connection != NULL);
5166 return clone;
5167}
5168
5169/*
5170 * Release clone-local storage.
5171 *
5172 * Note: we assume any clone-local connection was already closed.
5173 */
5174 void
5175 DeCloneArchive(ArchiveHandle *AH)
5176{
5177 /* Should not have an open database connection */
5178 Assert(AH->connection == NULL);
5179
5180 /* Clear format-specific state */
5181 AH->DeClonePtr(AH);
5182
5183 /* Clear state allocated by CloneArchive */
5184 if (AH->sqlparse.curCmd)
5185 destroyPQExpBuffer(AH->sqlparse.curCmd);
5186
5187 /* Clear any connection-local state */
5188 free(AH->currUser);
5189 free(AH->currSchema);
5190 free(AH->currTablespace);
5191 free(AH->currTableAm);
5192 free(AH->savedPassword);
5193
5194 free(AH);
5195}
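
/*
 * Illustrative sketch, not part of the original archiver code: each parallel
 * worker is expected to clone the leader's ArchiveHandle, perform its
 * assigned jobs over the clone's own connection, disconnect, and then
 * release the clone.  This outline is an assumption about how the machinery
 * in parallel.c uses the two routines above; the actual job loop is omitted.
 */
#ifdef NOT_USED
static void
illustrative_worker_lifecycle(ArchiveHandle *AH)
{
	ArchiveHandle *clone = CloneArchive(AH);	/* opens the clone's connection */

	/* ... run parallel_restore() / WriteDataChunksForTocEntry() jobs ... */

	DisconnectDatabase(&clone->public); /* close the clone's connection */
	DeCloneArchive(clone);		/* free clone-local storage */
}
#endif							/* NOT_USED */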
Definition: xlog.h:64