From e608843b091e9d00611b96a97407eacdbcbbfc48 Mon Sep 17 00:00:00 2001 From: Xevion Date: Thu, 31 Jul 2025 14:54:40 -0500 Subject: [PATCH] feat: add Close() method to app, context aware channels --- app.go | 53 ++++++++++++++++++++++++++++++------ internal/websocket/reader.go | 11 +++++++- interval.go | 18 +++++++++++- schedule.go | 18 +++++++++++- 4 files changed, 89 insertions(+), 11 deletions(-) diff --git a/app.go b/app.go index 7ac2132..cb28d01 100644 --- a/app.go +++ b/app.go @@ -191,6 +191,37 @@ func (a *App) Cleanup() { } } +// Close performs a clean shutdown of the application. +// It cancels the context, closes the websocket connection, +// and ensures all background processes are properly terminated. +func (a *App) Close() error { + // Cancel context to signal all goroutines to stop + if a.ctxCancel != nil { + a.ctxCancel() + } + + // Close websocket connection if it exists + if a.conn != nil { + // Send close message to Home Assistant + closeMsg := map[string]string{ + "type": "close", + } + _ = a.conn.WriteJSON(closeMsg) + + // Close the websocket connection + err := a.conn.Close() + if err != nil { + slog.Warn("Error closing websocket connection", "error", err) + } + } + + // Wait a short time for goroutines to finish + // This allows for graceful shutdown of background processes + time.Sleep(100 * time.Millisecond) + + return nil +} + func (a *App) RegisterSchedules(schedules ...DailySchedule) { for _, s := range schedules { // realStartTime already set for sunset/sunrise @@ -346,14 +377,20 @@ func (a *App) Start() { go ws.ListenWebsocket(a.conn, elChan) for { - msg, ok := <-elChan - if !ok { - break - } - if a.entityListenersId == msg.Id { - go callEntityListeners(a, msg.Raw) - } else { - go callEventListeners(a, msg) + select { + case msg, ok := <-elChan: + if !ok { + slog.Info("Websocket channel closed, stopping main loop") + return + } + if a.entityListenersId == msg.Id { + go callEntityListeners(a, msg.Raw) + } else { + go callEventListeners(a, msg) + } + case <-a.ctx.Done(): + slog.Info("Context cancelled, stopping main loop") + return } } } diff --git a/internal/websocket/reader.go b/internal/websocket/reader.go index 9685b32..c15e138 100644 --- a/internal/websocket/reader.go +++ b/internal/websocket/reader.go @@ -44,6 +44,15 @@ func ListenWebsocket(conn *websocket.Conn, c chan ChanMsg) { Raw: bytes, } - c <- chanMsg + // Use non-blocking send to avoid hanging on closed channel + select { + case c <- chanMsg: + // Message sent successfully + default: + // Channel is full or closed, break out of loop + slog.Warn("Websocket message channel is full or closed, stopping listener") + close(c) + return + } } } diff --git a/interval.go b/interval.go index 3d525d9..de0d80e 100644 --- a/interval.go +++ b/interval.go @@ -2,6 +2,7 @@ package gomeassistant import ( "fmt" + "log/slog" "time" "saml.dev/gome-assistant/internal" @@ -151,6 +152,13 @@ func runIntervals(a *App) { } for { + select { + case <-a.ctx.Done(): + slog.Info("Intervals goroutine shutting down") + return + default: + } + i := popInterval(a) // run callback for all intervals before now in case they overlap @@ -161,7 +169,15 @@ func runIntervals(a *App) { i = popInterval(a) } - time.Sleep(time.Until(i.nextRunTime)) + // Use context-aware sleep + select { + case <-time.After(time.Until(i.nextRunTime)): + // Time elapsed, continue + case <-a.ctx.Done(): + slog.Info("Intervals goroutine shutting down") + return + } + i.maybeRunCallback(a) requeueInterval(a, i) } diff --git a/schedule.go b/schedule.go index 77a26d5..f5f5bf7 100644 --- a/schedule.go +++ b/schedule.go @@ -158,6 +158,13 @@ func runSchedules(a *App) { } for { + select { + case <-a.ctx.Done(): + slog.Info("Schedules goroutine shutting down") + return + default: + } + sched := popSchedule(a) // run callback for all schedules before now in case they overlap @@ -169,7 +176,16 @@ func runSchedules(a *App) { } slog.Info("Next schedule", "start_time", sched.nextRunTime) - time.Sleep(time.Until(sched.nextRunTime)) + + // Use context-aware sleep + select { + case <-time.After(time.Until(sched.nextRunTime)): + // Time elapsed, continue + case <-a.ctx.Done(): + slog.Info("Schedules goroutine shutting down") + return + } + sched.maybeRunCallback(a) requeueSchedule(a, sched) }