From 2c7b975135b7f0812d98008fe896666103285510 Mon Sep 17 00:00:00 2001
From: Sylvain Zimmer
Date: Fri, 20 Feb 2015 16:22:04 +0100
Subject: [PATCH] Fixed test leaving mongodb/redis started, tag as 0.1.11

---
 mrq/bin/mrq_run.py        |  5 +++-
 mrq/config.py             |  2 +-
 mrq/version.py            |  2 +-
 tests/conftest.py         |  2 --
 tests/test_cli.py         |  6 ++++-
 tests/test_context.py     | 41 ---------------------------------
 tests/test_disconnects.py | 48 +++++++++++++++++++++++++++++++++++++++
 7 files changed, 59 insertions(+), 47 deletions(-)

diff --git a/mrq/bin/mrq_run.py b/mrq/bin/mrq_run.py
index d08cbda5..3a9924f7 100755
--- a/mrq/bin/mrq_run.py
+++ b/mrq/bin/mrq_run.py
@@ -19,7 +19,7 @@
 sys.path.insert(0, os.getcwd())
 
 from mrq import config, utils
-from mrq.context import set_current_config, set_current_job
+from mrq.context import set_current_config, set_current_job, connections
 from mrq.job import queue_job
 from mrq.utils import load_class_by_path
 
@@ -60,5 +60,8 @@ def main():
         ret = job.perform()
         print json.dumps(ret)  # pylint: disable=no-member
 
+    # This shouldn't be needed as the process will exit and close any remaining sockets
+    # connections.redis.connection_pool.disconnect()
+
 if __name__ == "__main__":
     main()
diff --git a/mrq/config.py b/mrq/config.py
index 47570103..88868749 100644
--- a/mrq/config.py
+++ b/mrq/config.py
@@ -102,7 +102,7 @@ def add_parser_args(parser, config_type):
     parser.add_argument(
         '--redis_timeout',
         action='store',
-        default=20,
+        default=30,
         help='Redis connection pool timeout to wait for an available connection')
 
     parser.add_argument(
diff --git a/mrq/version.py b/mrq/version.py
index c1743d16..18c397e7 100644
--- a/mrq/version.py
+++ b/mrq/version.py
@@ -1 +1 @@
-VERSION = "0.1.10"
+VERSION = "0.1.11"
diff --git a/tests/conftest.py b/tests/conftest.py
index df2b5afd..7e4de247 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -226,8 +226,6 @@ def send_task(self, path, params, **kwargs):
         return self.send_tasks(path, [params], **kwargs)[0]
 
     def send_task_cli(self, path, params, queue=None, **kwargs):
-        if not self.started and queue:
-            self.start()
 
         cli = ["python", "mrq/bin/mrq_run.py", "--quiet"]
         if queue:
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 7c355ae6..e05d1f95 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -9,10 +9,12 @@ def test_cli_run_blocking(worker):
 
     assert result == 42
 
+    worker.stop_deps()
+
 
 def test_cli_run_nonblocking(worker):
 
-    worker.start_deps()
+    worker.start()
 
     job_id1 = worker.send_task_cli(
         "tests.tasks.general.Add", {"a": 41, "b": 1}, queue="default")
@@ -25,3 +27,5 @@ def test_cli_run_nonblocking(worker):
 
     assert job1.data["status"] == "success"
     assert job1.data["result"] == 42
+
+    worker.stop()
diff --git a/tests/test_context.py b/tests/test_context.py
index 4b6a296a..bafc2808 100644
--- a/tests/test_context.py
+++ b/tests/test_context.py
@@ -1,7 +1,5 @@
 import json
-import time
 import os
-import pytest
 
 
 def test_context_get(worker):
@@ -98,42 +96,3 @@ def test_context_setup():
 
     assert out.endswith("42\ntestname1\n")
 
-
-@pytest.mark.parametrize(["gevent_count", "subpool_size", "iterations", "expected_clients"], [
-    (None, None, 1, 1),  # single task opens a single connection
-    (None, None, 2, 1),
-    (None, 10, 1, 10),  # single task with subpool of 10 opens 10 connections
-    (None, 10, 2, 10),
-    (None, 200, 1, 100),  # single task with subpool of 200 opens 100 connections, we reach the max_connections limit
-    (None, 200, 2, 100),
-    (4, None, 1, 4),  # 4 gevent workers with a single task each : 4 connections
-    (4, None, 2, 4),
-    (2, 2, 1, 4),  # 2 gevent workers with 2 single tasks each : 4 connections
-    (2, 2, 2, 4),
-
-])
-def test_redis_disconnections(gevent_count, subpool_size, iterations, expected_clients, worker):
-    """ mrq.context.connections is not the actual connections pool that the worker uses.
-        this worker's pool is not accessible from here, since it runs in a different thread.
-    """
-    from mrq.context import connections
-
-    gevent_count = gevent_count if gevent_count is not None else 1
-
-    get_clients = lambda: [c for c in connections.redis.client_list() if c.get("cmd") != "client"]
-    # 1. start the worker and asserts that there is a redis client connected
-    kwargs = {"flags": "--redis_max_connections 100"}
-    if gevent_count:
-        kwargs["flags"] += " --gevent %s" % gevent_count
-
-    worker.start(**kwargs)
-
-    for i in range(0, iterations):
-        # sending tasks has the good side effect to wait for the worker to connect to redis
-        worker.send_tasks("tests.tasks.redis.Disconnections", [{"subpool_size": subpool_size}] * gevent_count)
-
-    assert len(get_clients()) == expected_clients
-
-    # 2. kill the worker and make sure that the connection was closed
-    worker.stop(deps=False)  # so that we still have access to redis
-    assert len(get_clients()) == 0
diff --git a/tests/test_disconnects.py b/tests/test_disconnects.py
index 89b70530..9ebba560 100644
--- a/tests/test_disconnects.py
+++ b/tests/test_disconnects.py
@@ -34,3 +34,51 @@
 
     # Result should be there without issues
     assert Job(job_id1).fetch().data["result"] == 42
+
+
+@pytest.mark.parametrize(["gevent_count", "subpool_size", "iterations", "expected_clients"], [
+    (None, None, 1, 1),  # single task opens a single connection
+    (None, None, 2, 1),
+    (None, 10, 1, 10),  # single task with subpool of 10 opens 10 connections
+    (None, 10, 2, 10),
+    (None, 200, 1, 100),  # single task with subpool of 200 opens 100 connections, we reach the max_connections limit
+    (None, 200, 2, 100),
+    (4, None, 1, 4),  # 4 gevent workers with a single task each : 4 connections
+    (4, None, 2, 4),
+    (2, 2, 1, 4),  # 2 gevent workers with 2 single tasks each : 4 connections
+    (2, 2, 2, 4),
+
+])
+def test_redis_disconnections(gevent_count, subpool_size, iterations, expected_clients, worker):
+    """ mrq.context.connections is not the actual connections pool that the worker uses.
+        this worker's pool is not accessible from here, since it runs in a different thread.
+    """
+    from mrq.context import connections
+
+    worker.start_deps()
+
+    gevent_count = gevent_count if gevent_count is not None else 1
+
+    get_clients = lambda: [c for c in connections.redis.client_list() if c.get("cmd") != "client"]
+
+    assert len(get_clients()) == 0
+
+    # 1. start the worker and asserts that there is a redis client connected
+    kwargs = {"flags": "--redis_max_connections 100", "deps": False}
+    if gevent_count:
+        kwargs["flags"] += " --gevent %s" % gevent_count
+
+    worker.start(**kwargs)
+
+    for i in range(0, iterations):
+        # sending tasks has the good side effect to wait for the worker to connect to redis
+        worker.send_tasks("tests.tasks.redis.Disconnections", [{"subpool_size": subpool_size}] * gevent_count)
+
+    assert len(get_clients()) == expected_clients
+
+    # 2. kill the worker and make sure that the connection was closed
+    worker.stop(deps=False)  # so that we still have access to redis
+
+    assert len(get_clients()) == 0
+
+    worker.stop_deps()
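
Note: the test moved into tests/test_disconnects.py counts live Redis connections from the test process with redis-py's CLIENT LIST, filtering out the connection that issues the command itself. A minimal standalone sketch of that counting approach, not part of the patch and assuming only a local Redis on the default port with redis-py installed:

    import redis

    # Test-side connection: this is not the worker's own pool, which lives in a
    # separate process (see the docstring of test_redis_disconnections above).
    conn = redis.StrictRedis(host="127.0.0.1", port=6379)

    def count_clients():
        # CLIENT LIST returns one dict per connected client; skip the connection
        # running the command itself, mirroring the test's `cmd != "client"` filter.
        return len([c for c in conn.client_list() if c.get("cmd") != "client"])

    print(count_clients())  # grows as the worker's greenlets/subpool open connections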