Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve audio concat; cleanup temp files; add -T(temp_dir) and -P (video_pool_size) args #1

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 94 additions & 78 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!/usr/bin/python
#!/usr/bin/env python
Copy link
Author

@vvd170501 vvd170501 Oct 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Поменял, чтобы скрипт нормально работал в virtualenv

# -*- coding: utf-8 -*-
import argparse
import functools
import logging
import os
import pathlib
import shutil
import subprocess
import tempfile
from multiprocessing import Pool
from typing import List
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import List, Tuple

import ffmpeg
from pydub import AudioSegment, silence
Expand All @@ -24,9 +24,8 @@ def time_format(ms):
return "%02d:%02d:%02d.%03d" % (hours, minutes, sec, msec)


basename = "lecture"
chunks_count = 40
ffmpeg_cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
BASENAME = "lecture"
FFMPEG_CMD = ["ffmpeg", "-hide_banner", "-loglevel", "error"]


def get_length(filename: str) -> float:
Expand All @@ -38,36 +37,53 @@ def get_length(filename: str) -> float:
return float(result.stdout)


def concatenate_videos(chunks: List[str], outfile: str):
list_file = tempfile.NamedTemporaryFile("w", suffix='.txt')
for file in chunks:
print(f"file '{file}'", file=list_file)
list_file.flush()
logging.info(f'{len(chunks)} chunks written to {list_file.name}')
def split_video(infile: str, output_dir: str, prefix: str, n_parts: int) -> List[str]:
duration = round(get_length(infile) * 1000)
split_args = FFMPEG_CMD + ["-i", infile]
segments = []
for i in range(n_parts):
filename = os.path.join(output_dir, f"{prefix}_{i}{Path(infile).suffix}")
split_args += [
"-ss", time_format(round(duration * i) / n_parts),
"-to", time_format(round(duration * (i + 1) / n_parts)),
"-c", "copy",
filename
]
segments.append(filename)
logging.info(f"Splitting {infile} into {n_parts} segments")
subprocess.check_output(split_args)
logging.info(f"Done splitting {infile}")
logging.info(segments)
return segments


logging.info(f'Concatenating chunks into {outfile}')
def concatenate_videos(temp_dir, chunks: List[str], outfile: str):
with NamedTemporaryFile("w", suffix='.txt', dir=temp_dir) as list_file:
for file in chunks:
print(f"file '{file}'", file=list_file)
list_file.flush()
logging.info(f'{len(chunks)} chunks written to {list_file.name}')

concat_args = ffmpeg_cmd + [
"-safe", "0",
"-f", "concat",
"-i", str(list_file.name),
"-c", "copy",
outfile
]
subprocess.check_output(concat_args)
logging.info(f'Concatenating chunks into {outfile}')

concat_args = FFMPEG_CMD + [
"-safe", "0",
"-f", "concat",
"-i", str(list_file.name),
"-c", "copy",
outfile
]
subprocess.check_output(concat_args)


def trim_silence(infile: str, outfile: str, min_silence_len: int, silence_thresh: int, margin: int) -> bool:
def trim_silence(infile: str, min_silence_len: int, silence_thresh: int, margin: int) -> Tuple[str, str, List[Tuple[int, int]]]:
"""
Returns
-------
True trimming was successful and resulted in nonempty outfile
Video output, audio output and a list of nonsilent intervals in form of (start_ms, stop_ms)
"""
workdir = tempfile.mkdtemp()
shutil.copy(os.path.join(workdir, infile), os.path.join(workdir, basename + pathlib.Path(infile).suffix))

infile_video = os.path.join(workdir, basename + pathlib.Path(infile).suffix)
infile_audio = os.path.join(workdir, basename + ".mp3")
infile_video = infile
infile_audio = str(Path(infile_video).with_suffix('.mp3'))

logging.info(f"Extracting audio from \"{infile_video}\" to \"{infile_audio}\"")
ffmpeg \
Expand All @@ -86,65 +102,50 @@ def trim_silence(infile: str, outfile: str, min_silence_len: int, silence_thresh
f"Detected {len(parts)} nonsilent parts, total duration: {sum((stop - start) for start, stop in parts)} ms")
logging.info(parts)

if len(parts) == 0:
return False
if not parts:
return infile_video, infile_audio, []

logging.info("Trimming audio")
trimmed_audio = AudioSegment.empty()
for start, stop in parts:
trimmed_audio += audio[start:stop]
segments = AudioSegment._sync(*[audio[start:stop] for start, stop in parts])
trimmed_audio = segments[0]._spawn([segment._data for segment in segments])
logging.info("Done trimming audio")

logging.info(f"Writing trimmed audio into {infile_audio} with duration {trimmed_audio.duration_seconds} s.")
trimmed_audio.export(infile_audio)
return infile_video, infile_audio, parts


def process_chunk(workdir: str, args: argparse.Namespace, total_segments: int, id_: int, segment: str):
logging.info(f"Processing chunk {id_ + 1}/{total_segments}: {segment} -> {Path(segment).with_suffix('.mp3')}")
res = trim_silence(segment,
args.min_silence_len,
args.silence_thresh,
args.margin)
logging.info(f"Done processing audio chunk {id_ + 1}/{total_segments}")
return res


def trim_video(infile_video: str, infile_audio: str, parts: List[Tuple[int, int]]) -> str:
in_path = Path(infile_video)
outfile = str(in_path.with_stem(in_path.stem + '_cropped'))
parts = [(start / 1000, stop / 1000) for (start, stop) in parts]

in_file = ffmpeg.input(infile_video)

joined = ffmpeg.concat(
ffmpeg.concat(
*[in_file.trim(start=start, end=stop).setpts('PTS-STARTPTS')
for start, stop in parts]),
ffmpeg.input(infile_audio),
v=1,
a=1).node
ffmpeg.output(joined[0], joined[1], outfile).run(quiet=True, overwrite_output=True)
return True

logging.info(f'Trimming {infile_video} -> {outfile}')
ffmpeg.output(joined[0], joined[1], outfile).run(quiet=True, overwrite_output=True)
logging.info(f'Done trimming {infile_video}')
return outfile

def split_video(infile: str, output_dir: str, prefix: str, n_parts: int) -> List[str]:
duration = round(get_length(infile) * 1000)
split_args = ffmpeg_cmd + ["-i", infile]
segments = []
for i in range(n_parts):
filename = os.path.join(output_dir, f"{prefix}_{i}{pathlib.Path(infile).suffix}")
split_args += [
"-ss", time_format(round(duration * i) / n_parts),
"-to", time_format(round(duration * (i + 1) / n_parts)),
"-c", "copy",
filename
]
segments.append(filename)
logging.info(f"Splitting {infile} into {n_parts} segments")
subprocess.check_output(split_args)
logging.info(f"Done splitting {infile}")
logging.info(segments)
return segments

def process_chunk(id, segment):
filename = os.path.join(workdir, basename + f"_cropped_{id}_" + pathlib.Path(args.infile).suffix)
logging.info(f"Processing chunk {id + 1}/{len(segments)}: {segment} -> {filename}")
if not trim_silence(segment,
filename,
args.min_silence_len,
args.silence_thresh,
args.margin):
filename = None
logging.info(f"Done processing chunk {id + 1}/{len(segments)}")
return filename

if __name__ == '__main__':
def main():
parser = argparse.ArgumentParser(description='Trim silence from video.')
parser.add_argument('-i', '-input', dest='infile', help='input file', required=True)
parser.add_argument('-o', '-output', dest='outfile', help='output file', default="cropped.mp4")
Expand All @@ -156,18 +157,33 @@ def process_chunk(id, segment):
help='margin (ms)', default=100)
parser.add_argument('-n', dest='n_segments', type=int,
help='number of chunks to split input file to be processed independently', default=10)
parser.add_argument('-p', dest='pool_size', type=int,
help='number of chunks to be processed concurrently', default=1)
parser.add_argument('-p', dest='audio_pool_size', type=int,
help='number of audio chunks to be processed concurrently', default=1)
parser.add_argument('-P', dest='video_pool_size', type=int,
help='number of video chunks to be processed concurrently. '
'FFmpeg (without hwaccel) uses optimal number of threads by default, '
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Проверял с ffmpeg 4.4.2-0ubuntu0.22.04.1, количество используемых потоков равно кол-ву физических ядер процессора.
Запуск нескольких одновременно выполняющихся процессов ffmpeg не улучшает производительность, а наоборот, немного ухудшает её.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*Относится только к предпоследней стадии - trim+concat для каждого чанка.
Для разделения видео и аудио потоков и обработки аудио в каждом чанке используется 1 поток, поэтому на этих этапах нет проблем с параллельным исполнением.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Спасибо
Тоже хотел протестить, но никак не доходили руки
Кажется, когда добавлял эту опцию, тестил и использование многопоточности было оправданным
Сам ещё побенчмаркаю

ffmpeg по дефолту запускается с опцией threads=-1, поэтому звучит правдоподобно, что процесс будет занимать все ядра и будет конкурировать с другими

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvd170501, а какой порядок улучшений времени? Остались результаты замеров?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Замерил на 30 минутах лекции в качестве 1080p60.
CPU - Ryzen 7 5800HS (8 физических ядер).
Разбиение на чанки и обработка аудио во всех случаях занимали ~8 секунд.

С одним процессом ffmpeg (5a04dcc):

$ time ./main.py -s 150 -m 50 -n 8 -p 8 -T /tmp/ramdisk/ -i ls4_30min.mkv -o lec4_trimmed.mkv
real	6m10,282s
user	50m56,566s
sys	0m46,125s

Во время выполнения trim+concat каждое из 16 логических ядер используется на ~50%, ffmpeg использует ~1 ГБ памяти, load average (1min) ~=9.

С 8 параллельными процессами ffmpeg (c9bc294):

$ time ./main.py -s 150 -m 50 -n 8 -p 8 -T /tmp/ramdisk/ -i lec4_30min.mkv -o lec4_trimmed.mkv
real	7m25,505s
user	110m17,434s
sys	1m48,683s

Все логические ядра загружены на 100%, используется 8 ГБ памяти, load average (1min) ~=70.

C ускорением на RTX 3060 mobile (мб попробую сделать PR, но нужно как-то добавить проверку на доступность nvenc):

$ time ./main.py -s 150 -m 50 -n 8 -p 8 -P 2 -T /tmp/ramdisk/ -i lec4_30min.mkv -o lec4_trimmed.mkv
real	2m22,735s
user	2m0,605s
sys	0m8,618s

'so values greater than 1 usually won\'t improve performance', default=1)
parser.add_argument('-T', dest='temp_dir',
help='directory for temporary files')
parser.add_argument('-d', dest='log_level', action='store_const', const=logging.DEBUG, help='print debugging info',
default=logging.WARNING)
args = parser.parse_args()

logging.basicConfig(format="%(asctime)s - %(message)s", level=args.log_level)

workdir = tempfile.mkdtemp()
segments = split_video(args.infile, workdir, basename, args.n_segments)
cropped_segments = []
with Pool(processes=args.pool_size) as pool:
cropped_segments = pool.starmap(process_chunk, enumerate(segments))
cropped_segments = [segment for segment in cropped_segments if segment is not None]
concatenate_videos(cropped_segments, args.outfile)
with TemporaryDirectory(dir=args.temp_dir) as workdir:
segments = split_video(args.infile, workdir, BASENAME, args.n_segments)

with Pool(processes=args.audio_pool_size) as pool:
cropped_segments = pool.starmap(functools.partial(process_chunk, workdir, args, len(segments)), enumerate(segments))
cropped_segments = [(video, audio, parts) for video, audio, parts in cropped_segments if parts]

logging.info('Trimming video chunks')
with Pool(processes=args.video_pool_size) as pool:
resulting_segments = pool.starmap(trim_video, cropped_segments)

concatenate_videos(args.temp_dir, resulting_segments, args.outfile)


if __name__ == '__main__':
main()