From 89255a13167e82226f7e113bbdba0b309dbd0d57 Mon Sep 17 00:00:00 2001 From: Kay Date: Sun, 12 Jan 2025 19:21:08 +0000 Subject: [PATCH] feat: supporting nip-09, accepting 1 filter per req message, using aggregation pipeline on req.: --- delivery/websocket/event_handler.go | 27 ++++++++- documents/NIPs.md | 2 +- repository/delete.go | 87 ++++++++++++++++++++++++++--- repository/req.go | 5 +- 4 files changed, 106 insertions(+), 15 deletions(-) diff --git a/delivery/websocket/event_handler.go b/delivery/websocket/event_handler.go index 471d529..e29ca8d 100644 --- a/delivery/websocket/event_handler.go +++ b/delivery/websocket/event_handler.go @@ -9,6 +9,8 @@ import ( "github.com/dezh-tech/immortal/infrastructure/redis" "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/pkg/utils" + "github.com/dezh-tech/immortal/types" + "github.com/dezh-tech/immortal/types/filter" "github.com/dezh-tech/immortal/types/message" "github.com/gorilla/websocket" gredis "github.com/redis/go-redis/v9" @@ -173,8 +175,29 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { } if !msg.Event.Kind.IsEphemeral() { - err := s.handler.HandleEvent(msg.Event) - if err != nil { + if msg.Event.Kind == types.KindEventDeletionRequest { + filterString := msg.Event.Tags.GetValue("filter") + filter, err := filter.Decode([]byte(filterString)) + if err != nil { + okm := message.MakeOK(false, + msg.Event.ID, + fmt.Sprintf("error: parse deletion event: %s", filterString), + ) + + _ = conn.WriteMessage(1, okm) + + status = invalidFail + + return + } + + // you can only delete events you own. + filter.Authors = []string{msg.Event.PublicKey} + + go s.handler.DeleteByFilter(filter) + } + + if err := s.handler.HandleEvent(msg.Event); err != nil { okm := message.MakeOK(false, msg.Event.ID, "error: can't write event to database.", diff --git a/documents/NIPs.md b/documents/NIPs.md index ab40ae6..fdf9404 100644 --- a/documents/NIPs.md +++ b/documents/NIPs.md @@ -1,7 +1,7 @@ # Immortal supported NIPs - [X] **NIP-01**: Basic Protocol Flow Description -- [ ] **NIP-09**: Event Deletion Request +- [X] **NIP-09**: Event Deletion Request - [X] **NIP-11**: Relay Information Document - [X] **NIP-13**: Proof of Work - [X] **NIP-40**: Expiration Timestamp diff --git a/repository/delete.go b/repository/delete.go index a52be37..816f0ef 100644 --- a/repository/delete.go +++ b/repository/delete.go @@ -5,24 +5,16 @@ import ( "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" + "github.com/dezh-tech/immortal/types/filter" "go.mongodb.org/mongo-driver/bson" ) func (h *Handler) DeleteByID(id string, kind types.Kind) error { - collName, _ := getCollectionName(kind) - coll := h.db.Client.Database(h.db.DBName).Collection(collName) - - ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) - defer cancel() - filter := bson.D{ {Key: "id", Value: id}, } update := bson.D{ - {Key: "$set", Value: bson.D{ - {Key: "id", Value: id}, - }}, {Key: "$unset", Value: bson.D{ {Key: "pubkey"}, {Key: "created_at"}, @@ -33,6 +25,12 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error { }}, } + collName, _ := getCollectionName(kind) + coll := h.db.Client.Database(h.db.DBName).Collection(collName) + + ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) + defer cancel() + _, err := coll.UpdateOne(ctx, filter, update) if err != nil { _, err := h.grpc.AddLog(context.Background(), @@ -46,3 +44,74 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error { return nil } + +func (h *Handler) DeleteByFilter(f *filter.Filter) error { + // question/todo::: is it possible to run the deletion on all collections with one database call? + // we have an open issue on deletion execution. + // we do the read operation using aggregation pipeline and $unionWith stage which + // helps us ti prevent multiple database calls and it would help us to do the operation faster. + // to do the same thing for deletion we need to filter the documents with $match, then update the + // fields of deleted event to null (expect the `id` since its unique index to prevent overwrites) with $unset + // then we apply them to collection using $merge. although we can't use multiple $merge's on one pipeline and we must have + // only one merge at the end of pipeline commands. also, $unionWith is restricted to be used with $merge. + + // notes::: these details may help you to think for solutions better: + // 1. we create a collection for each kind or each group of kinds. + // using this model forces us to make query to all collections corresponding to provided kinds when + // we are dealing with filters since filters contain a list of kinds (which can be empty and we are then forced to query all collections) + + // 2. when we delete an event we $unset all fields expect `id`. when we make a query to read from database, we ignore fields which + // their fields are null. and when we write new events we prevent overwriting events with duplicated `id`. so we can handle the deletion properly. + + // resources::: these links may help you: + // 1. https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/#restrictions + // 2. https://www.mongodb.com/docs/manual/reference/operator/aggregation/unionWith/#mongodb-pipeline-pipe.-unionWith + // 3. https://www.mongodb.com/docs/manual/reference/operator/aggregation + + queryKinds := make(map[types.Kind]*filter.Filter) + + if len(f.Kinds) != 0 { + uniqueKinds := removeDuplicateKinds(f.Kinds) + for _, k := range uniqueKinds { + queryKinds[k] = f + } + } else { + for k := range types.KindToName { + queryKinds[k] = f + } + } + + update := bson.D{ + {Key: "$unset", Value: bson.D{ + {Key: "pubkey"}, + {Key: "created_at"}, + {Key: "kind"}, + {Key: "tags"}, + {Key: "content"}, + {Key: "sig"}, + }}, + } + + for kind, filter := range queryKinds { + collectionName, isMultiKindColl := getCollectionName(kind) + + query := filterToMongoQuery(filter, isMultiKindColl, kind) + + ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) + defer cancel() + + _, err := h.db.Client.Database(h.db.DBName).Collection(collectionName).UpdateMany(ctx, query, update) + if err != nil { + _, err := h.grpc.AddLog(ctx, + "database error while deleting events", err.Error()) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } + + return err + } + + } + + return nil +} diff --git a/repository/req.go b/repository/req.go index 365a44b..0800257 100644 --- a/repository/req.go +++ b/repository/req.go @@ -59,7 +59,7 @@ func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) { finalLimit := h.config.DefaultQueryLimit if f.Limit > 0 && f.Limit < h.config.MaxQueryLimit { - finalLimit = uint32(f.Limit) + finalLimit = f.Limit } limitStage := bson.D{ @@ -71,8 +71,7 @@ func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) { ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) defer cancel() - collection := h.db.Client.Database(h.db.DBName).Collection("empty") - cursor, err := collection.Aggregate(ctx, pipeline) + cursor, err := h.db.Client.Database(h.db.DBName).Collection("empty").Aggregate(ctx, pipeline) if err != nil { _, err := h.grpc.AddLog(context.Background(), "database error while adding new event", err.Error())