Skip to content

Commit

Permalink
Add frame block compression
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikWin committed Jan 25, 2025
1 parent 2c944c9 commit 38f1aef
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 56 deletions.
3 changes: 2 additions & 1 deletion vidformer-igni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ regex = "1"
"toml" = "0.8.19"
"rand" = "0.8.5"
zstd = "0.13"
base64 = "0.22.1"
base64 = "0.22.1"
flate2 = "1.0"
2 changes: 1 addition & 1 deletion vidformer-igni/init/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ CREATE TABLE spec (
CREATE TABLE spec_t (
spec_id UUID REFERENCES spec(id) ON DELETE CASCADE,
pos INT NOT NULL,
frame TEXT,
frame BYTEA,
PRIMARY KEY (spec_id, pos)
);

Expand Down
47 changes: 40 additions & 7 deletions vidformer-igni/src/frame_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl InlineLiteral {
match self {
InlineLiteral::Int(i) => {
// Upper byte should be 0x00, lowest 32 bits should be the integer
0x00000000_00000000 | (*i as i64)
0x00000000_00000000 | (*i as i64 & 0xFFFFFFFF)
}
InlineLiteral::Bool(b) => {
// Upper byte should be 0x01, lowest bit should be the boolean value
Expand All @@ -35,15 +35,18 @@ impl InlineLiteral {
}
InlineLiteral::ListIntSingle(i) => {
// Upper byte should be 0x04, lowest 16 bits should be the integer
0x04000000_00000000 | (*i as i64)
0x04000000_00000000 | (*i as i64 & 0xFFFF)
}
InlineLiteral::ListIntPair(i1, i2) => {
// Upper byte should be 0x05, lowest 32 bits should be the two integers
0x05000000_00000000 | (*i1 as i64) << 16 | *i2 as i64
0x05000000_00000000 | (*i1 as i64 & 0xFFFF) << 16 | (*i2 as i64 & 0xFFFF)
}
InlineLiteral::ListIntTriple(i1, i2, i3) => {
// Upper byte should be 0x06, lowest 48 bits should be the three integers
0x06000000_00000000 | (*i1 as i64) << 32 | (*i2 as i64) << 16 | *i3 as i64
0x06000000_00000000
| (*i1 as i64 & 0xFFFF) << 32
| (*i2 as i64 & 0xFFFF) << 16
| (*i3 as i64 & 0xFFFF)
}
}
}
Expand Down Expand Up @@ -229,7 +232,7 @@ fn expr_coded_as_scalar(expr: &vidformer::sir::Expr) -> bool {
vidformer::sir::Expr::Frame(vidformer::sir::FrameExpr::Source(_)) => true,
vidformer::sir::Expr::Frame(vidformer::sir::FrameExpr::Filter(_)) => false,
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::Bool(_)) => true,
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::Int(i)) => true,
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::Int(_)) => true,
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::Float(_)) => true,
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::List(list)) => {
if list.len() > 3 {
Expand Down Expand Up @@ -267,7 +270,7 @@ pub struct FrameBlock {
}

impl FrameBlock {
fn new() -> Self {
pub fn new() -> Self {
FrameBlock {
functions: Vec::new(),
literals: Vec::new(),
Expand All @@ -279,7 +282,7 @@ impl FrameBlock {
}
}

fn insert_frame(&mut self, frame: &vidformer::sir::FrameExpr) -> Result<(), String> {
pub fn insert_frame(&mut self, frame: &vidformer::sir::FrameExpr) -> Result<(), String> {
let expr_idx = self.insert_frame_expr(frame)?;
self.frame_exprs.push(expr_idx as i64);
Ok(())
Expand Down Expand Up @@ -844,6 +847,36 @@ mod test {
assert_eq!(frame_exprs, frame_block.frames().unwrap());
}

// Round-trip check for negative integers: builds a single filter frame whose
// arguments are negative ints and lists of negative ints, inserts it into a
// fresh FrameBlock, and asserts that frames() yields the same expression back.
#[test]
fn test_negative_ints() {
let mut frame_block = FrameBlock::new();
let frame_expr = vidformer::sir::FrameExpr::Filter(vidformer::sir::FilterExpr {
name: "filter".to_string(),
args: vec![
// Small negative value, within the i32 range.
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::Int(-3)),
// Below i32::MIN, so it cannot be represented in 32 bits.
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::Int(-100_000_000_000)),
// Two negatives that each fit in a signed 16-bit integer.
// NOTE(review): presumably exercises the inline ListIntPair encoding — confirm.
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::List(vec![
vidformer::sir::DataExpr::Int(-8),
vidformer::sir::DataExpr::Int(-1024),
])),
// Three small negatives.
// NOTE(review): presumably exercises the inline ListIntTriple encoding — confirm.
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::List(vec![
vidformer::sir::DataExpr::Int(-1),
vidformer::sir::DataExpr::Int(-2),
vidformer::sir::DataExpr::Int(-3),
])),
// Four negatives.
// NOTE(review): expr_coded_as_scalar branches on list.len() > 3, so this list
// likely takes a different encoding path than the shorter ones — confirm.
vidformer::sir::Expr::Data(vidformer::sir::DataExpr::List(vec![
vidformer::sir::DataExpr::Int(-1),
vidformer::sir::DataExpr::Int(-2000),
vidformer::sir::DataExpr::Int(-3),
vidformer::sir::DataExpr::Int(-8192),
])),
],
// No keyword arguments are used in this test.
kwargs: std::collections::BTreeMap::new(),
});
frame_block.insert_frame(&frame_expr).unwrap();
// The decoded frames must equal the one expression that was inserted.
assert_eq!(vec![frame_expr], frame_block.frames().unwrap());
}

#[test]
fn test_lists() {
// Insert 5 frames, each with n number of elements in a list argument to a filter
Expand Down
7 changes: 6 additions & 1 deletion vidformer-igni/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,18 @@ async fn igni_http_req_error_handler(
req: hyper::Request<impl hyper::body::Body>,
global: std::sync::Arc<IgniServerGlobal>,
) -> Result<hyper::Response<http_body_util::Full<hyper::body::Bytes>>, std::convert::Infallible> {
let method_copy_for_err = req.method().clone();
let uri_copy_for_err = req.uri().clone();
match igni_http_req(req, global).await {
Ok(ok) => Ok(ok),
Err(err) => {
// An error occurred which is not an explicitly handled error
// Log the error and return a 500 response
// Do not leak the error to the client
error!("Error handling request: {:?}", err);
error!(
"Error handling request {} {}: {:?}",
method_copy_for_err, &uri_copy_for_err, err
);
Ok(hyper::Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(http_body_util::Full::new(hyper::body::Bytes::from(
Expand Down
41 changes: 30 additions & 11 deletions vidformer-igni/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::collections::BTreeSet;

use super::super::IgniError;
use crate::schema;
use base64::prelude::*;
use http_body_util::BodyExt;
use log::*;
use num_rational::Rational64;
use std::io::Read;
use uuid::Uuid;
use base64::prelude::*;

use super::IgniServerGlobal;

Expand Down Expand Up @@ -674,7 +675,7 @@ pub(crate) async fn push_part_block(
#[derive(serde::Deserialize, serde::Serialize)]
struct RequestBlock {
frames: i32,
compression: String,
compression: Option<String>,
body: String,
}

Expand Down Expand Up @@ -723,10 +724,10 @@ pub(crate) async fn push_part_block(
Ok(body_bytes) => body_bytes,
};

let body_uncompresed = match block.compression.as_str() {
"none" => body_bytes,
"zstd" => {
let reader = std::io::BufReader::new(body_bytes.as_slice());
let body_uncompresed = match block.compression.as_deref() {
None => body_bytes,
Some("zstd") => {
let reader = std::io::Cursor::new(body_bytes.as_slice());
let body_uncompresed = zstd::stream::decode_all(reader);
match body_uncompresed {
Ok(body_uncompresed) => body_uncompresed,
Expand All @@ -739,11 +740,20 @@ pub(crate) async fn push_part_block(
}
}
}
_ => {
Some("gzip") => {
let reader = std::io::Cursor::new(body_bytes.as_slice());
let mut decoder = flate2::read::GzDecoder::new(reader);
let mut body_uncompresed = Vec::new();
decoder.read_to_end(&mut body_uncompresed).map_err(|err| {
IgniError::General(format!("Error decompressing block: {}", err))
})?;
body_uncompresed
}
Some(_) => {
return Ok(hyper::Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(http_body_util::Full::new(hyper::body::Bytes::from(
"Invalid block compression algorithm (only 'none' and 'zstd' are supported)",
"Invalid block compression algorithm (only null, 'gzip' 'zstd' are supported)",
)))?);
}
};
Expand Down Expand Up @@ -819,15 +829,24 @@ async fn push_frame_req(
// stage inserted rows before beginning transaction
let insert_spec_ids = vec![spec_id; req.2.len()];
let mut insert_pos: Vec<i32> = Vec::with_capacity(req.2.len());
let mut insert_frames: Vec<Option<String>> = Vec::with_capacity(req.2.len());
let mut insert_frames: Vec<Option<Vec<u8>>> = Vec::with_capacity(req.2.len());

let mut referenced_source_frames = BTreeSet::new();
for (frame_idx, ((_numer, _denom), frame)) in req.2.iter().enumerate() {
// TODO: Check numer and denom are correct?

insert_pos.push(pos + frame_idx as i32);
if let Some(expr) = frame {
insert_frames.push(Some(serde_json::to_string(expr).unwrap()));
let mut feb = crate::frame_block::FrameBlock::new();
feb.insert_frame(expr).map_err(|err| {
IgniError::General(format!("Error inserting value to FEB: {:?}", err))
})?;
let feb_json: Vec<u8> = serde_json::to_vec(&feb)
.map_err(|err| IgniError::General(format!("Error serializing FEB: {:?}", err)))?;
let feb_json_reader = std::io::BufReader::new(feb_json.as_slice());
let feb_compressed = zstd::stream::encode_all(feb_json_reader, 0)
.map_err(|err| IgniError::General(format!("Error compressing FEB: {:?}", err)))?;
insert_frames.push(Some(feb_compressed));
} else {
if let Some(err) = user.permissions.flag_err("spec:deferred_frames") {
// Block deferred frames unless explicitly allowed
Expand Down Expand Up @@ -1044,7 +1063,7 @@ async fn push_frame_req(

if !insert_spec_ids.is_empty() {
// Only insert if there are frames to insert
sqlx::query("INSERT INTO spec_t (spec_id, pos, frame) VALUES (UNNEST($1::UUID[]), UNNEST($2::INT[]), UNNEST($3::TEXT[]))")
sqlx::query("INSERT INTO spec_t (spec_id, pos, frame) VALUES (UNNEST($1::UUID[]), UNNEST($2::INT[]), UNNEST($3::BYTEA[]))")
.bind(&insert_spec_ids)
.bind(&insert_pos)
.bind(&insert_frames)
Expand Down
29 changes: 17 additions & 12 deletions vidformer-igni/src/server/vod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,7 @@ pub(crate) async fn get_segment(
) -> Result<hyper::Response<http_body_util::Full<hyper::body::Bytes>>, IgniError> {
let spec_id = Uuid::parse_str(spec_id).unwrap();

// Check that the spec exists, is not closed, and grab the width, height, and pix_fmt
let mut transaction = global.pool.begin().await?;
// let row: Option<(i32, i32, String, bool)> =
// sqlx::query_as("SELECT width, height, pix_fmt, closed FROM spec WHERE id = $1")
// .bind(spec_id)
// .fetch_optional(&mut *transaction)
// .await?;

let row: Option<schema::SpecRow> = sqlx::query_as("SELECT * FROM spec WHERE id = $1")
.bind(spec_id)
Expand Down Expand Up @@ -259,7 +253,7 @@ pub(crate) async fn get_segment(
let last_t = first_t + segment.n_frames - 1;

// Get the frames from spec_t that are in the segment (pos between first_t and last_t)
let rows: Vec<(i32, String)> = sqlx::query_as(
let rows: Vec<(i32, Vec<u8>)> = sqlx::query_as(
"SELECT pos, frame FROM spec_t WHERE spec_id = $1 AND pos BETWEEN $2 AND $3 ORDER BY pos",
)
.bind(spec_id)
Expand All @@ -278,11 +272,22 @@ pub(crate) async fn get_segment(
let start = *times.first().unwrap();
let end = *times.last().unwrap();

// Map json values to FrameExpr
let frames: Vec<vidformer::sir::FrameExpr> = rows
.iter()
.map(|(_, frame)| serde_json::from_str(frame).unwrap())
.collect();
let frames = {
let mut out = vec![];
for (_, frame) in rows {
let frame_reader = std::io::Cursor::new(frame);
let frame_uncompressed = zstd::stream::decode_all(frame_reader).unwrap();
let feb: crate::frame_block::FrameBlock =
serde_json::from_slice(&frame_uncompressed).unwrap();
let mut f_collection = feb.frames().map_err(|err| {
IgniError::General(format!("Error decoding frame block: {:?}", err))
})?;
assert_eq!(f_collection.len(), 1);
let f = f_collection.remove(0);
out.push(f);
}
out
};

struct IgniSpec {
times: Vec<num_rational::Ratio<i64>>,
Expand Down
46 changes: 27 additions & 19 deletions vidformer-py/vidformer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ def insert_data_expr(self, data):
return len(self._exprs) - 1
elif type(data) is int:
if data >= -(2**31) and data < 2**31:
self._exprs.append(data)
self._exprs.append(data & 0xFFFFFFFF)
else:
self._literals.append(data)
self._literals.append(_json_arg(data, skip_data_anot=True))
self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
return len(self._exprs) - 1
elif type(data) is float:
self._exprs.append(
0x02000000_00000000 | int.from_bytes(struct.pack("f", data))
0x02000000_00000000 | int.from_bytes(struct.pack("f", data)[::-1])
)
elif type(data) is str:
self._literals.append(data)
self._literals.append(_json_arg(data, skip_data_anot=True))
self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
elif type(data) is bytes:
self._literals.append(data)
self._literals.append(_json_arg(data, skip_data_anot=True))
self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
elif type(data) is list:
if len(data) == 0:
Expand All @@ -250,7 +250,7 @@ def insert_data_expr(self, data):
and data[0] >= -(2**15)
and data[0] < 2**15
):
self._exprs.append(0x04000000_00000000 | data[0])
self._exprs.append(0x04000000_00000000 | (data[0] & 0xFFFF))
return len(self._exprs) - 1
if (
len(data) == 2
Expand All @@ -261,7 +261,11 @@ def insert_data_expr(self, data):
and data[1] >= -(2**15)
and data[1] < 2**15
):
self._exprs.append(0x05000000_00000000 | (data[0] << 16) | data[1])
self._exprs.append(
0x05000000_00000000
| ((data[0] & 0xFFFF) << 16)
| (data[1] & 0xFFFF)
)
return len(self._exprs) - 1
if (
len(data) == 3
Expand All @@ -276,18 +280,21 @@ def insert_data_expr(self, data):
and data[2] < 2**15
):
self._exprs.append(
0x06000000_00000000 | (data[0] << 32) | (data[1] << 16) | data[2]
0x06000000_00000000
| ((data[0] & 0xFFFF) << 32)
| ((data[1] & 0xFFFF) << 16)
| (data[2] & 0xFFFF)
)
return len(self._exprs) - 1
out = len(self._exprs)
member_idxs = []
for member in data:
if _feb_expr_coded_as_scalar(member):
member_idxs.append(self.insert_data_expr(member))
else:
member_idxs.append(None)
else:
member_idxs.append(self.insert_data_expr(member))

self._exprs.append(0x42000000_00000000 | len(data) - 1)
self._exprs.append(0x42000000_00000000 | len(data))

for i in range(len(data)):
if member_idxs[i] is None:
Expand Down Expand Up @@ -665,29 +672,30 @@ def push_spec_part(self, spec_id, pos, frames, terminal):
response = response.json()
assert response["status"] == "ok"

def push_spec_part_block(self, spec_id: str, pos, blocks, terminal):
def push_spec_part_block(
self, spec_id: str, pos, blocks, terminal, compression="gzip"
):
if type(spec_id) is IgniSpec:
spec_id = spec_id._id
assert type(spec_id) is str
assert type(pos) is int
assert type(blocks) is list
assert type(terminal) is bool
assert compression is None or compression == "gzip"

req_blocks = []
for block in blocks:
assert type(block) is _FrameExpressionBlock
block_body = block.as_dict()
block_body["literals"] = [
_json_arg(lit, skip_data_anot=True) for lit in block_body["literals"]
]
block_frames = len(block_body["frame_exprs"])
block_body = base64.b64encode(
json.dumps(block_body).encode("utf-8")
).decode("utf-8")
block_body = json.dumps(block_body).encode("utf-8")
if compression == "gzip":
block_body = gzip.compress(block_body, 1)
block_body = base64.b64encode(block_body).decode("utf-8")
req_blocks.append(
{
"frames": block_frames,
"compression": "none",
"compression": compression,
"body": block_body,
}
)
Expand Down
Loading

0 comments on commit 38f1aef

Please sign in to comment.