diff --git a/app.go b/app.go index ec0c5ed..b63373c 100644 --- a/app.go +++ b/app.go @@ -7,6 +7,7 @@ import ( "time" "github.com/saml-dev/gome-assistant/internal/http" + pq "github.com/saml-dev/gome-assistant/internal/priorityqueue" ws "github.com/saml-dev/gome-assistant/internal/websocket" "nhooyr.io/websocket" ) @@ -20,15 +21,15 @@ type app struct { service *Service state *State - schedules []schedule + schedules pq.PriorityQueue entityListeners []entityListener } /* -App establishes the websocket connection and returns an object +NewApp establishes the websocket connection and returns an object you can use to register schedules and listeners. */ -func App(connString string) app { +func NewApp(connString string) app { token := os.Getenv("AUTH_TOKEN") conn, ctx, ctxCancel := ws.SetupConnection(connString) @@ -44,7 +45,7 @@ func App(connString string) app { httpClient: httpClient, service: service, state: state, - schedules: []schedule{}, + schedules: pq.New(), entityListeners: []entityListener{}, } } @@ -56,7 +57,6 @@ func (a *app) Cleanup() { } func (a *app) RegisterSchedule(s schedule) { - s.callback(a.service, a.state) if s.err != nil { log.Fatalln(s.err) // something wasn't configured properly when the schedule was built } @@ -80,10 +80,16 @@ func (a *app) RegisterSchedule(s schedule) { } s.realStartTime = startTime - a.schedules = append(a.schedules, s) + a.schedules.Insert(s, float64(startTime.Unix())) // TODO: this blows up because schedule can't be used as key for map in prio queue lib. Just copy/paste and tweak as needed } func (a *app) Start() { + + // schedules + if a.schedules.Len() != 0 { + go RunSchedules(a) + } + // NOTE:should the prio queue and websocket listener both write to a channel or something? // then select from that and spawn new goroutine to call callback? diff --git a/cmd/main/testing.go b/cmd/main/testing.go index 94cb97d..be9a762 100644 --- a/cmd/main/testing.go +++ b/cmd/main/testing.go @@ -8,16 +8,19 @@ import ( ) func main() { - app := ga.App("192.168.86.67:8123") + app := ga.NewApp("192.168.86.67:8123") defer app.Cleanup() - s := ga.ScheduleBuilder().Call(lightsOut).Daily().At(ga.TimeOfDay(23, 00)).Build() - s2 := ga.ScheduleBuilder().Call(lightsOut).Every(ga.Duration(04, 30)).Offset(ga.TimeOfDay(1, 0)).Build() + s := ga.ScheduleBuilder().Call(lightsOut).Every(time.Second * 5).Build() + s2 := ga.ScheduleBuilder().Call(cool).Every(time.Millisecond * 500).Build() + s3 := ga.ScheduleBuilder().Call(c).Every(time.Minute * 1).Build() + app.RegisterSchedule(s) app.RegisterSchedule(s2) + app.RegisterSchedule(s3) app.Start() simpleListener := ga.EntityListenerBuilder(). - EntityId("light.lights"). - Call(cool). + EntityIds("light.lights"). + Call(listenerCB). OnlyBetween(ga.TimeOfDay(22, 00), ga.TimeOfDay(07, 00)) log.Println(simpleListener) @@ -26,12 +29,20 @@ func main() { } func lightsOut(service *ga.Service, state *ga.State) { - service.InputDatetime.Set("input_datetime.garage_last_triggered_ts", time.Now()) + // service.InputDatetime.Set("input_datetime.garage_last_triggered_ts", time.Now()) // service.HomeAssistant.Toggle("group.living_room_lamps", map[string]any{"brightness_pct": 100}) // service.Light.Toggle("light.entryway_lamp", map[string]any{"brightness_pct": 100}) // service.Switch.Toggle("switch.espurna_sunroom_lamp") + log.Default().Println("A") } -func cool(service ga.Service, data ga.Data) { - service.Light.TurnOn("light.entryway_lamp") +func cool(service *ga.Service, state *ga.State) { + // service.Light.TurnOn("light.entryway_lamp") + log.Default().Println("B") } + +func c(service *ga.Service, state *ga.State) { + log.Default().Println("C") +} + +func listenerCB(service *ga.Service, data *ga.Data) {} diff --git a/entitylistener.go b/entitylistener.go index 603e24a..fab9ea3 100644 --- a/entitylistener.go +++ b/entitylistener.go @@ -1,17 +1,21 @@ package gomeassistant -import "time" +import ( + "errors" + "time" +) type entityListener struct { - entityId string + entityIds []string callback entityListenerCallback fromState string toState string betweenStart time.Duration betweenEnd time.Duration + err error } -type entityListenerCallback func(Service, Data) +type entityListenerCallback func(*Service, *Data) type Data struct{} @@ -41,8 +45,12 @@ type elBuilder1 struct { entityListener } -func (b elBuilder1) EntityId(eid string) elBuilder2 { - b.entityListener.entityId = eid +func (b elBuilder1) EntityIds(entityIds ...string) elBuilder2 { + if len(entityIds) == 0 { + b.err = errors.New("must pass at least one entityId to EntityIds()") + } else { + b.entityListener.entityIds = entityIds + } return elBuilder2(b) } @@ -51,7 +59,9 @@ type elBuilder2 struct { } func (b elBuilder2) Call(callback entityListenerCallback) elBuilder3 { - b.entityListener.callback = callback + if b.err == nil { + b.entityListener.callback = callback + } return elBuilder3(b) } diff --git a/go.mod b/go.mod index 587e0e6..b875865 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,4 @@ go 1.19 require nhooyr.io/websocket v1.8.7 -require ( - github.com/golang-module/carbon/v2 v2.1.9 // indirect - github.com/klauspost/compress v1.10.3 // indirect -) +require github.com/klauspost/compress v1.10.3 // indirect diff --git a/go.sum b/go.sum index b3517c5..f36a934 100644 --- a/go.sum +++ b/go.sum @@ -17,19 +17,12 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= -github.com/golang-module/carbon/v2 v2.1.9 h1:OWkhYzTTPe+jPOUEL2JkvGwf6bKNQJoh4LVT1LUay80= -github.com/golang-module/carbon/v2 v2.1.9/go.mod h1:NF5unWf838+pyRY0o+qZdIwBMkFf7w0hmLIguLiEpzU= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= -github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= -github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8= -github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= @@ -44,33 +37,24 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b h1:NVD8gBK33xpdqCaZVVtd6OFJp+3dxkXuz7+U7KaVN6s= -golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/internal/priorityqueue/priorityqueue.go b/internal/priorityqueue/priorityqueue.go new file mode 100644 index 0000000..b7d7c0b --- /dev/null +++ b/internal/priorityqueue/priorityqueue.go @@ -0,0 +1,87 @@ +package priorityqueue + +import ( + "container/heap" + "errors" +) + +// PriorityQueue represents the queue +type PriorityQueue struct { + itemHeap *itemHeap + lookup map[interface{}]*item +} + +// New initializes an empty priority queue. +func New() PriorityQueue { + return PriorityQueue{ + itemHeap: &itemHeap{}, + lookup: make(map[interface{}]*item), + } +} + +// Len returns the number of elements in the queue. +func (p *PriorityQueue) Len() int { + return p.itemHeap.Len() +} + +// Insert inserts a new element into the queue. No action is performed on duplicate elements. +func (p *PriorityQueue) Insert(v interface{ Hash() string }, priority float64) { + _, ok := p.lookup[v.Hash()] + if ok { + return + } + + newItem := &item{ + value: v, + priority: priority, + } + heap.Push(p.itemHeap, newItem) + p.lookup[v.Hash()] = newItem +} + +// Pop removes the element with the highest priority from the queue and returns it. +// In case of an empty queue, an error is returned. +func (p *PriorityQueue) Pop() (interface{}, error) { + if len(*p.itemHeap) == 0 { + return nil, errors.New("empty queue") + } + + item := heap.Pop(p.itemHeap).(*item) + delete(p.lookup, item.value.(interface{ Hash() string }).Hash()) + return item.value, nil +} + +type itemHeap []*item + +type item struct { + value interface{} + priority float64 + index int +} + +func (ih *itemHeap) Len() int { + return len(*ih) +} + +func (ih *itemHeap) Less(i, j int) bool { + return (*ih)[i].priority < (*ih)[j].priority +} + +func (ih *itemHeap) Swap(i, j int) { + (*ih)[i], (*ih)[j] = (*ih)[j], (*ih)[i] + (*ih)[i].index = i + (*ih)[j].index = j +} + +func (ih *itemHeap) Push(x interface{}) { + it := x.(*item) + it.index = len(*ih) + *ih = append(*ih, it) +} + +func (ih *itemHeap) Pop() interface{} { + old := *ih + item := old[len(old)-1] + *ih = old[0 : len(old)-1] + return item +} diff --git a/internal/websocket/reader.go b/internal/websocket/reader.go new file mode 100644 index 0000000..06e0173 --- /dev/null +++ b/internal/websocket/reader.go @@ -0,0 +1,16 @@ +package websocket + +import ( + "context" + + "nhooyr.io/websocket" +) + +type BaseMessage struct { + MsgType string `json:"type"` + Other any +} + +func ReadWebsocket(ws *websocket.Conn, ctx context.Context) { + +} diff --git a/schedule.go b/schedule.go index 4a6aff7..2a620ad 100644 --- a/schedule.go +++ b/schedule.go @@ -90,13 +90,17 @@ type schedule struct { */ offset time.Duration /* - This will be set rather than returning an error to avoid checking err for nil on every schedule :) + err will be set rather than returning an error to avoid checking err for nil on every schedule :) RegisterSchedule will exit if the error is set. */ err error realStartTime time.Time } +func (s schedule) Hash() string { + return fmt.Sprint(s.offset, s.frequency, s.callback) +} + type scheduleBuilder struct { schedule schedule } @@ -206,3 +210,33 @@ func convertTimeOfDayToActualOffset(t timeOfDay) time.Duration { } return TimeOfDay(0, int(mins)) } + +// app.Start() functions +func RunSchedules(a *app) { + for { + sched := popSchedule(a) + // log.Default().Println(sched.realStartTime) + + // run callback for all schedules before now in case they overlap + for sched.realStartTime.Before(time.Now()) { + go sched.callback(a.service, a.state) + requeueSchedule(a, sched) + + sched = popSchedule(a) + } + + time.Sleep(time.Until(sched.realStartTime)) + go sched.callback(a.service, a.state) + requeueSchedule(a, sched) + } +} + +func popSchedule(a *app) schedule { + _sched, _ := a.schedules.Pop() + return _sched.(schedule) +} + +func requeueSchedule(a *app, s schedule) { + s.realStartTime = s.realStartTime.Add(s.frequency) + a.schedules.Insert(s, float64(s.realStartTime.Unix())) +}