Skip to content

Commit

Permalink
swss-common-bridge initial impl
Browse files Browse the repository at this point in the history
  • Loading branch information
erer1243 committed Dec 6, 2024
1 parent dca53c3 commit 1c5d26d
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 0 deletions.
20 changes: 20 additions & 0 deletions crates/swss-common-bridge/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "swss-common-bridge"
version.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
documentation.workspace = true
keywords.workspace = true
edition.workspace = true

[dependencies]
swss-common = { path = "../swss-common", features = ["async", "serde"] }
swbus-actor = { path = "../swbus-actor" }
tokio.workspace = true
tokio-util.workspace = true
serde.workspace = true
serde_json.workspace = true

[lints]
workspace = true
83 changes: 83 additions & 0 deletions crates/swss-common-bridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
pub mod payload;
mod table;
mod table_watcher;

use swbus_actor::prelude::*;
use swss_common::{ConsumerStateTable, SubscriberStateTable, ZmqConsumerStateTable};
use table::Table;
use table_watcher::{TableWatcher, TableWatcherMessage};
use tokio::sync::{
mpsc::{channel, Sender},
oneshot,
};
use tokio_util::task::AbortOnDropHandle;

/// A bridge that converts between Swbus messages and swss tables.
pub struct SwssCommonBridge {
outbox_tx: Option<oneshot::Sender<Outbox>>,
tw_msg_tx: Sender<TableWatcherMessage>,
_table_watcher_task: AbortOnDropHandle<()>,
}

impl SwssCommonBridge {
pub fn new_consumer_state_table(table: ConsumerStateTable) -> Self {
Self::new(table)
}

pub fn new_subscriber_state_table(table: SubscriberStateTable) -> Self {
Self::new(table)
}

pub fn new_zmq_consumer_state_table(table: ZmqConsumerStateTable) -> Self {
Self::new(table)
}

fn new<T: Table + 'static>(table: T) -> Self {
let (tw_msg_tx, tw_msg_rx) = channel(1024);
let (outbox_tx, outbox_rx) = oneshot::channel();
let table_watcher = TableWatcher::new(table, outbox_rx, tw_msg_rx);
let _table_watcher_task = AbortOnDropHandle::new(tokio::spawn(table_watcher.run()));
Self {
tw_msg_tx,
outbox_tx: Some(outbox_tx),
_table_watcher_task,
}
}
}

impl Actor for SwssCommonBridge {
async fn init(&mut self, outbox: Outbox) {
self.outbox_tx
.take()
.unwrap()
.send(outbox.clone())
.unwrap_or_else(|_| unreachable!("outbox_tx.send failed"));
}

async fn handle_message(&mut self, message: IncomingMessage, outbox: Outbox) {
match &message.body {
// Requests are encoded an TableWatcherMessage. Decode it and send it to the TableWatcher
MessageBody::Request(req) => match payload::decode_table_watcher_message(&req.payload) {
Ok(tw_msg) => {
self.tw_msg_tx.send(tw_msg).await.expect("TableWatcher task died");
let msg = OutgoingMessage::ok_response(message);
outbox.send(msg).await;
}

Err(e) => {
let msg = OutgoingMessage::error_response(message, SwbusErrorCode::InvalidPayload, e.to_string());
outbox.send(msg).await;
}
},
MessageBody::Response(_) => (),
}
}

async fn handle_message_failure(&mut self, _id: MessageId, subscriber: ServicePath, _outbox: Outbox) {
// If a message failed to be delivered, we unsubscribe the client so we don't waste bandwidth on them in the future
self.tw_msg_tx
.send(TableWatcherMessage::Unsubscribe { subscriber })
.await
.expect("TableWatcher task died");
}
}
24 changes: 24 additions & 0 deletions crates/swss-common-bridge/src/payload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::table_watcher::TableWatcherMessage;
use std::error::Error;
use swbus_actor::prelude::ServicePath;
use swss_common::KeyOpFieldValues;

pub fn encode_subscribe(subscriber: ServicePath) -> Vec<u8> {
serde_json::to_vec(&TableWatcherMessage::Subscribe { subscriber }).unwrap()
}

pub fn encode_unsubscribe(subscriber: ServicePath) -> Vec<u8> {
serde_json::to_vec(&TableWatcherMessage::Unsubscribe { subscriber }).unwrap()
}

pub(crate) fn decode_table_watcher_message(payload: &[u8]) -> serde_json::Result<TableWatcherMessage> {
serde_json::from_slice(payload)
}

pub(crate) fn encode_kfvs(kfvs: &KeyOpFieldValues) -> Vec<u8> {
serde_json::to_vec(kfvs).unwrap()
}

pub fn decode_kfvs(payload: &[u8]) -> Result<KeyOpFieldValues, Box<dyn Error>> {
Ok(serde_json::from_slice(payload)?)
}
37 changes: 37 additions & 0 deletions crates/swss-common-bridge/src/table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::future::Future;
use swss_common::{ConsumerStateTable, KeyOpFieldValues, SubscriberStateTable, ZmqConsumerStateTable};

pub(crate) trait Table: Send {
fn read_data_async(&mut self) -> impl Future<Output = ()> + Send;
fn pops(&mut self) -> Vec<KeyOpFieldValues>;
}

impl Table for ConsumerStateTable {
async fn read_data_async(&mut self) {
self.read_data_async().await.expect("read_data_async io error");
}

fn pops(&mut self) -> Vec<KeyOpFieldValues> {
ConsumerStateTable::pops(self)
}
}

impl Table for SubscriberStateTable {
async fn read_data_async(&mut self) {
self.read_data_async().await.expect("read_data_async io error");
}

fn pops(&mut self) -> Vec<KeyOpFieldValues> {
SubscriberStateTable::pops(self)
}
}

impl Table for ZmqConsumerStateTable {
async fn read_data_async(&mut self) {
self.read_data_async().await.expect("read_data_async io error");
}

fn pops(&mut self) -> Vec<KeyOpFieldValues> {
ZmqConsumerStateTable::pops(self)
}
}
95 changes: 95 additions & 0 deletions crates/swss-common-bridge/src/table_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::{payload, table::Table};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use swbus_actor::prelude::*;
use tokio::sync::{mpsc::Receiver, oneshot};

#[derive(Serialize, Deserialize)]
pub(crate) enum TableWatcherMessage {
Subscribe { subscriber: ServicePath },
Unsubscribe { subscriber: ServicePath },
}

pub(crate) struct TableWatcher<T> {
outbox: LazyOutbox,
tw_msg_rx: Receiver<TableWatcherMessage>,
table: T,
subscribers: HashSet<ServicePath>,
}

impl<T: Table> TableWatcher<T> {
pub(crate) fn new(
table: T,
outbox_rx: oneshot::Receiver<Outbox>,
tw_msg_rx: Receiver<TableWatcherMessage>,
) -> Self {
Self {
outbox: LazyOutbox::Waiting(outbox_rx),
tw_msg_rx,
table,
subscribers: HashSet::new(),
}
}

pub(crate) async fn run(mut self) {
loop {
tokio::select! {
maybe_msg = self.tw_msg_rx.recv() => {
let Some(msg) = maybe_msg else { break };
self.handle_tw_msg(msg).await;
},

() = self.table.read_data_async() => {
self.handle_table_read().await;
}
}
}
}

async fn handle_tw_msg(&mut self, msg: TableWatcherMessage) {
match msg {
TableWatcherMessage::Subscribe { subscriber } => {
self.subscribers.insert(subscriber);
}

TableWatcherMessage::Unsubscribe { subscriber } => {
self.subscribers.remove(&subscriber);
}
}
}

async fn handle_table_read(&mut self) {
let kfvs = self.table.pops();
for kfv in kfvs {
let payload = payload::encode_kfvs(&kfv);
let outbox = self.outbox.get().await;

for sub in &self.subscribers {
outbox
.send(OutgoingMessage::request(sub.clone(), payload.clone()))
.await;
}
}
}
}

enum LazyOutbox {
Waiting(oneshot::Receiver<Outbox>),
Received(Outbox),
}

impl LazyOutbox {
async fn get(&mut self) -> &Outbox {
match self {
LazyOutbox::Waiting(receiver) => {
let outbox = receiver.await.unwrap();
*self = LazyOutbox::Received(outbox);
let LazyOutbox::Received(outbox) = self else {
unreachable!()
};
outbox
}
LazyOutbox::Received(outbox) => outbox,
}
}
}

0 comments on commit 1c5d26d

Please sign in to comment.