Skip to content

Commit

Permalink
Fixed test leaving mongodb/redis started, tag as 0.1.11
Browse files Browse the repository at this point in the history
  • Loading branch information
sylvinus committed Feb 20, 2015
1 parent 1d4b23e commit 2c7b975
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 47 deletions.
5 changes: 4 additions & 1 deletion mrq/bin/mrq_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
sys.path.insert(0, os.getcwd())

from mrq import config, utils
from mrq.context import set_current_config, set_current_job
from mrq.context import set_current_config, set_current_job, connections
from mrq.job import queue_job
from mrq.utils import load_class_by_path

Expand Down Expand Up @@ -60,5 +60,8 @@ def main():
ret = job.perform()
print json.dumps(ret) # pylint: disable=no-member

# This shouldn't be needed as the process will exit and close any remaining sockets
# connections.redis.connection_pool.disconnect()

if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion mrq/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def add_parser_args(parser, config_type):
parser.add_argument(
'--redis_timeout',
action='store',
default=20,
default=30,
help='Redis connection pool timeout to wait for an available connection')

parser.add_argument(
Expand Down
2 changes: 1 addition & 1 deletion mrq/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "0.1.10"
VERSION = "0.1.11"
2 changes: 0 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,6 @@ def send_task(self, path, params, **kwargs):
return self.send_tasks(path, [params], **kwargs)[0]

def send_task_cli(self, path, params, queue=None, **kwargs):
if not self.started and queue:
self.start()

cli = ["python", "mrq/bin/mrq_run.py", "--quiet"]
if queue:
Expand Down
6 changes: 5 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ def test_cli_run_blocking(worker):

assert result == 42

worker.stop_deps()


def test_cli_run_nonblocking(worker):

worker.start_deps()
worker.start()

job_id1 = worker.send_task_cli(
"tests.tasks.general.Add", {"a": 41, "b": 1}, queue="default")
Expand All @@ -25,3 +27,5 @@ def test_cli_run_nonblocking(worker):

assert job1.data["status"] == "success"
assert job1.data["result"] == 42

worker.stop()
41 changes: 0 additions & 41 deletions tests/test_context.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import json
import time
import os
import pytest


def test_context_get(worker):
Expand Down Expand Up @@ -98,42 +96,3 @@ def test_context_setup():

assert out.endswith("42\ntestname1\n")


@pytest.mark.parametrize(["gevent_count", "subpool_size", "iterations", "expected_clients"], [
(None, None, 1, 1), # single task opens a single connection
(None, None, 2, 1),
(None, 10, 1, 10), # single task with subpool of 10 opens 10 connections
(None, 10, 2, 10),
(None, 200, 1, 100), # single task with subpool of 200 opens 100 connections, we reach the max_connections limit
(None, 200, 2, 100),
(4, None, 1, 4), # 4 gevent workers with a single task each : 4 connections
(4, None, 2, 4),
(2, 2, 1, 4), # 2 gevent workers with 2 single tasks each : 4 connections
(2, 2, 2, 4),
])
def test_redis_disconnections(gevent_count, subpool_size, iterations, expected_clients, worker):
""" mrq.context.connections is not the actual connections pool that the worker uses.
this worker's pool is not accessible from here, since it runs in a different thread.
"""
from mrq.context import connections

gevent_count = gevent_count if gevent_count is not None else 1

get_clients = lambda: [c for c in connections.redis.client_list() if c.get("cmd") != "client"]
# 1. start the worker and asserts that there is a redis client connected
kwargs = {"flags": "--redis_max_connections 100"}
if gevent_count:
kwargs["flags"] += " --gevent %s" % gevent_count

worker.start(**kwargs)

for i in range(0, iterations):
# sending tasks has the good side effect to wait for the worker to connect to redis
worker.send_tasks("tests.tasks.redis.Disconnections", [{"subpool_size": subpool_size}] * gevent_count)

assert len(get_clients()) == expected_clients

# 2. kill the worker and make sure that the connection was closed
worker.stop(deps=False) # so that we still have access to redis
assert len(get_clients()) == 0
48 changes: 48 additions & 0 deletions tests/test_disconnects.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,51 @@ def test_disconnects_service_during_task(worker, p_service):

# Result should be there without issues
assert Job(job_id1).fetch().data["result"] == 42


@pytest.mark.parametrize(["gevent_count", "subpool_size", "iterations", "expected_clients"], [
(None, None, 1, 1), # single task opens a single connection
(None, None, 2, 1),
(None, 10, 1, 10), # single task with subpool of 10 opens 10 connections
(None, 10, 2, 10),
(None, 200, 1, 100), # single task with subpool of 200 opens 100 connections, we reach the max_connections limit
(None, 200, 2, 100),
(4, None, 1, 4), # 4 gevent workers with a single task each : 4 connections
(4, None, 2, 4),
(2, 2, 1, 4), # 2 gevent workers with 2 single tasks each : 4 connections
(2, 2, 2, 4),
])
def test_redis_disconnections(gevent_count, subpool_size, iterations, expected_clients, worker):
""" mrq.context.connections is not the actual connections pool that the worker uses.
this worker's pool is not accessible from here, since it runs in a different thread.
"""
from mrq.context import connections

worker.start_deps()

gevent_count = gevent_count if gevent_count is not None else 1

get_clients = lambda: [c for c in connections.redis.client_list() if c.get("cmd") != "client"]

assert len(get_clients()) == 0

# 1. start the worker and asserts that there is a redis client connected
kwargs = {"flags": "--redis_max_connections 100", "deps": False}
if gevent_count:
kwargs["flags"] += " --gevent %s" % gevent_count

worker.start(**kwargs)

for i in range(0, iterations):
# sending tasks has the good side effect to wait for the worker to connect to redis
worker.send_tasks("tests.tasks.redis.Disconnections", [{"subpool_size": subpool_size}] * gevent_count)

assert len(get_clients()) == expected_clients

# 2. kill the worker and make sure that the connection was closed
worker.stop(deps=False) # so that we still have access to redis

assert len(get_clients()) == 0

worker.stop_deps()

0 comments on commit 2c7b975

Please sign in to comment.