Pydantic parsing, cache read/write with tertiary no-cache-load state

2023-10-08 18:48:10 -05:00
parent 58a3f6faca
commit 88f3aa6c8b
2 changed files with 58 additions and 23 deletions


@@ -7,7 +7,7 @@ from dataclasses import dataclass
 import json
 from pathlib import Path
 import time
-from typing import List
+from typing import Dict, List
 from logging import getLogger
 from pydantic import BaseModel
 from requests import Session
@@ -36,7 +36,6 @@ class Question(BaseModel):
     id: int
     slug: str
     title: str
-    body: str
     tags: List[Tag]
@@ -58,6 +57,7 @@ class QuestionDatabase(object):
         self.client.headers["Accept"] = "application/json"
         # Cache path exists
+        self.questions = None  # The data we're extracting from the cache
         if cache_path.exists():
             if not cache_path.is_file():
                 raise ValueError(f"Cache path {cache_path} is not a file.")
@@ -71,17 +71,39 @@ class QuestionDatabase(object):
                 Question(**question) for question in raw_cache["questions"]
             ]
             logger.info(f"Loaded {len(self.questions)} questions from cache.")
-            return
+        # Question data wasn't loaded from the cache, so we fetch it (and store it in the cache) now
+        if self.questions is None:
             # Make sure the directory holding it does exist (this shouldn't ever be raised, so this is just a sanity check)
             if not cache_path.parent.exists():
                 raise RuntimeError(
                     f"Directory where cache would exist ({cache_path.parent}) does not exist."
                 )
+            # Fetch cache questions
             logger.info("Fetching questions from LeetCode...")
+            fetch_time = time.time()
+            self.questions = self.fetch()
+            logger.info(f"Writing {len(self.questions)} questions to cache...")
+            with cache_path.open("w") as cache_file:
+                json.dump(
+                    {
+                        "fetch_time": fetch_time,
+                        "questions": [
+                            question.model_dump() for question in self.questions
+                        ],
+                    },
+                    cache_file,
+                )
+        self.by_id: Dict[int, Question] = {
+            question.id: question for question in self.questions
+        }
+        self.by_slug: Dict[str, Question] = {
+            question.slug: question for question in self.questions
+        }
+
+    def fetch(self) -> List[Question]:
         logger.debug("Fetching total number of questions...")
         totalQuestionsQuery = """
             query getTotalQuestions($categorySlug: String, $filters: QuestionListFilterInput) {
@@ -112,11 +134,10 @@ class QuestionDatabase(object):
         logger.debug("Fetching questions...")
         questionsQuery = """
-            query questionsList($categorySlug: String, $limit: Int, $skip: Int, $filters: QuestionListFilterInput) {
-                problemsetQuestionList: questionList(
+            query questionsList($categorySlug: String, $limit: Int, $filters: QuestionListFilterInput) {
+                questionsList: questionList(
                     categorySlug: $categorySlug
                     limit: $limit
-                    skip: $skip
                     filters: $filters
                 ) {
                     total: totalNum
@@ -131,8 +152,7 @@ class QuestionDatabase(object):
                         }
                     }
                 }
-            }
-        """
+            }"""
         questions_response = self.client.get(
             "https://leetcode.com/graphql",
             json={
@@ -140,24 +160,38 @@ class QuestionDatabase(object):
                 "variables": {
                     "categorySlug": "",
                     "filters": {},
-                    "limit": 1,
-                    "skip": 0,
+                    "limit": 50,
                 },
             },
         )
-        print(questions_response.json())
+        if not questions_response.ok:
+            raise RuntimeError(
+                f"Failed to fetch questions: {questions_response.status_code} {questions_response.reason}"
+            )
+        questions = questions_response.json()["data"]["questionsList"]["questions"]
+        return [
+            Question(
+                id=question["id"],
+                slug=question["titleSlug"],
+                title=question["title"],
+                tags=[
+                    Tag(name=tag["name"], slug=tag["slug"])
+                    for tag in question["topicTags"]
+                ],
+            )
+            for question in questions
+        ]
 
     def get_by_id(self, question_id: int) -> Question:
         """
         Gets a question by its ID.
         """
-        raise NotImplementedError
+        return self.by_id[question_id]
 
     def get_by_slug(self, slug: str) -> Question:
         """
         Gets a question by its slug.
         """
-        raise NotImplementedError
+        return self.by_slug[slug]
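
For reference, the cache file written by the json.dump call above would have roughly this shape. This is a sketch only: the values are invented, while the keys follow the json.dump arguments and the Question/Tag fields visible in the diff.

    import json

    # Illustrative only: made-up values, structure mirroring the json.dump call above.
    example_cache = {
        "fetch_time": 1696808890.0,  # time.time() at fetch
        "questions": [
            {
                "id": 1,
                "slug": "two-sum",
                "title": "Two Sum",
                "tags": [
                    {"name": "Array", "slug": "array"},
                    {"name": "Hash Table", "slug": "hash-table"},
                ],
            },
        ],
    }
    print(json.dumps(example_cache, indent=2))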

.gitignore

@@ -1,3 +1,4 @@
 .vscode/*
 **/target
 .jython_cache
+questions.json
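
A minimal usage sketch of the resulting cache flow, assuming the QuestionDatabase constructor takes the cache path as its argument and that the class lives in a questions module (both are assumptions; neither is shown in this diff):

    from pathlib import Path

    from questions import QuestionDatabase  # module name is a guess, not shown in the diff

    # Constructor signature assumed; only get_by_id/get_by_slug and the
    # questions.json cache file come from the commit itself.
    db = QuestionDatabase(cache_path=Path("questions.json"))

    # First run: no cache, so questions are fetched from LeetCode and written to
    # questions.json (now git-ignored). Later runs load from that cache instead.
    question = db.get_by_slug("two-sum")
    print(question.id, question.title, [tag.name for tag in question.tags])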