diff --git a/src/controller.rs b/src/controller.rs index 1fc31e4..d246876 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -7,34 +7,39 @@ use kube::{ runtime::WatchStreamExt, Api, }; -use log::{error, info, debug}; +use log::{debug, error, info}; use tokio_stream::StreamExt; // This is the main controller function. pub async fn controller(client: kube::Client, config: TaintConfig) { let node_api: Api = Api::all(client); - let stream = watcher(node_api.clone(), watcher::Config::default()).applied_objects(); - let mut stream = std::pin::pin!(stream); - let mut next = stream.try_next().await; - while let Ok(Some(node)) = next { - - for label_taint in &config.label_taints { - if !node_matches_labels(&label_taint.selector, &node) { - continue; + loop { + info!("starting watching nodes"); + let stream = watcher(node_api.clone(), watcher::Config::default()).applied_objects(); + let mut stream = std::pin::pin!(stream); + let mut next = stream.try_next().await; + + while let Ok(Some(node)) = next { + for label_taint in &config.label_taints { + if !node_matches_labels(&label_taint.selector, &node) { + continue; + } + + let err = process_event_for_taint(&node_api, &node, &label_taint.taint).await; + if let Err(e) = err { + error!("error processing event: {}", e); + } } - let err = process_event_for_taint(&node_api, &node, &label_taint.taint).await; - if let Err(e) = err { - error!("error processing event: {}", e); - } + next = stream.try_next().await; } - next = stream.try_next().await; - } - - if let Err(e) = next { - error!("stream terminated with: {}", e); + if let Err(e) = next { + error!("stream terminated with: {}", e); + } + info!("watcher stream terminated, restarting in 10 seconds"); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; } }