Skip to content

Commit

Permalink
fix IsValid issue, off load filter check to go
Browse files Browse the repository at this point in the history
  • Loading branch information
kehiy committed Sep 11, 2024
1 parent 8776a89 commit fd94c4c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 32 deletions.
59 changes: 34 additions & 25 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"net/http"
"sync"

"github.com/dezh-tech/immortal/types/filter"
"github.com/dezh-tech/immortal/types/message"
Expand All @@ -15,12 +16,14 @@ import (
// TODO::: replace with https://github.com/coder/websocket.

type Relay struct {
conns map[*websocket.Conn]map[string]filter.Filters
conns map[*websocket.Conn]map[string]filter.Filters
connsLock sync.RWMutex
}

func NewRelay() *Relay {
return &Relay{
conns: make(map[*websocket.Conn]map[string]filter.Filters),
conns: make(map[*websocket.Conn]map[string]filter.Filters),
connsLock: sync.RWMutex{},
}
}

Expand All @@ -35,8 +38,9 @@ func (r *Relay) handleWS(ws *websocket.Conn) {
// TODO::: replace with logger.
log.Printf("new connection: %s\n", ws.RemoteAddr())

// TODO::: make it concurrent safe.
r.connsLock.Lock()
r.conns[ws] = make(map[string]filter.Filters)
r.connsLock.Unlock()

r.readLoop(ws)
}
Expand All @@ -53,13 +57,12 @@ func (r *Relay) readLoop(ws *websocket.Conn) {
// TODO::: replace with logger.
log.Printf("error in connection handling: %s\n", err)

// TODO::: drop connection?
continue
}

msg := message.ParseMessage(buf[:n])
if msg == nil {
_, _ = ws.Write(message.MakeNotice("error: can't parse message.")) // TODO::: should we check error?
_, _ = ws.Write(message.MakeNotice("error: can't parse message."))

continue
}
Expand All @@ -76,29 +79,28 @@ func (r *Relay) readLoop(ws *websocket.Conn) {

case "CLOSE":
go r.HandleClose(ws, msg)

default:
break
}
}
}

func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) {
// TODO::: loadfrom database and sent in first query based on limit.
// TODO::: return EOSE.
// TODO::: use a concurrent safe map.

msg, ok := m.(*message.Req)
if !ok {
_, _ = ws.Write(message.MakeNotice("error: can't parse REQ message")) // TODO::: should we check error?
_, _ = ws.Write(message.MakeNotice("error: can't parse REQ message"))

return
}

r.connsLock.Lock()
defer r.connsLock.Unlock()

subs, ok := r.conns[ws]
if !ok {
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s",
ws.RemoteAddr()))) // TODO::: should we check error?
ws.RemoteAddr())))

return
}
Expand All @@ -111,15 +113,14 @@ func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) {
func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) {
// TODO::: send events to be stored and proccessed.

// can we ignore assertion check?
msg, ok := m.(*message.Event)
if !ok {
okm := message.MakeOK(false,
"",
"error: can't parse the message.", // TODO::: make an error builder.
)

_, _ = ws.Write(okm) // TODO::: should we check error?
_, _ = ws.Write(okm)

return
}
Expand All @@ -130,51 +131,59 @@ func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) {
"invalid: invalid id or sig.", // TODO::: make an error builder.
)

_, _ = ws.Write(okm) // TODO::: should we check error?
_, _ = ws.Write(okm)

return
}

_, _ = ws.Write(message.MakeOK(true, msg.SubscriptionID, "")) // TODO::: should we check error?
_, _ = ws.Write(message.MakeOK(true, msg.SubscriptionID, ""))

// TODO::: any better way?
for conn, subs := range r.conns {
for id, filters := range subs {
if !filters.Match(msg.Event) {
continue
}
_, _ = conn.Write(message.MakeEvent(id, msg.Event)) // TODO::: should we check error?
// is this concurrent safe?
go func(conn *websocket.Conn, id string, filters filter.Filters) {
if !filters.Match(msg.Event) {
return
}
_, _ = conn.Write(message.MakeEvent(id, msg.Event))
}(conn, id, filters)
}
}
}

func (r *Relay) HandleClose(ws *websocket.Conn, m message.Message) {
msg, ok := m.(*message.Close)
if !ok {
_, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message")) // TODO::: should we check error?
_, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message"))

return
}

r.connsLock.Lock()
defer r.connsLock.Unlock()

conn, ok := r.conns[ws]
if !ok {
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s",
ws.RemoteAddr()))) // TODO::: should we check error?
ws.RemoteAddr())))

return
}

delete(conn, msg.String())
_, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully")) // TODO::: should we check error?
_, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully"))
}

// Stop shutdowns the relay gracefully.
func (r *Relay) Stop() error {
r.connsLock.Lock()
defer r.connsLock.Unlock()

for wsConn, subs := range r.conns {
for id := range subs {
_, _ = wsConn.Write(message.MakeClosed(id, "relay is stopping.")) // TODO::: should we check error?
_, _ = wsConn.Write(message.MakeClosed(id, "relay is stopping."))
}
_ = wsConn.Close() // TODO::: should we check error?
_ = wsConn.Close()
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion tmp/client_recvr.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ ws.onmessage = (e) => {
console.log(e.data);
};

ws.send(req);
ws.send(
`["REQ","nak",{"ids":["cbba15aff4ed4db6370834c9370436ba20615ffa2d170515058f11e522c8dc02"]}]`,
);

// ws.send(close);
6 changes: 3 additions & 3 deletions tmp/client_sendr.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const eventMsg = `["EVENT",{"kind":1,"id":"cbba15aff4ed4db6370834c9370436ba20615ffa2d170515058f11e522c8dc02","pubkey":"79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798","created_at":1726056813,"tags":[],"content":"test","sig":"282db08a865b5fe97d2351a3e7321ee279b9e6bd16e4e8d9f746c4ea148337e01bae7bc10f7465bdffe740bf6682d7aaadd4777919891ba845e66bf52ee7b6f8"}]`;

let ws = new WebSocket("ws://localhost:3000/ws");

ws.onmessage = (e) => {
console.log(e.data);
};

ws.send(eventMsg);
ws.send(
`["EVENT",{"kind":1,"id":"cbba15aff4ed4db6370834c9370436ba20615ffa2d170515058f11e522c8dc02","pubkey":"79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798","created_at":1726056813,"tags":[],"content":"test","sig":"282db08a865b5fe97d2351a3e7321ee279b9e6bd16e4e8d9f746c4ea148337e01bae7bc10f7465bdffe740bf6682d7aaadd4777919891ba845e66bf52ee7b6f8"}]`,
);
9 changes: 6 additions & 3 deletions types/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (e *Event) Serialize() []byte {

// IsValid function validats an event Signature and ID.
func (e *Event) IsValid() bool {
id := sha256.Sum256(e.Serialize())
if hex.EncodeToString(id[:]) != e.ID {
return false
}

pk, err := hex.DecodeString(e.PublicKey)
if err != nil {
return false
Expand All @@ -94,10 +99,8 @@ func (e *Event) IsValid() bool {
return false
}

hash := sha256.Sum256(e.Serialize())

// TODO::: replace with libsecp256k1 (C++ version).
return sig.Verify(hash[:], pubkey)
return sig.Verify(id[:], pubkey)
}

// String returns and encoded string representation of event e.
Expand Down

0 comments on commit fd94c4c

Please sign in to comment.