Skip to content

Commit

Permalink
Add vidformer io wrapping
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikWin committed Jan 28, 2025
1 parent 631b7a3 commit 4f9791f
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 33 deletions.
3 changes: 2 additions & 1 deletion vidformer-cli/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ impl DveBench {
&path,
stream,
&vidformer::service::Service::default(),
&None,
)
.unwrap()
})
.collect::<Vec<_>>();

let filters = default_filters();
let context = vidformer::Context::new(sources, filters);
let context = vidformer::Context::new(sources, filters, None);

(Arc::new(spec), Arc::new(context), Arc::new(self.config))
}
Expand Down
30 changes: 21 additions & 9 deletions vidformer-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub fn simple_source(
assert_eq!(split.len(), 3);
let (name, path, stream) = (split[0], split[1], split[2].parse::<usize>().unwrap());
let fs_service = vidformer::service::Service::default();
source::SourceVideoStreamMeta::profile(name, path, stream, &fs_service)
source::SourceVideoStreamMeta::profile(name, path, stream, &fs_service, &None)
}

pub fn opendal_source(
Expand All @@ -24,10 +24,10 @@ pub fn opendal_source(
service: Option<&vidformer::service::Service>,
) -> Result<vidformer::source::SourceVideoStreamMeta, vidformer::Error> {
if let Some(service) = service {
source::SourceVideoStreamMeta::profile(name, path, stream, service)
source::SourceVideoStreamMeta::profile(name, path, stream, service, &None)
} else {
let fs_service = vidformer::service::Service::default();
source::SourceVideoStreamMeta::profile(name, path, stream, &fs_service)
source::SourceVideoStreamMeta::profile(name, path, stream, &fs_service, &None)
}
}

Expand Down Expand Up @@ -107,9 +107,14 @@ enum ArgCmd {

fn cmd_profile(opt: &ProfileCmd) {
let fs_service = vidformer::service::Service::default();
let stream_meta =
source::SourceVideoStreamMeta::profile(&opt.name, &opt.vid_path, opt.stream, &fs_service)
.unwrap();
let stream_meta = source::SourceVideoStreamMeta::profile(
&opt.name,
&opt.vid_path,
opt.stream,
&fs_service,
&None,
)
.unwrap();
let profile_data = source::create_profile_file(&[stream_meta]);

if let Some(out_path) = &opt.out_path {
Expand All @@ -124,7 +129,13 @@ fn cmd_profile(opt: &ProfileCmd) {

fn cmd_validate(opt: &ValidateCmd) {
let fs_service = vidformer::service::Service::default();
source::SourceVideoStreamMeta::validate(&opt.name, &opt.vid_path, opt.stream, &fs_service);
source::SourceVideoStreamMeta::validate(
&opt.name,
&opt.vid_path,
opt.stream,
&fs_service,
&None,
);
}

fn cmd_x() {
Expand Down Expand Up @@ -152,9 +163,10 @@ fn cmd_x() {
let filters: BTreeMap<String, Box<dyn filter::Filter>> = BTreeMap::new();
let sources =
vec![
source::SourceVideoStreamMeta::profile("tos", "tos_720p.mp4", 0, &fs_service).unwrap(),
source::SourceVideoStreamMeta::profile("tos", "tos_720p.mp4", 0, &fs_service, &None)
.unwrap(),
];
let context: vidformer::Context = vidformer::Context::new(sources, filters);
let context: vidformer::Context = vidformer::Context::new(sources, filters, None);

let dve_config = vidformer::Config {
decode_pool_size: 10000,
Expand Down
4 changes: 2 additions & 2 deletions vidformer-cli/src/yrden.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn yrden_http_req(
}
}

let context: vidformer::Context = vidformer::Context::new(sources, filters);
let context: vidformer::Context = vidformer::Context::new(sources, filters, None);
let context = std::sync::Arc::new(context);

let dve_config: vidformer::Config = vidformer::Config {
Expand Down Expand Up @@ -595,7 +595,7 @@ async fn yrden_http_req(
};
let format: Option<String> = request.format;

let context: vidformer::Context = vidformer::Context::new(sources, filters);
let context: vidformer::Context = vidformer::Context::new(sources, filters, None);
let context = std::sync::Arc::new(context);

let dve_config: vidformer::Config = vidformer::Config {
Expand Down
1 change: 1 addition & 0 deletions vidformer-igni/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub(crate) async fn profile_source(
&source_name,
stream_idx,
&service,
&None, // TODO: Add cache
)
})
.await
Expand Down
2 changes: 1 addition & 1 deletion vidformer-igni/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ pub(crate) async fn get_frame(
let spec = std::sync::Arc::new(std::boxed::Box::new(spec) as Box<dyn vidformer::spec::Spec>);

let filters = crate::server::vod::filters();
let context = vidformer::Context::new(sources, filters);
let context = vidformer::Context::new(sources, filters, None); // TODO: Add cache
let context: std::sync::Arc<vidformer::Context> = std::sync::Arc::new(context);

let dve_config: vidformer::Config = vidformer::Config {
Expand Down
2 changes: 1 addition & 1 deletion vidformer-igni/src/server/vod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ pub(crate) async fn get_segment(
transaction.commit().await?;

let filters = filters();
let context = vidformer::Context::new(sources, filters);
let context = vidformer::Context::new(sources, filters, None); // TODO: Add cache
let context = std::sync::Arc::new(context);

let dve_config: vidformer::Config = vidformer::Config {
Expand Down
20 changes: 13 additions & 7 deletions vidformer/src/av/demuxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{ptr, slice};
struct IoCtx {
canary: u64, // We're doing some unsafe opaque pointer passing, so let's add a canary to make sure we didn't mess up. Valid value is 0xdeadbeef
size: u64,
buf_reader: std::io::BufReader<opendal::StdReader>,
reader: Box<dyn crate::io::ReadSeek>,
err: Option<std::io::Error>,
}

Expand All @@ -22,11 +22,11 @@ unsafe extern "C" fn vidformer_avio_read_packet(
debug_assert_eq!(io_ctx.canary, 0xdeadbeef);

let buf: &mut [u8] = unsafe { slice::from_raw_parts_mut(buf, buf_size as usize) };
let read = io_ctx.buf_reader.read(buf);
let read = io_ctx.reader.read(buf);
match read {
Ok(read) => {
if read == 0 {
debug_assert!(io_ctx.buf_reader.stream_position().unwrap() == io_ctx.size);
debug_assert!(io_ctx.reader.stream_position().unwrap() == io_ctx.size);
ffi::AVERROR_EOF
} else {
read as i32
Expand Down Expand Up @@ -62,7 +62,7 @@ unsafe extern "C" fn vidformer_avio_seek(
_ => panic!("invalid seek whence ({})", whence,),
};

let seeked = io_ctx.buf_reader.seek(whence);
let seeked = io_ctx.reader.seek(whence);
match seeked {
Ok(seeked) => seeked as i64,
Err(e) => {
Expand Down Expand Up @@ -94,6 +94,7 @@ impl Demuxer {
service: &crate::service::Service,
file_size: u64,
io_runtime_handle: &tokio::runtime::Handle,
io_wrapper: &Option<Box<dyn crate::io::IoWrapper>>,
) -> Result<Self, crate::Error> {
let mut format_context = unsafe { ffi::avformat_alloc_context() };
if format_context.is_null() {
Expand Down Expand Up @@ -123,13 +124,16 @@ impl Demuxer {
)));
}
};
// let buffered_reader = std::io::BufReader::new(reader);
let buffered_reader = std::io::BufReader::with_capacity(128 * 1024, reader);

let reader = match io_wrapper {
Some(io_wrapper) => io_wrapper.wrap(Box::new(reader)),
None => Box::new(std::io::BufReader::with_capacity(128 * 1024, reader)),
};

let io_ctx = IoCtx {
canary: 0xdeadbeef,
size: file_size,
buf_reader: buffered_reader,
reader: Box::new(reader),
err: None,
};
let io_ctx = Box::pin(io_ctx);
Expand Down Expand Up @@ -328,6 +332,7 @@ mod test {
"../tos_720p.mp4",
0,
&service,
&None,
)
.unwrap();

Expand All @@ -343,6 +348,7 @@ mod test {
&service,
profile.file_size,
io_runtime.handle(),
&None,
)
.unwrap();

Expand Down
4 changes: 4 additions & 0 deletions vidformer/src/av/framesource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ impl FrameSource {
service: &crate::service::Service,
file_size: u64,
io_runtime_handle: &tokio::runtime::Handle,
io_wrapper: &Option<Box<dyn crate::io::IoWrapper>>,
) -> Result<FrameSource, crate::Error> {
let mut demuxer = crate::av::demuxer::Demuxer::new(
vid_path,
stream,
service,
file_size,
io_runtime_handle,
io_wrapper,
)?;
if !seek_ts.is_zero() {
demuxer.seek(seek_ts)?;
Expand Down Expand Up @@ -124,6 +126,7 @@ mod test {
"../tos_720p.mp4",
0,
&service,
&None,
)
.unwrap();

Expand All @@ -141,6 +144,7 @@ mod test {
&service,
profile.file_size,
io_runtime.handle(),
&None,
)
.unwrap();

Expand Down
10 changes: 9 additions & 1 deletion vidformer/src/dve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::filter::Frame;
use crate::pool::Pool;
use crate::sir;

use crate::io::IoWrapper;
use crate::spec::Spec;
use crate::util;
use log::*;
Expand Down Expand Up @@ -74,6 +75,7 @@ impl std::fmt::Display for SourceRef {
pub struct Context {
pub(crate) sources: BTreeMap<SourceRef, crate::source::SourceVideoStreamMeta>,
pub(crate) filters: BTreeMap<String, Box<dyn crate::filter::Filter>>,
pub(crate) io_wrapper: Option<Box<dyn IoWrapper>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -105,12 +107,17 @@ impl Context {
pub fn new(
source_files: Vec<crate::source::SourceVideoStreamMeta>,
filters: BTreeMap<String, Box<dyn crate::filter::Filter>>,
io_wrapper: Option<Box<dyn IoWrapper>>,
) -> Context {
let mut sources = BTreeMap::new();
for source in source_files {
sources.insert(SourceRef::new(&source.name), source);
}
Context { sources, filters }
Context {
sources,
filters,
io_wrapper,
}
}

pub fn spec_ctx(&self) -> impl crate::spec::SpecContext {
Expand Down Expand Up @@ -219,6 +226,7 @@ pub(crate) fn run_decoder(
stream_service,
stream_meta.file_size,
io_runtime_handle,
&context.io_wrapper,
)?;

while framesource.next_frame()?.is_some() {
Expand Down
10 changes: 10 additions & 0 deletions vidformer/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use std::boxed::Box;
use std::io::{Read, Seek};

pub trait ReadSeek: Read + Seek + Send + Sync {}
impl<T: Read + Seek + Send + Sync> ReadSeek for T {}

/// A trait for wrapping I/O Readers.
pub trait IoWrapper: Send + Sync {
fn wrap(&self, r: Box<dyn ReadSeek>) -> Box<dyn ReadSeek>;
}
1 change: 1 addition & 0 deletions vidformer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! * [🧑‍💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer/)
pub mod filter;
pub mod io;
pub mod service;
pub mod sir;
pub mod source;
Expand Down
9 changes: 8 additions & 1 deletion vidformer/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use rusty_ffmpeg::ffi;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::ffi::CStr;
use std::io;

#[derive(Debug, Serialize, Deserialize)]
pub struct SourceFileMeta {
Expand Down Expand Up @@ -35,6 +36,7 @@ impl SourceVideoStreamMeta {
vid_path: &str,
stream: usize,
service: &crate::service::Service,
io_wrapper: &Option<Box<dyn crate::io::IoWrapper>>,
) -> Result<Self, crate::dve::Error> {
let io_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
Expand Down Expand Up @@ -72,6 +74,7 @@ impl SourceVideoStreamMeta {
service,
file_size,
io_runtime.handle(),
io_wrapper,
)?;

let codec_name = unsafe {
Expand Down Expand Up @@ -201,6 +204,7 @@ impl SourceVideoStreamMeta {
vid_path: &str,
stream: usize,
service: &crate::service::Service,
io_wrapper: &Option<Box<dyn crate::io::IoWrapper>>,
) {
let io_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
Expand All @@ -210,7 +214,8 @@ impl SourceVideoStreamMeta {

// First profile
let source_profile =
SourceVideoStreamMeta::profile(source_name, vid_path, stream, service).unwrap();
SourceVideoStreamMeta::profile(source_name, vid_path, stream, service, io_wrapper)
.unwrap();

// Do a full decode
{
Expand All @@ -222,6 +227,7 @@ impl SourceVideoStreamMeta {
service,
source_profile.file_size,
io_runtime.handle(),
io_wrapper,
)
.unwrap();
while framesource.next_frame().unwrap().is_some() {
Expand All @@ -241,6 +247,7 @@ impl SourceVideoStreamMeta {
service,
source_profile.file_size,
io_runtime.handle(),
io_wrapper,
)
.unwrap();

Expand Down
Loading

0 comments on commit 4f9791f

Please sign in to comment.