Process each series' episodes in a task group; add semaphore to limit request concurrency

2025-05-31 23:57:08 -05:00
parent 6736548e71
commit 0486a814e1
2 changed files with 48 additions and 34 deletions

main.py

@@ -17,6 +17,7 @@ from sonarr import (
     get_episodes_for_series,
     get_history_for_episode,
     get_series,
+    set_concurrency_limit,
 )
 
 logger: structlog.stdlib.AsyncBoundLogger = get_async_logger()
@@ -31,15 +32,14 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
     episodes = await get_episodes_for_series(client, series_id)
     indexer_counts: Dict[str, int] = defaultdict(lambda: 0)
 
-    for ep in episodes:
+    # Process episodes in parallel for this series
+    async def process_episode(ep):
+        nonlocal indexer_counts
         # Skip episodes without files
         if not ep.get("hasFile", False):
-            continue
+            return
         indexer = "unknown"
         episode_detail = f"{ellipsis(series['title'], 12)} S{ep['seasonNumber']:02}E{ep['episodeNumber']:02} - {ellipsis(ep.get('title', 'Unknown'), 18)}"
-        # Retrieves all history events for the episode
         try:
             history = await get_history_for_episode(client, ep["id"])
         except Exception as e:
@@ -52,7 +52,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
                     episode_id=ep["id"],
                     episode=episode_detail,
                 )
-                continue
+                return
             else:
                 await logger.error(
                     "Error fetching history for episode",
@@ -60,9 +60,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
                     episode=episode_detail,
                     error=str(e),
                 )
-                continue
-
-        # Get the episode's episodeFileId
+                return
         target_file_id = ep.get("episodeFileId")
         if not target_file_id:
             await logger.error(
@@ -70,7 +68,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
                 episode_id=ep["id"],
                 episode=episode_detail,
             )
-            continue
+            return
 
         # Find the 'downloadFolderImported' event with the matching data.fileId
         def is_import_event(event: Dict, file_id: int) -> bool:
@@ -93,7 +91,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
                 episode=episode_detail,
                 target_file_id=target_file_id,
             )
-            continue
+            return
 
         # Acquire the event's downloadId
         download_id = import_event.get("downloadId")
@@ -104,7 +102,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
                 episode=episode_detail,
                 target_file_id=target_file_id,
             )
-            continue
+            return
 
         # Find the 'grabbed' event with the matching downloadId
         def is_grabbed_event(event: Dict, download_id: str) -> bool:
@@ -128,7 +126,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
                 download_id=ellipsis(download_id, 20),
                 episode=episode_detail,
             )
-            continue
+            return
 
         # Extract the indexer from the 'grabbed' event
         indexer = grabbed_event.get("data", {}).get("indexer")
@@ -147,6 +145,10 @@ async def process_series(client, series_id: int) -> Dict[str, int]:
         indexer_counts[indexer] += 1
 
+    async with anyio.create_task_group() as tg:
+        for ep in episodes:
+            tg.start_soon(process_episode, ep)
+
     return indexer_counts
@@ -167,6 +169,8 @@ async def main():
             "AUTHELIA_USERNAME and AUTHELIA_PASSWORD must be set in the .env file."
         )
 
+    set_concurrency_limit(25)  # Set the max number of concurrent Sonarr API requests
+
     # Request counter setup
     request_counter = {"count": 0, "active": 0}
     counter_lock = anyio.Lock()
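The main.py change follows anyio's structured-concurrency pattern: define a local async worker, start one task per episode with tg.start_soon, and let the task group join every worker before the function returns. A minimal self-contained sketch of that shape, with illustrative names (count_parities, process_item) standing in for the Sonarr-specific code:

import anyio
from collections import defaultdict
from typing import Dict


async def count_parities(items) -> Dict[str, int]:
    counts: Dict[str, int] = defaultdict(int)

    async def process_item(n: int) -> None:
        await anyio.sleep(0)  # stand-in for real async work
        # Safe without a lock: there is no await between the read and the
        # write, and all workers share one event loop.
        counts["even" if n % 2 == 0 else "odd"] += 1

    async with anyio.create_task_group() as tg:
        for n in items:
            tg.start_soon(process_item, n)
    # Leaving the async with block means every worker has finished.
    return dict(counts)


print(anyio.run(count_parities, range(10)))  # {'even': 5, 'odd': 5}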

sonarr.py

@@ -2,43 +2,53 @@ import httpx
 from config import SONARR_URL, SONARR_API_KEY, logger
 
+# Add a global semaphore for concurrency limiting
+semaphore = None
+
+
+async def with_limit(coro):
+    global semaphore
+    if semaphore is None:
+        raise RuntimeError(
+            "Semaphore not initialized. Call set_concurrency_limit first."
+        )
+    async with semaphore:
+        return await coro
+
+
+def set_concurrency_limit(limit: int):
+    global semaphore
+    import anyio
+
+    semaphore = anyio.Semaphore(limit)
+
+
 async def get_all_series(client: httpx.AsyncClient):
-    """
-    Fetch all series from Sonarr.
-    """
-    url = f"{SONARR_URL}/api/v3/series"
-    resp = await client.get(url)
+    resp = await with_limit(client.get(f"{SONARR_URL}/api/v3/series"))
     resp.raise_for_status()
     return resp.json()
 
 
 async def get_series(client: httpx.AsyncClient, series_id: int):
-    """
-    Fetch a single series by ID from Sonarr.
-    """
-    url = f"{SONARR_URL}/api/v3/series/{series_id}"
-    resp = await client.get(url)
+    resp = await with_limit(client.get(f"{SONARR_URL}/api/v3/series/{series_id}"))
     resp.raise_for_status()
     return resp.json()
 
 
 async def get_episodes_for_series(client: httpx.AsyncClient, series_id: int):
-    """
-    Fetch all episodes for a given series from Sonarr.
-    """
-    url = f"{SONARR_URL}/api/v3/episode?seriesId={series_id}"
-    resp = await client.get(url)
+    resp = await with_limit(
+        client.get(f"{SONARR_URL}/api/v3/episode?seriesId={series_id}")
+    )
     resp.raise_for_status()
     return resp.json()
 
 
 async def get_history_for_episode(client: httpx.AsyncClient, episode_id: int):
-    """
-    Fetch history for a given episode from Sonarr.
-    """
-    resp = await client.get(
-        SONARR_URL + "/api/v3/history",
-        params={"episodeId": episode_id, "pageSize": 100},
+    resp = await with_limit(
+        client.get(
+            SONARR_URL + "/api/v3/history",
+            params={"episodeId": episode_id, "pageSize": 100},
+        )
     )
     resp.raise_for_status()
     return resp.json()
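The sonarr.py change gates every request behind a module-level anyio.Semaphore. Note that client.get(...) only creates the coroutine object; httpx does not send the request until with_limit awaits it while holding the semaphore, so at most `limit` requests are in flight at once. A self-contained sketch of the same gate; fetch_json and the example URL are hypothetical, not part of the repo:

import anyio
import httpx

semaphore: anyio.Semaphore | None = None


def set_concurrency_limit(limit: int) -> None:
    global semaphore
    semaphore = anyio.Semaphore(limit)


async def with_limit(coro):
    if semaphore is None:
        raise RuntimeError("Call set_concurrency_limit first.")
    async with semaphore:  # at most `limit` awaits run concurrently
        return await coro


async def fetch_json(client: httpx.AsyncClient, url: str):
    # The coroutine is created eagerly, but the request is only sent
    # when with_limit awaits it inside the semaphore.
    resp = await with_limit(client.get(url))
    resp.raise_for_status()
    return resp.json()


async def main() -> None:
    set_concurrency_limit(2)  # no more than 2 concurrent requests
    async with httpx.AsyncClient() as client:
        async with anyio.create_task_group() as tg:
            for i in range(5):
                tg.start_soon(fetch_json, client, f"https://httpbin.org/get?i={i}")


anyio.run(main)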