Skip to content

Commit

Permalink
Fix duplicate key errors on resuming continued txn
Browse files Browse the repository at this point in the history
This commit addresses and resolves the issue of duplicate key errors when resuming partially executed transactions (continuedTxn) in pgcopydb. We have reintroduced the transaction metadata file, which is essential for identifying the commitLSN of a partial transaction.

Unlike our previous approach, which led to a deadlock between the transform and apply phases, this update brings a more efficient process. Now, the apply phase creates metadata for any partial (continued) transactions during the commit. This metadata is then used to accurately skip the already applied partial transaction if a resume is needed.

This fix is crucial, particularly for tables with unique constraints, where executing the same continued transaction twice previously resulted in duplicate key errors. With this update, pgcopydb ensures smooth and error-free handling of transaction resumes.

Signed-off-by: Arunprasad Rajkumar <[email protected]>
  • Loading branch information
arajkumar committed Nov 28, 2023
1 parent ac9c72f commit 4927abb
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 2 deletions.
214 changes: 213 additions & 1 deletion src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ GUC applySettings[] = {
{ NULL, NULL },
};

static bool readTxnCommitLSN(LogicalMessageMetadata *metadata, const char *dir,
bool *txnCommitLSNFound);
static bool parseTxnMetadataFile(const char *filename, LogicalMessageMetadata *metadata);

static bool computeTxnMetadataFilename(uint32_t xid, const char *dir, char *filename);

static bool writeTxnCommitMetadata(LogicalMessageMetadata *mesg, const char *dir);

/*
* stream_apply_catchup catches up with SQL files that have been prepared by
Expand Down Expand Up @@ -671,6 +678,16 @@ stream_apply_sql(StreamApplyContext *context,
return false;
}

bool txnCommitLSNFound = false;

if (!readTxnCommitLSN(metadata,
context->paths.dir,
&txnCommitLSNFound))
{
log_error("Failed to read transaction metadata file");
return false;
}

/*
* Few a time, BEGIN won't have a txnCommitLSN for the txn which
* spread across multiple WAL segments. We call that txn as
Expand All @@ -680,7 +697,7 @@ stream_apply_sql(StreamApplyContext *context,
* The lsn of a COMMIT message determines whether to keep txn or
* abort.
*/
context->continuedTxn = metadata->txnCommitLSN == InvalidXLogRecPtr;
context->continuedTxn = !txnCommitLSNFound;

/* did we reach the starting LSN positions now? */
if (!context->reachedStartPos)
Expand Down Expand Up @@ -802,6 +819,27 @@ stream_apply_sql(StreamApplyContext *context,
{
context->reachedStartPos = context->previousLSN < metadata->lsn;

if (context->continuedTxn)
{
/*
* Write the transaction metadata file for continuedTxn.
* This file will be used for the resumed transaction
* to determine whether allow the transaction to be
* replayed or not.
* Without this, executing the same continuedTxn twice
* will result in duplicate key errors if the table has
* unique constraints.
*/
if (!writeTxnCommitMetadata(metadata, context->paths.dir))
{
log_error("Failed to write transaction metadata file");
return false;
}
log_debug("Wrote transaction metadata file for "
"continuedTxn %lld",
(long long) metadata->xid);
}

if (!context->reachedStartPos)
{
/*
Expand Down Expand Up @@ -1819,3 +1857,177 @@ stream_apply_read_lsn_tracking(StreamApplyContext *context)

return true;
}


/*
* readTxnCommitLSN ensures metadata has transaction COMMIT LSN by fetching it
* from metadata file if it is not present
*/
static bool
readTxnCommitLSN(LogicalMessageMetadata *metadata,
const char *dir,
bool *txnCommitLSNFound)
{
/* if txnCommitLSN is invalid, then fetch it from txn metadata file */
if (metadata->txnCommitLSN != InvalidXLogRecPtr)
{
*txnCommitLSNFound = true;
return true;
}

char txnfilename[MAXPGPATH] = { 0 };

if (!computeTxnMetadataFilename(metadata->xid,
dir,
txnfilename))
{
/* errors have already been logged */
return false;
}

if (!file_exists(txnfilename))
{
*txnCommitLSNFound = false;
return true;
}

log_debug("stream_apply_sql: BEGIN message without a commit LSN, "
"fetching commit LSN from transaction metadata file \"%s\"",
txnfilename);

LogicalMessageMetadata txnMetadata = { .xid = metadata->xid };

if (!parseTxnMetadataFile(txnfilename, &txnMetadata))
{
/* errors have already been logged */
return false;
}

*txnCommitLSNFound = true;
metadata->txnCommitLSN = txnMetadata.txnCommitLSN;

return true;
}


/*
* parseTxnMetadataFile returns the transaction metadata content for the given
* metadata filename.
*/
static bool
parseTxnMetadataFile(const char *filename, LogicalMessageMetadata *metadata)
{
/* store xid as it will be overwritten while parsing metadata */
uint32_t xid = metadata->xid;

if (xid == 0)
{
log_error("BUG: parseTxnMetadataFile is called with "
"transaction xid: %lld", (long long) xid);
return false;
}

char *txnMetadataContent = NULL;
long size = 0L;

if (!read_file(filename, &txnMetadataContent, &size))
{
/* errors have already been logged */
return false;
}

JSON_Value *json = json_parse_string(txnMetadataContent);

if (!parseMessageMetadata(metadata, txnMetadataContent, json, true))
{
/* errors have already been logged */
json_value_free(json);
return false;
}

json_value_free(json);

if (metadata->txnCommitLSN == InvalidXLogRecPtr ||
metadata->xid != xid ||
IS_EMPTY_STRING_BUFFER(metadata->timestamp))
{
log_error("Failed to parse metadata for transaction metadata file "
"%s: %s", filename, txnMetadataContent);
return false;
}

return true;
}


/*
* computeTxnMetadataFilename computes the file path for transaction metadata
* based on its transaction id
*/
static bool
computeTxnMetadataFilename(uint32_t xid, const char *dir, char *filename)
{
if (dir == NULL)
{
log_error("BUG: computeTxnMetadataFilename is called with "
"directory: NULL");
return false;
}

if (xid == 0)
{
log_error("BUG: computeTxnMetadataFilename is called with "
"transaction xid: %lld", (long long) xid);
return false;
}

sformat(filename, MAXPGPATH, "%s/%lld.json", dir, (long long) xid);

return true;
}


/*
* writeTxnCommitMetadata writes the transaction metadata to a file in the given
* directory
*/
static bool
writeTxnCommitMetadata(LogicalMessageMetadata *mesg, const char *dir)
{
char txnfilename[MAXPGPATH] = { 0 };

if (mesg->action != STREAM_ACTION_COMMIT)
{
log_error("BUG: writeTxnCommitMetadata is called with "
"action: %c", mesg->action);
return false;
}

if (!computeTxnMetadataFilename(mesg->xid, dir, txnfilename))
{
/* errors have already been logged */
return false;
}

log_debug("stream_write_commit_metadata_file: writing transaction "
"metadata file \"%s\" with commit lsn %X/%X",
txnfilename,
LSN_FORMAT_ARGS(mesg->lsn));

char contents[BUFSIZE] = { 0 };

sformat(contents, BUFSIZE,
"{\"xid\":%lld,\"commit_lsn\":\"%X/%X\",\"timestamp\":\"%s\"}\n",
(long long) mesg->xid,
LSN_FORMAT_ARGS(mesg->lsn),
mesg->timestamp);

/* write the metadata to txnfilename */
if (!write_file(contents, strlen(contents), txnfilename))
{
log_error("Failed to write file \"%s\"", txnfilename);
return false;
}

return true;
}
6 changes: 5 additions & 1 deletion tests/endpos-in-multi-wal-txn/copydb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ test 16 -eq `psql -AtqX -d ${PGCOPYDB_TARGET_PGURI} -c "${sql}"`
pgcopydb stream sentinel set endpos --current
# and replay the available changes, including the 3rd txn.
pgcopydb follow --resume --trace
#

# this should be a noop. We test whether we skip the txn which
# doesn't have commitLSN in the BEGIN message.
timeout 5s pgcopydb stream apply --trace --resume /var/lib/postgres/.local/share/pgcopydb/000000010000000000000004.sql

# now check that all the new rows made it
sql="select count(*) from table_a"
test 24 -eq `psql -AtqX -d ${PGCOPYDB_TARGET_PGURI} -c "${sql}"`
Expand Down

0 comments on commit 4927abb

Please sign in to comment.