Skip to content

Commit

Permalink
pubsub (#6)
Browse files Browse the repository at this point in the history
* inmem pubsub

* refactor to use pubsub and channels

* non-blocking view.stream

* send event on websocket with fallback on http

* update plugin version

* subscription instead of sub

* pubsub redis adapter
  • Loading branch information
adnaan authored Nov 13, 2022
1 parent e216a0c commit 77bd3e3
Show file tree
Hide file tree
Showing 17 changed files with 836 additions and 505 deletions.
4 changes: 2 additions & 2 deletions alpinejs-plugin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion alpinejs-plugin/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@adnaanx/fir",
"version": "0.0.7",
"version": "0.0.8",
"repository": {
"type": "git",
"url": "https://github.com/adnaan/fir"
Expand Down
69 changes: 39 additions & 30 deletions alpinejs-plugin/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const Plugin = (Alpine) => {
if (window.location.protocol === "https:") {
connectURL = `wss://${window.location.host}${window.location.pathname}`
}
websocket(connectURL, [], (patchOperation) => operations[patchOperation.op](patchOperation), updateStore);
const socket = websocket(connectURL, [], (patchOperation) => operations[patchOperation.op](patchOperation), updateStore);

Alpine.directive('fir-store', (el, { expression }, { evaluate }) => {
const val = evaluate(expression)
Expand Down Expand Up @@ -64,7 +64,6 @@ const Plugin = (Alpine) => {
}
});


let formMethod = el.getAttribute("method")
if (!formMethod) {
formMethod = "get"
Expand Down Expand Up @@ -173,45 +172,55 @@ const Plugin = (Alpine) => {
cancelable: true,
}


el.dispatchEvent(new CustomEvent(startEventName, options))
if (detail.id) {
el.dispatchEvent(new CustomEvent(`${startEventName}:${detail.id}`, options))
el.dispatchEvent(new CustomEvent(`${eventName}:${detail.id}`, options))
}

fetch(window.location.pathname, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-FIR-MODE': 'event'
},
body: JSON.stringify({
id: id,
params: params,
}),
})
.then(response => response.json())
.then(patchOperations => {
patchOperations.forEach(patchOperation => {
operations[patchOperation.op](patchOperation)
});
const event = {
id: id,
params: params,
}
if (socket.emit(event)) {
el.dispatchEvent(new CustomEvent(endEventName, options))
if (detail.id) {
el.dispatchEvent(new CustomEvent(`${endEventName}:${detail.id}`, options))
el.dispatchEvent(new CustomEvent(`${eventName}:${detail.id}`, options))
}
} else {
fetch(window.location.pathname, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-FIR-MODE': 'event'
},
body: JSON.stringify({
id: id,
params: params,
}),
})
.catch((error) => {
console.error(`${endEventName} error: ${error}, detail: ${detail}`, error,);
}).finally(() => {
el.dispatchEvent(new CustomEvent(endEventName, options))
if (detail.id) {
el.dispatchEvent(new CustomEvent(`${endEventName}:${detail.id}`, options))
el.dispatchEvent(new CustomEvent(`${eventName}:${detail.id}`, options))
}
});
.then(response => response.json())
.then(patchOperations => {
patchOperations.forEach(patchOperation => {
operations[patchOperation.op](patchOperation)
});
})
.catch((error) => {
console.error(`${endEventName} error: ${error}, detail: ${detail}`, error,);
}).finally(() => {
el.dispatchEvent(new CustomEvent(endEventName, options))
if (detail.id) {
el.dispatchEvent(new CustomEvent(`${endEventName}:${detail.id}`, options))
el.dispatchEvent(new CustomEvent(`${eventName}:${detail.id}`, options))
}
});
}

}

Alpine.plugin(morph)
}


const isObject = (obj) => {
return Object.prototype.toString.call(obj) === '[object Object]';
};
Expand Down
16 changes: 14 additions & 2 deletions alpinejs-plugin/src/websocket.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const reopenTimeouts = [2000, 5000, 10000, 30000, 60000];
const reopenTimeouts = [500, 1000, 1500, 2000, 5000, 10000, 30000, 60000];

export default websocket = (
url,
Expand Down Expand Up @@ -59,7 +59,6 @@ export default websocket = (
// console.log("socket disconnected")
}


socket.onclose = event => reOpenSocket();
socket.onmessage = event => {
try {
Expand Down Expand Up @@ -87,4 +86,17 @@ export default websocket = (
}

openSocket().then(() => { }).catch(e => console.error(e));

return {
emit(value) {
const send = () => socket.send(JSON.stringify(value));
if (socket.readyState !== WebSocket.OPEN) {
openSocket().then(() => { }).catch(e => console.error(e));
return false
} else {
send();
return true
}
}
}
}
25 changes: 25 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package fir

import (
"fmt"
"log"
"net/http"
"strings"
)

func DefaultChannelFunc(r *http.Request, viewID string) *string {
if viewID == "" {
viewID = "root"
if r.URL.Path != "/" {
viewID = strings.Replace(r.URL.Path, "/", "_", -1)
}
}

userID, ok := r.Context().Value(UserIDKey).(string)
if !ok || userID == "" {
log.Printf("warning: no user id in request context. user is anonymous\n")
userID = "anonymous"
}
channel := fmt.Sprintf("%s:%s", userID, viewID)
return &channel
}
4 changes: 3 additions & 1 deletion cli/cmd/new.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
Copyright © 2022 NAME HERE <EMAIL ADDRESS>
*/
package cmd

Expand Down Expand Up @@ -37,4 +36,7 @@ func init() {
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// newCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")

// @todo: litestack project -> sqlite, litestream, nats.io, bulmacss stack deployed to fly.io
// @todo: hello project -> a simple project with a custom helloview for local development.
}
Loading

0 comments on commit 77bd3e3

Please sign in to comment.