diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index 674850c70..f17ac68d4 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -28,9 +28,20 @@ popular file formats files, run it in a distributed environment, and obtain the The following code demonstrates how to create a Ballista context and connect to a scheduler. +If you are running a standalone cluster (it runs locally), all you need to do is call the `standalone()` method on your `BallistaBuilder`. If you are running a cluster in remote mode, you need to provide the scheduler URL, e.g. `BallistaBuilder().remote("df://my-remote-ip:50050")`. + ```text ->>> import ballista ->>> ctx = ballista.BallistaContext("localhost", 50050) +>>> from ballista import BallistaBuilder +>>> # for a standalone instance +>>> # Ballista will initialize with an empty config +>>> # set config variables with `config()` +>>> ballista = BallistaBuilder()\ +>>> .config("ballista.job.name", "example ballista") +>>> +>>> ctx = ballista.standalone() +>>> +>>> # for a remote instance provide the URL +>>> ctx = ballista.remote("df://url-path-to-scheduler:50050") ``` ## SQL @@ -103,14 +114,15 @@ The `explain` method can be used to show the logical and physical query plans fo The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame. ```python -import ballista +from ballista import BallistaBuilder import pyarrow # an alias +# TODO implement Functions f = ballista.functions # create a context -ctx = ballista.BallistaContext("localhost", 50050) +ctx = BallistaBuilder().standalone() # create a RecordBatch and a new DataFrame from it batch = pyarrow.RecordBatch.from_arrays( diff --git a/python/Cargo.toml b/python/Cargo.toml index dbe419d28..b03f1e997 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -26,14 +26,14 @@ readme = "README.md" license = "Apache-2.0" edition = "2021" rust-version = "1.72" -include = ["/src", "/pyballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"] +include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"] publish = false [dependencies] async-trait = "0.1.77" -ballista = { path = "../ballista/client", version = "0.12.0" } +ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] } ballista-core = { path = "../ballista/core", version = "0.12.0" } -datafusion = { version = "42" } +datafusion = { version = "42", features = ["pyarrow", "avro"] } datafusion-proto = { version = "42" } datafusion-python = { version = "42" } @@ -43,6 +43,4 @@ tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync [lib] crate-type = ["cdylib"] -name = "pyballista" - - +name = "ballista" diff --git a/python/README.md b/python/README.md index 2898cb165..01b0a7f90 100644 --- a/python/README.md +++ b/python/README.md @@ -29,8 +29,8 @@ part of the default Cargo workspace so that it doesn't cause overhead for mainta Creates a new context and connects to a Ballista scheduler process.
```python -from pyballista import SessionContext ->>> ctx = SessionContext("localhost", 50050) +from ballista import BallistaBuilder +>>> ctx = BallistaBuilder().standalone() ``` ## Example SQL Usage diff --git a/python/pyballista/__init__.py b/python/ballista/__init__.py similarity index 92% rename from python/pyballista/__init__.py rename to python/ballista/__init__.py index 62a6bc790..a143f17e9 100644 --- a/python/pyballista/__init__.py +++ b/python/ballista/__init__.py @@ -25,12 +25,12 @@ import pyarrow as pa -from .pyballista_internal import ( - SessionContext, +from .ballista_internal import ( + BallistaBuilder, ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "SessionContext", -] + "BallistaBuilder", +] \ No newline at end of file diff --git a/python/pyballista/tests/__init__.py b/python/ballista/tests/__init__.py similarity index 100% rename from python/pyballista/tests/__init__.py rename to python/ballista/tests/__init__.py diff --git a/python/pyballista/tests/test_context.py b/python/ballista/tests/test_context.py similarity index 81% rename from python/pyballista/tests/test_context.py rename to python/ballista/tests/test_context.py index b440bb270..a0af9592b 100644 --- a/python/pyballista/tests/test_context.py +++ b/python/ballista/tests/test_context.py @@ -15,27 +15,27 @@ # specific language governing permissions and limitations # under the License. -from pyballista import SessionContext +from ballista import BallistaBuilder import pytest def test_create_context(): - SessionContext("localhost", 50050) + BallistaBuilder().standalone() def test_select_one(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.sql("SELECT 1") batches = df.collect() assert len(batches) == 1 def test_read_csv(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_csv("testdata/test.csv", has_header=True) batches = df.collect() assert len(batches) == 1 assert len(batches[0]) == 1 def test_register_csv(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() ctx.register_csv("test", "testdata/test.csv", has_header=True) df = ctx.sql("SELECT * FROM test") batches = df.collect() @@ -43,14 +43,14 @@ def test_register_csv(): assert len(batches[0]) == 1 def test_read_parquet(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_parquet("testdata/test.parquet") batches = df.collect() assert len(batches) == 1 assert len(batches[0]) == 8 def test_register_parquet(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() ctx.register_parquet("test", "testdata/test.parquet") df = ctx.sql("SELECT * FROM test") batches = df.collect() @@ -58,7 +58,7 @@ def test_register_parquet(): assert len(batches[0]) == 8 def test_read_dataframe_api(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_csv("testdata/test.csv", has_header=True) \ .select_columns('a', 'b') \ .limit(1) @@ -67,11 +67,12 @@ def test_read_dataframe_api(): assert len(batches[0]) == 1 def test_execute_plan(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_csv("testdata/test.csv", has_header=True) \ .select_columns('a', 'b') \ .limit(1) - df = ctx.execute_logical_plan(df.logical_plan()) + # TODO research SessionContext Logical Plan for DataFusionPython + #df = ctx.execute_logical_plan(df.logical_plan()) batches = df.collect() assert len(batches) == 1 assert 
len(batches[0]) == 1 diff --git a/python/examples/example.py b/python/examples/example.py new file mode 100644 index 000000000..61a9abbd2 --- /dev/null +++ b/python/examples/example.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from ballista import BallistaBuilder +from datafusion.context import SessionContext + +# Ballista will initialize with an empty config +# set config variables with `config` +ctx: SessionContext = BallistaBuilder()\ + .config("ballista.job.name", "example ballista")\ + .config("ballista.shuffle.partitions", "16")\ + .standalone() + +#ctx_remote: SessionContext = BallistaBuilder().remote("df://remote_ip:50050") + +# Select 1 to verify it's working +ctx.sql("SELECT 1").show() +#ctx_remote.sql("SELECT 2").show() \ No newline at end of file diff --git a/python/pyproject.toml b/python/pyproject.toml index dbb76e59d..2d06b225d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -20,7 +20,7 @@ requires = ["maturin>=0.15,<0.16"] build-backend = "maturin" [project] -name = "pyballista" +name = "ballista" description = "Python client for Apache Arrow Ballista Distributed SQL Query Engine" readme = "README.md" license = {file = "LICENSE.txt"} @@ -55,10 +55,10 @@ repository = "https://github.com/apache/arrow-ballista" profile = "black" [tool.maturin] -module-name = "pyballista.pyballista_internal" +module-name = "ballista.ballista_internal" include = [ { path = "Cargo.lock", format = "sdist" } ] exclude = [".github/**", "ci/**", ".asf.yaml"] # Require Cargo.lock is up to date -locked = true +locked = true \ No newline at end of file diff --git a/python/src/context.rs b/python/src/context.rs deleted file mode 100644 index d27d5314b..000000000 --- a/python/src/context.rs +++ /dev/null @@ -1,353 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License.
- -use crate::utils::to_pyerr; -use datafusion::logical_expr::SortExpr; -use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; -use std::path::PathBuf; - -use ballista::prelude::*; -use datafusion::arrow::datatypes::Schema; -use datafusion::arrow::pyarrow::PyArrowType; -use datafusion::prelude::*; -use datafusion_python::catalog::PyTable; -use datafusion_python::context::{ - convert_table_partition_cols, parse_file_compression_type, -}; -use datafusion_python::dataframe::PyDataFrame; -use datafusion_python::errors::DataFusionError; -use datafusion_python::expr::sort_expr::PySortExpr; -use datafusion_python::sql::logical::PyLogicalPlan; -use datafusion_python::utils::wait_for_future; - -/// PyBallista session context. This is largely a duplicate of -/// DataFusion's PySessionContext, with the main difference being -/// that this operates on a BallistaContext instead of DataFusion's -/// SessionContext. We could probably add extra extension points to -/// DataFusion to allow for a pluggable context and remove much of -/// this code. -#[pyclass(name = "SessionContext", module = "pyballista", subclass)] -pub struct PySessionContext { - ctx: BallistaContext, -} - -#[pymethods] -impl PySessionContext { - /// Create a new SessionContext by connecting to a Ballista scheduler process. - #[new] - pub fn new(host: &str, port: u16, py: Python) -> PyResult<Self> { - let config = BallistaConfig::default(); - let ballista_context = BallistaContext::remote(host, port, &config); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) - } - - pub fn sql(&mut self, query: &str, py: Python) -> PyResult<PyDataFrame> { - let result = self.ctx.sql(query); - let df = wait_for_future(py, result)?; - Ok(PyDataFrame::new(df)) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], file_extension=".avro"))] - pub fn read_avro( - &self, - path: &str, - schema: Option<PyArrowType<Schema>>, - table_partition_cols: Vec<(String, String)>, - file_extension: &str, - py: Python, - ) -> PyResult<PyDataFrame> { - let mut options = AvroReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?); - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future).map_err(DataFusionError::from)? - } else { - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future).map_err(DataFusionError::from)?
- }; - Ok(PyDataFrame::new(df)) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = ( - path, - schema=None, - has_header=true, - delimiter=",", - schema_infer_max_records=1000, - file_extension=".csv", - table_partition_cols=vec![], - file_compression_type=None))] - pub fn read_csv( - &self, - path: PathBuf, - schema: Option<PyArrowType<Schema>>, - has_header: bool, - delimiter: &str, - schema_infer_max_records: usize, - file_extension: &str, - table_partition_cols: Vec<(String, String)>, - file_compression_type: Option<String>, - py: Python, - ) -> PyResult<PyDataFrame> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let delimiter = delimiter.as_bytes(); - if delimiter.len() != 1 { - return Err(PyValueError::new_err( - "Delimiter must be a single character", - )); - }; - - let mut options = CsvReadOptions::new() - .has_header(has_header) - .delimiter(delimiter[0]) - .schema_infer_max_records(schema_infer_max_records) - .file_extension(file_extension) - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - - if let Some(py_schema) = schema { - options.schema = Some(&py_schema.0); - let result = self.ctx.read_csv(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)?); - Ok(df) - } else { - let result = self.ctx.read_csv(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)?); - Ok(df) - } - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))] - pub fn read_json( - &mut self, - path: PathBuf, - schema: Option<PyArrowType<Schema>>, - schema_infer_max_records: usize, - file_extension: &str, - table_partition_cols: Vec<(String, String)>, - file_compression_type: Option<String>, - py: Python, - ) -> PyResult<PyDataFrame> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let mut options = NdJsonReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - options.schema_infer_max_records = schema_infer_max_records; - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let result = self.ctx.read_json(path, options); - wait_for_future(py, result).map_err(DataFusionError::from)? - } else { - let result = self.ctx.read_json(path, options); - wait_for_future(py, result).map_err(DataFusionError::from)? - }; - Ok(PyDataFrame::new(df)) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = ( - path, - table_partition_cols=vec![], - parquet_pruning=true, - file_extension=".parquet", - skip_metadata=true, - schema=None, - file_sort_order=None))] - pub fn read_parquet( - &self, - path: &str, - table_partition_cols: Vec<(String, String)>, - parquet_pruning: bool, - file_extension: &str, - skip_metadata: bool, - schema: Option<PyArrowType<Schema>>, - file_sort_order: Option<Vec<Vec<PySortExpr>>>, - py: Python, - ) -> PyResult<PyDataFrame> { - let mut options = ParquetReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
- .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| { - e.into_iter() - .map(|f| { - let sort_expr: SortExpr = f.into(); - sort_expr - }) - .collect() - }) - .collect(); - - let result = self.ctx.read_parquet(path, options); - let df = - PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?); - Ok(df) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (name, - path, - schema=None, - file_extension=".avro", - table_partition_cols=vec![]))] - pub fn register_avro( - &mut self, - name: &str, - path: PathBuf, - schema: Option<PyArrowType<Schema>>, - file_extension: &str, - table_partition_cols: Vec<(String, String)>, - py: Python, - ) -> PyResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let mut options = AvroReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_avro(name, path, options); - wait_for_future(py, result).map_err(DataFusionError::from)?; - - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (name, - path, - schema=None, - has_header=true, - delimiter=",", - schema_infer_max_records=1000, - file_extension=".csv", - file_compression_type=None))] - pub fn register_csv( - &mut self, - name: &str, - path: PathBuf, - schema: Option<PyArrowType<Schema>>, - has_header: bool, - delimiter: &str, - schema_infer_max_records: usize, - file_extension: &str, - file_compression_type: Option<String>, - py: Python, - ) -> PyResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let delimiter = delimiter.as_bytes(); - if delimiter.len() != 1 { - return Err(PyValueError::new_err( - "Delimiter must be a single character", - )); - } - - let mut options = CsvReadOptions::new() - .has_header(has_header) - .delimiter(delimiter[0]) - .schema_infer_max_records(schema_infer_max_records) - .file_extension(file_extension) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_csv(name, path, options); - wait_for_future(py, result).map_err(DataFusionError::from)?; - - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (name, path, table_partition_cols=vec![], - parquet_pruning=true, - file_extension=".parquet", - skip_metadata=true, - schema=None, - file_sort_order=None))] - pub fn register_parquet( - &mut self, - name: &str, - path: &str, - table_partition_cols: Vec<(String, String)>, - parquet_pruning: bool, - file_extension: &str, - skip_metadata: bool, - schema: Option<PyArrowType<Schema>>, - file_sort_order: Option<Vec<Vec<PySortExpr>>>, - py: Python, - ) -> PyResult<()> { - let mut options = ParquetReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
- .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| { - e.into_iter() - .map(|f| { - let sort_expr: SortExpr = f.into(); - sort_expr - }) - .collect() - }) - .collect(); - - let result = self.ctx.register_parquet(name, path, options); - wait_for_future(py, result).map_err(DataFusionError::from)?; - Ok(()) - } - - pub fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> { - self.ctx - .register_table(name, table.table()) - .map_err(DataFusionError::from)?; - Ok(()) - } - - pub fn execute_logical_plan( - &mut self, - logical_plan: PyLogicalPlan, - py: Python, - ) -> PyResult<PyDataFrame> { - let result = self.ctx.execute_logical_plan(logical_plan.into()); - let df = wait_for_future(py, result).unwrap(); - Ok(PyDataFrame::new(df)) - } -} diff --git a/python/src/lib.rs b/python/src/lib.rs index 5fbd2491b..41b4b6d31 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,18 +15,89 @@ // specific language governing permissions and limitations // under the License. +use ballista::prelude::*; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::*; +use datafusion_python::context::PySessionContext; +use datafusion_python::utils::wait_for_future; + +use std::collections::HashMap; + use pyo3::prelude::*; -pub mod context; mod utils; - -pub use crate::context::PySessionContext; +use utils::to_pyerr; #[pymodule] -fn pyballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { +fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); - // Ballista structs - m.add_class::<PySessionContext>()?; - // DataFusion structs + // BallistaBuilder struct + m.add_class::<PyBallistaBuilder>()?; + // DataFusion struct m.add_class::<PySessionContext>()?; Ok(()) } + +// BallistaBuilder will take a HashMap/Dict config +#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)] +pub struct PyBallistaBuilder { + conf: HashMap<String, String>, +} + +#[pymethods] +impl PyBallistaBuilder { + #[new] + pub fn new() -> Self { + Self { + conf: HashMap::new(), + } + } + + pub fn config( + mut slf: PyRefMut<'_, Self>, + k: &str, + v: &str, + py: Python, + ) -> PyResult<PyObject> { + slf.conf.insert(k.into(), v.into()); + + Ok(slf.into_py(py)) + } + + /// Construct a standalone SessionContext + pub fn standalone(&self, py: Python) -> PyResult<PySessionContext> { + // Build the config + let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; + // Build the state + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + // Build the context + let standalone_session = SessionContext::standalone_with_state(state); + + // Creating the SessionContext is async, so block on the future + let ctx = wait_for_future(py, standalone_session)?; + + // Convert the SessionContext into a Python SessionContext + Ok(ctx.into()) + } + + /// Construct a SessionContext connected to a remote scheduler + pub fn remote(&self, url: &str, py: Python) -> PyResult<PySessionContext> { + // Build the config + let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; + // Build the state + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + // Build the context + let remote_session = SessionContext::remote_with_state(url, state); + + // Creating the SessionContext is async, so block on the future + let ctx = wait_for_future(py, remote_session)?; + + // Convert the SessionContext
into a Python SessionContext + Ok(ctx.into()) + } +} diff --git a/python/testdata/test.csv b/python/testdata/test.csv old mode 100644 new mode 100755
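
Taken together, the changes above replace the old `SessionContext("localhost", 50050)` constructor with a `BallistaBuilder` that yields a DataFusion `SessionContext`. The sketch below (not part of the diff) shows how the new API might be used end to end; the job name and the `df://` scheduler URL are placeholders, and `testdata/test.csv` is the fixture referenced by the renamed tests.

```python
from ballista import BallistaBuilder

# Standalone mode: runs scheduler and executor in-process
# (relies on the "standalone" feature enabled in Cargo.toml above)
ctx = BallistaBuilder() \
    .config("ballista.job.name", "quickstart") \
    .standalone()

# Register the CSV fixture used by the test suite and run a query
ctx.register_csv("test", "testdata/test.csv", has_header=True)
ctx.sql("SELECT * FROM test LIMIT 1").show()

# Remote mode: pass the scheduler URL instead of calling standalone()
# ctx = BallistaBuilder().remote("df://my-scheduler-host:50050")
```

Because `standalone()` and `remote()` both return a plain DataFusion `SessionContext`, the rest of the DataFrame and SQL API is the one documented by datafusion-python rather than a Ballista-specific wrapper.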