From db9bcb06dc06fa5f98f33e49b4530dc850a8709e Mon Sep 17 00:00:00 2001 From: Xevion Date: Tue, 17 Dec 2024 03:15:36 -0600 Subject: [PATCH] redis, finish latency testing logic, self ip acquisition --- api/latency.go | 128 +++++++++++++++++++++++++++++++++++++++++-------- app.go | 21 ++++++-- go.mod | 8 ++-- 3 files changed, 131 insertions(+), 26 deletions(-) diff --git a/api/latency.go b/api/latency.go index 4dbbbb6..99147c5 100644 --- a/api/latency.go +++ b/api/latency.go @@ -1,8 +1,13 @@ package api import ( + "context" "errors" + "fmt" + "io" + "math/rand" "net" + "net/http" "time" probing "github.com/prometheus-community/pro-bing" @@ -20,23 +25,19 @@ type LatencyQueue struct { stopChannel chan bool logger *zap.SugaredLogger redis *redis.Client - pinger *probing.Pinger + ipTicker *time.Ticker + ipSelf net.IP handlerChannel chan<- PingResult } -func NewLatencyQueue() *LatencyQueue { +func NewLatencyQueue(redis *redis.Client) *LatencyQueue { logger, _ := zap.NewDevelopment() - pinger, err := probing.NewPinger("127.0.0.1") - if err != nil { - logger.Fatal("Failed to create pinger") - } - pinger.Count = 1 - pinger.Interval = time.Millisecond * 850 + return &LatencyQueue{ processChannel: make(chan LatencyRequest, 1024), logger: logger.Sugar(), - redis: redis.NewClient(&redis.Options{}), - pinger: pinger, + redis: redis, + ipTicker: time.NewTicker(time.Minute * 5), } } @@ -45,8 +46,9 @@ type PingRequest struct { } type PingResult struct { - Ip net.IPAddr - Latency int64 + Ip net.IPAddr + Latency time.Duration + Successful bool } func (l *LatencyQueue) QueuePing(ip string) error { @@ -58,7 +60,7 @@ func (l *LatencyQueue) QueuePing(ip string) error { // Create the request request := LatencyRequest{ - RequestTime: time.Now().Unix(), + RequestTime: time.Now().UnixMilli(), Ip: net.IPAddr{IP: parsedIp}, } @@ -68,27 +70,110 @@ func (l *LatencyQueue) QueuePing(ip string) error { return nil } -func (l *LatencyQueue) Start() { +func (l *LatencyQueue) SetHandler(handler chan<- PingResult) { + l.handlerChannel = handler +} + +func (l *LatencyQueue) RefreshIP() error { + resp, err := http.Get("https://api.ipify.org?format=text") + if err != nil { + l.logger.Errorw("Failed to get IP address", "error", err) + return err + } + defer resp.Body.Close() + + ip, err := io.ReadAll(resp.Body) + if err != nil { + l.logger.Errorw("Failed to read response body", "error", err) + return err + } + + parsedIp := net.ParseIP(string(ip)) + if parsedIp == nil { + l.logger.Errorw("Invalid IP address", "ip", ip) + return errors.New("Invalid IP address") + } + + l.logger.Debugw("IP Address Refreshed", "ip", parsedIp) + + l.ipSelf = parsedIp + return nil +} + +func (l *LatencyQueue) Start(ctx context.Context) { + l.RefreshIP() for { select { + case <-l.ipTicker.C: + l.RefreshIP() case request := <-l.processChannel: - ip := request.Ip.String() - l.pinger.SetIPAddr(&request.Ip) + + // Check if we have a result in the cache + latencyKey := fmt.Sprintf("latency:%s:%s", l.ipSelf, ip) + existingResult, err := l.redis.Get(ctx, latencyKey).Result() + if err != nil { + if err != redis.Nil { + l.logger.Errorw("Failed to get existing result", "key", latencyKey, "ip", ip, "error", err) + continue + } + } else { + // Emit the result + if (l.handlerChannel) != nil { + parsed, err := time.ParseDuration(existingResult) + if err != nil { + l.logger.Errorw("Failed to parse existing result", "key", latencyKey, "ip", ip, "error", err) + continue + } + + l.handlerChannel <- PingResult{ + Ip: request.Ip, + Latency: parsed, + } + } + continue + } + + pinger, err := probing.NewPinger(ip) + if err != nil { + l.logger.Errorf("Failed to create pinger for %s: %s", ip, err) + continue + } + + pinger.SetPrivileged(true) + pinger.Count = 1 + pinger.Interval = time.Nanosecond + pinger.Timeout = time.Millisecond * 500 + + l.logger.Debugw("Ping Request", "ip", ip) // Process the request - err := l.pinger.Run() + err = pinger.Run() if err != nil { l.logger.Errorf("Failed to ping %s: %s", ip, err) continue } // Get the results - results := l.pinger.Statistics() + results := pinger.Statistics() + success := results.PacketLoss == 0 + + // Store the result in Redis + value := "timeout" + expiration := time.Hour*24 + time.Minute*time.Duration(rand.Intn(60*8)) + if success { + value = results.AvgRtt.String() + } + + l.logger.Debugw("Ping Result", "ip", ip, "latency", value) + l.redis.SetEx(ctx, latencyKey, value, expiration) + + // Emit the result if (l.handlerChannel) != nil { l.handlerChannel <- PingResult{ - Ip: request.Ip, - Latency: results.AvgRtt.Milliseconds(), + Ip: request.Ip, + Latency: results.AvgRtt, + Successful: success, } } @@ -99,7 +184,10 @@ func (l *LatencyQueue) Start() { } func (l *LatencyQueue) Kill() error { + l.logger.Warn("Killing LatencyQueue") l.stopChannel <- true err := l.redis.Close() + l.ipTicker.Stop() + l.logger.Sync() return err } diff --git a/app.go b/app.go index 56e20d0..fc127c4 100644 --- a/app.go +++ b/app.go @@ -5,6 +5,7 @@ import ( "os" "github.com/joho/godotenv" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "xevion.dev/vastly/api" ) @@ -21,8 +22,7 @@ type App struct { func NewApp() *App { logger, _ := zap.NewDevelopment() return &App{ - logger: logger.Sugar(), - latency: api.NewLatencyQueue(), + logger: logger.Sugar(), } } @@ -41,10 +41,21 @@ func (a *App) startup(ctx context.Context) { if apiKey == "" { a.logger.Fatal("VASTAI_API_KEY not found in environment") } + + // Create Vast API client a.client = api.NewClient(apiKey) + // Connect to Redis + redisUrl := os.Getenv("REDIS_URL") + redisOptions, err := redis.ParseURL(redisUrl) + if err != nil { + a.logger.Fatal("Failed to parse Redis URL", err) + } + redis := redis.NewClient(redisOptions) + // Start latency queue - go a.latency.Start() + a.latency = api.NewLatencyQueue(redis) + go a.latency.Start(ctx) } func (a *App) beforeClose(ctx context.Context) bool { @@ -67,5 +78,9 @@ func (a *App) Search() []api.ScoredOffer { a.logger.Fatal(err) } + for _, offer := range resp.Offers { + a.latency.QueuePing(offer.PublicIPAddr) + } + return api.ScoreOffers(resp.Offers) } diff --git a/go.mod b/go.mod index b28a8ed..25ff5d9 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,15 @@ module vastly go 1.23.3 -require github.com/wailsapp/wails/v2 v2.9.2 +require ( + github.com/prometheus-community/pro-bing v0.5.0 + github.com/redis/go-redis/v9 v9.7.0 + github.com/wailsapp/wails/v2 v2.9.2 +) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/prometheus-community/pro-bing v0.5.0 // indirect - github.com/redis/go-redis/v9 v9.7.0 // indirect golang.org/x/sync v0.10.0 // indirect )