Skip to content

Commit

Permalink
Merge pull request #4 from bagaluten/fix/restart-loop-on-error
Browse files Browse the repository at this point in the history
fix: restart controller when watch dies
  • Loading branch information
kstiehl authored Jun 20, 2024
2 parents fdcbb9c + 09de069 commit 350dcd9
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,39 @@ use kube::{
runtime::WatchStreamExt,
Api,
};
use log::{error, info, debug};
use log::{debug, error, info};
use tokio_stream::StreamExt;

// This is the main controller function.
pub async fn controller(client: kube::Client, config: TaintConfig) {
let node_api: Api<Node> = Api::all(client);
let stream = watcher(node_api.clone(), watcher::Config::default()).applied_objects();
let mut stream = std::pin::pin!(stream);
let mut next = stream.try_next().await;

while let Ok(Some(node)) = next {

for label_taint in &config.label_taints {
if !node_matches_labels(&label_taint.selector, &node) {
continue;
loop {
info!("starting watching nodes");
let stream = watcher(node_api.clone(), watcher::Config::default()).applied_objects();
let mut stream = std::pin::pin!(stream);
let mut next = stream.try_next().await;

while let Ok(Some(node)) = next {
for label_taint in &config.label_taints {
if !node_matches_labels(&label_taint.selector, &node) {
continue;
}

let err = process_event_for_taint(&node_api, &node, &label_taint.taint).await;
if let Err(e) = err {
error!("error processing event: {}", e);
}
}

let err = process_event_for_taint(&node_api, &node, &label_taint.taint).await;
if let Err(e) = err {
error!("error processing event: {}", e);
}
next = stream.try_next().await;
}

next = stream.try_next().await;
}

if let Err(e) = next {
error!("stream terminated with: {}", e);
if let Err(e) = next {
error!("stream terminated with: {}", e);
}
info!("watcher stream terminated, restarting in 10 seconds");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
}

Expand Down

0 comments on commit 350dcd9

Please sign in to comment.