Skip to content

Commit

Permalink
remove scan_iter from pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Kuo (Danswer) committed Feb 6, 2025
1 parent 8a0943a commit 2be59a4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ def validate_external_group_sync_fence(
"validate_external_group_sync_fence - "
"Resetting fence because no associated celery tasks were found: "
f"cc_pair={cc_pair_id} "
f"fence={fence_key}"
f"fence={fence_key} "
f"payload_id={payload.id}"
)

redis_connector.external_group_sync.reset()
Expand Down
27 changes: 26 additions & 1 deletion backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import cast
from uuid import uuid4

Expand Down Expand Up @@ -30,6 +31,7 @@
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.factory import instantiate_connector
Expand All @@ -51,6 +53,7 @@
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.server.utils import make_short_id
from onyx.utils.logger import LoggerContextVars
Expand Down Expand Up @@ -107,6 +110,7 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
)
def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
r_replica = get_redis_replica_client(tenant_id=tenant_id)
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore

lock_beat: RedisLock = r.lock(
Expand Down Expand Up @@ -155,7 +159,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
validate_pruning_fences(tenant_id, r, r_celery, lock_beat)
validate_pruning_fences(tenant_id, r, r_replica, r_celery, lock_beat)
except Exception:
task_logger.exception("Exception while validating pruning fences")

Expand Down Expand Up @@ -517,6 +521,7 @@ def monitor_ccpair_pruning_taskset(
def validate_pruning_fences(
tenant_id: str | None,
r: Redis,
r_replica: Redis,
r_celery: Redis,
lock_beat: RedisLock,
) -> None:
Expand All @@ -538,6 +543,26 @@ def validate_pruning_fences(
OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
)

# Use replica for this because the worst thing that happens
# is that we don't run the validation on this pass
keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if not key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
continue

validate_pruning_fence(
tenant_id,
key_bytes,
reserved_generator_tasks,
queued_upsert_tasks,
r,
r_celery,
)

lock_beat.reacquire()

# validate all existing pruning jobs
for key_bytes in r.scan_iter(
RedisConnectorPrune.FENCE_PREFIX + "*",
Expand Down

0 comments on commit 2be59a4

Please sign in to comment.