Skip to content

Commit

Permalink
Name the callback function for progress monitor as `callback_for_moni…
Browse files Browse the repository at this point in the history
…tor`
  • Loading branch information
PeterDing committed Apr 9, 2024
1 parent f5d9369 commit c96336d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 29 deletions.
15 changes: 12 additions & 3 deletions alipcs_py/commands/download.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterable, Optional, List, Sequence, Tuple, Union
from typing import Any, Callable, Iterable, Optional, List, Sequence, Tuple, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -65,6 +65,7 @@ def download(
max_retries: int = 2,
out_cmd: bool = False,
encrypt_password: bytes = b"",
callback_for_monitor: Optional[Callable[[int], Any]] = None,
):
global DEFAULT_DOWNLOADER
if not self.which():
Expand All @@ -80,6 +81,7 @@ def download(
show_progress=show_progress,
max_retries=max_retries,
encrypt_password=encrypt_password,
callback_for_monitor=callback_for_monitor,
)
shutil.move(localpath_tmp, localpath)
return
Expand Down Expand Up @@ -149,6 +151,7 @@ def _me_download(
show_progress: bool = False,
max_retries: int = 2,
encrypt_password: bytes = b"",
callback_for_monitor: Optional[Callable[[int], Any]] = None,
):
headers = {
"Referer": "https://www.aliyundrive.com/",
Expand All @@ -166,19 +169,20 @@ def done_callback():

def monitor_callback(offset: int):
if task_id is not None:
_progress.update(task_id, completed=offset + 1)
_progress.update(task_id, completed=offset)

def except_callback(err):
reset_progress_task(task_id)

if isinstance(chunk_size, str):
chunk_size = human_size_to_int(chunk_size)

io = RangeRequestIO(
"GET",
url,
headers=headers,
max_chunk_size=chunk_size,
callback=monitor_callback,
callback=monitor_callback if callback_for_monitor is None else callback_for_monitor,
encrypt_password=encrypt_password,
)

Expand Down Expand Up @@ -307,6 +311,7 @@ def download_file(
max_retries: int = 2,
out_cmd: bool = False,
encrypt_password: bytes = b"",
callback_for_monitor: Optional[Callable[[int], Any]] = None,
) -> None:
"""Download a `remote_file` to the `localdir`
Expand All @@ -324,6 +329,9 @@ def download_file(
max_retries (int, optional): The max retries of download. Defaults to 2.
out_cmd (bool, optional): Whether print out the command. Defaults to False.
encrypt_password (bytes, optional): The password to decrypt the file. Defaults to b"".
callback_for_monitor (Callable[[int], Any], optional): The callback function for monitor. Defaults to None.
The callback function should accept one argument which is the count of bytes downloaded.
The callback function is only passed to the `MeDownloader` downloader.
"""

if isinstance(downloader, str):
Expand Down Expand Up @@ -396,6 +404,7 @@ def download_file(
max_retries=max_retries,
out_cmd=out_cmd,
encrypt_password=encrypt_password,
callback_for_monitor=callback_for_monitor,
)
except Exception as origin_err:
msg = f'Download "{remote_pcs_file.path}" (file_id = "{remote_pcs_file.file_id}") to "{localpath}" failed. error: {origin_err}'
Expand Down
78 changes: 52 additions & 26 deletions alipcs_py/commands/upload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Optional, List, Sequence, Tuple, IO, Union
from typing import Any, Callable, Optional, List, Sequence, Tuple, IO, Union
import os
import time
import math
Expand All @@ -11,17 +11,21 @@
from alipcs_py.alipcs import AliPCSApi, FromTo
from alipcs_py.alipcs.pcs import CheckNameMode
from alipcs_py.common import constant
from alipcs_py.common.path import PathType, is_file, exists, posix_path_basename, posix_path_dirname
from alipcs_py.common.path import PathType, exists, posix_path_basename, posix_path_dirname
from alipcs_py.common.event import KeyHandler, KeyboardMonitor
from alipcs_py.common.constant import CPU_NUM
from alipcs_py.common.concurrent import retry
from alipcs_py.common.progress_bar import _progress, progress_task_exists, remove_progress_task, reset_progress_task
from alipcs_py.common.progress_bar import (
_progress,
init_progress_bar,
progress_task_exists,
remove_progress_task,
reset_progress_task,
)
from alipcs_py.common.crypto import calc_sha1, calc_proof_code
from alipcs_py.common.io import total_len, EncryptType, reset_encrypt_io
from alipcs_py.commands.log import get_logger

from requests_toolbelt import MultipartEncoderMonitor

from rich.progress import TaskID
from rich import print

Expand Down Expand Up @@ -153,11 +157,7 @@ def upload(
futures = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for idx, from_to in enumerate(from_to_list):
task_id = None
if show_progress:
task_id = _progress.add_task("upload", start=False, title=from_to[0])

logger.debug("`upload_many`: Upload: index: %s, task_id: %s", idx, task_id)
logger.debug("`upload_many`: Upload: index: %s", idx)

retry_upload_file = retry(
max_retries,
Expand All @@ -178,7 +178,7 @@ def upload(
encrypt_type=encrypt_type,
slice_size=slice_size,
only_use_rapid_upload=only_use_rapid_upload,
task_id=task_id,
show_progress=show_progress,
)
futures.append(fut)

Expand Down Expand Up @@ -271,8 +271,8 @@ def upload_file(
encrypt_type: EncryptType = EncryptType.No,
slice_size: Union[str, int] = DEFAULT_SLICE_SIZE,
only_use_rapid_upload: bool = False,
task_id: Optional[TaskID] = None,
callback_for_monitor: Optional[Callable[[MultipartEncoderMonitor], None]] = None,
callback_for_monitor: Optional[Callable[[int], Any]] = None,
show_progress: bool = False,
) -> None:
"""Upload a file from `from_to[0]` to `from_to[1]`
Expand All @@ -289,7 +289,8 @@ def upload_file(
slice_size (Union[str, int], optional): Slice size. Defaults to DEFAULT_SLICE_SIZE.
only_use_rapid_upload (bool, optional): Only use rapid upload. If rapid upload fails, raise exception. Defaults to False.
task_id (Optional[TaskID], optional): Task ID. Defaults to None.
callback_for_monitor (Optional[Callable[[MultipartEncoderMonitor], None]], optional): Callback for progress monitor. Defaults to None.
callback_for_monitor (Optional[Callable[[int], Any]], optional): Callback for progress monitor. Defaults to None.
The callback should accept one argument which is the offset of the uploaded bytes.
Examples:
- Upload one file to one remote directory
Expand All @@ -302,10 +303,25 @@ def upload_file(
>>> localpath = "/local/file"
>>> from_to = (localpath, remotedir)
>>> upload_file(api, from_to)
```
- With tqdm progress bar
```python
>>> from alipcs_py.alipcs import AliPCSApi
>>> from alipcs_py.commands.upload import upload, from_tos
>>> api = AliPCSApi(...)
>>> remotedir = "/remote/dir"
>>> localpath = "/local/file"
>>> from_to = (localpath, remotedir)
>>> with tqdm.tqdm(total=Path(localpath).stat().st_size) as pbar:
>>> upload_file(api, from_to, callback_for_monitor=lambda offset: pbar.n = offset)
```
"""

_wait_start()

# Upload basic info
localpath, remotepath = from_to

remotedir = posix_path_dirname(remotepath)
Expand All @@ -318,12 +334,19 @@ def upload_file(

filename = posix_path_basename(remotepath)

# Progress bar
task_id: Optional[TaskID] = None
if show_progress:
init_progress_bar()
task_id = _progress.add_task("upload", start=False, title=from_to[0])

if not _need_to_upload(api, remotepath, check_name_mode):
if task_id is not None:
print(f"`{remotepath}` already exists.")
remove_progress_task(task_id)
return

# Upload IO info
info = _init_encrypt_io(localpath, encrypt_password=encrypt_password, encrypt_type=encrypt_type)
encrypt_io, encrypt_io_len, local_ctime, local_mtime = info
if isinstance(slice_size, str):
Expand All @@ -338,9 +361,16 @@ def upload_file(

slice_completed = 0

def callback_for_slice(monitor: MultipartEncoderMonitor):
if task_id is not None and progress_task_exists(task_id):
_progress.update(task_id, completed=slice_completed + monitor.bytes_read)
def callback_for_slice(offset: int):
if callback_for_monitor is not None:
callback_for_monitor(slice_completed + offset)
else:
if task_id is not None and progress_task_exists(task_id):
_progress.update(task_id, completed=slice_completed + offset)

def teardown():
encrypt_io.close()
remove_progress_task(task_id)

# Rapid Upload
try:
Expand Down Expand Up @@ -377,14 +407,17 @@ def callback_for_slice(monitor: MultipartEncoderMonitor):
task_id=task_id,
)
if ok:
teardown()
return
except Exception as origin_err:
teardown()
msg = f'Rapid upload "{localpath}" to "{remotepath}" failed. error: {origin_err}'
logger.debug(msg)
err = RapidUploadError(msg, localpath=localpath, remotepath=remotepath)
raise err from origin_err

if only_use_rapid_upload:
teardown()
msg = f'Only use rapid upload but rapid upload failed. localpath: "{localpath}", remotepath: "{remotepath}"'
logger.debug(msg)
err = RapidUploadError(msg, localpath=localpath, remotepath=remotepath)
Expand Down Expand Up @@ -440,11 +473,7 @@ def callback_for_slice(monitor: MultipartEncoderMonitor):
)

upload_url = upload_urls[slice_idx]
api.upload_slice(
io,
upload_url,
callback=callback_for_slice if callback_for_monitor is None else callback_for_monitor,
)
api.upload_slice(io, upload_url, callback_for_monitor=callback_for_slice)
slice_idx += 1
break
except Exception as origin_err:
Expand Down Expand Up @@ -474,8 +503,6 @@ def callback_for_slice(monitor: MultipartEncoderMonitor):
f"Hashs do not match between local file and remote file: local sha1 ({local_file_hash}) != remote sha1 ({remote_file_hash})"
)

remove_progress_task(task_id)

logger.debug(
"`upload_file`: upload_slice and combine_slices success, task_id: %s",
task_id,
Expand All @@ -486,5 +513,4 @@ def callback_for_slice(monitor: MultipartEncoderMonitor):
err = UploadError(msg, localpath=localpath, remotepath=remotepath)
raise err from origin_err
finally:
encrypt_io.close()
reset_progress_task(task_id)
teardown()

0 comments on commit c96336d

Please sign in to comment.