refactor: minor changes

This commit is contained in:
2025-08-01 18:25:39 -05:00
parent d51f6d5946
commit 102a4e7438
2 changed files with 47 additions and 50 deletions

96
app.go
View File

@@ -67,11 +67,9 @@ func validateHomeZone(state State, entityID string) error {
// Verify it has latitude and longitude // Verify it has latitude and longitude
if entity.Attributes == nil { if entity.Attributes == nil {
return fmt.Errorf("home zone entity '%s' has no attributes", entityID) return fmt.Errorf("home zone entity '%s' has no attributes", entityID)
} } else if entity.Attributes["latitude"] == nil {
if entity.Attributes["latitude"] == nil {
return fmt.Errorf("home zone entity '%s' missing latitude attribute", entityID) return fmt.Errorf("home zone entity '%s' missing latitude attribute", entityID)
} } else if entity.Attributes["longitude"] == nil {
if entity.Attributes["longitude"] == nil {
return fmt.Errorf("home zone entity '%s' missing longitude attribute", entityID) return fmt.Errorf("home zone entity '%s' missing longitude attribute", entityID)
} }
@@ -140,27 +138,27 @@ func NewApp(request types.NewAppRequest) (*App, error) {
}, nil }, nil
} }
func (a *App) Cleanup() { func (app *App) Cleanup() {
if a.ctxCancel != nil { if app.ctxCancel != nil {
a.ctxCancel() app.ctxCancel()
} }
} }
// Close performs a clean shutdown of the application. // Close performs a clean shutdown of the application.
// It cancels the context, closes the websocket connection, // It cancels the context, closes the websocket connection,
// and ensures all background processes are properly terminated. // and ensures all background processes are properly terminated.
func (a *App) Close() error { func (app *App) Close() error {
// Close websocket connection if it exists // Close websocket connection if it exists
if a.conn != nil { if app.conn != nil {
deadline := time.Now().Add(10 * time.Second) deadline := time.Now().Add(10 * time.Second)
err := a.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline) err := app.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline)
if err != nil { if err != nil {
slog.Warn("Error writing close message", "error", err) slog.Warn("Error writing close message", "error", err)
return err return err
} }
// Close the websocket connection // Close the websocket connection
err = a.conn.Close() err = app.conn.Close()
if err != nil { if err != nil {
slog.Warn("Error closing websocket connection", "error", err) slog.Warn("Error closing websocket connection", "error", err)
return err return err
@@ -171,8 +169,8 @@ func (a *App) Close() error {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
// Cancel context to signal all goroutines to stop // Cancel context to signal all goroutines to stop
if a.ctxCancel != nil { if app.ctxCancel != nil {
a.ctxCancel() app.ctxCancel()
} }
// Wait a short time for goroutines to finish // Wait a short time for goroutines to finish
@@ -182,12 +180,12 @@ func (a *App) Close() error {
return nil return nil
} }
func (a *App) RegisterSchedules(schedules ...DailySchedule) { func (app *App) RegisterSchedules(schedules ...DailySchedule) {
for _, s := range schedules { for _, s := range schedules {
// realStartTime already set for sunset/sunrise // realStartTime already set for sunset/sunrise
if s.isSunrise || s.isSunset { if s.isSunrise || s.isSunset {
s.nextRunTime = getNextSunRiseOrSet(a, s.isSunrise, s.sunOffset).Carbon2Time() s.nextRunTime = getNextSunRiseOrSet(app, s.isSunrise, s.sunOffset).Carbon2Time()
a.schedules.Put() app.schedules.Put()
continue continue
} }
@@ -200,14 +198,14 @@ func (a *App) RegisterSchedules(schedules ...DailySchedule) {
} }
s.nextRunTime = startTime.Carbon2Time() s.nextRunTime = startTime.Carbon2Time()
a.schedules.Put(Item{ app.schedules.Put(Item{
Value: s, Value: s,
Priority: float64(startTime.Carbon2Time().Unix()), Priority: float64(startTime.Carbon2Time().Unix()),
}) })
} }
} }
func (a *App) RegisterIntervals(intervals ...Interval) { func (app *App) RegisterIntervals(intervals ...Interval) {
for _, i := range intervals { for _, i := range intervals {
if i.frequency == 0 { if i.frequency == 0 {
slog.Error("A schedule must use either set frequency via Every()") slog.Error("A schedule must use either set frequency via Every()")
@@ -219,14 +217,14 @@ func (a *App) RegisterIntervals(intervals ...Interval) {
for i.nextRunTime.Before(now) { for i.nextRunTime.Before(now) {
i.nextRunTime = i.nextRunTime.Add(i.frequency) i.nextRunTime = i.nextRunTime.Add(i.frequency)
} }
a.intervals.Put(Item{ app.intervals.Put(Item{
Value: i, Value: i,
Priority: float64(i.nextRunTime.Unix()), Priority: float64(i.nextRunTime.Unix()),
}) })
} }
} }
func (a *App) RegisterEntityListeners(etls ...EntityListener) { func (app *App) RegisterEntityListeners(etls ...EntityListener) {
for _, etl := range etls { for _, etl := range etls {
etl := etl etl := etl
if etl.delay != 0 && etl.toState == "" { if etl.delay != 0 && etl.toState == "" {
@@ -235,24 +233,24 @@ func (a *App) RegisterEntityListeners(etls ...EntityListener) {
} }
for _, entity := range etl.entityIds { for _, entity := range etl.entityIds {
if elList, ok := a.entityListeners[entity]; ok { if elList, ok := app.entityListeners[entity]; ok {
a.entityListeners[entity] = append(elList, &etl) app.entityListeners[entity] = append(elList, &etl)
} else { } else {
a.entityListeners[entity] = []*EntityListener{&etl} app.entityListeners[entity] = []*EntityListener{&etl}
} }
} }
} }
} }
func (a *App) RegisterEventListeners(evls ...EventListener) { func (app *App) RegisterEventListeners(evls ...EventListener) {
for _, evl := range evls { for _, evl := range evls {
evl := evl evl := evl
for _, eventType := range evl.eventTypes { for _, eventType := range evl.eventTypes {
if elList, ok := a.eventListeners[eventType]; ok { if elList, ok := app.eventListeners[eventType]; ok {
a.eventListeners[eventType] = append(elList, &evl) app.eventListeners[eventType] = append(elList, &evl)
} else { } else {
ws.SubscribeToEventType(eventType, a.wsWriter, a.ctx) ws.SubscribeToEventType(eventType, app.wsWriter, app.ctx)
a.eventListeners[eventType] = []*EventListener{&evl} app.eventListeners[eventType] = []*EventListener{&evl}
} }
} }
} }
@@ -301,32 +299,32 @@ func getNextSunRiseOrSet(a *App, sunrise bool, offset ...types.DurationString) c
return sunriseOrSunset return sunriseOrSunset
} }
func (a *App) Start() { func (app *App) Start() {
slog.Info("Starting", "schedules", a.schedules.Len()) slog.Info("Starting", "schedules", app.schedules.Len())
slog.Info("Starting", "entity listeners", len(a.entityListeners)) slog.Info("Starting", "entity listeners", len(app.entityListeners))
slog.Info("Starting", "event listeners", len(a.eventListeners)) slog.Info("Starting", "event listeners", len(app.eventListeners))
go runSchedules(a) go runSchedules(app)
go runIntervals(a) go runIntervals(app)
// subscribe to state_changed events // subscribe to state_changed events
id := internal.NextId() id := internal.NextId()
ws.SubscribeToStateChangedEvents(id, a.wsWriter, a.ctx) ws.SubscribeToStateChangedEvents(id, app.wsWriter, app.ctx)
a.entityListenersId = id app.entityListenersId = id
// entity listeners runOnStartup // Run entity listeners startup
for eid, etls := range a.entityListeners { for eid, etls := range app.entityListeners {
for _, etl := range etls { for _, etl := range etls {
// ensure each ETL only runs once, even if // ensure each ETL only runs once, even if
// it listens to multiple entities // it listens to multiple entities
if etl.runOnStartup && !etl.runOnStartupCompleted { if etl.runOnStartup && !etl.runOnStartupCompleted {
entityState, err := a.state.Get(eid) entityState, err := app.state.Get(eid)
if err != nil { if err != nil {
slog.Warn("Failed to get entity state \"", eid, "\" during startup, skipping RunOnStartup") slog.Warn("Failed to get entity state \"", eid, "\" during startup, skipping RunOnStartup")
} }
etl.runOnStartupCompleted = true etl.runOnStartupCompleted = true
go etl.callback(a.service, a.state, EntityData{ go etl.callback(app.service, app.state, EntityData{
TriggerEntityId: eid, TriggerEntityId: eid,
FromState: entityState.State, FromState: entityState.State,
FromAttributes: entityState.Attributes, FromAttributes: entityState.Attributes,
@@ -340,7 +338,7 @@ func (a *App) Start() {
// entity listeners and event listeners // entity listeners and event listeners
elChan := make(chan ws.ChanMsg, 100) // Add buffer to prevent channel overflow elChan := make(chan ws.ChanMsg, 100) // Add buffer to prevent channel overflow
go ws.ListenWebsocket(a.conn, elChan) go ws.ListenWebsocket(app.conn, elChan)
for { for {
select { select {
@@ -349,22 +347,22 @@ func (a *App) Start() {
slog.Info("Websocket channel closed, stopping main loop") slog.Info("Websocket channel closed, stopping main loop")
return return
} }
if a.entityListenersId == msg.Id { if app.entityListenersId == msg.Id {
go callEntityListeners(a, msg.Raw) go callEntityListeners(app, msg.Raw)
} else { } else {
go callEventListeners(a, msg) go callEventListeners(app, msg)
} }
case <-a.ctx.Done(): case <-app.ctx.Done():
slog.Info("Context cancelled, stopping main loop") slog.Info("Context cancelled, stopping main loop")
return return
} }
} }
} }
func (a *App) GetService() *Service { func (app *App) GetService() *Service {
return a.service return app.service
} }
func (a *App) GetState() State { func (app *App) GetState() State {
return a.state return app.state
} }

View File

@@ -192,7 +192,6 @@ func (b elBuilder3) Build() EntityListener {
return b.entityListener return b.entityListener
} }
/* Functions */
func callEntityListeners(app *App, msgBytes []byte) { func callEntityListeners(app *App, msgBytes []byte) {
msg := stateChangedMsg{} msg := stateChangedMsg{}
_ = json.Unmarshal(msgBytes, &msg) _ = json.Unmarshal(msgBytes, &msg)