Skip to content

Commit

Permalink
Close subscription after expired token
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Apr 10, 2024
1 parent 8795dd8 commit 942bbf0
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub struct QuerySubscription {

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: mpsc::Sender<EntryUpdates>,
sender: Option<mpsc::Sender<EntryUpdates>>,
permissions: Permissions,
}

Expand Down Expand Up @@ -607,7 +607,7 @@ impl Subscriptions {
}

pub async fn notify(
&self,
&mut self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
db: &Database,
) -> Result<Option<HashMap<String, ()>>, NotificationError> {
Expand All @@ -627,7 +627,7 @@ impl Subscriptions {
}
}

for sub in &self.change_subscriptions {
for sub in &mut self.change_subscriptions {
match sub.notify(changed, db).await {
Ok(_) => {}
Err(err) => error = Some(err),
Expand Down Expand Up @@ -660,12 +660,24 @@ impl Subscriptions {
true
}
});
self.change_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
self.change_subscriptions.retain_mut(|sub| {
if let Some(sender) = &sub.sender {
if sender.is_closed() {
info!("Subscriber gone: removing subscription");
false
} else {
match &sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
sub.sender = None;
false
}
Err(err) => panic!("Error: {:?}", err),
}
}
} else {
info!("Subscriber gone: removing subscription");
false
} else {
true
}
});
}
Expand Down Expand Up @@ -693,7 +705,7 @@ impl ChangeSubscription {
// notify
let notifications = {
let mut notifications = EntryUpdates::default();

let mut error = None;
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
Expand Down Expand Up @@ -723,22 +735,32 @@ impl ChangeSubscription {
fields: notify_fields,
});
}
Err(ReadError::PermissionExpired) => {
debug!("notify: token expired, closing subscription channel");
error = Some(NotificationError {});
break;
}
Err(_) => {
debug!("notify: could not find entry with id {}", id)
debug!("notify: could not find entry with id {}", id);
}
}
}
}
}
if let Some(err) = error {
return Err(err);
}
notifications
};
if notifications.updates.is_empty() {
Ok(())
} else {
match self.sender.send(notifications).await {
} else if let Some(sender) = &self.sender {
match sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
} else {
Err(NotificationError {})
}
} else {
Ok(())
Expand Down Expand Up @@ -775,9 +797,13 @@ impl ChangeSubscription {
}
notifications
};
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
if let Some(sender) = &self.sender {
match sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
} else {
Err(NotificationError {})
}
}
}
Expand Down Expand Up @@ -1411,7 +1437,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
match self
.broker
.subscriptions
.read()
.write()
.await
.notify(Some(&changed), &db)
.await
Expand Down Expand Up @@ -1457,7 +1483,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let (sender, receiver) = mpsc::channel(10);
let subscription = ChangeSubscription {
entries: valid_entries,
sender,
sender: Some(sender),
permissions: self.permissions.clone(),
};

Expand Down

0 comments on commit 942bbf0

Please sign in to comment.