From 1a5aace520ccab8bcd2be72fc69e6a113ee4ed02 Mon Sep 17 00:00:00 2001 From: ienkovich Date: Mon, 26 Jun 2023 16:37:08 -0500 Subject: [PATCH] Introduce ResultSetTableToken::toArrow and use it in PyHDK. Signed-off-by: ienkovich --- omniscidb/ResultSet/ResultSet.cpp | 4 ++++ omniscidb/ResultSet/ResultSet.h | 1 + .../ResultSetRegistry/ResultSetRegistry.cpp | 8 +++++++ .../ResultSetRegistry/ResultSetTableToken.cpp | 24 +++++++++++++++++++ .../ResultSetRegistry/ResultSetTableToken.h | 4 ++++ omniscidb/Tests/ResultSetArrowConversion.cpp | 20 ++++++++++++++++ python/pyhdk/_sql.pxd | 10 ++++++++ python/pyhdk/_sql.pyx | 16 ++++--------- 8 files changed, 75 insertions(+), 12 deletions(-) diff --git a/omniscidb/ResultSet/ResultSet.cpp b/omniscidb/ResultSet/ResultSet.cpp index 33fdc9440..7b04f68db 100644 --- a/omniscidb/ResultSet/ResultSet.cpp +++ b/omniscidb/ResultSet/ResultSet.cpp @@ -468,6 +468,10 @@ bool ResultSet::hasColNames() const { return targets_.size() == fields_.size(); } +const std::vector& ResultSet::getColNames() const { + return fields_; +} + std::string ResultSet::colName(size_t col_idx) const { if (fields_.empty()) { return "col" + std::to_string(col_idx); diff --git a/omniscidb/ResultSet/ResultSet.h b/omniscidb/ResultSet/ResultSet.h index b7dbf376e..68437a448 100644 --- a/omniscidb/ResultSet/ResultSet.h +++ b/omniscidb/ResultSet/ResultSet.h @@ -289,6 +289,7 @@ class ResultSet { void setColNames(std::vector fields); bool hasColNames() const; + const std::vector& getColNames() const; std::string colName(size_t col_idx) const; /** diff --git a/omniscidb/ResultSetRegistry/ResultSetRegistry.cpp b/omniscidb/ResultSetRegistry/ResultSetRegistry.cpp index 98e3d03b7..6baae52e3 100644 --- a/omniscidb/ResultSetRegistry/ResultSetRegistry.cpp +++ b/omniscidb/ResultSetRegistry/ResultSetRegistry.cpp @@ -212,6 +212,10 @@ ResultSetTableTokenPtr ResultSetRegistry::head(const ResultSetTableToken& token, } } + // Copy column names to the resulting table. + auto* first_rs = table->fragments.front().rs.get(); + new_results.front()->setColNames(first_rs->getColNames()); + data_lock.unlock(); table_lock.unlock(); @@ -256,6 +260,10 @@ ResultSetTableTokenPtr ResultSetRegistry::tail(const ResultSetTableToken& token, } } + // Copy column names to the resulting table. + auto* first_rs = table->fragments.front().rs.get(); + new_results.front()->setColNames(first_rs->getColNames()); + data_lock.unlock(); table_lock.unlock(); diff --git a/omniscidb/ResultSetRegistry/ResultSetTableToken.cpp b/omniscidb/ResultSetRegistry/ResultSetTableToken.cpp index 795587fd1..d2400bb66 100644 --- a/omniscidb/ResultSetRegistry/ResultSetTableToken.cpp +++ b/omniscidb/ResultSetRegistry/ResultSetTableToken.cpp @@ -7,6 +7,9 @@ #include "ResultSetTableToken.h" #include "ResultSetRegistry.h" +#include "ResultSet/ArrowResultSet.h" +#include "Shared/ArrowUtil.h" + namespace hdk { ResultSetTableToken::ResultSetTableToken(TableInfoPtr tinfo, @@ -43,4 +46,25 @@ ResultSetTableTokenPtr ResultSetTableToken::tail(size_t n) const { return registry_->tail(*this, n); } +std::shared_ptr ResultSetTableToken::toArrow() const { + auto first_rs = resultSet(0); + std::vector col_names; + for (size_t col_idx = 0; col_idx < first_rs->colCount(); ++col_idx) { + col_names.push_back(first_rs->colName(col_idx)); + } + + std::vector> converted_tables; + for (size_t rs_idx = 0; rs_idx < resultSetCount(); ++rs_idx) { + ArrowResultSetConverter converter(resultSet(rs_idx), col_names, -1); + converted_tables.push_back(converter.convertToArrowTable()); + } + + if (converted_tables.size() == (size_t)1) { + return converted_tables.front(); + } + + ARROW_ASSIGN_OR_THROW(auto res, arrow::ConcatenateTables(converted_tables)); + return res; +} + } // namespace hdk diff --git a/omniscidb/ResultSetRegistry/ResultSetTableToken.h b/omniscidb/ResultSetRegistry/ResultSetTableToken.h index ed9abfb01..ef7a0d36c 100644 --- a/omniscidb/ResultSetRegistry/ResultSetTableToken.h +++ b/omniscidb/ResultSetRegistry/ResultSetTableToken.h @@ -11,6 +11,8 @@ #include "DataMgr/ChunkMetadata.h" #include "SchemaMgr/TableInfo.h" +#include "arrow/api.h" + namespace hdk { class ResultSetRegistry; @@ -55,6 +57,8 @@ class ResultSetTableToken : public std::enable_shared_from_this toArrow() const; + std::string toString() const { return "ResultSetTableToken(" + std::to_string(dbId()) + ":" + std::to_string(tableId()) + ")"; diff --git a/omniscidb/Tests/ResultSetArrowConversion.cpp b/omniscidb/Tests/ResultSetArrowConversion.cpp index f5ad6c724..74ffd50a9 100644 --- a/omniscidb/Tests/ResultSetArrowConversion.cpp +++ b/omniscidb/Tests/ResultSetArrowConversion.cpp @@ -950,6 +950,26 @@ TEST(ArrowTable, FixedLenArrays) { } } +TEST(ArrowTable, MultifragResult) { + bool prev_enable_multifrag_execution_result = + config().exec.enable_multifrag_execution_result; + ScopeGuard reset = [prev_enable_multifrag_execution_result] { + config().exec.enable_multifrag_execution_result = + prev_enable_multifrag_execution_result; + }; + + config().exec.enable_multifrag_execution_result = true; + + auto res = runSqlQuery("select * from test_chunked;", ExecutorDeviceType::CPU, false); + // Expect two fragments in the result and two batches in converted table. + CHECK_EQ(res.getToken()->resultSetCount(), (size_t)2); + auto table = res.getToken()->toArrow(); + CHECK_EQ(table->column(1)->num_chunks(), 2); + compare_columns(table6x4_col_i64, table->column(1)); + compare_columns(table6x4_col_bi, table->column(2)); + compare_columns(table6x4_col_d, table->column(3)); +} + int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); TestHelpers::init_logger_stderr_only(argc, argv); diff --git a/python/pyhdk/_sql.pxd b/python/pyhdk/_sql.pxd index c380b71a6..61ce37bd3 100644 --- a/python/pyhdk/_sql.pxd +++ b/python/pyhdk/_sql.pxd @@ -8,6 +8,8 @@ from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string from libcpp.vector cimport vector +from pyarrow.lib cimport CTable as CArrowTable + from pyhdk._common cimport CConfig, CType from pyhdk._storage cimport CSchemaProvider, CSchemaProviderPtr, CDataProvider, CDataMgr, CBufferProvider from pyhdk._execute cimport CExecutor, CResultSetPtr, CCompilationOptions, CExecutionOptions, CTargetMetaInfo @@ -50,6 +52,13 @@ cdef extern from "omniscidb/QueryEngine/RelAlgDagBuilder.h": cdef cppclass CRelAlgDagBuilder "RelAlgDagBuilder"(CQueryDag): CRelAlgDagBuilder(const string&, int, CSchemaProviderPtr, shared_ptr[CConfig]) except + +cdef extern from "omniscidb/ResultSetRegistry/ResultSetTableToken.h": + cdef cppclass CResultSetTableToken "hdk::ResultSetTableToken": + size_t rowCount() + shared_ptr[CArrowTable] toArrow() except + + +ctypedef shared_ptr[const CResultSetTableToken] CResultSetTableTokenPtr + cdef extern from "omniscidb/QueryEngine/Descriptors/RelAlgExecutionDescriptor.h": cdef cppclass CExecutionResult "ExecutionResult": CExecutionResult() @@ -60,6 +69,7 @@ cdef extern from "omniscidb/QueryEngine/Descriptors/RelAlgExecutionDescriptor.h" const vector[CTargetMetaInfo]& getTargetsMeta() string getExplanation() const string& tableName() + CResultSetTableTokenPtr getToken() CExecutionResult head(size_t) except + CExecutionResult tail(size_t) except + diff --git a/python/pyhdk/_sql.pyx b/python/pyhdk/_sql.pyx index a7d6200f0..02909adfd 100644 --- a/python/pyhdk/_sql.pyx +++ b/python/pyhdk/_sql.pyx @@ -74,20 +74,12 @@ cdef extract_array_value(const CArrayTargetValue *array, const CType *c_type): cdef class ExecutionResult: def row_count(self): - cdef shared_ptr[CResultSet] c_res - c_res = self.c_result.getRows() - return int(c_res.get().rowCount()) + cdef CResultSetTableTokenPtr c_token = self.c_result.getToken() + return int(c_token.get().rowCount()) def to_arrow(self): - cdef vector[string] col_names - cdef vector[CTargetMetaInfo].const_iterator it = self.c_result.getTargetsMeta().const_begin() - - while it != self.c_result.getTargetsMeta().const_end(): - col_names.push_back(dereference(it).get_resname()) - preincrement(it) - - cdef unique_ptr[CArrowResultSetConverter] converter = make_unique[CArrowResultSetConverter](self.c_result.getRows(), col_names, -1) - cdef shared_ptr[CArrowTable] at = converter.get().convertToArrowTable() + cdef CResultSetTableTokenPtr c_token = self.c_result.getToken() + cdef shared_ptr[CArrowTable] at = c_token.get().toArrow() return pyarrow_wrap_table(at) def to_explain_str(self):