diff --git a/main.py b/main.py index 43468da..d4b1711 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ from sonarr import ( get_episodes_for_series, get_history_for_episode, get_series, + set_concurrency_limit, ) logger: structlog.stdlib.AsyncBoundLogger = get_async_logger() @@ -31,15 +32,14 @@ async def process_series(client, series_id: int) -> Dict[str, int]: episodes = await get_episodes_for_series(client, series_id) indexer_counts: Dict[str, int] = defaultdict(lambda: 0) - for ep in episodes: + # Process episodes in parallel for this series + async def process_episode(ep): + nonlocal indexer_counts # Skip episodes without files if not ep.get("hasFile", False): - continue - + return indexer = "unknown" episode_detail = f"{ellipsis(series['title'], 12)} S{ep['seasonNumber']:02}E{ep['episodeNumber']:02} - {ellipsis(ep.get('title', 'Unknown'), 18)}" - - # Retrieves all history events for the episode try: history = await get_history_for_episode(client, ep["id"]) except Exception as e: @@ -52,7 +52,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]: episode_id=ep["id"], episode=episode_detail, ) - continue + return else: await logger.error( "Error fetching history for episode", @@ -60,9 +60,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]: episode=episode_detail, error=str(e), ) - continue - - # Get the episode's episodeFileId + return target_file_id = ep.get("episodeFileId") if not target_file_id: await logger.error( @@ -70,7 +68,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]: episode_id=ep["id"], episode=episode_detail, ) - continue + return # Find the 'downloadFolderImported' event with the matching data.fileId def is_import_event(event: Dict, file_id: int) -> bool: @@ -93,7 +91,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]: episode=episode_detail, target_file_id=target_file_id, ) - continue + return # Acquire the event's downloadId download_id = import_event.get("downloadId") @@ -104,7 +102,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]: episode=episode_detail, target_file_id=target_file_id, ) - continue + return # Find the 'grabbed' event with the matching downloadId def is_grabbed_event(event: Dict, download_id: str) -> bool: @@ -128,7 +126,7 @@ async def process_series(client, series_id: int) -> Dict[str, int]: download_id=ellipsis(download_id, 20), episode=episode_detail, ) - continue + return # Extract the indexer from the 'grabbed' event indexer = grabbed_event.get("data", {}).get("indexer") @@ -147,6 +145,10 @@ async def process_series(client, series_id: int) -> Dict[str, int]: indexer_counts[indexer] += 1 + async with anyio.create_task_group() as tg: + for ep in episodes: + tg.start_soon(process_episode, ep) + return indexer_counts @@ -167,6 +169,8 @@ async def main(): "AUTHELIA_USERNAME and AUTHELIA_PASSWORD must be set in the .env file." ) + set_concurrency_limit(25) # Set the max number of concurrent Sonarr API requests + # Request counter setup request_counter = {"count": 0, "active": 0} counter_lock = anyio.Lock() diff --git a/sonarr.py b/sonarr.py index 3f050d7..4cb93b0 100644 --- a/sonarr.py +++ b/sonarr.py @@ -2,43 +2,53 @@ import httpx from config import SONARR_URL, SONARR_API_KEY, logger +# Add a global semaphore for concurrency limiting +semaphore = None + + +async def with_limit(coro): + global semaphore + if semaphore is None: + raise RuntimeError( + "Semaphore not initialized. Call set_concurrency_limit first." + ) + async with semaphore: + return await coro + + +def set_concurrency_limit(limit: int): + global semaphore + import anyio + + semaphore = anyio.Semaphore(limit) + + async def get_all_series(client: httpx.AsyncClient): - """ - Fetch all series from Sonarr. - """ - url = f"{SONARR_URL}/api/v3/series" - resp = await client.get(url) + resp = await with_limit(client.get(f"{SONARR_URL}/api/v3/series")) resp.raise_for_status() return resp.json() async def get_series(client: httpx.AsyncClient, series_id: int): - """ - Fetch a single series by ID from Sonarr. - """ - url = f"{SONARR_URL}/api/v3/series/{series_id}" - resp = await client.get(url) + resp = await with_limit(client.get(f"{SONARR_URL}/api/v3/series/{series_id}")) resp.raise_for_status() return resp.json() async def get_episodes_for_series(client: httpx.AsyncClient, series_id: int): - """ - Fetch all episodes for a given series from Sonarr. - """ - url = f"{SONARR_URL}/api/v3/episode?seriesId={series_id}" - resp = await client.get(url) + resp = await with_limit( + client.get(f"{SONARR_URL}/api/v3/episode?seriesId={series_id}") + ) resp.raise_for_status() return resp.json() async def get_history_for_episode(client: httpx.AsyncClient, episode_id: int): - """ - Fetch history for a given episode from Sonarr. - """ - resp = await client.get( - SONARR_URL + "/api/v3/history", - params={"episodeId": episode_id, "pageSize": 100}, + resp = await with_limit( + client.get( + SONARR_URL + "/api/v3/history", + params={"episodeId": episode_id, "pageSize": 100}, + ) ) resp.raise_for_status() return resp.json()