Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(*): wrong ephemeral events impl, indexing id in database, config typo, duplicated event error. #103

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ grpc_server:
port: 50050

# manager contains information about connection with manager instance.
manger:
manager:
# IP address of manager instance.
# default is local network manager:8888.
endpoint: "manager:8888"
Expand Down
36 changes: 19 additions & 17 deletions delivery/websocket/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,27 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint
eID := msg.Event.GetRawID()
pubkey := msg.Event.PublicKey

if !msg.Event.IsValid(eID) {
okm := message.MakeOK(false,
msg.Event.ID,
"invalid: id or sig is not correct.",
)

_ = conn.WriteMessage(1, okm)

status = invalidFail

return
}

qCtx, cancel := context.WithTimeout(context.Background(), s.redis.QueryTimeout)
defer cancel()

pipe := s.redis.Client.Pipeline()

bloomCheckCmd := pipe.BFExists(qCtx, s.redis.BloomFilterName, eID[:])

// TODO::: check config to enable filter checks
// todo::: check config to enable/disable filter checks.
whiteListCheckCmd := pipe.CFExists(qCtx, s.redis.WhiteListFilterName, pubkey)
blackListCheckCmd := pipe.CFExists(qCtx, s.redis.BlackListFilterName, pubkey)

Expand All @@ -65,8 +78,9 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint

return
}

if exists {
okm := message.MakeOK(true, msg.Event.ID, "")
okm := message.MakeOK(false, msg.Event.ID, "duplicate: this event is already received.")
_ = conn.WriteMessage(1, okm)

return
Expand Down Expand Up @@ -185,7 +199,7 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint
return
}

if err := s.redis.AddDelayedTask("expiration_events",
if err := s.redis.AddDelayedTask(expirationTaskListName,
fmt.Sprintf("%s:%d", msg.Event.ID, msg.Event.Kind), time.Until(time.Unix(expiration, 0))); err != nil {
okm := message.MakeOK(false,
msg.Event.ID, "error: can't add event to expiration queue.",
Expand All @@ -199,19 +213,6 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint
}
}

if !msg.Event.IsValid(eID) {
okm := message.MakeOK(false,
msg.Event.ID,
"invalid: id or sig is not correct.",
)

_ = conn.WriteMessage(1, okm)

status = invalidFail

return
}

if len(msg.Event.Content) > int(s.config.Limitation.MaxContentLength) {
okm := message.MakeOK(false,
msg.Event.ID,
Expand Down Expand Up @@ -280,9 +281,10 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint

return
}
_ = conn.WriteMessage(1, message.MakeOK(true, msg.Event.ID, ""))
}

_ = conn.WriteMessage(1, message.MakeOK(true, msg.Event.ID, ""))

_, err = s.redis.Client.BFAdd(qCtx, s.redis.BloomFilterName, eID[:]).Result()
if err != nil {
log.Printf("error: adding event to bloom filter.")
Expand Down
11 changes: 6 additions & 5 deletions delivery/websocket/task_scheduler.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package websocket

import (
"log"
"strconv"
"strings"
"time"

"github.com/dezh-tech/immortal/types"
)

const expirationTaskListName = "expiration_events"

func (s *Server) checkExpiration() {
for range time.Tick(time.Minute) {
tasks, err := s.redis.GetReadyTasks("expiration_events")
tasks, err := s.redis.GetReadyTasks(expirationTaskListName)
if err != nil {
log.Println("error in checking expired events", err)
continue
}

failedTasks := make([]string, 0)
Expand All @@ -40,9 +41,9 @@ func (s *Server) checkExpiration() {

if len(failedTasks) != 0 {
for _, ft := range failedTasks {
if err := s.redis.AddDelayedTask("expiration_events",
if err := s.redis.AddDelayedTask(expirationTaskListName,
ft, time.Minute*10); err != nil {
continue // todo::: retry then send log to manager.
continue
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions infrastructure/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"log"
"time"

"github.com/dezh-tech/immortal/types"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
Expand Down Expand Up @@ -40,6 +42,24 @@ func Connect(cfg Config) (*Database, error) {
return nil, err
}

indexModel := mongo.IndexModel{
Keys: bson.D{
{Key: "id", Value: 1},
},
Options: options.Index().SetUnique(true).SetName("uq_id"),
}

for _, collName := range types.KindToName {
qCtx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.QueryTimeout)*time.Millisecond)
_, err := client.Database(cfg.DBName).Collection(collName).Indexes().CreateOne(qCtx, indexModel)
if err != nil {
cancel()

continue
}
cancel()
}

return &Database{
Client: client,
DBName: cfg.DBName,
Expand Down
File renamed without changes.
93 changes: 1 addition & 92 deletions repository/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,99 +5,8 @@ import (
"github.com/dezh-tech/immortal/types"
)

var KindToCollectionName = map[types.Kind]string{
types.KindUserMetadata: "user_metadatas",
types.KindShortTextNote: "short_text_notes",
types.KindRecommendRelay: "recommend_relays",
types.KindFollows: "follows",
types.KindEncryptedDirectMessages: "encrypted_direct_messages",
types.KindEventDeletionRequest: "event_deletion_requests",
types.KindRepost: "reposts",
types.KindReaction: "reactions",
types.KindBadgeAward: "badge_awards",
types.KindGroupChatMessage: "group_chat_messages",
types.KindGroupChatThreadedReply: "group_chat_threaded_replies",
types.KindGroupThread: "group_threads",
types.KindGroupThreadReply: "group_thread_replies",
types.KindSeal: "seals",
types.KindDirectMessage: "direct_messages",
types.KindGenericRepost: "generic_reposts",
types.KindReactionToWebsite: "reactions_to_websites",
types.KindChannelCreation: "channel_creations",
types.KindChannelMetadata: "channel_metadatas",
types.KindChannelMessage: "channel_messages",
types.KindChannelHideMessage: "channel_hide_messages",
types.KindChannelMuteUser: "channel_mute_users",
types.KindChessPGN: "chess_pgn",
types.KindMergeRequests: "merge_requests",
types.KindBid: "bids",
types.KindBidConfirmation: "bid_confirmations",
types.KindOpenTimestamps: "open_timestamps",
types.KindGiftWrap: "gift_wraps",
types.KindFileMetadata: "file_metadatas",
types.KindLiveChatMessage: "live_chat_messages",
types.KindPatches: "patches",
types.KindIssues: "issues",
types.KindReplies: "replies",
types.KindStatus: "status",
types.KindProblemTracker: "problem_trackers",
types.KindReporting: "reportings",
types.KindLabel: "labels",
types.KindRelayReviews: "relay_reviews",
types.KindAIEmbeddingsVectorLists: "ai_embeddings_vector_lists",
types.KindTorrent: "torrents",
types.KindTorrentComment: "torrent_comments",
types.KindCoinJoinPool: "coin_join_pools",
types.KindCommunityPostApproval: "community_post_approvals",
types.KindJobRequest: "dvm",
types.KindJobResult: "dvm",
types.KindJobFeedback: "dvm",
types.KindGroups: "groups",
types.KindZapGoal: "zap_goals",
types.KindTidalLogin: "tidal_logins",
types.KindZapRequest: "zap_requests",
types.KindZap: "zaps",
types.KindHighlights: "highlights",
types.KindMuteList: "mute_lists",
types.KindPinList: "pin_lists",
types.KindRelayListMetadata: "relay_list_metadatas",
types.KindBookmarkList: "bookmark_lists",
types.KindCommunitiesList: "communities_lists",
types.KindPublicChatsList: "public_chats_lists",
types.KindBlockedRelaysList: "blocked_relays_lists",
types.KindSearchRelaysList: "search_relays_lists",
types.KindUserGroups: "user_groups",
types.KindInterestsList: "interests_lists",
types.KindUserEmojiList: "user_emoji_lists",
types.KindRelayListToReceiveDMs: "relay_list_to_receive_dms",
types.KindUserServerList: "user_server_lists",
types.KindFileStorageServerList: "file_storage_server_lists",
types.KindWalletInfo: "wallet_infos",
types.KindLightningPubRPC: "lightning_pub_rpcs",
types.KindClientAuthentication: "client_authentications",
types.KindWalletRequest: "wallet_requests",
types.KindWalletResponse: "wallet_responses",
types.KindNostrConnect: "nostr_connects",
types.KindBlobsStoredOnMediaServers: "blobs_stored_on_media_servers",
types.KindHTTPAuth: "http_auths",
types.KindFollowSets: "follow_sets",
types.KindGenericLists: "generic_lists",
types.KindRelaySets: "relay_sets",
types.KindBookmarkSets: "bookmark_sets",
types.KindCurationSets: "curation_sets",
types.KindVideoSets: "video_sets",
types.KindKindMuteSets: "kind_mute_sets",
types.KindProfileBadges: "profile_badges",
types.KindBadgeDefinition: "badge_definitions",
types.KindLiveEvent: "live_events",
types.KindShortFormPortraitVideoEvent: "short_form_portrait_video_events",
types.KindVideoViewEvent: "video_view_events",
types.KindCommunityDefinition: "community_definitions",
types.KindGroupsMetadata: "groups_metadata",
}

func getCollectionName(k types.Kind) string {
collName, ok := KindToCollectionName[k]
collName, ok := types.KindToName[k]
if ok {
return collName
}
Expand Down
14 changes: 6 additions & 8 deletions repository/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) {
queryKinds[k] = append(queryKinds[k], qf)
}
} else {
//! we query most requested kinds if there is no kind provided.
// FIX: any better way?
// ! it makes query to the most requested kinds if there is no kind provided.
// ? fix::: any better way?
for _, k := range possibleKinds {
queryKinds[k] = append(queryKinds[k], qf)
}
Expand Down Expand Up @@ -103,17 +103,18 @@ func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions,
query := make(bson.D, 0)
opts := options.Find()

// Filter by IDs
query = append(query, bson.E{Key: "pubkey", Value: bson.M{
"$exists": true,
}})

if len(fq.IDs) > 0 {
query = append(query, bson.E{Key: "id", Value: bson.M{"$in": fq.IDs}})
}

// Filter by Authors
if len(fq.Authors) > 0 {
query = append(query, bson.E{Key: "pubkey", Value: bson.M{"$in": fq.Authors}})
}

// Filter by Tags
if len(fq.Tags) > 0 {
tagQueries := bson.A{}
for tagKey, tagValues := range fq.Tags {
Expand All @@ -130,17 +131,14 @@ func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions,
query = append(query, bson.E{Key: "$and", Value: tagQueries})
}

// Filter by Since (created_at >=)
if fq.Since > 0 {
query = append(query, bson.E{Key: "created_at", Value: bson.M{"$gte": fq.Since}})
}

// Filter by Until (created_at <=)
if fq.Until > 0 {
query = append(query, bson.E{Key: "created_at", Value: bson.M{"$lte": fq.Since}})
}

// Add Limit to options
if fq.Limit > 0 && fq.Limit < h.config.MaxQueryLimit {
opts.SetLimit(int64(fq.Limit))
} else {
Expand Down
Loading
Loading