-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GraphQL subscription support #604
base: main
Are you sure you want to change the base?
Conversation
self.clone() | ||
.stream(request, session_data.unwrap_or_default()) | ||
.flatten_stream() | ||
.boxed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These four lines!! This drove me crazy lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wohooo! Love it! 🥰
loop { | ||
let store = ctx.data_unchecked::<SqlStore>(); | ||
|
||
match get_document_from_params(store, &document_id, &document_view_id).await? { | ||
Some(document) => { | ||
yield Ok(FieldValue::owned_any(Resolved::Document(document))); | ||
} | ||
None => { | ||
yield Err(Error::new("Document not found")); | ||
return | ||
} | ||
}; | ||
|
||
|
||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about push-based replication in another issue which is slightly related to this, maybe it's helpful for you:
We could introduce a "subscription service" or struct on the global Context
which simply just keeps a map of who's interested in what ("node c is interested in schema p", "client k is interested in document y", etc.). The node interests we know through Announcement
messages (similar to search queries on a network), so it'll be quite easy to implement that, client interests we will learn through new subscriptions.
The materializer service informs the subscription service about any finished materialization via the bus, by document id and by schema id.
The subscription service fires the regarding callbacks of the subscribers which then again trigger the logic needed (make db request, push result to client / initiate replication with node, later we'll have "live mode" which just pushes the new data without any fancy protocol).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to look at this service as part of #589 so we could tie this together at one point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, interesting. I build a more simple integration here, but yeah, that could be upgraded to a proper subscription service. I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex. I also looked into using database queries as triggers for subscriptions, so that you register a query and get updated whenever its results change, but I didn't find something in sqlx or postgres yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, interesting. I build a more simple integration here
Looking already very much like that! Awesome!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex
That will be pretty complex indeed 🤔 I think it's enough to scope things by schema id (collection query) or document id (document query) for now, even if it's ignoring the filter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can tell me a bit more about what you imagine the subscription service to offer then I can have a go at a first version. edit: actually, maybe I have an idea of what you mean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can tell me a bit more about what you imagine the subscription service to offer then I can have a go at a first version. edit: actually, maybe I have an idea of what you mean
Out of the top of my head (no laptop with me right now) some rough ideas, probably need improvement:
- Keep a new struct
Connections
/Peers
etc. in the globalContext
, similar to schema manager - The purpose of it is to manage a list of all currently connected / subscribed clients and (later) nodes, probably in form of a map with some sort of unique subscription id -> subscription enum + callback list mapping while the enum is either a
SchemaIdSet
(collection queries and node announcements) orDocumentId
(document queries) - Since this thing lives in global context now we can reach it from anywhere: the replication service can populate it with currently known nodes (it learns about it through peer dis-/connected messages) and the graphql service (whenever a subscription starts or ends)
- Later (not for now) we can use this state to learn about how many nodes I'm currently connected to etc. (Could be an protected graphql or public crate method)
- Callbacks are added and removed to the map scoped by this unique id and sets of interest
- A small subscription service is introduced (like all other services). It hooks into the message bus and waits for materializer events. Whenever a document got touched we want to know its schema id and document id (probably that's part of the message itself), so this needs to be sent from the materializer and received here
- The service as all others also has access to the context and therefore connection table. It looks up on each incoming message if someone is subscribed to that document or that schema and accordingly calls the callbacks
- For nodes we probably then just want to send a message to the replication service, telling it to begin replication
- For clients the callback probably just kicks in the subscription callback which repeats the query over the collection or document. As mentioned already I think it's fine if filters are currently ignored at this stage
- That's probably also not part of your PRs but note for later: our replication service should know if it is in live mode with this peer already, if yes it will just send the new data to it (later just via a gossip overlay broadcast which is also just sort of subscription over a topic / schema id set), if not it will just initiate a normal 1:1 replication session as we currently already do but it would become push-based which is nice
- Surely that subscription logic could live in another service, not it's own, not sure if it needs it's own place already. On the other hand, it doesn't do anything clearly related to one service and on top it might handle other things in the future like automatically removing timed-out callbacks etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delayed response, but YEY!! this is great :-) exciting to see subscriptions land.
I understand the next step is to have the subscription service respond to document updates, but the functionality this PR brings is already very powerful. I'm happy to merge this as is 👍
I guess tests are a pain..? all the document resolution logic which is reused here is already tested, but of course, not the actual subscription.
Prototype for GraphQL subscriptions that lets you stream the value of a single document, yields an update once a second.
📋 Checklist
CHANGELOG.md