diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index dfa61eef..05ca954b 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -155,7 +155,7 @@ pub struct QuerySubscription { pub struct ChangeSubscription { entries: HashMap>, - sender: mpsc::Sender, + sender: Option>, permissions: Permissions, } @@ -607,7 +607,7 @@ impl Subscriptions { } pub async fn notify( - &self, + &mut self, changed: Option<&HashMap>>, db: &Database, ) -> Result>, NotificationError> { @@ -627,7 +627,7 @@ impl Subscriptions { } } - for sub in &self.change_subscriptions { + for sub in &mut self.change_subscriptions { match sub.notify(changed, db).await { Ok(_) => {} Err(err) => error = Some(err), @@ -660,12 +660,24 @@ impl Subscriptions { true } }); - self.change_subscriptions.retain(|sub| { - if sub.sender.is_closed() { + self.change_subscriptions.retain_mut(|sub| { + if let Some(sender) = &sub.sender { + if sender.is_closed() { + info!("Subscriber gone: removing subscription"); + false + } else { + match &sub.permissions.expired() { + Ok(()) => true, + Err(PermissionError::Expired) => { + sub.sender = None; + false + } + Err(err) => panic!("Error: {:?}", err), + } + } + } else { info!("Subscriber gone: removing subscription"); false - } else { - true } }); } @@ -693,7 +705,7 @@ impl ChangeSubscription { // notify let notifications = { let mut notifications = EntryUpdates::default(); - + let mut error = None; for (id, changed_fields) in changed { if let Some(fields) = self.entries.get(id) { if !fields.is_disjoint(changed_fields) { @@ -723,22 +735,32 @@ impl ChangeSubscription { fields: notify_fields, }); } + Err(ReadError::PermissionExpired) => { + debug!("notify: token expired, closing subscription channel"); + error = Some(NotificationError {}); + break; + } Err(_) => { - debug!("notify: could not find entry with id {}", id) + debug!("notify: could not find entry with id {}", id); } } } } } + if let Some(err) = error { + return Err(err); + } notifications }; if notifications.updates.is_empty() { Ok(()) - } else { - match self.sender.send(notifications).await { + } else if let Some(sender) = &self.sender { + match sender.send(notifications).await { Ok(()) => Ok(()), Err(_) => Err(NotificationError {}), } + } else { + Err(NotificationError {}) } } else { Ok(()) @@ -775,9 +797,13 @@ impl ChangeSubscription { } notifications }; - match self.sender.send(notifications).await { - Ok(()) => Ok(()), - Err(_) => Err(NotificationError {}), + if let Some(sender) = &self.sender { + match sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), + } + } else { + Err(NotificationError {}) } } } @@ -1411,7 +1437,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { match self .broker .subscriptions - .read() + .write() .await .notify(Some(&changed), &db) .await @@ -1457,7 +1483,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let (sender, receiver) = mpsc::channel(10); let subscription = ChangeSubscription { entries: valid_entries, - sender, + sender: Some(sender), permissions: self.permissions.clone(), };