feat: add Close() method to app, context aware channels

This commit is contained in:
2025-07-31 14:54:40 -05:00
parent a25c550648
commit e608843b09
4 changed files with 89 additions and 11 deletions

41
app.go
View File

@@ -191,6 +191,37 @@ func (a *App) Cleanup() {
} }
} }
// Close performs a clean shutdown of the application.
// It cancels the context, closes the websocket connection,
// and ensures all background processes are properly terminated.
func (a *App) Close() error {
// Cancel context to signal all goroutines to stop
if a.ctxCancel != nil {
a.ctxCancel()
}
// Close websocket connection if it exists
if a.conn != nil {
// Send close message to Home Assistant
closeMsg := map[string]string{
"type": "close",
}
_ = a.conn.WriteJSON(closeMsg)
// Close the websocket connection
err := a.conn.Close()
if err != nil {
slog.Warn("Error closing websocket connection", "error", err)
}
}
// Wait a short time for goroutines to finish
// This allows for graceful shutdown of background processes
time.Sleep(100 * time.Millisecond)
return nil
}
func (a *App) RegisterSchedules(schedules ...DailySchedule) { func (a *App) RegisterSchedules(schedules ...DailySchedule) {
for _, s := range schedules { for _, s := range schedules {
// realStartTime already set for sunset/sunrise // realStartTime already set for sunset/sunrise
@@ -346,15 +377,21 @@ func (a *App) Start() {
go ws.ListenWebsocket(a.conn, elChan) go ws.ListenWebsocket(a.conn, elChan)
for { for {
msg, ok := <-elChan select {
case msg, ok := <-elChan:
if !ok { if !ok {
break slog.Info("Websocket channel closed, stopping main loop")
return
} }
if a.entityListenersId == msg.Id { if a.entityListenersId == msg.Id {
go callEntityListeners(a, msg.Raw) go callEntityListeners(a, msg.Raw)
} else { } else {
go callEventListeners(a, msg) go callEventListeners(a, msg)
} }
case <-a.ctx.Done():
slog.Info("Context cancelled, stopping main loop")
return
}
} }
} }

View File

@@ -44,6 +44,15 @@ func ListenWebsocket(conn *websocket.Conn, c chan ChanMsg) {
Raw: bytes, Raw: bytes,
} }
c <- chanMsg // Use non-blocking send to avoid hanging on closed channel
select {
case c <- chanMsg:
// Message sent successfully
default:
// Channel is full or closed, break out of loop
slog.Warn("Websocket message channel is full or closed, stopping listener")
close(c)
return
}
} }
} }

View File

@@ -2,6 +2,7 @@ package gomeassistant
import ( import (
"fmt" "fmt"
"log/slog"
"time" "time"
"saml.dev/gome-assistant/internal" "saml.dev/gome-assistant/internal"
@@ -151,6 +152,13 @@ func runIntervals(a *App) {
} }
for { for {
select {
case <-a.ctx.Done():
slog.Info("Intervals goroutine shutting down")
return
default:
}
i := popInterval(a) i := popInterval(a)
// run callback for all intervals before now in case they overlap // run callback for all intervals before now in case they overlap
@@ -161,7 +169,15 @@ func runIntervals(a *App) {
i = popInterval(a) i = popInterval(a)
} }
time.Sleep(time.Until(i.nextRunTime)) // Use context-aware sleep
select {
case <-time.After(time.Until(i.nextRunTime)):
// Time elapsed, continue
case <-a.ctx.Done():
slog.Info("Intervals goroutine shutting down")
return
}
i.maybeRunCallback(a) i.maybeRunCallback(a)
requeueInterval(a, i) requeueInterval(a, i)
} }

View File

@@ -158,6 +158,13 @@ func runSchedules(a *App) {
} }
for { for {
select {
case <-a.ctx.Done():
slog.Info("Schedules goroutine shutting down")
return
default:
}
sched := popSchedule(a) sched := popSchedule(a)
// run callback for all schedules before now in case they overlap // run callback for all schedules before now in case they overlap
@@ -169,7 +176,16 @@ func runSchedules(a *App) {
} }
slog.Info("Next schedule", "start_time", sched.nextRunTime) slog.Info("Next schedule", "start_time", sched.nextRunTime)
time.Sleep(time.Until(sched.nextRunTime))
// Use context-aware sleep
select {
case <-time.After(time.Until(sched.nextRunTime)):
// Time elapsed, continue
case <-a.ctx.Done():
slog.Info("Schedules goroutine shutting down")
return
}
sched.maybeRunCallback(a) sched.maybeRunCallback(a)
requeueSchedule(a, sched) requeueSchedule(a, sched)
} }