Skip to content

Commit

Permalink
feat: Refactor configuration into a module
Browse files Browse the repository at this point in the history
Also add support for setting the number of stream replicas when creating
the stream mirror for the operator.

Signed-off-by: Dan Norris <[email protected]>
  • Loading branch information
protochron committed Apr 9, 2024
1 parent b933320 commit 0121602
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 18 deletions.
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ axum-server = {workspace = true}
anyhow = {workspace = true}
ctrlc = {workspace = true}
cloudevents-sdk = {workspace = true}
config = {workspace = true}
futures = {workspace = true}
handlebars = {workspace = true}
json-patch = {workspace = true}
Expand Down Expand Up @@ -58,6 +59,7 @@ async-nats = "0.33"
axum = { version = "0.6", features = ["headers"] }
axum-server = { version = "0.4", features = ["tls-rustls"] }
anyhow = "1"
config = {version = "0.14", default-features = false, features = ["convert-case", "async"]}
cloudevents-sdk = "0.7"
ctrlc = "3"
futures = "0.3"
Expand Down
13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};

/// Configuration for the operator. If you are configuring the operator using environment variables
/// then all values need to be prefixed with "WASMCLOUD_OPERATOR".
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
pub struct OperatorConfig {
#[serde(default = "default_stream_replicas")]
pub stream_replicas: u16,
}

fn default_stream_replicas() -> u16 {
1
}
16 changes: 6 additions & 10 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
docker_secret::DockerConfigJson, resources::application::get_client, services::ServiceWatcher,
Error, Result,
config::OperatorConfig, docker_secret::DockerConfigJson, resources::application::get_client,
services::ServiceWatcher, Error, Result,
};
use anyhow::bail;
use futures::StreamExt;
Expand All @@ -24,7 +24,6 @@ use kube::{
Resource, ResourceExt,
};
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::{BTreeMap, HashMap};
use std::str::from_utf8;
Expand All @@ -50,7 +49,7 @@ pub const WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT: &str =

pub struct Context {
pub client: Client,
pub wasmcloud_config: WasmcloudConfig,
pub wasmcloud_config: OperatorConfig,
pub nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
service_watcher: ServiceWatcher,
}
Expand Down Expand Up @@ -90,9 +89,6 @@ impl Secrets {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
pub struct WasmcloudConfig {}

pub async fn reconcile(cluster: Arc<WasmCloudHostConfig>, ctx: Arc<Context>) -> Result<Action> {
let cluster_configs: Api<WasmCloudHostConfig> =
Api::namespaced(ctx.client.clone(), &cluster.namespace().unwrap());
Expand Down Expand Up @@ -854,11 +850,11 @@ impl NameNamespace {
#[derive(Clone, Default)]
pub struct State {
pub nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
pub config: WasmcloudConfig,
pub config: OperatorConfig,
}

impl State {
pub fn new(config: WasmcloudConfig) -> Self {
pub fn new(config: OperatorConfig) -> Self {
Self {
config,
..Default::default()
Expand All @@ -880,7 +876,7 @@ pub async fn run(state: State) -> anyhow::Result<()> {
let services = Api::<Service>::all(client.clone());
let pods = Api::<Pod>::all(client.clone());

let watcher = ServiceWatcher::new(client.clone());
let watcher = ServiceWatcher::new(client.clone(), state.config.stream_replicas);
let config = Config::default();
let ctx = Context {
client,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl IntoResponse for Error {
}
}

pub mod config;
pub mod controller;
pub mod discovery;
pub mod docker_secret;
Expand Down
15 changes: 11 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{anyhow, Result};
use axum_server::{tls_rustls::RustlsConfig, Handle};
use controller::{State, WasmcloudConfig};
use controller::{config::OperatorConfig, State};

use config::Config;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::kube_aggregator::pkg::apis::apiregistration::v1::{
Expand Down Expand Up @@ -31,9 +32,15 @@ async fn main() -> Result<()> {
error!("Failed to configure tracing: {}", e);
e
})?;
info!("Starting controller");

let config = WasmcloudConfig {};
info!("Starting operator");

let cfg = Config::builder()
.add_source(config::Environment::with_prefix("WASMCLOUD_OPERATOR"))
.build()
.map_err(|e| anyhow!("Failed to build config: {}", e))?;
let config: OperatorConfig = cfg
.try_deserialize()
.map_err(|e| anyhow!("Failed to parse config: {}", e))?;

let client = Client::try_default().await?;
install_crd(&client).await?;
Expand Down
8 changes: 5 additions & 3 deletions src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl Watcher {
let manifest = mp.manifest;
if let Some(httpserver_service) = http_server_component(&manifest) {
if let Some(address) = find_address(&manifest, httpserver_service.name.as_str()) {
debug!(address = address, "Found address");
debug!(address, "Found address");
if let Ok(addr) = address.parse::<SocketAddr>() {
debug!("Upserting service for manifest: {}", manifest.metadata.name);
self.tx
Expand Down Expand Up @@ -224,11 +224,12 @@ impl Watcher {
pub struct ServiceWatcher {
watchers: Arc<RwLock<HashMap<String, Watcher>>>,
sender: mpsc::UnboundedSender<WatcherCommand>,
stream_replicas: u16,
}

impl ServiceWatcher {
/// Creates a new service watcher.
pub fn new(k8s_client: KubeClient) -> Self {
pub fn new(k8s_client: KubeClient, stream_replicas: u16) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel::<WatcherCommand>();

let client = k8s_client.clone();
Expand Down Expand Up @@ -264,6 +265,7 @@ impl ServiceWatcher {
Self {
watchers: Arc::new(RwLock::new(HashMap::new())),
sender: tx,
stream_replicas,
}
}

Expand Down Expand Up @@ -324,7 +326,7 @@ impl ServiceWatcher {
retention: RetentionPolicy::WorkQueue,
storage: StorageType::File,
allow_rollup: false,
num_replicas: 1,
num_replicas: self.stream_replicas as usize,
mirror: Some(Source {
name: "wadm_events".to_string(),
subject_transforms: vec![SubjectTransform {
Expand Down

0 comments on commit 0121602

Please sign in to comment.