diff options
| author | Pavan Deolasee | 2017-02-20 07:44:19 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2017-05-05 04:59:34 +0000 |
| commit | 77e5c18763be07607c7f1b1ec1ac14eb034f4eb4 (patch) | |
| tree | 35cf4d60552166190277128942d8bc892b48381f /src/gtm | |
| parent | 416f11f2610738c87e7da8c25b1e284359e5835e (diff) | |
Handle sequence's transactional behaviour on GTM
Previously we were tracking changes to sequences on the coordinator side and
applying those changes at transaction commit/rollback time. While this worked
ok for most cases, there were issues such as what happens if a sequence is
dropped and then recreated in the same transaction. Since the DROP is not
executed until the transaction commit time, the subsequent CREATE would fail on
the GTM.
We now track sequences renamed/dropped/created on the GTM side and do a cleanup
on transaction commit/rollback. For example, if a sequence is renamed but the
transaction is later aborted, the sequence will be renamed back to its original
name. Similarly, if a sequence is dropped and the transaction aborts, the
sequence will be re-instated.
Diffstat (limited to 'src/gtm')
| -rw-r--r-- | src/gtm/client/gtm_client.c | 82 | ||||
| -rw-r--r-- | src/gtm/main/gtm_seq.c | 365 | ||||
| -rw-r--r-- | src/gtm/main/gtm_txn.c | 127 |
3 files changed, 485 insertions, 89 deletions
diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c index 126ff30ca4..4ac13faeb6 100644 --- a/src/gtm/client/gtm_client.c +++ b/src/gtm/client/gtm_client.c @@ -56,7 +56,8 @@ static int abort_transaction_multi_internal(GTM_Conn *conn, int txn_count, Globa int *txn_count_out, int *status_out, bool is_backup); static int open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, bool cycle, bool is_backup); + GTM_Sequence startval, bool cycle, + GlobalTransactionId gxid, bool is_backup); static int get_next_internal(GTM_Conn *conn, GTM_SequenceKey key, char *coord_name, int coord_procid, GTM_Sequence range, GTM_Sequence *result, GTM_Sequence *rangemax, bool is_backup); @@ -68,11 +69,14 @@ static int commit_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, int waited_xid_count, GlobalTransactionId *waited_xids, bool is_backup); -static int close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup); -static int rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, bool is_backup); +static int close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, + GlobalTransactionId gxid, bool is_backup); +static int rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, + GTM_SequenceKey newkey, GlobalTransactionId gxid, bool is_backup); static int alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, - GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup); + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, + bool is_restart, bool is_backup); static int node_register_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, char *node_name, char *datafolder, GTM_PGXCNodeStatus status, bool is_backup); @@ -1139,23 +1143,31 @@ send_failed: int open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, bool cycle) + GTM_Sequence startval, + bool cycle, + GlobalTransactionId gxid) { - return open_sequence_internal(conn, key, increment, minval, maxval, startval, cycle, false); + return open_sequence_internal(conn, key, increment, minval, maxval, + startval, cycle, gxid, false); } int bkup_open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, bool cycle) + GTM_Sequence startval, + bool cycle, + GlobalTransactionId gxid) { - return open_sequence_internal(conn, key, increment, minval, maxval, startval, cycle, true); + return open_sequence_internal(conn, key, increment, minval, maxval, + startval, cycle, gxid, true); } static int open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, bool cycle, bool is_backup) + GTM_Sequence startval, bool cycle, + GlobalTransactionId gxid, + bool is_backup) { GTM_Result *res = NULL; time_t finish_time; @@ -1169,7 +1181,8 @@ open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increme gtmpqPutnchar((char *)&minval, sizeof (GTM_Sequence), conn) || gtmpqPutnchar((char *)&maxval, sizeof (GTM_Sequence), conn) || gtmpqPutnchar((char *)&startval, sizeof (GTM_Sequence), conn) || - gtmpqPutc(cycle, conn)) + gtmpqPutc(cycle, conn) || + gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn)) goto send_failed; /* Finish the message. */ @@ -1204,23 +1217,28 @@ send_failed: int alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart) + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, + bool is_restart) { - return alter_sequence_internal(conn, key, increment, minval, maxval, startval, lastval, cycle, is_restart, false); + return alter_sequence_internal(conn, key, increment, minval, maxval, + startval, lastval, cycle, is_restart, false); } int bkup_alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart) + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, + bool is_restart) { - return alter_sequence_internal(conn, key, increment, minval, maxval, startval, lastval, cycle, is_restart, true); + return alter_sequence_internal(conn, key, increment, minval, maxval, + startval, lastval, cycle, is_restart, true); } static int alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, - GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup) + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, + bool is_restart, bool is_backup) { GTM_Result *res = NULL; time_t finish_time; @@ -1269,19 +1287,22 @@ send_failed: } int -close_sequence(GTM_Conn *conn, GTM_SequenceKey key) +close_sequence(GTM_Conn *conn, GTM_SequenceKey key, GlobalTransactionId gxid) { - return close_sequence_internal(conn, key, false); + return close_sequence_internal(conn, key, gxid, false); } int -bkup_close_sequence(GTM_Conn *conn, GTM_SequenceKey key) +bkup_close_sequence(GTM_Conn *conn, GTM_SequenceKey key, + GlobalTransactionId gxid) { - return close_sequence_internal(conn, key, true); + return close_sequence_internal(conn, key, gxid, true); } static int -close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup) +close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, + GlobalTransactionId gxid, + bool is_backup) { GTM_Result *res = NULL; time_t finish_time; @@ -1291,7 +1312,8 @@ close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup) gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_CLOSE : MSG_SEQUENCE_CLOSE, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) || - gtmpqPutnchar((char *)&key->gsk_type, sizeof(GTM_SequenceKeyType), conn)) + gtmpqPutnchar((char *)&key->gsk_type, sizeof(GTM_SequenceKeyType), conn) || + gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn)) goto send_failed; /* Finish the message. */ @@ -1324,19 +1346,22 @@ send_failed: } int -rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey) +rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, + GlobalTransactionId gxid) { - return rename_sequence_internal(conn, key, newkey, false); + return rename_sequence_internal(conn, key, newkey, gxid, false); } int -bkup_rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey) +bkup_rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, + GTM_SequenceKey newkey, GlobalTransactionId gxid) { - return rename_sequence_internal(conn, key, newkey, true); + return rename_sequence_internal(conn, key, newkey, gxid, true); } static int -rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, bool is_backup) +rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, + GlobalTransactionId gxid, bool is_backup) { GTM_Result *res = NULL; time_t finish_time; @@ -1347,7 +1372,8 @@ rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey ne gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn)|| gtmpqPutInt(newkey->gsk_keylen, 4, conn) || - gtmpqPutnchar(newkey->gsk_key, newkey->gsk_keylen, conn)) + gtmpqPutnchar(newkey->gsk_key, newkey->gsk_keylen, conn) || + gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn)) goto send_failed; /* Finish the message. */ diff --git a/src/gtm/main/gtm_seq.c b/src/gtm/main/gtm_seq.c index 2b3868ec5e..5ff1ee0fdd 100644 --- a/src/gtm/main/gtm_seq.c +++ b/src/gtm/main/gtm_seq.c @@ -37,6 +37,12 @@ typedef struct GTM_SeqInfoHashBucket GTM_RWLock shb_lock; } GTM_SeqInfoHashBucket; +typedef struct GTM_SeqAlteredInfo +{ + GTM_SequenceKey curr_key; + GTM_SequenceKey prev_key; +} GTM_SeqAlteredInfo; + #define SEQ_HASH_TABLE_SIZE 1024 static GTM_SeqInfoHashBucket GTMSequences[SEQ_HASH_TABLE_SIZE]; @@ -47,6 +53,9 @@ static GTM_SeqInfo *seq_find_seqinfo(GTM_SequenceKey seqkey); static int seq_release_seqinfo(GTM_SeqInfo *seqinfo); static int seq_add_seqinfo(GTM_SeqInfo *seqinfo); static int seq_remove_seqinfo(GTM_SeqInfo *seqinfo); +static int seq_rename_seqinfo(GTM_SeqInfo *seqinfo, GTM_SequenceKey newkey); +static GTM_SequenceKey seq_copy_key_context(GTM_SequenceKey key, + MemoryContext context); static GTM_SequenceKey seq_copy_key(GTM_SequenceKey key); static int seq_drop_with_dbkey(GTM_SequenceKey nsp); static bool GTM_NeedSeqRestoreUpdateInternal(GTM_SeqInfo *seqinfo); @@ -221,6 +230,107 @@ seq_remove_seqinfo(GTM_SeqInfo *seqinfo) return 0; } +/* + * Rename sequence managed by seqinfo to "newkey". + * + * The sequence is moved to the new bucket and rest of the fields remain + * unchanged. + */ +static int +seq_rename_seqinfo(GTM_SeqInfo *seqinfo, GTM_SequenceKey newkey) +{ + uint32 oldhash = seq_gethash(seqinfo->gs_key); + uint32 newhash = seq_gethash(newkey); + GTM_SeqInfoHashBucket *oldbucket; + GTM_SeqInfoHashBucket *newbucket; + gtm_ListCell *elem; + MemoryContext oldContext; + + oldbucket = >MSequences[oldhash]; + newbucket = >MSequences[newhash]; + + /* + * We must lock both old and new hash buckets. To avoid deadlock, we must + * ensure that we don't try to lock the same bucket twice (in case old and + * new keys are mapped to the same bucket) and also lock them in the same + * order. + */ + if (oldhash < newhash) + { + GTM_RWLockAcquire(&oldbucket->shb_lock, GTM_LOCKMODE_WRITE); + GTM_RWLockAcquire(&newbucket->shb_lock, GTM_LOCKMODE_WRITE); + } + else if (oldhash > newhash) + { + GTM_RWLockAcquire(&newbucket->shb_lock, GTM_LOCKMODE_WRITE); + GTM_RWLockAcquire(&oldbucket->shb_lock, GTM_LOCKMODE_WRITE); + } + else + /* old and new buckets are just the same */ + GTM_RWLockAcquire(&newbucket->shb_lock, GTM_LOCKMODE_WRITE); + + GTM_RWLockAcquire(&seqinfo->gs_lock, GTM_LOCKMODE_WRITE); + + gtm_foreach(elem, newbucket->shb_list) + { + GTM_SeqInfo *curr_seqinfo = NULL; + curr_seqinfo = (GTM_SeqInfo *) gtm_lfirst(elem); + + if (seq_keys_equal(curr_seqinfo->gs_key, newkey)) + { + GTM_RWLockRelease(&seqinfo->gs_lock); + GTM_RWLockRelease(&newbucket->shb_lock); + /* + * Release oldbucket lock but only if its not same as the new + * bucket + * */ + if (oldhash != newhash) + GTM_RWLockRelease(&oldbucket->shb_lock); + ereport(LOG, + (EEXIST, + errmsg("Sequence with the given key already exists"))); + return EEXIST; + } + } + + /* + * Must use TopMostMemoryContext since the hash bucket links can survive + * forever + */ + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + seqinfo->gs_key = seq_copy_key(newkey); + oldbucket->shb_list = gtm_list_delete(oldbucket->shb_list, seqinfo); + newbucket->shb_list = gtm_lappend(newbucket->shb_list, seqinfo); + MemoryContextSwitchTo(oldContext); + + GTM_RWLockRelease(&seqinfo->gs_lock); + GTM_RWLockRelease(&newbucket->shb_lock); + /* Release oldbucket lock but only if its not same as the new bucket */ + if (oldhash != newhash) + GTM_RWLockRelease(&oldbucket->shb_lock); + + return 0; + +} + +/* + * Same as seq_copy_key but use specified MemoryContext + */ +static GTM_SequenceKey +seq_copy_key_context(GTM_SequenceKey key, MemoryContext context) +{ + MemoryContext oldContext; + GTM_SequenceKey newkey; + + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + newkey = seq_copy_key(key); + MemoryContextSwitchTo(oldContext); + return newkey; +} + +/* + * Copy sequence key in the CurrentMemoryContext + */ static GTM_SequenceKey seq_copy_key(GTM_SequenceKey key) { @@ -254,7 +364,8 @@ GTM_SeqOpen(GTM_SequenceKey seqkey, GTM_Sequence minval, GTM_Sequence maxval, GTM_Sequence startval, - bool cycle) + bool cycle, + GlobalTransactionId gxid) { GTM_SeqInfo *seqinfo = NULL; int errcode = 0; @@ -269,6 +380,7 @@ GTM_SeqOpen(GTM_SequenceKey seqkey, seqinfo->gs_key = seq_copy_key(seqkey); seqinfo->gs_state = SEQ_STATE_ACTIVE; seqinfo->gs_called = false; + seqinfo->gs_created_gxid = gxid; /* * Set the increment. Default is 1 @@ -348,6 +460,13 @@ GTM_SeqOpen(GTM_SequenceKey seqkey, pfree(seqinfo->gs_key); pfree(seqinfo); } + else + { + seqinfo = seq_find_seqinfo(seqinfo->gs_key); + GTM_RememberCreatedSequence(gxid, seq_copy_key_context(seqinfo->gs_key, + TopMostMemoryContext)); + } + GTM_SetNeedBackup(); return errcode; @@ -355,6 +474,9 @@ GTM_SeqOpen(GTM_SequenceKey seqkey, /* * Alter a sequence + * + * We don't track altered sequences because changes to sequence values are not + * transactional and must not be rolled back if the transaction aborts. */ int GTM_SeqAlter(GTM_SequenceKey seqkey, GTM_Sequence increment_by, @@ -464,7 +586,7 @@ GTM_SeqRestore(GTM_SequenceKey seqkey, * Destroy the given sequence depending on type of given key */ int -GTM_SeqClose(GTM_SequenceKey seqkey) +GTM_SeqClose(GTM_SequenceKey seqkey, GlobalTransactionId gxid) { int res; @@ -473,14 +595,46 @@ GTM_SeqClose(GTM_SequenceKey seqkey) case GTM_SEQ_FULL_NAME: { GTM_SeqInfo *seqinfo = seq_find_seqinfo(seqkey); - if (seqinfo != NULL) + /* + * If the sequence by created by the same transaction, then just + * drop it completely + */ + res = 0; + if ((seqinfo != NULL) && (!GlobalTransactionIdIsValid(gxid) || + (seqinfo->gs_created_gxid == gxid))) + { + GTM_ForgetCreatedSequence(gxid, seqinfo); + seq_release_seqinfo(seqinfo); + if (!seq_remove_seqinfo(seqinfo)) + { + pfree(seqinfo->gs_key); + pfree(seqinfo); + } + } + /* + * Otherwise we rename it to a special value so that it can be + * restored back if the transaction fails + */ + else if (seqinfo != NULL) { - seq_remove_seqinfo(seqinfo); - pfree(seqinfo->gs_key); - pfree(seqinfo); - res = 0; + GTM_SequenceKeyData newkey; + MemoryContext oldContext; + + newkey.gsk_key = (char *) palloc0(seqinfo->gs_key->gsk_keylen + + strlen("__dropped_") + 11); + sprintf(newkey.gsk_key, "%s_dropped_%d", seqinfo->gs_key->gsk_key, + gxid); + newkey.gsk_key[strlen(newkey.gsk_key)] = '\0'; + newkey.gsk_keylen = strlen(newkey.gsk_key) + 1; + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + seqinfo->gs_oldkey = seq_copy_key(seqinfo->gs_key); + if ((res = seq_rename_seqinfo(seqinfo, &newkey)) == 0) + GTM_RememberDroppedSequence(gxid, + seq_copy_key_context(seqinfo->gs_key, TopMostMemoryContext)); + MemoryContextSwitchTo(oldContext); + seq_release_seqinfo(seqinfo); } - else + else if (seqinfo == NULL) res = EINVAL; break; @@ -600,11 +754,13 @@ seq_drop_with_dbkey(GTM_SequenceKey nsp) * Rename an existing sequence with a new name */ int -GTM_SeqRename(GTM_SequenceKey seqkey, GTM_SequenceKey newseqkey) +GTM_SeqRename(GTM_SequenceKey seqkey, GTM_SequenceKey newseqkey, + GlobalTransactionId gxid) { GTM_SeqInfo *seqinfo = seq_find_seqinfo(seqkey); - GTM_SeqInfo *newseqinfo = NULL; int errcode = 0; + MemoryContext oldContext; + GTM_SeqAlteredInfo *alterinfo; /* replace old key by new key */ if (seqinfo == NULL) @@ -615,53 +771,17 @@ GTM_SeqRename(GTM_SequenceKey seqkey, GTM_SequenceKey newseqkey) return EINVAL; } - /* Now create the new sequence info */ - newseqinfo = (GTM_SeqInfo *) palloc(sizeof (GTM_SeqInfo)); + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + alterinfo = (GTM_SeqAlteredInfo *) palloc0(sizeof (GTM_SeqAlteredInfo)); - GTM_RWLockAcquire(&seqinfo->gs_lock, GTM_LOCKMODE_WRITE); - GTM_RWLockInit(&newseqinfo->gs_lock); - - newseqinfo->gs_ref_count = 0; - newseqinfo->gs_key = seq_copy_key(newseqkey); - newseqinfo->gs_state = seqinfo->gs_state; - newseqinfo->gs_called = seqinfo->gs_called; - - newseqinfo->gs_increment_by = seqinfo->gs_increment_by; - newseqinfo->gs_min_value = seqinfo->gs_min_value; - newseqinfo->gs_max_value = seqinfo->gs_max_value; - - newseqinfo->gs_init_value = seqinfo->gs_init_value; - newseqinfo->gs_value = seqinfo->gs_value; - newseqinfo->gs_backedUpValue = seqinfo->gs_backedUpValue; - newseqinfo->gs_cycle = seqinfo->gs_cycle; - - newseqinfo->gs_state = seqinfo->gs_state; - newseqinfo->gs_max_lastvals = seqinfo->gs_max_lastvals; - newseqinfo->gs_lastval_count = seqinfo->gs_lastval_count; - newseqinfo->gs_last_values = (GTM_SeqLastVal *) - MemoryContextAlloc(TopMostMemoryContext, - newseqinfo->gs_max_lastvals * sizeof(GTM_SeqLastVal)); - memcpy(newseqinfo->gs_last_values, seqinfo->gs_last_values, - newseqinfo->gs_max_lastvals * sizeof(GTM_SeqLastVal)); - - /* Add the copy to the list */ - if ((errcode = seq_add_seqinfo(newseqinfo))) /* a lock is taken here for the new sequence */ - { - GTM_RWLockDestroy(&newseqinfo->gs_lock); - pfree(newseqinfo->gs_key); - pfree(newseqinfo); - return errcode; - } + alterinfo->curr_key = seq_copy_key(newseqkey); + alterinfo->prev_key = seq_copy_key(seqinfo->gs_key); + MemoryContextSwitchTo(oldContext); - /* Remove the old key with the old name */ - GTM_RWLockRelease(&seqinfo->gs_lock); - /* Release first the structure as it has been taken previously */ + errcode = seq_rename_seqinfo(seqinfo, newseqkey); + if (!errcode) + GTM_RememberAlteredSequence(gxid, alterinfo); seq_release_seqinfo(seqinfo); - - /* Close sequence properly, full name is here */ - seqkey->gsk_type = GTM_SEQ_FULL_NAME; - /* Then close properly the old sequence */ - GTM_SeqClose(seqkey); return errcode; } @@ -1018,6 +1138,8 @@ ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup) StringInfoData buf; int errcode; MemoryContext oldContext; + const char *data; + GlobalTransactionId gxid; /* * Get the sequence key @@ -1039,6 +1161,13 @@ ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup) cycle = pq_getmsgbyte(message); + data = pq_getmsgbytes(message, sizeof (gxid)); + if (data == NULL) + ereport(ERROR, + (EPROTO, + errmsg("Message does not contain valid GXID"))); + memcpy(&gxid, data, sizeof (gxid)); + /* * We must use the TopMostMemoryContext because the sequence information is @@ -1047,7 +1176,8 @@ ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup) */ oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - if ((errcode = GTM_SeqOpen(&seqkey, increment, minval, maxval, startval, cycle))) + if ((errcode = GTM_SeqOpen(&seqkey, increment, minval, maxval, startval, + cycle, gxid))) ereport(ERROR, (errcode, errmsg("Failed to open a new sequence"))); @@ -1076,7 +1206,8 @@ ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup) minval, maxval, startval, - cycle); + cycle, + gxid); if (gtm_standby_check_communication_error(&count, oldconn)) goto retry; @@ -1163,7 +1294,8 @@ ProcessSequenceAlterCommand(Port *myport, StringInfo message, bool is_backup) elog(DEBUG1, "Altering sequence key %s", seqkey.gsk_key); - if ((errcode = GTM_SeqAlter(&seqkey, increment, minval, maxval, startval, lastval, cycle, is_restart))) + if ((errcode = GTM_SeqAlter(&seqkey, increment, minval, maxval, startval, + lastval, cycle, is_restart))) ereport(ERROR, (errcode, errmsg("Failed to open a new sequence"))); @@ -1710,15 +1842,24 @@ ProcessSequenceCloseCommand(Port *myport, StringInfo message, bool is_backup) GTM_SequenceKeyData seqkey; StringInfoData buf; int errcode; + GlobalTransactionId gxid; + const char *data; seqkey.gsk_keylen = pq_getmsgint(message, sizeof (seqkey.gsk_keylen)); seqkey.gsk_key = (char *)pq_getmsgbytes(message, seqkey.gsk_keylen); memcpy(&seqkey.gsk_type, pq_getmsgbytes(message, sizeof (GTM_SequenceKeyType)), sizeof (GTM_SequenceKeyType)); + data = pq_getmsgbytes(message, sizeof (gxid)); + if (data == NULL) + ereport(ERROR, + (EPROTO, + errmsg("Message does not contain valid GXID"))); + memcpy(&gxid, data, sizeof (gxid)); + elog(DEBUG1, "Closing sequence %s", seqkey.gsk_key); - if ((errcode = GTM_SeqClose(&seqkey))) + if ((errcode = GTM_SeqClose(&seqkey, gxid))) ereport(ERROR, (errcode, errmsg("Can not close the sequence"))); @@ -1735,7 +1876,8 @@ ProcessSequenceCloseCommand(Port *myport, StringInfo message, bool is_backup) elog(DEBUG1, "calling close_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); retry: - rc = bkup_close_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey); + rc = bkup_close_sequence(GetMyThreadInfo->thr_conn->standby, + &seqkey, gxid); if (gtm_standby_check_communication_error(&count, oldconn)) goto retry; @@ -1786,6 +1928,8 @@ ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup) StringInfoData buf; int errcode; MemoryContext oldContext; + const char *data; + GlobalTransactionId gxid; /* get the message from backend */ seqkey.gsk_keylen = pq_getmsgint(message, sizeof (seqkey.gsk_keylen)); @@ -1795,6 +1939,14 @@ ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup) newseqkey.gsk_keylen = pq_getmsgint(message, sizeof (newseqkey.gsk_keylen)); newseqkey.gsk_key = (char *)pq_getmsgbytes(message, newseqkey.gsk_keylen); + data = pq_getmsgbytes(message, sizeof (gxid)); + if (data == NULL) + ereport(ERROR, + (EPROTO, + errmsg("Message does not contain valid GXID"))); + memcpy(&gxid, data, sizeof (gxid)); + + /* * As when creating a sequence, we must use the TopMostMemoryContext * because the sequence information is not bound to a thread and @@ -1804,7 +1956,7 @@ ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup) elog(DEBUG1, "Renaming sequence %s to %s", seqkey.gsk_key, newseqkey.gsk_key); - if ((errcode = GTM_SeqRename(&seqkey, &newseqkey))) + if ((errcode = GTM_SeqRename(&seqkey, &newseqkey, gxid))) ereport(ERROR, (errcode, errmsg("Can not rename the sequence"))); @@ -1825,7 +1977,8 @@ ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup) elog(DEBUG1, "calling rename_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); retry: - rc = bkup_rename_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey, &newseqkey); + rc = bkup_rename_sequence(GetMyThreadInfo->thr_conn->standby, + &seqkey, &newseqkey, gxid); if (gtm_standby_check_communication_error(&count, oldconn)) goto retry; @@ -2208,3 +2361,93 @@ GTM_CleanupSeqSession(char *coord_name, int coord_procid) GTM_RWLockRelease(&bucket->shb_lock); } } + +/* + * Upon transaction abort, remove the sequence created in the transaction being + * aborted. + */ +void +GTM_SeqRemoveCreated(void *ptr) +{ + GTM_SeqInfo *seqinfo = seq_find_seqinfo((GTM_SequenceKey) ptr); + if (seqinfo) + { + seq_release_seqinfo(seqinfo); + if (!seq_remove_seqinfo(seqinfo)) + { + pfree(seqinfo->gs_key); + pfree(seqinfo); + } + } +} + +/* + * Upon transaction abort, restore the sequence back to its state when it was + * altered first time in the transaction. + */ +void +GTM_SeqRestoreAltered(void *ptr) +{ + GTM_SeqAlteredInfo *alterinfo = (GTM_SeqAlteredInfo *) ptr; + GTM_SeqInfo *seqinfo = seq_find_seqinfo(alterinfo->curr_key); + + if (seqinfo) + { + if (!seq_keys_equal(seqinfo->gs_key, alterinfo->prev_key)) + seq_rename_seqinfo(seqinfo, alterinfo->prev_key); + pfree(alterinfo->prev_key); + pfree(alterinfo->curr_key); + pfree(alterinfo); + seq_release_seqinfo(seqinfo); + } +} + +/* + * Upon transaction abort, rename the sequence back to its original value. + */ +void +GTM_SeqRestoreDropped(void *ptr) +{ + GTM_SeqInfo *seqinfo = seq_find_seqinfo((GTM_SequenceKey) ptr); + if (seqinfo) + { + seq_rename_seqinfo(seqinfo, seqinfo->gs_oldkey); + seq_release_seqinfo(seqinfo); + } + +} + +/* + * Upon transaction commit, forget the original sequence state. The current + * state becomes the final state of the sequence. + */ +void +GTM_SeqRemoveAltered(void *ptr) +{ + GTM_SeqAlteredInfo *alterinfo = (GTM_SeqAlteredInfo *) ptr; + if (alterinfo) + { + pfree(alterinfo->curr_key); + pfree(alterinfo->prev_key); + pfree(alterinfo); + } +} + +/* + * Upon transaction commit, remove the temporarily renamed sequence forever + * from the global structure. + */ +void +GTM_SeqRemoveDropped(void *ptr) +{ + GTM_SeqInfo *seqinfo = seq_find_seqinfo((GTM_SequenceKey) ptr); + if (seqinfo) + { + seq_release_seqinfo(seqinfo); + if (!seq_remove_seqinfo(seqinfo)) + { + pfree(seqinfo->gs_key); + pfree(seqinfo); + } + } +} diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c index 80199073ca..706b9bc256 100644 --- a/src/gtm/main/gtm_txn.c +++ b/src/gtm/main/gtm_txn.c @@ -834,6 +834,67 @@ init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo, static void clean_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo) { + gtm_ListCell *lc; + + if (gtm_txninfo->gti_state == GTM_TXN_ABORT_IN_PROGRESS) + { + /* + * First drop any sequences created in this transaction. We must do + * this before restoring any dropped sequences because the new sequence + * may have reused old name + */ + gtm_foreach(lc, gtm_txninfo->gti_created_seqs) + { + GTM_SeqRemoveCreated(gtm_lfirst(lc)); + } + + /* + * Restore dropped sequences to their original state + */ + gtm_foreach(lc, gtm_txninfo->gti_dropped_seqs) + { + GTM_SeqRestoreDropped(gtm_lfirst(lc)); + } + + /* + * Restore altered sequences to their original state + */ + gtm_foreach(lc, gtm_txninfo->gti_altered_seqs) + { + GTM_SeqRestoreAltered(gtm_lfirst(lc)); + } + + + } + else if (gtm_txninfo->gti_state == GTM_TXN_COMMIT_IN_PROGRESS) + { + /* + * Remove sequences dropped in this transaction permanently. No action + * needed for sequences created in this transaction + */ + gtm_foreach(lc, gtm_txninfo->gti_dropped_seqs) + { + GTM_SeqRemoveDropped(gtm_lfirst(lc)); + } + /* + * Remove original copies of sequences altered in this transaction + * permanently. The altered copies stay. + */ + gtm_foreach(lc, gtm_txninfo->gti_altered_seqs) + { + GTM_SeqRemoveAltered(gtm_lfirst(lc)); + } + + } + + gtm_list_free(gtm_txninfo->gti_created_seqs); + gtm_list_free(gtm_txninfo->gti_dropped_seqs); + gtm_list_free(gtm_txninfo->gti_altered_seqs); + + gtm_txninfo->gti_dropped_seqs = gtm_NIL; + gtm_txninfo->gti_created_seqs = gtm_NIL; + gtm_txninfo->gti_altered_seqs = gtm_NIL; + gtm_txninfo->gti_state = GTM_TXN_ABORTED; gtm_txninfo->gti_in_use = false; gtm_txninfo->gti_snapshot_set = false; @@ -2765,6 +2826,72 @@ GTM_GetLatestCompletedXID(void) return GTMTransactions.gt_latestCompletedXid; } +void +GTM_ForgetCreatedSequence(GlobalTransactionId gxid, void *seq) +{ + GTM_TransactionInfo *gtm_txninfo; + GTM_TransactionHandle txn = GTM_GXIDToHandle(gxid); + + if (txn == InvalidTransactionHandle) + return; + + gtm_txninfo = GTM_HandleToTransactionInfo(txn); + gtm_txninfo->gti_created_seqs = + gtm_list_delete(gtm_txninfo->gti_created_seqs, seq); +} + +/* + * Remember sequence created by transaction 'gxid'. + * + * This should be removed from the global data structure if the transaction + * aborts (see GTM_SeqRemoveCreated). If the sequence is later dropped in the + * same transaction, we remove it from the global structure as well as forget + * tracking (see GTM_ForgetCreatedSequence). If the transaction commits, just + * forget about this tracked sequence. + */ +void +GTM_RememberCreatedSequence(GlobalTransactionId gxid, void *seq) +{ + GTM_TransactionInfo *gtm_txninfo; + GTM_TransactionHandle txn = GTM_GXIDToHandle(gxid); + + if (txn == InvalidTransactionHandle) + return; + + gtm_txninfo = GTM_HandleToTransactionInfo(txn); + gtm_txninfo->gti_created_seqs = + gtm_lappend(gtm_txninfo->gti_created_seqs, seq); +} + +void +GTM_RememberDroppedSequence(GlobalTransactionId gxid, void *seq) +{ + GTM_TransactionInfo *gtm_txninfo; + GTM_TransactionHandle txn = GTM_GXIDToHandle(gxid); + + if (txn == InvalidTransactionHandle) + return; + + gtm_txninfo = GTM_HandleToTransactionInfo(txn); + gtm_txninfo->gti_dropped_seqs = + gtm_lappend(gtm_txninfo->gti_dropped_seqs, seq); +} + +void +GTM_RememberAlteredSequence(GlobalTransactionId gxid, void *seq) +{ + GTM_TransactionInfo *gtm_txninfo; + GTM_TransactionHandle txn = GTM_GXIDToHandle(gxid); + + if (txn == InvalidTransactionHandle) + return; + + gtm_txninfo = GTM_HandleToTransactionInfo(txn); + gtm_txninfo->gti_altered_seqs = gtm_lcons(seq, + gtm_txninfo->gti_altered_seqs); +} + + /* * TODO */ |
