Skip to content

Commit

Permalink
Resume from exact position and truncate the rest
Browse files Browse the repository at this point in the history
The new logic finds the WAL segment file that contains the message described
by the given metadata and truncates that file up to that message's position.

According to the implementation of streamRotateFile, the message could be
anywhere between pg_walfile_name(metadata->lsn) ... latest.

Consider the following scenario where we have 4 WAL segment files:

000000010000000000000000.json
000000010000000000000001.json
000000010000000000000002.json
000000010000000000000003.json  <-- latest

When we receive a message with LSN 0/144BEE0, the search starts from the
corresponding WAL segment 000000010000000000000001.json and goes up to
000000010000000000000003.json (latest). Assuming that we found the message
in 000000010000000000000002.json, we need to truncate that file up to the
message position, remove (rename, for debugging) all the files after it,
and make that file the latest file.

If we couldn't find the message in any of the files, it means that the
message is not yet streamed and we can keep writing to the latest file.

Signed-off-by: Arunprasad Rajkumar <[email protected]>
  • Loading branch information
arajkumar committed Nov 9, 2023
1 parent 63c2d85 commit 1688ce5
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/bin/pgcopydb/file_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ set_program_absolute_path(char *program, int size)
* normalize_filename returns the real path of a given filename that belongs to
* an existing file on-disk, resolving symlinks and pruning double-slashes and
* other weird constructs. filename and dst are allowed to point to the same
* adress.
* address.
*/
bool
normalize_filename(const char *filename, char *dst, int size)
Expand Down
307 changes: 282 additions & 25 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,12 @@ streamCheckResumePosition(StreamSpecs *specs)
specs->startposActionFromJSON = latest->action;

log_info("Resuming streaming at LSN %X/%X "
"from first message with that LSN read in JSON file \"%s\", "
"from JSON file \"%s\", "
"message %s "
"line %d",
LSN_FORMAT_ARGS(specs->startpos),
latestStreamedContent.filename,
StreamActionToString(latest->action),
lineNb);

char *latestMessage = latestStreamedContent.lines[lineNb];
Expand Down Expand Up @@ -1563,6 +1565,272 @@ streamFeedback(LogicalStreamContext *context)
}


/*
 * findMessageInWalFile reads the JSON file walFile and looks for a message
 * whose LSN, XID and action are all equal to those of the given metadata.
 *
 * Output parameters:
 *
 *  - *found is set to true when a matching message exists in walFile, and to
 *    false otherwise (including when the file cannot be read).
 *
 *  - *startLinePosition is set to the byte offset of the beginning of the
 *    matching line, counted from the start of walFile. It is reset to zero on
 *    entry, so the offset is always relative to this file only, and it is
 *    left at zero when no message matches.
 *
 * Returns false only when the file could not be read. Not finding the message
 * is not an error: the function then returns true with *found set to false,
 * so that the caller can move on to the next file.
 */
static bool
findMessageInWalFile(const char *walFile,
					 LogicalMessageMetadata *metadata,
					 bool *found,
					 int *startLinePosition)
{
	StreamContent content = { 0 };

	/* always leave the output parameters in a well-defined state */
	*found = false;
	*startLinePosition = 0;

	strlcpy(content.filename, walFile, MAXPGPATH);

	/*
	 * NOTE(review): whatever stream_read_file allocates for content.lines and
	 * content.messages is not released in this function -- confirm ownership
	 * and release it here if needed.
	 */
	if (!stream_read_file(&content))
	{
		log_error("Failed to read file \"%s\": %m", walFile);
		return false;
	}

	/* byte offset of the beginning of the current line within this file */
	int offset = 0;

	for (int i = 0; i < content.count; i++)
	{
		LogicalMessageMetadata *message = &(content.messages[i]);

		if (message->lsn == metadata->lsn &&
			message->xid == metadata->xid &&
			message->action == metadata->action)
		{
			*found = true;
			*startLinePosition = offset;
			return true;
		}

		/*
		 * +1 for \n: assumes content.lines[i] does not include its line
		 * terminator and that lines are separated by a single byte.
		 */
		offset += (int) strlen(content.lines[i]) + 1;
	}

	/* the file was read successfully, it just doesn't contain the message */
	return true;
}


/*
 * resolveWalFileName computes the WAL segment name that contains the given
 * LSN (using the timeline and WAL segment size found in the context) and
 * looks for the matching JSON file in the private context's paths.dir.
 *
 * Two candidates are probed, in that order:
 *
 *   <dir>/<segment>.json.partial
 *   <dir>/<segment>.json
 *
 * The first candidate that exists on-disk is copied to walFileNamePtr, which
 * is expected to be at least MAXPGPATH bytes long, and the function returns
 * true. When neither file exists the function returns false and leaves
 * walFileNamePtr untouched.
 */
static bool
resolveWalFileName(LogicalStreamContext *context,
				   uint64_t lsn,
				   char *walFileNamePtr)
{
	StreamContext *privateContext = (StreamContext *) context->private;
	const char *dir = privateContext->paths.dir;

	XLogSegNo segno;
	char segmentName[MAXPGPATH] = { 0 };

	XLByteToSeg(lsn, segno, context->WalSegSz);
	XLogFileName(segmentName, context->timeline, segno, context->WalSegSz);

	char candidate[MAXPGPATH] = { 0 };

	/* the partial file takes precedence over the completed one */
	sformat(candidate, sizeof(candidate), "%s/%s.json.partial",
			dir,
			segmentName);

	if (!file_exists(candidate))
	{
		sformat(candidate, sizeof(candidate), "%s/%s.json",
				dir,
				segmentName);

		if (!file_exists(candidate))
		{
			return false;
		}
	}

	strlcpy(walFileNamePtr, candidate, MAXPGPATH);

	return true;
}


/*
 * truncateJSONFileToLSN finds the WAL segment JSON file that contains the
 * message described by *metadata (same LSN, XID and action) and truncates
 * that file up to the position of that message.
 *
 * According to the implementation of streamRotateFile, the message could be
 * anywhere between pg_walfile_name(metadata->lsn) ... latest.
 *
 * Consider the following scenario where we have 4 WAL segment files:
 *
 * 000000010000000000000000.json
 * 000000010000000000000001.json
 * 000000010000000000000002.json
 * 000000010000000000000003.json <-- latest
 *
 * When we receive a message with LSN 0/144BEE0, the search starts from the
 * corresponding WAL segment 000000010000000000000001.json and goes up to
 * 000000010000000000000003.json (latest). Assuming that we found the message
 * in 000000010000000000000002.json, we need to truncate that file up to the
 * message position, rename all the files after it to "<name>.old" (they are
 * kept around for debugging), and make the truncated file the latest file.
 *
 * If we couldn't find the message in any of the files, it means that the
 * message is not yet streamed and we can keep writing to the latest file.
 *
 * Returns true when the message was found and the files were adjusted, and
 * also when the message was not found in any file. Returns false when the
 * latest file can't be located, when a file that should be searched can't be
 * resolved or read, or when renaming, re-linking or truncating fails.
 */
static bool
truncateJSONFileToLSN(LogicalStreamContext *context,
					  LogicalMessageMetadata *metadata)
{
	StreamContext *privateContext = (StreamContext *) context->private;

	char walFile[MAXPGPATH] = { 0 };
	char latest[MAXPGPATH] = { 0 };
	bool found = false;

	/*
	 * Find the normalized name of the latest file.
	 *
	 * latest file acts as an endpoint to stop the search.
	 */
	sformat(latest, sizeof(latest), "%s/latest",
			privateContext->paths.dir);

	if (!file_exists(latest))
	{
		log_error("BUG: latest file \"%s\" does not exist", latest);
		return false;
	}

	if (!normalize_filename(latest, latest, MAXPGPATH))
	{
		log_error("BUG: failed to normalize latest file \"%s\"", latest);
		return false;
	}

	/*
	 * Begin the search from the expected WAL segment file for the given LSN
	 * and go up to the latest file, stopping as soon as the message is found.
	 *
	 * The next WAL segment file is determined by adding the WAL segment size
	 * to the LSN. next_wal_filename = wal_filename(lsn + wal_segment_size).
	 *
	 * NOTE(review): the comparison with latest is a plain string comparison
	 * against a normalized path; it presumably relies on paths.dir being a
	 * normalized path already -- confirm.
	 */
	uint64_t lsn = metadata->lsn;
	int startLinePosition = 0;

	do
	{
		if (!resolveWalFileName(context, lsn, walFile))
		{
			log_error("Failed to resolve WAL segment file for LSN %X/%X",
					  LSN_FORMAT_ARGS(lsn));
			return false;
		}

		log_notice("Searching for xid %u message %s lsn %X/%X in "
				   "file \"%s\" upto file \"%s\"",
				   metadata->xid,
				   StreamActionToString(metadata->action),
				   LSN_FORMAT_ARGS(metadata->lsn),
				   walFile,
				   latest);

		/* the truncate offset is relative to the file being searched */
		startLinePosition = 0;

		if (!findMessageInWalFile(walFile,
								  metadata,
								  &found,
								  &startLinePosition))
		{
			log_error("Failed to find xid %u message %s lsn %X/%X "
					  "in file \"%s\"",
					  metadata->xid,
					  StreamActionToString(metadata->action),
					  LSN_FORMAT_ARGS(metadata->lsn),
					  walFile);
			return false;
		}

		/*
		 * Move to the next WAL segment. When the message has been found, lsn
		 * now designates the first segment that needs to be renamed.
		 */
		lsn += context->WalSegSz;
	} while (!found && !streq(walFile, latest));

	if (!found)
	{
		/*
		 * If we couldn't find the message in any of the files, it means that
		 * the message is not yet streamed and we can keep writing to the
		 * latest file.
		 */
		return true;
	}

	log_notice("Found xid %u message %s lsn %X/%X in file \"%s\"",
			   metadata->xid,
			   StreamActionToString(metadata->action),
			   LSN_FORMAT_ARGS(metadata->lsn),
			   walFile);

	/* walFileToTruncate is the file where we found the message */
	char walFileToTruncate[MAXPGPATH] = { 0 };
	strlcpy(walFileToTruncate, walFile, MAXPGPATH);

	/*
	 * Rename every file that comes after the one we found, up to and
	 * including the latest file. The loop stops once walFile, the file that
	 * has just been renamed, is the latest file.
	 */
	while (!streq(walFile, latest))
	{
		if (!resolveWalFileName(context, lsn, walFile))
		{
			/*
			 * It's possible to crash in the middle of renaming files. In that
			 * case, some WAL files are missing because they have already been
			 * renamed. We still want to run up to the latest file and rename
			 * whatever files we have, so skip this segment and try the next.
			 */
			log_debug("Failed to resolve WAL segment file for LSN %X/%X",
					  LSN_FORMAT_ARGS(lsn));

			lsn += context->WalSegSz;
			continue;
		}

		char walFileOld[MAXPGPATH] = { 0 };
		sformat(walFileOld, sizeof(walFileOld), "%s.old", walFile);

		log_notice("Renaming file \"%s\" to \"%s.old\"",
				   walFile,
				   walFile);

		/* Get rid of a previous .old file before renaming over it */
		if (!unlink_file(walFileOld))
		{
			log_error("Failed to unlink stale file \"%s\", "
					  "see above for details",
					  walFileOld);
			return false;
		}

		/* Rename existing WAL segment file for debugging */
		if (!move_file(walFile, walFileOld))
		{
			log_error("Failed to rename file \"%s\" to \"%s\": %m",
					  walFile,
					  walFileOld);
			return false;
		}

		/* Move to the next WAL segment */
		lsn += context->WalSegSz;
	}

	/* Now latest must point to walFileToTruncate if both are different */
	if (!streq(latest, walFileToTruncate))
	{
		if (!stream_update_latest_symlink(privateContext,
										  walFileToTruncate))
		{
			log_error("Failed to update latest symlink to \"%s\", "
					  "see above for details",
					  walFileToTruncate);
			return false;
		}
	}

	if (truncate(walFileToTruncate, startLinePosition) != 0)
	{
		log_error("Failed to truncate file \"%s\" to %d bytes: %m",
				  walFileToTruncate,
				  startLinePosition);
		return false;
	}

	log_notice("Truncated file \"%s\" to %d bytes",
			   walFileToTruncate,
			   startLinePosition);

	return true;
}


/*
* prepareMessageMetadataFromContext prepares the Logical Message Metadata from
* the fields grabbed in the logical streaming protocol.
Expand Down Expand Up @@ -1616,31 +1884,20 @@ prepareMessageMetadataFromContext(LogicalStreamContext *context)
if (!privateContext->reachedStartPos)
{
/*
* Also the same LSN might be assigned to a BEGIN message, a COMMIT
* message, and a KEEPALIVE message. Avoid skipping what looks like the
* same message as the latest flushed in our JSON file when it's
* actually a new message.
* Find the WAL segment file for the given LSN, and open it to
* determine the offset of the LSN in the file. If we can't find the
* message there, go to the next segment file until there is none.
*
* Once we find the offset of the LSN in the WAL segment file, we can
* trim the file to that offset and start streaming from there.
*/
privateContext->reachedStartPos =
privateContext->startpos < metadata->lsn ||
(privateContext->startpos == metadata->lsn &&
metadata->action != privateContext->startposActionFromJSON);
}

if (!privateContext->reachedStartPos)
{
metadata->filterOut = true;

log_debug("Skipping write for action %c for XID %u at LSN %X/%X: "
"startpos %X/%X not been reached",
metadata->action,
metadata->xid,
LSN_FORMAT_ARGS(metadata->lsn),
LSN_FORMAT_ARGS(privateContext->startpos));

*previous = *metadata;

return true;
if (!truncateJSONFileToLSN(context, metadata))
{
log_error("Failed to find WAL segment file for LSN %X/%X",
LSN_FORMAT_ARGS(metadata->lsn));
return false;
}
privateContext->reachedStartPos = true;
}

if (!prepareMessageJSONbuffer(context))
Expand Down

0 comments on commit 1688ce5

Please sign in to comment.