reformat: migrate.py

This commit is contained in:
2024-10-24 04:02:18 -05:00
parent 98ef2d2aa6
commit 2ae2b6af52

View File

@@ -2,16 +2,16 @@ import os
import pkgutil
import re
import sys
import questionary
from typing import Any, List, Optional, Tuple
from dotenv import load_dotenv
from peewee_migrate import Router, router
from peewee import PostgresqlDatabase
from typing import List, Optional, Tuple
from linkpulse.formatting import pluralize
import questionary
from dotenv import load_dotenv
from peewee import PostgresqlDatabase
from peewee_migrate import Router, router
load_dotenv(dotenv_path=".env")
class ExtendedRouter(Router):
def show(self, module: str) -> Optional[Tuple[str, str]]:
"""
@@ -29,7 +29,9 @@ class ExtendedRouter(Router):
modules = models
if isinstance(module, bool):
modules = [
m for _, m, ispkg in pkgutil.iter_modules([f"{router.CURDIR}"]) if ispkg
m
for _, m, ispkg in pkgutil.iter_modules([f"{router.CURDIR}"])
if ispkg
]
models = [m for module in modules for m in router.load_models(module)]
@@ -58,24 +60,32 @@ class ExtendedRouter(Router):
"""
return [mm.name for mm in self.model.select().order_by(self.model.id)]
def main(*args: str) -> None:
"""
Main function for running migrations.
Args are fed directly from sys.argv.
"""
from linkpulse import models
db: PostgresqlDatabase = models.BaseModel._meta.database
router = ExtendedRouter(database=db, migrate_dir='linkpulse/migrations', ignore=[models.BaseModel._meta.table_name])
auto = 'linkpulse.models'
router = ExtendedRouter(
database=db,
migrate_dir="linkpulse/migrations",
ignore=[models.BaseModel._meta.table_name],
)
auto = "linkpulse.models"
current = router.all_migrations()
if len(current) == 0:
diff = router.diff
if len(diff) == 0:
print("No migrations found, no pending migrations to apply. Creating initial migration.")
print(
"No migrations found, no pending migrations to apply. Creating initial migration."
)
migration = router.create('initial', auto=auto)
migration = router.create("initial", auto=auto)
if not migration:
print("No changes detected. Something went wrong.")
else:
@@ -84,14 +94,24 @@ def main(*args: str) -> None:
diff = router.diff
if len(diff) > 0:
print('Note: Selecting a migration will apply all migrations up to and including the selected migration.')
print('e.g. Applying 004 while only 001 is applied would apply 002, 003, and 004.')
print(
"Note: Selecting a migration will apply all migrations up to and including the selected migration."
)
print(
"e.g. Applying 004 while only 001 is applied would apply 002, 003, and 004."
)
choice = questionary.select("Select highest migration to apply:", choices=diff).ask()
choice = questionary.select(
"Select highest migration to apply:", choices=diff
).ask()
if choice is None:
print("For safety reasons, you won't be able to create migrations without applying the pending ones.")
print(
"For safety reasons, you won't be able to create migrations without applying the pending ones."
)
if len(current) == 0:
print("Warn: No migrations have been applied globally, which is dangerous. Something may be wrong.")
print(
"Warn: No migrations have been applied globally, which is dangerous. Something may be wrong."
)
return
result = router.run(choice)
@@ -106,19 +126,24 @@ def main(*args: str) -> None:
migrate_text, rollback_text = migration_available
print("MIGRATION:")
for line in migrate_text.split('\n'):
if line.strip() == '':
for line in migrate_text.split("\n"):
if line.strip() == "":
continue
print('\t' + line)
print("\t" + line)
print("ROLLBACK:")
for line in rollback_text.split('\n'):
if line.strip() == '':
for line in rollback_text.split("\n"):
if line.strip() == "":
continue
print('\t' + line)
print("\t" + line)
if questionary.confirm("Do you want to create this migration?").ask():
print('Lowercase letters and underscores only (e.g. "create_table", "remove_ipaddress_count").')
migration_name: Optional[str] = questionary.text("Enter migration name", validate=lambda text: re.match("^[a-z_]+$", text) is not None).ask()
print(
'Lowercase letters and underscores only (e.g. "create_table", "remove_ipaddress_count").'
)
migration_name: Optional[str] = questionary.text(
"Enter migration name",
validate=lambda text: re.match("^[a-z_]+$", text) is not None,
).ask()
if migration_name is None:
return
@@ -127,9 +152,11 @@ def main(*args: str) -> None:
if migration:
print(f"Migration created: {migration}")
if len(router.diff) == 1:
if questionary.confirm("Do you want to apply this migration immediately?").ask():
if questionary.confirm(
"Do you want to apply this migration immediately?"
).ask():
router.run(migration)
print('Done.')
print("Done.")
print("!!! Commit and push this migration file immediately!")
else:
print("No changes detected. Something went wrong.")
@@ -138,7 +165,10 @@ def main(*args: str) -> None:
print("No database changes detected.")
if len(current) > 5:
if questionary.confirm("There are more than 5 migrations applied. Do you want to merge them?", default=False).ask():
if questionary.confirm(
"There are more than 5 migrations applied. Do you want to merge them?",
default=False,
).ask():
print("Merging migrations...")
router.merge(name="initial")
print("Done.")
@@ -146,6 +176,8 @@ def main(*args: str) -> None:
print("!!! Commit and push this merged migration file immediately!")
# Testing Code:
"""
print(router.print('linkpulse.models'))