Skip to content

Commit

Permalink
feat: supporting nip-09, accepting 1 filter per req message, using ag…
Browse files Browse the repository at this point in the history
…gregation pipeline on req.:
  • Loading branch information
kehiy committed Jan 12, 2025
1 parent a0cf0ac commit 89255a1
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 15 deletions.
27 changes: 25 additions & 2 deletions delivery/websocket/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/dezh-tech/immortal/infrastructure/redis"
"github.com/dezh-tech/immortal/pkg/logger"
"github.com/dezh-tech/immortal/pkg/utils"
"github.com/dezh-tech/immortal/types"
"github.com/dezh-tech/immortal/types/filter"
"github.com/dezh-tech/immortal/types/message"
"github.com/gorilla/websocket"
gredis "github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -173,8 +175,29 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
}

if !msg.Event.Kind.IsEphemeral() {
err := s.handler.HandleEvent(msg.Event)
if err != nil {
if msg.Event.Kind == types.KindEventDeletionRequest {
filterString := msg.Event.Tags.GetValue("filter")
filter, err := filter.Decode([]byte(filterString))

Check failure on line 180 in delivery/websocket/event_handler.go

View workflow job for this annotation

GitHub Actions / lint

importShadow: shadow of imported from 'github.com/dezh-tech/immortal/types/filter' package 'filter' (gocritic)
if err != nil {
okm := message.MakeOK(false,
msg.Event.ID,
fmt.Sprintf("error: parse deletion event: %s", filterString),
)

_ = conn.WriteMessage(1, okm)

status = invalidFail

return
}

// you can only delete events you own.
filter.Authors = []string{msg.Event.PublicKey}

go s.handler.DeleteByFilter(filter)

Check failure on line 197 in delivery/websocket/event_handler.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `s.handler.DeleteByFilter` is not checked (errcheck)
}

if err := s.handler.HandleEvent(msg.Event); err != nil {
okm := message.MakeOK(false,
msg.Event.ID,
"error: can't write event to database.",
Expand Down
2 changes: 1 addition & 1 deletion documents/NIPs.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Immortal supported NIPs

- [X] **NIP-01**: Basic Protocol Flow Description
- [ ] **NIP-09**: Event Deletion Request
- [X] **NIP-09**: Event Deletion Request
- [X] **NIP-11**: Relay Information Document
- [X] **NIP-13**: Proof of Work
- [X] **NIP-40**: Expiration Timestamp
Expand Down
87 changes: 78 additions & 9 deletions repository/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,16 @@ import (

"github.com/dezh-tech/immortal/pkg/logger"
"github.com/dezh-tech/immortal/types"
"github.com/dezh-tech/immortal/types/filter"
"go.mongodb.org/mongo-driver/bson"
)

func (h *Handler) DeleteByID(id string, kind types.Kind) error {
collName, _ := getCollectionName(kind)
coll := h.db.Client.Database(h.db.DBName).Collection(collName)

ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

filter := bson.D{

Check failure on line 13 in repository/delete.go

View workflow job for this annotation

GitHub Actions / lint

importShadow: shadow of imported from 'github.com/dezh-tech/immortal/types/filter' package 'filter' (gocritic)
{Key: "id", Value: id},
}

update := bson.D{
{Key: "$set", Value: bson.D{
{Key: "id", Value: id},
}},
{Key: "$unset", Value: bson.D{
{Key: "pubkey"},
{Key: "created_at"},
Expand All @@ -33,6 +25,12 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error {
}},
}

collName, _ := getCollectionName(kind)
coll := h.db.Client.Database(h.db.DBName).Collection(collName)

ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

_, err := coll.UpdateOne(ctx, filter, update)
if err != nil {
_, err := h.grpc.AddLog(context.Background(),
Expand All @@ -46,3 +44,74 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error {

return nil
}

func (h *Handler) DeleteByFilter(f *filter.Filter) error {
// question/todo::: is it possible to run the deletion on all collections with one database call?
// we have an open issue on deletion execution.
// we do the read operation using aggregation pipeline and $unionWith stage which
// helps us ti prevent multiple database calls and it would help us to do the operation faster.
// to do the same thing for deletion we need to filter the documents with $match, then update the
// fields of deleted event to null (expect the `id` since its unique index to prevent overwrites) with $unset
// then we apply them to collection using $merge. although we can't use multiple $merge's on one pipeline and we must have
// only one merge at the end of pipeline commands. also, $unionWith is restricted to be used with $merge.

// notes::: these details may help you to think for solutions better:
// 1. we create a collection for each kind or each group of kinds.
// using this model forces us to make query to all collections corresponding to provided kinds when
// we are dealing with filters since filters contain a list of kinds (which can be empty and we are then forced to query all collections)

// 2. when we delete an event we $unset all fields expect `id`. when we make a query to read from database, we ignore fields which
// their fields are null. and when we write new events we prevent overwriting events with duplicated `id`. so we can handle the deletion properly.

// resources::: these links may help you:
// 1. https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/#restrictions
// 2. https://www.mongodb.com/docs/manual/reference/operator/aggregation/unionWith/#mongodb-pipeline-pipe.-unionWith
// 3. https://www.mongodb.com/docs/manual/reference/operator/aggregation

queryKinds := make(map[types.Kind]*filter.Filter)

if len(f.Kinds) != 0 {
uniqueKinds := removeDuplicateKinds(f.Kinds)
for _, k := range uniqueKinds {
queryKinds[k] = f
}
} else {
for k := range types.KindToName {
queryKinds[k] = f
}
}

update := bson.D{
{Key: "$unset", Value: bson.D{
{Key: "pubkey"},
{Key: "created_at"},
{Key: "kind"},
{Key: "tags"},
{Key: "content"},
{Key: "sig"},
}},
}

for kind, filter := range queryKinds {
collectionName, isMultiKindColl := getCollectionName(kind)

query := filterToMongoQuery(filter, isMultiKindColl, kind)

ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

Check failure on line 101 in repository/delete.go

View workflow job for this annotation

GitHub Actions / lint

deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop (gocritic)

_, err := h.db.Client.Database(h.db.DBName).Collection(collectionName).UpdateMany(ctx, query, update)
if err != nil {
_, err := h.grpc.AddLog(ctx,
"database error while deleting events", err.Error())
if err != nil {
logger.Error("can't send log to manager", "err", err)
}

return err
}

}

Check failure on line 114 in repository/delete.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

return nil
}
5 changes: 2 additions & 3 deletions repository/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) {

finalLimit := h.config.DefaultQueryLimit
if f.Limit > 0 && f.Limit < h.config.MaxQueryLimit {
finalLimit = uint32(f.Limit)
finalLimit = f.Limit
}

limitStage := bson.D{
Expand All @@ -71,8 +71,7 @@ func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) {
ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

collection := h.db.Client.Database(h.db.DBName).Collection("empty")
cursor, err := collection.Aggregate(ctx, pipeline)
cursor, err := h.db.Client.Database(h.db.DBName).Collection("empty").Aggregate(ctx, pipeline)
if err != nil {
_, err := h.grpc.AddLog(context.Background(),
"database error while adding new event", err.Error())
Expand Down

0 comments on commit 89255a1

Please sign in to comment.