Skip to content

Commit

Permalink
debugging work
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Aug 13, 2024
1 parent d131ebd commit 8d2c357
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 35 deletions.
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = 'e75a0b49b40f3
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = 'leak_investigation'}
cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
3 changes: 3 additions & 0 deletions crates/arroyo-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ pub struct AdminConfig {

/// HTTP port the admin service will listen on
pub http_port: u16,

/// The HTTP port for the admin service in run mode; defaults to a random port
pub run_http_port: Option<u16>,
}

#[derive(Debug, Deserialize, Serialize, Clone, Default)]
Expand Down
29 changes: 20 additions & 9 deletions crates/arroyo-server-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use arroyo_types::POSTHOG_KEY;
use axum::body::Bytes;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use hyper::Body;
Expand All @@ -18,11 +19,10 @@ use serde_json::{json, Value};
use std::error::Error;
use std::fs;
use std::future::Future;
use std::net::SocketAddr;
use std::net::{SocketAddr, TcpListener};
use std::path::PathBuf;
use std::sync::Arc;
use std::task::{Context, Poll};
use axum::response::IntoResponse;
use tonic::body::BoxBody;
use tonic::transport::Server;
use tower::layer::util::Stack;
Expand Down Expand Up @@ -233,7 +233,6 @@ async fn details<'a>(State(state): State<Arc<AdminState>>) -> String {
.unwrap()
}


pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
require_profiling_activated(&prof_ctl)?;
Expand All @@ -244,20 +243,23 @@ pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)
}

/// Checks whether jemalloc profiling is activated an returns an error response if not.
fn require_profiling_activated(prof_ctl: &jemalloc_pprof::JemallocProfCtl) -> Result<(), (StatusCode, String)> {
fn require_profiling_activated(
prof_ctl: &jemalloc_pprof::JemallocProfCtl,
) -> Result<(), (StatusCode, String)> {
if prof_ctl.activated() {
Ok(())
} else {
Err((axum::http::StatusCode::FORBIDDEN, "heap profiling not activated".into()))
Err((
axum::http::StatusCode::FORBIDDEN,
"heap profiling not activated".into(),
))
}
}

pub async fn start_admin_server(service: &str) -> anyhow::Result<()> {
let addr = config().admin.bind_address;
let port = config().admin.http_port;

info!("Starting {} admin server on {}:{}", service, addr, port);

let state = Arc::new(AdminState {
name: format!("arroyo-{}", service),
});
Expand All @@ -271,9 +273,18 @@ pub async fn start_admin_server(service: &str) -> anyhow::Result<()> {
.route("/debug/pprof/heap", get(handle_get_heap))
.with_state(state);

let addr = SocketAddr::new(addr, port);
let listener = TcpListener::bind(SocketAddr::new(addr, port))
.map_err(|e| anyhow!("Failed to bind to admin server port {port}: {e}"))?;

let local_addr = listener.local_addr()?;
info!(
"Starting {} admin server on {}:{}",
service,
addr,
local_addr.port()
);

axum::Server::bind(&addr)
axum::Server::from_tcp(listener)?
.serve(app.into_make_service())
.await
.map_err(|e| anyhow!("Failed to start admin HTTP server: {}", e))
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";


#[derive(Parser)]
#[command(version, about)]
struct Cli {
Expand Down
11 changes: 10 additions & 1 deletion crates/arroyo/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use arroyo_openapi::types::{Pipeline, PipelinePatch, PipelinePost, StopType, Val
use arroyo_openapi::Client;
use arroyo_rpc::config::{config, DatabaseType, DefaultSink, Scheduler};
use arroyo_rpc::{config, init_db_notifier, notify_db, retry};
use arroyo_server_common::log_event;
use arroyo_server_common::shutdown::{Shutdown, ShutdownHandler, SignalBehavior};
use arroyo_server_common::{log_event, start_admin_server};
use arroyo_storage::StorageProvider;
use arroyo_types::to_millis;
use async_trait::async_trait;
Expand Down Expand Up @@ -389,6 +389,13 @@ pub async fn run(args: RunArgs) {
} else {
c.api.http_port = 0;
}

if let Some(port) = c.admin.run_http_port {
c.admin.http_port = port;
} else {
c.admin.http_port = 0;
}

c.controller.rpc_port = 0;
c.controller.scheduler = Scheduler::Process;

Expand All @@ -410,6 +417,8 @@ pub async fn run(args: RunArgs) {

config::update(|c| c.controller.rpc_port = controller_port);

shutdown.spawn_task("admin", start_admin_server("pipeline-cluster"));

let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api")).unwrap();

let client = Arc::new(Client::new_with_client(
Expand Down

0 comments on commit 8d2c357

Please sign in to comment.