Skip to content

Commit

Permalink
Support "availability" topics
Browse files Browse the repository at this point in the history
  • Loading branch information
haimgel committed Oct 2, 2022
1 parent 3a24b7b commit cf3b8c8
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 41 deletions.
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ This application expects a configuration file named `config.yaml`, located in:

Sample configuration (controls Slack status across multiple Slack workspaces using [slack_status](https://github.com/haimgel/slack_status))
```yaml
# Application ID is the prefix for all MQTT topics this app subscribes and publishes to. Defaults to mqtt2cmd
app-id: 'laptop'
mqtt:
broker: "tcp://your-mqtt-server-address:1883"
switches:
Expand All @@ -48,5 +50,23 @@ switches:
get_state: "slack_status --get lunch"
```
Using the configuration above, `mqtt2cmd` will subscribe to MQTT topic `mqtt2cmd/switches/lunch/set` and will
publish the current state to `mqtt2cmd/switches/lunch`
Using the configuration above, `mqtt2cmd` will:
1. Subscribe to MQTT topic `laptop/switches/lunch/set`
2. Publish the current state to `laptop/switches/lunch`
3. Publish overall application availability to `laptop/available`
4. Publish switch availability to `laptop/switches/lunch/available` (will be marked offline if the commands could not be executed successfully)

## Sample Home Assistant configuration to control the switch (as configured above)

```yaml
mqtt:
switch:
- name: "Slack 'Lunch' status"
icon: 'mdi:hamburger'
state_topic: 'laptop/switches/lunch'
command_topic: 'laptop/switches/lunch/set'
availability:
- topic: 'laptop/available'
- topic: 'laptop/switches/lunch/available'
availability_mode: 'all'
```
4 changes: 2 additions & 2 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ func Execute(version string, exit func(int), args []string) {
defer logger.Sync()
logger.Infow("Application started", "config_file", viper.ConfigFileUsed())

client, err := mqtt.Init(&appConfig.Mqtt, appConfig.Switches, logger)
client, err := mqtt.Init(appConfig.AppId, &appConfig.Mqtt, appConfig.Switches, logger)
if err != nil {
logger.Panic(err)
}
for {
client.Refresh()
time.Sleep(10 * time.Second)
client.Refresh(false)
}
}
4 changes: 3 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type LoggerConfig struct {
}

type ApplicationConfig struct {
AppId string `mapstructure:"app-id"`
Mqtt MqttConfig `mapstructure:"mqtt"`
Switches []controls.Switch `mapstructure:"switches"`
LoggerConfig LoggerConfig `mapstructure:"log"`
Expand All @@ -48,6 +49,7 @@ func Load(version string, exit func(int), args []string) (*ApplicationConfig, er
viper.SetConfigType("yaml")
viper.SetConfigName("config")
viper.AddConfigPath(configDir)
viper.SetDefault("app-id", AppName)
err = viper.ReadInConfig()
if err != nil {
return nil, err
Expand All @@ -62,7 +64,7 @@ func Load(version string, exit func(int), args []string) (*ApplicationConfig, er
}

func processCommandLineArguments(versionStr string, exit func(int), args []string) {
pflag.StringP("mqtt.broker", "b", "", "MQTT broker (example \"tcp://hostname:1883\")")
pflag.StringP("mqtt.broker", "b", "tcp://localhost:1883", "MQTT broker")
pflag.StringP("log.path", "l", defaultLogFile(), "Log file path")
helpFlag := pflag.BoolP("help", "h", false, "This help message")
versionFlag := pflag.BoolP("version", "v", false, "Show version")
Expand Down
14 changes: 11 additions & 3 deletions internal/controls/switches.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@ func (sw *Switch) SwitchOnOff(state bool) (string, error) {
}
}

func (sw *Switch) GetState() bool {
_, err := run(sw.StateCmd)
return err == nil
func (sw *Switch) GetState() (bool, string, error) {
out, err := run(sw.StateCmd)
if exitError, ok := err.(*exec.ExitError); ok {
if exitError.ExitCode() == 1 {
return false, out, nil
} else {
return false, out, err
}
} else {
return err == nil, out, err
}
}

func run(command string) (string, error) {
Expand Down
137 changes: 104 additions & 33 deletions internal/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@ import (
"time"
)

const OnPayload = "ON"
const OffPayload = "OFF"
const onPayload = "ON"
const offPayload = "OFF"
const availablePayload = "online"
const unavailablePayload = "offline"

type Switch struct {
control controls.Switch
state bool
lastRefresh time.Time
control controls.Switch
state bool
stateSet bool
available bool
availableSet bool
lastRefresh time.Time
}

type Client struct {
appName string
handle MQTT.Client
switches []*Switch
logger *zap.SugaredLogger
}

func Init(config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) {
client, err := Connect(config, controls, logger)
func Init(appName string, config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) {
client, err := Connect(appName, config, controls, logger)
if err != nil {
return nil, err
}
Expand All @@ -35,15 +41,16 @@ func Init(config *config.MqttConfig, controls []controls.Switch, logger *zap.Sug
client.handle.Disconnect(0)
return nil, err
}
client.Refresh(true)
client.setAppAvailable()
return client, nil
}

func Connect(config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) {
func Connect(appName string, config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) {
opts := MQTT.NewClientOptions()
opts.AddBroker(config.Broker)
opts.SetOrderMatters(false)
opts.SetClientID(generateClientId())
opts.SetClientID(generateClientId(appName))
opts.SetWill(appAvailabilityTopic(appName), unavailablePayload, 1, true)
if (config.User != nil) && (config.Password != nil) {
opts.SetUsername(*config.User)
opts.SetPassword(*config.Password)
Expand All @@ -57,16 +64,16 @@ func Connect(config *config.MqttConfig, controls []controls.Switch, logger *zap.
for i, control := range controls {
switches[i] = &Switch{control: control}
}
return &Client{handle: client, switches: switches, logger: logger}, nil
return &Client{appName: appName, handle: client, switches: switches, logger: logger}, nil
}

func generateClientId() string {
return fmt.Sprintf("%s-%016x", config.AppName, rand.Uint64())
func generateClientId(appName string) string {
return fmt.Sprintf("%s-%016x", appName, rand.Uint64())
}

func (client *Client) Subscribe() error {
for _, sw := range client.switches {
topic := commandTopic(sw)
topic := client.commandTopic(sw)
client.logger.Debugw("Subscribing", "topic", topic)
if token := client.handle.Subscribe(topic, 1, func(mqttClient MQTT.Client, message MQTT.Message) {
client.processSetPayload(sw, string(message.Payload()))
Expand All @@ -89,62 +96,126 @@ func (client *Client) processSetPayload(sw *Switch, payload string) {
response, err := sw.control.SwitchOnOff(command)
if err != nil {
logger.Errorw("Error running switch command", "error", err, "output", response)
client.setAvailable(sw, false)
return
}
logger.Debugw("Executed switch command successfully", "output", response)
client.setState(sw, command)
}

func (client *Client) Refresh(force bool) {
func (client *Client) Refresh() {
defer client.syncLog()
for _, sw := range client.switches {
if force || (sw.control.RefreshInterval != 0 && time.Now().After(sw.lastRefresh.Add(sw.control.RefreshInterval))) {
newState := sw.control.GetState()
sw.lastRefresh = time.Now()
if force || (newState != sw.state) {
client.setState(sw, newState)
}
client.refreshOne(sw)
}
}

func (client *Client) refreshOne(sw *Switch) {
logger := client.logger.With(zap.String("switch", sw.control.Name))
if !sw.availableSet || !sw.stateSet || (sw.control.RefreshInterval != 0 && time.Now().After(sw.lastRefresh.Add(sw.control.RefreshInterval))) {
newState, response, err := sw.control.GetState()
if err != nil {
logger.Errorw("Error running switch query command", "error", err, "output", response)
}
sw.lastRefresh = time.Now()
client.setState(sw, newState)
client.setAvailable(sw, err == nil)
}
}

func (client *Client) setState(sw *Switch, state bool) {
topic := stateTopic(sw)
logger := client.logger.With(zap.String("switch", sw.control.Name), zap.Bool("state", state), zap.String("topic", topic))
token := client.handle.Publish(topic, 1, true, generatePayload(state))
if sw.stateSet && sw.state == state {
return
}
topic := client.stateTopic(sw)
logger := client.logger.With(
zap.String("switch", sw.control.Name),
zap.String("topic", topic),
zap.Bool("state", state),
)
token := client.handle.Publish(topic, 1, true, generateStatePayload(state))
token.Wait()
if token.Error() != nil {
logger.Error("Error publishing state to MQTT", "error", token.Error())
return
}
sw.state = state
sw.stateSet = true
logger.Debugw("Published state to MQTT")
}

func (client *Client) setAvailable(sw *Switch, available bool) {
if sw.availableSet && sw.available == available {
return
}
topic := client.availabilityTopic(sw)
logger := client.logger.With(
zap.String("switch", sw.control.Name),
zap.String("topic", topic),
zap.Bool("available", available),
)
token := client.handle.Publish(topic, 1, true, generateAvailablePayload(available))
token.Wait()
if token.Error() != nil {
logger.Error("Error publishing availability to MQTT", "error", token.Error())
return
}
sw.available = available
sw.availableSet = true
logger.Debugw("Published availability to MQTT")
}

func (client *Client) setAppAvailable() {
topic := appAvailabilityTopic(client.appName)
logger := client.logger.With(zap.String("topic", topic))
token := client.handle.Publish(topic, 1, false, generateAvailablePayload(true))
token.Wait()
if token.Error() != nil {
logger.Error("Error publishing application availability to MQTT", "error", token.Error())
}
logger.Debugw("Published application availability to MQTT")
}

func parsePayload(payload string) (bool, error) {
if payload == OnPayload {
if payload == onPayload {
return true, nil
} else if payload == OffPayload {
} else if payload == offPayload {
return false, nil
} else {
return false, fmt.Errorf("invalid payload: %s", payload)
}
}

func generatePayload(state bool) string {
func generateStatePayload(state bool) string {
if state {
return OnPayload
return onPayload
} else {
return OffPayload
return offPayload
}
}

func commandTopic(sw *Switch) string {
return fmt.Sprintf("%s/switches/%s/set", config.AppName, sw.control.Name)
func generateAvailablePayload(available bool) string {
if available {
return availablePayload
} else {
return unavailablePayload
}
}

func (client *Client) commandTopic(sw *Switch) string {
return fmt.Sprintf("%s/switches/%s/set", client.appName, sw.control.Name)
}

func (client *Client) stateTopic(sw *Switch) string {
return fmt.Sprintf("%s/switches/%s", client.appName, sw.control.Name)
}

func (client *Client) availabilityTopic(sw *Switch) string {
return fmt.Sprintf("%s/switches/%s/available", client.appName, sw.control.Name)
}

func stateTopic(sw *Switch) string {
return fmt.Sprintf("%s/switches/%s", config.AppName, sw.control.Name)
func appAvailabilityTopic(appName string) string {
return fmt.Sprintf("%s/available", appName)
}

func (client *Client) syncLog() {
Expand Down

0 comments on commit cf3b8c8

Please sign in to comment.