Split the SetSubscriptionRelState function into two
authorPeter Eisentraut <peter_e@gmx.net>
Fri, 6 Apr 2018 14:00:26 +0000 (10:00 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 6 Apr 2018 14:00:26 +0000 (10:00 -0400)
We don't actually need the insert-or-update logic, so it's clearer to
have separate functions for the inserting and updating.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/include/catalog/pg_subscription_rel.h

index 8e16d3b7bcec3239fb151f0038e23966a1c92a87..8705d8b1d36d536e3ee33266b504a890f3208896 100644 (file)
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                       XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+                       XLogRecPtr sublsn)
 {
    Relation    rel;
    HeapTuple   tup;
-   Oid         subrelid = InvalidOid;
+   Oid         subrelid;
    bool        nulls[Natts_pg_subscription_rel];
    Datum       values[Natts_pg_subscription_rel];
 
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
    tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
                              ObjectIdGetDatum(relid),
                              ObjectIdGetDatum(subid));
+   if (HeapTupleIsValid(tup))
+       elog(ERROR, "subscription table %u in subscription %u already exists",
+            relid, subid);
 
-   /*
-    * If the record for given table does not exist yet create new record,
-    * otherwise update the existing one.
-    */
-   if (!HeapTupleIsValid(tup) && !update_only)
-   {
-       /* Form the tuple. */
-       memset(values, 0, sizeof(values));
-       memset(nulls, false, sizeof(nulls));
-       values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-       values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-       values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-       if (sublsn != InvalidXLogRecPtr)
-           values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-       else
-           nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-       tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-       /* Insert tuple into catalog. */
-       subrelid = CatalogTupleInsert(rel, tup);
-
-       heap_freetuple(tup);
-   }
-   else if (HeapTupleIsValid(tup))
-   {
-       bool        replaces[Natts_pg_subscription_rel];
+   /* Form the tuple. */
+   memset(values, 0, sizeof(values));
+   memset(nulls, false, sizeof(nulls));
+   values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+   values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+   values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+   if (sublsn != InvalidXLogRecPtr)
+       values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+   else
+       nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-       /* Update the tuple. */
-       memset(values, 0, sizeof(values));
-       memset(nulls, false, sizeof(nulls));
-       memset(replaces, false, sizeof(replaces));
+   tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-       replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-       values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+   /* Insert tuple into catalog. */
+   subrelid = CatalogTupleInsert(rel, tup);
 
-       replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-       if (sublsn != InvalidXLogRecPtr)
-           values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-       else
-           nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+   heap_freetuple(tup);
 
-       tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-                               replaces);
+   /* Cleanup. */
+   heap_close(rel, NoLock);
 
-       /* Update the catalog. */
-       CatalogTupleUpdate(rel, &tup->t_self, tup);
+   return subrelid;
+}
 
-       subrelid = HeapTupleGetOid(tup);
-   }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+                          XLogRecPtr sublsn)
+{
+   Relation    rel;
+   HeapTuple   tup;
+   Oid         subrelid;
+   bool        nulls[Natts_pg_subscription_rel];
+   Datum       values[Natts_pg_subscription_rel];
+   bool        replaces[Natts_pg_subscription_rel];
+
+   LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+   rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+   /* Try finding existing mapping. */
+   tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+                             ObjectIdGetDatum(relid),
+                             ObjectIdGetDatum(subid));
+   if (!HeapTupleIsValid(tup))
+       elog(ERROR, "subscription table %u in subscription %u does not exist",
+            relid, subid);
+
+   /* Update the tuple. */
+   memset(values, 0, sizeof(values));
+   memset(nulls, false, sizeof(nulls));
+   memset(replaces, false, sizeof(replaces));
+
+   replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+   values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+   replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+   if (sublsn != InvalidXLogRecPtr)
+       values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+   else
+       nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+   tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+                           replaces);
+
+   /* Update the catalog. */
+   CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+   subrelid = HeapTupleGetOid(tup);
 
    /* Cleanup. */
    heap_close(rel, NoLock);
index 2694e1b2d746ff714b25fdcbd8bc38fd3717b31c..f138e61a8d3d9d111e52b5fe59742df1967e9042 100644 (file)
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                CheckSubscriptionRelkind(get_rel_relkind(relid),
                                         rv->schemaname, rv->relname);
 
-               SetSubscriptionRelState(subid, relid, table_state,
-                                       InvalidXLogRecPtr, false);
+               AddSubscriptionRelState(subid, relid, table_state,
+                                       InvalidXLogRecPtr);
            }
 
            /*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
        if (!bsearch(&relid, subrel_local_oids,
                     list_length(subrel_states), sizeof(Oid), oid_cmp))
        {
-           SetSubscriptionRelState(sub->oid, relid,
+           AddSubscriptionRelState(sub->oid, relid,
                                    copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-                                   InvalidXLogRecPtr, false);
+                                   InvalidXLogRecPtr);
            ereport(DEBUG1,
                    (errmsg("table \"%s.%s\" added to subscription \"%s\"",
                            rv->schemaname, rv->relname, sub->name)));
index e50b9f790565b912594995b1189fe0573f2fb575..acc6498567d07c3e93d0adf5039a10c693e38f64 100644 (file)
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
        SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-       SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                               MyLogicalRepWorker->relid,
-                               MyLogicalRepWorker->relstate,
-                               MyLogicalRepWorker->relstate_lsn,
-                               true);
+       UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                  MyLogicalRepWorker->relid,
+                                  MyLogicalRepWorker->relstate,
+                                  MyLogicalRepWorker->relstate_lsn);
 
        walrcv_endstreaming(wrconn, &tli);
        finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                    StartTransactionCommand();
                    started_tx = true;
                }
-               SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                       rstate->relid, rstate->state,
-                                       rstate->lsn, true);
+
+               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                          rstate->relid, rstate->state,
+                                          rstate->lsn);
            }
        }
        else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
                /* Update the state and make it visible to others. */
                StartTransactionCommand();
-               SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                       MyLogicalRepWorker->relid,
-                                       MyLogicalRepWorker->relstate,
-                                       MyLogicalRepWorker->relstate_lsn,
-                                       true);
+               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                          MyLogicalRepWorker->relid,
+                                          MyLogicalRepWorker->relstate,
+                                          MyLogicalRepWorker->relstate_lsn);
                CommitTransactionCommand();
                pgstat_report_stat(false);
 
@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                     * Update the new state in catalog.  No need to bother
                     * with the shmem state as we are exiting for good.
                     */
-                   SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                           MyLogicalRepWorker->relid,
-                                           SUBREL_STATE_SYNCDONE,
-                                           *origin_startpos,
-                                           true);
+                   UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                              MyLogicalRepWorker->relid,
+                                              SUBREL_STATE_SYNCDONE,
+                                              *origin_startpos);
                    finish_sync_worker();
                }
                break;
index d936973a9da091f768037fa1f4e543daf500356f..5cf268f181451e894be293e1feeaeb7f699db430 100644 (file)
@@ -67,8 +67,10 @@ typedef struct SubscriptionRelState
    char        state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                       XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+                       XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+                          XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
                        XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);