Resume from exact position and truncate the rest #536

Closed
wants to merge 2 commits

Conversation

@arajkumar (Contributor) commented Nov 9, 2023

The new logic finds the WAL segment file that contains the given message from the metadata and truncates it up to that position.

According to the implementation of streamRotateFile, the message could be anywhere between WalJsonFile(metadata->lsn) ... latest.

Consider the following scenario where we have 4 WAL segment files:

000000010000000000000000.json
000000010000000000000001.json
000000010000000000000002.json
000000010000000000000003.json <-- latest

When we receive a message with LSN 0/144BEE0, the search starts from the corresponding WAL segment 000000010000000000000001.json and goes up to 000000010000000000000003.json (the latest). Assume we find the message in 000000010000000000000002.json: we then need to truncate that file up to the message position, remove (rename, for debugging purposes) all the files after it, and make it the latest file.

If we can't find the message in any of those files, it means the message has not been streamed yet, and we can keep writing to the latest file.
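
For illustration, here is a minimal self-contained sketch of the per-file search-and-truncate step, using only POSIX calls; truncate_at_lsn and the line-oriented scan are invented for this example and do not mirror the actual implementation:

#define _POSIX_C_SOURCE 200809L

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Scan a .json WAL segment file line by line; when a line contains the
 * given LSN, truncate the file right before that line, so that the
 * received message can be streamed to it again. Returns 1 when found,
 * 0 when not found, -1 on error.
 */
static int
truncate_at_lsn(const char *filename, const char *lsn)
{
    FILE *file = fopen(filename, "r");

    if (file == NULL)
    {
        return -1;
    }

    char *line = NULL;
    size_t len = 0;
    long offset = ftell(file);    /* offset of the line we are about to read */

    while (getline(&line, &len, file) != -1)
    {
        if (strstr(line, lsn) != NULL)
        {
            free(line);
            fclose(file);

            /* drop the message and everything streamed after it */
            return truncate(filename, offset) == 0 ? 1 : -1;
        }

        offset = ftell(file);
    }

    free(line);
    fclose(file);

    return 0;    /* not found: keep writing to the latest file */
}

The caller would run this over each candidate segment in order, and on success rename the later .json files aside and mark the truncated file as the latest one.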

@arajkumar (Contributor, Author)
I'm still seeing issue #471 even after this PR:

2023-11-09 13:57:05 30826 NOTICE follow.c:829 Starting the catchup sub-process
2023-11-09 13:57:05 30824 INFO ld_stream.c:2279 Resuming streaming from latest file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json"
2023-11-09 13:57:05 30825 SQL pgsql.c:1471 [SOURCE 171458] select startpos, endpos, apply, write_lsn, flush_lsn, replay_lsn from pgcopydb.sentinel;
2023-11-09 13:57:05 30825 SQL pgsql.c:400 Disconnecting from [source] "postgres://[email protected]:26479/defaultdb?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60"
2023-11-09 13:57:05 30825 NOTICE ld_transform.c:234 Transforming from 19E/239C1800 in "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.sql"
2023-11-09 13:57:05 30825 NOTICE ld_transform.c:815 Transforming JSON file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json" into SQL file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.sql"
2023-11-09 13:57:05 30826 INFO ld_apply.c:254 Replaying changes from LSN 19E/23A05A40
...
...
2023-11-09 13:57:05 30824 NOTICE ld_stream.c:1722 Searching for xid 9526907 message BEGIN lsn 19E/23964FB8 in file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json" upto file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json"
2023-11-09 13:57:05 30825 INFO ld_transform.c:983 Transformed 24392 JSON messages into SQL file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.sql"
2023-11-09 13:57:05 30824 NOTICE ld_stream.c:1751 Found xid 9526907 message BEGIN lsn 19E/23964FB8 in file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json"
2023-11-09 13:57:05 30824 NOTICE ld_stream.c:1834 Truncated file "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json" to 14947386 bytes
2023-11-09 13:57:05 30824 NOTICE ld_stream.c:1195 Now streaming changes to "/home/ubuntu/.local/share/pgcopydb/000000050000019E00000023.json.partial"
2023-11-09 13:57:05 30825 ERROR ld_transform.c:1098 Failed to parse BEGIN: transaction already in progress
2023-11-09 13:57:05 30825 ERROR ld_transform.c:472 Failed to parse JSON message: {"action":"B","xid":"9526907","lsn":"19E/23964FB8","timestamp":"2023-11-09 13:57:05.742384+0000","message":{"action":"B","xid":9526907}}
2023-11-09 13:57:05 30825 ERROR ld_transform.c:100 Failed to transform JSON messages from input stream, see above for details
2023-11-09 13:57:06 30814 ERROR follow.c:964 Subprocess transform with pid 30825 has exited with error code 12
2023-11-09 13:57:06 30814 SQL pgsql.c:490 Connecting to [source] "postgres://[email protected]:26479/defaultdb?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60"
2023-11-09 13:57:06 30814 SQL pgsql.c:1471 [SOURCE 171470] select startpos, endpos, apply, write_lsn, flush_lsn, replay_lsn from pgcopydb.sentinel;

What happens is that, while switching from prefetch to replay, stream_transform_stream calls stream_transform_resume, which transforms the existing 000000050000019E00000023.json (not yet truncated at that point), and that file can contain a partial txn. Only when we receive our first streamWrite callback do we truncate 000000050000019E00000023.json and then continue with the streaming. I think we should call partial txn truncation somewhere from streamCheckResumePosition.
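
For illustration, the ordering I have in mind is roughly the following; streamTruncatePartialTxn is a hypothetical name, not an existing function:

/*
 * Sketch of the proposed ordering: truncate the partial transaction
 * while checking the resume position, before the transform sub-process
 * gets a chance to read the pre-existing JSON file.
 */
if (!streamCheckResumePosition(specs))
{
    return false;
}

/* hypothetical call: drop the tail of any partial transaction here */
if (!streamTruncatePartialTxn(specs))
{
    return false;
}

/* only now may stream_transform_resume read the latest JSON file */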

@dimitri (Owner) commented Nov 9, 2023

I think we should call partial txn truncation somewhere from streamCheckResumePosition.

That sounds like what we need to do, yes. Is it possible that's the whole fix for the situation? I mean, if you agree to it, could we make a separate PR that does only that?

Then the plan would be to get back to this PR, because I like the “cache invalidation” mechanism we would obtain by re-connecting to the streaming protocol / replication slot; I think that's the best approach here.

@arajkumar (Contributor, Author)

That sounds like what we need to do, yes. Is it possible that's the whole fix for the situation? I mean, if you agree to it, could we make a separate PR that does only that?

@dimitri I attempted this and hit the same problem again (the transform reads the latest file even before we truncate it). I'm going to add synchronisation between streamCheckResumePosition and this code:

/*
 * If the JSON file already exists on-disk, make sure to read that file
 * again now. The previous round of streaming might have stopped at an
 * endpos that fits in the middle of a transaction.
 *
 * We can think about this as "cache invalidation" of the SQL file on-disk.
 */
if (file_exists(jsonFileName))
{
    if (!stream_transform_file(specs, jsonFileName, sqlFileName))
    {
        log_error("Failed to resume transforming from existing file \"%s\"",
                  sqlFileName);
        return false;
    }
}

@arajkumar (Contributor, Author)

@dimitri I think this approach is not going to work unless we make drastic changes to the existing flow. The current implementation assumes that WAL segment files are immutable, and various assumptions follow from that. For example, when a new message arrives, we pick the respective WAL file based on maxWrittenLSN and metadata->lsn; that just breaks when we truncate the WAL file. Additionally, we need synchronisation between receive and transform to keep the transform process from reading partial files.
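
To illustrate what breaks (walJsonFileForLSN and latestWalJsonFile are hypothetical names; only maxWrittenLSN and metadata->lsn appear in the actual code):

/*
 * The receive side assumes segments are immutable: the target file for
 * a new message is derived from LSNs alone. Once a file has been
 * truncated, maxWrittenLSN no longer matches what is on disk, so this
 * selection picks the wrong file.
 */
static const char *
pickTargetFile(uint64_t messageLSN, uint64_t maxWrittenLSN)
{
    return messageLSN <= maxWrittenLSN
           ? walJsonFileForLSN(messageLSN)    /* an already-rotated segment */
           : latestWalJsonFile();             /* the file we stream into now */
}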

@dimitri (Owner) commented Nov 15, 2023

Additionally we need to add synchronisation between receive and transform to avoid reading partial files from transform process.

I wonder now if, in replay mode, the transform process could wait until it receives its first message in the PIPE to do its initialization, and then use the LSN of that first message as input when/where needed. Also, the whole reason the transform process needs to read pre-existing files at startup is transactions that span multiple files. We could also force-store these in their own file (${xid}.lsn) and only issue an INCLUDE statement of sorts in the main SQL file. See also #511.
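
As a very rough sketch of that idea (the file naming and the \include directive are invented here, not an existing pgcopydb format):

#include <stdio.h>
#include <stdint.h>

/*
 * Sketch: a transaction that spans multiple WAL segment files gets its
 * statements stored in a dedicated per-xid file, and the main SQL file
 * only carries a reference to it, so the transform process never needs
 * to re-read previous segments at startup.
 */
static int
reference_spanning_txn(FILE *mainSqlFile, uint32_t xid)
{
    char txnFileName[64];

    snprintf(txnFileName, sizeof(txnFileName), "%u.lsn", xid);

    /* the apply process would open and replay txnFileName at this point */
    return fprintf(mainSqlFile, "\\include %s\n", txnFileName) > 0 ? 0 : -1;
}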

What do you think?

@arajkumar (Contributor, Author) commented Nov 16, 2023

I wonder now if, in replay mode, the transform process could wait until it receives its first message in the PIPE to do its initialization, and then use the LSN of that first message as input when/where needed.

@dimitri This won't work when we resume after reaching the ENDPOS. The receive process would never write any message into the PIPE, as it has already reached the ENDPOS, and the transform would wait forever.

Currently, we handle the ENDPOS and exit early, before reading from the stream (PIPE):

if (!stream_transform_resume(specs))
{
    log_error("Failed to resume streaming from %X/%X",
              LSN_FORMAT_ARGS(privateContext->startpos));
    return false;
}

LogicalMessageMetadata *metadata = &(privateContext->metadata);

if (privateContext->endpos != InvalidXLogRecPtr &&
    privateContext->endpos <= metadata->lsn)
{
    log_info("Transform reached end position %X/%X at %X/%X",
             LSN_FORMAT_ARGS(privateContext->endpos),
             LSN_FORMAT_ARGS(metadata->lsn));
    return true;
}

@arajkumar (Contributor, Author)

@dimitri I've also attempted a rollback approach in #544.

@arajkumar (Contributor, Author)

@dimitri I'm trying out an alternate approach using undo logs. It records the last commit position in an undo file and resumes from it in case pgcopydb exits midway.
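
As a minimal sketch of the undo-file idea (file name and format invented for the example):

#include <stdio.h>

/*
 * Persist the last committed LSN so that, after a crash, replay can
 * resume from it and everything written past it can be discarded.
 */
static int
record_last_commit(const char *undoFile, const char *lsn)
{
    FILE *file = fopen(undoFile, "w");

    if (file == NULL)
    {
        return -1;
    }

    int ok = fprintf(file, "%s\n", lsn) > 0;

    return fclose(file) == 0 && ok ? 0 : -1;
}

/*
 * Read the LSN back at startup; a missing undo file means there is
 * nothing to undo and we start from the usual resume position.
 */
static int
read_last_commit(const char *undoFile, char *lsn, int size)
{
    FILE *file = fopen(undoFile, "r");

    if (file == NULL)
    {
        return -1;
    }

    int ok = fgets(lsn, size, file) != NULL;

    fclose(file);
    return ok ? 0 : -1;
}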

@arajkumar (Contributor, Author)

This is no longer needed, as #544 has already been merged into main.

@arajkumar closed this Nov 21, 2023
@dimitri (Owner) commented Nov 21, 2023

I'm wondering if your undo-logs approach is needed to handle crash conditions?

@arajkumar (Contributor, Author)

@dimitri Yes, that should help to recover from the crash. I already have a WIP PR: #549.
