Started work on significant refactor of the codebase

This commit is contained in:
Dreaded_X 2022-11-18 19:51:58 +01:00
parent c49ee841fd
commit 2df59cdb17
Signed by: Dreaded_X
GPG Key ID: 76BDEC4E165D8AD9
8 changed files with 220 additions and 179 deletions

14
connect/connect.go Normal file
View File

@ -0,0 +1,14 @@
package connect
import (
"automation/integration/hue"
"automation/integration/ntfy"
paho "github.com/eclipse/paho.mqtt.golang"
)
type Connect struct {
Client paho.Client
Hue hue.Hue
Notify ntfy.Notify
}

View File

@ -46,9 +46,7 @@ func (c *computer) Execute(execution google.Execution, updateState *google.Devic
switch execution.Name { switch execution.Name {
case google.CommandActivateScene: case google.CommandActivateScene:
if !execution.ActivateScene.Deactivate { c.SetState(!execution.ActivateScene.Deactivate)
http.Get(c.url)
}
default: default:
errCode = "actionNotAvailable" errCode = "actionNotAvailable"
log.Printf("Command (%s) not supported\n", execution.Name) log.Printf("Command (%s) not supported\n", execution.Name)
@ -61,6 +59,10 @@ func (c *computer) GetID() string {
return c.macAddress return c.macAddress
} }
func (c *computer) TurnOff() { func (c *computer) SetState(state bool) {
if state {
http.Get(c.url)
} else {
// Scene does not implement this // Scene does not implement this
} }
}

View File

@ -1,9 +1,7 @@
package device package device
import ( import (
"automation/integration/mqtt"
"automation/integration/google" "automation/integration/google"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
@ -15,9 +13,13 @@ import (
type kettle struct { type kettle struct {
Info DeviceInfo Info DeviceInfo
m *mqtt.MQTT client paho.Client
updated chan bool updated chan bool
timerLength time.Duration
timer *time.Timer
stop chan interface{}
isOn bool isOn bool
online bool online bool
} }
@ -26,22 +28,7 @@ func (k *kettle) getState() google.DeviceState {
return google.NewDeviceState(k.online).RecordOnOff(k.isOn) return google.NewDeviceState(k.online).RecordOnOff(k.isOn)
} }
func NewKettle(info DeviceInfo, m *mqtt.MQTT, s *google.Service) *kettle { func (k *kettle) stateHandler(client paho.Client, msg paho.Message) {
k := &kettle{Info: info, m: m, updated: make(chan bool, 1)}
const length = 5 * time.Minute
timer := time.NewTimer(length)
timer.Stop()
go func() {
for {
<- timer.C
log.Println("Turning kettle automatically off")
m.Publish("zigbee2mqtt/kitchen/kettle/set", 1, false, `{"state": "OFF"}`)
}
}()
k.m.AddHandler(fmt.Sprintf("zigbee2mqtt/%s", k.Info.FriendlyName), func (_ paho.Client, msg paho.Message) {
var payload struct { var payload struct {
State string `json:"state"` State string `json:"state"`
} }
@ -58,17 +45,50 @@ func NewKettle(info DeviceInfo, m *mqtt.MQTT, s *google.Service) *kettle {
k.updated <- true k.updated <- true
// Notify google of the updated state // Notify google of the updated state
id := k.GetID() // @TODO Fix this
s.ReportState(context.Background(), id, map[string]google.DeviceState{ // id := k.GetID()
id: k.getState(), // s.ReportState(context.Background(), id, map[string]google.DeviceState{
}) // id: k.getState(),
// })
if k.isOn { if k.isOn {
timer.Reset(length) k.timer.Reset(k.timerLength)
} else { } else {
timer.Stop() k.timer.Stop()
}
}
func (k *kettle) timerFunc() {
for {
select {
case <- k.timer.C:
log.Println("Turning kettle automatically off")
if token := k.client.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, `{"state": "OFF"}`); token.Wait() && token.Error() != nil {
log.Println(token.Error())
}
case <- k.stop:
return
}
}
}
func (k *kettle) Delete() {
// The the timer function that it needs to stop
k.stop <- struct{}{}
}
func NewKettle(info DeviceInfo, client paho.Client, s *google.Service) *kettle {
k := &kettle{Info: info, client: client, updated: make(chan bool, 1), timerLength: 5 * time.Minute, stop: make(chan interface{})}
k.timer = time.NewTimer(k.timerLength)
k.timer.Stop()
// Start function
go k.timerFunc()
if token := k.client.Subscribe(fmt.Sprintf("zigbee2mqtt/%s", k.Info.FriendlyName), 1, k.stateHandler); token.Wait() && token.Error() != nil {
log.Println(token.Error())
} }
})
return k return k
} }
@ -94,6 +114,8 @@ func (k *kettle) Sync() *google.Device {
Name: name, Name: name,
} }
// @TODO Fix reporting
// device.WillReportState = true
device.WillReportState = true device.WillReportState = true
if len(name) > 1 { if len(name) > 1 {
device.RoomHint = room device.RoomHint = room
@ -105,8 +127,6 @@ func (k *kettle) Sync() *google.Device {
SwVersion: k.Info.SoftwareBuildID, SwVersion: k.Info.SoftwareBuildID,
} }
k.m.Publish(fmt.Sprintf("zigbee2mqtt/%s/get", k.Info.FriendlyName), 1, false, `{ "state": "" }`)
return device return device
} }
@ -128,17 +148,13 @@ func (k *kettle) Execute(execution google.Execution, updatedState *google.Device
switch execution.Name { switch execution.Name {
case google.CommandOnOff: case google.CommandOnOff:
state := "OFF"
if execution.OnOff.On {
state = "ON"
}
// Clear the updated channel // Clear the updated channel
for len(k.updated) > 0 { for len(k.updated) > 0 {
<- k.updated <- k.updated
} }
// Update the state
k.m.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, fmt.Sprintf(`{ "state": "%s" }`, state)) k.SetState(execution.OnOff.On)
// Start timeout timer // Start timeout timer
timer := time.NewTimer(time.Second) timer := time.NewTimer(time.Second)
@ -167,6 +183,13 @@ func (k *kettle) GetID() string {
return k.Info.IEEEAdress return k.Info.IEEEAdress
} }
func (k *kettle) TurnOff() { func (k *kettle) SetState(state bool) {
k.m.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, fmt.Sprintf(`{ "state": "OFF" }`)) msg := "OFF"
if state {
msg = "ON"
}
if token := k.client.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, fmt.Sprintf(`{ "state": "%s" }`, msg)); token.Wait() && token.Error() != nil {
log.Println(token.Error())
}
} }

View File

@ -2,7 +2,6 @@ package device
import ( import (
"automation/integration/google" "automation/integration/google"
"automation/integration/mqtt"
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
@ -26,7 +25,7 @@ type DeviceInfo struct {
type DeviceInterface interface { type DeviceInterface interface {
google.DeviceInterface google.DeviceInterface
TurnOff() SetState(state bool)
} }
type Provider struct { type Provider struct {
@ -49,7 +48,31 @@ func (c *credentials) Decode(value string) error {
return err return err
} }
func NewProvider(config Config, m *mqtt.MQTT) *Provider { // Auto populate and update the device list
func (p *Provider) devicesHandler(client paho.Client, msg paho.Message) {
var devices []DeviceInfo
json.Unmarshal(msg.Payload(), &devices)
log.Println("zigbee2mqtt devices:")
pretty.Logln(devices)
// Remove all automatically added devices
p.devices = p.manualDevices
for _, device := range devices {
switch device.Description {
case "Kettle":
kettle := NewKettle(device, client, p.Service)
p.devices[device.IEEEAdress] = kettle
log.Printf("Added Kettle (%s) %s\n", device.IEEEAdress, device.FriendlyName)
}
}
// Send sync request
p.Service.RequestSync(context.Background(), p.userID)
}
func NewProvider(config Config, client paho.Client) *Provider {
provider := &Provider{userID: "Dreaded_X", devices: make(map[string]DeviceInterface), manualDevices: make(map[string]DeviceInterface)} provider := &Provider{userID: "Dreaded_X", devices: make(map[string]DeviceInterface), manualDevices: make(map[string]DeviceInterface)}
homegraphService, err := homegraph.NewService(context.Background(), option.WithCredentialsJSON(config.Credentials)) homegraphService, err := homegraph.NewService(context.Background(), option.WithCredentialsJSON(config.Credentials))
@ -59,29 +82,9 @@ func NewProvider(config Config, m *mqtt.MQTT) *Provider {
provider.Service = google.NewService(provider, homegraphService) provider.Service = google.NewService(provider, homegraphService)
// Auto populate and update the device list if token := client.Subscribe("zigbee2mqtt/bridge/devices", 1, provider.devicesHandler); token.Wait() && token.Error() != nil {
m.AddHandler("zigbee2mqtt/bridge/devices", func(_ paho.Client, msg paho.Message) { log.Println(token.Error())
var devices []DeviceInfo
json.Unmarshal(msg.Payload(), &devices)
log.Println("zigbee2mqtt devices:")
pretty.Logln(devices)
// Remove all automatically added devices
provider.devices = provider.manualDevices
for _, device := range devices {
switch device.Description {
case "Kettle":
kettle := NewKettle(device, m, provider.Service)
provider.devices[device.IEEEAdress] = kettle
log.Printf("Added Kettle (%s) %s\n", device.IEEEAdress, device.FriendlyName)
} }
}
// Send sync request
provider.Service.RequestSync(context.Background(), provider.userID)
})
return provider return provider
} }
@ -150,6 +153,6 @@ func (p *Provider) Execute(_ context.Context, _ string, commands []google.Comman
func (p *Provider) TurnAllOff() { func (p *Provider) TurnAllOff() {
for _, device := range p.devices { for _, device := range p.devices {
device.TurnOff() device.SetState(false)
} }
} }

View File

@ -2,59 +2,36 @@ package mqtt
import ( import (
"fmt" "fmt"
"os"
"github.com/eclipse/paho.mqtt.golang" paho "github.com/eclipse/paho.mqtt.golang"
) )
type MQTT struct {
client mqtt.Client
}
// This is the default message handler, it just prints out the topic and message // This is the default message handler, it just prints out the topic and message
var defaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var defaultHandler paho.MessageHandler = func(client paho.Client, msg paho.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload()) fmt.Printf("MSG: %s\n", msg.Payload())
} }
func Connect(config Config) MQTT { func New(config Config) paho.Client {
opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("%s:%s", config.Host, config.Port)) opts := paho.NewClientOptions().AddBroker(fmt.Sprintf("%s:%s", config.Host, config.Port))
opts.SetClientID(config.ClientID) opts.SetClientID(config.ClientID)
opts.SetDefaultPublishHandler(defaultHandler) opts.SetDefaultPublishHandler(defaultHandler)
opts.SetUsername(config.Username) opts.SetUsername(config.Username)
opts.SetPassword(config.Password) opts.SetPassword(config.Password)
opts.SetOrderMatters(false) opts.SetOrderMatters(false)
client := mqtt.NewClient(opts) client := paho.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil { if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error()) panic(token.Error())
} }
m := MQTT{client: client} return client
return m
} }
func (m *MQTT) Disconnect() { func Delete(m paho.Client) {
if token := m.client.Unsubscribe("automation/presence/+"); token.Wait() && token.Error() != nil { if token := m.Unsubscribe("automation/presence/+"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error()) fmt.Println(token.Error())
os.Exit(1)
} }
m.client.Disconnect(250) m.Disconnect(250)
}
func (m *MQTT) AddHandler(topic string, handler func(client mqtt.Client, msg mqtt.Message)) {
if token := m.client.Subscribe(topic, 0, handler); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
}
func (m *MQTT) Publish(topic string, qos byte, retained bool, payload interface{}) {
if token := m.client.Publish(topic, qos, retained, payload); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
// Do not exit here as it might break during production, just log the error
// os.Exit(1)
}
} }

View File

@ -6,11 +6,11 @@ import (
"strings" "strings"
) )
type ntfy struct { type Notify struct {
presence string presence string
} }
func (ntfy *ntfy) Presence(home bool) { func (n *Notify) Presence(home bool) {
// @TODO Maybe add list the devices that are home currently? // @TODO Maybe add list the devices that are home currently?
var description string var description string
var actions string var actions string
@ -22,7 +22,7 @@ func (ntfy *ntfy) Presence(home bool) {
actions = "broadcast, Set as home, extras.cmd=presence, extras.state=1, clear=true" actions = "broadcast, Set as home, extras.cmd=presence, extras.state=1, clear=true"
} }
req, err := http.NewRequest("POST", fmt.Sprintf("https://ntfy.sh/%s", ntfy.presence), strings.NewReader(description)) req, err := http.NewRequest("POST", fmt.Sprintf("https://ntfy.sh/%s", n.presence), strings.NewReader(description))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -35,8 +35,8 @@ func (ntfy *ntfy) Presence(home bool) {
http.DefaultClient.Do(req) http.DefaultClient.Do(req)
} }
func Connect(config Config) ntfy { func New(config Config) Notify {
ntfy := ntfy{presence: config.Presence} ntfy := Notify{presence: config.Presence}
// @TODO Make sure the topic is valid? // @TODO Make sure the topic is valid?

64
main.go
View File

@ -1,14 +1,13 @@
package main package main
import ( import (
"automation/connect"
"automation/device" "automation/device"
"automation/integration/hue" "automation/integration/hue"
"automation/integration/kasa" "automation/integration/kasa"
"automation/integration/mqtt" "automation/integration/mqtt"
"automation/integration/ntfy" "automation/integration/ntfy"
"automation/presence" "automation/presence"
"encoding/json"
"fmt"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -17,8 +16,6 @@ import (
"github.com/joho/godotenv" "github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig" "github.com/kelseyhightower/envconfig"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
paho "github.com/eclipse/paho.mqtt.golang"
) )
type config struct { type config struct {
@ -128,23 +125,25 @@ func main() {
devices = make(map[string]interface{}) devices = make(map[string]interface{})
var connect connect.Connect
// MQTT // MQTT
m := mqtt.Connect(config.MQTT) connect.Client = mqtt.New(config.MQTT)
defer m.Disconnect() defer connect.Client.Disconnect(250)
// ntfy.sh
connect.Notify = ntfy.New(config.NTFY)
// Hue // Hue
h := hue.Connect(config.Hue) connect.Hue = hue.Connect(config.Hue)
// Kasa // Kasa
for name, ip := range config.Kasa.Outlets { for name, ip := range config.Kasa.Outlets {
devices[name] = kasa.New(ip) devices[name] = kasa.New(ip)
} }
// ntfy.sh
// n := ntfy.Connect(config.NTFY)
// Devices that we control and expose to google home // Devices that we control and expose to google home
provider := device.NewProvider(config.Google, &m) provider := device.NewProvider(config.Google, connect.Client)
r := mux.NewRouter() r := mux.NewRouter()
r.HandleFunc("/assistant", provider.Service.FullfillmentHandler) r.HandleFunc("/assistant", provider.Service.FullfillmentHandler)
@ -154,47 +153,8 @@ func main() {
} }
// Presence // Presence
p := presence.New() p := presence.New(&connect)
m.AddHandler("automation/presence/+", p.PresenceHandler) defer p.Delete()
m.AddHandler("automation/presence", func(client paho.Client, msg paho.Message) {
if len(msg.Payload()) == 0 {
// In this case we clear the persistent message
return
}
var message presence.Message
err := json.Unmarshal(msg.Payload(), &message)
if err != nil {
log.Println(err)
return
}
fmt.Printf("Presence: %t\n", message.State)
// Notify users of presence update
// n.Presence(present)
// Set presence on the hue bridge
h.SetFlag(41, message.State)
if !message.State {
// Turn off all the devices that we manage ourselves
provider.TurnAllOff()
// Turn off all devices
// @TODO Maybe allow for exceptions, could be a list in the config that we check against?
for _, device := range devices {
switch d := device.(type) {
case kasa.Kasa:
d.SetState(false)
}
}
// @TODO Turn off nest thermostat
} else {
// @TODO Turn on the nest thermostat again
}
})
addr := ":8090" addr := ":8090"
srv := http.Server{ srv := http.Server{

View File

@ -1,18 +1,21 @@
package presence package presence
import ( import (
"automation/connect"
"encoding/json" "encoding/json"
"fmt"
"log" "log"
"strings" "strings"
"time" "time"
mqtt "github.com/eclipse/paho.mqtt.golang" paho "github.com/eclipse/paho.mqtt.golang"
"github.com/kr/pretty" "github.com/kr/pretty"
) )
type Presence struct { type Presence struct {
connect *connect.Connect
devices map[string]bool devices map[string]bool
current *bool presence bool
} }
type Message struct { type Message struct {
@ -20,12 +23,7 @@ type Message struct {
Updated int64 `json:"updated"` Updated int64 `json:"updated"`
} }
func New() Presence { func (p *Presence) devicePresenceHandler(client paho.Client, msg paho.Message) {
return Presence{devices: make(map[string]bool), current: nil}
}
// Handler got automation/presence/+
func (p *Presence) PresenceHandler(client mqtt.Client, msg mqtt.Message) {
name := strings.Split(msg.Topic(), "/")[2] name := strings.Split(msg.Topic(), "/")[2]
if len(msg.Payload()) == 0 { if len(msg.Payload()) == 0 {
@ -52,8 +50,8 @@ func (p *Presence) PresenceHandler(client mqtt.Client, msg mqtt.Message) {
log.Println(present) log.Println(present)
if p.current == nil || *p.current != present { if p.presence != present {
p.current = &present p.presence = present
msg, err := json.Marshal(Message{ msg, err := json.Marshal(Message{
State: present, State: present,
@ -70,3 +68,67 @@ func (p *Presence) PresenceHandler(client mqtt.Client, msg mqtt.Message) {
} }
} }
} }
func (p *Presence) overallPresenceHandler(client paho.Client, msg paho.Message) {
if len(msg.Payload()) == 0 {
// In this case we clear the persistent message
return
}
var message Message
err := json.Unmarshal(msg.Payload(), &message)
if err != nil {
log.Println(err)
return
}
fmt.Printf("Presence: %t\n", message.State)
// Notify users of presence update
p.connect.Notify.Presence(p.presence)
// Set presence on the hue bridge
p.connect.Hue.SetFlag(41, message.State)
if !message.State {
log.Println("Turn off all the devices")
// // Turn off all the devices that we manage ourselves
// provider.TurnAllOff()
// // Turn off all devices
// // @TODO Maybe allow for exceptions, could be a list in the config that we check against?
// for _, device := range devices {
// switch d := device.(type) {
// case kasa.Kasa:
// d.SetState(false)
// }
// }
// @TODO Turn off nest thermostat
} else {
// @TODO Turn on the nest thermostat again
}
}
func New(connect *connect.Connect) *Presence {
p := &Presence{connect: connect, devices: make(map[string]bool), presence: false}
if token := connect.Client.Subscribe("automation/presence", 1, p.overallPresenceHandler); token.Wait() && token.Error() != nil {
log.Println(token.Error())
}
if token := connect.Client.Subscribe("automation/presence/+", 1, p.devicePresenceHandler); token.Wait() && token.Error() != nil {
log.Println(token.Error())
}
return p
}
func (p *Presence) Delete() {
if token := p.connect.Client.Unsubscribe("automation/presence"); token.Wait() && token.Error() != nil {
log.Println(token.Error())
}
if token := p.connect.Client.Unsubscribe("automation/presence/+"); token.Wait() && token.Error() != nil {
log.Println(token.Error())
}
}