Skip to content

Commit

Permalink
Upgrade to DF 40 and Arroyo 52 (#702)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Aug 2, 2024
1 parent c076bfe commit 6ff0884
Show file tree
Hide file tree
Showing 47 changed files with 2,044 additions and 1,720 deletions.
1,930 changes: 1,025 additions & 905 deletions Cargo.lock

Large diffs are not rendered by default.

48 changes: 22 additions & 26 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ tonic = { version = "0.11", features = ["zstd"] }
tonic-build = { version = "0.11" }
tonic-web = { version = "0.11" }
tonic-reflection = { version = "0.11" }
arrow = { version = "51.0.0" }
arrow-ord = { version = "51.0.0" }
arrow-array = { version = "51.0.0" }
arrow-schema = { version = "51.0.0" }
arrow-json = { version = "51.0.0" }
object_store = { version = "0.9.1" }
parquet = { version = "51.0.0" }
arrow = { version = "=52.1.0" }
arrow-ord = { version = "=52.1.0" }
arrow-array = { version = "=52.1.0" }
arrow-schema = { version = "=52.1.0" }
arrow-json = { version = "=52.1.0" }
object_store = { version = "0.10" }
parquet = { version = "=52.1.0" }
ahash = { version = "=0.8.7" }
datafusion = { version = "37.1.0" }
datafusion-common = { version = "37.1.0" }
datafusion-proto = { version = "37.1.0" }
datafusion-functions = { version = "37.1.0" }
deltalake = { version = "0.17.3" }
datafusion = { version = "40.0.0" }
datafusion-common = { version = "40.0.0" }
datafusion-proto = { version = "40.0.0" }
datafusion-functions = { version = "40.0.0" }
deltalake = { version = "0.18.2" }
cornucopia = { version = "0.9.0" }
cornucopia_async = {version = "0.6.0"}
deadpool-postgres = "0.12"
Expand All @@ -60,20 +60,16 @@ split-debuginfo = "unpacked"


[patch.crates-io]
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = 'e75a0b49b40f35ed361444bbea0e5720f359d732' }
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '51.0.0/parquet_bytes'}
arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '51.0.0/parquet_bytes'}
arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '51.0.0/parquet_bytes'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '51.0.0/parquet_bytes'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '51.0.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '51.0.0/json'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.9.1_arroyo'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '37.1.0_arroyo'}
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
8 changes: 4 additions & 4 deletions crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ eventsource-client = "0.12.0"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }

# Webhook
reqwest = "0.11.20"
reqwest = { version = "0.11.20", features = ["stream"] }

# Redis
redis = { version = "0.24.0", features = ["default", "tokio-rustls-comp", "cluster-async", "connection-manager"] }

# Fluvio
fluvio = {version = "0.21", features = ["openssl"]}
fluvio-future = "0.6"
fluvio = {version = "0.23", features = ["openssl"]}
fluvio-future = "0.7"

# Kinesis
aws-sdk-kinesis = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] }
Expand All @@ -75,7 +75,7 @@ uuid = { version = "1.7.0", features = ["v4"] }
# Filesystem
parquet = { workspace = true, features = ["async"]}
object_store = { workspace = true }
deltalake = { workspace = true, features = ["s3", "datafusion"] }
deltalake = { workspace = true, features = ["s3"] }
async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] }

# MQTT
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/filesystem/sink/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn create_new_table(
let delta_schema: deltalake::kernel::Schema = (schema).try_into()?;
CreateBuilder::new()
.with_log_store(delta_object_store)
.with_columns(delta_schema.fields().clone())
.with_columns(delta_schema.fields().cloned())
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec<Action>)
partition_by: None,
predicate: None,
},
)?
)
.await?
.version)
}
Expand Down
30 changes: 13 additions & 17 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ use arroyo_storage::StorageProvider;
use async_trait::async_trait;
use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use datafusion::prelude::concat;
use datafusion::{
common::{Column, Result as DFResult},
execution::{
context::{SessionConfig, SessionState},
runtime_env::RuntimeEnv,
},
logical_expr::{
expr::ScalarFunction, BuiltinScalarFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
expr::ScalarFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
},
physical_plan::{ColumnarValue, PhysicalExpr},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
Expand Down Expand Up @@ -200,14 +200,11 @@ fn partition_string_for_fields_and_time(
) -> Result<Arc<dyn PhysicalExpr>> {
let field_function = field_logical_expression(schema.clone(), partition_fields)?;
let time_function = timestamp_logical_expression(time_partition_pattern)?;
let function = Expr::ScalarFunction(ScalarFunction::new(
BuiltinScalarFunction::Concat,
vec![
time_function,
Expr::Literal(ScalarValue::Utf8(Some("/".to_string()))),
field_function,
],
));
let function = concat(vec![
time_function,
Expr::Literal(ScalarValue::Utf8(Some("/".to_string()))),
field_function,
]);
compile_expression(&function, schema)
}

Expand Down Expand Up @@ -239,8 +236,7 @@ fn field_logical_expression(schema: ArroyoSchemaRef, partition_fields: &[String]
Ok((field.name(), expr))
})
.collect::<Result<Vec<_>>>()?;
let function = Expr::ScalarFunction(ScalarFunction::new(
BuiltinScalarFunction::Concat,
let function = concat(
columns_as_string
.into_iter()
.enumerate()
Expand All @@ -253,7 +249,7 @@ fn field_logical_expression(schema: ArroyoSchemaRef, partition_fields: &[String]
vec![Expr::Literal(ScalarValue::Utf8(Some(preamble))), expr]
})
.collect(),
));
);
Ok(function)
}

Expand Down Expand Up @@ -1071,7 +1067,7 @@ where
type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T>> + Send>>;

struct MultipartManager {
object_store: Arc<StorageProvider>,
storage_provider: Arc<StorageProvider>,
location: Path,
partition: Option<String>,
multipart_id: Option<MultipartId>,
Expand All @@ -1085,7 +1081,7 @@ struct MultipartManager {
impl MultipartManager {
fn new(object_store: Arc<StorageProvider>, location: Path, partition: Option<String>) -> Self {
Self {
object_store,
storage_provider: object_store,
location,
partition,
multipart_id: None,
Expand Down Expand Up @@ -1141,7 +1137,7 @@ impl MultipartManager {
.multipart_id
.clone()
.ok_or_else(|| anyhow::anyhow!("missing multipart id"))?;
let object_store = self.object_store.clone();
let object_store = self.storage_provider.clone();
Ok(Box::pin(async move {
let upload_part = object_store
.add_multipart(
Expand All @@ -1164,7 +1160,7 @@ impl MultipartManager {
fn get_initialize_multipart_future(
&mut self,
) -> Result<BoxedTryFuture<MultipartCallbackWithName>> {
let object_store = self.object_store.clone();
let object_store = self.storage_provider.clone();
let location = self.location.clone();
Ok(Box::pin(async move {
let multipart_id = object_store.start_multipart(&location).await?;
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/fluvio/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow::array::RecordBatch;
use async_trait::async_trait;
use fluvio::{Fluvio, FluvioConfig, TopicProducer};
use fluvio::{Fluvio, FluvioConfig, TopicProducerPool};

use arroyo_formats::ser::ArrowSerializer;
use tracing::info;
Expand All @@ -12,7 +12,7 @@ use arroyo_types::CheckpointBarrier;
pub struct FluvioSinkFunc {
pub topic: String,
pub endpoint: Option<String>,
pub producer: Option<TopicProducer>,
pub producer: Option<TopicProducerPool>,
pub serializer: ArrowSerializer,
}

Expand Down Expand Up @@ -56,7 +56,7 @@ impl ArrowOperator for FluvioSinkFunc {
}

impl FluvioSinkFunc {
async fn get_producer(&mut self) -> anyhow::Result<TopicProducer> {
async fn get_producer(&mut self) -> anyhow::Result<TopicProducerPool> {
info!("Creating fluvio producer for {:?}", self.endpoint);

let config: Option<FluvioConfig> = self.endpoint.as_ref().map(FluvioConfig::new);
Expand Down
5 changes: 5 additions & 0 deletions crates/arroyo-datastream/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use arroyo_rpc::grpc::api;
use arroyo_rpc::grpc::api::{
ArrowDylibUdfConfig, ArrowProgram, ArrowProgramConfig, ConnectorOp, EdgeType,
};
use petgraph::dot::Dot;
use petgraph::graph::DiGraph;
use petgraph::prelude::EdgeRef;
use petgraph::Direction;
Expand Down Expand Up @@ -219,6 +220,10 @@ impl LogicalProgram {
}
}

pub fn dot(&self) -> String {
format!("{:?}", Dot::with_config(&self.graph, &[]))
}

pub fn task_count(&self) -> usize {
// TODO: this can be cached
self.graph.node_weights().map(|nw| nw.parallelism).sum()
Expand Down
18 changes: 12 additions & 6 deletions crates/arroyo-formats/src/avro/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ mod tests {
use crate::de::ArrowDeserializer;
use arrow_array::builder::{make_builder, ArrayBuilder};
use arrow_array::RecordBatch;
use arrow_json::writer::record_batch_to_vec;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{AvroFormat, BadData, Format};
Expand Down Expand Up @@ -309,13 +310,16 @@ mod tests {
deserializer.flush_buffer().unwrap().unwrap()
};

#[allow(deprecated)]
arrow_json::writer::record_batches_to_json_rows(&[&batch])
record_batch_to_vec(&batch, true, arrow_json::writer::TimestampFormat::RFC3339)
.unwrap()
.into_iter()
.map(|mut r| {
r.remove("_timestamp");
r
.iter()
.map(|b| {
serde_json::from_slice::<serde_json::Map<String, serde_json::Value>>(b.as_slice())
.unwrap()
})
.map(|mut f| {
f.remove("_timestamp");
f
})
.collect()
}
Expand Down Expand Up @@ -405,6 +409,7 @@ mod tests {
json!([{
"name": "Alyssa",
"favorite_number": 256,
"favorite_color": null,
}])
);
}
Expand Down Expand Up @@ -459,6 +464,7 @@ mod tests {
json!([{
"name": "Alyssa",
"favorite_number": 256,
"favorite_color": null,
"removed_field": "hello!"
}])
);
Expand Down
5 changes: 4 additions & 1 deletion crates/arroyo-operator/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ impl ArrowCollector {
c.inc_by(record.get_array_memory_size() as u64)
});

let out_schema = self.out_schema.as_ref().unwrap();
let out_schema = self
.out_schema
.as_ref()
.unwrap_or_else(|| panic!("No out-schema in {}!", self.task_info.operator_name));

let record = if let Some(projection) = &self.projection {
record.project(projection).unwrap_or_else(|e| {
Expand Down
9 changes: 7 additions & 2 deletions crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use async_trait::async_trait;
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::expr_rewriter::FunctionRewrite;
use datafusion::logical_expr::planner::ExprPlanner;
use datafusion::logical_expr::{
create_udaf, AggregateUDF, ScalarUDF, Signature, TypeSignature, Volatility, WindowUDF,
};
Expand All @@ -31,7 +32,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::Barrier;
use tokio_stream::StreamExt;
use tracing::{debug, error, info, warn, Instrument};
use tracing::{debug, error, info, trace, warn, Instrument};

pub trait OperatorConstructor: Send {
type ConfigT: prost::Message + Default;
Expand Down Expand Up @@ -228,7 +229,7 @@ async fn operator_run_behavior(
Some(((idx, message), s)) => {
let local_idx = idx;

debug!("[{}] Handling message {}-{}, {:?}",
trace!("[{}] Handling message {}-{}, {:?}",
ctx.task_info.operator_name, 0, local_idx, message);

match message {
Expand Down Expand Up @@ -707,4 +708,8 @@ impl FunctionRegistry for Registry {
fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> DFResult<Option<Arc<WindowUDF>>> {
Ok(self.udwfs.insert(udwf.name().to_string(), udwf))
}

fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
vec![]
}
}
8 changes: 0 additions & 8 deletions crates/arroyo-operator/src/udfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use arrow::array::cast::as_list_array;
use arrow::array::{new_empty_array, Array, ArrayRef, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, FieldRef, IntervalUnit, TimeUnit};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arroyo_udf_host::SyncUdfDylib;
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::Accumulator;
Expand Down Expand Up @@ -208,10 +207,3 @@ fn scalar_none(datatype: &DataType) -> ScalarValue {
| DataType::LargeListView(_) => unimplemented!("views are not supported"),
}
}

#[repr(C)]
#[derive(Debug)]
pub struct FfiArraySchemaPair(FFI_ArrowArray, FFI_ArrowSchema);

#[repr(C)]
pub struct FfiArrayResult(FFI_ArrowArray, FFI_ArrowSchema, bool);
Loading

0 comments on commit 6ff0884

Please sign in to comment.