diff --git a/config/config.yml b/config/config.yml index 2242e53..242f5e2 100644 --- a/config/config.yml +++ b/config/config.yml @@ -29,7 +29,7 @@ grpc_server: port: 50050 # manager contains information about connection with manager instance. -manger: +manager: # IP address of manager instance. # default is local network manager:8888. endpoint: "manager:8888" diff --git a/delivery/websocket/event_handler.go b/delivery/websocket/event_handler.go index b5efc71..d376cae 100644 --- a/delivery/websocket/event_handler.go +++ b/delivery/websocket/event_handler.go @@ -40,6 +40,19 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint eID := msg.Event.GetRawID() pubkey := msg.Event.PublicKey + if !msg.Event.IsValid(eID) { + okm := message.MakeOK(false, + msg.Event.ID, + "invalid: id or sig is not correct.", + ) + + _ = conn.WriteMessage(1, okm) + + status = invalidFail + + return + } + qCtx, cancel := context.WithTimeout(context.Background(), s.redis.QueryTimeout) defer cancel() @@ -47,7 +60,7 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint bloomCheckCmd := pipe.BFExists(qCtx, s.redis.BloomFilterName, eID[:]) - // TODO::: check config to enable filter checks + // todo::: check config to enable/disable filter checks. whiteListCheckCmd := pipe.CFExists(qCtx, s.redis.WhiteListFilterName, pubkey) blackListCheckCmd := pipe.CFExists(qCtx, s.redis.BlackListFilterName, pubkey) @@ -65,8 +78,9 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint return } + if exists { - okm := message.MakeOK(true, msg.Event.ID, "") + okm := message.MakeOK(false, msg.Event.ID, "duplicate: this event is already received.") _ = conn.WriteMessage(1, okm) return @@ -185,7 +199,7 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint return } - if err := s.redis.AddDelayedTask("expiration_events", + if err := s.redis.AddDelayedTask(expirationTaskListName, fmt.Sprintf("%s:%d", msg.Event.ID, msg.Event.Kind), time.Until(time.Unix(expiration, 0))); err != nil { okm := message.MakeOK(false, msg.Event.ID, "error: can't add event to expiration queue.", @@ -199,19 +213,6 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint } } - if !msg.Event.IsValid(eID) { - okm := message.MakeOK(false, - msg.Event.ID, - "invalid: id or sig is not correct.", - ) - - _ = conn.WriteMessage(1, okm) - - status = invalidFail - - return - } - if len(msg.Event.Content) > int(s.config.Limitation.MaxContentLength) { okm := message.MakeOK(false, msg.Event.ID, @@ -280,9 +281,10 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint return } - _ = conn.WriteMessage(1, message.MakeOK(true, msg.Event.ID, "")) } + _ = conn.WriteMessage(1, message.MakeOK(true, msg.Event.ID, "")) + _, err = s.redis.Client.BFAdd(qCtx, s.redis.BloomFilterName, eID[:]).Result() if err != nil { log.Printf("error: adding event to bloom filter.") diff --git a/delivery/websocket/task_scheduler.go b/delivery/websocket/task_scheduler.go index 8ee53c9..8d2e082 100644 --- a/delivery/websocket/task_scheduler.go +++ b/delivery/websocket/task_scheduler.go @@ -1,7 +1,6 @@ package websocket import ( - "log" "strconv" "strings" "time" @@ -9,11 +8,13 @@ import ( "github.com/dezh-tech/immortal/types" ) +const expirationTaskListName = "expiration_events" + func (s *Server) checkExpiration() { for range time.Tick(time.Minute) { - tasks, err := s.redis.GetReadyTasks("expiration_events") + tasks, err := s.redis.GetReadyTasks(expirationTaskListName) if err != nil { - log.Println("error in checking expired events", err) + continue } failedTasks := make([]string, 0) @@ -40,9 +41,9 @@ func (s *Server) checkExpiration() { if len(failedTasks) != 0 { for _, ft := range failedTasks { - if err := s.redis.AddDelayedTask("expiration_events", + if err := s.redis.AddDelayedTask(expirationTaskListName, ft, time.Minute*10); err != nil { - continue // todo::: retry then send log to manager. + continue } } } diff --git a/infrastructure/database/database.go b/infrastructure/database/database.go index 8f51611..5d2a8e0 100644 --- a/infrastructure/database/database.go +++ b/infrastructure/database/database.go @@ -5,6 +5,8 @@ import ( "log" "time" + "github.com/dezh-tech/immortal/types" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -40,6 +42,24 @@ func Connect(cfg Config) (*Database, error) { return nil, err } + indexModel := mongo.IndexModel{ + Keys: bson.D{ + {Key: "id", Value: 1}, + }, + Options: options.Index().SetUnique(true).SetName("uq_id"), + } + + for _, collName := range types.KindToName { + qCtx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.QueryTimeout)*time.Millisecond) + _, err := client.Database(cfg.DBName).Collection(collName).Indexes().CreateOne(qCtx, indexModel) + if err != nil { + cancel() + + continue + } + cancel() + } + return &Database{ Client: client, DBName: cfg.DBName, diff --git a/infrastructure/redis/redis_config.go b/infrastructure/redis/config.go similarity index 100% rename from infrastructure/redis/redis_config.go rename to infrastructure/redis/config.go diff --git a/repository/handler.go b/repository/handler.go index c83a4bf..372c5e6 100644 --- a/repository/handler.go +++ b/repository/handler.go @@ -5,99 +5,8 @@ import ( "github.com/dezh-tech/immortal/types" ) -var KindToCollectionName = map[types.Kind]string{ - types.KindUserMetadata: "user_metadatas", - types.KindShortTextNote: "short_text_notes", - types.KindRecommendRelay: "recommend_relays", - types.KindFollows: "follows", - types.KindEncryptedDirectMessages: "encrypted_direct_messages", - types.KindEventDeletionRequest: "event_deletion_requests", - types.KindRepost: "reposts", - types.KindReaction: "reactions", - types.KindBadgeAward: "badge_awards", - types.KindGroupChatMessage: "group_chat_messages", - types.KindGroupChatThreadedReply: "group_chat_threaded_replies", - types.KindGroupThread: "group_threads", - types.KindGroupThreadReply: "group_thread_replies", - types.KindSeal: "seals", - types.KindDirectMessage: "direct_messages", - types.KindGenericRepost: "generic_reposts", - types.KindReactionToWebsite: "reactions_to_websites", - types.KindChannelCreation: "channel_creations", - types.KindChannelMetadata: "channel_metadatas", - types.KindChannelMessage: "channel_messages", - types.KindChannelHideMessage: "channel_hide_messages", - types.KindChannelMuteUser: "channel_mute_users", - types.KindChessPGN: "chess_pgn", - types.KindMergeRequests: "merge_requests", - types.KindBid: "bids", - types.KindBidConfirmation: "bid_confirmations", - types.KindOpenTimestamps: "open_timestamps", - types.KindGiftWrap: "gift_wraps", - types.KindFileMetadata: "file_metadatas", - types.KindLiveChatMessage: "live_chat_messages", - types.KindPatches: "patches", - types.KindIssues: "issues", - types.KindReplies: "replies", - types.KindStatus: "status", - types.KindProblemTracker: "problem_trackers", - types.KindReporting: "reportings", - types.KindLabel: "labels", - types.KindRelayReviews: "relay_reviews", - types.KindAIEmbeddingsVectorLists: "ai_embeddings_vector_lists", - types.KindTorrent: "torrents", - types.KindTorrentComment: "torrent_comments", - types.KindCoinJoinPool: "coin_join_pools", - types.KindCommunityPostApproval: "community_post_approvals", - types.KindJobRequest: "dvm", - types.KindJobResult: "dvm", - types.KindJobFeedback: "dvm", - types.KindGroups: "groups", - types.KindZapGoal: "zap_goals", - types.KindTidalLogin: "tidal_logins", - types.KindZapRequest: "zap_requests", - types.KindZap: "zaps", - types.KindHighlights: "highlights", - types.KindMuteList: "mute_lists", - types.KindPinList: "pin_lists", - types.KindRelayListMetadata: "relay_list_metadatas", - types.KindBookmarkList: "bookmark_lists", - types.KindCommunitiesList: "communities_lists", - types.KindPublicChatsList: "public_chats_lists", - types.KindBlockedRelaysList: "blocked_relays_lists", - types.KindSearchRelaysList: "search_relays_lists", - types.KindUserGroups: "user_groups", - types.KindInterestsList: "interests_lists", - types.KindUserEmojiList: "user_emoji_lists", - types.KindRelayListToReceiveDMs: "relay_list_to_receive_dms", - types.KindUserServerList: "user_server_lists", - types.KindFileStorageServerList: "file_storage_server_lists", - types.KindWalletInfo: "wallet_infos", - types.KindLightningPubRPC: "lightning_pub_rpcs", - types.KindClientAuthentication: "client_authentications", - types.KindWalletRequest: "wallet_requests", - types.KindWalletResponse: "wallet_responses", - types.KindNostrConnect: "nostr_connects", - types.KindBlobsStoredOnMediaServers: "blobs_stored_on_media_servers", - types.KindHTTPAuth: "http_auths", - types.KindFollowSets: "follow_sets", - types.KindGenericLists: "generic_lists", - types.KindRelaySets: "relay_sets", - types.KindBookmarkSets: "bookmark_sets", - types.KindCurationSets: "curation_sets", - types.KindVideoSets: "video_sets", - types.KindKindMuteSets: "kind_mute_sets", - types.KindProfileBadges: "profile_badges", - types.KindBadgeDefinition: "badge_definitions", - types.KindLiveEvent: "live_events", - types.KindShortFormPortraitVideoEvent: "short_form_portrait_video_events", - types.KindVideoViewEvent: "video_view_events", - types.KindCommunityDefinition: "community_definitions", - types.KindGroupsMetadata: "groups_metadata", -} - func getCollectionName(k types.Kind) string { - collName, ok := KindToCollectionName[k] + collName, ok := types.KindToName[k] if ok { return collName } diff --git a/repository/req.go b/repository/req.go index 2f55fb1..c0722bc 100644 --- a/repository/req.go +++ b/repository/req.go @@ -50,8 +50,8 @@ func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) { queryKinds[k] = append(queryKinds[k], qf) } } else { - //! we query most requested kinds if there is no kind provided. - // FIX: any better way? + // ! it makes query to the most requested kinds if there is no kind provided. + // ? fix::: any better way? for _, k := range possibleKinds { queryKinds[k] = append(queryKinds[k], qf) } @@ -103,17 +103,18 @@ func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions, query := make(bson.D, 0) opts := options.Find() - // Filter by IDs + query = append(query, bson.E{Key: "pubkey", Value: bson.M{ + "$exists": true, + }}) + if len(fq.IDs) > 0 { query = append(query, bson.E{Key: "id", Value: bson.M{"$in": fq.IDs}}) } - // Filter by Authors if len(fq.Authors) > 0 { query = append(query, bson.E{Key: "pubkey", Value: bson.M{"$in": fq.Authors}}) } - // Filter by Tags if len(fq.Tags) > 0 { tagQueries := bson.A{} for tagKey, tagValues := range fq.Tags { @@ -130,17 +131,14 @@ func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions, query = append(query, bson.E{Key: "$and", Value: tagQueries}) } - // Filter by Since (created_at >=) if fq.Since > 0 { query = append(query, bson.E{Key: "created_at", Value: bson.M{"$gte": fq.Since}}) } - // Filter by Until (created_at <=) if fq.Until > 0 { query = append(query, bson.E{Key: "created_at", Value: bson.M{"$lte": fq.Since}}) } - // Add Limit to options if fq.Limit > 0 && fq.Limit < h.config.MaxQueryLimit { opts.SetLimit(int64(fq.Limit)) } else { diff --git a/types/kind.go b/types/kind.go index 4ada776..6ee1fbc 100644 --- a/types/kind.go +++ b/types/kind.go @@ -163,3 +163,94 @@ func (k Kind) Range() Range { return Ephemeral } + +var KindToName = map[Kind]string{ + KindUserMetadata: "user_metadatas", + KindShortTextNote: "short_text_notes", + KindRecommendRelay: "recommend_relays", + KindFollows: "follows", + KindEncryptedDirectMessages: "encrypted_direct_messages", + KindEventDeletionRequest: "event_deletion_requests", + KindRepost: "reposts", + KindReaction: "reactions", + KindBadgeAward: "badge_awards", + KindGroupChatMessage: "group_chat_messages", + KindGroupChatThreadedReply: "group_chat_threaded_replies", + KindGroupThread: "group_threads", + KindGroupThreadReply: "group_thread_replies", + KindSeal: "seals", + KindDirectMessage: "direct_messages", + KindGenericRepost: "generic_reposts", + KindReactionToWebsite: "reactions_to_websites", + KindChannelCreation: "channel_creations", + KindChannelMetadata: "channel_metadatas", + KindChannelMessage: "channel_messages", + KindChannelHideMessage: "channel_hide_messages", + KindChannelMuteUser: "channel_mute_users", + KindChessPGN: "chess_pgn", + KindMergeRequests: "merge_requests", + KindBid: "bids", + KindBidConfirmation: "bid_confirmations", + KindOpenTimestamps: "open_timestamps", + KindGiftWrap: "gift_wraps", + KindFileMetadata: "file_metadatas", + KindLiveChatMessage: "live_chat_messages", + KindPatches: "patches", + KindIssues: "issues", + KindReplies: "replies", + KindStatus: "status", + KindProblemTracker: "problem_trackers", + KindReporting: "reportings", + KindLabel: "labels", + KindRelayReviews: "relay_reviews", + KindAIEmbeddingsVectorLists: "ai_embeddings_vector_lists", + KindTorrent: "torrents", + KindTorrentComment: "torrent_comments", + KindCoinJoinPool: "coin_join_pools", + KindCommunityPostApproval: "community_post_approvals", + KindJobRequest: "dvm", + KindJobResult: "dvm", + KindJobFeedback: "dvm", + KindGroups: "groups", + KindZapGoal: "zap_goals", + KindTidalLogin: "tidal_logins", + KindZapRequest: "zap_requests", + KindZap: "zaps", + KindHighlights: "highlights", + KindMuteList: "mute_lists", + KindPinList: "pin_lists", + KindRelayListMetadata: "relay_list_metadatas", + KindBookmarkList: "bookmark_lists", + KindCommunitiesList: "communities_lists", + KindPublicChatsList: "public_chats_lists", + KindBlockedRelaysList: "blocked_relays_lists", + KindSearchRelaysList: "search_relays_lists", + KindUserGroups: "user_groups", + KindInterestsList: "interests_lists", + KindUserEmojiList: "user_emoji_lists", + KindRelayListToReceiveDMs: "relay_list_to_receive_dms", + KindUserServerList: "user_server_lists", + KindFileStorageServerList: "file_storage_server_lists", + KindWalletInfo: "wallet_infos", + KindLightningPubRPC: "lightning_pub_rpcs", + KindClientAuthentication: "client_authentications", + KindWalletRequest: "wallet_requests", + KindWalletResponse: "wallet_responses", + KindNostrConnect: "nostr_connects", + KindBlobsStoredOnMediaServers: "blobs_stored_on_media_servers", + KindHTTPAuth: "http_auths", + KindFollowSets: "follow_sets", + KindGenericLists: "generic_lists", + KindRelaySets: "relay_sets", + KindBookmarkSets: "bookmark_sets", + KindCurationSets: "curation_sets", + KindVideoSets: "video_sets", + KindKindMuteSets: "kind_mute_sets", + KindProfileBadges: "profile_badges", + KindBadgeDefinition: "badge_definitions", + KindLiveEvent: "live_events", + KindShortFormPortraitVideoEvent: "short_form_portrait_video_events", + KindVideoViewEvent: "video_view_events", + KindCommunityDefinition: "community_definitions", + KindGroupsMetadata: "groups_metadata", +}