For the same query_text, execute the refresh only once #7295

Open · wants to merge 3 commits into base: master
12 changes: 9 additions & 3 deletions redash/tasks/queries/execution.py
@@ -15,6 +15,7 @@
 from redash.tasks.failure_report import track_failure
 from redash.tasks.worker import Job, Queue
 from redash.utils import gen_query_hash, utcnow
+from redash.utils.locks import acquire_lock, release_lock
 from redash.worker import get_job_logger
 
 logger = get_job_logger(__name__)
@@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
     logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
     try_count = 0
     job = None
+    job_lock_id = _job_lock_id(query_hash, data_source.id)
 
     while try_count < 5:
         try_count += 1
+        identifier = acquire_lock(job_lock_id)
+        if identifier is None:
+            continue
 
         pipe = redis_connection.pipeline()
         try:
-            pipe.watch(_job_lock_id(query_hash, data_source.id))
-            job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
+            pipe.watch(job_lock_id)
+            job_id = pipe.get(job_lock_id)
             if job_id:
                 logger.info("[%s] Found existing job: %s", query_hash, job_id)
                 job_complete = None
@@ -66,7 +71,7 @@
 
                 if lock_is_irrelevant:
                     logger.info("[%s] %s, removing lock", query_hash, message)
-                    redis_connection.delete(_job_lock_id(query_hash, data_source.id))
+                    redis_connection.delete(job_lock_id)
                     job = None
 
             if not job:
@@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
         except redis.WatchError:
             continue
         finally:
+            release_lock(job_lock_id, identifier)
             pipe.reset()
 
     if not job:
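Taken together, these hunks serialize concurrent enqueue attempts for the same query hash: a worker must hold the new Redis lock before it reads or clears the stored job id, so a burst of refreshes for identical query_text yields a single execution job. Below is a minimal sketch of the resulting flow; `enqueue_once` and its `enqueue` callback are hypothetical stand-ins for illustration, and the job-key format is an assumption mirroring `_job_lock_id`, not the PR's actual code:

from redash import redis_connection
from redash.utils.locks import acquire_lock, release_lock

def enqueue_once(query_hash, data_source_id, enqueue):
    # Hypothetical condensation of enqueue_query's locking flow.
    job_lock_id = "query_hash_job:%s:%s" % (data_source_id, query_hash)  # assumed key format
    identifier = acquire_lock(job_lock_id)
    if identifier is None:
        return None  # lock not acquired within acquire_timeout; caller may retry
    try:
        job_id = redis_connection.get(job_lock_id)
        if job_id:
            return job_id  # an equivalent refresh was already enqueued
        job = enqueue()  # create the single execution job
        redis_connection.set(job_lock_id, job.id)
        return job.id
    finally:
        release_lock(job_lock_id, identifier)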
59 changes: 59 additions & 0 deletions redash/utils/locks.py
@@ -0,0 +1,59 @@
+import random
+import time
+import uuid
+import logging
+from redash import redis_connection
+from redis import WatchError
+
+logger = logging.getLogger(__name__)
+
+
+def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
+    identifier = str(uuid.uuid4())
+    lock_name = f"lock:{name}"
+    end = time.time() + acquire_timeout
+
+    base_delay = 0.001
+    max_delay = 0.05
+
+    while time.time() < end:
+        if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
+            logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+            return identifier
+
+        delay = base_delay + random.uniform(0, base_delay)
+        time.sleep(min(delay, max_delay))
+        base_delay = min(base_delay * 2, max_delay)
+
+    return None
+
+
+def release_lock(name, identifier):
+    lock_name = f"lock:{name}"
+    logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+    with redis_connection.pipeline() as pipe:
+        while True:
+            try:
+                pipe.watch(lock_name)
+                if pipe.get(lock_name) in (identifier, identifier.encode()):  # GET may return bytes
+                    pipe.multi()
+                    pipe.delete(lock_name)
+                    pipe.execute()
+                    logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                    return True
+                pipe.unwatch()
+                logger.warning(
+                    "Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
+                )
+                break
+            except WatchError:
+                logger.warning(
+                    "WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
+                    lock_name,
+                    identifier,
+                )
+            except Exception as e:
+                logger.error("Error releasing lock: %s", str(e))
+                break
+
+    return False
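These helpers implement the standard SET NX EX lock recipe: the value is a per-acquisition UUID, so a lock can only be deleted by its current holder, and the WATCH in release_lock guards the ownership check against the key changing hands (for example after the 5-second expiry) between GET and DELETE. A minimal usage sketch, with a hypothetical key name:

from redash.utils.locks import acquire_lock, release_lock

identifier = acquire_lock("example-key", acquire_timeout=10, lock_timeout=5)
if identifier is not None:
    try:
        ...  # critical section: at most one holder of "example-key" runs this at a time
    finally:
        release_lock("example-key", identifier)

Note that lock_timeout bounds how long the critical section may safely run: if the key expires first, another worker can acquire the lock, and the late release will log "Lock not owned by this identifier" instead of deleting the newcomer's key.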