Skip to content

Commit

Permalink
Implement snapshotting POC (#157)
Browse files Browse the repository at this point in the history
* Implement snapshotting intermediate commit

* Implement basic snapshotting

* Nit

* Implement basic snapshotter
  • Loading branch information
pmantica11 authored Aug 2, 2024
1 parent 1727a6a commit 7f7941b
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/ingester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod fetchers;
pub mod indexer;
pub mod parser;
pub mod persist;
pub mod snapshotter;
pub mod typedefs;

fn derive_block_state_update(block: &BlockInfo) -> Result<StateUpdate, IngesterError> {
Expand Down
2 changes: 1 addition & 1 deletion src/ingester/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod state_update;

use solana_program::pubkey;

const ACCOUNT_COMPRESSION_PROGRAM_ID: Pubkey =
pub const ACCOUNT_COMPRESSION_PROGRAM_ID: Pubkey =
pubkey!("CbjvJc1SNx1aav8tU49dJGHu8EUdzQJSMtkjDmV8miqK");
const SYSTEM_PROGRAM: Pubkey = pubkey!("11111111111111111111111111111111");
const NOOP_PROGRAM_ID: Pubkey = pubkey!("noopb9bkMVfRPU8AsbpTUg8AQkHtKwMYZiFUjNRtMmV");
Expand Down
219 changes: 219 additions & 0 deletions src/ingester/snapshotter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use std::{
env::temp_dir,
fs::{self, File},
io::{self, BufReader, Seek, Write},
path::{Path, PathBuf},
};

use async_std::stream::StreamExt;
use async_stream::stream;
use futures::{pin_mut, Stream};

use super::{
fetchers::BlockStreamConfig,
parser::ACCOUNT_COMPRESSION_PROGRAM_ID,
typedefs::block_info::{BlockInfo, Instruction, TransactionInfo},
};

const SNAPSHOT_VERSION: u64 = 1;

/// Returns true when `instruction` touches the account-compression program,
/// either as the invoked program or as one of its listed accounts.
fn is_compression_instruction(instruction: &Instruction) -> bool {
    let invokes_program = instruction.program_id == ACCOUNT_COMPRESSION_PROGRAM_ID;
    invokes_program
        || instruction
            .accounts
            .iter()
            .any(|account| *account == ACCOUNT_COMPRESSION_PROGRAM_ID)
}

fn is_compression_transaction(tx: &TransactionInfo) -> bool {
for instruction_group in &tx.instruction_groups {
if is_compression_instruction(&instruction_group.outer_instruction) {
return true;
}
for instruction in &instruction_group.inner_instructions {
if is_compression_instruction(instruction) {
return true;
}
}
}
false
}

/// Serializes `block` to `file` with bincode, first dropping every
/// transaction that does not touch the account-compression program so the
/// snapshot only stores compression-relevant data.
///
/// # Panics
/// Panics if serialization or the file write fails; snapshot writing is
/// treated as fatal by this module.
fn serialize_block_to_file(block: &BlockInfo, file: &mut File) {
    let trimmed_block = BlockInfo {
        metadata: block.metadata.clone(),
        transactions: block
            .transactions
            .iter()
            .filter(|tx| is_compression_transaction(tx))
            .cloned()
            .collect(),
    };
    let block_bytes =
        bincode::serialize(&trimmed_block).expect("failed to bincode-serialize block");
    file.write_all(block_bytes.as_ref())
        .expect("failed to write block to snapshot file");
}

/// A snapshot file on disk together with the slot range it covers, parsed
/// from its `snapshot-{start_slot}-{end_slot}` file name.
struct SnapshotFileWithSlots {
    file: PathBuf,
    start_slot: u64,
    end_slot: u64,
}

/// Lists the snapshot files in `snapshot_dir`, parsing the covered slot
/// range out of each `snapshot-{start_slot}-{end_slot}` file name.
///
/// Files whose names do not match that pattern (e.g. the `temp-snapshot`
/// scratch file, or any file with non-numeric slot parts) are skipped
/// instead of panicking. The result is sorted by `start_slot` so callers
/// can stream blocks in slot order.
///
/// # Panics
/// Panics if the directory (or one of its entries) cannot be read.
fn get_snapshot_files_with_slots(snapshot_dir: &Path) -> Vec<SnapshotFileWithSlots> {
    let mut snapshot_files_with_slots = Vec::new();
    for entry in fs::read_dir(snapshot_dir).expect("failed to read snapshot directory") {
        let file = entry.expect("failed to read snapshot directory entry").path();
        // Skip entries with non-UTF-8 names rather than panicking.
        let file_name = match file.file_name().and_then(|name| name.to_str()) {
            Some(name) => name,
            None => continue,
        };
        let parts: Vec<&str> = file_name.split('-').collect();
        // Only accept well-formed `snapshot-{start}-{end}` names.
        if parts.len() == 3 && parts[0] == "snapshot" {
            if let (Ok(start_slot), Ok(end_slot)) =
                (parts[1].parse::<u64>(), parts[2].parse::<u64>())
            {
                snapshot_files_with_slots.push(SnapshotFileWithSlots {
                    file,
                    start_slot,
                    end_slot,
                });
            }
        }
    }
    snapshot_files_with_slots.sort_by_key(|file| file.start_slot);
    snapshot_files_with_slots
}

/// Creates a fresh scratch snapshot file named `temp-snapshot` under the
/// system temp directory, inside subdirectory `dir`, and writes the
/// 8-byte little-endian `SNAPSHOT_VERSION` header to it.
///
/// Any pre-existing `temp-snapshot` file is removed first so the returned
/// file always starts with just the version header.
///
/// # Panics
/// Panics on any filesystem error; snapshot writing is treated as fatal.
fn create_temp_snapshot_file(dir: &str) -> (File, PathBuf) {
    // Create a subdirectory for the snapshot files. `create_dir_all` is a
    // no-op when it already exists and also handles nested components,
    // unlike the non-recursive `create_dir`.
    let temp_dir = temp_dir().join(dir);
    fs::create_dir_all(&temp_dir).expect("failed to create temp snapshot directory");
    let temp_file_path = temp_dir.join("temp-snapshot");
    if temp_file_path.exists() {
        fs::remove_file(&temp_file_path).expect("failed to remove stale temp snapshot file");
    }
    let mut temp_file =
        File::create(&temp_file_path).expect("failed to create temp snapshot file");
    temp_file
        .write_all(&SNAPSHOT_VERSION.to_le_bytes())
        .expect("failed to write snapshot version header");
    (temp_file, temp_file_path)
}

/// Merges every snapshot file in `snapshot_dir` into a single full snapshot
/// covering the union of their slot ranges, then swaps the merged directory
/// in via directory renames, keeping the old directory as a backup until the
/// swap succeeds.
///
/// If the directory yields no blocks, the scratch file is removed and the
/// directory is left untouched (the original code panicked on the unwraps).
///
/// # Panics
/// Panics on any filesystem error; snapshot writing is treated as fatal.
async fn merge_snapshots(snapshot_dir: &Path) {
    // NOTE(review): "fullsnaphot" is a typo, but it only names a scratch
    // directory under the system temp dir; renaming it is a separate change.
    let (mut temp_file, temp_file_path) = create_temp_snapshot_file("fullsnaphot/");

    let block_stream = load_block_stream_from_snapshot_directory(snapshot_dir);
    pin_mut!(block_stream);
    let mut start_slot = None;
    let mut end_slot = None;

    // Blocks are yielded in ascending slot order, so the first block gives
    // the start of the merged range and the last one gives its end.
    while let Some(block) = block_stream.next().await {
        if start_slot.is_none() {
            start_slot = Some(block.metadata.slot);
        }
        end_slot = Some(block.metadata.slot);
        serialize_block_to_file(&block, &mut temp_file);
    }

    // Nothing to merge: clean up the scratch file instead of panicking on
    // `start_slot.unwrap()` below.
    let (Some(start_slot), Some(end_slot)) = (start_slot, end_slot) else {
        fs::remove_file(&temp_file_path).expect("failed to remove empty temp snapshot");
        return;
    };

    let temp_file_directory = temp_file_path.parent().unwrap();
    let snapshot_file_path =
        temp_file_directory.join(format!("snapshot-{}-{}", start_slot, end_slot));
    fs::rename(&temp_file_path, &snapshot_file_path)
        .expect("failed to finalize merged snapshot file");

    // A stale backup from an earlier crashed run would make the rename
    // below fail, so clear it first.
    let backup_dir = temp_dir().join("backup");
    if backup_dir.exists() {
        fs::remove_dir_all(&backup_dir).expect("failed to clear stale backup directory");
    }
    fs::rename(snapshot_dir, &backup_dir).expect("failed to move old snapshot directory aside");
    fs::rename(temp_file_directory, snapshot_dir).expect("failed to install merged snapshot");
    fs::remove_dir_all(backup_dir).expect("failed to remove backup directory");
}

/// Entry point for snapshotting: loads the block stream described by
/// `block_stream_config` and feeds it to [`update_snapshot_helper`],
/// passing the config's `last_indexed_slot` as the starting point.
///
/// See [`update_snapshot_helper`] for the meaning of the two interval
/// parameters.
pub async fn update_snapshot(
    block_stream_config: BlockStreamConfig,
    incremental_snapshot_interval_slots: u64,
    full_snapshot_interval_slots: u64,
    snapshot_dir: &Path,
) {
    // Load the block stream from the config; all snapshot logic lives in
    // the helper so tests can drive it with a synthetic stream.
    let block_stream = block_stream_config.load_block_stream();
    update_snapshot_helper(
        block_stream,
        block_stream_config.last_indexed_slot,
        incremental_snapshot_interval_slots,
        full_snapshot_interval_slots,
        snapshot_dir,
    )
    .await;
}

/// Consumes `blocks` and writes them into snapshot files under
/// `snapshot_dir`.
///
/// Incoming blocks are appended to a scratch `temp-snapshot` file. Once the
/// covered slot span reaches `incremental_snapshot_interval_slots`, the
/// scratch file is promoted to an incremental snapshot named
/// `snapshot-{start_slot}-{end_slot}`. Once the span since the last full
/// snapshot reaches `full_snapshot_interval_slots`, all snapshot files are
/// merged into a single full snapshot via `merge_snapshots`.
///
/// `last_indexed_slot` seeds the slot bookkeeping when the directory holds
/// no snapshots yet.
pub async fn update_snapshot_helper(
    blocks: impl Stream<Item = BlockInfo>,
    last_indexed_slot: u64,
    incremental_snapshot_interval_slots: u64,
    full_snapshot_interval_slots: u64,
    snapshot_dir: &Path,
) {
    if !snapshot_dir.exists() {
        fs::create_dir(snapshot_dir).unwrap();
    }
    let snapshot_files = get_snapshot_files_with_slots(snapshot_dir);

    // Existing files are sorted by start_slot: the first file's end_slot is
    // where the last full (merged) snapshot left off, and the last file's
    // end_slot is the most recently snapshotted slot.
    let mut last_full_snapshot_slot = snapshot_files
        .first()
        .map(|file| file.end_slot)
        .unwrap_or(last_indexed_slot);
    let mut last_snapshot_slot = snapshot_files
        .last()
        .map(|file| file.end_slot)
        .unwrap_or(last_indexed_slot);

    let (mut temp_file, temp_file_path) = create_temp_snapshot_file("incremental_snapshot/");

    pin_mut!(blocks);
    while let Some(block) = blocks.next().await {
        let slot = block.metadata.slot;
        serialize_block_to_file(&block, &mut temp_file);

        // The `(... == 0) as u64` terms widen the span by one when starting
        // from slot 0, since the range is then inclusive at both ends.
        // NOTE(review): the full-snapshot condition keys off
        // `last_indexed_slot == 0` while the incremental one keys off
        // `last_snapshot_slot == 0` — confirm this asymmetry is intentional.
        // NOTE(review): `slot - last_snapshot_slot` assumes slots are
        // monotonically increasing; a smaller slot would underflow.
        let write_full_snapshot = slot - last_full_snapshot_slot + (last_indexed_slot == 0) as u64
            >= full_snapshot_interval_slots;
        let write_incremental_snapshot = slot - last_snapshot_slot
            + (last_snapshot_slot == 0) as u64
            >= incremental_snapshot_interval_slots;

        if write_full_snapshot || write_incremental_snapshot {
            // Promote the scratch file to a named snapshot, then start a
            // fresh scratch file for subsequent blocks.
            let snapshot_file_path =
                snapshot_dir.join(format!("snapshot-{}-{}", last_snapshot_slot + 1, slot));
            fs::rename(&temp_file_path, &snapshot_file_path).unwrap();
            temp_file = create_temp_snapshot_file("incremental_snapshot/").0;
            last_snapshot_slot = slot;

            if write_full_snapshot {
                merge_snapshots(snapshot_dir).await;
                last_full_snapshot_slot = slot;
            }
        }
    }
}

/// Streams the blocks stored in `snapshot_dir`, reading every snapshot file
/// in ascending `start_slot` order.
///
/// Each file begins with an 8-byte version header, which is skipped before
/// bincode-deserializing blocks back to back until end of file. A clean EOF
/// is the normal end-of-file condition; any other deserialization error is
/// reported to stderr (the original printed every error, including expected
/// EOF, to stdout).
pub fn load_block_stream_from_snapshot_directory(
    snapshot_dir: &Path,
) -> impl Stream<Item = BlockInfo> {
    let snapshot_files = get_snapshot_files_with_slots(snapshot_dir);
    stream! {
        for snapshot_file in snapshot_files {
            let file = File::open(&snapshot_file.file).unwrap();
            let mut reader = BufReader::new(file);
            // Skip the 8-byte little-endian SNAPSHOT_VERSION header.
            reader.seek(io::SeekFrom::Start(8)).unwrap();
            loop {
                match bincode::deserialize_from::<_, BlockInfo>(&mut reader) {
                    Ok(block) => yield block,
                    Err(e) => {
                        // Clean EOF means the file is exhausted; anything
                        // else indicates a corrupt or truncated snapshot.
                        match *e {
                            bincode::ErrorKind::Io(ref io_err)
                                if io_err.kind() == io::ErrorKind::UnexpectedEof => {}
                            _ => eprintln!("Error deserializing block: {:?}", e),
                        }
                        break;
                    }
                }
            }
        }
    }
}
11 changes: 6 additions & 5 deletions src/ingester/typedefs/block_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use serde::{Deserialize, Serialize};
use solana_sdk::{
clock::{Slot, UnixTimestamp},
pubkey::Pubkey,
Expand All @@ -16,31 +17,31 @@ use crate::common::typedefs::hash::Hash;

use super::super::error::IngesterError;

#[derive(Debug, Clone, PartialEq, Eq)]
/// A single decoded instruction: the invoked program plus its raw data and
/// account list.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Instruction {
    pub program_id: Pubkey,
    pub data: Vec<u8>,
    pub accounts: Vec<Pubkey>,
}
/// A top-level instruction together with the inner (CPI) instructions it
/// produced.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InstructionGroup {
    pub outer_instruction: Instruction,
    pub inner_instructions: Vec<Instruction>,
}
/// A parsed transaction: its instruction groups, signature, and the error
/// string if the transaction failed on-chain.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransactionInfo {
    pub instruction_groups: Vec<InstructionGroup>,
    pub signature: Signature,
    pub error: Option<String>,
}

/// A block's metadata together with its (possibly filtered) transactions.
/// Serde derives allow blocks to be bincode-(de)serialized for snapshots.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct BlockInfo {
    pub metadata: BlockMetadata,
    pub transactions: Vec<TransactionInfo>,
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct BlockMetadata {
pub slot: Slot,
// In Solana, slots can be skipped, so they are not necessarily sequential.
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
mod e2e_tests;
mod mock_tests;
mod open_api_tests;
mod utils;
mod prod_tests;
mod snapshot_tests;
mod utils;
49 changes: 49 additions & 0 deletions tests/integration_tests/snapshot_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use futures::stream;


use photon_indexer::common::typedefs::{hash::Hash};

use photon_indexer::ingester::typedefs::block_info::{BlockInfo, BlockMetadata};



use std::vec;

/// End-to-end check of the snapshotter: write ten synthetic blocks through
/// `update_snapshot_helper`, then read them back with
/// `load_block_stream_from_snapshot_directory` and verify a round trip.
#[tokio::test]
async fn test_basic_snapshotting() {
    use std::{env::temp_dir, fs};

    use futures::StreamExt;
    use photon_indexer::ingester::snapshotter::{
        load_block_stream_from_snapshot_directory, update_snapshot_helper,
    };

    // Ten consecutive dummy blocks (slots 0..=9) with no transactions.
    let blocks: Vec<BlockInfo> = (0..10)
        .map(|i| {
            let block = BlockInfo {
                metadata: BlockMetadata {
                    slot: i,
                    parent_slot: if i == 0 { 0 } else { i - 1 },
                    block_time: 0,
                    blockhash: Hash::default(),
                    parent_blockhash: Hash::default(),
                    block_height: i,
                },
                transactions: vec![],
            };
            block
        })
        .collect();
    let blocks_stream = stream::iter(blocks.clone().into_iter());
    // Start from a clean snapshot directory so earlier runs don't interfere.
    let temp_dir = temp_dir();
    let snapshot_dir = temp_dir.as_path().join("test-snapshots");
    if snapshot_dir.exists() {
        fs::remove_dir_all(&snapshot_dir).unwrap();
    } else {
        fs::create_dir(&snapshot_dir).unwrap();
    }
    // Incremental snapshot every 2 slots, full snapshot every 4, starting
    // from last_indexed_slot = 0.
    update_snapshot_helper(blocks_stream, 0, 2, 4, &snapshot_dir).await;
    // Reading the snapshots back must reproduce the original blocks in order.
    let snapshot_blocks = load_block_stream_from_snapshot_directory(&snapshot_dir);
    let snapshot_blocks: Vec<BlockInfo> = snapshot_blocks.collect().await;
    assert_eq!(snapshot_blocks, blocks);
}

0 comments on commit 7f7941b

Please sign in to comment.