Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add retry mechanism to file access methods #2287

Merged
merged 11 commits into from
Mar 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ python-magic; (platform_system=='Darwin' or platform_system=='Linux')

types-protobuf
types-croniter
types-decorator
types-mock
autoflake

22 changes: 22 additions & 0 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
@@ -22,10 +22,12 @@
import pathlib
import tempfile
import typing
from time import sleep
from typing import Any, Dict, Optional, Union, cast
from uuid import UUID

import fsspec
from decorator import decorator
from fsspec.utils import get_protocol
from typing_extensions import Unpack

@@ -101,6 +103,24 @@
return {}


@decorator
def retry_request(func, *args, **kwargs):
# TODO: Remove this method once s3fs has a new release. https://github.com/fsspec/s3fs/pull/865
retries = kwargs.pop("retries", 5)
for retry in range(retries):
try:
if retry > 0:
sleep(random.randint(0, min(2**retry, 32)))

Check warning on line 113 in flytekit/core/data_persistence.py

Codecov / codecov/patch

flytekit/core/data_persistence.py#L113

Added line #L113 was not covered by tests
return func(*args, **kwargs)
except Exception as e:
# Catch this specific error message from S3 since S3FS doesn't catch it and retry the request.
if "Please reduce your request rate" in str(e):
if retry == retries - 1:
raise e

Check warning on line 119 in flytekit/core/data_persistence.py

Codecov / codecov/patch

flytekit/core/data_persistence.py#L119

Added line #L119 was not covered by tests
else:
raise e


class FileAccessProvider(object):
"""
This is the class that is available through the FlyteContext and can be used for persisting data to the remote
@@ -246,6 +266,7 @@
return anon_fs.exists(path)
raise oe

@retry_request
def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
file_system = self.get_filesystem_for_path(from_path)
if recursive:
@@ -272,6 +293,7 @@
return file_system.get(from_path, to_path, recursive=recursive, **kwargs)
raise oe

@retry_request
def put(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
file_system = self.get_filesystem_for_path(to_path)
from_path = self.strip_file_header(from_path)
Loading