Skip to content

Commit

Permalink
feat: update cli megfile sync
Browse files Browse the repository at this point in the history
  • Loading branch information
liyang committed Jan 25, 2025
1 parent 7be020a commit 574a659
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 37 deletions.
102 changes: 66 additions & 36 deletions megfile/cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import configparser
import os
import signal
import shutil
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from queue import Queue

import click
from tqdm import tqdm
Expand Down Expand Up @@ -69,10 +71,13 @@ def cli(debug, log_level):


def safe_cli(): # pragma: no cover
debug = options.get("debug", False)
if not debug:
signal.signal(signal.SIGINT, signal.SIG_DFL)
try:
cli()
except Exception as e:
if options.get("debug", False):
if debug:
raise
else:
click.echo(f"\n[{type(e).__name__}] {e}", err=True)
Expand Down Expand Up @@ -364,23 +369,25 @@ def rm(path: str, recursive: bool):
@cli.command(short_help="Make source and dest identical, modifying destination only.")
@click.argument("src_path")
@click.argument("dst_path")
@click.option("-g", "--progress-bar", is_flag=True, help="Show progress bar.")
@click.option(
"-w", "--worker", type=click.INT, default=-1, help="Number of concurrent workers."
"-f", "--force", is_flag=True, help="Copy files forcible, ignore same files."
)
@click.option("--skip", is_flag=True, help="Skip existed files.")
@click.option(
"-f", "--force", is_flag=True, help="Copy files forcible, ignore same files."
"-w", "--worker", type=click.INT, default=-1, help="Number of concurrent workers."
)
@click.option("-G", "--no-progress-bar", is_flag=True, help="Do not show progress bar.")
@click.option("-v", "--verbose", is_flag=True, help="Show more progress log.")
@click.option("-q", "--quiet", is_flag=True, help="Not show any progress log.")
@click.option("--skip", is_flag=True, help="Skip existed files.")
def sync(
src_path: str,
dst_path: str,
progress_bar: bool,
worker: int,
force: bool,
quiet: bool,
skip: bool,
worker: int,
no_progress_bar: bool,
verbose: bool,
quiet: bool,
):
_sftp_prompt_host_key(src_path)
_sftp_prompt_host_key(dst_path)
Expand All @@ -389,7 +396,7 @@ def sync(
force = True

max_workers = worker if worker > 0 else (os.cpu_count() or 1) * 2
with ThreadPoolExecutor(max_workers=max_workers) as executor:
with ThreadPoolExecutor(max_workers=max_workers + 1) as executor: # +1 for scan
if has_magic(src_path):
src_root_path = get_non_glob_dir(src_path)
if not smart_exists(src_root_path):
Expand All @@ -411,42 +418,65 @@ def scan_func(path):
src_root_path = src_path
scan_func = partial(smart_scan_stat, followlinks=True)

if progress_bar and not quiet:
print("building progress bar", end="\r")
file_entries = []
total_count = total_size = 0
for total_count, file_entry in enumerate(scan_func(src_path), start=1):
if total_count > max_file_object_catch_count:
file_entries = []
else:
file_entries.append(file_entry)
total_size += file_entry.stat.size
print(f"building progress bar, find {total_count} files", end="\r")

if not file_entries:
file_entries = scan_func(src_path)
else:
total_count = total_size = None
file_entries = scan_func(src_path)

if quiet:
no_progress_bar = True
verbose = False

if no_progress_bar:
callback = callback_after_copy_file = None

if verbose:
def callback_after_copy_file(src_file_path, dst_file_path):
print(f"copy {src_file_path} to {dst_file_path} done")

file_entries = scan_func(src_path)
else:
tbar = tqdm(total=total_count, ascii=True)
tbar = tqdm(
total=0,
ascii=True,
desc="Files (scaning)",
)
sbar = tqdm(
unit="B",
total=0,
ascii=True,
unit="B",
unit_scale=True,
unit_divisor=1024,
total=total_size,
desc="File size (scaning)",
)

def callback(_filename: str, length: int):
sbar.update(length)

def callback_after_copy_file(src_file_path, dst_file_path):
if verbose:
tqdm.write(f"copy {src_file_path} to {dst_file_path} done")
tbar.update(1)

def callback(src_file_path: str, length: int):
sbar.update(length)

file_entry_queue = Queue(maxsize=max_file_object_catch_count)

def scan_and_put_file_entry_to_queue():
for file_entry in scan_func(src_path):
tbar.total += 1
sbar.total += file_entry.stat.size
tbar.refresh()
sbar.refresh()
file_entry_queue.put(file_entry)
file_entry_queue.put(None)
tbar.set_description_str("Files")
sbar.set_description_str("File size")

executor.submit(scan_and_put_file_entry_to_queue)

def get_file_entry_from_queue():
while True:
file_entry = file_entry_queue.get()
if file_entry is None:
break
yield file_entry

file_entries = get_file_entry_from_queue()

params_iter = (
dict(
src_root_path=src_root_path,
Expand All @@ -461,10 +491,10 @@ def callback_after_copy_file(src_file_path, dst_file_path):
for file_entry in file_entries
)
list(executor.map(_smart_sync_single_file, params_iter))
if not quiet:

if not no_progress_bar:
sbar.update(sbar.total - sbar.n)
tbar.close()
if progress_bar:
sbar.update(sbar.total - sbar.n)
sbar.close()


Expand Down
4 changes: 3 additions & 1 deletion megfile/smart.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def _smart_sync_single_file(items: dict):
force = items["force"]
overwrite = items["overwrite"]

content_path = os.path.relpath(src_file_path, start=src_root_path)
content_path = smart_relpath(src_file_path, start=src_root_path)
if len(content_path) and content_path != ".":
content_path = content_path.lstrip("/")
dst_abs_file_path = smart_path_join(dst_root_path, content_path)
Expand Down Expand Up @@ -438,6 +438,8 @@ def _smart_sync_single_file(items: dict):
callback=copy_callback,
followlinks=followlinks,
)
elif callback:
callback(src_file_path, src_file_stat.size)
if callback_after_copy_file:
callback_after_copy_file(src_file_path, dst_abs_file_path)
return should_sync
Expand Down

0 comments on commit 574a659

Please sign in to comment.