Skip to content

Commit

Permalink
Merge pull request #98 from alsuren/hi-error
Browse files Browse the repository at this point in the history
Reconnect to MQTT broker if connection is dropped
  • Loading branch information
qwandor authored Dec 17, 2020
2 parents c9615ef + 9314eb0 commit 800da5c
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 45 deletions.
1 change: 1 addition & 0 deletions homie-influx/debian-scripts/homie-influx.service
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Environment=RUST_BACKTRACE=1
Environment=RUST_LIB_BACKTRACE=1
ExecStart=/usr/bin/homie-influx
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
2 changes: 2 additions & 0 deletions homie-influx/homie-influx.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ client_prefix="homie-influx"
#password=""
# Whether to use TLS for the connection to the MQTT broker.
use_tls=false
# How long to wait between reconnection attempts if the MQTT connection is dropped.
reconnect_interval_seconds=5

[influxdb]
# The URL of the InfluxDB to which to connect.
Expand Down
16 changes: 16 additions & 0 deletions homie-influx/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ use influx_db_client::reqwest::Url;
use influx_db_client::Client;
use rumqttc::MqttOptions;
use rustls::ClientConfig;
use serde::{Deserialize as _, Deserializer};
use serde_derive::Deserialize;
use stable_eyre::eyre;
use stable_eyre::eyre::WrapErr;
use std::fs::read_to_string;
use std::sync::Arc;
use std::time::Duration;

const DEFAULT_MQTT_CLIENT_PREFIX: &str = "homie-influx";
const DEFAULT_MQTT_HOST: &str = "test.mosquitto.org";
const DEFAULT_MQTT_PORT: u16 = 1883;
const DEFAULT_MQTT_RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
const DEFAULT_INFLUXDB_URL: &str = "http://localhost:8086";
const CONFIG_FILENAME: &str = "homie-influx.toml";
const DEFAULT_MAPPINGS_FILENAME: &str = "mappings.toml";
Expand Down Expand Up @@ -45,6 +48,17 @@ pub struct MqttConfig {
pub username: Option<String>,
pub password: Option<String>,
pub client_prefix: String,
#[serde(
deserialize_with = "de_duration_seconds",
rename = "reconnect_interval_seconds"
)]
pub reconnect_interval: Duration,
}

/// Deserialize an integer as a number of seconds.
fn de_duration_seconds<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
let seconds = u64::deserialize(d)?;
Ok(Duration::from_secs(seconds))
}

impl Default for MqttConfig {
Expand All @@ -56,6 +70,7 @@ impl Default for MqttConfig {
username: None,
password: None,
client_prefix: DEFAULT_MQTT_CLIENT_PREFIX.to_owned(),
reconnect_interval: DEFAULT_MQTT_RECONNECT_INTERVAL,
}
}
}
Expand Down Expand Up @@ -140,6 +155,7 @@ pub fn get_mqtt_options(config: &MqttConfig, client_name_suffix: &str) -> MqttOp
let client_name = format!("{}-{}", config.client_prefix, client_name_suffix);
let mut mqtt_options = MqttOptions::new(client_name, &config.host, config.port);
mqtt_options.set_keep_alive(5);
mqtt_options.set_clean_session(false);

if let (Some(username), Some(password)) = (&config.username, &config.password) {
mqtt_options.set_credentials(username, password);
Expand Down
109 changes: 64 additions & 45 deletions homie-influx/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ mod influx;
use crate::config::{get_influxdb_client, get_mqtt_options, read_mappings, Config};
use crate::influx::send_property_value;
use futures::future::try_join_all;
use futures::FutureExt;
use homie_controller::{Event, HomieController, HomieEventLoop};
use homie_controller::{Event, HomieController, HomieEventLoop, PollError};
use influx_db_client::Client;
use rumqttc::ConnectionError;
use stable_eyre::eyre;
use stable_eyre::eyre::WrapErr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::{self, JoinHandle};
use tokio::time::delay_for;

#[tokio::main]
async fn main() -> Result<(), eyre::Report> {
Expand All @@ -31,64 +32,82 @@ async fn main() -> Result<(), eyre::Report> {

let influxdb_client = get_influxdb_client(&config.influxdb, &mapping.influxdb_database)?;

let handle = spawn_homie_poll_loop(event_loop, controller.clone(), influxdb_client);
let handle = spawn_homie_poll_loop(
event_loop,
controller.clone(),
influxdb_client,
config.mqtt.reconnect_interval,
);
controller.start().await?;
join_handles.push(handle.map(|res| Ok(res??)));
join_handles.push(handle);
}

simplify_unit_vec(try_join_all(join_handles).await)
try_join_all(join_handles).await?;
Ok(())
}

fn spawn_homie_poll_loop(
mut event_loop: HomieEventLoop,
controller: Arc<HomieController>,
influx_db_client: Client,
) -> JoinHandle<Result<(), eyre::Report>> {
reconnect_interval: Duration,
) -> JoinHandle<()> {
task::spawn(async move {
loop {
if let Some(event) = controller.poll(&mut event_loop).await.wrap_err_with(|| {
format!(
"Failed to poll HomieController for base topic '{}'.",
controller.base_topic()
)
})? {
match event {
Event::PropertyValueChanged {
device_id,
node_id,
property_id,
value,
fresh,
} => {
log::trace!(
"{}/{}/{}/{} = {} ({})",
controller.base_topic(),
device_id,
node_id,
property_id,
value,
fresh
);
if fresh {
send_property_value(
controller.as_ref(),
&influx_db_client,
device_id,
node_id,
property_id,
)
.await?;
}
}
_ => {
log::info!("{} Event: {:?}", controller.base_topic(), event);
match controller.poll(&mut event_loop).await {
Ok(Some(event)) => {
handle_event(controller.as_ref(), &influx_db_client, event).await;
}
Ok(None) => {}
Err(e) => {
log::error!(
"Failed to poll HomieController for base topic '{}': {}",
controller.base_topic(),
e
);
if let PollError::Connection(ConnectionError::Io(_)) = e {
delay_for(reconnect_interval).await;
}
}
}
}
})
}

fn simplify_unit_vec<E>(m: Result<Vec<()>, E>) -> Result<(), E> {
m.map(|_| ())
async fn handle_event(controller: &HomieController, influx_db_client: &Client, event: Event) {
match event {
Event::PropertyValueChanged {
device_id,
node_id,
property_id,
value,
fresh,
} => {
log::trace!(
"{}/{}/{}/{} = {} ({})",
controller.base_topic(),
device_id,
node_id,
property_id,
value,
fresh
);
if fresh {
if let Err(e) = send_property_value(
controller,
influx_db_client,
device_id,
node_id,
property_id,
)
.await
{
log::error!("{:?}", e);
}
}
}
_ => {
log::info!("{} Event: {:?}", controller.base_topic(), event);
}
}
}

0 comments on commit 800da5c

Please sign in to comment.