added Throttle to entityListener

This commit is contained in:
Sam Lewis
2022-10-16 23:41:21 -04:00
parent 7ed5cbcf94
commit 2c5e68903a
4 changed files with 41 additions and 64 deletions

36
app.go
View File

@@ -25,10 +25,12 @@ type app struct {
state *State state *State
schedules pq.PriorityQueue schedules pq.PriorityQueue
entityListeners map[string][]entityListener entityListeners map[string][]*entityListener
entityListenersId int64 entityListenersId int64
} }
type TimeString string
/* /*
NewApp establishes the websocket connection and returns an object NewApp establishes the websocket connection and returns an object
you can use to register schedules and listeners. you can use to register schedules and listeners.
@@ -50,7 +52,7 @@ func NewApp(connString string) app {
service: service, service: service,
state: state, state: state,
schedules: pq.New(), schedules: pq.New(),
entityListeners: map[string][]entityListener{}, entityListeners: map[string][]*entityListener{},
} }
} }
@@ -85,9 +87,9 @@ func (a *app) RegisterSchedule(s schedule) {
func (a *app) RegisterEntityListener(el entityListener) { func (a *app) RegisterEntityListener(el entityListener) {
for _, entity := range el.entityIds { for _, entity := range el.entityIds {
if elList, ok := a.entityListeners[entity]; ok { if elList, ok := a.entityListeners[entity]; ok {
a.entityListeners[entity] = append(elList, el) a.entityListeners[entity] = append(elList, &el)
} else { } else {
a.entityListeners[entity] = []entityListener{el} a.entityListeners[entity] = []*entityListener{&el}
} }
} }
} }
@@ -95,32 +97,34 @@ func (a *app) RegisterEntityListener(el entityListener) {
// Sunrise take an optional string that is passed to time.ParseDuration. // Sunrise take an optional string that is passed to time.ParseDuration.
// Examples include "-1.5h", "30m", etc. See https://pkg.go.dev/time#ParseDuration // Examples include "-1.5h", "30m", etc. See https://pkg.go.dev/time#ParseDuration
// for full list. // for full list.
func (a *app) Sunrise(offset ...string) string { func (a *app) Sunrise(offset ...TimeString) string {
return getSunriseSunset(a, true, offset) return getSunriseSunset(a, true, offset)
} }
// Sunset take an optional string that is passed to time.ParseDuration. // Sunset take an optional string that is passed to time.ParseDuration.
// Examples include "-1.5h", "30m", etc. See https://pkg.go.dev/time#ParseDuration // Examples include "-1.5h", "30m", etc. See https://pkg.go.dev/time#ParseDuration
// for full list. // for full list.
func (a *app) Sunset(offset ...string) string { func (a *app) Sunset(offset ...TimeString) string {
return getSunriseSunset(a, false, offset) return getSunriseSunset(a, false, offset)
} }
func getSunriseSunset(a *app, sunrise bool, offset []string) string { func getSunriseSunset(a *app, sunrise bool, offset []TimeString) string {
printString := "Sunset" printString := "Sunset"
attrKey := "next_setting" attrKey := "next_setting"
if sunrise { if sunrise {
printString = "Sunrise" printString = "Sunrise"
attrKey = "next_rising" attrKey = "next_rising"
} }
var t time.Duration var t time.Duration
var err error var err error
if len(offset) == 1 { if len(offset) == 1 {
t, err = time.ParseDuration(offset[0]) t, err = time.ParseDuration(string(offset[0]))
if err != nil { if err != nil {
log.Fatalf("Could not parse offset passed to %s: \"%s\"", printString, offset[0]) log.Fatalf("Could not parse offset passed to %s: \"%s\"", printString, offset[0])
} }
} }
// get next sunrise/sunset time from HA // get next sunrise/sunset time from HA
state, err := a.state.Get("sun.sun") state, err := a.state.Get("sun.sun")
if err != nil { if err != nil {
@@ -172,20 +176,4 @@ func (a *app) Start() {
go callEntityListeners(a, msg.Raw) go callEntityListeners(a, msg.Raw)
} }
} }
// NOTE:should the prio queue and websocket listener both write to a channel or something?
// then select from that and spawn new goroutine to call callback?
// TODO: loop through schedules and create heap priority queue
// TODO: figure out looping listening to messages for
// listeners
} }
const (
FrequencyMissing time.Duration = 0
Daily time.Duration = time.Hour * 24
Hourly time.Duration = time.Hour
Minutely time.Duration = time.Minute
)

View File

@@ -3,6 +3,7 @@ package gomeassistant
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"log"
"time" "time"
"github.com/golang-module/carbon" "github.com/golang-module/carbon"
@@ -16,12 +17,13 @@ type entityListener struct {
toState string toState string
betweenStart string betweenStart string
betweenEnd string betweenEnd string
throttle time.Duration
lastRan carbon.Carbon
err error err error
} }
type entityListenerCallback func(*Service, EntityData) type entityListenerCallback func(*Service, EntityData)
// TODO: use this to flatten json sent from HA for trigger event
type EntityData struct { type EntityData struct {
TriggerEntityId string TriggerEntityId string
FromState string FromState string
@@ -52,43 +54,12 @@ type msgState struct {
Attributes map[string]any `json:"attributes"` Attributes map[string]any `json:"attributes"`
} }
type triggerMsg struct {
Id int64 `json:"id"`
Type string `json:"type"`
Event struct {
Variables struct {
Trigger struct {
EntityId string `json:"entity_id"`
FromState triggerMsgState `json:"from_state"`
ToState triggerMsgState `json:"to_state"`
}
} `json:"variables"`
} `json:"event"`
}
type triggerMsgState struct {
State string `json:"state"`
Attributes map[string]any `json:"attributes"`
LastChanged string `json:"last_changed"`
}
type subscribeMsg struct {
Id int64 `json:"id"`
Type string `json:"type"`
Trigger subscribeMsgTrigger `json:"trigger"`
}
type subscribeMsgTrigger struct {
Platform string `json:"platform"`
EntityId string `json:"entity_id"`
From string `json:"from"`
To string `json:"to"`
}
/* Methods */ /* Methods */
func EntityListenerBuilder() elBuilder1 { func EntityListenerBuilder() elBuilder1 {
return elBuilder1{entityListener{}} return elBuilder1{entityListener{
lastRan: carbon.Now().StartOfCentury(),
}}
} }
type elBuilder1 struct { type elBuilder1 struct {
@@ -145,6 +116,15 @@ func (b elBuilder3) ToState(s string) elBuilder3 {
return b return b
} }
func (b elBuilder3) Throttle(s TimeString) elBuilder3 {
d, err := time.ParseDuration(string(s))
if err != nil {
log.Fatalf("Couldn't parse string duration passed to Throttle(): \"%s\" see https://pkg.go.dev/time#ParseDuration for valid time units", s)
}
b.entityListener.throttle = d
return b
}
func (b elBuilder3) Build() entityListener { func (b elBuilder3) Build() entityListener {
return b.entityListener return b.entityListener
} }
@@ -197,6 +177,13 @@ func callEntityListeners(app *app, msgBytes []byte) {
return return
} }
// don't run callback if Throttle is set and that duration hasn't passed since lastRan
if l.throttle.Seconds() > 0 && // throttle is set
!l.lastRan.Eq(carbon.Now().StartOfCentury()) && // lastRan is set aka this callback has been called since starting gomeassistant
l.lastRan.DiffAbsInSeconds(carbon.Now()) < int64(l.throttle.Seconds()) { // it's been less than <throttle> seconds since it last ran
return
}
entityData := EntityData{ entityData := EntityData{
TriggerEntityId: eid, TriggerEntityId: eid,
FromState: data.OldState.State, FromState: data.OldState.State,
@@ -205,6 +192,7 @@ func callEntityListeners(app *app, msgBytes []byte) {
ToAttributes: data.NewState.Attributes, ToAttributes: data.NewState.Attributes,
LastChanged: data.OldState.LastChanged, LastChanged: data.OldState.LastChanged,
} }
l.callback(app.service, entityData) go l.callback(app.service, entityData)
l.lastRan = carbon.Now()
} }
} }

View File

@@ -19,6 +19,7 @@ func main() {
EntityIds("group.office_ceiling_lights"). EntityIds("group.office_ceiling_lights").
Call(listenerCB). Call(listenerCB).
OnlyAfter("23:03"). OnlyAfter("23:03").
// Throttle("5s").
Build() Build()
app.RegisterEntityListener(simpleListener) app.RegisterEntityListener(simpleListener)

View File

@@ -107,8 +107,8 @@ func (sb scheduleBuilderDaily) At(s string) scheduleBuilderEnd {
return scheduleBuilderEnd(sb) return scheduleBuilderEnd(sb)
} }
func (sb scheduleBuilderCall) Every(s string) scheduleBuilderCustom { func (sb scheduleBuilderCall) Every(s TimeString) scheduleBuilderCustom {
d, err := time.ParseDuration(s) d, err := time.ParseDuration(string(s))
if err != nil { if err != nil {
log.Fatalf("couldn't parse string duration passed to Every(): \"%s\" see https://pkg.go.dev/time#ParseDuration for valid time units", s) log.Fatalf("couldn't parse string duration passed to Every(): \"%s\" see https://pkg.go.dev/time#ParseDuration for valid time units", s)
} }
@@ -116,8 +116,8 @@ func (sb scheduleBuilderCall) Every(s string) scheduleBuilderCustom {
return scheduleBuilderCustom(sb) return scheduleBuilderCustom(sb)
} }
func (sb scheduleBuilderCustom) Offset(s string) scheduleBuilderEnd { func (sb scheduleBuilderCustom) Offset(s TimeString) scheduleBuilderEnd {
t, err := time.ParseDuration(s) t, err := time.ParseDuration(string(s))
if err != nil { if err != nil {
log.Fatalf("Couldn't parse string duration passed to Offset(): \"%s\" see https://pkg.go.dev/time#ParseDuration for valid time units", s) log.Fatalf("Couldn't parse string duration passed to Offset(): \"%s\" see https://pkg.go.dev/time#ParseDuration for valid time units", s)
} }