PyCharm grand repo wide reformat with black formatter

This commit is contained in:
Xevion
2020-03-08 20:49:06 -05:00
parent 66c2ff228c
commit bc3cd82a95
16 changed files with 487 additions and 341 deletions

View File

@@ -4,6 +4,7 @@ from flask_limiter import Limiter
from flask_limiter.util import get_remote_address from flask_limiter.util import get_remote_address
from flask_login import LoginManager from flask_login import LoginManager
from flask_migrate import Migrate from flask_migrate import Migrate
# Flask Extensions # Flask Extensions
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
@@ -15,7 +16,7 @@ app.config.from_object(Config)
app.url_map.strict_slashes = False app.url_map.strict_slashes = False
# App extension setup # App extension setup
login = LoginManager(app) login = LoginManager(app)
login.login_view = 'login' login.login_view = "login"
db = SQLAlchemy(app) db = SQLAlchemy(app)
migrate = Migrate(app, db) migrate = Migrate(app, db)
limiter = Limiter(app, key_func=get_remote_address, default_limits=["10 per second"]) limiter = Limiter(app, key_func=get_remote_address, default_limits=["10 per second"])

View File

@@ -6,41 +6,43 @@ from app.custom import require_role
from app.forms import ProfileSettingsForm, ProfilePictureForm from app.forms import ProfileSettingsForm, ProfilePictureForm
@app.route('/dashboard') @app.route("/dashboard")
@login_required @login_required
def dashboard(): def dashboard():
return render_template('/dashboard/dashboard.html') return render_template("/dashboard/dashboard.html")
@app.route('/dashboard/profile_settings', methods=['GET']) @app.route("/dashboard/profile_settings", methods=["GET"])
@login_required @login_required
def profile_settings(): def profile_settings():
psform = ProfileSettingsForm() psform = ProfileSettingsForm()
ppform = ProfilePictureForm() ppform = ProfilePictureForm()
return render_template('/dashboard/profile_settings.html', psform=psform, ppform=ppform) return render_template(
"/dashboard/profile_settings.html", psform=psform, ppform=ppform
)
@app.route('/dashboard/profile_settings/submit', methods=['POST']) @app.route("/dashboard/profile_settings/submit", methods=["POST"])
@login_required @login_required
def profile_settings_submit(): def profile_settings_submit():
form = ProfileSettingsForm() form = ProfileSettingsForm()
if form.validate_on_submit(): if form.validate_on_submit():
data = { data = {
'show_email': form.show_email.data or None, "show_email": form.show_email.data or None,
'profile_picture_file': request.files "profile_picture_file": request.files,
} }
return jsonify(data=data) return jsonify(data=data)
return '{}' return "{}"
@app.route('/dashboard/constants') @app.route("/dashboard/constants")
@login_required @login_required
@require_role(roles=['Admin']) @require_role(roles=["Admin"])
def constants(): def constants():
return render_template('/dashboard/constants.html') return render_template("/dashboard/constants.html")
@app.route('/dashboard/rbac') @app.route("/dashboard/rbac")
@login_required @login_required
def rbac(): def rbac():
return render_template('/dashboard/rbac.html') return render_template("/dashboard/rbac.html")

View File

@@ -1,43 +1,58 @@
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField, RadioField, FileField from wtforms import (
StringField,
PasswordField,
BooleanField,
SubmitField,
RadioField,
FileField,
)
from wtforms.validators import ValidationError, DataRequired, EqualTo, Email, URL from wtforms.validators import ValidationError, DataRequired, EqualTo, Email, URL
from app.models import User from app.models import User
class LoginForm(FlaskForm): class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()]) username = StringField("Username", validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()]) password = PasswordField("Password", validators=[DataRequired()])
remember_me = BooleanField('Remember Me') remember_me = BooleanField("Remember Me")
submit = SubmitField('Sign in') submit = SubmitField("Sign in")
class RegistrationForm(FlaskForm): class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()]) username = StringField("Username", validators=[DataRequired()])
email = StringField('Email', validators=[DataRequired(), Email()]) email = StringField("Email", validators=[DataRequired(), Email()])
password = PasswordField('Password', validators=[DataRequired()]) password = PasswordField("Password", validators=[DataRequired()])
password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) password2 = PasswordField(
submit = SubmitField('Register') "Repeat Password", validators=[DataRequired(), EqualTo("password")]
)
submit = SubmitField("Register")
def validate_username(self, username): def validate_username(self, username):
user = User.query.filter_by(username=username.data).first() user = User.query.filter_by(username=username.data).first()
if user is not None: if user is not None:
raise ValidationError('That username is not available.') raise ValidationError("That username is not available.")
def validate_email(self, email): def validate_email(self, email):
user = User.query.filter_by(email=email.data).first() user = User.query.filter_by(email=email.data).first()
if user is not None: if user is not None:
raise ValidationError('That email address is not available.') raise ValidationError("That email address is not available.")
class ProfileSettingsForm(FlaskForm): class ProfileSettingsForm(FlaskForm):
show_email = RadioField('Show Email', default='registered', show_email = RadioField(
choices=[('public', 'Public'), ('registered', 'Registered Users Only'), "Show Email",
('hidden', 'Hidden')]) default="registered",
submit = SubmitField('Save Profile Settings') choices=[
("public", "Public"),
("registered", "Registered Users Only"),
("hidden", "Hidden"),
],
)
submit = SubmitField("Save Profile Settings")
class ProfilePictureForm(FlaskForm): class ProfilePictureForm(FlaskForm):
profile_picture_file = FileField('Upload Profile Picture') profile_picture_file = FileField("Upload Profile Picture")
profile_picture_url = StringField('Use URL for Profile Picture', validators=[URL()]) profile_picture_url = StringField("Use URL for Profile Picture", validators=[URL()])
submit = SubmitField('Submit Profile Picture') submit = SubmitField("Submit Profile Picture")

View File

@@ -3,25 +3,25 @@ import flask
from app import app from app import app
@app.route('/ftbhot/about') @app.route("/ftbhot/about")
@app.route('/ftbhot/about/') @app.route("/ftbhot/about/")
def ftbhot_about(): def ftbhot_about():
return flask.render_template('/ftbhot/about.html') return flask.render_template("/ftbhot/about.html")
@app.route('/ftbhot/auth') @app.route("/ftbhot/auth")
@app.route('/ftbhot/auth/') @app.route("/ftbhot/auth/")
def ftbhot_auth(): def ftbhot_auth():
return 'WIP' return "WIP"
@app.route('/ftbhot') @app.route("/ftbhot")
@app.route('/ftbhot/') @app.route("/ftbhot/")
def ftbhot(): def ftbhot():
return flask.render_template('/ftbhot/embed.html') return flask.render_template("/ftbhot/embed.html")
@app.route('/ftbhot/json') @app.route("/ftbhot/json")
@app.route('/ftbhot/json/') @app.route("/ftbhot/json/")
def ftbhot_embed(): def ftbhot_embed():
return flask.render_template('/ftbhot/current.json') return flask.render_template("/ftbhot/current.json")

View File

@@ -11,23 +11,23 @@ from app.custom import require_role
from app.models import Search from app.models import Search
@app.route('/hidden/history') @app.route("/hidden/history")
@login_required @login_required
@require_role(roles=['Hidden', 'Admin']) @require_role(roles=["Hidden", "Admin"])
def hidden_history(): def hidden_history():
return render_template('hidden_history.html') return render_template("hidden_history.html")
@app.route('/hidden/help') @app.route("/hidden/help")
@login_required @login_required
@require_role(roles=['Hidden']) @require_role(roles=["Hidden"])
def hidden_help(): def hidden_help():
return render_template('hidden_help.html') return render_template("hidden_help.html")
# Parses strings to test for "boolean-ness" # Parses strings to test for "boolean-ness"
def boolparse(string, default=False): def boolparse(string, default=False):
trues = ['true', '1'] trues = ["true", "1"]
if string is None: if string is None:
return default return default
elif string.lower() in trues: elif string.lower() in trues:
@@ -35,26 +35,29 @@ def boolparse(string, default=False):
return False return False
@app.route('/hidden/') @app.route("/hidden/")
@login_required @login_required
@require_role(roles=['Hidden']) @require_role(roles=["Hidden"])
def hidden(): def hidden():
# Handled within request # Handled within request
tags = request.args.get('tags') or 'trap' tags = request.args.get("tags") or "trap"
try: try:
page = int(request.args.get('page') or 1) page = int(request.args.get("page") or 1)
except (TypeError, ValueError): except (TypeError, ValueError):
return '\"page\" parameter must be Integer.<br>Invalid \"page\" parameter: \"{}\"'.format( return '"page" parameter must be Integer.<br>Invalid "page" parameter: "{}"'.format(
request.args.get('page')) request.args.get("page")
)
# Handled within building # Handled within building
try: try:
count = int(request.args.get('count') or 50) count = int(request.args.get("count") or 50)
except (TypeError, ValueError): except (TypeError, ValueError):
return '\"count\" parameter must be Integer.<br>Invalid \"count\": \"{}\"'.format(request.args.get('count')) return '"count" parameter must be Integer.<br>Invalid "count": "{}"'.format(
base64 = boolparse(request.args.get('base64')) request.args.get("count")
)
base64 = boolparse(request.args.get("base64"))
# Handled within Jinja template # Handled within Jinja template
showfull = boolparse(request.args.get('showfull')) showfull = boolparse(request.args.get("showfull"))
showtags = boolparse(request.args.get('showtags')) showtags = boolparse(request.args.get("showtags"))
# Request, Parse & Build Data # Request, Parse & Build Data
data = build_data(tags, page - 1, count, base64, showfull) data = build_data(tags, page - 1, count, base64, showfull)
# Handling for limiters # Handling for limiters
@@ -63,18 +66,33 @@ def hidden():
count = min(25, count) count = min(25, count)
else: else:
count = min(50, count) count = min(50, count)
search = Search(user_id=current_user.id, exact_url=str(request.url), query_args=json.dumps(request.args.to_dict())) search = Search(
user_id=current_user.id,
exact_url=str(request.url),
query_args=json.dumps(request.args.to_dict()),
)
db.session.add(search) db.session.add(search)
db.session.commit() db.session.commit()
return render_template('hidden.html', title='Gelbooru Browser', data=data, tags=tags, page=page, count=count, return render_template(
base64=base64, showfull=showfull, showtags=showtags) "hidden.html",
title="Gelbooru Browser",
data=data,
tags=tags,
page=page,
count=count,
base64=base64,
showfull=showfull,
showtags=showtags,
)
def base64ify(url): def base64ify(url):
return base64.b64encode(requests.get(url).content).decode() return base64.b64encode(requests.get(url).content).decode()
gelbooru_api_url = "https://gelbooru.com/index.php?page=dapi&s=post&q=index&tags={}&pid={}&limit={}" gelbooru_api_url = (
"https://gelbooru.com/index.php?page=dapi&s=post&q=index&tags={}&pid={}&limit={}"
)
gelbooru_view_url = "https://gelbooru.com/index.php?page=post&s=view&id={}" gelbooru_view_url = "https://gelbooru.com/index.php?page=post&s=view&id={}"
@@ -87,24 +105,29 @@ def build_data(tags, page, count, base64, showfull):
build = [] build = []
try: try:
parse['posts']['post'] parse["posts"]["post"]
except KeyError: except KeyError:
return build return build
for index, element in enumerate(parse['posts']['post'][:count]): for index, element in enumerate(parse["posts"]["post"][:count]):
temp = { temp = {
'index': str(index + 1), "index": str(index + 1),
'real_url': element['@file_url'], "real_url": element["@file_url"],
'sample_url': element['@preview_url'], "sample_url": element["@preview_url"],
# strips tags, ensures no empty tags (may be unnecessary) # strips tags, ensures no empty tags (may be unnecessary)
'tags': list(filter(lambda tag: tag != '', [tag.strip() for tag in element['@tags'].split(' ')])), "tags": list(
'view': gelbooru_view_url.format(element['@id']) filter(
lambda tag: tag != "",
[tag.strip() for tag in element["@tags"].split(" ")],
)
),
"view": gelbooru_view_url.format(element["@id"]),
} }
if base64: if base64:
if not showfull: if not showfull:
temp['base64'] = base64ify(temp['sample_url']) temp["base64"] = base64ify(temp["sample_url"])
else: else:
temp['base64'] = base64ify(temp['real_url']) temp["base64"] = base64ify(temp["real_url"])
build.append(temp) build.append(temp)
return build return build

View File

@@ -16,9 +16,9 @@ class User(UserMixin, db.Model):
email = db.Column(db.String(120), index=True, unique=True) email = db.Column(db.String(120), index=True, unique=True)
register_timestamp = db.Column(db.DateTime, default=datetime.utcnow) register_timestamp = db.Column(db.DateTime, default=datetime.utcnow)
password_hash = db.Column(db.String(64)) password_hash = db.Column(db.String(64))
posts = db.relationship('Post', backref='author', lazy='dynamic') posts = db.relationship("Post", backref="author", lazy="dynamic")
search_history = db.relationship('Search', backref='user', lazy='dynamic') search_history = db.relationship("Search", backref="user", lazy="dynamic")
uroles = db.Column(db.String(80), default='') uroles = db.Column(db.String(80), default="")
about_me = db.Column(db.String(320)) about_me = db.Column(db.String(320))
last_seen = db.Column(db.DateTime, default=datetime.utcnow) last_seen = db.Column(db.DateTime, default=datetime.utcnow)
show_email = db.Column(db.Boolean, default=False) show_email = db.Column(db.Boolean, default=False)
@@ -31,11 +31,11 @@ class User(UserMixin, db.Model):
raise "{} has no password_hash set!".format(self.__repr__()) raise "{} has no password_hash set!".format(self.__repr__())
return check_password_hash(self.password_hash, password) return check_password_hash(self.password_hash, password)
# Retains order while making sure that there are no duplicate role values and they are capitalized # Retains order while making sure that there are no duplicate role values and they are capitalized
def post_role_processing(self): def post_role_processing(self):
user_roles = self.get_roles() user_roles = self.get_roles()
user_roles = list(dict.fromkeys(user_roles)) user_roles = list(dict.fromkeys(user_roles))
self.uroles = ' '.join([role.title() for role in user_roles]) self.uroles = " ".join([role.title() for role in user_roles])
self.uroles = self.uroles.strip() self.uroles = self.uroles.strip()
def delete_role(self, role): def delete_role(self, role):
@@ -55,7 +55,7 @@ class User(UserMixin, db.Model):
return success return success
def get_roles(self): def get_roles(self):
return self.uroles.split(' ') return self.uroles.split(" ")
def add_role(self, role): def add_role(self, role):
self.add_roles([role]) self.add_roles([role])
@@ -63,12 +63,12 @@ class User(UserMixin, db.Model):
def add_roles(self, roles, postprocess=True): def add_roles(self, roles, postprocess=True):
user_roles = self.get_roles() user_roles = self.get_roles()
# Ensure whitespace is replaced with a underscore # Ensure whitespace is replaced with a underscore
roles = ['_'.join(role.split()) for role in roles] roles = ["_".join(role.split()) for role in roles]
if type(roles) == str: if type(roles) == str:
user_roles.append(roles) user_roles.append(roles)
elif type(roles) == list: elif type(roles) == list:
user_roles.extend(roles) user_roles.extend(roles)
user_roles = ' '.join(user_roles) user_roles = " ".join(user_roles)
self.uroles = user_roles self.uroles = user_roles
if postprocess: if postprocess:
self.post_role_processing() self.post_role_processing()
@@ -92,7 +92,7 @@ class User(UserMixin, db.Model):
return True return True
def __repr__(self): def __repr__(self):
return '<User {}>'.format(self.username) return "<User {}>".format(self.username)
class Search(db.Model): class Search(db.Model):
@@ -100,20 +100,22 @@ class Search(db.Model):
exact_url = db.Column(db.String(160)) exact_url = db.Column(db.String(160))
query_args = db.Column(db.String(120)) query_args = db.Column(db.String(120))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id')) user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
def __repr__(self): def __repr__(self):
return '<Search by {} @ {}>'.format(User.query.filter_by(id=self.user_id).first().username, self.timestamp) return "<Search by {} @ {}>".format(
User.query.filter_by(id=self.user_id).first().username, self.timestamp
)
class Post(db.Model): class Post(db.Model):
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.String(140)) body = db.Column(db.String(140))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id')) user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
def __repr__(self): def __repr__(self):
return '<Post {}>'.format(self.body) return "<Post {}>".format(self.body)
@login.user_loader @login.user_loader

View File

@@ -7,13 +7,13 @@ from PIL import Image, ImageDraw, ImageFont
from app import app from app import app
@app.route('/panzer/') @app.route("/panzer/")
@app.route('/panzer') @app.route("/panzer")
@app.route('/panzer/<string>') @app.route("/panzer/<string>")
@app.route('/panzer/<string>/') @app.route("/panzer/<string>/")
def panzer(string='bionicles are cooler than sex'): def panzer(string="bionicles are cooler than sex"):
string = string.replace('+', ' ') string = string.replace("+", " ")
string = string.replace('\n', '%0A') string = string.replace("\n", "%0A")
image = create_panzer(string) image = create_panzer(string)
return serve_pil_image(image) return serve_pil_image(image)
@@ -21,12 +21,12 @@ def panzer(string='bionicles are cooler than sex'):
def create_panzer(string): def create_panzer(string):
img = Image.open("./app/static/panzer.jpeg") img = Image.open("./app/static/panzer.jpeg")
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
font1 = ImageFont.truetype('./app/static/arial.ttf', size=30) font1 = ImageFont.truetype("./app/static/arial.ttf", size=30)
draw.text((10, 20), 'Oh panzer of the lake, what is your wisdom?', font=font1) draw.text((10, 20), "Oh panzer of the lake, what is your wisdom?", font=font1)
font2 = ImageFont.truetype('./app/static/arial.ttf', size=30) font2 = ImageFont.truetype("./app/static/arial.ttf", size=30)
topleft = (250, 500) topleft = (250, 500)
wrapped = wrap(string, width=25) wrapped = wrap(string, width=25)
wrapped = [text.replace('%0A', '\n') for text in wrapped] wrapped = [text.replace("%0A", "\n") for text in wrapped]
for y, text in enumerate(wrapped): for y, text in enumerate(wrapped):
draw.text((topleft[0], topleft[1] + (y * 33)), text, font=font2) draw.text((topleft[0], topleft[1] + (y * 33)), text, font=font2)
return img return img
@@ -34,6 +34,6 @@ def create_panzer(string):
def serve_pil_image(pil_img): def serve_pil_image(pil_img):
img_io = BytesIO() img_io = BytesIO()
pil_img.save(img_io, 'JPEG', quality=50) pil_img.save(img_io, "JPEG", quality=50)
img_io.seek(0) img_io.seek(0)
return flask.send_file(img_io, mimetype='image/jpeg') return flask.send_file(img_io, mimetype="image/jpeg")

View File

@@ -16,39 +16,62 @@ from app.models import User
print = pprint.PrettyPrinter().pprint print = pprint.PrettyPrinter().pprint
fake = faker.Faker() fake = faker.Faker()
strgen = lambda length, charset=string.ascii_letters, weights=None: ''.join( strgen = lambda length, charset=string.ascii_letters, weights=None: "".join(
random.choices(list(charset), k=length, weights=weights)) random.choices(list(charset), k=length, weights=weights)
)
@app.route('/', subdomain='api') @app.route("/", subdomain="api")
def api_index(): def api_index():
return "api" return "api"
@app.route('/time/') @app.route("/time/")
def time(): def time():
value = request.args.get('value') value = request.args.get("value")
if not value: if not value:
return '<br>'.join( return "<br>".join(
['[int] value', '[int list] lengths', '[string list] strings', '[boolean] reverse', '[string] pluralappend', [
'[boolean] synonym']) "[int] value",
"[int list] lengths",
"[string list] strings",
"[boolean] reverse",
"[string] pluralappend",
"[boolean] synonym",
]
)
value = int(value) value = int(value)
lengths = request.args.get('lengths') lengths = request.args.get("lengths")
if lengths: lengths = lengths.split(',') if lengths:
strings = request.args.get('strings') lengths = lengths.split(",")
if strings: strings = strings.split(',') strings = request.args.get("strings")
if (len(lengths or []) + len(strings or []) > 0) and (len(lengths or []) + 1 != len(strings or [])): if strings:
return f'error: lengths ({len(lengths or [])}) and strings ({len(strings or [])}) arrays must be same length to process properly' strings = strings.split(",")
if lengths: lengths = list(map(int, lengths)) if (len(lengths or []) + len(strings or []) > 0) and (
reverse = request.args.get('reverse') len(lengths or []) + 1 != len(strings or [])
if reverse: reverse = bool(reverse) ):
return timeformat(value=value, lengths=lengths or [60, 60, 24, 365], return f"error: lengths ({len(lengths or [])}) and strings ({len(strings or [])}) arrays must be same length to process properly"
strings=strings or ['second', 'minute', 'hour', 'day', 'year'], if lengths:
reverse=True if reverse is None else reverse) lengths = list(map(int, lengths))
reverse = request.args.get("reverse")
if reverse:
reverse = bool(reverse)
return timeformat(
value=value,
lengths=lengths or [60, 60, 24, 365],
strings=strings or ["second", "minute", "hour", "day", "year"],
reverse=True if reverse is None else reverse,
)
def timeformat(value, lengths=[60, 60, 24, 365], strings=['second', 'minute', 'hour', 'day', 'year'], reverse=True, def timeformat(
pluralappend='s', synonym=False): value,
lengths=[60, 60, 24, 365],
strings=["second", "minute", "hour", "day", "year"],
reverse=True,
pluralappend="s",
synonym=False,
):
converted = [value] converted = [value]
for index, length in enumerate(lengths): for index, length in enumerate(lengths):
temp = converted[-1] // length temp = converted[-1] // length
@@ -58,19 +81,23 @@ def timeformat(value, lengths=[60, 60, 24, 365], strings=['second', 'minute', 'h
converted.append(temp) converted.append(temp)
else: else:
break break
strings = strings[:len(converted)] strings = strings[: len(converted)]
build = ['{} {}'.format(value, strings[i] + pluralappend if value > 1 or value == 0 else strings[i]) for i, value in build = [
enumerate(converted)][::-1] "{} {}".format(
build = ', '.join(build) value, strings[i] + pluralappend if value > 1 or value == 0 else strings[i]
)
for i, value in enumerate(converted)
][::-1]
build = ", ".join(build)
return build return build
@app.route('/avatar/') @app.route("/avatar/")
@app.route('/avatar/<id>/') @app.route("/avatar/<id>/")
@app.route('/avatar/<id>') @app.route("/avatar/<id>")
def getAvatar(id=''): def getAvatar(id=""):
# Constants # Constants
headers = {'Authorization': f'Bot {app.config["DISCORD_TOKEN"]}'} headers = {"Authorization": f'Bot {app.config["DISCORD_TOKEN"]}'}
api = "https://discordapp.com/api/v6/users/{}" api = "https://discordapp.com/api/v6/users/{}"
cdn = "https://cdn.discordapp.com/avatars/{}/{}.png" cdn = "https://cdn.discordapp.com/avatars/{}/{}.png"
# Get User Data which contains Avatar Hash # Get User Data which contains Avatar Hash
@@ -78,74 +105,76 @@ def getAvatar(id=''):
if response.status_code != 200: if response.status_code != 200:
return response.text return response.text
user = json.loads(response.text) user = json.loads(response.text)
url = cdn.format(id, user['avatar']) url = cdn.format(id, user["avatar"])
return "<img src=\"{}\">".format(url) return '<img src="{}">'.format(url)
@app.route('/userinfo/') @app.route("/userinfo/")
@login_required @login_required
@require_role(roles=['Admin']) @require_role(roles=["Admin"])
def user_info(): def user_info():
prepare = { prepare = {
'id': current_user.get_id(), "id": current_user.get_id(),
'email': current_user.email, "email": current_user.email,
'username': current_user.username, "username": current_user.username,
'password_hash': current_user.password_hash, "password_hash": current_user.password_hash,
'is_active': current_user.is_active, "is_active": current_user.is_active,
'is_anonymous': current_user.is_anonymous, "is_anonymous": current_user.is_anonymous,
'is_authenticated': current_user.is_authenticated, "is_authenticated": current_user.is_authenticated,
'metadata': current_user.metadata.info, "metadata": current_user.metadata.info,
'uroles': current_user.get_roles() "uroles": current_user.get_roles(),
} }
return jsonify(prepare) return jsonify(prepare)
@app.route('/') @app.route("/")
def index(): def index():
jobs = [ jobs = [
'Student Photographer', "Student Photographer",
'Highschool Student', "Highschool Student",
'Web Developer', "Web Developer",
'Python Developer', "Python Developer",
'Software Engineer', "Software Engineer",
] ]
return render_template('index.html', job=random.choice(jobs)) return render_template("index.html", job=random.choice(jobs))
@app.route('/register/', methods=['GET', 'POST']) @app.route("/register/", methods=["GET", "POST"])
def register(): def register():
if current_user.is_authenticated: if current_user.is_authenticated:
return redirect(url_for('dashboard')) return redirect(url_for("dashboard"))
form = RegistrationForm() form = RegistrationForm()
if form.validate_on_submit(): if form.validate_on_submit():
user = User(username=form.username.data, email=form.email.data) user = User(username=form.username.data, email=form.email.data)
user.set_password(form.password.data) user.set_password(form.password.data)
db.session.add(user) db.session.add(user)
db.session.commit() db.session.commit()
flash('Registered Successfully!', 'info') flash("Registered Successfully!", "info")
return redirect(url_for('login')) return redirect(url_for("login"))
return render_template('register.html', title='Register', form=form, hideRegister=True) return render_template(
"register.html", title="Register", form=form, hideRegister=True
)
@app.route('/login/', methods=['GET', 'POST']) @app.route("/login/", methods=["GET", "POST"])
def login(): def login():
if current_user.is_authenticated: if current_user.is_authenticated:
return redirect(url_for('dashboard')) return redirect(url_for("dashboard"))
form = LoginForm() form = LoginForm()
if form.validate_on_submit(): if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first() user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.check_password(form.password.data): if user is None or not user.check_password(form.password.data):
flash('Invalid username or password', 'error') flash("Invalid username or password", "error")
return redirect(url_for('login')) return redirect(url_for("login"))
login_user(user, remember=form.remember_me.data) login_user(user, remember=form.remember_me.data)
next_page = request.args.get('next') next_page = request.args.get("next")
if not next_page or url_parse(next_page).netloc != '': if not next_page or url_parse(next_page).netloc != "":
next_page = url_for('index') next_page = url_for("index")
return redirect(next_page) return redirect(next_page)
return render_template('login.html', title='Login', form=form, hideLogin=True) return render_template("login.html", title="Login", form=form, hideLogin=True)
@app.route('/logout/') @app.route("/logout/")
def logout(): def logout():
logout_user() logout_user()
return redirect(url_for('index')) return redirect(url_for("index"))

View File

@@ -8,28 +8,33 @@ from app import app
markdown = mistune.Markdown() markdown = mistune.Markdown()
@app.route('/keybase.txt') @app.route("/keybase.txt")
def keybase(): def keybase():
return app.send_static_file('keybase.txt') return app.send_static_file("keybase.txt")
@app.route('/modpacks') @app.route("/modpacks")
def modpacks(): def modpacks():
return markdown(open(os.path.join(app.root_path, 'static', 'MODPACKS.MD'), 'r').read()) return markdown(
open(os.path.join(app.root_path, "static", "MODPACKS.MD"), "r").read()
)
@app.route('/favicon.ico') @app.route("/favicon.ico")
def favicon(): def favicon():
return send_from_directory(os.path.join(app.root_path, 'static'), 'favicon.ico', return send_from_directory(
mimetype='image/vnd.microsoft.icon') os.path.join(app.root_path, "static"),
"favicon.ico",
mimetype="image/vnd.microsoft.icon",
)
@app.errorhandler(401) @app.errorhandler(401)
def unauthorized(e): def unauthorized(e):
return redirect(url_for('login')) return redirect(url_for("login"))
@app.errorhandler(404) @app.errorhandler(404)
def page_not_found(e): def page_not_found(e):
# note that we set the 404 status explicitly # note that we set the 404 status explicitly
return render_template('error.html', code=404, message='Content not found...'), 404 return render_template("error.html", code=404, message="Content not found..."), 404

View File

@@ -2,18 +2,27 @@ from flask import Response, send_file, request, jsonify
from flask_login import current_user from flask_login import current_user
from app import app, db, limiter from app import app, db, limiter
from app.sound_models import YouTubeAudio, CouldNotDecode, CouldNotDownload, CouldNotProcess from app.sound_models import (
YouTubeAudio,
CouldNotDecode,
CouldNotDownload,
CouldNotProcess,
)
# Selection of Lambdas for creating new responses # Selection of Lambdas for creating new responses
# Not sure if Responses change based on Request Context, but it doesn't hurt. # Not sure if Responses change based on Request Context, but it doesn't hurt.
getBadRequest = lambda: Response('Bad request', status=400, mimetype='text/plain') getBadRequest = lambda: Response("Bad request", status=400, mimetype="text/plain")
getNotImplemented = lambda: Response('Not implemented', status=501, mimetype='text/plain') getNotImplemented = lambda: Response(
getInvalidID = lambda: Response('Invalid ID', status=400, mimetype='text/plain') "Not implemented", status=501, mimetype="text/plain"
getNotDownloaded = lambda: Response('Media not yet downloaded', status=400, mimetype='text/plain') )
getInvalidID = lambda: Response("Invalid ID", status=400, mimetype="text/plain")
getNotDownloaded = lambda: Response(
"Media not yet downloaded", status=400, mimetype="text/plain"
)
# Retrieves the YouTubeAudio object relevant to the mediaid if available. If not, it facilitates the creation and writing of one. # Retrieves the YouTubeAudio object relevant to the mediaid if available. If not, it facilitates the creation and
# Also helps with access times. # writing of one. Also helps with access times.
def get_youtube(mediaid): def get_youtube(mediaid):
audio = YouTubeAudio.query.get(mediaid) audio = YouTubeAudio.query.get(mediaid)
if audio is not None: if audio is not None:
@@ -30,9 +39,9 @@ def get_youtube(mediaid):
basic_responses = { basic_responses = {
CouldNotDecode: 'Could not decode process response.', CouldNotDecode: "Could not decode process response.",
CouldNotDownload: 'Could not download video.', CouldNotDownload: "Could not download video.",
CouldNotProcess: 'Could not process.' CouldNotProcess: "Could not process.",
} }
@@ -41,55 +50,58 @@ basic_responses = {
# Shows error in full context IF authenticated + admin, otherwise basic error description, OTHERWISE a basic error message. # Shows error in full context IF authenticated + admin, otherwise basic error description, OTHERWISE a basic error message.
def errorCheck(e): def errorCheck(e):
if type(e) in basic_responses.keys(): if type(e) in basic_responses.keys():
response = f'{basic_responses[type(e)]}' response = f"{basic_responses[type(e)]}"
else: else:
raise e raise e
if current_user.is_authenticated and current_user.has_role('Admin'): response = str(e) + '\n' + response if current_user.is_authenticated and current_user.has_role("Admin"):
return Response(response, status=200, mimetype='text/plain') response = str(e) + "\n" + response
return Response(response, status=200, mimetype="text/plain")
# Under the request context, it grabs the same args needed to decide whether the stream has been downloaded previously # Under the request context, it grabs the same args needed to decide whether the stream has been downloaded previously
# It applies rate limiting differently based on service, and whether the stream has been accessed previously # It applies rate limiting differently based on service, and whether the stream has been accessed previously
def downloadLimiter(): def downloadLimiter():
if request.view_args['service'] == 'youtube': if request.view_args["service"] == "youtube":
if YouTubeAudio.query.get(request.view_args['mediaid']) is not None: if YouTubeAudio.query.get(request.view_args["mediaid"]) is not None:
return '5/minute' return "5/minute"
else: else:
return '1/30seconds' return "1/30seconds"
else: else:
return '10/minute' return "10/minute"
# Streams back the specified media back to the client # Streams back the specified media back to the client
@app.route('/stream/<service>/<mediaid>') @app.route("/stream/<service>/<mediaid>")
@limiter.limit(downloadLimiter, lambda: 'global', error_message='429 Too Many Requests') @limiter.limit(downloadLimiter, lambda: "global", error_message="429 Too Many Requests")
def stream(service, mediaid): def stream(service, mediaid):
if service == 'youtube': if service == "youtube":
if YouTubeAudio.isValid(mediaid): if YouTubeAudio.isValid(mediaid):
try: try:
audio = get_youtube(mediaid) audio = get_youtube(mediaid)
except Exception as e: except Exception as e:
return errorCheck(e) return errorCheck(e)
return send_file(audio.getPath(alt=True), attachment_filename=audio.filename) return send_file(
audio.getPath(alt=True), attachment_filename=audio.filename
)
else: else:
return getInvalidID() return getInvalidID()
elif service == 'soundcloud': elif service == "soundcloud":
return getNotImplemented() return getNotImplemented()
elif service == 'spotify': elif service == "spotify":
return getNotImplemented() return getNotImplemented()
else: else:
return getBadRequest() return getBadRequest()
# Returns the duration of a specific media # Returns the duration of a specific media
@app.route('/duration/<service>/<mediaid>') @app.route("/duration/<service>/<mediaid>")
def duration(service, mediaid): def duration(service, mediaid):
if service == 'youtube': if service == "youtube":
duration = get_youtube(mediaid).duration duration = get_youtube(mediaid).duration
return Response(str(duration), status=200, mimetype='text/plain') return Response(str(duration), status=200, mimetype="text/plain")
elif service == 'soundcloud': elif service == "soundcloud":
return getNotImplemented() return getNotImplemented()
elif service == 'spotify': elif service == "spotify":
return getNotImplemented() return getNotImplemented()
else: else:
return getBadRequest() return getBadRequest()
@@ -97,9 +109,9 @@ def duration(service, mediaid):
# Returns a detailed JSON export of a specific database entry. # Returns a detailed JSON export of a specific database entry.
# Will not create a new database entry where one didn't exist before. # Will not create a new database entry where one didn't exist before.
@app.route('/status/<service>/<mediaid>') @app.route("/status/<service>/<mediaid>")
def status(service, mediaid): def status(service, mediaid):
if service == 'youtube': if service == "youtube":
audio = YouTubeAudio.query.get(mediaid) audio = YouTubeAudio.query.get(mediaid)
if audio is None: if audio is None:
if YouTubeAudio.isValid(mediaid): if YouTubeAudio.isValid(mediaid):
@@ -107,37 +119,43 @@ def status(service, mediaid):
else: else:
return getInvalidID() return getInvalidID()
else: else:
return Response(audio.toJSON(), status=200, mimetype='application/json') return Response(audio.toJSON(), status=200, mimetype="application/json")
elif service == 'soundcloud': elif service == "soundcloud":
return getNotImplemented() return getNotImplemented()
elif service == 'spotify': elif service == "spotify":
return getNotImplemented() return getNotImplemented()
else: else:
return getBadRequest() return getBadRequest()
@app.route('/list/<service>') @app.route("/list/<service>")
def list(service): def list(service):
if service == 'youtube': if service == "youtube":
audios = YouTubeAudio.query.all() audios = YouTubeAudio.query.all()
return Response(','.join(audio.id for audio in audios), status=200, mimetype='text/plain') return Response(
elif service == 'soundcloud': ",".join(audio.id for audio in audios), status=200, mimetype="text/plain"
)
elif service == "soundcloud":
return getNotImplemented() return getNotImplemented()
elif service == 'spotify': elif service == "spotify":
return getNotImplemented() return getNotImplemented()
else: else:
return getBadRequest() return getBadRequest()
@app.route('/all/<service>') @app.route("/all/<service>")
def all(service): def all(service):
if service == 'youtube': if service == "youtube":
audios = YouTubeAudio.query.all() audios = YouTubeAudio.query.all()
return jsonify([audio.toJSON(True) for audio in audios]) return jsonify([audio.toJSON(True) for audio in audios])
return Response(jsonify([audio.toJSON(True) for audio in audios]), status=200, mimetype='application/json') return Response(
elif service == 'soundcloud': jsonify([audio.toJSON(True) for audio in audios]),
status=200,
mimetype="application/json",
)
elif service == "soundcloud":
return getNotImplemented() return getNotImplemented()
elif service == 'spotify': elif service == "spotify":
return getNotImplemented() return getNotImplemented()
else: else:
return getBadRequest() return getBadRequest()

View File

@@ -30,11 +30,14 @@ class CouldNotDecode(Exception):
# Stores basic information like Title/Uploader/URL etc. as well as holds methods useful # Stores basic information like Title/Uploader/URL etc. as well as holds methods useful
# for manipulating, deleting, downloading, updating, and accessing the relevant information or file. # for manipulating, deleting, downloading, updating, and accessing the relevant information or file.
class YouTubeAudio(db.Model): class YouTubeAudio(db.Model):
id = db.Column(db.String(11), id = db.Column(
primary_key=True) # 11 char id, presumed to stay the same for the long haul. Should be able to change to 12 chars. db.String(11), primary_key=True
) # 11 char id, presumed to stay the same for the long haul. Should be able to change to 12 chars.
url = db.Column(db.String(64)) # 43 -> 64 url = db.Column(db.String(64)) # 43 -> 64
title = db.Column(db.String(128)) # 120 > 128 title = db.Column(db.String(128)) # 120 > 128
creator = db.Column(db.String(128)) # Seems to be Uploader set, so be careful with this creator = db.Column(
db.String(128)
) # Seems to be Uploader set, so be careful with this
uploader = db.Column(db.String(32)) # 20 -> 32 uploader = db.Column(db.String(32)) # 20 -> 32
filename = db.Column(db.String(156)) # 128 + 11 + 1 -> 156 filename = db.Column(db.String(156)) # 128 + 11 + 1 -> 156
duration = db.Column(db.Integer) duration = db.Column(db.Integer)
@@ -44,7 +47,7 @@ class YouTubeAudio(db.Model):
# Marks a database entry as accessed by updating timestamps and counts # Marks a database entry as accessed by updating timestamps and counts
def access(self): def access(self):
print(f'{self.id} was just accessed ') print(f"{self.id} was just accessed ")
self.access_count = (self.access_count or 0) + 1 self.access_count = (self.access_count or 0) + 1
self.last_access_timestamp = datetime.utcnow() self.last_access_timestamp = datetime.utcnow()
db.session.commit() db.session.commit()
@@ -54,69 +57,95 @@ class YouTubeAudio(db.Model):
# alt: sendfile() asks for a path originating from ./app/ # alt: sendfile() asks for a path originating from ./app/
def getPath(self, alt=False): def getPath(self, alt=False):
if alt: if alt:
return os.path.join('sounds', 'youtube', self.filename) return os.path.join("sounds", "youtube", self.filename)
return os.path.join('app', 'sounds', 'youtube', self.filename) return os.path.join("app", "sounds", "youtube", self.filename)
def file_exists(self): def file_exists(self):
return os.path.exists(self.getPath()) return os.path.exists(self.getPath())
# Fills in all metadata for a database entry # Fills in all metadata for a database entry
def fill_metadata(self): def fill_metadata(self):
print(f'Filling out metadata for {self.id}') print(f"Filling out metadata for {self.id}")
# Use stdout=PIPE, [Python 3.6] production server support instead of 'capture_output=True' => 'process.stdout' # Use stdout=PIPE, [Python 3.6] production server support instead of 'capture_output=True' => 'process.stdout'
self.filename = self.id + '.mp3' self.filename = self.id + ".mp3"
command = f'youtube-dl -4 -x --audio-format mp3 --restrict-filenames --dump-json {self.id}' command = f"youtube-dl -4 -x --audio-format mp3 --restrict-filenames --dump-json {self.id}"
process = subprocess.Popen(command.split(' '), process = subprocess.Popen(
encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) command.split(" "),
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
data = process.communicate() data = process.communicate()
if process.returncode != 0: if process.returncode != 0:
raise CouldNotProcess( raise CouldNotProcess(
f'Command: {command}\n{data[1]}Exit Code: {process.returncode}') # process ends with a newline, not needed between f"Command: {command}\n{data[1]}Exit Code: {process.returncode}"
) # process ends with a newline, not needed between
try: try:
data = json.loads(data[0]) data = json.loads(data[0])
except json.JSONDecodeError: except json.JSONDecodeError:
raise CouldNotDecode( raise CouldNotDecode(
data) # We'll return the process data, figure out what to do with it higher up in stack (return/diagnose etc.) data
print(f'JSON acquired for {self.id}, beginning to fill.') ) # We'll return the process data, figure out what to do with it higher up in stack (return/diagnose etc.)
self.duration = data['duration'] print(f"JSON acquired for {self.id}, beginning to fill.")
self.url = data['webpage_url'] # Could be created, but we'll just infer from JSON response self.duration = data["duration"]
self.creator = data['creator'] or data['uploader'] self.url = data[
self.uploader = data['uploader'] or data['creator'] "webpage_url"
self.title = data['title'] or data[ ] # Could be created, but we'll just infer from JSON response
'alt_title'] # Do not trust alt-title ; it is volatile and uploader set, e.x. https://i.imgur.com/Tgff4rI.png self.creator = data["creator"] or data["uploader"]
print(f'Metadata filled for {self.id}') self.uploader = data["uploader"] or data["creator"]
self.title = (
data["title"] or data["alt_title"]
) # Do not trust alt-title ; it is volatile and uploader set, e.x. https://i.imgur.com/Tgff4rI.png
print(f"Metadata filled for {self.id}")
db.session.commit() db.session.commit()
# Begins the download process for a video # Begins the download process for a video
def download(self): def download(self):
print(f'Attempting download of {self.id}') print(f"Attempting download of {self.id}")
command = f'youtube-dl -x -4 --restrict-filenames --audio-quality 64K --audio-format mp3 -o ./app/sounds/youtube/%(id)s.%(ext)s {self.id}' command = f"youtube-dl -x -4 --restrict-filenames --audio-quality 64K --audio-format mp3 -o ./app/sounds/youtube/%(id)s.%(ext)s {self.id}"
process = subprocess.Popen(command.split(' '), encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(
data = process.communicate() # Not the data for the mp3, just the output. We have to separate this in order to 'wait' for the process to complete fully. command.split(" "),
print('Checking process return code...') encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
data = (
process.communicate()
) # Not the data for the mp3, just the output. We have to separate this in order to 'wait' for the process to complete fully.
print("Checking process return code...")
if process.returncode != 0: if process.returncode != 0:
raise CouldNotProcess(f'Command: {command}\n{data[1] or data[0]}Exit Code: {process.returncode}') raise CouldNotProcess(
print('Checking for expected file...') f"Command: {command}\n{data[1] or data[0]}Exit Code: {process.returncode}"
)
print("Checking for expected file...")
if not os.path.exists(self.getPath()): if not os.path.exists(self.getPath()):
raise CouldNotDownload(data[1] or data[0]) raise CouldNotDownload(data[1] or data[0])
print(f'Download attempt for {self.id} finished successfully.') print(f"Download attempt for {self.id} finished successfully.")
# Validates whether the specified ID could be a valid YouTube video ID # Validates whether the specified ID could be a valid YouTube video ID
@staticmethod @staticmethod
def isValid(id): def isValid(id):
return re.match(r'^[A-Za-z0-9_-]{11}$', id) is not None return re.match(r"^[A-Za-z0-9_-]{11}$", id) is not None
# Returns a JSON serialization of the database entry # Returns a JSON serialization of the database entry
def toJSON(self, noConvert=False): def toJSON(self, noConvert=False):
data = {'id': self.id, 'url': self.url, 'title': self.title, 'creator': self.creator, data = {
'uploader': self.uploader, 'filename': self.filename, 'duration': self.duration, "id": self.id,
'access_count': self.access_count, 'download_timestamp': self.download_timestamp.isoformat(), "url": self.url,
'last_access_timestamp': self.last_access_timestamp.isoformat()} "title": self.title,
"creator": self.creator,
"uploader": self.uploader,
"filename": self.filename,
"duration": self.duration,
"access_count": self.access_count,
"download_timestamp": self.download_timestamp.isoformat(),
"last_access_timestamp": self.last_access_timestamp.isoformat(),
}
return data if noConvert else json.dumps(data) return data if noConvert else json.dumps(data)
def delete(self): def delete(self):
path = os.path.join('app', 'sounds', 'youtube', self.filename) path = os.path.join("app", "sounds", "youtube", self.filename)
try: try:
os.remove(path) os.remove(path)
except: except:

View File

@@ -8,7 +8,7 @@ from app import app
from config import Config from config import Config
from .spotify_explicit import main from .spotify_explicit import main
path = os.path.join('app/spotify_explicit/recent.json') path = os.path.join("app/spotify_explicit/recent.json")
def check_and_update(): def check_and_update():
@@ -16,26 +16,26 @@ def check_and_update():
with open(path) as file: with open(path) as file:
file = json.load(file) file = json.load(file)
except (FileNotFoundError, json.JSONDecodeError): except (FileNotFoundError, json.JSONDecodeError):
file = {'last_generated': -1} file = {"last_generated": -1}
if file['last_generated'] == -1: if file["last_generated"] == -1:
return True return True
else: else:
dif = time.time() - file['last_generated'] dif = time.time() - file["last_generated"]
# print('dif', dif) # print('dif', dif)
if dif >= Config.SPOTIFY_CACHE_TIME: if dif >= Config.SPOTIFY_CACHE_TIME:
return True return True
else: else:
ideal = file['last_generated'] + Config.SPOTIFY_CACHE_TIME ideal = file["last_generated"] + Config.SPOTIFY_CACHE_TIME
# print(f'Waiting another {int(ideal - time.time())} seconds') # print(f'Waiting another {int(ideal - time.time())} seconds')
return False return False
@app.route('/spotify/') @app.route("/spotify/")
def spotify(): def spotify():
if check_and_update(): if check_and_update():
print('Graph out of date - running update command') print("Graph out of date - running update command")
with open(path, 'w+') as file: with open(path, "w+") as file:
file = json.dump({'last_generated': int(time.time())}, file) file = json.dump({"last_generated": int(time.time())}, file)
main.main() main.main()
return send_file('spotify_explicit/export/export.png') return send_file("spotify_explicit/export/export.png")

View File

@@ -4,34 +4,36 @@ import os
import sys import sys
# Path to API Credentials file # Path to API Credentials file
PATH = os.path.join(sys.path[0], 'auth.json') PATH = os.path.join(sys.path[0], "auth.json")
# Ensure the file exists, if not, generate one and error with a reason # Ensure the file exists, if not, generate one and error with a reason
if not os.path.exists(PATH): if not os.path.exists(PATH):
with open(PATH, 'w') as file: with open(PATH, "w") as file:
# Dump a pretty-printed dictionary with default values # Dump a pretty-printed dictionary with default values
json.dump( json.dump(
{ {
'USERNAME': 'Your Username Here', "USERNAME": "Your Username Here",
'CLIENT_ID': 'Your Client ID Here', "CLIENT_ID": "Your Client ID Here",
'CLIENT_SECRET': 'Your Client Secret Here', "CLIENT_SECRET": "Your Client Secret Here",
'REDIRECT_URI': 'Your Redirect URI Callback Here', "REDIRECT_URI": "Your Redirect URI Callback Here",
'SCOPE': ['Your Scopes Here'] "SCOPE": ["Your Scopes Here"],
}, },
file, file,
indent=3 indent=3,
) )
# Error critically, then exit # Error critically, then exit
logging.critical("No \'auth.json\' file detected, one has been created for you") logging.critical("No 'auth.json' file detected, one has been created for you")
logging.critical("Please fill out with your Spotify credentials, and then restart the program") logging.critical(
"Please fill out with your Spotify credentials, and then restart the program"
)
sys.exit() sys.exit()
# Open and parse file # Open and parse file
FILE = json.load(open(PATH, 'r')) FILE = json.load(open(PATH, "r"))
# Load all configuration variables # Load all configuration variables
USERNAME = FILE['USERNAME'] USERNAME = FILE["USERNAME"]
CLIENT_ID = FILE['CLIENT_ID'] CLIENT_ID = FILE["CLIENT_ID"]
CLIENT_SECRET = FILE['CLIENT_SECRET'] CLIENT_SECRET = FILE["CLIENT_SECRET"]
REDIRECT_URI = FILE['REDIRECT_URI'] REDIRECT_URI = FILE["REDIRECT_URI"]
SCOPE = ' '.join(FILE['SCOPE']) SCOPE = " ".join(FILE["SCOPE"])

View File

@@ -11,7 +11,7 @@ from . import pull
def main(): def main():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.info('Pulling data from Spotify') logging.info("Pulling data from Spotify")
refresh() refresh()
process.main() process.main()
@@ -19,14 +19,20 @@ def main():
# Refreshes tracks from files if the token from Spotipy has expired, # Refreshes tracks from files if the token from Spotipy has expired,
# thus keeping us up to date in most cases while keeping rate limits # thus keeping us up to date in most cases while keeping rate limits
def refresh(): def refresh():
file_path = os.path.join(sys.path[0], f'.cache-{auth.USERNAME}') file_path = os.path.join(sys.path[0], f".cache-{auth.USERNAME}")
if os.path.exists(file_path): if os.path.exists(file_path):
cache = json.load(open(file_path, 'r')) cache = json.load(open(file_path, "r"))
if True or time.time() > cache['expires_at']: if True or time.time() > cache["expires_at"]:
logging.info('Refreshing Spotify data by pulling tracks, this may take a moment.') logging.info(
"Refreshing Spotify data by pulling tracks, this may take a moment."
)
pull.main() pull.main()
else: else:
logging.info('Spotify data deemed to be recent enough (under {} seconds old)'.format(cache['expires_in'])) logging.info(
"Spotify data deemed to be recent enough (under {} seconds old)".format(
cache["expires_in"]
)
)
else: else:
pull.main() pull.main()

View File

@@ -9,13 +9,11 @@ import numpy as np
# Gets all files in tracks folder, returns them in parsed JSON # Gets all files in tracks folder, returns them in parsed JSON
def get_files(): def get_files():
folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tracks') folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tracks")
files = [] files = []
for file in os.listdir(folder): for file in os.listdir(folder):
with open(os.path.join(os.path.join(folder, file))) as file: with open(os.path.join(os.path.join(folder, file))) as file:
files.append( files.append(json.load(file))
json.load(file)
)
return files return files
@@ -23,17 +21,17 @@ def get_files():
def combine_files(files): def combine_files(files):
items = [] items = []
for file in files: for file in files:
items.extend(file['items']) items.extend(file["items"])
return items return items
# Prints the data in a interesting format # Prints the data in a interesting format
def print_data(data): def print_data(data):
for i, item in enumerate(data): for i, item in enumerate(data):
date = dateutil.parser.parse(item['added_at']) date = dateutil.parser.parse(item["added_at"])
explicit = '!' if item['track']['explicit'] else ' ' explicit = "!" if item["track"]["explicit"] else " "
track_name = item['track']['name'] track_name = item["track"]["name"]
artists = ' & '.join(artist['name'] for artist in item['track']['artists']) artists = " & ".join(artist["name"] for artist in item["track"]["artists"])
print('[{}] {} "{}" by {}'.format(date, explicit, track_name, artists)) print('[{}] {} "{}" by {}'.format(date, explicit, track_name, artists))
@@ -41,10 +39,10 @@ def process_data(data):
# Process the data by Month/Year, then by Clean/Explicit # Process the data by Month/Year, then by Clean/Explicit
scores = {} scores = {}
for item in data: for item in data:
date = dateutil.parser.parse(item['added_at']).strftime('%b %Y') date = dateutil.parser.parse(item["added_at"]).strftime("%b %Y")
if date not in scores.keys(): if date not in scores.keys():
scores[date] = [0, 0] scores[date] = [0, 0]
scores[date][1 if item['track']['explicit'] else 0] += 1 scores[date][1 if item["track"]["explicit"] else 0] += 1
# Create simplified arrays for each piece of data # Create simplified arrays for each piece of data
months = list(scores.keys())[::-1] months = list(scores.keys())[::-1]
@@ -54,7 +52,7 @@ def process_data(data):
explicit.append(item[1]) explicit.append(item[1])
# Done processing date properly, start plotting work # Done processing date properly, start plotting work
logging.info('Processed data, creating plot from data') logging.info("Processed data, creating plot from data")
# Weird numpy stuff # Weird numpy stuff
n = len(scores.values()) n = len(scores.values())
ind = np.arange(n) ind = np.arange(n)
@@ -63,33 +61,35 @@ def process_data(data):
plt.figure(figsize=(10.0, 6.0)) plt.figure(figsize=(10.0, 6.0))
# Stacked Bars # Stacked Bars
p1 = plt.bar(ind, explicit, width) p1 = plt.bar(ind, explicit, width)
p2 = plt.bar(ind, clean, width, bottom=explicit) # bottom= just has the bar sit on top of the explicit p2 = plt.bar(
ind, clean, width, bottom=explicit
) # bottom= just has the bar sit on top of the explicit
# Plot labeling # Plot labeling
plt.title('Song Count by Clean/Explicit') plt.title("Song Count by Clean/Explicit")
plt.ylabel('Song Count') plt.ylabel("Song Count")
plt.xlabel('Month') plt.xlabel("Month")
plt.xticks(ind, months, rotation=270) # Rotation 90 will have the plt.xticks(ind, months, rotation=270) # Rotation 90 will have the
plt.legend((p1[0], p2[0]), ('Explicit', 'Clean')) plt.legend((p1[0], p2[0]), ("Explicit", "Clean"))
fig = plt.gcf() # Magic to save to image and then show fig = plt.gcf() # Magic to save to image and then show
# Save the figure, overwriting anything in your way # Save the figure, overwriting anything in your way
logging.info('Saving the figure to the \'export\' folder') logging.info("Saving the figure to the 'export' folder")
export_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'export') export_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "export")
if not os.path.exists(export_folder): if not os.path.exists(export_folder):
os.makedirs(export_folder) os.makedirs(export_folder)
plt.tight_layout() plt.tight_layout()
fig.savefig( fig.savefig(
os.path.join( os.path.join(
export_folder, export_folder,
'export' "export"
# datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S') # datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
), ),
dpi=100, dpi=100,
quality=95 quality=95,
) )
# Finally show the figure to # Finally show the figure to
logging.info('Showing plot to User') logging.info("Showing plot to User")
# plt.show() # plt.show()
# Copy the figure to your clipboard to paste in Excel # Copy the figure to your clipboard to paste in Excel
@@ -101,10 +101,17 @@ def process_data(data):
# Will paste into Excel very easily # Will paste into Excel very easily
def copy(months, clean, explicit): def copy(months, clean, explicit):
from pyperclip import copy from pyperclip import copy
top = 'Period\tClean\tExplicit\n'
copy(top + '\n'.join([ top = "Period\tClean\tExplicit\n"
f'{item[0]}\t{item[1]}\t{item[2]}' for item in zip(months, clean, explicit) copy(
])) top
+ "\n".join(
[
f"{item[0]}\t{item[1]}\t{item[2]}"
for item in zip(months, clean, explicit)
]
)
)
def main(): def main():
@@ -114,7 +121,10 @@ def main():
logging.info(f"Read and parse {len(files)} track files") logging.info(f"Read and parse {len(files)} track files")
logging.info("Combining into single track file for ease of access") logging.info("Combining into single track file for ease of access")
data = combine_files(files) data = combine_files(files)
data.sort(key=lambda item: dateutil.parser.parse(item['added_at']).timestamp(), reverse=True) data.sort(
logging.info(f'File combined with {len(data)} items') key=lambda item: dateutil.parser.parse(item["added_at"]).timestamp(),
logging.info('Processing file...') reverse=True,
)
logging.info(f"File combined with {len(data)} items")
logging.info("Processing file...")
process_data(data) process_data(data)

View File

@@ -13,54 +13,58 @@ from . import auth
def main(): def main():
# Get Authorization # Get Authorization
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.info('Authorizing with Spotify via Spotipy') logging.info("Authorizing with Spotify via Spotipy")
logging.warning('May require User Interaction to authenticate properly!') logging.warning("May require User Interaction to authenticate properly!")
token = util.prompt_for_user_token( token = util.prompt_for_user_token(
username=auth.USERNAME, username=auth.USERNAME,
scope=auth.SCOPE, scope=auth.SCOPE,
client_id=auth.CLIENT_ID, client_id=auth.CLIENT_ID,
client_secret=auth.CLIENT_SECRET, client_secret=auth.CLIENT_SECRET,
redirect_uri=auth.REDIRECT_URI redirect_uri=auth.REDIRECT_URI,
) )
sp = spotipy.Spotify(auth=token) sp = spotipy.Spotify(auth=token)
logging.info('Authorized with Spotify via Spotipy') logging.info("Authorized with Spotify via Spotipy")
tracks_folder = os.path.join(os.path.dirname(__file__), 'tracks') tracks_folder = os.path.join(os.path.dirname(__file__), "tracks")
logging.warning('Clearing all files in tracks folder for new files') logging.warning("Clearing all files in tracks folder for new files")
if os.path.exists(tracks_folder): if os.path.exists(tracks_folder):
shutil.rmtree(tracks_folder) # Delete folder and all contents (old track files) shutil.rmtree(tracks_folder) # Delete folder and all contents (old track files)
os.makedirs(tracks_folder) # Recreate the folder just deleted os.makedirs(tracks_folder) # Recreate the folder just deleted
logging.info('Cleared folder, ready to download new track files') logging.info("Cleared folder, ready to download new track files")
curoffset, curlimit = 0, 50 curoffset, curlimit = 0, 50
while curoffset >= 0: while curoffset >= 0:
# Request and identify what was received # Request and identify what was received
logging.info('Requesting {} to {}'.format(curoffset, curoffset + curlimit)) logging.info("Requesting {} to {}".format(curoffset, curoffset + curlimit))
response = sp.current_user_saved_tracks(limit=curlimit, offset=curoffset) response = sp.current_user_saved_tracks(limit=curlimit, offset=curoffset)
received = len(response['items']) received = len(response["items"])
logging.info('Received {} to {}'.format(curoffset, curoffset + received)) logging.info("Received {} to {}".format(curoffset, curoffset + received))
# Create path/filename # Create path/filename
filename = f'saved-tracks-{curoffset}-{curoffset + received}.json' filename = f"saved-tracks-{curoffset}-{curoffset + received}.json"
filepath = os.path.join(tracks_folder, filename) filepath = os.path.join(tracks_folder, filename)
# Save track file # Save track file
with open(filepath, 'w+') as file: with open(filepath, "w+") as file:
json.dump(response, file) json.dump(response, file)
logging.info('Saved at "{}" ({})'.format( logging.info(
f'\\tracks\\{filename}', 'Saved at "{}" ({})'.format(
size(os.path.getsize(filepath))) f"\\tracks\\{filename}", size(os.path.getsize(filepath))
)
) )
# Decide whether we have received all possible tracks # Decide whether we have received all possible tracks
if received < curlimit: if received < curlimit:
logging.info('Requested and saved {} tracks split over {} files ({})'.format( logging.info(
curoffset + received, "Requested and saved {} tracks split over {} files ({})".format(
len(os.listdir(tracks_folder)), curoffset + received,
size( len(os.listdir(tracks_folder)),
sum( size(
os.path.getsize(os.path.join(tracks_folder, file)) for file in os.listdir(tracks_folder) sum(
os.path.getsize(os.path.join(tracks_folder, file))
for file in os.listdir(tracks_folder)
),
system=alternative,
), ),
system=alternative
) )
)) )
break break
# Continuing, so increment offset # Continuing, so increment offset
curoffset += curlimit curoffset += curlimit