Skip to content

Commit

Permalink
feat(server): supporting nip-40. (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
kehiy authored Jan 1, 2025
1 parent 02675a7 commit 26875d7
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 38 deletions.
9 changes: 2 additions & 7 deletions documents/NIPs.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
# Immortal supported NIPs

The Immortal follows [NIPs](https://github.com/nostr-protocol/nips) and tries to implement itself as close as possible to this standards.

- [X] **NIP-01**: Basic Protocol Flow Description
- [ ] **NIP-09**: Event Deletion Request
- [X] **NIP-11**: Relay Information Document
- [X] **NIP-13**: Proof of Work
- [ ] **NIP-40**: Expiration Timestamp
- [X] **NIP-40**: Expiration Timestamp
- [X] **NIP-42**: Authentication of Clients to Relays
- [ ] **NIP-50**: Search Capability
- [ ] **NIP-56**: Reporting
- [X] **NIP-56**: Reporting
- [X] **NIP-70**: Protected Events
- [ ] **NIP-94**: File Metadata
- [ ] **NIP-96**: HTTP File Storage Integration
- [ ] **NIP-98**: HTTP Auth
1 change: 0 additions & 1 deletion documents/user/backup.md

This file was deleted.

1 change: 0 additions & 1 deletion documents/user/metrics-logs.md

This file was deleted.

1 change: 0 additions & 1 deletion documents/user/running.md

This file was deleted.

1 change: 0 additions & 1 deletion documents/user/update.md

This file was deleted.

40 changes: 40 additions & 0 deletions handler/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package handler

import (
"context"

"github.com/dezh-tech/immortal/types"
"go.mongodb.org/mongo-driver/bson"
)

func (h *Handler) DeleteByID(id string, kind types.Kind) error {
coll := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(kind))

ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

filter := bson.D{
{Key: "id", Value: id},
}

update := bson.D{
{Key: "$set", Value: bson.D{
{Key: "id", Value: id},
}},
{Key: "$unset", Value: bson.D{
{Key: "pubkey"},
{Key: "created_at"},
{Key: "kind"},
{Key: "tags"},
{Key: "content"},
{Key: "sig"},
}},
}

_, err := coll.UpdateOne(ctx, filter, update)
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions handler/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) {
var finalResult []event.Event

for kind, filters := range queryKinds {
// todo::: querying database in goroutines.
collection := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(kind))
for _, f := range filters {
query, opts, err := h.FilterToQuery(&f)
Expand Down
63 changes: 63 additions & 0 deletions relay/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,66 @@ func New(cfg Config) (*Redis, error) {
QueryTimeout: time.Duration(cfg.QueryTimeout) * time.Millisecond,
}, nil
}

// ! note: delayed tasks probably are not concurrent safe at the moment.
func (r Redis) AddDelayedTask(listName string,
data string, delay time.Duration,
) error {
taskReadyInSeconds := time.Now().Add(delay).Unix()
member := redis.Z{
Score: float64(taskReadyInSeconds),
Member: data,
}

ctx, cancel := context.WithTimeout(context.Background(), r.QueryTimeout)
defer cancel()

_, err := r.Client.ZAdd(ctx, listName, member).Result()
if err != nil {
return err
}

return nil
}

func (r Redis) GetReadyTasks(listName string) ([]string, error) {
maxTime := time.Now().Unix()

opt := &redis.ZRangeBy{
Min: fmt.Sprintf("%d", 0),
Max: fmt.Sprintf("%d", maxTime),
Count: 100,
}

ctx, cancel := context.WithTimeout(context.Background(), r.QueryTimeout)
defer cancel()

cmd := r.Client.ZRevRangeByScore(ctx, listName, opt)
resultSet, err := cmd.Result()
if err != nil {
return nil, err
}

if err := r.RemoveTasks(listName, resultSet); err != nil {
return nil, err
}

return resultSet, nil
}

func (r Redis) RemoveTasks(listName string, tasks []string) error {
if len(tasks) == 0 {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), r.QueryTimeout)
defer cancel()

_, err := r.Client.ZRem(ctx,
listName, tasks).Result()
if err != nil {
return err
}

return nil
}
77 changes: 62 additions & 15 deletions server/websocket/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"fmt"
"log"
"strconv"
"time"

"github.com/dezh-tech/immortal/types/message"
"github.com/dezh-tech/immortal/utils"
"github.com/gorilla/websocket"
)

// handleEvent handles new incoming EVENT messages from client.
func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
// todo::: too much complexity.
func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint
s.mu.Lock()
defer s.mu.Unlock()
defer measureLatency(s.metrics.EventLatency)()
Expand All @@ -34,6 +37,23 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
return
}

eID := msg.Event.GetRawID()

qCtx, cancel := context.WithTimeout(context.Background(), s.redis.QueryTimeout)
defer cancel()

exists, err := s.redis.Client.BFExists(qCtx, s.redis.BloomName, eID[:]).Result()
if err != nil {
log.Printf("error: checking bloom filter: %s", err.Error())
}

if exists {
okm := message.MakeOK(true, msg.Event.ID, "")
_ = conn.WriteMessage(1, okm)

return
}

client, ok := s.conns[conn]
if !ok {
_ = conn.WriteMessage(1, message.MakeOK(false,
Expand Down Expand Up @@ -81,21 +101,48 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
return
}

eID := msg.Event.GetRawID()
expirationTag := msg.Event.Tags.GetValue("expiration")

qCtx, cancel := context.WithTimeout(context.Background(), s.redis.QueryTimeout)
defer cancel()
if expirationTag != "" {
expiration, err := strconv.ParseInt(expirationTag, 10, 64)
if err != nil {
okm := message.MakeOK(false,
msg.Event.ID,
fmt.Sprintf("invalid: expiration tag %s.", expirationTag),
)

exists, err := s.redis.Client.BFExists(qCtx, s.redis.BloomName, eID[:]).Result()
if err != nil {
log.Printf("error: checking bloom filter: %s", err.Error())
}
_ = conn.WriteMessage(1, okm)

if exists {
okm := message.MakeOK(true, msg.Event.ID, "")
_ = conn.WriteMessage(1, okm)
status = invalidFail

return
return
}

if time.Now().Unix() >= expiration {
okm := message.MakeOK(false,
msg.Event.ID,
fmt.Sprintf("invalid: this event was expired in %s.", time.Unix(expiration, 0).String()),
)

_ = conn.WriteMessage(1, okm)

status = invalidFail

return
}

if err := s.redis.AddDelayedTask("expiration_events",
fmt.Sprintf("%s:%d", msg.Event.ID, msg.Event.Kind), time.Until(time.Unix(expiration, 0))); err != nil {
okm := message.MakeOK(false,
msg.Event.ID, "error: can't add event to expiration queue.",
)

_ = conn.WriteMessage(1, okm)

status = invalidFail

return
}
}

if !msg.Event.IsValid(eID) {
Expand Down Expand Up @@ -166,11 +213,11 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
}

if !msg.Event.Kind.IsEphemeral() {
err := s.handlers.HandleEvent(msg.Event)
err := s.handler.HandleEvent(msg.Event)
if err != nil {
okm := message.MakeOK(false,
msg.Event.ID,
err.Error(),
"error: can't write event to database.",
)

_ = conn.WriteMessage(1, okm)
Expand All @@ -184,7 +231,7 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {

_, err = s.redis.Client.BFAdd(qCtx, s.redis.BloomName, eID[:]).Result()
if err != nil {
log.Printf("error: checking bloom filter: %s", err.Error())
log.Printf("error: adding event to bloom filter.")
}

// todo::: can we run goroutines per client?
Expand Down
2 changes: 1 addition & 1 deletion server/websocket/req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) {
return
}

res, err := s.handlers.HandleReq(msg.Filters)
res, err := s.handler.HandleReq(msg.Filters)
if err != nil {
_ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: can't process REQ message: %s", err.Error())))
status = databaseFail
Expand Down
24 changes: 13 additions & 11 deletions server/websocket/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,31 @@ var upgrader = websocket.Upgrader{
type Server struct {
mu sync.RWMutex

config Config
conns map[*websocket.Conn]clientState
handlers *handler.Handler
metrics *metrics.Metrics
redis *redis.Redis
config Config
conns map[*websocket.Conn]clientState
handler *handler.Handler
metrics *metrics.Metrics
redis *redis.Redis
}

func New(cfg Config, h *handler.Handler, m *metrics.Metrics, r *redis.Redis,
) (*Server, error) {
return &Server{
config: cfg,
conns: make(map[*websocket.Conn]clientState),
mu: sync.RWMutex{},
handlers: h,
metrics: m,
redis: r,
config: cfg,
conns: make(map[*websocket.Conn]clientState),
mu: sync.RWMutex{},
handler: h,
metrics: m,
redis: r,
}, nil
}

// Start starts a new server instance.
func (s *Server) Start() error {
log.Println("websocket server started successfully...")

go s.checkExpiration()

http.Handle("/", s)
err := http.ListenAndServe(net.JoinHostPort(s.config.Bind, //nolint
strconv.Itoa(int(s.config.Port))), nil)
Expand Down
50 changes: 50 additions & 0 deletions server/websocket/task_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package websocket

import (
"log"
"strconv"
"strings"
"time"

"github.com/dezh-tech/immortal/types"
)

func (s *Server) checkExpiration() {
for range time.Tick(time.Minute) {
tasks, err := s.redis.GetReadyTasks("expiration_events")
if err != nil {
log.Println("error in checking expired events", err)
}

failedTasks := make([]string, 0)

if len(tasks) != 0 {
for _, task := range tasks {
data := strings.Split(task, ":")

if len(data) != 2 {
continue
}

kind, err := strconv.Atoi(data[1])
if err != nil {
continue
}

if err := s.handler.DeleteByID(data[0],
types.Kind(kind)); err != nil { //nolint
failedTasks = append(failedTasks, task)
}
}
}

if len(failedTasks) != 0 {
for _, ft := range failedTasks {
if err := s.redis.AddDelayedTask("expiration_events",
ft, time.Minute*10); err != nil {
continue // todo::: retry then send log to manager.
}
}
}
}
}
14 changes: 14 additions & 0 deletions types/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ func (tags Tags) ContainsAny(tagName string, values []string) bool {
return false
}

func (tags Tags) GetValue(tagName string) string {
for _, tag := range tags {
if len(tag) < 2 {
continue
}

if tag[0] == tagName {
return tag[1]
}
}

return ""
}

// Marshal Tag. Used for Serialization so string escaping should be as in RFC8259.
func (tag Tag) MarshalTo(dst []byte) []byte {
dst = append(dst, '[')
Expand Down

0 comments on commit 26875d7

Please sign in to comment.