From 9b23faf18d634f16c89de070c2ce0dcbd31ebe16 Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Thu, 30 Jan 2025 11:33:04 -0800 Subject: [PATCH] feat(processing_engine): log processing engine logging calls to sys events. --- Cargo.lock | 4 + influxdb3/src/commands/serve.rs | 15 +++- influxdb3/tests/server/flight.rs | 1 + ...'re_concerned_here_with_system_tables.snap | 2 +- ...able_name_doesn't_exist,_should_error.snap | 2 +- ...how_up_to_ten_entries_from_each_table.snap | 3 + ...should_list_system_schema_tables_only.snap | 1 + influxdb3_processing_engine/Cargo.toml | 1 + influxdb3_processing_engine/src/lib.rs | 9 ++ influxdb3_processing_engine/src/logging.rs | 0 influxdb3_processing_engine/src/plugins.rs | 23 ++++- influxdb3_py_api/Cargo.toml | 2 + influxdb3_py_api/src/lib.rs | 1 + influxdb3_py_api/src/logging.rs | 89 +++++++++++++++++++ influxdb3_py_api/src/system_py.rs | 43 +++++++++ influxdb3_server/Cargo.toml | 1 + influxdb3_server/src/builder.rs | 87 +++++++++++++----- influxdb3_server/src/lib.rs | 31 ++++--- influxdb3_server/src/system_tables/mod.rs | 10 ++- .../src/system_tables/python_call.rs | 36 ++++++++ influxdb3_sys_events/src/lib.rs | 4 + 21 files changed, 324 insertions(+), 41 deletions(-) create mode 100644 influxdb3_processing_engine/src/logging.rs create mode 100644 influxdb3_py_api/src/logging.rs diff --git a/Cargo.lock b/Cargo.lock index 64d629d959c..7ce61ec7e9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3012,6 +3012,7 @@ dependencies = [ "influxdb3_client", "influxdb3_internal_api", "influxdb3_py_api", + "influxdb3_sys_events", "influxdb3_wal", "influxdb3_write", "iox_query", @@ -3042,8 +3043,10 @@ dependencies = [ "influxdb3_catalog", "influxdb3_id", "influxdb3_internal_api", + "influxdb3_sys_events", "influxdb3_wal", "iox_query_params", + "iox_time", "observability_deps", "parking_lot", "pyo3", @@ -3086,6 +3089,7 @@ dependencies = [ "influxdb3_internal_api", "influxdb3_process", "influxdb3_processing_engine", + "influxdb3_py_api", "influxdb3_sys_events", "influxdb3_telemetry", "influxdb3_wal", diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 4ede3354e73..933da674736 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -22,6 +22,7 @@ use influxdb3_processing_engine::environment::{ DisabledManager, PipManager, PythonEnvironmentManager, UVManager, }; use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; +use influxdb3_processing_engine::ProcessingEngineManagerImpl; use influxdb3_server::{ auth::AllOrNothingAuthorizer, builder::ServerBuilder, @@ -584,7 +585,6 @@ pub async fn command(config: Config) -> Result<()> { trace_exporter, trace_header_parser, Arc::clone(&telemetry_store), - setup_processing_engine_env_manager(&config.processing_engine_config), )?; let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs { @@ -602,13 +602,24 @@ pub async fn command(config: Config) -> Result<()> { .await .map_err(Error::BindAddress)?; + let processing_engine = ProcessingEngineManagerImpl::new( + setup_processing_engine_env_manager(&config.processing_engine_config), + write_buffer.catalog(), + Arc::clone(&write_buffer), + Arc::clone(&query_executor) as _, + Arc::clone(&time_provider) as _, + write_buffer.wal(), + sys_events_store, + ); + let builder = ServerBuilder::new(common_state) .max_request_size(config.max_http_request_size) .write_buffer(write_buffer) .query_executor(query_executor) .time_provider(time_provider) .persister(persister) - .tcp_listener(listener); 
+ .tcp_listener(listener) + .processing_engine(processing_engine); let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? { builder diff --git a/influxdb3/tests/server/flight.rs b/influxdb3/tests/server/flight.rs index 98ab4e0bb6a..6fa5c75b627 100644 --- a/influxdb3/tests/server/flight.rs +++ b/influxdb3/tests/server/flight.rs @@ -120,6 +120,7 @@ async fn flight() -> Result<(), influxdb3_client::Error> { "| public | system | distinct_caches | BASE TABLE |", "| public | system | last_caches | BASE TABLE |", "| public | system | parquet_files | BASE TABLE |", + "| public | system | processing_engine_logs | BASE TABLE |", "| public | system | processing_engine_triggers | BASE TABLE |", "| public | system | queries | BASE TABLE |", "+--------------+--------------------+----------------------------+------------+", diff --git a/influxdb3/tests/server/snapshots/server__cli__iox_schema_table_name_exists,_but_should_error_because_we're_concerned_here_with_system_tables.snap b/influxdb3/tests/server/snapshots/server__cli__iox_schema_table_name_exists,_but_should_error_because_we're_concerned_here_with_system_tables.snap index 4428ee07fa4..1a14b06c662 100644 --- a/influxdb3/tests/server/snapshots/server__cli__iox_schema_table_name_exists,_but_should_error_because_we're_concerned_here_with_system_tables.snap +++ b/influxdb3/tests/server/snapshots/server__cli__iox_schema_table_name_exists,_but_should_error_because_we're_concerned_here_with_system_tables.snap @@ -3,4 +3,4 @@ source: influxdb3/tests/server/cli.rs expression: output snapshot_kind: text --- -Show command failed: system table 'cpu' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_triggers", "queries"] +Show command failed: system table 'cpu' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_logs", "processing_engine_triggers", "queries"] diff --git a/influxdb3/tests/server/snapshots/server__cli__random_table_name_doesn't_exist,_should_error.snap b/influxdb3/tests/server/snapshots/server__cli__random_table_name_doesn't_exist,_should_error.snap index e8306e38a7f..cb3bb6a1c41 100644 --- a/influxdb3/tests/server/snapshots/server__cli__random_table_name_doesn't_exist,_should_error.snap +++ b/influxdb3/tests/server/snapshots/server__cli__random_table_name_doesn't_exist,_should_error.snap @@ -3,4 +3,4 @@ source: influxdb3/tests/server/cli.rs expression: output snapshot_kind: text --- -Show command failed: system table 'meow' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_triggers", "queries"] +Show command failed: system table 'meow' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_logs", "processing_engine_triggers", "queries"] diff --git a/influxdb3/tests/server/snapshots/server__cli__summary_should_show_up_to_ten_entries_from_each_table.snap b/influxdb3/tests/server/snapshots/server__cli__summary_should_show_up_to_ten_entries_from_each_table.snap index 352512ae3be..d2792f7790f 100644 --- a/influxdb3/tests/server/snapshots/server__cli__summary_should_show_up_to_ten_entries_from_each_table.snap +++ b/influxdb3/tests/server/snapshots/server__cli__summary_should_show_up_to_ten_entries_from_each_table.snap @@ -18,6 +18,9 @@ parquet_files summary: | table_name | path | size_bytes | row_count | min_time | max_time | 
+------------+------+------------+-----------+----------+----------+ +------------+------+------------+-----------+----------+----------+ +processing_engine_logs summary: +++ +++ processing_engine_triggers summary: ++ ++ diff --git a/influxdb3/tests/server/snapshots/server__cli__table-list_should_list_system_schema_tables_only.snap b/influxdb3/tests/server/snapshots/server__cli__table-list_should_list_system_schema_tables_only.snap index a36db939ba1..3a11de37c2b 100644 --- a/influxdb3/tests/server/snapshots/server__cli__table-list_should_list_system_schema_tables_only.snap +++ b/influxdb3/tests/server/snapshots/server__cli__table-list_should_list_system_schema_tables_only.snap @@ -9,6 +9,7 @@ snapshot_kind: text | distinct_caches | [table, name, column_ids, column_names, max_cardinality, max_age_seconds] | | last_caches | [table, name, key_column_ids, key_column_names, value_column_ids, value_column_names, count, ttl] | | parquet_files | [table_name, path, size_bytes, row_count, min_time, max_time] | +| processing_engine_logs | [event_time, trigger_name, log_level, log_text] | | processing_engine_triggers | [trigger_name, plugin_filename, trigger_specification, disabled] | | queries | [id, phase, issue_time, query_type, query_text, partitions, parquet_files, plan_duration, permit_duration, execute_duration, end2end_duration, compute_duration, max_memory, success, running, cancelled, trace_id] | +----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml index b371167df9c..0f177751949 100644 --- a/influxdb3_processing_engine/Cargo.toml +++ b/influxdb3_processing_engine/Cargo.toml @@ -23,6 +23,7 @@ influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_py_api = { path = "../influxdb3_py_api" } +influxdb3_sys_events = { path = "../influxdb3_sys_events"} influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_write = { path = "../influxdb3_write" } observability_deps.workspace = true diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index ec9c1f61977..9eb99e9dc6f 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -14,6 +14,7 @@ use influxdb3_client::plugin_development::{ WalPluginTestResponse, }; use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_sys_events::SysEventStore; #[cfg(feature = "system-py")] use influxdb3_wal::PluginType; use influxdb3_wal::{ @@ -44,6 +45,7 @@ pub struct ProcessingEngineManagerImpl { write_buffer: Arc, query_executor: Arc, time_provider: Arc, + sys_event_store: Arc, wal: Arc, plugin_event_tx: RwLock, } @@ -212,6 +214,7 @@ impl ProcessingEngineManagerImpl { query_executor: Arc, time_provider: Arc, wal: Arc, + sys_event_store: Arc, ) -> Self { // if given a plugin dir, try to initialize the virtualenv. 
#[cfg(feature = "system-py")] @@ -229,6 +232,7 @@ impl ProcessingEngineManagerImpl { catalog, write_buffer, query_executor, + sys_event_store, time_provider, wal, plugin_event_tx: Default::default(), @@ -442,6 +446,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { let plugin_context = PluginContext { write_buffer, query_executor, + sys_event_store: Arc::clone(&self.sys_event_store), }; let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?; match trigger.trigger.plugin_type() { @@ -784,6 +789,7 @@ mod tests { use influxdb3_cache::last_cache::LastCacheProvider; use influxdb3_catalog::catalog; use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; + use influxdb3_sys_events::SysEventStore; use influxdb3_wal::{Gen1Duration, TriggerSpecificationDefinition, WalConfig}; use influxdb3_write::persister::Persister; use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs}; @@ -1042,6 +1048,8 @@ def process_writes(influxdb3_local, table_batches, args=None): package_manager: Arc::new(DisabledManager), }; + let sys_event_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider))); + ( ProcessingEngineManagerImpl::new( environment_manager, @@ -1050,6 +1058,7 @@ def process_writes(influxdb3_local, table_batches, args=None): qe, time_provider, wal, + sys_event_store, ), file, ) diff --git a/influxdb3_processing_engine/src/logging.rs b/influxdb3_processing_engine/src/logging.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index 4b78f659be3..985156519c0 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -10,6 +10,7 @@ use influxdb3_catalog::catalog::Catalog; use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; #[cfg(feature = "system-py")] use influxdb3_internal_api::query_executor::QueryExecutor; +use influxdb3_sys_events::SysEventStore; use influxdb3_wal::Gen1Duration; #[cfg(feature = "system-py")] use influxdb3_wal::TriggerDefinition; @@ -89,6 +90,7 @@ pub(crate) fn run_wal_contents_plugin( plugin_code, write_buffer: context.write_buffer, query_executor: context.query_executor, + sys_event_store: context.sys_event_store, }; tokio::task::spawn(async move { trigger_plugin @@ -129,6 +131,7 @@ pub(crate) fn run_schedule_plugin( plugin_code, write_buffer: context.write_buffer, query_executor: context.query_executor, + sys_event_store: context.sys_event_store, }; let runner = python_plugin::ScheduleTriggerRunner::try_new( @@ -159,6 +162,7 @@ pub(crate) fn run_request_plugin( plugin_code, write_buffer: context.write_buffer, query_executor: context.query_executor, + sys_event_store: context.sys_event_store, }; tokio::task::spawn(async move { trigger_plugin @@ -174,6 +178,8 @@ pub(crate) struct PluginContext { pub(crate) write_buffer: Arc, // query executor to hand off to the plugin pub(crate) query_executor: Arc, + // sys events for writing logs to ring buffers + pub(crate) sys_event_store: Arc, } #[cfg(feature = "system-py")] @@ -184,6 +190,7 @@ struct TriggerPlugin { db_name: String, write_buffer: Arc, query_executor: Arc, + sys_event_store: Arc, } #[cfg(feature = "system-py")] @@ -199,7 +206,7 @@ mod python_plugin { use influxdb3_catalog::catalog::DatabaseSchema; use influxdb3_py_api::system_py::{ execute_python_with_batch, execute_request_trigger, execute_schedule_trigger, - PluginReturnState, + PluginReturnState, ProcessingEngineLogger, 
}; use influxdb3_wal::{WalContents, WalOp}; use influxdb3_write::Precision; @@ -300,6 +307,10 @@ mod python_plugin { self.plugin_code.code().as_ref(), Arc::clone(&schema), Arc::clone(&self.query_executor), + Some(ProcessingEngineLogger::new( + Arc::clone(&self.sys_event_store), + self.trigger_definition.trigger_name.clone(), + )), &self.trigger_definition.trigger_arguments, request.query_params, request.headers, @@ -412,6 +423,10 @@ mod python_plugin { write_batch, Arc::clone(&schema), Arc::clone(&self.query_executor), + Some(ProcessingEngineLogger::new( + Arc::clone(&self.sys_event_store), + self.trigger_definition.trigger_name.clone(), + )), table_filter, &self.trigger_definition.trigger_arguments, )?; @@ -563,6 +578,10 @@ mod python_plugin { trigger_time, Arc::clone(&db_schema), Arc::clone(&plugin.query_executor), + Some(ProcessingEngineLogger::new( + Arc::clone(&plugin.sys_event_store), + plugin.trigger_definition.trigger_name.clone(), + )), &plugin.trigger_definition.trigger_arguments, )?; @@ -629,6 +648,7 @@ pub(crate) fn run_test_wal_plugin( db, query_executor, None, + None, &request.input_arguments, )?; @@ -755,6 +775,7 @@ pub(crate) fn run_test_schedule_plugin( schedule_time, db, query_executor, + None, &request.input_arguments, )?; diff --git a/influxdb3_py_api/Cargo.toml b/influxdb3_py_api/Cargo.toml index c4a3c6e5273..18691612061 100644 --- a/influxdb3_py_api/Cargo.toml +++ b/influxdb3_py_api/Cargo.toml @@ -15,10 +15,12 @@ arrow-schema.workspace = true bytes.workspace = true chrono.workspace = true hashbrown.workspace = true +iox_time.workspace = true influxdb3_id = { path = "../influxdb3_id" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_catalog = {path = "../influxdb3_catalog"} influxdb3_internal_api = { path = "../influxdb3_internal_api" } +influxdb3_sys_events = { path = "../influxdb3_sys_events" } iox_query_params.workspace = true observability_deps.workspace = true parking_lot.workspace = true diff --git a/influxdb3_py_api/src/lib.rs b/influxdb3_py_api/src/lib.rs index 5e9342fe8e2..ecfa8009bb9 100644 --- a/influxdb3_py_api/src/lib.rs +++ b/influxdb3_py_api/src/lib.rs @@ -13,5 +13,6 @@ pub enum ExecutePluginError { PluginError(#[from] anyhow::Error), } +pub mod logging; #[cfg(feature = "system-py")] pub mod system_py; diff --git a/influxdb3_py_api/src/logging.rs b/influxdb3_py_api/src/logging.rs new file mode 100644 index 00000000000..60740f74fca --- /dev/null +++ b/influxdb3_py_api/src/logging.rs @@ -0,0 +1,89 @@ +use arrow_array::builder::{StringBuilder, TimestampNanosecondBuilder}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; +use influxdb3_sys_events::{Event, RingBuffer, ToRecordBatch}; +use iox_time::Time; +use std::fmt::Display; +use std::sync::Arc; + +#[derive(Debug)] +pub struct ProcessingEngineLog { + event_time: Time, + log_level: LogLevel, + trigger_name: String, + log_line: String, +} + +#[derive(Debug, Copy, Clone)] +pub enum LogLevel { + Info, + Warn, + Error, +} + +impl Display for LogLevel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + LogLevel::Info => write!(f, "INFO"), + LogLevel::Warn => write!(f, "WARN"), + LogLevel::Error => write!(f, "ERROR"), + } + } +} + +impl ProcessingEngineLog { + pub fn new( + event_time: Time, + log_level: LogLevel, + trigger_name: String, + log_line: String, + ) -> Self { + Self { + event_time, + log_level, + trigger_name, + log_line, + } + } +} + +impl ToRecordBatch for ProcessingEngineLog { + fn schema() 
-> Schema {
+        let fields = vec![
+            Field::new(
+                "event_time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            Field::new("trigger_name", DataType::Utf8, false),
+            Field::new("log_level", DataType::Utf8, false),
+            Field::new("log_text", DataType::Utf8, false),
+        ];
+        Schema::new(fields)
+    }
+    fn to_record_batch(
+        items: Option<&RingBuffer<Event<Self>>>,
+    ) -> Option<Result<RecordBatch, ArrowError>> {
+        let items = items?;
+        let capacity = items.len();
+        let mut event_time_builder = TimestampNanosecondBuilder::with_capacity(capacity);
+        let mut trigger_name_builder = StringBuilder::new();
+        let mut log_level_builder = StringBuilder::new();
+        let mut log_text_builder = StringBuilder::new();
+        for item in items.in_order() {
+            let event = &item.data;
+            event_time_builder.append_value(event.event_time.timestamp_nanos());
+            trigger_name_builder.append_value(event.trigger_name.as_str());
+            log_level_builder.append_value(event.log_level.to_string().as_str());
+            log_text_builder.append_value(event.log_line.as_str());
+        }
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(event_time_builder.finish()),
+            Arc::new(trigger_name_builder.finish()),
+            Arc::new(log_level_builder.finish()),
+            Arc::new(log_text_builder.finish()),
+        ];
+
+        Some(RecordBatch::try_new(Arc::new(Self::schema()), columns))
+    }
+}
diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs
index fb6c2ca9762..3ad1f0a2ffe 100644
--- a/influxdb3_py_api/src/system_py.rs
+++ b/influxdb3_py_api/src/system_py.rs
@@ -1,3 +1,4 @@
+use crate::logging::{LogLevel, ProcessingEngineLog};
 use crate::ExecutePluginError;
 use anyhow::Context;
 use arrow_array::types::Int32Type;
@@ -13,8 +14,10 @@ use hashbrown::HashMap;
 use influxdb3_catalog::catalog::DatabaseSchema;
 use influxdb3_id::TableId;
 use influxdb3_internal_api::query_executor::QueryExecutor;
+use influxdb3_sys_events::SysEventStore;
 use influxdb3_wal::{FieldData, WriteBatch};
 use iox_query_params::StatementParams;
+use iox_time::TimeProvider;
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::Mutex;
 use pyo3::exceptions::{PyException, PyValueError};
@@ -35,6 +38,22 @@ struct PyPluginCallApi {
     db_schema: Arc<DatabaseSchema>,
     query_executor: Arc<dyn QueryExecutor>,
     return_state: Arc<Mutex<PluginReturnState>>,
+    logger: Option<ProcessingEngineLogger>,
+}
+
+#[derive(Debug)]
+pub struct ProcessingEngineLogger {
+    sys_event_store: Arc<SysEventStore>,
+    trigger_name: String,
+}
+
+impl ProcessingEngineLogger {
+    pub fn new(sys_event_store: Arc<SysEventStore>, trigger_name: String) -> Self {
+        Self {
+            sys_event_store,
+            trigger_name,
+        }
+    }
 }
 
 #[derive(Debug, Default)]
@@ -79,6 +98,7 @@ impl PyPluginCallApi {
         let line = self.log_args_to_string(args)?;
 
         info!("processing engine: {}", line);
+        self.write_to_logger(LogLevel::Info, line.clone());
         self.return_state.lock().log_lines.push(LogLine::Info(line));
         Ok(())
     }
@@ -88,6 +108,7 @@
         let line = self.log_args_to_string(args)?;
 
         warn!("processing engine: {}", line);
+        self.write_to_logger(LogLevel::Warn, line.clone());
         self.return_state
             .lock()
             .log_lines
@@ -100,6 +121,7 @@
         let line = self.log_args_to_string(args)?;
 
         error!("processing engine: {}", line);
+        self.write_to_logger(LogLevel::Error, line.clone());
         self.return_state
             .lock()
             .log_lines
@@ -256,6 +278,20 @@ impl PyPluginCallApi {
     }
 }
 
+impl PyPluginCallApi {
+    fn write_to_logger(&self, level: LogLevel, log_line: String) {
+        if let Some(logger) = &self.logger {
+            let processing_engine_log = ProcessingEngineLog::new(
+                logger.sys_event_store.time_provider().now(),
+                level,
+                logger.trigger_name.clone(),
+                log_line,
+            );
+
logger.sys_event_store.record(processing_engine_log); + } + } +} + // constant for the process writes call site string const PROCESS_WRITES_CALL_SITE: &str = "process_writes"; @@ -396,6 +432,7 @@ pub fn execute_python_with_batch( write_batch: &WriteBatch, schema: Arc, query_executor: Arc, + logger: Option, table_filter: Option, args: &Option>, ) -> Result { @@ -495,6 +532,7 @@ pub fn execute_python_with_batch( let api = PyPluginCallApi { db_schema: schema, query_executor, + logger, return_state: Default::default(), }; let return_state = Arc::clone(&api.return_state); @@ -531,6 +569,7 @@ pub fn execute_schedule_trigger( schedule_time: DateTime, schema: Arc, query_executor: Arc, + logger: Option, args: &Option>, ) -> Result { Python::with_gil(|py| { @@ -552,6 +591,7 @@ pub fn execute_schedule_trigger( let api = PyPluginCallApi { db_schema: schema, query_executor, + logger, return_state: Default::default(), }; let return_state = Arc::clone(&api.return_state); @@ -584,10 +624,12 @@ pub fn execute_schedule_trigger( }) } +#[allow(clippy::too_many_arguments)] pub fn execute_request_trigger( code: &str, db_schema: Arc, query_executor: Arc, + logger: Option, args: &Option>, query_params: HashMap, request_headers: HashMap, @@ -607,6 +649,7 @@ pub fn execute_request_trigger( let api = PyPluginCallApi { db_schema, query_executor, + logger, return_state: Default::default(), }; let return_state = Arc::clone(&api.return_state); diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index f420853a1cd..67da82c7e9e 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -38,6 +38,7 @@ influxdb3_id = { path = "../influxdb3_id" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_processing_engine = { path = "../influxdb3_processing_engine" } +influxdb3_py_api = {path = "../influxdb3_py_api"} influxdb3_wal = { path = "../influxdb3_wal"} influxdb3_write = { path = "../influxdb3_write" } iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" } diff --git a/influxdb3_server/src/builder.rs b/influxdb3_server/src/builder.rs index 94847323c39..0e0eabdce9a 100644 --- a/influxdb3_server/src/builder.rs +++ b/influxdb3_server/src/builder.rs @@ -10,7 +10,7 @@ use iox_time::TimeProvider; use tokio::net::TcpListener; #[derive(Debug)] -pub struct ServerBuilder { +pub struct ServerBuilder { common_state: CommonServerState, time_provider: T, max_request_size: usize, @@ -18,10 +18,20 @@ pub struct ServerBuilder { query_executor: Q, persister: P, listener: L, + processing_engine: E, authorizer: Arc, } -impl ServerBuilder { +impl + ServerBuilder< + NoWriteBuf, + NoQueryExec, + NoPersister, + NoTimeProvider, + NoListener, + NoProcessingEngine, + > +{ pub fn new(common_state: CommonServerState) -> Self { Self { common_state, @@ -32,11 +42,12 @@ impl ServerBuilder ServerBuilder { +impl ServerBuilder { pub fn max_request_size(mut self, max_request_size: usize) -> Self { self.max_request_size = max_request_size; self @@ -69,8 +80,16 @@ pub struct NoListener; #[derive(Debug)] pub struct WithListener(TcpListener); -impl ServerBuilder { - pub fn write_buffer(self, wb: Arc) -> ServerBuilder { +#[derive(Debug)] +pub struct NoProcessingEngine; +#[derive(Debug)] +pub struct WithProcessingEngine(ProcessingEngineManagerImpl); + +impl ServerBuilder { + pub fn write_buffer( + self, + wb: Arc, + ) -> ServerBuilder { ServerBuilder { common_state: self.common_state, time_provider: 
self.time_provider, @@ -80,15 +99,16 @@ impl ServerBuilder { persister: self.persister, listener: self.listener, authorizer: self.authorizer, + processing_engine: self.processing_engine, } } } -impl ServerBuilder { +impl ServerBuilder { pub fn query_executor( self, qe: Arc, - ) -> ServerBuilder { + ) -> ServerBuilder { ServerBuilder { common_state: self.common_state, time_provider: self.time_provider, @@ -98,12 +118,13 @@ impl ServerBuilder { persister: self.persister, listener: self.listener, authorizer: self.authorizer, + processing_engine: self.processing_engine, } } } -impl ServerBuilder { - pub fn persister(self, p: Arc) -> ServerBuilder { +impl ServerBuilder { + pub fn persister(self, p: Arc) -> ServerBuilder { ServerBuilder { common_state: self.common_state, time_provider: self.time_provider, @@ -113,12 +134,13 @@ impl ServerBuilder { persister: WithPersister(p), listener: self.listener, authorizer: self.authorizer, + processing_engine: self.processing_engine, } } } -impl ServerBuilder { - pub fn time_provider(self, tp: Arc) -> ServerBuilder, L> { +impl ServerBuilder { + pub fn time_provider(self, tp: Arc) -> ServerBuilder, L, E> { ServerBuilder { common_state: self.common_state, time_provider: WithTimeProvider(tp), @@ -128,12 +150,13 @@ impl ServerBuilder { persister: self.persister, listener: self.listener, authorizer: self.authorizer, + processing_engine: self.processing_engine, } } } -impl ServerBuilder { - pub fn tcp_listener(self, listener: TcpListener) -> ServerBuilder { +impl ServerBuilder { + pub fn tcp_listener(self, listener: TcpListener) -> ServerBuilder { ServerBuilder { common_state: self.common_state, time_provider: self.time_provider, @@ -143,24 +166,44 @@ impl ServerBuilder { persister: self.persister, listener: WithListener(listener), authorizer: self.authorizer, + processing_engine: self.processing_engine, + } + } +} + +impl ServerBuilder { + pub fn processing_engine( + self, + processing_engine: ProcessingEngineManagerImpl, + ) -> ServerBuilder { + ServerBuilder { + common_state: self.common_state, + time_provider: self.time_provider, + max_request_size: self.max_request_size, + write_buffer: self.write_buffer, + query_executor: self.query_executor, + persister: self.persister, + listener: self.listener, + authorizer: self.authorizer, + processing_engine: WithProcessingEngine(processing_engine), } } } impl - ServerBuilder, WithListener> + ServerBuilder< + WithWriteBuf, + WithQueryExec, + WithPersister, + WithTimeProvider, + WithListener, + WithProcessingEngine, + > { pub async fn build(self) -> Server { let persister = Arc::clone(&self.persister.0); let authorizer = Arc::clone(&self.authorizer); - let processing_engine = Arc::new(ProcessingEngineManagerImpl::new( - self.common_state.processing_engine_environment.clone(), - self.write_buffer.0.catalog(), - Arc::clone(&self.write_buffer.0), - Arc::clone(&self.query_executor.0), - Arc::clone(&self.time_provider.0) as _, - self.write_buffer.0.wal(), - )); + let processing_engine = Arc::new(self.processing_engine.0); processing_engine .start_triggers() diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index ee26f17b1d8..e02cee5e428 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -27,7 +27,6 @@ use authz::Authorizer; use hyper::server::conn::AddrIncoming; use hyper::server::conn::Http; use hyper::service::service_fn; -use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; use influxdb3_telemetry::store::TelemetryStore; use 
influxdb3_write::persister::Persister; use iox_time::TimeProvider; @@ -79,7 +78,6 @@ pub struct CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, - processing_engine_environment: ProcessingEngineEnvironmentManager, } impl CommonServerState { @@ -88,14 +86,12 @@ impl CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, - processing_engine_environment: ProcessingEngineEnvironmentManager, ) -> Result { Ok(Self { metrics, trace_exporter, trace_header_parser, telemetry_store, - processing_engine_environment, }) } @@ -223,6 +219,7 @@ mod tests { use influxdb3_id::{DbId, TableId}; use influxdb3_processing_engine::environment::DisabledManager; use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; + use influxdb3_processing_engine::ProcessingEngineManagerImpl; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::WalConfig; @@ -802,14 +799,9 @@ mod tests { None, trace_header_parser, Arc::clone(&sample_telem_store), - ProcessingEngineEnvironmentManager { - plugin_dir: None, - virtual_env_location: None, - package_manager: Arc::new(DisabledManager), - }, ) .unwrap(); - let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs { + let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs { catalog: write_buffer.catalog(), write_buffer: Arc::clone(&write_buffer), exec: Arc::clone(&exec), @@ -818,7 +810,7 @@ mod tests { query_log_size: 10, telemetry_store: Arc::clone(&sample_telem_store), sys_events_store: Arc::clone(&sys_events_store), - }); + })); // bind to port 0 will assign a random available port: let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); @@ -827,13 +819,28 @@ mod tests { .expect("bind tcp address"); let addr = listener.local_addr().unwrap(); + let processing_engine = ProcessingEngineManagerImpl::new( + ProcessingEngineEnvironmentManager { + plugin_dir: None, + virtual_env_location: None, + package_manager: Arc::new(DisabledManager), + }, + write_buffer.catalog(), + Arc::clone(&write_buffer), + Arc::clone(&query_executor) as _, + Arc::clone(&time_provider) as _, + write_buffer.wal(), + sys_events_store, + ); + let server = ServerBuilder::new(common_state) .write_buffer(Arc::clone(&write_buffer)) - .query_executor(Arc::new(query_executor)) + .query_executor(query_executor) .persister(persister) .authorizer(Arc::new(DefaultAuthorizer)) .time_provider(Arc::clone(&time_provider)) .tcp_listener(listener) + .processing_engine(processing_engine) .build() .await; let frontend_shutdown = CancellationToken::new(); diff --git a/influxdb3_server/src/system_tables/mod.rs b/influxdb3_server/src/system_tables/mod.rs index 8fb5e042fb0..33658451a28 100644 --- a/influxdb3_server/src/system_tables/mod.rs +++ b/influxdb3_server/src/system_tables/mod.rs @@ -21,7 +21,7 @@ use self::{last_caches::LastCachesTable, queries::QueriesTable}; mod distinct_caches; mod last_caches; mod parquet_files; -use crate::system_tables::python_call::ProcessingEngineTriggerTable; +use crate::system_tables::python_call::{ProcessingEngineLogsTable, ProcessingEngineTriggerTable}; mod python_call; mod queries; @@ -36,6 +36,8 @@ pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files"; const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers"; +const PROCESSING_ENGINE_LOGS_TABLE_NAME: &str = "processing_engine_logs"; + #[derive(Debug)] pub(crate) enum 
SystemSchemaProvider {
     AllSystemSchemaTables(AllSystemSchemaTablesProvider),
@@ -90,7 +92,7 @@ impl AllSystemSchemaTablesProvider {
         db_schema: Arc<DatabaseSchema>,
         query_log: Arc<QueryLog>,
         buffer: Arc<dyn WriteBuffer>,
-        _sys_events_store: Arc<SysEventStore>,
+        sys_events_store: Arc<SysEventStore>,
     ) -> Self {
         let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
         let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
@@ -123,6 +125,10 @@ impl AllSystemSchemaTablesProvider {
             ))),
         );
         tables.insert(PARQUET_FILES_TABLE_NAME, parquet_files);
+        let logs_table = Arc::new(SystemTableProvider::new(Arc::new(
+            ProcessingEngineLogsTable::new(sys_events_store),
+        )));
+        tables.insert(PROCESSING_ENGINE_LOGS_TABLE_NAME, logs_table);
         Self {
             buffer,
             db_schema,
diff --git a/influxdb3_server/src/system_tables/python_call.rs b/influxdb3_server/src/system_tables/python_call.rs
index afa0f3ab2c1..988a3cb57da 100644
--- a/influxdb3_server/src/system_tables/python_call.rs
+++ b/influxdb3_server/src/system_tables/python_call.rs
@@ -3,6 +3,8 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion::common::Result;
 use datafusion::logical_expr::Expr;
+use influxdb3_py_api::logging::ProcessingEngineLog;
+use influxdb3_sys_events::{SysEventStore, ToRecordBatch};
 use influxdb3_wal::TriggerDefinition;
 use iox_system_tables::IoxSystemTable;
 use std::sync::Arc;
@@ -72,3 +74,37 @@ impl IoxSystemTable for ProcessingEngineTriggerTable {
         Ok(RecordBatch::try_new(Arc::clone(&self.schema), columns)?)
     }
 }
+
+#[derive(Debug)]
+pub(super) struct ProcessingEngineLogsTable {
+    sys_event_store: Arc<SysEventStore>,
+}
+
+impl ProcessingEngineLogsTable {
+    pub fn new(sys_event_store: Arc<SysEventStore>) -> Self {
+        Self { sys_event_store }
+    }
+}
+
+#[async_trait]
+impl IoxSystemTable for ProcessingEngineLogsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::new(ProcessingEngineLog::schema())
+    }
+
+    async fn scan(
+        &self,
+        _filters: Option<Vec<Expr>>,
+        _limit: Option<usize>,
+    ) -> Result<RecordBatch> {
+        let Some(result) = self
+            .sys_event_store
+            .as_record_batch::<ProcessingEngineLog>()
+        else {
+            return Ok(RecordBatch::new_empty(Arc::new(
+                ProcessingEngineLog::schema(),
+            )));
+        };
+        Ok(result?)
+    }
+}
diff --git a/influxdb3_sys_events/src/lib.rs b/influxdb3_sys_events/src/lib.rs
index d4e807142a1..bd713dcd8d9 100644
--- a/influxdb3_sys_events/src/lib.rs
+++ b/influxdb3_sys_events/src/lib.rs
@@ -48,6 +48,10 @@ impl SysEventStore {
         }
     }
 
+    pub fn time_provider(&self) -> &Arc<dyn TimeProvider> {
+        &self.time_provider
+    }
+
     /// records an event by adding it to this event store
     pub fn record<E>(&self, val: E)
     where
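
Taken together: a plugin's `influxdb3_local.info()`/`warn()`/`error()` calls now land in the shared `SysEventStore` ring buffer, and the new `processing_engine_logs` system table materializes that buffer on query. The sketch below is not part of the patch; it is a minimal illustration of the flow using only calls that appear above, plus `iox_time::SystemProvider` as a concrete `TimeProvider`.

```rust
use std::sync::Arc;

use influxdb3_py_api::logging::{LogLevel, ProcessingEngineLog};
use influxdb3_sys_events::SysEventStore;
use iox_time::{SystemProvider, TimeProvider};

fn demo_log_flow() {
    // The server builds one SysEventStore from its time provider (see serve.rs).
    let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
    let store = Arc::new(SysEventStore::new(Arc::clone(&time_provider)));

    // PyPluginCallApi::write_to_logger builds one of these per info_/warn_/error_
    // call, stamping it with the store's own time provider:
    let event = ProcessingEngineLog::new(
        store.time_provider().now(),
        LogLevel::Info,
        "my_trigger".to_string(),
        "wrote 10 rows".to_string(),
    );
    store.record(event);

    // ProcessingEngineLogsTable::scan materializes the ring buffer into a
    // RecordBatch whenever the system table is queried:
    if let Some(batch) = store.as_record_batch::<ProcessingEngineLog>() {
        let batch = batch.expect("schema and columns match");
        assert_eq!(batch.num_rows(), 1);
    }
}
```

Once a trigger has logged, a query along the lines of `SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs` (the table and columns shown in the snapshot updates) returns the buffered lines, still bounded by the ring buffer's capacity.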
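The `ServerBuilder` rework is the type-state pattern: the new sixth type parameter starts as `NoProcessingEngine`, and `build()` is only defined once every parameter is the `With*` variant, so omitting `.processing_engine(...)` now fails at compile time rather than at runtime. A self-contained sketch of the idea; `Builder`, `NoEngine`, and `WithEngine` are hypothetical stand-ins, not names from the patch.

```rust
// Type-state builder in miniature: the generic parameter records which
// dependency has been supplied, and `build` exists only in the final state.
struct NoEngine;
struct WithEngine(String); // stand-in for ProcessingEngineManagerImpl

struct Builder<E> {
    engine: E,
}

impl Builder<NoEngine> {
    fn new() -> Self {
        Builder { engine: NoEngine }
    }

    // Supplying the dependency moves the builder into the `WithEngine` state.
    fn engine(self, e: String) -> Builder<WithEngine> {
        Builder {
            engine: WithEngine(e),
        }
    }
}

impl Builder<WithEngine> {
    // Only the fully-populated state can build.
    fn build(self) -> String {
        self.engine.0
    }
}

fn main() {
    let built = Builder::new().engine("processing engine".to_string()).build();
    assert_eq!(built, "processing engine");
    // `Builder::new().build()` would not compile: `build` is not defined
    // on `Builder<NoEngine>`.
}
```

This is also why `build()` no longer constructs the engine internally: serve.rs now creates the `ProcessingEngineManagerImpl` (with the `SysEventStore`) and hands it in, which in turn let `CommonServerState` drop its `processing_engine_environment` field.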