Commit 9b23faf

feat(processing_engine): log processing engine logging calls to sys events.

jacksonrnewhouse committed Jan 30, 2025
1 parent bb92eb0 commit 9b23faf
Showing 21 changed files with 324 additions and 41 deletions.
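
In outline: serve.rs builds one shared SysEventStore, hands it to ProcessingEngineManagerImpl, and each trigger wraps it in a ProcessingEngineLogger so plugin log calls land in the new system.processing_engine_logs table. A minimal sketch of that wiring, using only types visible in this diff — the SystemProvider, trigger name, and main wrapper are illustrative, and influxdb3_py_api must be built with the system-py feature:

use std::sync::Arc;

use influxdb3_py_api::system_py::ProcessingEngineLogger;
use influxdb3_sys_events::SysEventStore;
use iox_time::{SystemProvider, TimeProvider};

fn main() {
    // One store per server process (serve.rs below passes the server's own
    // TimeProvider; SystemProvider is just a stand-in here).
    let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
    let sys_event_store = Arc::new(SysEventStore::new(time_provider));

    // One logger per trigger, carrying the trigger name so each log row in
    // system.processing_engine_logs can be attributed (mirrors plugins.rs).
    let _logger =
        ProcessingEngineLogger::new(Arc::clone(&sys_event_store), "my_trigger".to_string());
}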
4 changes: 4 additions & 0 deletions Cargo.lock

15 changes: 13 additions & 2 deletions influxdb3/src/commands/serve.rs
@@ -22,6 +22,7 @@ use influxdb3_processing_engine::environment::{
DisabledManager, PipManager, PythonEnvironmentManager, UVManager,
};
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
+use influxdb3_processing_engine::ProcessingEngineManagerImpl;
use influxdb3_server::{
auth::AllOrNothingAuthorizer,
builder::ServerBuilder,
@@ -584,7 +585,6 @@ pub async fn command(config: Config) -> Result<()> {
trace_exporter,
trace_header_parser,
Arc::clone(&telemetry_store),
-setup_processing_engine_env_manager(&config.processing_engine_config),
)?;

let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
@@ -602,13 +602,24 @@
.await
.map_err(Error::BindAddress)?;

+let processing_engine = ProcessingEngineManagerImpl::new(
+setup_processing_engine_env_manager(&config.processing_engine_config),
+write_buffer.catalog(),
+Arc::clone(&write_buffer),
+Arc::clone(&query_executor) as _,
+Arc::clone(&time_provider) as _,
+write_buffer.wal(),
+sys_events_store,
+);

let builder = ServerBuilder::new(common_state)
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
.query_executor(query_executor)
.time_provider(time_provider)
.persister(persister)
-.tcp_listener(listener);
+.tcp_listener(listener)
+.processing_engine(processing_engine);

let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? {
builder
1 change: 1 addition & 0 deletions influxdb3/tests/server/flight.rs
@@ -120,6 +120,7 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
"| public | system | distinct_caches | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | processing_engine_logs | BASE TABLE |",
"| public | system | processing_engine_triggers | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+----------------------------+------------+",
@@ -3,4 +3,4 @@ source: influxdb3/tests/server/cli.rs
expression: output
snapshot_kind: text
---
-Show command failed: system table 'cpu' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_triggers", "queries"]
+Show command failed: system table 'cpu' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_logs", "processing_engine_triggers", "queries"]
@@ -3,4 +3,4 @@ source: influxdb3/tests/server/cli.rs
expression: output
snapshot_kind: text
---
-Show command failed: system table 'meow' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_triggers", "queries"]
+Show command failed: system table 'meow' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_logs", "processing_engine_triggers", "queries"]
@@ -18,6 +18,9 @@ parquet_files summary:
| table_name | path | size_bytes | row_count | min_time | max_time |
+------------+------+------------+-----------+----------+----------+
+------------+------+------------+-----------+----------+----------+
processing_engine_logs summary:
++
++
processing_engine_triggers summary:
++
++
@@ -9,6 +9,7 @@ snapshot_kind: text
| distinct_caches | [table, name, column_ids, column_names, max_cardinality, max_age_seconds] |
| last_caches | [table, name, key_column_ids, key_column_names, value_column_ids, value_column_names, count, ttl] |
| parquet_files | [table_name, path, size_bytes, row_count, min_time, max_time] |
+| processing_engine_logs | [event_time, trigger_name, log_level, log_text] |
| processing_engine_triggers | [trigger_name, plugin_filename, trigger_specification, disabled] |
| queries | [id, phase, issue_time, query_type, query_text, partitions, parquet_files, plan_duration, permit_duration, execute_duration, end2end_duration, compute_duration, max_memory, success, running, cancelled, trace_id] |
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
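
Once a server is up, the new table reads like any other system table. A hedged sketch, not part of this commit, using influxdb3_client's api_v3_query_sql builder; the bind address, database name, and query text are assumptions:

use influxdb3_client::Client;

#[tokio::main]
async fn main() -> Result<(), influxdb3_client::Error> {
    // Assumes a local server on the default port and a database named "mydb".
    let client = Client::new("http://127.0.0.1:8181")?;
    let body = client
        .api_v3_query_sql(
            "mydb",
            "SELECT event_time, trigger_name, log_level, log_text \
             FROM system.processing_engine_logs ORDER BY event_time DESC",
        )
        .send()
        .await?;
    // The response body is the query result in the default output format.
    println!("{}", String::from_utf8_lossy(&body));
    Ok(())
}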
1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
@@ -23,6 +23,7 @@ influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_py_api = { path = "../influxdb3_py_api" }
+influxdb3_sys_events = { path = "../influxdb3_sys_events"}
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
observability_deps.workspace = true
9 changes: 9 additions & 0 deletions influxdb3_processing_engine/src/lib.rs
@@ -14,6 +14,7 @@ use influxdb3_client::plugin_development::{
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
+use influxdb3_sys_events::SysEventStore;
#[cfg(feature = "system-py")]
use influxdb3_wal::PluginType;
use influxdb3_wal::{
@@ -44,6 +45,7 @@ pub struct ProcessingEngineManagerImpl {
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
+sys_event_store: Arc<SysEventStore>,
wal: Arc<dyn Wal>,
plugin_event_tx: RwLock<PluginChannels>,
}
@@ -212,6 +214,7 @@ impl ProcessingEngineManagerImpl {
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
wal: Arc<dyn Wal>,
+sys_event_store: Arc<SysEventStore>,
) -> Self {
// if given a plugin dir, try to initialize the virtualenv.
#[cfg(feature = "system-py")]
@@ -229,6 +232,7 @@
catalog,
write_buffer,
query_executor,
+sys_event_store,
time_provider,
wal,
plugin_event_tx: Default::default(),
@@ -442,6 +446,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let plugin_context = PluginContext {
write_buffer,
query_executor,
+sys_event_store: Arc::clone(&self.sys_event_store),
};
let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?;
match trigger.trigger.plugin_type() {
@@ -784,6 +789,7 @@ mod tests {
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_catalog::catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
+use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{Gen1Duration, TriggerSpecificationDefinition, WalConfig};
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
@@ -1042,6 +1048,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
package_manager: Arc::new(DisabledManager),
};

+let sys_event_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider)));

(
ProcessingEngineManagerImpl::new(
environment_manager,
@@ -1050,6 +1058,7 @@
qe,
time_provider,
wal,
+sys_event_store,
),
file,
)
Empty file.
23 changes: 22 additions & 1 deletion influxdb3_processing_engine/src/plugins.rs
@@ -10,6 +10,7 @@ use influxdb3_catalog::catalog::Catalog;
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
#[cfg(feature = "system-py")]
use influxdb3_internal_api::query_executor::QueryExecutor;
+use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::Gen1Duration;
#[cfg(feature = "system-py")]
use influxdb3_wal::TriggerDefinition;
@@ -89,6 +90,7 @@ pub(crate) fn run_wal_contents_plugin(
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
+sys_event_store: context.sys_event_store,
};
tokio::task::spawn(async move {
trigger_plugin
@@ -129,6 +131,7 @@ pub(crate) fn run_schedule_plugin(
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
+sys_event_store: context.sys_event_store,
};

let runner = python_plugin::ScheduleTriggerRunner::try_new(
@@ -159,6 +162,7 @@ pub(crate) fn run_request_plugin(
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
+sys_event_store: context.sys_event_store,
};
tokio::task::spawn(async move {
trigger_plugin
@@ -174,6 +178,8 @@ pub(crate) struct PluginContext {
pub(crate) write_buffer: Arc<dyn WriteBuffer>,
// query executor to hand off to the plugin
pub(crate) query_executor: Arc<dyn QueryExecutor>,
+// sys events for writing logs to ring buffers
+pub(crate) sys_event_store: Arc<SysEventStore>,
}

#[cfg(feature = "system-py")]
@@ -184,6 +190,7 @@ struct TriggerPlugin {
db_name: String,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
+sys_event_store: Arc<SysEventStore>,
}

#[cfg(feature = "system-py")]
@@ -199,7 +206,7 @@ mod python_plugin {
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_py_api::system_py::{
execute_python_with_batch, execute_request_trigger, execute_schedule_trigger,
-PluginReturnState,
+PluginReturnState, ProcessingEngineLogger,
};
use influxdb3_wal::{WalContents, WalOp};
use influxdb3_write::Precision;
@@ -300,6 +307,10 @@
self.plugin_code.code().as_ref(),
Arc::clone(&schema),
Arc::clone(&self.query_executor),
+Some(ProcessingEngineLogger::new(
+Arc::clone(&self.sys_event_store),
+self.trigger_definition.trigger_name.clone(),
+)),
&self.trigger_definition.trigger_arguments,
request.query_params,
request.headers,
@@ -412,6 +423,10 @@
write_batch,
Arc::clone(&schema),
Arc::clone(&self.query_executor),
+Some(ProcessingEngineLogger::new(
+Arc::clone(&self.sys_event_store),
+self.trigger_definition.trigger_name.clone(),
+)),
table_filter,
&self.trigger_definition.trigger_arguments,
)?;
@@ -563,6 +578,10 @@
trigger_time,
Arc::clone(&db_schema),
Arc::clone(&plugin.query_executor),
+Some(ProcessingEngineLogger::new(
+Arc::clone(&plugin.sys_event_store),
+plugin.trigger_definition.trigger_name.clone(),
+)),
&plugin.trigger_definition.trigger_arguments,
)?;

@@ -629,6 +648,7 @@
db,
query_executor,
None,
+None,
&request.input_arguments,
)?;

@@ -755,6 +775,7 @@
schedule_time,
db,
query_executor,
+None,
&request.input_arguments,
)?;

2 changes: 2 additions & 0 deletions influxdb3_py_api/Cargo.toml
@@ -15,10 +15,12 @@ arrow-schema.workspace = true
bytes.workspace = true
chrono.workspace = true
hashbrown.workspace = true
+iox_time.workspace = true
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_catalog = {path = "../influxdb3_catalog"}
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
+influxdb3_sys_events = { path = "../influxdb3_sys_events" }
iox_query_params.workspace = true
observability_deps.workspace = true
parking_lot.workspace = true
1 change: 1 addition & 0 deletions influxdb3_py_api/src/lib.rs
@@ -13,5 +13,6 @@ pub enum ExecutePluginError {
PluginError(#[from] anyhow::Error),
}

+pub mod logging;
#[cfg(feature = "system-py")]
pub mod system_py;
89 changes: 89 additions & 0 deletions influxdb3_py_api/src/logging.rs
@@ -0,0 +1,89 @@
use arrow_array::builder::{StringBuilder, TimestampNanosecondBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use influxdb3_sys_events::{Event, RingBuffer, ToRecordBatch};
use iox_time::Time;
use std::fmt::Display;
use std::sync::Arc;

#[derive(Debug)]
pub struct ProcessingEngineLog {
event_time: Time,
log_level: LogLevel,
trigger_name: String,
log_line: String,
}

#[derive(Debug, Copy, Clone)]
pub enum LogLevel {
Info,
Warn,
Error,
}

impl Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Info => write!(f, "INFO"),
LogLevel::Warn => write!(f, "WARN"),
LogLevel::Error => write!(f, "ERROR"),
}
}
}

impl ProcessingEngineLog {
pub fn new(
event_time: Time,
log_level: LogLevel,
trigger_name: String,
log_line: String,
) -> Self {
Self {
event_time,
log_level,
trigger_name,
log_line,
}
}
}

impl ToRecordBatch<ProcessingEngineLog> for ProcessingEngineLog {
fn schema() -> Schema {
let fields = vec![
Field::new(
"event_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("trigger_name", DataType::Utf8, false),
Field::new("log_level", DataType::Utf8, false),
Field::new("log_text", DataType::Utf8, false),
];
Schema::new(fields)
}
fn to_record_batch(
items: Option<&RingBuffer<Event<ProcessingEngineLog>>>,
) -> Option<Result<RecordBatch, ArrowError>> {
let items = items?;
let capacity = items.len();
let mut event_time_builder = TimestampNanosecondBuilder::with_capacity(capacity);
let mut trigger_name_builder = StringBuilder::new();
let mut log_level_builder = StringBuilder::new();
let mut log_text_builder = StringBuilder::new();
for item in items.in_order() {
let event = &item.data;
event_time_builder.append_value(event.event_time.timestamp_nanos());
trigger_name_builder.append_value(event.trigger_name.as_str());
log_level_builder.append_value(event.log_level.to_string().as_str());
log_text_builder.append_value(event.log_line.as_str());
}
let columns: Vec<ArrayRef> = vec![
Arc::new(event_time_builder.finish()),
Arc::new(trigger_name_builder.finish()),
Arc::new(log_level_builder.finish()),
Arc::new(log_text_builder.finish()),
];

Some(RecordBatch::try_new(Arc::new(Self::schema()), columns))
}
}
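
To make the new type concrete: a minimal sketch of building one log event and inspecting the schema it renders with. The timestamp, trigger name, and message are illustrative, and ToRecordBatch must be in scope for schema() to resolve:

use influxdb3_py_api::logging::{LogLevel, ProcessingEngineLog};
use influxdb3_sys_events::ToRecordBatch;
use iox_time::Time;

fn main() {
    // One event becomes one row of system.processing_engine_logs.
    let event = ProcessingEngineLog::new(
        Time::from_timestamp_nanos(1_738_195_200_000_000_000), // illustrative
        LogLevel::Warn,
        "my_trigger".to_string(),
        "slow write detected".to_string(),
    );
    // event_time: Timestamp(ns) | trigger_name: Utf8 | log_level: Utf8 | log_text: Utf8
    println!("{:#?}", ProcessingEngineLog::schema());
    println!("{:?}", event);
}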