Skip to content

Commit

Permalink
feat(relay): add websocket. (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
kehiy authored Sep 11, 2024
1 parent aeb177c commit f934cc7
Show file tree
Hide file tree
Showing 13 changed files with 797 additions and 485 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ linters:
- makezero
- mirror
- misspell
- musttag
# - musttag
- nakedret
- nilerr
- nilnil
Expand Down
13 changes: 13 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import "github.com/dezh-tech/immortal/relay"

// TODO::: create a full functioning CLI to manage rely.

func main() {
s := relay.NewRelay()
err := s.Start()
if err != nil {
panic(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/mailru/easyjson v0.7.7
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.17.3
golang.org/x/net v0.29.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
192 changes: 192 additions & 0 deletions relay/relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package relay

import (
"errors"
"fmt"
"io"
"log"
"net/http"
"sync"

"github.com/dezh-tech/immortal/types/filter"
"github.com/dezh-tech/immortal/types/message"
"golang.org/x/net/websocket"
)

// TODO::: replace with https://github.com/coder/websocket.
// TODO::: replace `log` with main logger.

// Relay represents a nostr relay which keeps track of client connections and handle them.
type Relay struct {
conns map[*websocket.Conn]map[string]filter.Filters
connsLock sync.RWMutex
}

func NewRelay() *Relay {
return &Relay{
conns: make(map[*websocket.Conn]map[string]filter.Filters),
connsLock: sync.RWMutex{},
}
}

// Start strats a new relay instance.
func (r *Relay) Start() error {
http.Handle("/ws", websocket.Handler(r.handleWS))
err := http.ListenAndServe(":3000", nil) //nolint

return err
}

// handleWS is WebSocket handler.
func (r *Relay) handleWS(ws *websocket.Conn) {
log.Printf("new connection: %s\n", ws.RemoteAddr())

r.connsLock.Lock()
r.conns[ws] = make(map[string]filter.Filters)
r.connsLock.Unlock()

r.readLoop(ws)
}

// readLoop reads incoming messages from a client and answer to them.
func (r *Relay) readLoop(ws *websocket.Conn) {
buf := make([]byte, 1024)
for {
n, err := ws.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

log.Printf("error in connection handling: %s\n", err)

continue
}

msg := message.ParseMessage(buf[:n])
if msg == nil {
_, _ = ws.Write(message.MakeNotice("error: can't parse message."))

continue
}

log.Printf("received envelope: %s\n", msg.String())

switch msg.Type() {
case "REQ":
go r.handleReq(ws, msg)

case "EVENT":
go r.handleEvent(ws, msg)

case "CLOSE":
r.handleClose(ws, msg)
}
}
}

// handleReq handles new incoming REQ messages from client.
func (r *Relay) handleReq(ws *websocket.Conn, m message.Message) {
// TODO::: loadfrom database and sent in first query based on limit.
// TODO::: return EOSE.
// TODO::: return EVENT messages.

msg, ok := m.(*message.Req)
if !ok {
_, _ = ws.Write(message.MakeNotice("error: can't parse REQ message."))

return
}

r.connsLock.Lock()
defer r.connsLock.Unlock()

subs, ok := r.conns[ws]
if !ok {
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s.",
ws.RemoteAddr())))

return
}

subs[msg.SubscriptionID] = msg.Filters
}

// handleEvent handles new incoming EVENT messages from client.
func (r *Relay) handleEvent(ws *websocket.Conn, m message.Message) {
msg, ok := m.(*message.Event)
if !ok {
okm := message.MakeOK(false,
"",
"error: can't parse EVENT message.",
)

_, _ = ws.Write(okm)

return
}

if !msg.Event.IsValid() {
okm := message.MakeOK(false,
msg.SubscriptionID,
"invalid: id or sig is not correct.",
)

_, _ = ws.Write(okm)

return
}

_, _ = ws.Write(message.MakeOK(true, msg.SubscriptionID, ""))

for conn, subs := range r.conns {
for id, filters := range subs {
if !filters.Match(msg.Event) {
return
}
_, _ = conn.Write(message.MakeEvent(id, msg.Event))
}
}
}

// handleClose handles new incoming CLOSE messages from client.
func (r *Relay) handleClose(ws *websocket.Conn, m message.Message) {
msg, ok := m.(*message.Close)
if !ok {
_, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message."))

return
}

r.connsLock.Lock()
defer r.connsLock.Unlock()

conn, ok := r.conns[ws]
if !ok {
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s.",
ws.RemoteAddr())))

return
}

delete(conn, msg.String())
_, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully."))
}

// Stop shutdowns the relay gracefully.
func (r *Relay) Stop() error {
r.connsLock.Lock()
defer r.connsLock.Unlock()

for wsConn, subs := range r.conns {
// close all subscriptions.
for id := range subs {
_, _ = wsConn.Write(message.MakeClosed(id, "error: shutdowning the relay."))
}

// close connection.
_ = wsConn.Close()
}

return nil
}
Loading

0 comments on commit f934cc7

Please sign in to comment.