Skip to content

Commit

Permalink
merge: pull request #333 from revoltchat/feat/apns
Browse files Browse the repository at this point in the history
APNS fixes
  • Loading branch information
insertish authored Aug 5, 2024
2 parents beef06e + 39230c5 commit b36bac3
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 73 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/config/Revolt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public_key = "BGcvgR-i2z4IQ5Mw841vJvkLjt8wY-FjmWrw83jOLCY52qcGZS0OF7nfLzuYbjsQIS
api_key = ""

[api.apn]
sandbox = false
pkcs8 = ""
key_id = ""
team_id = ""
Expand Down
1 change: 1 addition & 0 deletions crates/core/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct ApiFcm {

#[derive(Deserialize, Debug, Clone)]
pub struct ApiApn {
pub sandbox: bool,
pub pkcs8: String,
pub key_id: String,
pub team_id: String,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ revolt_rocket_okapi = { version = "0.9.1", optional = true }
# Notifications
fcm = "0.9.2"
web-push = "0.10.0"
revolt_a2 = { version = "0.10.0", default-features = false, features = [
revolt_a2 = { version = "0.10", default-features = false, features = [
"ring",
] }

Expand Down
7 changes: 5 additions & 2 deletions crates/core/database/src/models/channel_unreads/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ mod reference;

#[async_trait]
pub trait AbstractChannelUnreads: Sync + Send {
/// Acknowledge a message.
/// Acknowledge a message, and returns updated channel unread.
async fn acknowledge_message(
&self,
channel_id: &str,
user_id: &str,
message_id: &str,
) -> Result<()>;
) -> Result<Option<ChannelUnread>>;

/// Acknowledge many channels.
async fn acknowledge_channels(&self, user_id: &str, channel_ids: &[String]) -> Result<()>;
Expand All @@ -28,4 +28,7 @@ pub trait AbstractChannelUnreads: Sync + Send {

/// Fetch all channel unreads for a user.
async fn fetch_unreads(&self, user_id: &str) -> Result<Vec<ChannelUnread>>;

/// Fetch unread for a specific user in a channel.
async fn fetch_unread(&self, user_id: &str, channel_id: &str) -> Result<Option<ChannelUnread>>;
}
36 changes: 28 additions & 8 deletions crates/core/database/src/models/channel_unreads/ops/mongodb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use bson::Document;
use mongodb::options::FindOneAndUpdateOptions;
use mongodb::options::ReturnDocument;
use mongodb::options::UpdateOptions;
use revolt_result::Result;
use ulid::Ulid;
Expand All @@ -12,31 +14,35 @@ static COL: &str = "channel_unreads";

#[async_trait]
impl AbstractChannelUnreads for MongoDb {
/// Acknowledge a message.
/// Acknowledge a message, and returns updated channel unread.
async fn acknowledge_message(
&self,
channel_id: &str,
user_id: &str,
message_id: &str,
) -> Result<()> {
self.col::<Document>(COL)
.update_one(
) -> Result<Option<ChannelUnread>> {
self.col::<ChannelUnread>(COL)
.find_one_and_update(
doc! {
"_id.channel": channel_id,
"_id.user": user_id,
},
doc! {
"$unset": {
"mentions": 1_i32
"$pull": {
"mentions": {
"$lt": message_id
}
},
"$set": {
"last_id": message_id
}
},
UpdateOptions::builder().upsert(true).build(),
FindOneAndUpdateOptions::builder()
.upsert(true)
.return_document(ReturnDocument::After)
.build(),
)
.await
.map(|_| ())
.map_err(|_| create_database_error!("update_one", COL))
}

Expand Down Expand Up @@ -116,4 +122,18 @@ impl AbstractChannelUnreads for MongoDb {
}
)
}

/// Fetch unread for a specific user in a channel.
async fn fetch_unread(&self, user_id: &str, channel_id: &str) -> Result<Option<ChannelUnread>> {
query!(
self,
find_one,
COL,
doc! {
"_id.user": user_id,
"_id.channel": channel_id
}
)
}

}
16 changes: 13 additions & 3 deletions crates/core/database/src/models/channel_unreads/ops/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl AbstractChannelUnreads for ReferenceDb {
channel_id: &str,
user_id: &str,
message_id: &str,
) -> Result<()> {
) -> Result<Option<ChannelUnread>> {
let mut unreads = self.channel_unreads.lock().await;
let key = ChannelCompositeKey {
channel: channel_id.to_string(),
Expand All @@ -27,14 +27,14 @@ impl AbstractChannelUnreads for ReferenceDb {
unreads.insert(
key.clone(),
ChannelUnread {
id: key,
id: key.clone(),
last_id: Some(message_id.to_string()),
mentions: None,
},
);
}

Ok(())
Ok(unreads.get(&key).cloned())
}

/// Acknowledge many channels.
Expand Down Expand Up @@ -87,4 +87,14 @@ impl AbstractChannelUnreads for ReferenceDb {
.cloned()
.collect())
}

/// Fetch unread for a specific user in a channel.
async fn fetch_unread(&self, user_id: &str, channel_id: &str) -> Result<Option<ChannelUnread>> {
let unreads = self.channel_unreads.lock().await;

Ok(unreads.get(&ChannelCompositeKey {
channel: channel_id.to_string(),
user: user_id.to_string()
}).cloned())
}
}
6 changes: 3 additions & 3 deletions crates/core/database/src/models/messages/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ impl Message {
) -> Result<()> {
self.send_without_notifications(
db,
user,
member,
user.clone(),
member.clone(),
matches!(channel, Channel::DirectMessage { .. }),
generate_embeds,
)
Expand All @@ -477,7 +477,7 @@ impl Message {
}
},
PushNotification::from(
self.clone().into_model(None, None),
self.clone().into_model(user, member),
Some(author),
channel.id(),
)
Expand Down
49 changes: 40 additions & 9 deletions crates/core/database/src/tasks/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use deadqueue::limited::Queue;
use once_cell::sync::Lazy;
use std::{collections::HashMap, time::Duration};

use super::DelayedTask;
use revolt_result::Result;

use super::{apple_notifications::{self, ApnJob}, DelayedTask};

/// Enumeration of possible events
#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -52,8 +54,43 @@ pub async fn queue(channel: String, user: String, event: AckEvent) {
info!("Queue is using {} slots from {}.", Q.len(), Q.capacity());
}

pub async fn handle_ack_event(event: &AckEvent, db: &Database, authifier_db: &authifier::Database, user: &str, channel: &str) -> Result<()> {
match &event {
#[allow(clippy::disallowed_methods)] // event is sent by higher level function
AckEvent::AckMessage { id } => {
let unread = db.fetch_unread(user, channel).await?;
let updated = db.acknowledge_message(channel, user, id).await?;

if let (Some(before), Some(after)) = (unread, updated) {
let before_mentions = before.mentions.unwrap_or_default().len();
let after_mentions = after.mentions.unwrap_or_default().len();

let mentions_acked = before_mentions - after_mentions;

if mentions_acked > 0 {
if let Ok(sessions) = authifier_db.find_sessions(user).await {
for session in sessions {
if let Some(sub) = session.subscription {
if sub.endpoint == "apn" {
apple_notifications::queue(ApnJob::from_ack(session.id, user.to_string(), sub.auth)).await;
}
}
}
}
};

}
},
AckEvent::AddMention { ids } => {
db.add_mention_to_unread(channel, user, ids).await?;
}
};

Ok(())
}

/// Start a new worker
pub async fn worker(db: Database) {
pub async fn worker(db: Database, authifier_db: authifier::Database) {
let mut tasks = HashMap::<(String, String), DelayedTask<Task>>::new();
let mut keys = vec![];

Expand All @@ -71,13 +108,7 @@ pub async fn worker(db: Database) {
let Task { event } = task.data;
let (user, channel) = key;

if let Err(err) = match &event {
#[allow(clippy::disallowed_methods)] // event is sent by higher level function
AckEvent::AckMessage { id } => db.acknowledge_message(channel, user, id).await,
AckEvent::AddMention { ids } => {
db.add_mention_to_unread(channel, user, ids).await
}
} {
if let Err(err) = handle_ack_event(&event, &db, &authifier_db, user, channel).await {
error!("{err:?} for {event:?}. ({user}, {channel})");
} else {
info!("User {user} ack in {channel} with {event:?}");
Expand Down
Loading

0 comments on commit b36bac3

Please sign in to comment.