Skip to content

Commit

Permalink
added a BallistaContext to ballista to allow for Remote or standalone (
Browse files Browse the repository at this point in the history
…#1100)

* added a pycontext to ballista

* added a pycontext to ballista

* added a pycontext to ballista

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updating the pyballista package to ballista

* changing the packagaing naming convention from pyballista to ballista

* changing the packagaing naming convention from pyballista to ballista

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* updated python to have two static methods for creating a ballista context

* Updating BallistaContext and Config

* Updating BallistaContext and Config

* updated python to have two static methods for creating a ballista context

* Updating BallistaContext and Config, calling it for the night, will complete tomorrow

* Updating BallistaContext and Config, calling it for the night, will complete tomorrow

* Adding config to ballista context

* Adding config to ballista context

* Adding config to ballista context

* Adding config to ballista context

* Updated Builder and Docs

* Updated Builder and Docs

* Updated Builder and Docs

* Updated Builder and Docs

* Updated Builder and Docs

* Updated Builder and Docs

---------

Co-authored-by: Trevor Barnes <[email protected]>
  • Loading branch information
tbar4 and Trevor Barnes authored Nov 19, 2024
1 parent 5b6b50b commit d949e5f
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 389 deletions.
20 changes: 16 additions & 4 deletions docs/source/user-guide/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@ popular file formats files, run it in a distributed environment, and obtain the

The following code demonstrates how to create a Ballista context and connect to a scheduler.

If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `Ballista.remote("http://my-remote-ip:50050")`.

```text
>>> import ballista
>>> ctx = ballista.BallistaContext("localhost", 50050)
>>> from ballista import BallistaBuilder
>>> # for a standalone instance
>>> # Ballista will initiate with an empty config
>>> # set config variables with `config()`
>>> ballista = BallistaBuilder()\
>>> .config("ballista.job.name", "example ballista")
>>>
>>> ctx = ballista.standalone()
>>>
>>> # for a remote instance provide the URL
>>> ctx = ballista.remote("df://url-path-to-scheduler:50050")
```

## SQL
Expand Down Expand Up @@ -103,14 +114,15 @@ The `explain` method can be used to show the logical and physical query plans fo
The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame.

```python
import ballista
from ballista import BallistaBuilder
import pyarrow

# an alias
# TODO implement Functions
f = ballista.functions

# create a context
ctx = ballista.BallistaContext("localhost", 50050)
ctx = Ballista().standalone()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
Expand Down
10 changes: 4 additions & 6 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ readme = "README.md"
license = "Apache-2.0"
edition = "2021"
rust-version = "1.72"
include = ["/src", "/pyballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
publish = false

[dependencies]
async-trait = "0.1.77"
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
datafusion = { version = "42" }
datafusion = { version = "42", features = ["pyarrow", "avro"] }
datafusion-proto = { version = "42" }
datafusion-python = { version = "42" }

Expand All @@ -43,6 +43,4 @@ tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync

[lib]
crate-type = ["cdylib"]
name = "pyballista"


name = "ballista"
4 changes: 2 additions & 2 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ part of the default Cargo workspace so that it doesn't cause overhead for mainta
Creates a new context and connects to a Ballista scheduler process.

```python
from pyballista import SessionContext
>>> ctx = SessionContext("localhost", 50050)
from ballista import BallistaBuilder
>>> ctx = BallistaBuilder().standalone()
```

## Example SQL Usage
Expand Down
8 changes: 4 additions & 4 deletions python/pyballista/__init__.py → python/ballista/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@

import pyarrow as pa

from .pyballista_internal import (
SessionContext,
from .ballista_internal import (
BallistaBuilder,
)

__version__ = importlib_metadata.version(__name__)

__all__ = [
"SessionContext",
]
"BallistaBuilder",
]
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,50 @@
# specific language governing permissions and limitations
# under the License.

from pyballista import SessionContext
from ballista import BallistaBuilder
import pytest

def test_create_context():
SessionContext("localhost", 50050)
BallistaBuilder().standalone()

def test_select_one():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
df = ctx.sql("SELECT 1")
batches = df.collect()
assert len(batches) == 1

def test_read_csv():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
df = ctx.read_csv("testdata/test.csv", has_header=True)
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 1

def test_register_csv():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
ctx.register_csv("test", "testdata/test.csv", has_header=True)
df = ctx.sql("SELECT * FROM test")
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 1

def test_read_parquet():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
df = ctx.read_parquet("testdata/test.parquet")
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 8

def test_register_parquet():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
ctx.register_parquet("test", "testdata/test.parquet")
df = ctx.sql("SELECT * FROM test")
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 8

def test_read_dataframe_api():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
df = ctx.read_csv("testdata/test.csv", has_header=True) \
.select_columns('a', 'b') \
.limit(1)
Expand All @@ -67,11 +67,12 @@ def test_read_dataframe_api():
assert len(batches[0]) == 1

def test_execute_plan():
ctx = SessionContext("localhost", 50050)
ctx = BallistaBuilder().standalone()
df = ctx.read_csv("testdata/test.csv", has_header=True) \
.select_columns('a', 'b') \
.limit(1)
df = ctx.execute_logical_plan(df.logical_plan())
# TODO research SessionContext Logical Plan for DataFusionPython
#df = ctx.execute_logical_plan(df.logical_plan())
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 1
32 changes: 32 additions & 0 deletions python/examples/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from ballista import BallistaBuilder
from datafusion.context import SessionContext

# Ballista will initiate with an empty config
# set config variables with `config`
ctx: SessionContext = BallistaBuilder()\
.config("ballista.job.name", "example ballista")\
.config("ballista.shuffle.partitions", "16")\
.standalone()

#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)

# Select 1 to verify its working
ctx.sql("SELECT 1").show()
#ctx_remote.sql("SELECT 2").show()
6 changes: 3 additions & 3 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ requires = ["maturin>=0.15,<0.16"]
build-backend = "maturin"

[project]
name = "pyballista"
name = "ballista"
description = "Python client for Apache Arrow Ballista Distributed SQL Query Engine"
readme = "README.md"
license = {file = "LICENSE.txt"}
Expand Down Expand Up @@ -55,10 +55,10 @@ repository = "https://github.com/apache/arrow-ballista"
profile = "black"

[tool.maturin]
module-name = "pyballista.pyballista_internal"
module-name = "ballista.ballista_internal"
include = [
{ path = "Cargo.lock", format = "sdist" }
]
exclude = [".github/**", "ci/**", ".asf.yaml"]
# Require Cargo.lock is up to date
locked = true
locked = true
Loading

0 comments on commit d949e5f

Please sign in to comment.