Skip to content

Commit

Permalink
Fix snapshot bug (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmantica11 authored Aug 22, 2024
1 parent 70688c3 commit 9e19271
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
4 changes: 3 additions & 1 deletion src/ingester/fetchers/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ pub fn get_poller_block_stream(
pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 {
loop {
match client.get_slot().await {
Ok(slot) => return slot,
Ok(slot) => {
return slot;
}
Err(e) => {
log::error!("Failed to fetch current slot: {}", e);
sleep(Duration::from_secs(5));
Expand Down
8 changes: 4 additions & 4 deletions src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use log::info;
use s3::creds::Credentials;
use s3::region::Region;
use s3::{bucket::Bucket, BucketConfiguration};
use s3_utils::multipart_upload::put_object_stream_custom;
use tokio::io::{AsyncRead, ReadBuf};
pub mod s3_utils;

pub const MEGABYTE: usize = 1024 * 1024;
pub const CHUNK_SIZE: usize = 10 * 1024 * 1024;
Expand Down Expand Up @@ -114,7 +116,7 @@ impl R2DirectoryAdapter {
let stream = result.bytes();

while let Some(byte) = stream.next().await {
let byte = byte.with_context(|| "Failed to read byte from file")?;
let byte = byte.with_context(|| "Failed to read byte from file").unwrap();
yield Ok(byte);
}
}
Expand Down Expand Up @@ -158,9 +160,7 @@ impl R2DirectoryAdapter {
byte_buffer: Vec::new(),
};
// Stream the bytes directly to S3 without collecting them in memory
self.r2_bucket
.put_object_stream(&mut stream_reader, &path)
.await?;
put_object_stream_custom(&self.r2_bucket, &mut stream_reader, &path).await?;
Ok(())
}
}
Expand Down
44 changes: 30 additions & 14 deletions src/snapshot/snapshotter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use photon_indexer::snapshot::{
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use std::future::pending;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -266,24 +267,39 @@ async fn main() {
Some(create_server(args.port, directory_adapter.clone()).await)
};

// Handle shutdown signal
match tokio::signal::ctrl_c().await {
Ok(()) => {
// Use `tokio::select!` to handle both the shutdown signal and task completions
tokio::select! {
// Handle shutdown signal (Ctrl+C)
_ = tokio::signal::ctrl_c() => {
info!("Received shutdown signal, aborting tasks...");
}

// If the snapshotter completes for some reason
res = async {
if let Some(snapshotter_handle) = snapshotter_handle {
snapshotter_handle.abort();
snapshotter_handle
.await
.expect_err("Snapshotter should have been aborted");
let res = snapshotter_handle.await;
res
} else {
pending().await
}
if let Some(server_handle) = server_handle {
server_handle.abort();
server_handle
.await
.expect_err("Server should have been aborted");
} => {
match res {
Ok(()) => info!("Snapshotter finished successfully"),
Err(e) => error!("Snapshotter task failed: {:?}", e),
}
}
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
// If the snapshotter completes for some reason
res = async {
if let Some(server_handle) = server_handle {
server_handle.await
} else {
pending().await
}
} => {
match res {
Ok(()) => info!("Server finished successfully"),
Err(e) => error!("Server task failed: {:?}", e),
}
}
}
}

0 comments on commit 9e19271

Please sign in to comment.