feat: finish up task manager
Larkooo committed Jan 16, 2025
1 parent 4bebac9 commit 49e91b0
Showing 5 changed files with 199 additions and 213 deletions.
153 changes: 31 additions & 122 deletions crates/torii/indexer/src/engine.rs
@@ -1,15 +1,14 @@
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use bitflags::bitflags;
use cainome::cairo_serde::CairoSerde;
use dojo_utils::provider as provider_utils;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::{join_all, try_join_all};
use futures_util::future::join_all;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage,
Expand All @@ -18,7 +17,7 @@ use starknet::core::types::{
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use starknet_crypto::{poseidon_hash_many, Felt};
use starknet_crypto::Felt;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::sync::Semaphore;
@@ -47,6 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor;
use crate::processors::{
BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor,
};
use crate::task_manager::{ParallelizedEvent, TaskManager};

type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;

@@ -233,17 +233,27 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let contracts = Arc::new(
contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(),
);
let world = Arc::new(world);
let processors = Arc::new(processors);
let max_concurrent_tasks = config.max_concurrent_tasks;
let event_processor_config = config.event_processor_config.clone();

Self {
world: Arc::new(world),
db,
world: world.clone(),
db: db.clone(),
provider: Arc::new(provider),
processors: Arc::new(processors),
processors: processors.clone(),
config,
shutdown_tx,
block_tx,
contracts,
tasks: BTreeMap::new(),
task_manager: TaskManager::new(
db,
world,
processors,
max_concurrent_tasks,
event_processor_config,
),
}
}

@@ -531,7 +541,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

// Process parallelized events
self.process_tasks().await?;
self.task_manager.process_tasks().await?;

self.db.update_cursors(
data.block_number - 1,
@@ -578,7 +588,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

// Process parallelized events
self.process_tasks().await?;
self.task_manager.process_tasks().await?;

let last_block_timestamp =
get_block_timestamp(&self.provider, data.latest_block_number).await?;
Expand All @@ -588,77 +598,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Ok(())
}

async fn process_tasks(&mut self) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();
let event_processor_config = self.config.event_processor_config.clone();

handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for (contract_type, event) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
let processor = processors
.iter()
.find(|p| p.validate(&event.event))
.expect("Must find at least one processor for the event");

debug!(
target: LOG_TARGET,
event_name = processor.event_key(),
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);

if let Err(e) = processor
.process(
&world,
&mut local_db,
event.block_number,
event.block_timestamp,
&event.event_id,
&event.event,
&event_processor_config,
)
.await
{
error!(
target: LOG_TARGET,
event_name = processor.event_key(),
error = %e,
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);
}
}
}

Ok::<_, anyhow::Error>(())
}));
}

// Wait for all tasks in this priority level to complete before moving to next priority
try_join_all(handles).await?;
}

Ok(())
}

async fn process_transaction_with_events(
&mut self,
transaction_hash: Felt,
@@ -870,50 +809,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
.find(|p| p.validate(event))
.expect("Must find atleast one processor for the event");

let (task_priority, task_identifier) = match processor.event_key().as_str() {
"ModelRegistered" | "EventRegistered" => {
let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
let hash = hasher.finish();
(0usize, hash) // Priority 0 (highest) for model/event registration
}
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
let hash = hasher.finish();
(2usize, hash) // Priority 2 (lower) for store operations
}
"EventEmitted" => {
let mut hasher = DefaultHasher::new();

let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
panic!("Expected EventEmitted keys to be well formed: {:?}", e);
});

// selector
event.keys[1].hash(&mut hasher);
// entity id
let entity_id = poseidon_hash_many(&keys);
entity_id.hash(&mut hasher);

let hash = hasher.finish();
(2usize, hash) // Priority 2 for event messages
}
_ => (0, 0), // No parallelization for other events
};
let (task_priority, task_identifier) = (processor.task_priority(), processor.task_identifier(event));

// if our event can be parallelized, we add it to the task manager
if task_identifier != 0 {
self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push(
(
self.task_manager.add_parallelized_event(
task_priority,
task_identifier,
ParallelizedEvent {
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
),
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
);
} else {
// Process non-parallelized events immediately
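The inline match over event names in engine.rs is replaced above by per-processor task_priority() and task_identifier() calls. Below is a minimal, std-only sketch of the grouping idea those methods encode; byte arrays stand in for starknet Felt keys and the free function stands in for the real EventProcessor trait method, so every name and signature here is illustrative rather than the actual API.

use std::collections::{BTreeMap, HashMap};
use std::hash::{DefaultHasher, Hash, Hasher};

// Stand-in for starknet_crypto::Felt; the real code hashes Felt event keys.
type FeltLike = [u8; 32];

// Mirrors the removed StoreSetRecord/StoreUpdateRecord arm: the task id is a
// hash of keys[1] (model selector) and keys[2] (entity id), so all updates to
// one entity land in the same task and are processed in order.
fn store_task_identifier(keys: &[FeltLike]) -> u64 {
    let mut hasher = DefaultHasher::new();
    keys[1].hash(&mut hasher);
    keys[2].hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let model = [9u8; 32];
    let entity_a = [1u8; 32];
    let entity_b = [2u8; 32];

    let set_a = [[0u8; 32], model, entity_a];
    let update_a = [[0u8; 32], model, entity_a];
    let set_b = [[0u8; 32], model, entity_b];

    // Same entity -> same task id; different entity -> (almost surely) a different one.
    assert_eq!(store_task_identifier(&set_a), store_task_identifier(&update_a));
    assert_ne!(store_task_identifier(&set_a), store_task_identifier(&set_b));

    // Priority 0 was used for model/event registration, priority 2 for store
    // operations and event messages, matching the removed match arms.
    let mut tasks: BTreeMap<usize, HashMap<u64, Vec<&str>>> = BTreeMap::new();
    tasks.entry(2).or_default().entry(store_task_identifier(&set_a)).or_default().push("StoreSetRecord A");
    tasks.entry(2).or_default().entry(store_task_identifier(&update_a)).or_default().push("StoreUpdateRecord A");
    println!("priority levels: {}, tasks at priority 2: {}", tasks.len(), tasks[&2].len());
}

Because the identifier only hashes the model selector and entity id, every store operation touching the same entity collapses into one task and keeps its original order, while work on unrelated entities can be parallelized.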
143 changes: 109 additions & 34 deletions crates/torii/indexer/src/task_manager.rs
@@ -2,11 +2,17 @@ use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};

use anyhow::Result;
use dojo_world::contracts::WorldContractReader;
use futures_util::future::try_join_all;
use starknet::{core::types::Event, providers::Provider};
use torii_sqlite::types::ContractType;
use tokio::sync::Semaphore;
use torii_sqlite::{types::ContractType, Sql};
use tracing::{debug, error};

use crate::{engine::Processors, processors::EventProcessorConfig};

use crate::engine::Processors;
const LOG_TARGET: &str = "torii::indexer::task_manager";

pub type TaskId = u64;
type TaskPriority = usize;
@@ -21,47 +27,116 @@ pub struct ParallelizedEvent {
}

pub struct TaskManager<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
db: Sql,
world: Arc<WorldContractReader<P>>,
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<ParallelizedEvent>>>,
processors: Arc<Processors<P>>,
max_concurrent_tasks: usize,
event_processor_config: EventProcessorConfig,
}

impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> TaskManager<P> {
pub fn new(processors: Arc<Processors<P>>) -> Self {
Self { tasks: BTreeMap::new(), processors }
}

pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId {
let event_key = parallelized_event.event.keys[0];
let processor = self
.processors
.get_event_processor(parallelized_event.contract_type)
.get(&event_key)
.unwrap()
.iter()
.find(|p| p.validate(&parallelized_event.event))
.unwrap();
let priority = processor.task_priority();
let task_id = processor.task_identifier(&parallelized_event.event);

if task_id != 0 {
self.tasks
.entry(priority)
.or_default()
.entry(task_id)
.or_default()
.push(parallelized_event);
pub fn new(
db: Sql,
world: Arc<WorldContractReader<P>>,
processors: Arc<Processors<P>>,
max_concurrent_tasks: usize,
event_processor_config: EventProcessorConfig,
) -> Self {
Self {
db,
world,
tasks: BTreeMap::new(),
processors,
max_concurrent_tasks,
event_processor_config,
}

task_id
}

pub fn take_tasks(
pub fn add_parallelized_event(
&mut self,
) -> BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>> {
std::mem::take(&mut self.tasks)
priority: TaskPriority,
task_identifier: TaskId,
parallelized_event: ParallelizedEvent,
) {
self.tasks
.entry(priority)
.or_default()
.entry(task_identifier)
.or_default()
.push(parallelized_event);
}

pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
pub async fn process_tasks(
&mut self
) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.max_concurrent_tasks));

// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();
let event_processor_config = self.event_processor_config.clone();

handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for ParallelizedEvent { contract_type, event, block_number, block_timestamp, event_id } in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.keys[0]) {
let processor = processors
.iter()
.find(|p| p.validate(&event))
.expect("Must find at least one processor for the event");

debug!(
target: LOG_TARGET,
event_name = processor.event_key(),
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);

if let Err(e) = processor
.process(
&world,
&mut local_db,
block_number,
block_timestamp,
&event_id,
&event,
&event_processor_config,
)
.await
{
error!(
target: LOG_TARGET,
event_name = processor.event_key(),
error = %e,
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);
}
}
}

Ok::<_, anyhow::Error>(())
}));
}

// Wait for all tasks in this priority level to complete before moving to next priority
try_join_all(handles).await?;
}

Ok(())
}
}
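For reference, here is a compact stand-alone sketch of the scheduling shape that process_tasks implements: priority levels drain in ascending order, the task groups inside one level run concurrently behind a Semaphore sized by max_concurrent_tasks, and the events of a single task stay sequential. Strings and println! stand in for ParallelizedEvent and processor.process(); the tokio, anyhow, and futures-util dependencies are assumed from the surrounding code.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::Result;
use futures_util::future::try_join_all;
use tokio::sync::Semaphore;

async fn process(
    tasks: BTreeMap<usize, HashMap<u64, Vec<String>>>,
    max_concurrent_tasks: usize,
) -> Result<()> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));

    // Priority levels are handled one after another (BTreeMap iterates in key order).
    for (priority, task_group) in tasks {
        let mut handles = Vec::new();

        // Every task of this priority gets its own spawned future, bounded by the semaphore.
        for (task_id, events) in task_group {
            let semaphore = semaphore.clone();
            handles.push(tokio::spawn(async move {
                let _permit = semaphore.acquire().await?;
                // Events sharing a task id run sequentially, preserving their order.
                for event in events {
                    println!("priority {priority} task {task_id}: {event}");
                }
                Ok::<_, anyhow::Error>(())
            }));
        }

        // Wait for the whole priority level before moving on to the next one.
        for result in try_join_all(handles).await? {
            result?;
        }
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut tasks: BTreeMap<usize, HashMap<u64, Vec<String>>> = BTreeMap::new();
    tasks.entry(0).or_default().entry(1).or_default().push("ModelRegistered".into());
    tasks.entry(2).or_default().entry(42).or_default().push("StoreSetRecord".into());
    tasks.entry(2).or_default().entry(42).or_default().push("StoreUpdateRecord".into());
    process(tasks, 8).await
}

Unlike the real process_tasks, which logs per-event errors and keeps going, this sketch propagates them to keep the example short; the concurrency structure is the same.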