mirror of
https://github.com/Xevion/vastly.git
synced 2025-12-05 23:16:48 -06:00
198 lines
4.2 KiB
Go
198 lines
4.2 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"time"
|
|
|
|
probing "github.com/prometheus-community/pro-bing"
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type LatencyRequest struct {
|
|
RequestTime int64
|
|
Ip net.IPAddr
|
|
}
|
|
|
|
type LatencyQueue struct {
|
|
processChannel chan LatencyRequest
|
|
stopChannel chan bool
|
|
logger *zap.SugaredLogger
|
|
redis *redis.Client
|
|
ipTicker *time.Ticker
|
|
ipSelf net.IP
|
|
handlerChannel chan<- PingResult
|
|
}
|
|
|
|
func NewLatencyQueue(redis *redis.Client) *LatencyQueue {
|
|
logger, _ := zap.NewDevelopment()
|
|
|
|
return &LatencyQueue{
|
|
processChannel: make(chan LatencyRequest, 1024),
|
|
logger: logger.Sugar(),
|
|
redis: redis,
|
|
ipTicker: time.NewTicker(time.Minute * 5),
|
|
}
|
|
}
|
|
|
|
type PingRequest struct {
|
|
Ip net.IPAddr
|
|
}
|
|
|
|
type PingResult struct {
|
|
Ip net.IPAddr
|
|
Latency time.Duration
|
|
Successful bool
|
|
}
|
|
|
|
func (l *LatencyQueue) QueuePing(ip string) error {
|
|
// Parse the IP
|
|
parsedIp := net.ParseIP(ip)
|
|
if parsedIp == nil {
|
|
return errors.New("Invalid IP address")
|
|
}
|
|
|
|
// Create the request
|
|
request := LatencyRequest{
|
|
RequestTime: time.Now().UnixMilli(),
|
|
Ip: net.IPAddr{IP: parsedIp},
|
|
}
|
|
|
|
// Add the request to the queue
|
|
l.processChannel <- request
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *LatencyQueue) SetHandler(handler chan<- PingResult) {
|
|
l.handlerChannel = handler
|
|
}
|
|
|
|
func (l *LatencyQueue) GetSelfIP() net.IP {
|
|
return l.ipSelf
|
|
}
|
|
|
|
func (l *LatencyQueue) RefreshIP() error {
|
|
resp, err := http.Get("https://api.ipify.org?format=text")
|
|
if err != nil {
|
|
l.logger.Errorw("Failed to get IP address", "error", err)
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
ip, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
l.logger.Errorw("Failed to read response body", "error", err)
|
|
return err
|
|
}
|
|
|
|
parsedIp := net.ParseIP(string(ip))
|
|
if parsedIp == nil {
|
|
l.logger.Errorw("Invalid IP address", "ip", ip)
|
|
return errors.New("Invalid IP address")
|
|
}
|
|
|
|
l.logger.Debugw("IP Address Refreshed", "ip", parsedIp)
|
|
|
|
l.ipSelf = parsedIp
|
|
return nil
|
|
}
|
|
|
|
func (l *LatencyQueue) Start(ctx context.Context) {
|
|
l.RefreshIP()
|
|
for {
|
|
select {
|
|
case <-l.ipTicker.C:
|
|
l.RefreshIP()
|
|
case request := <-l.processChannel:
|
|
ip := request.Ip.String()
|
|
|
|
// Check if we have a result in the cache
|
|
latencyKey := fmt.Sprintf("latency:%s:%s", l.ipSelf, ip)
|
|
existingResult, err := l.redis.Get(ctx, latencyKey).Result()
|
|
if err != nil {
|
|
if err != redis.Nil {
|
|
l.logger.Errorw("Failed to get existing result", "key", latencyKey, "ip", ip, "error", err)
|
|
continue
|
|
}
|
|
} else {
|
|
// Emit the result
|
|
if (l.handlerChannel) != nil {
|
|
parsed, err := time.ParseDuration(existingResult)
|
|
if err != nil {
|
|
l.logger.Errorw("Failed to parse existing result", "key", latencyKey, "ip", ip, "error", err)
|
|
continue
|
|
}
|
|
|
|
l.handlerChannel <- PingResult{
|
|
Ip: request.Ip,
|
|
Latency: parsed,
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
pinger, err := probing.NewPinger(ip)
|
|
if err != nil {
|
|
l.logger.Errorf("Failed to create pinger for %s: %s", ip, err)
|
|
continue
|
|
}
|
|
|
|
pinger.SetPrivileged(true)
|
|
pinger.Count = 1
|
|
pinger.Interval = time.Nanosecond
|
|
pinger.Timeout = time.Millisecond * 500
|
|
|
|
// Process the request
|
|
err = pinger.Run()
|
|
if err != nil {
|
|
l.logger.Errorf("Failed to ping %s: %s", ip, err)
|
|
continue
|
|
}
|
|
|
|
// Get the results
|
|
results := pinger.Statistics()
|
|
success := results.PacketLoss == 0
|
|
|
|
// Store the result in Redis
|
|
value := "timeout"
|
|
expiration := time.Hour*8 + time.Minute*time.Duration(rand.Intn(60*8))
|
|
if success {
|
|
value = results.AvgRtt.String()
|
|
} else {
|
|
expiration = time.Hour*4 + time.Minute*time.Duration(rand.Intn(60*4))
|
|
}
|
|
|
|
l.logger.Debugw("Ping Result", "ip", ip, "latency", value)
|
|
l.redis.SetEx(ctx, latencyKey, value, expiration)
|
|
|
|
// Emit the result
|
|
if (l.handlerChannel) != nil {
|
|
l.handlerChannel <- PingResult{
|
|
Ip: request.Ip,
|
|
Latency: results.AvgRtt,
|
|
Successful: success,
|
|
}
|
|
}
|
|
|
|
case <-l.stopChannel:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *LatencyQueue) Kill() error {
|
|
l.logger.Warn("Killing LatencyQueue")
|
|
l.stopChannel <- true
|
|
err := l.redis.Close()
|
|
l.ipTicker.Stop()
|
|
l.logger.Sync()
|
|
return err
|
|
}
|