feat: Add retry mechanism to file access methods #2287

Merged
merged 11 commits into from
Mar 25, 2024

@pingsutw (Member) commented Mar 20, 2024

Tracking issue

NA

Why are the changes needed?

Uploading a file to S3 failed with the following error:

      File "/usr/local/lib/python3.11/site-packages/s3fs/core.py", line 719, in _lsdir
        raise translate_boto_error(e)

Message:

    OSError: [Errno 16] Please reduce your request rate.

User error.

What changes were proposed in this pull request?

Catch the "Please reduce your request rate" error and retry the request.
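
The idea can be pictured as a small wrapper around a blocking call. This is a minimal sketch, not flytekit's actual implementation: it assumes the throttling error can be recognised by its message text, and `retry_on_throttle`, `THROTTLE_MESSAGE`, and the injectable `sleep` parameter are hypothetical names chosen for illustration. The delay mirrors the expression quoted in the review thread of this PR.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumption: throttling is detected by the message text quoted in this PR.
THROTTLE_MESSAGE = "Please reduce your request rate"


def retry_on_throttle(
    func: Callable[[], T],
    retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying only when it raises a throttling OSError."""
    for attempt in range(retries):
        try:
            return func()
        except OSError as e:
            # Re-raise non-throttling errors, and the last throttling failure.
            if THROTTLE_MESSAGE not in str(e) or attempt == retries - 1:
                raise
            # Wait roughly 1s, 2s, 4s, ... (capped at 32s) before the next attempt.
            sleep(min(random.random() + 2 ** attempt, 32))
    raise RuntimeError("retries must be at least 1")
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting; a caller would normally leave it at the default.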

How was this patch tested?

Remote cluster

Setup process

import asyncio
import os
from typing import List, Dict, Any, Coroutine
import s3fs

from flytekit import ImageSpec, task, workflow, Resources, FlyteContextManager

# Constants
NUMBER_OF_REQUESTS: int = 100000
BUCKET_NAME: str = 's3://xxx/load-test/'
OBJECT_KEY: str = 'your_object_key'
DATA: str = 'abcsfsdfsdfsfdsdf'
fs = s3fs.S3FileSystem()  # named fs so the instance does not shadow the s3fs module


def write_data(source_file: str, object_key: str):
    ctx = FlyteContextManager.current_context()
    ctx.file_access.put(source_file, f"{BUCKET_NAME}{object_key}")


def make_file_if_not_exists(file_path: str, data: str) -> None:
    if not os.path.exists(file_path):
        with open(file_path, "w") as f:
            f.write(data)


async def make_s3_request(object_key: str, data: str, idx: int) -> None:
    if idx % 10 == 0:
        print(f"Making request for object_key: {idx}")
    # Reuse at most 10,000 local source files across all requests.
    source_file = f"/root/source_{idx % 10000}.txt"
    make_file_if_not_exists(source_file, data)
    # write_data blocks and returns nothing, so run it in a worker thread.
    await asyncio.to_thread(write_data, source_file, object_key)
    if idx % 10 == 0:
        print(f"Done with request {idx}")
    # response = await s3fs._put("/tmp/test/file.txt", f"s3://union-cloud-oc-staging-dogfood/load-test/{object_key}")


async def send_requests() -> List[None]:
    tasks: List[Coroutine] = [make_s3_request(f"{OBJECT_KEY}_{i}", DATA, i) for i in range(NUMBER_OF_REQUESTS)]
    print("Sending requests")
    return await asyncio.gather(*tasks)


async def main() -> None:
    # The uploads return nothing; only their side effects matter here.
    await send_requests()


new_flytekit = "git+https://github.com/flyteorg/flytekit@0db9079b6e11767aac61c5dafc1c025ba04ef31e"
image = ImageSpec(registry="pingsutw", packages=[new_flytekit], apt_packages=["git"])


@task(
    requests=Resources(cpu="1", mem="3000Mi"),
    limits=Resources(cpu="1", mem="5000Mi"),
    interruptible=False,
    # container_image=image,
)
def s3_load_task():
    asyncio.run(main())


@workflow
def wf():
    for i in range(100):
        s3_load_task()


if __name__ == "__main__":
    asyncio.run(main())

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

NA

Docs link

NA

@pingsutw pingsutw marked this pull request as draft March 20, 2024 16:57
@dosubot dosubot bot added the size:S This PR changes 10-29 lines, ignoring generated files. label Mar 20, 2024

codecov bot commented Mar 20, 2024

Codecov Report

Attention: Patch coverage is 64.70588%, with 6 lines in your changes missing coverage. Please review.

Project coverage is 83.60%. Comparing base (f45dc39) to head (8cf1bbd).
Report is 7 commits behind head on master.

| Files | Patch % | Lines |
| --- | --- | --- |
| flytekit/core/data_persistence.py | 64.70% | 3 Missing and 3 partials ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2287      +/-   ##
==========================================
+ Coverage   83.31%   83.60%   +0.28%     
==========================================
  Files         309      311       +2     
  Lines       24064    23966      -98     
  Branches     3496     3503       +7     
==========================================
- Hits        20050    20037      -13     
+ Misses       3386     3298      -88     
- Partials      628      631       +3     


Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw marked this pull request as ready for review March 20, 2024 21:33
Signed-off-by: Kevin Su <[email protected]>
@dosubot dosubot bot added size:M This PR changes 30-99 lines, ignoring generated files. and removed size:S This PR changes 10-29 lines, ignoring generated files. labels Mar 21, 2024
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@dosubot dosubot bot added size:S This PR changes 10-29 lines, ignoring generated files. and removed size:M This PR changes 30-99 lines, ignoring generated files. labels Mar 21, 2024
wild-endeavor previously approved these changes Mar 21, 2024
@dosubot dosubot bot added the lgtm This PR has been approved by maintainer label Mar 21, 2024
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw marked this pull request as draft March 21, 2024 19:51
@pingsutw (Member, Author) commented:

Related s3fs PR: fsspec/s3fs#865

@pingsutw pingsutw self-assigned this Mar 21, 2024
@pingsutw pingsutw marked this pull request as ready for review March 21, 2024 20:14
Signed-off-by: Kevin Su <[email protected]>
    for retry in range(retries):
        try:
            if retry > 0:
                sleep(min(random.random() + 2 ** (retry - 1), 32))
Review comment from a collaborator:

Exponential backoff with jitter is usually implemented like this:

    sleep(random.randint(0, min(2 ** retry, 32)))

https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ is the gold standard reference and describes several techniques for implementing exponential backoff with jitter.
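
To make the difference between the two expressions in this thread concrete, here is a small sketch that wraps each one in a function and prints sample delays. The names `pr_delay`, `full_jitter_delay`, and `CAP_SECONDS` are hypothetical and exist only for this comparison; the formulas themselves are copied from the diff excerpt and the review suggestion.

```python
import random

CAP_SECONDS = 32  # upper bound used by both expressions in this thread


def pr_delay(retry: int) -> float:
    """Diff excerpt: exponential term plus up to one second of jitter."""
    return min(random.random() + 2 ** (retry - 1), CAP_SECONDS)


def full_jitter_delay(retry: int) -> int:
    """Review suggestion: uniform integer in [0, min(2 ** retry, cap)]."""
    return random.randint(0, min(2 ** retry, CAP_SECONDS))


if __name__ == "__main__":
    # Print one sample of each delay for the first few retries.
    for retry in range(1, 8):
        print(retry, round(pr_delay(retry), 2), full_jitter_delay(retry))
```

With the first expression, every delay for a given retry falls within one second of `2 ** (retry - 1)`, so clients that failed together tend to retry close together. The suggested form draws from the whole window up to the cap, which spreads retries out at the cost of occasionally sleeping for zero seconds.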

Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw merged commit d9cea30 into master Mar 25, 2024
46 of 48 checks passed
fiedlerNr9 pushed a commit that referenced this pull request Jul 25, 2024
Labels
lgtm This PR has been approved by maintainer size:S This PR changes 10-29 lines, ignoring generated files.
3 participants