Skip to content

Commit

Permalink
Fix Photon bugs (#86)
Browse files Browse the repository at this point in the history
* Fix signatures issue

* Intermediate commit ?

* Fix more bugs

* Bump version

* Add test and bump version

* Delete unused import
  • Loading branch information
pmantica11 authored Apr 25, 2024
1 parent 932d885 commit 0847a27
Show file tree
Hide file tree
Showing 21 changed files with 308 additions and 80 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "photon-indexer"
publish = true
readme = "README.md"
repository = "https://github.com/helius-labs/photon"
version = "0.13.0"
version = "0.14.0"

[[bin]]
name = "photon"
Expand Down Expand Up @@ -75,6 +75,7 @@ sqlx = {version = "0.6.2", features = [
]}
thiserror = "1.0.31"
# time pinned because of https://github.com/launchbadge/sqlx/issues/3189
num_enum = "0.7.2"
time = "=0.3.34"
tokio = {version = "1.23.0", features = ["full"]}
tower = {version = "0.4.13", features = ["full"]}
Expand All @@ -86,7 +87,6 @@ tracing-subscriber = {version = "0.3.16", features = [
"ansi",
]}
utoipa = {version = "4.2.0", features = ["yaml", "chrono"]}
num_enum = "0.7.2"

[dev-dependencies]
function_name = "0.3.0"
Expand Down
2 changes: 1 addition & 1 deletion src/api/method/get_signatures_for_owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
};
use crate::common::typedefs::serializable_pubkey::SerializablePubkey;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, Default)]
pub struct GetSignaturesForOwnerRequest {
pub owner: SerializablePubkey,
pub limit: Option<Limit>,
Expand Down
2 changes: 1 addition & 1 deletion src/api/method/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ pub async fn search_for_signatures(

let raw_sql = format!(
"
SELECT transactions.signature, transactions.slot, blocks.block_time
SELECT DISTINCT transactions.signature, transactions.slot, blocks.block_time
FROM account_transactions
JOIN transactions ON account_transactions.signature = transactions.signature
JOIN blocks ON transactions.slot = blocks.slot
Expand Down
2 changes: 0 additions & 2 deletions src/dao/generated/account_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ pub struct Model {
pub hash: Vec<u8>,
#[sea_orm(primary_key, auto_increment = false)]
pub signature: Vec<u8>,
#[sea_orm(primary_key, auto_increment = false)]
pub closure: bool,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
5 changes: 0 additions & 5 deletions src/ingester/parser/indexer_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
use anchor_lang::prelude::*;
use borsh::{BorshDeserialize, BorshSerialize};






#[derive(Debug, Clone, AnchorSerialize, AnchorDeserialize)]
pub struct PublicTransactionEvent {
pub input_compressed_account_hashes: Vec<[u8; 32]>,
Expand Down
42 changes: 20 additions & 22 deletions src/ingester/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,33 +190,32 @@ fn parse_public_transaction_event(
state_update.out_accounts.push(enriched_account);
}

state_update.path_nodes.extend(
path_updates
.into_iter()
.zip(transaction_event.output_leaf_indices)
.flat_map(|(p, leaf_idx)| {
let tree_height = p.path.len();
p.path
.into_iter()
.enumerate()
.map(move |(i, node)| EnrichedPathNode {
node: node.clone(),
slot,
tree: p.tree,
seq: p.seq,
level: i,
tree_depth: tree_height,
leaf_index: if i == 0 { Some(leaf_idx) } else { None },
})
}),
);
for ((path_index, path), leaf_index) in path_updates
.into_iter()
.enumerate()
.zip(transaction_event.output_leaf_indices)
{
for (i, node) in path.path.iter().enumerate() {
state_update.path_nodes.insert(
(path.tree, node.index),
EnrichedPathNode {
node: node.clone(),
slot,
tree: path.tree,
seq: path.seq + path_index as u64,
level: i,
tree_depth: path.path.len(),
leaf_index: if i == 0 { Some(leaf_index) } else { None },
},
);
}
}

state_update
.account_transactions
.extend(state_update.in_accounts.iter().map(|a| AccountTransaction {
hash: a.hash.clone(),
signature: tx,
closure: true,
slot,
}));

Expand All @@ -229,7 +228,6 @@ fn parse_public_transaction_event(
.map(|a| AccountTransaction {
hash: a.hash.clone(),
signature: tx,
closure: false,
slot,
}),
);
Expand Down
34 changes: 13 additions & 21 deletions src/ingester/parser/state_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ pub struct Transaction {
pub slot: u64,
}

#[derive(Hash, PartialEq, Eq)]
pub struct AccountTransaction {
pub hash: Hash,
pub signature: Signature,
pub closure: bool,
pub slot: u64,
}

Expand All @@ -43,8 +43,8 @@ pub struct AccountTransaction {
pub struct StateUpdate {
pub in_accounts: Vec<Account>,
pub out_accounts: Vec<Account>,
pub path_nodes: Vec<EnrichedPathNode>,
pub account_transactions: Vec<AccountTransaction>,
pub path_nodes: HashMap<([u8; 32], u32), EnrichedPathNode>,
pub account_transactions: HashSet<AccountTransaction>,
}

impl StateUpdate {
Expand All @@ -59,34 +59,26 @@ impl StateUpdate {
// removed from the tree through the nullifier crank.
self.out_accounts
.retain(|a| !in_account_set.contains(&a.hash));

// TODO: Use seq instead of slot.

// Assuming the type of `path_nodes` and fields inside `node` such as `tree`, `node.index`, and `seq`.
let mut latest_nodes: HashMap<([u8; 32], u32), EnrichedPathNode> = HashMap::new();

for node in self.path_nodes.drain(..) {
let key = (node.tree, node.node.index);
if let Some(existing) = latest_nodes.get_mut(&key) {
if (*existing).seq < node.seq {
*existing = node;
}
} else {
latest_nodes.insert(key, node);
}
}
self.path_nodes = latest_nodes.into_values().collect();
}

pub fn merge_updates(updates: Vec<StateUpdate>) -> StateUpdate {
let mut merged = StateUpdate::default();
for update in updates {
merged.in_accounts.extend(update.in_accounts);
merged.out_accounts.extend(update.out_accounts);
merged.path_nodes.extend(update.path_nodes);
merged
.account_transactions
.extend(update.account_transactions);

for (key, node) in update.path_nodes {
if let Some(existing) = merged.path_nodes.get_mut(&key) {
if (*existing).seq < node.seq {
*existing = node;
}
} else {
merged.path_nodes.insert(key, node);
}
}
}
merged
}
Expand Down
18 changes: 9 additions & 9 deletions src/ingester/persist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ pub async fn persist_state_update(
}

debug!("Persisting path nodes...");
for chunk in path_nodes.chunks(MAX_SQL_INSERTS) {
persist_path_nodes(txn, chunk).await?;
}

debug!("Persisting path nodes...");
for chunk in path_nodes.chunks(MAX_SQL_INSERTS) {
for chunk in path_nodes
.into_values()
.collect::<Vec<_>>()
.chunks(MAX_SQL_INSERTS)
{
persist_path_nodes(txn, chunk).await?;
}

Expand All @@ -86,6 +85,7 @@ pub async fn persist_state_update(
}

debug!("Persisting account transactions...");
let account_transactions = account_transactions.into_iter().collect::<Vec<_>>();
for chunk in account_transactions.chunks(MAX_SQL_INSERTS) {
persist_account_transactions(txn, chunk).await?;
}
Expand Down Expand Up @@ -311,7 +311,9 @@ async fn persist_path_nodes(
)
.build(txn.get_database_backend());
query.sql = format!("{} WHERE excluded.seq > state_trees.seq", query.sql);
txn.execute(query).await?;
txn.execute(query).await.map_err(|e| {
IngesterError::DatabaseError(format!("Failed to persist path nodes: {}", e))
})?;

Ok(())
}
Expand Down Expand Up @@ -354,7 +356,6 @@ async fn persist_account_transactions(
.map(|transaction| account_transactions::ActiveModel {
hash: Set(transaction.hash.to_vec()),
signature: Set(Into::<[u8; 64]>::into(transaction.signature).to_vec()),
closure: Set(transaction.closure),
})
.collect::<Vec<_>>();

Expand All @@ -367,7 +368,6 @@ async fn persist_account_transactions(
OnConflict::columns([
account_transactions::Column::Hash,
account_transactions::Column::Signature,
account_transactions::Column::Closure,
])
.do_nothing()
.to_owned(),
Expand Down
8 changes: 1 addition & 7 deletions src/migration/m20220101_000001_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,17 +312,11 @@ impl MigrationTrait for Migration {
.binary()
.not_null(),
)
.col(
ColumnDef::new(AccountTransactions::Closure)
.boolean()
.not_null(),
)
.primary_key(
Index::create()
.name("pk_account_transaction_history")
.col(AccountTransactions::Hash)
.col(AccountTransactions::Signature)
.col(AccountTransactions::Closure),
.col(AccountTransactions::Signature),
)
.foreign_key(
ForeignKey::create()
Expand Down
1 change: 0 additions & 1 deletion src/migration/model/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,4 @@ pub enum AccountTransactions {
Table,
Hash,
Signature,
Closure,
}
Loading

0 comments on commit 0847a27

Please sign in to comment.