Skip to content

Commit

Permalink
Druid29 preview (#72)
Browse files Browse the repository at this point in the history
Co-authored-by: Katya Macedo  <[email protected]>
Co-authored-by: Katya Macedo <[email protected]>
Co-authored-by: Peter Marshall <[email protected]>
  • Loading branch information
3 people authored Feb 24, 2024
1 parent ba1c29b commit ed1f4c8
Show file tree
Hide file tree
Showing 18 changed files with 2,076 additions and 146 deletions.
11 changes: 6 additions & 5 deletions docker-compose-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ services:
- KAFKA_ENABLE_KRAFT=false

coordinator:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: coordinator
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand All @@ -89,11 +89,12 @@ services:
- environment

broker:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: broker
profiles: ["druid-jupyter", "all-services"]
volumes:
- broker_var:/opt/druid/var
- druid_shared:/opt/shared
depends_on:
- zookeeper
- postgres
Expand All @@ -106,7 +107,7 @@ services:
- environment

historical:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: historical
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand All @@ -124,7 +125,7 @@ services:
- environment

middlemanager:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: middlemanager
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand All @@ -143,7 +144,7 @@ services:
- environment

router:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: router
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand Down
11 changes: 6 additions & 5 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ services:
- KAFKA_ENABLE_KRAFT=false

coordinator:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: coordinator
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand All @@ -89,11 +89,12 @@ services:
- environment

broker:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: broker
profiles: ["druid-jupyter", "all-services"]
volumes:
- broker_var:/opt/druid/var
- druid_shared:/opt/shared
depends_on:
- zookeeper
- postgres
Expand All @@ -106,7 +107,7 @@ services:
- environment

historical:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: historical
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand All @@ -124,7 +125,7 @@ services:
- environment

middlemanager:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: middlemanager
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand All @@ -143,7 +144,7 @@ services:
- environment

router:
image: imply/druid:${DRUID_VERSION:-28.0.1}
image: imply/druid:${DRUID_VERSION:-29.0.0}
container_name: router
profiles: ["druid-jupyter", "all-services"]
volumes:
Expand Down
14 changes: 14 additions & 0 deletions environment
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,19 @@ druid_indexer_logs_directory=/opt/shared/indexing-logs
druid_processing_numThreads=2
druid_processing_numMergeBuffers=2


# setup up durable Storage for asynchronous query processing
druid_msq_intermediate_storage_enable=true
druid_msq_intermediate_storage_type=local
druid_msq_intermediate_storage_tempDir=/opt/shared/tmp
druid_msq_intermediate_storage_basePath=/opt/shared/intermediate
druid_msq_intermediate_storage_cleaner_enabled=true







DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>

28 changes: 21 additions & 7 deletions jupyter-img/druidapi/druidapi/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,19 @@ def rows(self):
page+=1
return self._rows

def paged_rows(self, pageNum):
import json
if self.succeeded:
self._rows=[]
if pageNum < len(self._pages):
results = self._query_client().statement_results(self._id, pageNum)
try:
self._rows = json.loads(results.text)
except Exception as ex:
raise ClientError(f"Could not parse JSON from result [{results}]")
return self._rows


def _display(self, display):
return self._druid().display if not display else display

Expand Down Expand Up @@ -838,7 +851,7 @@ def __init__(self, druid, rest_client=None):
def rest_client(self):
return self._rest_client

def _prepare_query(self, request, asynch=False):
def _prepare_query(self, request, asynch=False, rowsPerPage= None ):
if not request:
raise ClientError('No query provided.')
# If the request is a dictionary, assume it is already in SqlQuery form.
Expand All @@ -854,6 +867,8 @@ def _prepare_query(self, request, asynch=False):
print(request.sql)
if asynch:
request.add_context( 'executionMode', 'ASYNC')
if rowsPerPage:
request.add_context( 'rowsPerPage', rowsPerPage)
if not query_obj:
query_obj = request.to_request()
return (request, query_obj)
Expand Down Expand Up @@ -946,7 +961,7 @@ def task(self, query) -> QueryTaskResult:
r = self.rest_client.post_only_json(REQ_SQL_TASK, query_obj, headers=request.headers)
return QueryTaskResult(request, r)

def statement(self, query) -> AsynchQueryResult:
def statement(self, query, rowsPerPage=None) -> AsynchQueryResult:
'''
Submits an MSQ asynch query. Returns a AsynchQueryResult to track the task.
Expand All @@ -955,7 +970,7 @@ def statement(self, query) -> AsynchQueryResult:
query
The query as either a string or a SqlRequest object.
'''
request, query_obj = self._prepare_query(query, asynch=True)
request, query_obj = self._prepare_query(query, asynch=True, rowsPerPage=rowsPerPage)
r = self.rest_client.post_only_json(REQ_SQL_ASYNC, query_obj, headers=request.headers)
return AsynchQueryResult(request, r)

Expand All @@ -979,7 +994,7 @@ def statement_results(self, id, page=0 ):
:param page: the page of rows to retrieve
:return: json array with rows for the page of results
'''
response = self.rest_client.get(REQ_SQL_ASYNC+f'/{id}/results', f'{{"page"={page}}}' )
response = self.rest_client.get(REQ_SQL_ASYNC+f'/{id}/results', params={"page":page} )
return response

def run_task(self, query):
Expand All @@ -996,16 +1011,15 @@ def run_task(self, query):
raise ClientError(resp.error_message)
resp.wait_until_done()

def async_sql(self, query):
def async_sql(self, query, rowsPerPage=None):
'''
Submits an MSQ asynchronous query request using the sql/statements API Returns a
:param query: The SQL query statement
:return: rows
'''
resp = self.statement(query)
resp = self.statement(query, rowsPerPage=rowsPerPage)
if not resp.ok:
raise ClientError(resp.error_message)
resp.wait_until_done()
return resp

def _tables_query(self, schema):
Expand Down
6 changes: 0 additions & 6 deletions notebooks/00-releases/README.md

This file was deleted.

15 changes: 0 additions & 15 deletions notebooks/00-releases/druid-28.md

This file was deleted.

Loading

0 comments on commit ed1f4c8

Please sign in to comment.