[SYNPY-1298] flakey integration tests (#999)
Fixes aimed at integration test stability
BryanFauble authored Oct 30, 2023
1 parent c6223b5 commit 831c2a5
Showing 22 changed files with 513 additions and 328 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/build.yml
@@ -133,8 +133,11 @@ jobs:
export EXTERNAL_S3_BUCKET_AWS_SECRET_ACCESS_KEY="${{secrets.EXTERNAL_S3_BUCKET_AWS_SECRET_ACCESS_KEY}}"
- pytest -sv tests/integration -n auto --dist loadscope
+ # use loadscope to avoid issues running tests concurrently that share scoped fixtures
+ pytest -sv tests/integration -n auto --ignore=tests/integration/synapseclient/test_command_line_client.py --dist loadscope
+ # Execute the CLI tests in a non-dist way because they were causing some test instability when being run concurrently
+ pytest -sv tests/integration/synapseclient/test_command_line_client.py
fi
# enforce the code matches the Black code style
lint:
runs-on: ubuntu-latest
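For context on the `--dist loadscope` flag kept in the main invocation: pytest-xdist's loadscope mode assigns all tests from the same module (or class) to the same worker, so tests that share a module- or session-scoped fixture are never split across processes. A minimal sketch of the kind of fixture this protects — the file name, fixture, and tests are hypothetical, not part of this commit:

```python
# Hypothetical tests/integration/test_example.py
import uuid

import pytest
import synapseclient


@pytest.fixture(scope="module")
def shared_project(syn, schedule_for_cleanup):
    # Created once per test module. Under --dist loadscope, every test in
    # this module runs on the same xdist worker, so they all share this one
    # Project instead of racing to create and clean up their own.
    project = syn.store(synapseclient.Project(name="example-" + uuid.uuid4().hex))
    schedule_for_cleanup(project)
    return project


def test_one(shared_project):
    assert shared_project.id is not None


def test_two(shared_project):
    assert shared_project.name.startswith("example-")
```

Without loadscope, the default `--dist load` mode could send `test_one` and `test_two` to different workers, and each worker would then build its own copy of the module-scoped fixture.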
459 changes: 244 additions & 215 deletions Pipfile.lock

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions setup.cfg
@@ -64,25 +64,31 @@ install_requires =
cryptography<3.4; sys_platform == "linux"
deprecated>=1.2.4,<2.0
tests_require =
- pytest>=5.0.0,<7.0
+ pytest>=6.0.0,<7.0
pytest-mock>=3.0,<4.0
flake8>=3.7.0,<4.0
pytest-xdist[psutil]>=2.2,<3.0.0
+ pytest-rerunfailures~=12.0
+ func-timeout~=4.3

[options.extras_require]
dev =
- pytest>=5.0.0,<7.0
+ pytest>=6.0.0,<7.0
pytest-mock>=3.0,<4.0
flake8>=3.7.0,<4.0
pytest-xdist[psutil]>=2.2,<3.0.0
+ pytest-rerunfailures~=12.0
+ func-timeout~=4.3
black
pre-commit

tests =
- pytest>=5.0.0,<7.0
+ pytest>=6.0.0,<7.0
pytest-mock>=3.0,<4.0
flake8>=3.7.0,<4.0
pytest-xdist[psutil]>=2.2,<3.0.0
+ pytest-rerunfailures~=12.0
+ func-timeout~=4.3

pandas =
pandas>=1.5,<2.1
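The two new test dependencies back the stability changes in the test modules below: pytest-rerunfailures supplies the `@pytest.mark.flaky(reruns=N)` marker, and func-timeout supplies the `@func_set_timeout(seconds)` decorator plus the `FunctionTimedOut` exception it raises. A minimal sketch of how they combine — the function and test here are illustrative only:

```python
import pytest
from func_timeout import FunctionTimedOut, func_set_timeout


@func_set_timeout(20)  # raise FunctionTimedOut if this call runs longer than 20s
def possibly_hanging_call():
    ...  # e.g. a network call that can stall under concurrent load


@pytest.mark.flaky(reruns=3)  # pytest-rerunfailures: rerun the test up to 3 times on failure
def test_call_finishes():
    try:
        possibly_hanging_call()
    except FunctionTimedOut:
        pytest.fail("call exceeded the 20s budget")
```

Note that `FunctionTimedOut` derives from `BaseException` rather than `Exception` (per the func-timeout docs), so a broad `except Exception:` will not catch it — the test code below catches it by name for that reason.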
16 changes: 2 additions & 14 deletions synapseclient/core/upload/multipart_upload.py
@@ -10,7 +10,6 @@

import concurrent.futures
from contextlib import contextmanager
- import hashlib
import json
import math
import mimetypes
@@ -30,7 +29,7 @@
SynapseUploadAbortedException,
SynapseUploadFailedException,
)
- from synapseclient.core.utils import md5_for_file, MB, Spinner
+ from synapseclient.core.utils import md5_fn, md5_for_file, MB, Spinner

# AWS limits
MAX_NUMBER_OF_PARTS = 10000
@@ -345,7 +344,6 @@ def _upload_parts(self, part_count, remaining_part_numbers):

if isinstance(cause, KeyboardInterrupt):
raise SynapseUploadAbortedException("User interrupted upload")

raise SynapseUploadFailedException("Part upload failed") from cause

def _complete_upload(self):
@@ -479,11 +477,6 @@ def multipart_upload_file(
def part_fn(part_number):
return _get_file_chunk(file_path, part_number, part_size)

- def md5_fn(part, _):
-     md5 = hashlib.md5()
-     md5.update(part)
-     return md5.hexdigest()

return _multipart_upload(
syn,
dest_file_name,
@@ -533,7 +526,7 @@ def multipart_upload_string(

data = text.encode("utf-8")
file_size = len(data)
- md5_hex = hashlib.md5(data).hexdigest()
+ md5_hex = md5_fn(data, None)

if not dest_file_name:
dest_file_name = "message.txt"
@@ -557,11 +550,6 @@ def multipart_upload_string(
def part_fn(part_number):
return _get_data_chunk(data, part_number, part_size)

- def md5_fn(part, _):
-     md5 = hashlib.md5()
-     md5.update(part)
-     return md5.hexdigest()

part_size = _get_part_size(part_size, file_size)
return _multipart_upload(
syn,
12 changes: 12 additions & 0 deletions synapseclient/core/utils.py
@@ -57,6 +57,18 @@ def md5_for_file(filename, block_size=2 * MB, callback=None):
return md5


+ def md5_fn(part, _):
+     """Calculate the MD5 hex digest of a byte string.
+
+     :param part: the bytes to hash
+     :param _: unused; accepted so this helper matches the two-argument
+         part-hashing callback signature used by the multipart upload code
+     :returns: the MD5 hex digest
+     """
+     md5 = hashlib.new("md5", usedforsecurity=False)
+     md5.update(part)
+     return md5.hexdigest()


def download_file(url, localFilepath=None):
"""
Downloads a remote file.
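With the hashing consolidated here, both call sites in multipart_upload.py above now go through this single helper. A quick illustrative check of its behavior, not part of the commit:

```python
import hashlib

from synapseclient.core.utils import md5_fn

chunk = b"example file part"
# The second argument is ignored; it exists only so md5_fn matches the
# two-argument part-hashing callback signature used by the upload code.
assert md5_fn(chunk, None) == hashlib.md5(chunk).hexdigest()
```

The `usedforsecurity=False` flag matters on FIPS-enabled hosts, where a bare `hashlib.md5()` call can raise; here MD5 serves only as an integrity checksum, not a security primitive.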
4 changes: 3 additions & 1 deletion tests/integration/conftest.py
@@ -20,6 +20,7 @@
def syn():
"""
Create a logged in Synapse instance that can be shared by all tests in the session.
+ If xdist is being used, a syn is created for each worker node.
"""
print("Python version:", sys.version)

@@ -38,7 +39,8 @@ def syn():
@pytest.fixture(scope="session")
def project(request, syn):
"""
- Create a project to be shared by all tests in the session.
+ Create a project to be shared by all tests in the session. If xdist is being used,
+ a project is created for each worker node.
"""

# Make one project for all the tests to use
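The docstring additions codify a pytest-xdist subtlety: a `scope="session"` fixture is built once per worker process, not once per run, because each worker is a separate Python process. A minimal hypothetical illustration:

```python
import os

import pytest


@pytest.fixture(scope="session")
def worker_id():
    # pytest-xdist sets PYTEST_XDIST_WORKER to "gw0", "gw1", ... in each
    # worker process; without xdist the variable is absent. This fixture
    # body therefore executes once in every worker, not once globally.
    return os.environ.get("PYTEST_XDIST_WORKER", "main")


def test_reports_worker(worker_id):
    assert worker_id  # e.g. "gw0" under -n auto, "main" without xdist
```

So under `-n auto` each worker logs in and creates its own project, which is why the fixtures above note the per-worker behavior.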
76 changes: 54 additions & 22 deletions tests/integration/synapseclient/core/test_caching.py
@@ -13,7 +13,8 @@

import synapseclient.core.utils as utils
from synapseclient.core.exceptions import SynapseError, SynapseHTTPError
- from synapseclient import File, Project
+ from synapseclient import File, Project, Synapse, Entity
+ from func_timeout import FunctionTimedOut, func_set_timeout


@pytest.fixture(scope="module")
@@ -40,7 +41,8 @@ def syn_state(syn):
del syn.test_threadsRunning


- def test_threaded_access(syn, project, schedule_for_cleanup):
+ @pytest.mark.flaky(reruns=6)
+ def test_threaded_access(syn: Synapse, project: Project, schedule_for_cleanup):
"""Starts multiple threads to perform store and get calls randomly."""
# Doesn't this test look like a DOS attack on Synapse?
# Maybe it should be called explicitly...
@@ -60,16 +62,16 @@ def test_threaded_access(syn, project, schedule_for_cleanup):
update_thread = wrap_function_as_child_thread(
syn, thread_get_and_update_file_from_Project, syn, project, schedule_for_cleanup
)
- # thread.start_new_thread(store_thread, ())
- # thread.start_new_thread(store_thread, ())
+ thread.start_new_thread(store_thread, ())
+ thread.start_new_thread(store_thread, ())
thread.start_new_thread(store_thread, ())
thread.start_new_thread(store_thread, ())
thread.start_new_thread(get_thread, ())
thread.start_new_thread(get_thread, ())
thread.start_new_thread(get_thread, ())
# thread.start_new_thread(get_thread, ())
thread.start_new_thread(update_thread, ())
thread.start_new_thread(update_thread, ())
thread.start_new_thread(update_thread, ())
# thread.start_new_thread(update_thread, ())

# Give the threads some time to wreak havoc on the cache
time.sleep(20)
@@ -89,7 +91,7 @@ def test_threaded_access(syn, project, schedule_for_cleanup):
#############


- def wrap_function_as_child_thread(syn, function, *args, **kwargs):
+ def wrap_function_as_child_thread(syn: Synapse, function, *args, **kwargs):
"""Wraps the given function so that it ties into the main thread."""

def child_thread():
Expand All @@ -109,7 +111,7 @@ def child_thread():
return child_thread


- def collect_errors_and_fail(syn):
+ def collect_errors_and_fail(syn: Synapse):
"""Pulls error traces from the error queue and fails if the queue is not empty."""
failures = []
for i in range(syn.test_errors.qsize()):
@@ -123,7 +125,7 @@ def collect_errors_and_fail(syn):
######################


- def thread_keep_storing_one_File(syn, project, schedule_for_cleanup):
+ def thread_keep_storing_one_File(syn: Synapse, project: Project, schedule_for_cleanup):
"""Makes one file and stores it over and over again."""

# Make a local file to continuously store
@@ -134,7 +136,14 @@ def thread_keep_storing_one_File(syn, project, schedule_for_cleanup):
)

while syn.test_keepRunning:
- stored = store_catch_412_HTTPError(syn, myPrecious)
+ stored = None
+ try:
+     stored = store_catch_412_HTTPError(syn, myPrecious)
+ except FunctionTimedOut:
+     syn.logger.warning(
+         f"thread_keep_storing_one_File()::store_catch_412_HTTPError timed out, [Path: {myPrecious.path}]"
+     )

if stored is not None:
myPrecious = stored
elif "id" in myPrecious:
@@ -145,22 +154,35 @@ def thread_keep_storing_one_File(syn, project, schedule_for_cleanup):
sleep_for_a_bit()


- def thread_get_files_from_Project(syn, project):
+ def thread_get_files_from_Project(syn: Synapse, project: Project):
"""Continually polls and fetches items from the Project."""

while syn.test_keepRunning:
- for id in get_all_ids_from_Project(syn, project):
-     pass
+ try:
+     get_all_ids_from_Project(syn, project)
+ except FunctionTimedOut:
+     syn.logger.warning(
+         f"thread_get_files_from_Project()::get_all_ids_from_Project timed out, [Project: {project.id}]"
+     )

sleep_for_a_bit()


- def thread_get_and_update_file_from_Project(syn, project, schedule_for_cleanup):
+ def thread_get_and_update_file_from_Project(
+     syn: Synapse, project: Project, schedule_for_cleanup
+ ):
"""Fetches one item from the Project and updates it with a new file."""

while syn.test_keepRunning:
- id = get_all_ids_from_Project(syn, project)
- if len(id) <= 0:
+ id = []
+ try:
+     id = get_all_ids_from_Project(syn, project)
+ except FunctionTimedOut:
+     syn.logger.warning(
+         f"thread_get_and_update_file_from_Project()::get_all_ids_from_Project timed out, [project: {project.id}]"
+     )
+ if len(id) == 0:
+     sleep_for_a_bit()
continue

id = id[random.randrange(len(id))]
@@ -170,7 +192,12 @@ def thread_get_and_update_file_from_Project(syn, project, schedule_for_cleanup):
path = utils.make_bogus_data_file()
schedule_for_cleanup(path)
entity.path = path
- entity = store_catch_412_HTTPError(syn, entity)
+ try:
+     entity = store_catch_412_HTTPError(syn, entity)
+ except FunctionTimedOut:
+     syn.logger.warning(
+         f"thread_get_and_update_file_from_Project()::store_catch_412_HTTPError timed out, [project: {project.id}, path: {entity.path}]"
+     )
if entity is not None:
assert os.stat(entity.path) == os.stat(path)

@@ -182,18 +209,23 @@ def thread_get_and_update_file_from_Project(syn, project, schedule_for_cleanup):
####################


- def sleep_for_a_bit():
+ def sleep_for_a_bit() -> int:
"""Sleeps for a random amount of seconds between 1 and 5 inclusive."""

- time.sleep(random.randint(1, 5))
+ time_to_sleep = random.randint(1, 5)
+ time.sleep(time_to_sleep)
+ return time_to_sleep


- def get_all_ids_from_Project(syn, project):
+ # When running with multiple threads it can lock up and do nothing until pipeline is killed at 6hrs
+ @func_set_timeout(20)
+ def get_all_ids_from_Project(syn: Synapse, project: Project):
"""Fetches all currently available Synapse IDs from the parent Project."""
return [result["id"] for result in syn.getChildren(project.id)]


- def store_catch_412_HTTPError(syn, entity):
+ # When running with multiple threads it can lock up and do nothing until pipeline is killed at 6hrs
+ @func_set_timeout(20)
+ def store_catch_412_HTTPError(syn: Synapse, entity: Entity):
"""Returns the stored Entity if the function succeeds or None if the 412 is caught."""
try:
return syn.store(entity)
1 change: 1 addition & 0 deletions tests/integration/synapseclient/core/test_download.py
@@ -27,6 +27,7 @@ def test_download_check_md5(syn, project, schedule_for_cleanup):
)


+ @pytest.mark.flaky(reruns=3)
def test_resume_partial_download(syn, project, schedule_for_cleanup):
original_file = utils.make_bogus_data_file(40000)

6 changes: 3 additions & 3 deletions tests/integration/synapseclient/core/test_external_storage.py
@@ -3,7 +3,7 @@
import tempfile
import uuid

- from synapseclient import File
+ from synapseclient import File, Synapse, Project
from synapseclient.core.retry import with_retry

import pytest
@@ -48,11 +48,11 @@ def get_aws_env():
@unittest.skipIf(*check_test_preconditions())
class ExernalStorageTest(unittest.TestCase):
@pytest.fixture(autouse=True)
- def _syn(self, syn):
+ def _syn(self, syn: Synapse):
self.syn = syn

@pytest.fixture(autouse=True)
- def _project(self, project):
+ def _project(self, project: Project):
self.project = project

@classmethod
Diffs for the remaining 13 changed files are not rendered.
