-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GraphQL subscription support #604
Draft
cafca
wants to merge
3
commits into
main
Choose a base branch
from
pv/subscriptions
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,11 @@ | |
//! Dynamically create and manage GraphQL schemas. | ||
use std::sync::Arc; | ||
|
||
use async_graphql::dynamic::{Field, FieldFuture, Object, Schema, TypeRef}; | ||
use async_graphql::dynamic::{Field, FieldFuture, Object, Schema, Subscription, TypeRef}; | ||
use async_graphql::{Request, Response, Value}; | ||
use dynamic_graphql::internal::Registry; | ||
use futures::stream::BoxStream; | ||
use futures::{FutureExt, Stream, StreamExt}; | ||
use log::{debug, info, warn}; | ||
use p2panda_rs::Human; | ||
use tokio::sync::Mutex; | ||
|
@@ -23,7 +25,8 @@ use crate::graphql::objects::{ | |
build_paginated_document_object, DocumentMeta, | ||
}; | ||
use crate::graphql::queries::{ | ||
build_collection_query, build_document_query, build_next_args_query, | ||
build_collection_query, build_document_query, build_document_subscription_query, | ||
build_next_args_query, | ||
}; | ||
use crate::graphql::responses::NextArguments; | ||
use crate::graphql::scalars::{ | ||
|
@@ -75,15 +78,23 @@ pub async fn build_root_schema( | |
.register::<PublicKeyScalar>() | ||
.register::<SeqNumScalar>(); | ||
|
||
let mut schema_builder = Schema::build("Query", Some("MutationRoot"), None); | ||
// Construct the root query object | ||
let mut root_query = Object::new("Query"); | ||
|
||
// Construct the root subscription object | ||
let mut root_subscription = Subscription::new("Subscription"); | ||
|
||
// Start a schema builder with the root objects | ||
let mut schema_builder = Schema::build( | ||
root_query.type_name(), | ||
Some("MutationRoot"), | ||
Some(root_subscription.type_name()), | ||
); | ||
|
||
// Populate it with the registered types. We can now use these in any following dynamically | ||
// created query object fields. | ||
schema_builder = registry.apply_into_schema_builder(schema_builder); | ||
|
||
// Construct the root query object | ||
let mut root_query = Object::new("Query"); | ||
|
||
// Loop through all schema retrieved from the schema store, dynamically create GraphQL objects, | ||
// input values and a query for the documents they describe | ||
for schema in all_schema { | ||
|
@@ -120,15 +131,18 @@ pub async fn build_root_schema( | |
|
||
// Add a query for retrieving all documents of a certain schema | ||
root_query = build_collection_query(root_query, &schema); | ||
|
||
// Add a field for subscribing to changes on a single document | ||
root_subscription = build_document_subscription_query(root_subscription, &schema); | ||
} | ||
|
||
// Add next args to the query object | ||
let root_query = build_next_args_query(root_query); | ||
|
||
// Build the GraphQL schema. We can unwrap here since it will only fail if we forgot to | ||
// register all required types above | ||
// Build the GraphQL schema | ||
schema_builder | ||
.register(root_query) | ||
.register(root_subscription) | ||
.data(store) | ||
.data(schema_provider) | ||
.data(tx) | ||
|
@@ -211,15 +225,23 @@ impl GraphQLSchemaManager { | |
let mut on_schema_added = shared.schema_provider.on_schema_added(); | ||
|
||
// Create the new GraphQL based on the current state of known p2panda application schemas | ||
async fn rebuild(shared: GraphQLSharedData, schemas: GraphQLSchemas) { | ||
async fn rebuild( | ||
shared: GraphQLSharedData, | ||
schemas: GraphQLSchemas, | ||
) -> Result<(), async_graphql::dynamic::SchemaError> { | ||
match build_root_schema(shared.store, shared.tx, shared.schema_provider).await { | ||
Ok(schema) => schemas.lock().await.push(schema), | ||
Err(err) => warn!("Error building GraphQL schema: {}", err), | ||
Ok(schema) => { | ||
schemas.lock().await.push(schema); | ||
Ok(()) | ||
} | ||
Err(err) => Err(err), | ||
} | ||
} | ||
|
||
// Always build a schema right at the beginning as we don't have one yet | ||
rebuild(shared.clone(), schemas.clone()).await; | ||
if let Err(err) = rebuild(shared.clone(), schemas.clone()).await { | ||
panic!("Failed building initial GraphQL schema: {}", err); | ||
} | ||
debug!("Finished building initial GraphQL schema"); | ||
|
||
// Spawn a task which reacts to newly registered p2panda schemas | ||
|
@@ -231,7 +253,9 @@ impl GraphQLSchemaManager { | |
"Changed schema {}, rebuilding GraphQL API", | ||
schema_id.display() | ||
); | ||
rebuild(shared.clone(), schemas.clone()).await; | ||
if let Err(err) = rebuild(shared.clone(), schemas.clone()).await { | ||
warn!("Failed building GraphQL schema: {}", err); | ||
} | ||
} | ||
Err(err) => { | ||
panic!("Failed receiving schema updates: {}", err) | ||
|
@@ -254,6 +278,41 @@ impl GraphQLSchemaManager { | |
.execute(request) | ||
.await | ||
} | ||
|
||
/// Executes an incoming GraphQL query. | ||
/// | ||
/// This method makes sure the GraphQL query will be executed by the latest given schema the | ||
/// manager knows about. | ||
pub async fn stream( | ||
self, | ||
request: impl Into<Request>, | ||
session_data: Arc<async_graphql::Data>, | ||
) -> impl Stream<Item = Response> { | ||
self.schemas | ||
.lock() | ||
.await | ||
.last() | ||
.expect("No schema given yet") | ||
.execute_stream_with_session_data(request, session_data) | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl async_graphql::Executor for GraphQLSchemaManager { | ||
async fn execute(&self, request: Request) -> Response { | ||
self.execute(request).await | ||
} | ||
|
||
fn execute_stream( | ||
&self, | ||
request: Request, | ||
session_data: Option<Arc<async_graphql::Data>>, | ||
) -> BoxStream<'static, Response> { | ||
self.clone() | ||
.stream(request, session_data.unwrap_or_default()) | ||
.flatten_stream() | ||
.boxed() | ||
Comment on lines
+311
to
+314
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These four lines!! This drove me crazy lol |
||
} | ||
} | ||
|
||
impl std::fmt::Debug for GraphQLSchemaManager { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about push-based replication in another issue which is slightly related to this, maybe it's helpful for you:
We could introduce a "subscription service" or struct on the global
Context
which simply just keeps a map of who's interested in what ("node c is interested in schema p", "client k is interested in document y", etc.). The node interests we know throughAnnouncement
messages (similar to search queries on a network), so it'll be quite easy to implement that, client interests we will learn through new subscriptions.The materializer service informs the subscription service about any finished materialization via the bus, by document id and by schema id.
The subscription service fires the regarding callbacks of the subscribers which then again trigger the logic needed (make db request, push result to client / initiate replication with node, later we'll have "live mode" which just pushes the new data without any fancy protocol).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to look at this service as part of #589 so we could tie this together at one point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, interesting. I build a more simple integration here, but yeah, that could be upgraded to a proper subscription service. I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex. I also looked into using database queries as triggers for subscriptions, so that you register a query and get updated whenever its results change, but I didn't find something in sqlx or postgres yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking already very much like that! Awesome!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That will be pretty complex indeed 🤔 I think it's enough to scope things by schema id (collection query) or document id (document query) for now, even if it's ignoring the filter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can tell me a bit more about what you imagine the subscription service to offer then I can have a go at a first version. edit: actually, maybe I have an idea of what you mean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of the top of my head (no laptop with me right now) some rough ideas, probably need improvement:
Connections
/Peers
etc. in the globalContext
, similar to schema managerSchemaIdSet
(collection queries and node announcements) orDocumentId
(document queries)