Skip to content

Commit

Permalink
feat(engine): optional stream for behaviors (#899)
Browse files Browse the repository at this point in the history
* feat: optional stream and default impl
* test: no-stream
  • Loading branch information
Autoparallel authored Feb 16, 2024
1 parent 267c776 commit 0522476
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 11 deletions.
31 changes: 22 additions & 9 deletions engine/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,24 @@ pub enum State {
/// The [`Behavior`] trait is the lowest level functionality that will be used
/// by a [`StateMachine`]. This constitutes what each state transition will do.
#[async_trait::async_trait]
pub trait Behavior<E>: Serialize + DeserializeOwned + Send + Sync + Debug + 'static {
pub trait Behavior<E: Send + 'static>:
Serialize + DeserializeOwned + Send + Sync + Debug + 'static
{
/// Used to start the agent.
/// This is where the agent can engage in its specific start up activities
/// that it can do given the current state of the world.
async fn startup(
&mut self,
client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<EventStream<E>>;
) -> Result<Option<EventStream<E>>>;

/// Used to process events.
/// This is where the agent can engage in its specific processing
/// of events that can lead to actions being taken.
async fn process(&mut self, event: E) -> Result<ControlFlow>;
async fn process(&mut self, _event: E) -> Result<ControlFlow> {
Ok(ControlFlow::Halt)
}
}
/// A trait for creating a state machine.
///
Expand Down Expand Up @@ -140,7 +144,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static {
/// This method does not return a value, but it may result in state changes
/// within the implementing type or the generation of further instructions
/// or events.
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()>;
async fn execute(&mut self, _instruction: MachineInstruction) -> Result<()>;
}

/// The `Engine` struct represents the core logic unit of a state machine-based
Expand All @@ -161,6 +165,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static {
pub struct Engine<B, E>
where
B: Behavior<E>,
E: Send + 'static,
{
/// The behavior the `Engine` runs.
behavior: Option<B>,
Expand Down Expand Up @@ -215,18 +220,26 @@ where
MachineInstruction::Start(client, messager) => {
self.state = State::Starting;
let mut behavior = self.behavior.take().unwrap();
let behavior_task: JoinHandle<Result<(EventStream<E>, B)>> =
let behavior_task: JoinHandle<Result<(Option<EventStream<E>>, B)>> =
tokio::spawn(async move {
let id = messager.id.clone();
let stream = behavior.startup(client, messager).await?;
debug!("startup complete for behavior {:?}", id);
Ok((stream, behavior))
});
let (stream, behavior) = behavior_task.await??;
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
Ok(())
match stream {
Some(stream) => {
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
Ok(())
}
None => {
self.behavior = Some(behavior);
Ok(())
}
}
}
MachineInstruction::Process => {
trace!("Behavior is starting up.");
Expand Down
10 changes: 8 additions & 2 deletions engine/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ use arbiter_engine::{
};
use serde::{Deserialize, Serialize};

#[allow(unused)]
fn trace() {
std::env::set_var("RUST_LOG", "trace");
tracing_subscriber::fmt::init();
}

fn default_max_count() -> Option<u64> {
Some(3)
}
Expand Down Expand Up @@ -53,12 +59,12 @@ impl Behavior<Message> for TimedMessage {
&mut self,
_client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<EventStream<Message>> {
) -> Result<Option<EventStream<Message>>> {
if let Some(startup_message) = &self.startup_message {
messager.send(To::All, startup_message).await?;
}
self.messager = Some(messager.clone());
Ok(messager.stream()?)
Ok(Some(messager.stream()?))
}

async fn process(&mut self, event: Message) -> Result<ControlFlow> {
Expand Down
28 changes: 28 additions & 0 deletions engine/tests/machine_integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use arbiter_engine::{agent::Agent, world::World};

include!("common.rs");

#[derive(Debug, Deserialize, Serialize)]
struct MockBehavior;

#[async_trait::async_trait]
impl Behavior<()> for MockBehavior {
async fn startup(
&mut self,
_client: Arc<ArbiterMiddleware>,
_messager: Messager,
) -> Result<Option<EventStream<()>>> {
Ok(None)
}
}

#[tokio::test]
async fn behavior_no_stream() {
trace();
let mut world = World::new("test");
let behavior = MockBehavior;
let agent = Agent::builder("agent").with_behavior(behavior);
world.add_agent(agent);

world.run().await.unwrap();
}

0 comments on commit 0522476

Please sign in to comment.