Skip to content

Commit

Permalink
impl ProducerTableBridge
Browse files Browse the repository at this point in the history
  • Loading branch information
erer1243 committed Jan 14, 2025
1 parent 24c4632 commit d035499
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
2 changes: 1 addition & 1 deletion crates/swss-common-bridge/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ enum State<T> {
}

impl<T: ConsumerTable + 'static> ConsumerTableBridge<T> {
pub fn new(table: T, destination: ServicePath) -> ConsumerTableBridge<T> {
pub fn new(table: T, destination: ServicePath) -> Self {
ConsumerTableBridge(State::WaitingForInit { table, destination })
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/swss-common-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::error::Error;
use swss_common::KeyOpFieldValues;

pub use consumer::ConsumerTableBridge;
pub use producer::ProducerTableBridge;

pub fn encode_kfvs(kfvs: &KeyOpFieldValues) -> Vec<u8> {
serde_json::to_vec(kfvs).unwrap()
Expand Down
45 changes: 45 additions & 0 deletions crates/swss-common-bridge/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1 +1,46 @@
use swbus_actor::prelude::*;
use swss_common::{KeyOpFieldValues, KeyOperation, ProducerStateTable};

pub struct ProducerTableBridge {
table: ProducerStateTable,
}

impl ProducerTableBridge {
pub fn new(table: ProducerStateTable) -> Self {
ProducerTableBridge { table }
}
}

impl Actor for ProducerTableBridge {
async fn init(&mut self, _outbox: Outbox) {}

async fn handle_message(&mut self, message: IncomingMessage, outbox: Outbox) {
match &message.body {
MessageBody::Request(DataRequest { payload }) => {
let response = match handle_kfvs(&mut self.table, &payload).await {
Ok(()) => OutgoingMessage::ok_response(message),
Err((code, msg)) => OutgoingMessage::error_response(message, code, msg),
};

outbox.send(response).await;
}
MessageBody::Response(_) => { /* ignore responses, we should never get any anyway */ }
}
}
}

async fn handle_kfvs(table: &mut ProducerStateTable, payload: &[u8]) -> Result<(), (SwbusErrorCode, String)> {
let kfvs = crate::decode_kfvs(payload)
.map_err(|e| (SwbusErrorCode::InvalidPayload, format!("error decoding kfvs: {e}")))?;

match kfvs.operation {
KeyOperation::Set => {
table.set_async(&kfvs.key, kfvs.field_values).await;
}
KeyOperation::Del => {
table.del_async(&kfvs.key).await;
}
}

Ok(())
}

0 comments on commit d035499

Please sign in to comment.