Implement waiting for given lsn at transaction start
authorAlexander Korotkov <akorotkov@postgresql.org>
Tue, 7 Apr 2020 20:51:10 +0000 (23:51 +0300)
committerAlexander Korotkov <akorotkov@postgresql.org>
Tue, 7 Apr 2020 20:51:10 +0000 (23:51 +0300)
This commit adds following optional clause to BEGIN and START TRANSACTION
commands.

  WAIT FOR LSN lsn [ TIMEOUT timeout ]

New clause pospones transaction start till given lsn is applied on standby.
This clause allows user be sure, that changes previously made on primary would
be visible on standby.

New shared memory struct is used to track awaited lsn per backend.  Recovery
process wakes up backend once required lsn is applied.

Author: Ivan Kartyshov, Anna Akenteva
Reviewed-by: Craig Ringer, Thomas Munro, Robert Haas, Kyotaro Horiguchi
Reviewed-by: Masahiko Sawada, Ants Aasma, Dmitry Ivanov, Simon Riggs
Reviewed-by: Amit Kapila, Alexander Korotkov
Discussion: https://postgr.es/m/0240c26c-9f84-30ea-fca9-93ab2df5f305%40postgrespro.ru

20 files changed:
doc/src/sgml/ref/begin.sgml
doc/src/sgml/ref/start_transaction.sgml
src/backend/access/transam/xlog.c
src/backend/commands/Makefile
src/backend/commands/wait.c [new file with mode: 0644]
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/parser/gram.y
src/backend/storage/ipc/ipci.c
src/backend/storage/lmgr/proc.c
src/backend/tcop/utility.c
src/backend/utils/adt/misc.c
src/include/commands/wait.h [new file with mode: 0644]
src/include/nodes/nodes.h
src/include/nodes/parsenodes.h
src/include/parser/kwlist.h
src/include/utils/timestamp.h
src/test/recovery/t/020_begin_wait.pl [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index c23bbfb4e711c3f8c7df8939f862d050dadc0b85..66d9ad7cb234b8115bbf1c2b6f1ffe26ea6e7959 100644 (file)
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -63,6 +63,17 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
    <xref linkend="sql-set-transaction"/>
    was executed.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows to wait for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with master-standby asynchronous replication. Wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be a positive
+   integer. If <acronym>LSN</acronym> was not reached before timeout, transaction
+   doesn't begin. Waiting can be interrupted by cancelling
+   <literal>BEGIN</literal> command.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -146,6 +157,10 @@ BEGIN;
    different purpose in embedded SQL. You are advised to be careful
    about the transaction semantics when porting database applications.
   </para>
+
+  <para>
+   There is no <literal>WAIT FOR</literal> clause in the SQL standard.
+  </para>
  </refsect1>
 
  <refsect1>
index d6cd1d41779216ea600faf9501805dae6344e392..b94ab00b403202eb66914dac5a3659bb403a06a0 100644 (file)
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -40,6 +40,17 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    characteristics, as if <xref linkend="sql-set-transaction"/> was executed. This is the same
    as the <xref linkend="sql-begin"/> command.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows to wait for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with master-standby asynchronous replication. Wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be a positive
+   integer. If <acronym>LSN</acronym> was not reached before timeout, transaction
+   doesn't begin. Waiting can be interrupted by cancelling
+   <literal>START TRANSACTION</literal> command.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -78,6 +89,10 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    omitted.
   </para>
 
+  <para>
+   There is no <literal>WAIT FOR</literal> clause in the SQL standard.
+  </para>
+
   <para>
    See also the compatibility section of <xref linkend="sql-set-transaction"/>.
   </para>
index ec55d68d2726f53b0f0c8510c1673bdf581acfdf..1651e15e8988cd2cbc2fc793cd5dc13b706fe1e2 100644 (file)
@@ -42,6 +42,7 @@
 #include "catalog/pg_database.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -7154,6 +7155,7 @@ StartupXLOG(void)
                        do
                        {
                                bool            switchedTLI = false;
+                               XLogRecPtr      minWaitedLSN;
 
 #ifdef WAL_DEBUG
                                if (XLOG_DEBUG ||
@@ -7357,6 +7359,17 @@ StartupXLOG(void)
                                        break;
                                }
 
+                               /*
+                                * If we replayed an LSN that someone was waiting for, set
+                                * latches in shared memory array to notify the waiter.
+                                */
+                               minWaitedLSN = WaitLSNGetMin();
+                               if (!XLogRecPtrIsInvalid(minWaitedLSN) &&
+                                       minWaitedLSN <= XLogCtl->lastReplayedEndRecPtr)
+                               {
+                                       WaitLSNSetLatch(XLogCtl->lastReplayedEndRecPtr);
+                               }
+
                                /* Else, try to fetch the next WAL record */
                                record = ReadRecord(xlogreader, LOG, false);
                        } while (record != NULL);
index d4815d3ce659a2984f033043a7e13b0f6001a9dd..9b310926c12b690fa8a26e546085daa311b30474 100644 (file)
@@ -57,6 +57,7 @@ OBJS = \
        user.o \
        vacuum.o \
        variable.o \
-       view.o
+       view.o \
+       wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644 (file)
index 0000000..b7aee5b
--- /dev/null
@@ -0,0 +1,295 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *       Implements WAIT FOR clause for BEGIN and START TRANSACTION commands.
+ *       This clause allows waiting for given LSN to be replayed on standby.
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/*
+ * Shared memory structure representing information about LSNs, which backends
+ * are waiting for replay.
+ */
+typedef struct
+{
+       slock_t         mutex;                  /* mutex protecting the fields below */
+       int                     max_backend_id; /* max backend_id present in lsns[] */
+       pg_atomic_uint64 min_lsn;       /* minimal waited LSN */
+       /* per-backend array of waited LSNs */
+       XLogRecPtr      lsns[FLEXIBLE_ARRAY_MEMBER];
+}                      WaitLSNState;
+
+static WaitLSNState * state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+WaitLSNAdd(XLogRecPtr lsn_to_wait)
+{
+       SpinLockAcquire(&state->mutex);
+       if (state->max_backend_id < MyBackendId)
+               state->max_backend_id = MyBackendId;
+
+       state->lsns[MyBackendId] = lsn_to_wait;
+
+       if (lsn_to_wait < state->min_lsn.value)
+               state->min_lsn.value = lsn_to_wait;
+       SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+WaitLSNDelete(void)
+{
+       int                     i;
+       XLogRecPtr      deleted_lsn;
+
+       SpinLockAcquire(&state->mutex);
+
+       deleted_lsn = state->lsns[MyBackendId];
+       state->lsns[MyBackendId] = InvalidXLogRecPtr;
+
+       /* If we are deleting the minimal LSN, then choose the next min_lsn */
+       if (!XLogRecPtrIsInvalid(deleted_lsn) &&
+               deleted_lsn == state->min_lsn.value)
+       {
+               state->min_lsn.value = InvalidXLogRecPtr;
+               for (i = 2; i <= state->max_backend_id; i++)
+               {
+                       if (!XLogRecPtrIsInvalid(state->lsns[i]) &&
+                               (state->lsns[i] < state->min_lsn.value ||
+                                XLogRecPtrIsInvalid(state->min_lsn.value)))
+                       {
+                               state->min_lsn.value = state->lsns[i];
+                       }
+               }
+       }
+
+       /* If deleting from the end of the array, shorten the array's used part */
+       if (state->max_backend_id == MyBackendId)
+       {
+               for (i = (MyBackendId); i >= 2; i--)
+                       if (!XLogRecPtrIsInvalid(state->lsns[i]))
+                       {
+                               state->max_backend_id = i;
+                               break;
+                       }
+       }
+
+       SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+       Size            size;
+
+       size = offsetof(WaitLSNState, lsns);
+       size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+       return size;
+}
+
+/*
+ * Initialize an shared memory structure for waiting for LSN
+ */
+void
+WaitLSNShmemInit(void)
+{
+       bool            found;
+       uint32          i;
+
+       state = (WaitLSNState *) ShmemInitStruct("pg_wait_lsn",
+                                                                                        WaitLSNShmemSize(),
+                                                                                        &found);
+       if (!found)
+       {
+               SpinLockInit(&state->mutex);
+
+               for (i = 0; i < (MaxBackends + 1); i++)
+                       state->lsns[i] = InvalidXLogRecPtr;
+
+               state->max_backend_id = 0;
+               pg_atomic_init_u64(&state->min_lsn, InvalidXLogRecPtr);
+       }
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitLSNSetLatch(XLogRecPtr cur_lsn)
+{
+       uint32          i;
+       int                     max_backend_id;
+       PGPROC     *backend;
+
+       SpinLockAcquire(&state->mutex);
+       max_backend_id = state->max_backend_id;
+
+       for (i = 2; i <= max_backend_id; i++)
+       {
+               backend = BackendIdGetProc(i);
+
+               if (backend && state->lsns[i] != 0 &&
+                       state->lsns[i] <= cur_lsn)
+               {
+                       SetLatch(&backend->procLatch);
+               }
+       }
+       SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that some backend is waiting for
+ */
+XLogRecPtr
+WaitLSNGetMin(void)
+{
+       return state->min_lsn.value;
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed, postmaster dies or timeout
+ * happens. Timeout is specified in milliseconds.  Returns true if LSN was
+ * reached and false otherwise.
+ */
+bool
+WaitLSNUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+       XLogRecPtr      cur_lsn;
+       int                     latch_events;
+       float8          endtime;
+       bool            res = false;
+       bool            wait_forever = (timeout_ms <= 0);
+
+       endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+       latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+       /* Check if we already reached the needed LSN */
+       cur_lsn = GetXLogReplayRecPtr(NULL);
+       if (cur_lsn >= target_lsn)
+               return true;
+
+       WaitLSNAdd(target_lsn);
+       ResetLatch(MyLatch);
+
+       /* Recheck if LSN was reached while WaitLSNAdd() and ResetLatch() */
+       cur_lsn = GetXLogReplayRecPtr(NULL);
+       if (cur_lsn >= target_lsn)
+               return true;
+
+       for (;;)
+       {
+               int                     rc;
+               float8          time_left = 0;
+               long            time_left_ms = 0;
+
+               time_left = endtime - GetNowFloat();
+
+               /* Use 1 second as the default timeout to check for interrupts */
+               if (wait_forever || time_left < 0 || time_left > 1.0)
+                       time_left_ms = 1000;
+               else
+                       time_left_ms = (long) ceil(time_left * 1000.0);
+
+               /* If interrupt, LockErrorCleanup() will do WaitLSNDelete() for us */
+               CHECK_FOR_INTERRUPTS();
+
+               /* If postmaster dies, finish immediately */
+               if (!PostmasterIsAlive())
+                       break;
+
+               rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+                                          WAIT_EVENT_CLIENT_READ);
+
+               ResetLatch(MyLatch);
+
+               if (rc & WL_LATCH_SET)
+                       cur_lsn = GetXLogReplayRecPtr(NULL);
+
+               if (rc & WL_TIMEOUT)
+               {
+                       time_left = endtime - GetNowFloat();
+                       /* If the time specified by user has passed, stop waiting */
+                       if (!wait_forever && time_left <= 0.0)
+                               break;
+                       cur_lsn = GetXLogReplayRecPtr(NULL);
+               }
+
+               /* If LSN has been replayed */
+               if (target_lsn <= cur_lsn)
+                       break;
+       }
+
+       WaitLSNDelete();
+
+       if (cur_lsn < target_lsn)
+               ereport(WARNING,
+                               (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+                                errmsg("didn't start transaction because LSN was not reached"),
+                                errhint("Try to increase wait timeout.")));
+       else
+               res = true;
+
+       return res;
+}
+
+/*
+ * Implementation of WAIT FOR clause for BEGIN and START TRANSACTION commands
+ */
+int
+WaitLSNMain(WaitClause *stmt, DestReceiver *dest)
+{
+       TupleDesc       tupdesc;
+       TupOutputState *tstate;
+       XLogRecPtr      target_lsn;
+       bool            res = false;
+
+       target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+                                                                                                CStringGetDatum(stmt->lsn)));
+       res = WaitLSNUtility(target_lsn, stmt->timeout);
+
+       /* Need a tuple descriptor representing a single TEXT column */
+       tupdesc = CreateTemplateTupleDesc(1);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+
+       /* Prepare for projection of tuples */
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+       /* Send the result */
+       do_text_output_oneline(tstate, res ? "t" : "f");
+       end_tup_output(tstate);
+       return res;
+}
index 1525c0de7258e31583aedb9a230d48273698928d..db179becab5e274d51625f4e174a8397be5dff17 100644 (file)
@@ -3748,10 +3748,22 @@ _copyTransactionStmt(const TransactionStmt *from)
        COPY_STRING_FIELD(savepoint_name);
        COPY_STRING_FIELD(gid);
        COPY_SCALAR_FIELD(chain);
+       COPY_NODE_FIELD(wait);
 
        return newnode;
 }
 
+static WaitClause *
+_copyWaitClause(const WaitClause *from)
+{
+       WaitClause *newnode = makeNode(WaitClause);
+
+       COPY_STRING_FIELD(lsn);
+       COPY_SCALAR_FIELD(timeout);
+
+       return newnode;
+};
+
 static CompositeTypeStmt *
 _copyCompositeTypeStmt(const CompositeTypeStmt *from)
 {
@@ -5339,6 +5351,9 @@ copyObjectImpl(const void *from)
                case T_TransactionStmt:
                        retval = _copyTransactionStmt(from);
                        break;
+               case T_WaitClause:
+                       retval = _copyWaitClause(from);
+                       break;
                case T_CompositeTypeStmt:
                        retval = _copyCompositeTypeStmt(from);
                        break;
index 4f34189ab5cbf36c0da6240ddfa0305f99726686..854d484f603458600443a2274c417a16872ba6d5 100644 (file)
@@ -1541,6 +1541,16 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b)
        COMPARE_STRING_FIELD(savepoint_name);
        COMPARE_STRING_FIELD(gid);
        COMPARE_SCALAR_FIELD(chain);
+       COMPARE_NODE_FIELD(wait);
+
+       return true;
+}
+
+static bool
+_equalWaitClause(const WaitClause *a, const WaitClause *b)
+{
+       COMPARE_STRING_FIELD(lsn);
+       COMPARE_SCALAR_FIELD(timeout);
 
        return true;
 }
@@ -3391,6 +3401,9 @@ equal(const void *a, const void *b)
                case T_TransactionStmt:
                        retval = _equalTransactionStmt(a, b);
                        break;
+               case T_WaitClause:
+                       retval = _equalWaitClause(a, b);
+                       break;
                case T_CompositeTypeStmt:
                        retval = _equalCompositeTypeStmt(a, b);
                        break;
index 5b826509ebedaedb79e827183bd83a3009af6679..47753b42c698070229b68cce8ac7a76152c6eaaa 100644 (file)
@@ -2784,6 +2784,28 @@ _outDefElem(StringInfo str, const DefElem *node)
        WRITE_LOCATION_FIELD(location);
 }
 
+static void
+_outTransactionStmt(StringInfo str, const TransactionStmt *node)
+{
+       WRITE_NODE_TYPE("TRANSACTIONSTMT");
+
+       WRITE_STRING_FIELD(savepoint_name);
+       WRITE_STRING_FIELD(gid);
+       WRITE_NODE_FIELD(options);
+       WRITE_BOOL_FIELD(chain);
+       WRITE_ENUM_FIELD(kind, TransactionStmtKind);
+       WRITE_NODE_FIELD(wait);
+}
+
+static void
+_outWaitClause(StringInfo str, const WaitClause *node)
+{
+       WRITE_NODE_TYPE("WAITCLAUSE");
+
+       WRITE_STRING_FIELD(lsn);
+       WRITE_UINT_FIELD(timeout);
+}
+
 static void
 _outTableLikeClause(StringInfo str, const TableLikeClause *node)
 {
@@ -4334,6 +4356,12 @@ outNode(StringInfo str, const void *obj)
                        case T_PartitionRangeDatum:
                                _outPartitionRangeDatum(str, obj);
                                break;
+                       case T_TransactionStmt:
+                               _outTransactionStmt(str, obj);
+                               break;
+                       case T_WaitClause:
+                               _outWaitClause(str, obj);
+                               break;
 
                        default:
 
index 1219ac8c26494df560e7ba47217073bbb069bf1e..ea1084fa3cf55d283721ef0e846cb5e850da5878 100644 (file)
@@ -601,6 +601,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>           hash_partbound
 %type <defelt>         hash_partbound_elem
+%type <ival>           wait_time
+%type <node>           wait_for
 
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
@@ -670,7 +672,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
        LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
        LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-       LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+       LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
        MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -701,7 +703,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
        SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
 
        TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-       TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+       TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
        TREAT TRIGGER TRIM TRUE_P
        TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -711,7 +713,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
        VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
        VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-       WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+       WAIT WHEN WHERE WHITESPACE_P WINDOW
+       WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
        XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
        XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -9955,18 +9958,20 @@ TransactionStmt:
                                        n->chain = $3;
                                        $$ = (Node *)n;
                                }
-                       | BEGIN_P opt_transaction transaction_mode_list_or_empty
+                       | BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
                                {
                                        TransactionStmt *n = makeNode(TransactionStmt);
                                        n->kind = TRANS_STMT_BEGIN;
                                        n->options = $3;
+                                       n->wait = $4;
                                        $$ = (Node *)n;
                                }
-                       | START TRANSACTION transaction_mode_list_or_empty
+                       | START TRANSACTION transaction_mode_list_or_empty wait_for
                                {
                                        TransactionStmt *n = makeNode(TransactionStmt);
                                        n->kind = TRANS_STMT_START;
                                        n->options = $3;
+                                       n->wait = $4;
                                        $$ = (Node *)n;
                                }
                        | COMMIT opt_transaction opt_transaction_chain
@@ -14240,6 +14245,25 @@ xml_passing_mech:
                        | BY VALUE_P
                ;
 
+/*
+ * WAIT FOR clause of BEGIN and START TRANSACTION statements
+ */
+wait_for:
+                       WAIT FOR LSN Sconst wait_time
+                               {
+                                       WaitClause *n = makeNode(WaitClause);
+                                       n->lsn = $4;
+                                       n->timeout = $5;
+                                       $$ = (Node *)n;
+                               }
+                       | /* EMPTY */           { $$ = NULL; }
+               ;
+
+wait_time:
+                       TIMEOUT Iconst          { $$ = $2; }
+                       | /* EMPTY */           { $$ = 0; }
+               ;
+
 
 /*
  * Aggregate decoration clauses
@@ -15391,6 +15415,7 @@ unreserved_keyword:
                        | LOCK_P
                        | LOCKED
                        | LOGGED
+                       | LSN
                        | MAPPING
                        | MATCH
                        | MATERIALIZED
@@ -15518,6 +15543,7 @@ unreserved_keyword:
                        | TEMPORARY
                        | TEXT_P
                        | TIES
+                       | TIMEOUT
                        | TRANSACTION
                        | TRANSFORM
                        | TRIGGER
@@ -15544,6 +15570,7 @@ unreserved_keyword:
                        | VIEW
                        | VIEWS
                        | VOLATILE
+                       | WAIT
                        | WHITESPACE_P
                        | WITHIN
                        | WITHOUT
index 427b0d59cde2cc02e2aa48ff45186f858cfe038b..417840a8f1149e62740bddd7ad82ccaf27b1ea83 100644 (file)
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
                size = add_size(size, BTreeShmemSize());
                size = add_size(size, SyncScanShmemSize());
                size = add_size(size, AsyncShmemSize());
+               size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
                size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
        SyncScanShmemInit();
        AsyncShmemInit();
 
+       /*
+        * Init array of Latches in shared memory for WAIT
+        */
+       WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
        /*
index 9938cddb570e28419447dd5db89aa4086cebb3bf..baecb39787f341e12ea71038f2d2f25d985574c6 100644 (file)
@@ -38,6 +38,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -717,6 +718,9 @@ LockErrorCleanup(void)
 
        AbortStrongLockAcquire();
 
+       /* If BEGIN WAIT FOR LSN was interrupted, then stop waiting for that LSN */
+       WaitLSNDelete();
+
        /* Nothing to do if we weren't waiting for a lock */
        if (lockAwaited == NULL)
        {
index b1f7f6e2d01d35e1dd21aa4518edef2184fe9c15..f516bd22eae720f42d727c2ee0b0eb9d52fb23fd 100644 (file)
@@ -57,6 +57,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -591,6 +592,18 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                        case TRANS_STMT_START:
                                                {
                                                        ListCell   *lc;
+                                                       WaitClause *waitstmt = (WaitClause *) stmt->wait;
+
+                                                       /* WAIT FOR cannot be used on master */
+                                                       if (stmt->wait && !RecoveryInProgress())
+                                                               ereport(ERROR,
+                                                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                                                errmsg("WAIT FOR can only be "
+                                                                                               "used on standby")));
+
+                                                       /* If needed to WAIT FOR something but failed */
+                                                       if (stmt->wait && WaitLSNMain(waitstmt, dest) == 0)
+                                                               break;
 
                                                        BeginTransactionBlock();
                                                        foreach(lc, stmt->options)
index ee340fb0f021bee659374ef589320b09601ceaac..03f997cba704ab866f6f3c0dc33764229bc68880 100644 (file)
@@ -372,8 +372,6 @@ pg_sleep(PG_FUNCTION_ARGS)
         * less than the specified time when WaitLatch is terminated early by a
         * non-query-canceling signal such as SIGHUP.
         */
-#define GetNowFloat()  ((float8) GetCurrentTimestamp() / 1000000.0)
-
        endtime = GetNowFloat() + secs;
 
        for (;;)
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644 (file)
index 0000000..2a95c95
--- /dev/null
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *       prototypes for commands/wait.c
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+#include "tcop/dest.h"
+#include "nodes/parsenodes.h"
+
+extern bool WaitLSNUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr WaitLSNGetMin(void);
+extern int     WaitLSNMain(WaitClause *stmt, DestReceiver *dest);
+extern void WaitLSNDelete(void);
+
+#endif                                                 /* WAIT_H */
index 381d84b4e4f859e91d9b61de543a3344f06c769c..822827aa32dcfee27b5cbec2e76fc832dc20571d 100644 (file)
@@ -492,6 +492,7 @@ typedef enum NodeTag
        T_StartReplicationCmd,
        T_TimeLineHistoryCmd,
        T_SQLCmd,
+       T_WaitClause,
 
        /*
         * TAGS FOR RANDOM OTHER STUFF
index 518abe42c1098ef86c6bd620ab129335f402718c..7ad3ddbf5795f293edc67ffc5a1874c943adee53 100644 (file)
@@ -1431,6 +1431,17 @@ typedef struct OnConflictClause
        int                     location;               /* token location, or -1 if unknown */
 } OnConflictClause;
 
+/*
+ * WaitClause -
+ *             representation of WAIT FOR clause for BEGIN and START TRANSACTION.
+ */
+typedef struct WaitClause
+{
+       NodeTag         type;
+       char       *lsn;                        /* LSN to wait for */
+       int                     timeout;                /* Number of milliseconds to limit wait time */
+} WaitClause;
+
 /*
  * CommonTableExpr -
  *        representation of WITH list element
@@ -3060,6 +3071,7 @@ typedef struct TransactionStmt
        char       *savepoint_name; /* for savepoint commands */
        char       *gid;                        /* for two-phase-commit related commands */
        bool            chain;                  /* AND CHAIN option */
+       Node       *wait;                       /* WAIT FOR clause */
 } TransactionStmt;
 
 /* ----------------------
index 08f22ce211ddb6b44a8983fad41e51fba5cf560d..6e1848fe4cc3409006d3cbbf9ff59b4783149c6c 100644 (file)
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
index 03a1de569f06fa8269aeb8eb324a4b4ce2a7e41f..eaeeb79c41189e47df677e09106fa09ad40d4f69 100644 (file)
@@ -109,4 +109,6 @@ extern int  date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
 
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
 #endif                                                 /* TIMESTAMP_H */
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644 (file)
index 0000000..3db25bd
--- /dev/null
@@ -0,0 +1,85 @@
+# Checks for BEGIN WAIT FOR LSN
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# And some content and take a backup
+$node_master->safe_psql('postgres',
+       "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = get_new_node('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+       has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+       recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Check that timeouts make us wait for the specified time (1s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+       "BEGIN WAIT FOR LSN '0/FFFFFFFF' TIMEOUT $two_seconds");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "WAIT FOR TIMEOUT waits for enough time");
+
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+$node_master->safe_psql('postgres',
+       "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+       "BEGIN WAIT FOR LSN '$lsn1' TIMEOUT 1");
+
+ok($ret == 0, "zero return value when failed to WAIT FOR LSN on standby");
+ok($err =~ /WARNING:  didn't start transaction because LSN was not reached/,
+       "correct error message when failed to WAIT FOR LSN on standby");
+ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that WAIT FOR works fine and reaches target LSN if given no timeout
+
+# Add data on master, memorize master's last LSN
+$node_master->safe_psql('postgres',
+       "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+       "BEGIN WAIT FOR LSN '$lsn2'");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+       "SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that master's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+       "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+       "standby reached the same LSN as master before starting transaction");
+
+
+# Make sure that it's not allowed to use WAIT FOR on master
+($ret, $out, $err) = $node_master->psql('postgres',
+       "BEGIN WAIT FOR LSN '0/FFFFFFFF'");
+
+ok($ret != 0, "non-zero return value when trying to WAIT FOR LSN on master");
+ok($err =~ /ERROR:  WAIT FOR can only be used on standby/,
+       "correct error message when trying to WAIT FOR LSN on master");
+ok($out eq '', "empty output when trying to WAIT FOR LSN on master");
+
+
+$node_standby->stop;
+$node_master->stop;
index 525d58e7f01d0bf14786f52ea6962ec9b4945e2e..020f75c5e20d4db341ea4fff65774fa33ea80cd4 100644 (file)
@@ -2621,6 +2621,7 @@ WSABUF
 WSADATA
 WSANETWORKEVENTS
 WSAPROTOCOL_INFO
+WaitClause
 WaitEvent
 WaitEventActivity
 WaitEventClient
@@ -2628,6 +2629,7 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNState
 WaitPMResult
 WalCloseMethod
 WalLevel