From 6c6627083484529ee0cad852f7ab5168bc9ce790 Mon Sep 17 00:00:00 2001 From: Arunprasad Rajkumar Date: Wed, 22 Nov 2023 16:27:32 +0530 Subject: [PATCH] Recover from crash using undo log This commit remembers the file write position of a last successful transaction in a undo file. If there's a crash and we need to restart, we use this undo file to get rid of any partial work from the last transaction that wasn't finished. TODO: Can we use the undo log mechanism to even for graceful exit? This would simplify the code and make it uniform for both graceful and ungraceful exits/crashes. The idea is to undo the changes made by the last incomplete transaction by reading the undo log file and write other messages like ENDPOS. Signed-off-by: Arunprasad Rajkumar --- src/bin/pgcopydb/copydb.c | 4 + src/bin/pgcopydb/copydb.h | 1 + src/bin/pgcopydb/copydb_paths.h | 1 + src/bin/pgcopydb/follow.c | 16 ++ src/bin/pgcopydb/ld_stream.c | 268 +++++++++++++++++++++++++++++--- src/bin/pgcopydb/ld_stream.h | 10 +- 6 files changed, 278 insertions(+), 22 deletions(-) diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c index 4a4132160..dedeef31a 100644 --- a/src/bin/pgcopydb/copydb.c +++ b/src/bin/pgcopydb/copydb.c @@ -572,6 +572,10 @@ copydb_prepare_filepaths(CopyFilePaths *cfPaths, "%s/lsn.json", cfPaths->cdc.dir); + sformat(cfPaths->cdc.undofile, MAXPGPATH, + "%s/undo", + cfPaths->cdc.dir); + /* * Now prepare the "compare" files we need to compare schema and data * between the source and target instance. diff --git a/src/bin/pgcopydb/copydb.h b/src/bin/pgcopydb/copydb.h index 4beffb547..ad3cb9e13 100644 --- a/src/bin/pgcopydb/copydb.h +++ b/src/bin/pgcopydb/copydb.h @@ -26,6 +26,7 @@ { "extra_float_digits", "3" }, \ { "statement_timeout", "0" }, \ { "default_transaction_read_only", "off" } + /* * These parameters are added to the connection strings, unless the user has * added them, allowing user-defined values to be taken into account. diff --git a/src/bin/pgcopydb/copydb_paths.h b/src/bin/pgcopydb/copydb_paths.h index 66f83ab69..e12a98cb2 100644 --- a/src/bin/pgcopydb/copydb_paths.h +++ b/src/bin/pgcopydb/copydb_paths.h @@ -54,6 +54,7 @@ typedef struct CDCPaths char tlifile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/tli */ char tlihistfile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/tli.history */ char lsntrackingfile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/lsn.json */ + char undofile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/undo */ } CDCPaths; diff --git a/src/bin/pgcopydb/follow.c b/src/bin/pgcopydb/follow.c index e67bd2c30..5cdb9a3ff 100644 --- a/src/bin/pgcopydb/follow.c +++ b/src/bin/pgcopydb/follow.c @@ -257,6 +257,22 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose) bool follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) { + /* + * Incase of a crash, execute recovery actions before starting the + * main loop. + */ + if (!recoverFromUndoLog(&streamSpecs->paths)) + { + log_error("Failed to recover from undo log, see above for details"); + return false; + } + + if (!removeUndoLog(&streamSpecs->paths)) + { + log_error("Failed to remove undo log, see above for details"); + return false; + } + /* * Remove the possibly still existing stream context files from * previous round of operations (--resume, etc). We want to make diff --git a/src/bin/pgcopydb/ld_stream.c b/src/bin/pgcopydb/ld_stream.c index d1733d949..8eb714357 100644 --- a/src/bin/pgcopydb/ld_stream.c +++ b/src/bin/pgcopydb/ld_stream.c @@ -546,6 +546,181 @@ startLogicalStreaming(StreamSpecs *specs) } +static bool +truncateWalFileToSize(CDCPaths *paths, char *walFileName, long size) +{ + if (!file_exists(walFileName)) + { + return true; + } + + char oldWalFileName[MAXPGPATH] = { 0 }; + sformat(oldWalFileName, sizeof(oldWalFileName), "%s.old", walFileName); + + log_info("Recovering WAL by truncating file \"%s\" to %ld bytes", + walFileName, + size); + if (!unlink_file(oldWalFileName)) + { + log_error("Failed to remove file \"%s\": %m", oldWalFileName); + return false; + } + + if (!duplicate_file(walFileName, oldWalFileName)) + { + log_error("Failed to duplicate file \"%s\" to \"%s\": %m", + walFileName, + oldWalFileName); + return false; + } + + if (truncate(walFileName, size) != 0) + { + log_error("Failed to truncate file \"%s\" to %ld bytes: %m", + walFileName, + size); + return false; + } + + /* Make it latest */ + if (size > 0) + { + if (!stream_update_latest_symlink(paths, walFileName)) + { + log_error("Failed to update latest symlink to \"%s\", " + "see above for details", + walFileName); + return false; + } + } + + return true; +} + + +static bool +truncateWalToSize(CDCPaths *paths, char *wal, long size) +{ + char walFileName[MAXPGPATH] = { 0 }; + + sformat(walFileName, sizeof(walFileName), "%s/%s.json", + paths->dir, + wal); + if (!truncateWalFileToSize(paths, walFileName, size)) + { + return false; + } + + sformat(walFileName, sizeof(walFileName), "%s/%s.json.partial", + paths->dir, + wal); + if (!truncateWalFileToSize(paths, walFileName, size)) + { + return false; + } + + return true; +} + + +/* + * recoverFromUndoLog recovers from undo log file by reverting the changes + * made after the last known commit message position. + */ +bool +recoverFromUndoLog(CDCPaths *paths) +{ + if (!file_exists(paths->undofile)) + { + log_notice("Undo log file \"%s\" does not exist, skipping", + paths->undofile); + return true; + } + + long size = 0L; + char *contents = NULL; + + if (!read_file(paths->undofile, &contents, &size)) + { + log_error("Failed to read undo log file \"%s\": %m", + paths->undofile); + return false; + } + + int count = countLines(contents); + char **lines = (char **) calloc(count, sizeof(char *)); + if (lines == NULL) + { + log_error(ALLOCATION_FAILED_ERROR); + return false; + } + + count = splitLines(contents, lines, count); + + for (int i = 0; i < count; i++) + { + char wal[MAXPGPATH] = { 0 }; + long offset = 0L; + char *undoLine = lines[i]; + + /* sscanf format: WAL_SEGMENT_FILENAME OFFSET */ + if (sscanf(undoLine, "%s%ld", wal, &offset) != 2) /* IGNORE-BANNED */ + { + log_error("Failed to parse undo log line \"%s\"", undoLine); + return false; + } + + if (!truncateWalToSize(paths, wal, offset)) + { + /* errors have already been logged */ + return false; + } + } + + return true; +} + + +/* + * removeUndoLog removes the undo log file. + */ +bool +removeUndoLog(CDCPaths *paths) +{ + char undofileOld[MAXPGPATH] = { 0 }; + sformat(undofileOld, sizeof(undofileOld), "%s.old", paths->undofile); + + if (!file_exists(paths->undofile)) + { + log_notice("Undo log file \"%s\" does not exist, skipping", + paths->undofile); + return true; + } + + if (!unlink_file(undofileOld)) + { + log_error("Failed to remove file \"%s\": %m", undofileOld); + return false; + } + + if (!duplicate_file(paths->undofile, undofileOld)) + { + log_error("Failed to duplicate file \"%s\" to \"%s\": %m", + paths->undofile, + undofileOld); + return false; + } + + if (!unlink_file(paths->undofile)) + { + /* errors have already been logged */ + return false; + } + + return true; +} + + /* * streamCheckResumePosition checks that the resume position on the replication * slot on the source database is in-sync with the lastest on-file LSN we have. @@ -772,6 +947,43 @@ stream_write_json(LogicalStreamContext *context, bool previous) return false; } + /* + * Create a undo log which tracks the the BEGIN file offset in + * privateContext->jsonFile. When we receive a COMMIT message, undo log + * will be truncated, otherwise this can be used to reset the file offset + * to the BEGIN message. + */ + if (metadata->action == STREAM_ACTION_BEGIN) + { + char *undofile = privateContext->paths.undofile; + privateContext->undoFile = fopen_with_umask(undofile, + "ab", + FOPEN_FLAGS_A, + 0644); + + /* file format: WAL_SEGMENT_FILENAME OFFSET */ + fformat(privateContext->undoFile, + "%s %ld\n", + privateContext->wal, + ftell(privateContext->jsonFile)); + fflush(privateContext->undoFile); + + strlcpy(privateContext->lastUndoWal, privateContext->wal, MAXPGPATH); + privateContext->transactionInProgress = true; + } + else if (privateContext->transactionInProgress && + !streq(privateContext->wal, privateContext->lastUndoWal)) + { + /* Capture new WAL file name in undo log */ + /* file format: WAL_SEGMENT_FILENAME OFFSET */ + fformat(privateContext->undoFile, + "%s 0\n", /* Truncate the whole file */ + privateContext->wal); + fflush(privateContext->undoFile); + + strlcpy(privateContext->lastUndoWal, privateContext->wal, MAXPGPATH); + } + /* prepare a in-memory buffer with the whole data formatted in JSON */ PQExpBuffer buffer = createPQExpBuffer(); @@ -802,6 +1014,7 @@ stream_write_json(LogicalStreamContext *context, bool previous) return false; } + /* then add the logical output plugin data, inside our own JSON format */ if (!write_to_stream(privateContext->jsonFile, buffer->data, buffer->len)) { @@ -852,18 +1065,22 @@ stream_write_json(LogicalStreamContext *context, bool previous) destroyPQExpBuffer(buffer); free(metadata->jsonBuffer); - /* - * Maintain the transaction progress based on the BEGIN and COMMIT messages - * received from replication slot. We don't care about the other messages. - */ - if (metadata->action == STREAM_ACTION_BEGIN) - { - privateContext->transactionInProgress = true; - } - else if (metadata->action == STREAM_ACTION_COMMIT) + if (metadata->action == STREAM_ACTION_COMMIT) { + /* Truncate the undo log */ + fclose(privateContext->undoFile); + + if (!unlink_file(privateContext->paths.undofile)) + { + log_error("Failed to remove undo log file \"%s\": %m", + privateContext->paths.undofile); + destroyPQExpBuffer(buffer); + return false; + } + privateContext->transactionInProgress = false; } + /* * We are not expecting STREAM_ACTION_ROLLBACK here. It's a custom * message we write directly to the "latest" file using @@ -973,7 +1190,7 @@ streamRotateFile(LogicalStreamContext *context) /* get the segment number from the current_record_lsn */ XLogSegNo segno; - char wal[MAXPGPATH] = { 0 }; + char *wal = privateContext->wal; char walFileName[MAXPGPATH] = { 0 }; char partialFileName[MAXPGPATH] = { 0 }; @@ -1164,7 +1381,7 @@ streamRotateFile(LogicalStreamContext *context) * Also maintain the "latest" symbolic link to the latest file where * we've been streaming changes in. */ - if (!stream_update_latest_symlink(privateContext, + if (!stream_update_latest_symlink(&privateContext->paths, privateContext->partialFileName)) { log_error("Failed to update latest symlink to \"%s\", " @@ -1217,11 +1434,6 @@ streamCloseFile(LogicalStreamContext *context, bool time_to_abort) * On graceful exit, ROLLBACK the last incomplete transaction. * As we resume from a consistent point, there's no concern about * the transaction being rolled back here. - * - * TODO: For process crashes (e.g., segmentation faults), this - * method won't work, potentially leaving incomplete transactions. - * To handle this, we should read the last message from the "latest" - * file and rollback any incomplete transaction found. */ if (time_to_abort && privateContext->jsonFile != NULL && @@ -1237,8 +1449,22 @@ streamCloseFile(LogicalStreamContext *context, bool time_to_abort) /* errors have already been logged */ return false; } - } + /* + * We no longer need undo log as we already made the last transaction + * consistent by writing a ROLLBACK message. + * + * TODO: Can we use the undo log mechanism to even for graceful exit? + * This would simplify the code and make it uniform for both graceful + * and ungraceful exits/crashes. The idea is to undo the changes made + * by the last incomplete transaction by reading the undo log file and + * write other messages like ENDPOS. + */ + if (!removeUndoLog(&privateContext->paths)) + { + return false; + } + } /* * If we have a JSON file currently opened, then close it. @@ -1276,7 +1502,7 @@ streamCloseFile(LogicalStreamContext *context, bool time_to_abort) } /* and also update the "latest" symlink, we need it for --resume */ - if (!stream_update_latest_symlink(privateContext, + if (!stream_update_latest_symlink(&privateContext->paths, privateContext->walFileName)) { log_error("Failed to update latest symlink to \"%s\", " @@ -1991,12 +2217,12 @@ stream_read_latest(StreamSpecs *specs, StreamContent *content) * filename, that must already exists on the file system. */ bool -stream_update_latest_symlink(StreamContext *privateContext, +stream_update_latest_symlink(CDCPaths *paths, const char *filename) { char latest[MAXPGPATH] = { 0 }; - sformat(latest, sizeof(latest), "%s/latest", privateContext->paths.dir); + sformat(latest, sizeof(latest), "%s/latest", paths->dir); if (!unlink_file(latest)) { @@ -2012,7 +2238,7 @@ stream_update_latest_symlink(StreamContext *privateContext, log_debug("stream_update_latest_symlink: \"%s\" -> \"%s\"", latest, - privateContext->partialFileName); + filename); return true; } diff --git a/src/bin/pgcopydb/ld_stream.h b/src/bin/pgcopydb/ld_stream.h index 2a81bb8cb..c5a56f1a4 100644 --- a/src/bin/pgcopydb/ld_stream.h +++ b/src/bin/pgcopydb/ld_stream.h @@ -317,6 +317,11 @@ typedef struct StreamContext FILE *jsonFile; FILE *sqlFile; + char wal[MAXPGPATH]; + char lastUndoWal[MAXPGPATH]; + FILE *undoFile; + bool undoInProgress; + StreamCounters counters; bool transactionInProgress; @@ -499,6 +504,9 @@ bool stream_init_context(StreamSpecs *specs); bool startLogicalStreaming(StreamSpecs *specs); bool streamCheckResumePosition(StreamSpecs *specs); +bool recoverFromUndoLog(CDCPaths *path); +bool removeUndoLog(CDCPaths *path); + bool streamWrite(LogicalStreamContext *context); bool streamFlush(LogicalStreamContext *context); bool streamKeepalive(LogicalStreamContext *context); @@ -529,7 +537,7 @@ bool stream_write_internal_message(LogicalStreamContext *context, bool stream_read_file(StreamContent *content); bool stream_read_latest(StreamSpecs *specs, StreamContent *content); -bool stream_update_latest_symlink(StreamContext *privateContext, +bool stream_update_latest_symlink(CDCPaths *paths, const char *filename); bool buildReplicationURI(const char *pguri, char **repl_pguri);