diff --git a/main.py b/main.py index e429a86..7d964b4 100644 --- a/main.py +++ b/main.py @@ -4,10 +4,9 @@ import functools import logging import os -import pathlib -import shutil import subprocess from multiprocessing import Pool +from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory from typing import List @@ -43,7 +42,7 @@ def split_video(infile: str, output_dir: str, prefix: str, n_parts: int) -> List split_args = FFMPEG_CMD + ["-i", infile] segments = [] for i in range(n_parts): - filename = os.path.join(output_dir, f"{prefix}_{i}{pathlib.Path(infile).suffix}") + filename = os.path.join(output_dir, f"{prefix}_{i}{Path(infile).suffix}") split_args += [ "-ss", time_format(round(duration * i) / n_parts), "-to", time_format(round(duration * (i + 1) / n_parts)), @@ -77,73 +76,69 @@ def concatenate_videos(temp_dir, chunks: List[str], outfile: str): subprocess.check_output(concat_args) -def trim_silence(temp_dir: str, infile: str, outfile: str, min_silence_len: int, silence_thresh: int, margin: int) -> bool: +def trim_silence(infile: str, outfile: str, min_silence_len: int, silence_thresh: int, margin: int) -> bool: """ Returns ------- True trimming was successful and resulted in nonempty outfile """ - with TemporaryDirectory(dir=temp_dir) as workdir: - shutil.copy(os.path.join(workdir, infile), os.path.join(workdir, BASENAME + pathlib.Path(infile).suffix)) - - infile_video = os.path.join(workdir, BASENAME + pathlib.Path(infile).suffix) - infile_audio = os.path.join(workdir, BASENAME + ".mp3") - - logging.info(f"Extracting audio from \"{infile_video}\" to \"{infile_audio}\"") - ffmpeg \ - .input(infile_video) \ - .output(infile_audio) \ - .run(quiet=True) - logging.info("Done extracting audio") - - audio = AudioSegment.from_mp3(infile_audio) - - logging.info("Detecting nonsilent parts") - parts = silence.detect_nonsilent(audio, min_silence_len=min_silence_len, - silence_thresh=audio.dBFS + silence_thresh) - parts = [(start - margin, stop + margin) for (start, stop) in parts] - logging.info( - f"Detected {len(parts)} nonsilent parts, total duration: {sum((stop - start) for start, stop in parts)} ms") - logging.info(parts) - - if not parts: - return False - - logging.info("Trimming audio") - segments = AudioSegment._sync(*[audio[start:stop] for start, stop in parts]) - trimmed_audio = segments[0]._spawn([segment._data for segment in segments]) - logging.info("Done trimming audio") - - logging.info(f"Writing trimmed audio into {infile_audio} with duration {trimmed_audio.duration_seconds} s.") - trimmed_audio.export(infile_audio) - - parts = [(start / 1000, stop / 1000) for (start, stop) in parts] - - in_file = ffmpeg.input(infile_video) - - joined = ffmpeg.concat( - ffmpeg.concat( - *[in_file.trim(start=start, end=stop).setpts('PTS-STARTPTS') - for start, stop in parts]), - ffmpeg.input(infile_audio), - v=1, - a=1).node - ffmpeg.output(joined[0], joined[1], outfile).run(quiet=True, overwrite_output=True) - return True - - -def process_chunk(workdir, args, total_segments, id, segment): - filename = os.path.join(workdir, BASENAME + f"_cropped_{id}_" + pathlib.Path(args.infile).suffix) - logging.info(f"Processing chunk {id + 1}/{total_segments}: {segment} -> {filename}") - if not trim_silence(args.temp_dir, - segment, - filename, + infile_video = infile + infile_audio = str(Path(infile_video).with_suffix('.mp3')) + + logging.info(f"Extracting audio from \"{infile_video}\" to \"{infile_audio}\"") + ffmpeg \ + .input(infile_video) \ + .output(infile_audio) \ + .run(quiet=True) + logging.info("Done extracting audio") + + audio = AudioSegment.from_mp3(infile_audio) + + logging.info("Detecting nonsilent parts") + parts = silence.detect_nonsilent(audio, min_silence_len=min_silence_len, + silence_thresh=audio.dBFS + silence_thresh) + parts = [(start - margin, stop + margin) for (start, stop) in parts] + logging.info( + f"Detected {len(parts)} nonsilent parts, total duration: {sum((stop - start) for start, stop in parts)} ms") + logging.info(parts) + + if not parts: + return False + + logging.info("Trimming audio") + segments = AudioSegment._sync(*[audio[start:stop] for start, stop in parts]) + trimmed_audio = segments[0]._spawn([segment._data for segment in segments]) + logging.info("Done trimming audio") + + logging.info(f"Writing trimmed audio into {infile_audio} with duration {trimmed_audio.duration_seconds} s.") + trimmed_audio.export(infile_audio) + + parts = [(start / 1000, stop / 1000) for (start, stop) in parts] + + in_file = ffmpeg.input(infile_video) + + joined = ffmpeg.concat( + ffmpeg.concat( + *[in_file.trim(start=start, end=stop).setpts('PTS-STARTPTS') + for start, stop in parts]), + ffmpeg.input(infile_audio), + v=1, + a=1).node + ffmpeg.output(joined[0], joined[1], outfile).run(quiet=True, overwrite_output=True) + return True + + +def process_chunk(workdir: str, args: argparse.Namespace, total_segments: int, id_: int, segment: str): + outfile = os.path.join(workdir, BASENAME + f"_cropped_{id_}" + Path(args.infile).suffix) + logging.info(f"Processing chunk {id_ + 1}/{total_segments}: {segment} -> {outfile}") + if not trim_silence(segment, + outfile, args.min_silence_len, args.silence_thresh, args.margin): - filename = None - logging.info(f"Done processing chunk {id + 1}/{total_segments}") - return filename + outfile = None + logging.info(f"Done processing chunk {id_ + 1}/{total_segments}") + return outfile def main():