From 2df59cdb172164b92580ced9cdd97b1d12ddb9ab Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Fri, 18 Nov 2022 19:51:58 +0100 Subject: [PATCH] Started work on significant refactor of the codebase --- connect/connect.go | 14 +++++ device/computer.go | 12 ++-- device/kettle.go | 119 +++++++++++++++++++++++---------------- device/provider.go | 57 ++++++++++--------- integration/mqtt/mqtt.go | 41 +++----------- integration/ntfy/ntfy.go | 10 ++-- main.go | 64 ++++----------------- presence/presence.go | 82 +++++++++++++++++++++++---- 8 files changed, 220 insertions(+), 179 deletions(-) create mode 100644 connect/connect.go diff --git a/connect/connect.go b/connect/connect.go new file mode 100644 index 0000000..dfbae60 --- /dev/null +++ b/connect/connect.go @@ -0,0 +1,14 @@ +package connect + +import ( + "automation/integration/hue" + "automation/integration/ntfy" + + paho "github.com/eclipse/paho.mqtt.golang" +) + +type Connect struct { + Client paho.Client + Hue hue.Hue + Notify ntfy.Notify +} diff --git a/device/computer.go b/device/computer.go index 92a76c8..f11214e 100644 --- a/device/computer.go +++ b/device/computer.go @@ -46,9 +46,7 @@ func (c *computer) Execute(execution google.Execution, updateState *google.Devic switch execution.Name { case google.CommandActivateScene: - if !execution.ActivateScene.Deactivate { - http.Get(c.url) - } + c.SetState(!execution.ActivateScene.Deactivate) default: errCode = "actionNotAvailable" log.Printf("Command (%s) not supported\n", execution.Name) @@ -61,6 +59,10 @@ func (c *computer) GetID() string { return c.macAddress } -func (c *computer) TurnOff() { - // Scene does not implement this +func (c *computer) SetState(state bool) { + if state { + http.Get(c.url) + } else { + // Scene does not implement this + } } diff --git a/device/kettle.go b/device/kettle.go index 0570cbc..d61515c 100644 --- a/device/kettle.go +++ b/device/kettle.go @@ -1,9 +1,7 @@ package device import ( - "automation/integration/mqtt" "automation/integration/google" - "context" "encoding/json" "fmt" "log" @@ -15,9 +13,13 @@ import ( type kettle struct { Info DeviceInfo - m *mqtt.MQTT + client paho.Client updated chan bool + timerLength time.Duration + timer *time.Timer + stop chan interface{} + isOn bool online bool } @@ -26,49 +28,67 @@ func (k *kettle) getState() google.DeviceState { return google.NewDeviceState(k.online).RecordOnOff(k.isOn) } -func NewKettle(info DeviceInfo, m *mqtt.MQTT, s *google.Service) *kettle { - k := &kettle{Info: info, m: m, updated: make(chan bool, 1)} +func (k *kettle) stateHandler(client paho.Client, msg paho.Message) { + var payload struct { + State string `json:"state"` + } + json.Unmarshal(msg.Payload(), &payload) - const length = 5 * time.Minute - timer := time.NewTimer(length) - timer.Stop() + // Update the internal state + k.isOn = payload.State == "ON" + k.online = true - go func() { - for { - <- timer.C + // Notify that the state has updated + for len(k.updated) > 0 { + <- k.updated + } + k.updated <- true + + // Notify google of the updated state + // @TODO Fix this + // id := k.GetID() + // s.ReportState(context.Background(), id, map[string]google.DeviceState{ + // id: k.getState(), + // }) + + if k.isOn { + k.timer.Reset(k.timerLength) + } else { + k.timer.Stop() + } +} + +func (k *kettle) timerFunc() { + for { + select { + case <- k.timer.C: log.Println("Turning kettle automatically off") - m.Publish("zigbee2mqtt/kitchen/kettle/set", 1, false, `{"state": "OFF"}`) + if token := k.client.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, `{"state": "OFF"}`); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } + + case <- k.stop: + return } - }() + } +} - k.m.AddHandler(fmt.Sprintf("zigbee2mqtt/%s", k.Info.FriendlyName), func (_ paho.Client, msg paho.Message) { - var payload struct { - State string `json:"state"` - } - json.Unmarshal(msg.Payload(), &payload) +func (k *kettle) Delete() { + // The the timer function that it needs to stop + k.stop <- struct{}{} +} - // Update the internal state - k.isOn = payload.State == "ON" - k.online = true +func NewKettle(info DeviceInfo, client paho.Client, s *google.Service) *kettle { + k := &kettle{Info: info, client: client, updated: make(chan bool, 1), timerLength: 5 * time.Minute, stop: make(chan interface{})} + k.timer = time.NewTimer(k.timerLength) + k.timer.Stop() - // Notify that the state has updated - for len(k.updated) > 0 { - <- k.updated - } - k.updated <- true + // Start function + go k.timerFunc() - // Notify google of the updated state - id := k.GetID() - s.ReportState(context.Background(), id, map[string]google.DeviceState{ - id: k.getState(), - }) - - if k.isOn { - timer.Reset(length) - } else { - timer.Stop() - } - }) + if token := k.client.Subscribe(fmt.Sprintf("zigbee2mqtt/%s", k.Info.FriendlyName), 1, k.stateHandler); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } return k } @@ -94,6 +114,8 @@ func (k *kettle) Sync() *google.Device { Name: name, } + // @TODO Fix reporting + // device.WillReportState = true device.WillReportState = true if len(name) > 1 { device.RoomHint = room @@ -105,8 +127,6 @@ func (k *kettle) Sync() *google.Device { SwVersion: k.Info.SoftwareBuildID, } - k.m.Publish(fmt.Sprintf("zigbee2mqtt/%s/get", k.Info.FriendlyName), 1, false, `{ "state": "" }`) - return device } @@ -128,17 +148,13 @@ func (k *kettle) Execute(execution google.Execution, updatedState *google.Device switch execution.Name { case google.CommandOnOff: - state := "OFF" - if execution.OnOff.On { - state = "ON" - } // Clear the updated channel for len(k.updated) > 0 { <- k.updated } - // Update the state - k.m.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, fmt.Sprintf(`{ "state": "%s" }`, state)) + + k.SetState(execution.OnOff.On) // Start timeout timer timer := time.NewTimer(time.Second) @@ -167,6 +183,13 @@ func (k *kettle) GetID() string { return k.Info.IEEEAdress } -func (k *kettle) TurnOff() { - k.m.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, fmt.Sprintf(`{ "state": "OFF" }`)) +func (k *kettle) SetState(state bool) { + msg := "OFF" + if state { + msg = "ON" + } + + if token := k.client.Publish(fmt.Sprintf("zigbee2mqtt/%s/set", k.Info.FriendlyName), 1, false, fmt.Sprintf(`{ "state": "%s" }`, msg)); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } } diff --git a/device/provider.go b/device/provider.go index 0b55134..3009a61 100644 --- a/device/provider.go +++ b/device/provider.go @@ -2,7 +2,6 @@ package device import ( "automation/integration/google" - "automation/integration/mqtt" "context" "encoding/base64" "encoding/json" @@ -26,7 +25,7 @@ type DeviceInfo struct { type DeviceInterface interface { google.DeviceInterface - TurnOff() + SetState(state bool) } type Provider struct { @@ -49,7 +48,31 @@ func (c *credentials) Decode(value string) error { return err } -func NewProvider(config Config, m *mqtt.MQTT) *Provider { +// Auto populate and update the device list +func (p *Provider) devicesHandler(client paho.Client, msg paho.Message) { + var devices []DeviceInfo + json.Unmarshal(msg.Payload(), &devices) + + log.Println("zigbee2mqtt devices:") + pretty.Logln(devices) + + // Remove all automatically added devices + p.devices = p.manualDevices + + for _, device := range devices { + switch device.Description { + case "Kettle": + kettle := NewKettle(device, client, p.Service) + p.devices[device.IEEEAdress] = kettle + log.Printf("Added Kettle (%s) %s\n", device.IEEEAdress, device.FriendlyName) + } + } + + // Send sync request + p.Service.RequestSync(context.Background(), p.userID) +} + +func NewProvider(config Config, client paho.Client) *Provider { provider := &Provider{userID: "Dreaded_X", devices: make(map[string]DeviceInterface), manualDevices: make(map[string]DeviceInterface)} homegraphService, err := homegraph.NewService(context.Background(), option.WithCredentialsJSON(config.Credentials)) @@ -59,29 +82,9 @@ func NewProvider(config Config, m *mqtt.MQTT) *Provider { provider.Service = google.NewService(provider, homegraphService) - // Auto populate and update the device list - m.AddHandler("zigbee2mqtt/bridge/devices", func(_ paho.Client, msg paho.Message) { - var devices []DeviceInfo - json.Unmarshal(msg.Payload(), &devices) - - log.Println("zigbee2mqtt devices:") - pretty.Logln(devices) - - // Remove all automatically added devices - provider.devices = provider.manualDevices - - for _, device := range devices { - switch device.Description { - case "Kettle": - kettle := NewKettle(device, m, provider.Service) - provider.devices[device.IEEEAdress] = kettle - log.Printf("Added Kettle (%s) %s\n", device.IEEEAdress, device.FriendlyName) - } - } - - // Send sync request - provider.Service.RequestSync(context.Background(), provider.userID) - }) + if token := client.Subscribe("zigbee2mqtt/bridge/devices", 1, provider.devicesHandler); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } return provider } @@ -150,6 +153,6 @@ func (p *Provider) Execute(_ context.Context, _ string, commands []google.Comman func (p *Provider) TurnAllOff() { for _, device := range p.devices { - device.TurnOff() + device.SetState(false) } } diff --git a/integration/mqtt/mqtt.go b/integration/mqtt/mqtt.go index 83b5038..e1e9414 100644 --- a/integration/mqtt/mqtt.go +++ b/integration/mqtt/mqtt.go @@ -2,59 +2,36 @@ package mqtt import ( "fmt" - "os" - "github.com/eclipse/paho.mqtt.golang" + paho "github.com/eclipse/paho.mqtt.golang" ) -type MQTT struct { - client mqtt.Client -} - // This is the default message handler, it just prints out the topic and message -var defaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { +var defaultHandler paho.MessageHandler = func(client paho.Client, msg paho.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } -func Connect(config Config) MQTT { - opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("%s:%s", config.Host, config.Port)) +func New(config Config) paho.Client { + opts := paho.NewClientOptions().AddBroker(fmt.Sprintf("%s:%s", config.Host, config.Port)) opts.SetClientID(config.ClientID) opts.SetDefaultPublishHandler(defaultHandler) opts.SetUsername(config.Username) opts.SetPassword(config.Password) opts.SetOrderMatters(false) - client := mqtt.NewClient(opts) + client := paho.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } - m := MQTT{client: client} - - return m + return client } -func (m *MQTT) Disconnect() { - if token := m.client.Unsubscribe("automation/presence/+"); token.Wait() && token.Error() != nil { +func Delete(m paho.Client) { + if token := m.Unsubscribe("automation/presence/+"); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) - os.Exit(1) } - m.client.Disconnect(250) -} - -func (m *MQTT) AddHandler(topic string, handler func(client mqtt.Client, msg mqtt.Message)) { - if token := m.client.Subscribe(topic, 0, handler); token.Wait() && token.Error() != nil { - fmt.Println(token.Error()) - os.Exit(1) - } -} - -func (m *MQTT) Publish(topic string, qos byte, retained bool, payload interface{}) { - if token := m.client.Publish(topic, qos, retained, payload); token.Wait() && token.Error() != nil { - fmt.Println(token.Error()) - // Do not exit here as it might break during production, just log the error - // os.Exit(1) - } + m.Disconnect(250) } diff --git a/integration/ntfy/ntfy.go b/integration/ntfy/ntfy.go index 0b75a18..aaf9f5d 100644 --- a/integration/ntfy/ntfy.go +++ b/integration/ntfy/ntfy.go @@ -6,11 +6,11 @@ import ( "strings" ) -type ntfy struct { +type Notify struct { presence string } -func (ntfy *ntfy) Presence(home bool) { +func (n *Notify) Presence(home bool) { // @TODO Maybe add list the devices that are home currently? var description string var actions string @@ -22,7 +22,7 @@ func (ntfy *ntfy) Presence(home bool) { actions = "broadcast, Set as home, extras.cmd=presence, extras.state=1, clear=true" } - req, err := http.NewRequest("POST", fmt.Sprintf("https://ntfy.sh/%s", ntfy.presence), strings.NewReader(description)) + req, err := http.NewRequest("POST", fmt.Sprintf("https://ntfy.sh/%s", n.presence), strings.NewReader(description)) if err != nil { panic(err) } @@ -35,8 +35,8 @@ func (ntfy *ntfy) Presence(home bool) { http.DefaultClient.Do(req) } -func Connect(config Config) ntfy { - ntfy := ntfy{presence: config.Presence} +func New(config Config) Notify { + ntfy := Notify{presence: config.Presence} // @TODO Make sure the topic is valid? diff --git a/main.go b/main.go index 6bba956..95d8063 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,13 @@ package main import ( + "automation/connect" "automation/device" "automation/integration/hue" "automation/integration/kasa" "automation/integration/mqtt" "automation/integration/ntfy" "automation/presence" - "encoding/json" - "fmt" "log" "net/http" "os" @@ -17,8 +16,6 @@ import ( "github.com/joho/godotenv" "github.com/kelseyhightower/envconfig" "gopkg.in/yaml.v3" - - paho "github.com/eclipse/paho.mqtt.golang" ) type config struct { @@ -128,23 +125,25 @@ func main() { devices = make(map[string]interface{}) + var connect connect.Connect + // MQTT - m := mqtt.Connect(config.MQTT) - defer m.Disconnect() + connect.Client = mqtt.New(config.MQTT) + defer connect.Client.Disconnect(250) + + // ntfy.sh + connect.Notify = ntfy.New(config.NTFY) // Hue - h := hue.Connect(config.Hue) + connect.Hue = hue.Connect(config.Hue) // Kasa for name, ip := range config.Kasa.Outlets { devices[name] = kasa.New(ip) } - // ntfy.sh - // n := ntfy.Connect(config.NTFY) - // Devices that we control and expose to google home - provider := device.NewProvider(config.Google, &m) + provider := device.NewProvider(config.Google, connect.Client) r := mux.NewRouter() r.HandleFunc("/assistant", provider.Service.FullfillmentHandler) @@ -154,47 +153,8 @@ func main() { } // Presence - p := presence.New() - m.AddHandler("automation/presence/+", p.PresenceHandler) - - m.AddHandler("automation/presence", func(client paho.Client, msg paho.Message) { - if len(msg.Payload()) == 0 { - // In this case we clear the persistent message - return - } - var message presence.Message - err := json.Unmarshal(msg.Payload(), &message) - if err != nil { - log.Println(err) - return - } - - fmt.Printf("Presence: %t\n", message.State) - // Notify users of presence update - // n.Presence(present) - - // Set presence on the hue bridge - h.SetFlag(41, message.State) - - if !message.State { - // Turn off all the devices that we manage ourselves - provider.TurnAllOff() - - // Turn off all devices - // @TODO Maybe allow for exceptions, could be a list in the config that we check against? - for _, device := range devices { - switch d := device.(type) { - case kasa.Kasa: - d.SetState(false) - - } - } - - // @TODO Turn off nest thermostat - } else { - // @TODO Turn on the nest thermostat again - } - }) + p := presence.New(&connect) + defer p.Delete() addr := ":8090" srv := http.Server{ diff --git a/presence/presence.go b/presence/presence.go index 8ee2c61..f244452 100644 --- a/presence/presence.go +++ b/presence/presence.go @@ -1,18 +1,21 @@ package presence import ( + "automation/connect" "encoding/json" + "fmt" "log" "strings" "time" - mqtt "github.com/eclipse/paho.mqtt.golang" + paho "github.com/eclipse/paho.mqtt.golang" "github.com/kr/pretty" ) type Presence struct { + connect *connect.Connect devices map[string]bool - current *bool + presence bool } type Message struct { @@ -20,12 +23,7 @@ type Message struct { Updated int64 `json:"updated"` } -func New() Presence { - return Presence{devices: make(map[string]bool), current: nil} -} - -// Handler got automation/presence/+ -func (p *Presence) PresenceHandler(client mqtt.Client, msg mqtt.Message) { +func (p *Presence) devicePresenceHandler(client paho.Client, msg paho.Message) { name := strings.Split(msg.Topic(), "/")[2] if len(msg.Payload()) == 0 { @@ -52,8 +50,8 @@ func (p *Presence) PresenceHandler(client mqtt.Client, msg mqtt.Message) { log.Println(present) - if p.current == nil || *p.current != present { - p.current = &present + if p.presence != present { + p.presence = present msg, err := json.Marshal(Message{ State: present, @@ -70,3 +68,67 @@ func (p *Presence) PresenceHandler(client mqtt.Client, msg mqtt.Message) { } } } + +func (p *Presence) overallPresenceHandler(client paho.Client, msg paho.Message) { + if len(msg.Payload()) == 0 { + // In this case we clear the persistent message + return + } + var message Message + err := json.Unmarshal(msg.Payload(), &message) + if err != nil { + log.Println(err) + return + } + + fmt.Printf("Presence: %t\n", message.State) + // Notify users of presence update + p.connect.Notify.Presence(p.presence) + + // Set presence on the hue bridge + p.connect.Hue.SetFlag(41, message.State) + + if !message.State { + log.Println("Turn off all the devices") + // // Turn off all the devices that we manage ourselves + // provider.TurnAllOff() + + // // Turn off all devices + // // @TODO Maybe allow for exceptions, could be a list in the config that we check against? + // for _, device := range devices { + // switch d := device.(type) { + // case kasa.Kasa: + // d.SetState(false) + + // } + // } + + // @TODO Turn off nest thermostat + } else { + // @TODO Turn on the nest thermostat again + } +} + +func New(connect *connect.Connect) *Presence { + p := &Presence{connect: connect, devices: make(map[string]bool), presence: false} + + if token := connect.Client.Subscribe("automation/presence", 1, p.overallPresenceHandler); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } + + if token := connect.Client.Subscribe("automation/presence/+", 1, p.devicePresenceHandler); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } + + return p +} + +func (p *Presence) Delete() { + if token := p.connect.Client.Unsubscribe("automation/presence"); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } + + if token := p.connect.Client.Unsubscribe("automation/presence/+"); token.Wait() && token.Error() != nil { + log.Println(token.Error()) + } +}