mirror of
https://github.com/Xevion/vastly.git
synced 2025-12-06 01:16:50 -06:00
sort in internal scoring, extract latency keys in batch for top 100
This commit is contained in:
@@ -74,6 +74,10 @@ func (l *LatencyQueue) SetHandler(handler chan<- PingResult) {
|
|||||||
l.handlerChannel = handler
|
l.handlerChannel = handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *LatencyQueue) GetSelfIP() net.IP {
|
||||||
|
return l.ipSelf
|
||||||
|
}
|
||||||
|
|
||||||
func (l *LatencyQueue) RefreshIP() error {
|
func (l *LatencyQueue) RefreshIP() error {
|
||||||
resp, err := http.Get("https://api.ipify.org?format=text")
|
resp, err := http.Get("https://api.ipify.org?format=text")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -145,8 +149,6 @@ func (l *LatencyQueue) Start(ctx context.Context) {
|
|||||||
pinger.Interval = time.Nanosecond
|
pinger.Interval = time.Nanosecond
|
||||||
pinger.Timeout = time.Millisecond * 500
|
pinger.Timeout = time.Millisecond * 500
|
||||||
|
|
||||||
l.logger.Debugw("Ping Request", "ip", ip)
|
|
||||||
|
|
||||||
// Process the request
|
// Process the request
|
||||||
err = pinger.Run()
|
err = pinger.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
14
api/score.go
14
api/score.go
@@ -3,6 +3,7 @@ package api
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"slices"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@@ -190,10 +191,21 @@ func ScoreOffers(offers []Offer) []ScoredOffer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
newScore := score * multiplier
|
newScore := score * multiplier
|
||||||
// sugar.Infow("Multiplier Applied", "offer", offer.ID, "baseScore", score, "score", newScore, "multiplier", multiplier)
|
|
||||||
score = newScore
|
score = newScore
|
||||||
|
|
||||||
scoredOffers = append(scoredOffers, ScoredOffer{Offer: offer, Score: score, Reasons: reasons})
|
scoredOffers = append(scoredOffers, ScoredOffer{Offer: offer, Score: score, Reasons: reasons})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sort by score
|
||||||
|
slices.SortStableFunc(scoredOffers, func(a, b ScoredOffer) int {
|
||||||
|
if a.Score < b.Score {
|
||||||
|
return 1
|
||||||
|
} else if a.Score > b.Score {
|
||||||
|
return -1
|
||||||
|
} else {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return scoredOffers
|
return scoredOffers
|
||||||
}
|
}
|
||||||
|
|||||||
44
app.go
44
app.go
@@ -2,7 +2,9 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
@@ -16,6 +18,7 @@ type App struct {
|
|||||||
client *api.Client
|
client *api.Client
|
||||||
logger *zap.SugaredLogger
|
logger *zap.SugaredLogger
|
||||||
latency *api.LatencyQueue
|
latency *api.LatencyQueue
|
||||||
|
redis *redis.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewApp creates a new App application struct
|
// NewApp creates a new App application struct
|
||||||
@@ -51,10 +54,14 @@ func (a *App) startup(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Fatal("Failed to parse Redis URL", err)
|
a.logger.Fatal("Failed to parse Redis URL", err)
|
||||||
}
|
}
|
||||||
redis := redis.NewClient(redisOptions)
|
|
||||||
|
a.redis = redis.NewClient(redisOptions)
|
||||||
|
if _, err := a.redis.Ping(ctx).Result(); err != nil {
|
||||||
|
a.logger.Fatal("Failed to connect to Redis", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Start latency queue
|
// Start latency queue
|
||||||
a.latency = api.NewLatencyQueue(redis)
|
a.latency = api.NewLatencyQueue(a.redis)
|
||||||
go a.latency.Start(ctx)
|
go a.latency.Start(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,5 +89,36 @@ func (a *App) Search() []api.ScoredOffer {
|
|||||||
a.latency.QueuePing(offer.PublicIPAddr)
|
a.latency.QueuePing(offer.PublicIPAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return api.ScoreOffers(resp.Offers)
|
scored := api.ScoreOffers(resp.Offers)
|
||||||
|
|
||||||
|
// collect IP latency keys
|
||||||
|
currentIP := a.latency.GetSelfIP()
|
||||||
|
batchCount := min(100, len(scored))
|
||||||
|
latencyKeys := make([]string, batchCount, batchCount)
|
||||||
|
for i := range batchCount {
|
||||||
|
latencyKeys[i] = fmt.Sprintf("latency:%s:%s", currentIP, scored[i].Offer.PublicIPAddr)
|
||||||
|
}
|
||||||
|
latencyValues, err := a.redis.MGet(a.ctx, latencyKeys...).Result()
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Errorw("Failed to get latency values", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign latency values to scored offers
|
||||||
|
for i, value := range latencyValues {
|
||||||
|
if value != nil {
|
||||||
|
if value.(string) == "timeout" {
|
||||||
|
scored[i].Latency = api.Pointer(int32(-1))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
parsed, err := time.ParseDuration(value.(string))
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Errorw("Failed to parse latency value", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
scored[i].Latency = api.Pointer(int32(parsed.Milliseconds()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return scored
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,8 +8,6 @@ function App() {
|
|||||||
|
|
||||||
async function invoke() {
|
async function invoke() {
|
||||||
const offers = await Search();
|
const offers = await Search();
|
||||||
console.log({ offer: offers[0] });
|
|
||||||
offers.sort((a, b) => b.Score - a.Score);
|
|
||||||
setState(offers);
|
setState(offers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user