diff --git a/app/__init__.py b/app/__init__.py
index 602b676..f5eca5f 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -4,6 +4,7 @@ from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_login import LoginManager
from flask_migrate import Migrate
+
# Flask Extensions
from flask_sqlalchemy import SQLAlchemy
@@ -15,7 +16,7 @@ app.config.from_object(Config)
app.url_map.strict_slashes = False
# App extension setup
login = LoginManager(app)
-login.login_view = 'login'
+login.login_view = "login"
db = SQLAlchemy(app)
migrate = Migrate(app, db)
limiter = Limiter(app, key_func=get_remote_address, default_limits=["10 per second"])
diff --git a/app/dashboard.py b/app/dashboard.py
index 6e6a470..1fdcb50 100644
--- a/app/dashboard.py
+++ b/app/dashboard.py
@@ -6,41 +6,43 @@ from app.custom import require_role
from app.forms import ProfileSettingsForm, ProfilePictureForm
-@app.route('/dashboard')
+@app.route("/dashboard")
@login_required
def dashboard():
- return render_template('/dashboard/dashboard.html')
+ return render_template("/dashboard/dashboard.html")
-@app.route('/dashboard/profile_settings', methods=['GET'])
+@app.route("/dashboard/profile_settings", methods=["GET"])
@login_required
def profile_settings():
psform = ProfileSettingsForm()
ppform = ProfilePictureForm()
- return render_template('/dashboard/profile_settings.html', psform=psform, ppform=ppform)
+ return render_template(
+ "/dashboard/profile_settings.html", psform=psform, ppform=ppform
+ )
-@app.route('/dashboard/profile_settings/submit', methods=['POST'])
+@app.route("/dashboard/profile_settings/submit", methods=["POST"])
@login_required
def profile_settings_submit():
form = ProfileSettingsForm()
if form.validate_on_submit():
data = {
- 'show_email': form.show_email.data or None,
- 'profile_picture_file': request.files
+ "show_email": form.show_email.data or None,
+ "profile_picture_file": request.files,
}
return jsonify(data=data)
- return '{}'
+ return "{}"
-@app.route('/dashboard/constants')
+@app.route("/dashboard/constants")
@login_required
-@require_role(roles=['Admin'])
+@require_role(roles=["Admin"])
def constants():
- return render_template('/dashboard/constants.html')
+ return render_template("/dashboard/constants.html")
-@app.route('/dashboard/rbac')
+@app.route("/dashboard/rbac")
@login_required
def rbac():
- return render_template('/dashboard/rbac.html')
+ return render_template("/dashboard/rbac.html")
diff --git a/app/forms.py b/app/forms.py
index 228f5a1..819dd37 100644
--- a/app/forms.py
+++ b/app/forms.py
@@ -1,43 +1,58 @@
from flask_wtf import FlaskForm
-from wtforms import StringField, PasswordField, BooleanField, SubmitField, RadioField, FileField
+from wtforms import (
+ StringField,
+ PasswordField,
+ BooleanField,
+ SubmitField,
+ RadioField,
+ FileField,
+)
from wtforms.validators import ValidationError, DataRequired, EqualTo, Email, URL
from app.models import User
class LoginForm(FlaskForm):
- username = StringField('Username', validators=[DataRequired()])
- password = PasswordField('Password', validators=[DataRequired()])
- remember_me = BooleanField('Remember Me')
- submit = SubmitField('Sign in')
+ username = StringField("Username", validators=[DataRequired()])
+ password = PasswordField("Password", validators=[DataRequired()])
+ remember_me = BooleanField("Remember Me")
+ submit = SubmitField("Sign in")
class RegistrationForm(FlaskForm):
- username = StringField('Username', validators=[DataRequired()])
- email = StringField('Email', validators=[DataRequired(), Email()])
- password = PasswordField('Password', validators=[DataRequired()])
- password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')])
- submit = SubmitField('Register')
+ username = StringField("Username", validators=[DataRequired()])
+ email = StringField("Email", validators=[DataRequired(), Email()])
+ password = PasswordField("Password", validators=[DataRequired()])
+ password2 = PasswordField(
+ "Repeat Password", validators=[DataRequired(), EqualTo("password")]
+ )
+ submit = SubmitField("Register")
def validate_username(self, username):
user = User.query.filter_by(username=username.data).first()
if user is not None:
- raise ValidationError('That username is not available.')
+ raise ValidationError("That username is not available.")
def validate_email(self, email):
user = User.query.filter_by(email=email.data).first()
if user is not None:
- raise ValidationError('That email address is not available.')
+ raise ValidationError("That email address is not available.")
class ProfileSettingsForm(FlaskForm):
- show_email = RadioField('Show Email', default='registered',
- choices=[('public', 'Public'), ('registered', 'Registered Users Only'),
- ('hidden', 'Hidden')])
- submit = SubmitField('Save Profile Settings')
+ show_email = RadioField(
+ "Show Email",
+ default="registered",
+ choices=[
+ ("public", "Public"),
+ ("registered", "Registered Users Only"),
+ ("hidden", "Hidden"),
+ ],
+ )
+ submit = SubmitField("Save Profile Settings")
class ProfilePictureForm(FlaskForm):
- profile_picture_file = FileField('Upload Profile Picture')
- profile_picture_url = StringField('Use URL for Profile Picture', validators=[URL()])
- submit = SubmitField('Submit Profile Picture')
+ profile_picture_file = FileField("Upload Profile Picture")
+ profile_picture_url = StringField("Use URL for Profile Picture", validators=[URL()])
+ submit = SubmitField("Submit Profile Picture")
diff --git a/app/ftbhot.py b/app/ftbhot.py
index d71312e..c613086 100644
--- a/app/ftbhot.py
+++ b/app/ftbhot.py
@@ -3,25 +3,25 @@ import flask
from app import app
-@app.route('/ftbhot/about')
-@app.route('/ftbhot/about/')
+@app.route("/ftbhot/about")
+@app.route("/ftbhot/about/")
def ftbhot_about():
- return flask.render_template('/ftbhot/about.html')
+ return flask.render_template("/ftbhot/about.html")
-@app.route('/ftbhot/auth')
-@app.route('/ftbhot/auth/')
+@app.route("/ftbhot/auth")
+@app.route("/ftbhot/auth/")
def ftbhot_auth():
- return 'WIP'
+ return "WIP"
-@app.route('/ftbhot')
-@app.route('/ftbhot/')
+@app.route("/ftbhot")
+@app.route("/ftbhot/")
def ftbhot():
- return flask.render_template('/ftbhot/embed.html')
+ return flask.render_template("/ftbhot/embed.html")
-@app.route('/ftbhot/json')
-@app.route('/ftbhot/json/')
+@app.route("/ftbhot/json")
+@app.route("/ftbhot/json/")
def ftbhot_embed():
- return flask.render_template('/ftbhot/current.json')
+ return flask.render_template("/ftbhot/current.json")
diff --git a/app/hidden.py b/app/hidden.py
index 43b023e..fb0c26e 100644
--- a/app/hidden.py
+++ b/app/hidden.py
@@ -11,23 +11,23 @@ from app.custom import require_role
from app.models import Search
-@app.route('/hidden/history')
+@app.route("/hidden/history")
@login_required
-@require_role(roles=['Hidden', 'Admin'])
+@require_role(roles=["Hidden", "Admin"])
def hidden_history():
- return render_template('hidden_history.html')
+ return render_template("hidden_history.html")
-@app.route('/hidden/help')
+@app.route("/hidden/help")
@login_required
-@require_role(roles=['Hidden'])
+@require_role(roles=["Hidden"])
def hidden_help():
- return render_template('hidden_help.html')
+ return render_template("hidden_help.html")
# Parses strings to test for "boolean-ness"
def boolparse(string, default=False):
- trues = ['true', '1']
+ trues = ["true", "1"]
if string is None:
return default
elif string.lower() in trues:
@@ -35,26 +35,29 @@ def boolparse(string, default=False):
return False
-@app.route('/hidden/')
+@app.route("/hidden/")
@login_required
-@require_role(roles=['Hidden'])
+@require_role(roles=["Hidden"])
def hidden():
# Handled within request
- tags = request.args.get('tags') or 'trap'
+ tags = request.args.get("tags") or "trap"
try:
- page = int(request.args.get('page') or 1)
+ page = int(request.args.get("page") or 1)
except (TypeError, ValueError):
- return '\"page\" parameter must be Integer.
Invalid \"page\" parameter: \"{}\"'.format(
- request.args.get('page'))
+ return '"page" parameter must be Integer.
Invalid "page" parameter: "{}"'.format(
+ request.args.get("page")
+ )
# Handled within building
try:
- count = int(request.args.get('count') or 50)
+ count = int(request.args.get("count") or 50)
except (TypeError, ValueError):
- return '\"count\" parameter must be Integer.
Invalid \"count\": \"{}\"'.format(request.args.get('count'))
- base64 = boolparse(request.args.get('base64'))
+ return '"count" parameter must be Integer.
Invalid "count": "{}"'.format(
+ request.args.get("count")
+ )
+ base64 = boolparse(request.args.get("base64"))
# Handled within Jinja template
- showfull = boolparse(request.args.get('showfull'))
- showtags = boolparse(request.args.get('showtags'))
+ showfull = boolparse(request.args.get("showfull"))
+ showtags = boolparse(request.args.get("showtags"))
# Request, Parse & Build Data
data = build_data(tags, page - 1, count, base64, showfull)
# Handling for limiters
@@ -63,18 +66,33 @@ def hidden():
count = min(25, count)
else:
count = min(50, count)
- search = Search(user_id=current_user.id, exact_url=str(request.url), query_args=json.dumps(request.args.to_dict()))
+ search = Search(
+ user_id=current_user.id,
+ exact_url=str(request.url),
+ query_args=json.dumps(request.args.to_dict()),
+ )
db.session.add(search)
db.session.commit()
- return render_template('hidden.html', title='Gelbooru Browser', data=data, tags=tags, page=page, count=count,
- base64=base64, showfull=showfull, showtags=showtags)
+ return render_template(
+ "hidden.html",
+ title="Gelbooru Browser",
+ data=data,
+ tags=tags,
+ page=page,
+ count=count,
+ base64=base64,
+ showfull=showfull,
+ showtags=showtags,
+ )
def base64ify(url):
return base64.b64encode(requests.get(url).content).decode()
-gelbooru_api_url = "https://gelbooru.com/index.php?page=dapi&s=post&q=index&tags={}&pid={}&limit={}"
+gelbooru_api_url = (
+ "https://gelbooru.com/index.php?page=dapi&s=post&q=index&tags={}&pid={}&limit={}"
+)
gelbooru_view_url = "https://gelbooru.com/index.php?page=post&s=view&id={}"
@@ -87,24 +105,29 @@ def build_data(tags, page, count, base64, showfull):
build = []
try:
- parse['posts']['post']
+ parse["posts"]["post"]
except KeyError:
return build
- for index, element in enumerate(parse['posts']['post'][:count]):
+ for index, element in enumerate(parse["posts"]["post"][:count]):
temp = {
- 'index': str(index + 1),
- 'real_url': element['@file_url'],
- 'sample_url': element['@preview_url'],
+ "index": str(index + 1),
+ "real_url": element["@file_url"],
+ "sample_url": element["@preview_url"],
# strips tags, ensures no empty tags (may be unnecessary)
- 'tags': list(filter(lambda tag: tag != '', [tag.strip() for tag in element['@tags'].split(' ')])),
- 'view': gelbooru_view_url.format(element['@id'])
+ "tags": list(
+ filter(
+ lambda tag: tag != "",
+ [tag.strip() for tag in element["@tags"].split(" ")],
+ )
+ ),
+ "view": gelbooru_view_url.format(element["@id"]),
}
if base64:
if not showfull:
- temp['base64'] = base64ify(temp['sample_url'])
+ temp["base64"] = base64ify(temp["sample_url"])
else:
- temp['base64'] = base64ify(temp['real_url'])
+ temp["base64"] = base64ify(temp["real_url"])
build.append(temp)
return build
diff --git a/app/models.py b/app/models.py
index 580d771..380c465 100644
--- a/app/models.py
+++ b/app/models.py
@@ -16,9 +16,9 @@ class User(UserMixin, db.Model):
email = db.Column(db.String(120), index=True, unique=True)
register_timestamp = db.Column(db.DateTime, default=datetime.utcnow)
password_hash = db.Column(db.String(64))
- posts = db.relationship('Post', backref='author', lazy='dynamic')
- search_history = db.relationship('Search', backref='user', lazy='dynamic')
- uroles = db.Column(db.String(80), default='')
+ posts = db.relationship("Post", backref="author", lazy="dynamic")
+ search_history = db.relationship("Search", backref="user", lazy="dynamic")
+ uroles = db.Column(db.String(80), default="")
about_me = db.Column(db.String(320))
last_seen = db.Column(db.DateTime, default=datetime.utcnow)
show_email = db.Column(db.Boolean, default=False)
@@ -31,11 +31,11 @@ class User(UserMixin, db.Model):
raise "{} has no password_hash set!".format(self.__repr__())
return check_password_hash(self.password_hash, password)
- # Retains order while making sure that there are no duplicate role values and they are capitalized
+ # Retains order while making sure that there are no duplicate role values and they are capitalized
def post_role_processing(self):
user_roles = self.get_roles()
user_roles = list(dict.fromkeys(user_roles))
- self.uroles = ' '.join([role.title() for role in user_roles])
+ self.uroles = " ".join([role.title() for role in user_roles])
self.uroles = self.uroles.strip()
def delete_role(self, role):
@@ -55,7 +55,7 @@ class User(UserMixin, db.Model):
return success
def get_roles(self):
- return self.uroles.split(' ')
+ return self.uroles.split(" ")
def add_role(self, role):
self.add_roles([role])
@@ -63,12 +63,12 @@ class User(UserMixin, db.Model):
def add_roles(self, roles, postprocess=True):
user_roles = self.get_roles()
# Ensure whitespace is replaced with a underscore
- roles = ['_'.join(role.split()) for role in roles]
+ roles = ["_".join(role.split()) for role in roles]
if type(roles) == str:
user_roles.append(roles)
elif type(roles) == list:
user_roles.extend(roles)
- user_roles = ' '.join(user_roles)
+ user_roles = " ".join(user_roles)
self.uroles = user_roles
if postprocess:
self.post_role_processing()
@@ -92,7 +92,7 @@ class User(UserMixin, db.Model):
return True
def __repr__(self):
- return ''.format(self.username)
+ return "".format(self.username)
class Search(db.Model):
@@ -100,20 +100,22 @@ class Search(db.Model):
exact_url = db.Column(db.String(160))
query_args = db.Column(db.String(120))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
- user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
+ user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
def __repr__(self):
- return ''.format(User.query.filter_by(id=self.user_id).first().username, self.timestamp)
+ return "".format(
+ User.query.filter_by(id=self.user_id).first().username, self.timestamp
+ )
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.String(140))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
- user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
+ user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
def __repr__(self):
- return ''.format(self.body)
+ return "".format(self.body)
@login.user_loader
diff --git a/app/panzer.py b/app/panzer.py
index c9f88f5..770f30a 100644
--- a/app/panzer.py
+++ b/app/panzer.py
@@ -7,13 +7,13 @@ from PIL import Image, ImageDraw, ImageFont
from app import app
-@app.route('/panzer/')
-@app.route('/panzer')
-@app.route('/panzer/')
-@app.route('/panzer//')
-def panzer(string='bionicles are cooler than sex'):
- string = string.replace('+', ' ')
- string = string.replace('\n', '%0A')
+@app.route("/panzer/")
+@app.route("/panzer")
+@app.route("/panzer/")
+@app.route("/panzer//")
+def panzer(string="bionicles are cooler than sex"):
+ string = string.replace("+", " ")
+ string = string.replace("\n", "%0A")
image = create_panzer(string)
return serve_pil_image(image)
@@ -21,12 +21,12 @@ def panzer(string='bionicles are cooler than sex'):
def create_panzer(string):
img = Image.open("./app/static/panzer.jpeg")
draw = ImageDraw.Draw(img)
- font1 = ImageFont.truetype('./app/static/arial.ttf', size=30)
- draw.text((10, 20), 'Oh panzer of the lake, what is your wisdom?', font=font1)
- font2 = ImageFont.truetype('./app/static/arial.ttf', size=30)
+ font1 = ImageFont.truetype("./app/static/arial.ttf", size=30)
+ draw.text((10, 20), "Oh panzer of the lake, what is your wisdom?", font=font1)
+ font2 = ImageFont.truetype("./app/static/arial.ttf", size=30)
topleft = (250, 500)
wrapped = wrap(string, width=25)
- wrapped = [text.replace('%0A', '\n') for text in wrapped]
+ wrapped = [text.replace("%0A", "\n") for text in wrapped]
for y, text in enumerate(wrapped):
draw.text((topleft[0], topleft[1] + (y * 33)), text, font=font2)
return img
@@ -34,6 +34,6 @@ def create_panzer(string):
def serve_pil_image(pil_img):
img_io = BytesIO()
- pil_img.save(img_io, 'JPEG', quality=50)
+ pil_img.save(img_io, "JPEG", quality=50)
img_io.seek(0)
- return flask.send_file(img_io, mimetype='image/jpeg')
+ return flask.send_file(img_io, mimetype="image/jpeg")
diff --git a/app/routes.py b/app/routes.py
index dbb23d9..8d6e8c3 100644
--- a/app/routes.py
+++ b/app/routes.py
@@ -16,39 +16,62 @@ from app.models import User
print = pprint.PrettyPrinter().pprint
fake = faker.Faker()
-strgen = lambda length, charset=string.ascii_letters, weights=None: ''.join(
- random.choices(list(charset), k=length, weights=weights))
+strgen = lambda length, charset=string.ascii_letters, weights=None: "".join(
+ random.choices(list(charset), k=length, weights=weights)
+)
-@app.route('/', subdomain='api')
+@app.route("/", subdomain="api")
def api_index():
return "api"
-@app.route('/time/')
+@app.route("/time/")
def time():
- value = request.args.get('value')
+ value = request.args.get("value")
if not value:
- return '
'.join(
- ['[int] value', '[int list] lengths', '[string list] strings', '[boolean] reverse', '[string] pluralappend',
- '[boolean] synonym'])
+ return "
".join(
+ [
+ "[int] value",
+ "[int list] lengths",
+ "[string list] strings",
+ "[boolean] reverse",
+ "[string] pluralappend",
+ "[boolean] synonym",
+ ]
+ )
value = int(value)
- lengths = request.args.get('lengths')
- if lengths: lengths = lengths.split(',')
- strings = request.args.get('strings')
- if strings: strings = strings.split(',')
- if (len(lengths or []) + len(strings or []) > 0) and (len(lengths or []) + 1 != len(strings or [])):
- return f'error: lengths ({len(lengths or [])}) and strings ({len(strings or [])}) arrays must be same length to process properly'
- if lengths: lengths = list(map(int, lengths))
- reverse = request.args.get('reverse')
- if reverse: reverse = bool(reverse)
- return timeformat(value=value, lengths=lengths or [60, 60, 24, 365],
- strings=strings or ['second', 'minute', 'hour', 'day', 'year'],
- reverse=True if reverse is None else reverse)
+ lengths = request.args.get("lengths")
+ if lengths:
+ lengths = lengths.split(",")
+ strings = request.args.get("strings")
+ if strings:
+ strings = strings.split(",")
+ if (len(lengths or []) + len(strings or []) > 0) and (
+ len(lengths or []) + 1 != len(strings or [])
+ ):
+ return f"error: lengths ({len(lengths or [])}) and strings ({len(strings or [])}) arrays must be same length to process properly"
+ if lengths:
+ lengths = list(map(int, lengths))
+ reverse = request.args.get("reverse")
+ if reverse:
+ reverse = bool(reverse)
+ return timeformat(
+ value=value,
+ lengths=lengths or [60, 60, 24, 365],
+ strings=strings or ["second", "minute", "hour", "day", "year"],
+ reverse=True if reverse is None else reverse,
+ )
-def timeformat(value, lengths=[60, 60, 24, 365], strings=['second', 'minute', 'hour', 'day', 'year'], reverse=True,
- pluralappend='s', synonym=False):
+def timeformat(
+ value,
+ lengths=[60, 60, 24, 365],
+ strings=["second", "minute", "hour", "day", "year"],
+ reverse=True,
+ pluralappend="s",
+ synonym=False,
+):
converted = [value]
for index, length in enumerate(lengths):
temp = converted[-1] // length
@@ -58,19 +81,23 @@ def timeformat(value, lengths=[60, 60, 24, 365], strings=['second', 'minute', 'h
converted.append(temp)
else:
break
- strings = strings[:len(converted)]
- build = ['{} {}'.format(value, strings[i] + pluralappend if value > 1 or value == 0 else strings[i]) for i, value in
- enumerate(converted)][::-1]
- build = ', '.join(build)
+ strings = strings[: len(converted)]
+ build = [
+ "{} {}".format(
+ value, strings[i] + pluralappend if value > 1 or value == 0 else strings[i]
+ )
+ for i, value in enumerate(converted)
+ ][::-1]
+ build = ", ".join(build)
return build
-@app.route('/avatar/')
-@app.route('/avatar//')
-@app.route('/avatar/')
-def getAvatar(id=''):
+@app.route("/avatar/")
+@app.route("/avatar//")
+@app.route("/avatar/")
+def getAvatar(id=""):
# Constants
- headers = {'Authorization': f'Bot {app.config["DISCORD_TOKEN"]}'}
+ headers = {"Authorization": f'Bot {app.config["DISCORD_TOKEN"]}'}
api = "https://discordapp.com/api/v6/users/{}"
cdn = "https://cdn.discordapp.com/avatars/{}/{}.png"
# Get User Data which contains Avatar Hash
@@ -78,74 +105,76 @@ def getAvatar(id=''):
if response.status_code != 200:
return response.text
user = json.loads(response.text)
- url = cdn.format(id, user['avatar'])
- return "
".format(url)
+ url = cdn.format(id, user["avatar"])
+ return '
'.format(url)
-@app.route('/userinfo/')
+@app.route("/userinfo/")
@login_required
-@require_role(roles=['Admin'])
+@require_role(roles=["Admin"])
def user_info():
prepare = {
- 'id': current_user.get_id(),
- 'email': current_user.email,
- 'username': current_user.username,
- 'password_hash': current_user.password_hash,
- 'is_active': current_user.is_active,
- 'is_anonymous': current_user.is_anonymous,
- 'is_authenticated': current_user.is_authenticated,
- 'metadata': current_user.metadata.info,
- 'uroles': current_user.get_roles()
+ "id": current_user.get_id(),
+ "email": current_user.email,
+ "username": current_user.username,
+ "password_hash": current_user.password_hash,
+ "is_active": current_user.is_active,
+ "is_anonymous": current_user.is_anonymous,
+ "is_authenticated": current_user.is_authenticated,
+ "metadata": current_user.metadata.info,
+ "uroles": current_user.get_roles(),
}
return jsonify(prepare)
-@app.route('/')
+@app.route("/")
def index():
jobs = [
- 'Student Photographer',
- 'Highschool Student',
- 'Web Developer',
- 'Python Developer',
- 'Software Engineer',
+ "Student Photographer",
+ "Highschool Student",
+ "Web Developer",
+ "Python Developer",
+ "Software Engineer",
]
- return render_template('index.html', job=random.choice(jobs))
+ return render_template("index.html", job=random.choice(jobs))
-@app.route('/register/', methods=['GET', 'POST'])
+@app.route("/register/", methods=["GET", "POST"])
def register():
if current_user.is_authenticated:
- return redirect(url_for('dashboard'))
+ return redirect(url_for("dashboard"))
form = RegistrationForm()
if form.validate_on_submit():
user = User(username=form.username.data, email=form.email.data)
user.set_password(form.password.data)
db.session.add(user)
db.session.commit()
- flash('Registered Successfully!', 'info')
- return redirect(url_for('login'))
- return render_template('register.html', title='Register', form=form, hideRegister=True)
+ flash("Registered Successfully!", "info")
+ return redirect(url_for("login"))
+ return render_template(
+ "register.html", title="Register", form=form, hideRegister=True
+ )
-@app.route('/login/', methods=['GET', 'POST'])
+@app.route("/login/", methods=["GET", "POST"])
def login():
if current_user.is_authenticated:
- return redirect(url_for('dashboard'))
+ return redirect(url_for("dashboard"))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.check_password(form.password.data):
- flash('Invalid username or password', 'error')
- return redirect(url_for('login'))
+ flash("Invalid username or password", "error")
+ return redirect(url_for("login"))
login_user(user, remember=form.remember_me.data)
- next_page = request.args.get('next')
- if not next_page or url_parse(next_page).netloc != '':
- next_page = url_for('index')
+ next_page = request.args.get("next")
+ if not next_page or url_parse(next_page).netloc != "":
+ next_page = url_for("index")
return redirect(next_page)
- return render_template('login.html', title='Login', form=form, hideLogin=True)
+ return render_template("login.html", title="Login", form=form, hideLogin=True)
-@app.route('/logout/')
+@app.route("/logout/")
def logout():
logout_user()
- return redirect(url_for('index'))
+ return redirect(url_for("index"))
diff --git a/app/simple_routes.py b/app/simple_routes.py
index b08e36b..3475a45 100644
--- a/app/simple_routes.py
+++ b/app/simple_routes.py
@@ -8,28 +8,33 @@ from app import app
markdown = mistune.Markdown()
-@app.route('/keybase.txt')
+@app.route("/keybase.txt")
def keybase():
- return app.send_static_file('keybase.txt')
+ return app.send_static_file("keybase.txt")
-@app.route('/modpacks')
+@app.route("/modpacks")
def modpacks():
- return markdown(open(os.path.join(app.root_path, 'static', 'MODPACKS.MD'), 'r').read())
+ return markdown(
+ open(os.path.join(app.root_path, "static", "MODPACKS.MD"), "r").read()
+ )
-@app.route('/favicon.ico')
+@app.route("/favicon.ico")
def favicon():
- return send_from_directory(os.path.join(app.root_path, 'static'), 'favicon.ico',
- mimetype='image/vnd.microsoft.icon')
+ return send_from_directory(
+ os.path.join(app.root_path, "static"),
+ "favicon.ico",
+ mimetype="image/vnd.microsoft.icon",
+ )
@app.errorhandler(401)
def unauthorized(e):
- return redirect(url_for('login'))
+ return redirect(url_for("login"))
@app.errorhandler(404)
def page_not_found(e):
# note that we set the 404 status explicitly
- return render_template('error.html', code=404, message='Content not found...'), 404
+ return render_template("error.html", code=404, message="Content not found..."), 404
diff --git a/app/sound.py b/app/sound.py
index 652bb23..3b42a3f 100644
--- a/app/sound.py
+++ b/app/sound.py
@@ -2,18 +2,27 @@ from flask import Response, send_file, request, jsonify
from flask_login import current_user
from app import app, db, limiter
-from app.sound_models import YouTubeAudio, CouldNotDecode, CouldNotDownload, CouldNotProcess
+from app.sound_models import (
+ YouTubeAudio,
+ CouldNotDecode,
+ CouldNotDownload,
+ CouldNotProcess,
+)
# Selection of Lambdas for creating new responses
# Not sure if Responses change based on Request Context, but it doesn't hurt.
-getBadRequest = lambda: Response('Bad request', status=400, mimetype='text/plain')
-getNotImplemented = lambda: Response('Not implemented', status=501, mimetype='text/plain')
-getInvalidID = lambda: Response('Invalid ID', status=400, mimetype='text/plain')
-getNotDownloaded = lambda: Response('Media not yet downloaded', status=400, mimetype='text/plain')
+getBadRequest = lambda: Response("Bad request", status=400, mimetype="text/plain")
+getNotImplemented = lambda: Response(
+ "Not implemented", status=501, mimetype="text/plain"
+)
+getInvalidID = lambda: Response("Invalid ID", status=400, mimetype="text/plain")
+getNotDownloaded = lambda: Response(
+ "Media not yet downloaded", status=400, mimetype="text/plain"
+)
-# Retrieves the YouTubeAudio object relevant to the mediaid if available. If not, it facilitates the creation and writing of one.
-# Also helps with access times.
+# Retrieves the YouTubeAudio object relevant to the mediaid if available. If not, it facilitates the creation and
+# writing of one. Also helps with access times.
def get_youtube(mediaid):
audio = YouTubeAudio.query.get(mediaid)
if audio is not None:
@@ -30,9 +39,9 @@ def get_youtube(mediaid):
basic_responses = {
- CouldNotDecode: 'Could not decode process response.',
- CouldNotDownload: 'Could not download video.',
- CouldNotProcess: 'Could not process.'
+ CouldNotDecode: "Could not decode process response.",
+ CouldNotDownload: "Could not download video.",
+ CouldNotProcess: "Could not process.",
}
@@ -41,55 +50,58 @@ basic_responses = {
# Shows error in full context IF authenticated + admin, otherwise basic error description, OTHERWISE a basic error message.
def errorCheck(e):
if type(e) in basic_responses.keys():
- response = f'{basic_responses[type(e)]}'
+ response = f"{basic_responses[type(e)]}"
else:
raise e
- if current_user.is_authenticated and current_user.has_role('Admin'): response = str(e) + '\n' + response
- return Response(response, status=200, mimetype='text/plain')
+ if current_user.is_authenticated and current_user.has_role("Admin"):
+ response = str(e) + "\n" + response
+ return Response(response, status=200, mimetype="text/plain")
# Under the request context, it grabs the same args needed to decide whether the stream has been downloaded previously
# It applies rate limiting differently based on service, and whether the stream has been accessed previously
def downloadLimiter():
- if request.view_args['service'] == 'youtube':
- if YouTubeAudio.query.get(request.view_args['mediaid']) is not None:
- return '5/minute'
+ if request.view_args["service"] == "youtube":
+ if YouTubeAudio.query.get(request.view_args["mediaid"]) is not None:
+ return "5/minute"
else:
- return '1/30seconds'
+ return "1/30seconds"
else:
- return '10/minute'
+ return "10/minute"
# Streams back the specified media back to the client
-@app.route('/stream//')
-@limiter.limit(downloadLimiter, lambda: 'global', error_message='429 Too Many Requests')
+@app.route("/stream//")
+@limiter.limit(downloadLimiter, lambda: "global", error_message="429 Too Many Requests")
def stream(service, mediaid):
- if service == 'youtube':
+ if service == "youtube":
if YouTubeAudio.isValid(mediaid):
try:
audio = get_youtube(mediaid)
except Exception as e:
return errorCheck(e)
- return send_file(audio.getPath(alt=True), attachment_filename=audio.filename)
+ return send_file(
+ audio.getPath(alt=True), attachment_filename=audio.filename
+ )
else:
return getInvalidID()
- elif service == 'soundcloud':
+ elif service == "soundcloud":
return getNotImplemented()
- elif service == 'spotify':
+ elif service == "spotify":
return getNotImplemented()
else:
return getBadRequest()
# Returns the duration of a specific media
-@app.route('/duration//')
+@app.route("/duration//")
def duration(service, mediaid):
- if service == 'youtube':
+ if service == "youtube":
duration = get_youtube(mediaid).duration
- return Response(str(duration), status=200, mimetype='text/plain')
- elif service == 'soundcloud':
+ return Response(str(duration), status=200, mimetype="text/plain")
+ elif service == "soundcloud":
return getNotImplemented()
- elif service == 'spotify':
+ elif service == "spotify":
return getNotImplemented()
else:
return getBadRequest()
@@ -97,9 +109,9 @@ def duration(service, mediaid):
# Returns a detailed JSON export of a specific database entry.
# Will not create a new database entry where one didn't exist before.
-@app.route('/status//')
+@app.route("/status//")
def status(service, mediaid):
- if service == 'youtube':
+ if service == "youtube":
audio = YouTubeAudio.query.get(mediaid)
if audio is None:
if YouTubeAudio.isValid(mediaid):
@@ -107,37 +119,43 @@ def status(service, mediaid):
else:
return getInvalidID()
else:
- return Response(audio.toJSON(), status=200, mimetype='application/json')
- elif service == 'soundcloud':
+ return Response(audio.toJSON(), status=200, mimetype="application/json")
+ elif service == "soundcloud":
return getNotImplemented()
- elif service == 'spotify':
+ elif service == "spotify":
return getNotImplemented()
else:
return getBadRequest()
-@app.route('/list/')
+@app.route("/list/")
def list(service):
- if service == 'youtube':
+ if service == "youtube":
audios = YouTubeAudio.query.all()
- return Response(','.join(audio.id for audio in audios), status=200, mimetype='text/plain')
- elif service == 'soundcloud':
+ return Response(
+ ",".join(audio.id for audio in audios), status=200, mimetype="text/plain"
+ )
+ elif service == "soundcloud":
return getNotImplemented()
- elif service == 'spotify':
+ elif service == "spotify":
return getNotImplemented()
else:
return getBadRequest()
-@app.route('/all/')
+@app.route("/all/")
def all(service):
- if service == 'youtube':
+ if service == "youtube":
audios = YouTubeAudio.query.all()
return jsonify([audio.toJSON(True) for audio in audios])
- return Response(jsonify([audio.toJSON(True) for audio in audios]), status=200, mimetype='application/json')
- elif service == 'soundcloud':
+ return Response(
+ jsonify([audio.toJSON(True) for audio in audios]),
+ status=200,
+ mimetype="application/json",
+ )
+ elif service == "soundcloud":
return getNotImplemented()
- elif service == 'spotify':
+ elif service == "spotify":
return getNotImplemented()
else:
return getBadRequest()
diff --git a/app/sound_models.py b/app/sound_models.py
index 5d885f3..c251979 100644
--- a/app/sound_models.py
+++ b/app/sound_models.py
@@ -30,11 +30,14 @@ class CouldNotDecode(Exception):
# Stores basic information like Title/Uploader/URL etc. as well as holds methods useful
# for manipulating, deleting, downloading, updating, and accessing the relevant information or file.
class YouTubeAudio(db.Model):
- id = db.Column(db.String(11),
- primary_key=True) # 11 char id, presumed to stay the same for the long haul. Should be able to change to 12 chars.
+ id = db.Column(
+ db.String(11), primary_key=True
+ ) # 11 char id, presumed to stay the same for the long haul. Should be able to change to 12 chars.
url = db.Column(db.String(64)) # 43 -> 64
title = db.Column(db.String(128)) # 120 > 128
- creator = db.Column(db.String(128)) # Seems to be Uploader set, so be careful with this
+ creator = db.Column(
+ db.String(128)
+ ) # Seems to be Uploader set, so be careful with this
uploader = db.Column(db.String(32)) # 20 -> 32
filename = db.Column(db.String(156)) # 128 + 11 + 1 -> 156
duration = db.Column(db.Integer)
@@ -44,7 +47,7 @@ class YouTubeAudio(db.Model):
# Marks a database entry as accessed by updating timestamps and counts
def access(self):
- print(f'{self.id} was just accessed ')
+ print(f"{self.id} was just accessed ")
self.access_count = (self.access_count or 0) + 1
self.last_access_timestamp = datetime.utcnow()
db.session.commit()
@@ -54,69 +57,95 @@ class YouTubeAudio(db.Model):
# alt: sendfile() asks for a path originating from ./app/
def getPath(self, alt=False):
if alt:
- return os.path.join('sounds', 'youtube', self.filename)
- return os.path.join('app', 'sounds', 'youtube', self.filename)
+ return os.path.join("sounds", "youtube", self.filename)
+ return os.path.join("app", "sounds", "youtube", self.filename)
def file_exists(self):
return os.path.exists(self.getPath())
# Fills in all metadata for a database entry
def fill_metadata(self):
- print(f'Filling out metadata for {self.id}')
+ print(f"Filling out metadata for {self.id}")
# Use stdout=PIPE, [Python 3.6] production server support instead of 'capture_output=True' => 'process.stdout'
- self.filename = self.id + '.mp3'
- command = f'youtube-dl -4 -x --audio-format mp3 --restrict-filenames --dump-json {self.id}'
- process = subprocess.Popen(command.split(' '),
- encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.filename = self.id + ".mp3"
+ command = f"youtube-dl -4 -x --audio-format mp3 --restrict-filenames --dump-json {self.id}"
+ process = subprocess.Popen(
+ command.split(" "),
+ encoding="utf-8",
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
data = process.communicate()
if process.returncode != 0:
raise CouldNotProcess(
- f'Command: {command}\n{data[1]}Exit Code: {process.returncode}') # process ends with a newline, not needed between
+ f"Command: {command}\n{data[1]}Exit Code: {process.returncode}"
+ ) # process ends with a newline, not needed between
try:
data = json.loads(data[0])
except json.JSONDecodeError:
raise CouldNotDecode(
- data) # We'll return the process data, figure out what to do with it higher up in stack (return/diagnose etc.)
- print(f'JSON acquired for {self.id}, beginning to fill.')
- self.duration = data['duration']
- self.url = data['webpage_url'] # Could be created, but we'll just infer from JSON response
- self.creator = data['creator'] or data['uploader']
- self.uploader = data['uploader'] or data['creator']
- self.title = data['title'] or data[
- 'alt_title'] # Do not trust alt-title ; it is volatile and uploader set, e.x. https://i.imgur.com/Tgff4rI.png
- print(f'Metadata filled for {self.id}')
+ data
+ ) # We'll return the process data, figure out what to do with it higher up in stack (return/diagnose etc.)
+ print(f"JSON acquired for {self.id}, beginning to fill.")
+ self.duration = data["duration"]
+ self.url = data[
+ "webpage_url"
+ ] # Could be created, but we'll just infer from JSON response
+ self.creator = data["creator"] or data["uploader"]
+ self.uploader = data["uploader"] or data["creator"]
+ self.title = (
+ data["title"] or data["alt_title"]
+ ) # Do not trust alt-title ; it is volatile and uploader set, e.x. https://i.imgur.com/Tgff4rI.png
+ print(f"Metadata filled for {self.id}")
db.session.commit()
# Begins the download process for a video
def download(self):
- print(f'Attempting download of {self.id}')
- command = f'youtube-dl -x -4 --restrict-filenames --audio-quality 64K --audio-format mp3 -o ./app/sounds/youtube/%(id)s.%(ext)s {self.id}'
- process = subprocess.Popen(command.split(' '), encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- data = process.communicate() # Not the data for the mp3, just the output. We have to separate this in order to 'wait' for the process to complete fully.
- print('Checking process return code...')
+ print(f"Attempting download of {self.id}")
+ command = f"youtube-dl -x -4 --restrict-filenames --audio-quality 64K --audio-format mp3 -o ./app/sounds/youtube/%(id)s.%(ext)s {self.id}"
+ process = subprocess.Popen(
+ command.split(" "),
+ encoding="utf-8",
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ data = (
+ process.communicate()
+ ) # Not the data for the mp3, just the output. We have to separate this in order to 'wait' for the process to complete fully.
+ print("Checking process return code...")
if process.returncode != 0:
- raise CouldNotProcess(f'Command: {command}\n{data[1] or data[0]}Exit Code: {process.returncode}')
- print('Checking for expected file...')
+ raise CouldNotProcess(
+ f"Command: {command}\n{data[1] or data[0]}Exit Code: {process.returncode}"
+ )
+ print("Checking for expected file...")
if not os.path.exists(self.getPath()):
raise CouldNotDownload(data[1] or data[0])
- print(f'Download attempt for {self.id} finished successfully.')
+ print(f"Download attempt for {self.id} finished successfully.")
# Validates whether the specified ID could be a valid YouTube video ID
@staticmethod
def isValid(id):
- return re.match(r'^[A-Za-z0-9_-]{11}$', id) is not None
+ return re.match(r"^[A-Za-z0-9_-]{11}$", id) is not None
# Returns a JSON serialization of the database entry
def toJSON(self, noConvert=False):
- data = {'id': self.id, 'url': self.url, 'title': self.title, 'creator': self.creator,
- 'uploader': self.uploader, 'filename': self.filename, 'duration': self.duration,
- 'access_count': self.access_count, 'download_timestamp': self.download_timestamp.isoformat(),
- 'last_access_timestamp': self.last_access_timestamp.isoformat()}
+ data = {
+ "id": self.id,
+ "url": self.url,
+ "title": self.title,
+ "creator": self.creator,
+ "uploader": self.uploader,
+ "filename": self.filename,
+ "duration": self.duration,
+ "access_count": self.access_count,
+ "download_timestamp": self.download_timestamp.isoformat(),
+ "last_access_timestamp": self.last_access_timestamp.isoformat(),
+ }
return data if noConvert else json.dumps(data)
def delete(self):
- path = os.path.join('app', 'sounds', 'youtube', self.filename)
+ path = os.path.join("app", "sounds", "youtube", self.filename)
try:
os.remove(path)
except:
diff --git a/app/spotify.py b/app/spotify.py
index a7cbd80..cef0a3e 100644
--- a/app/spotify.py
+++ b/app/spotify.py
@@ -8,7 +8,7 @@ from app import app
from config import Config
from .spotify_explicit import main
-path = os.path.join('app/spotify_explicit/recent.json')
+path = os.path.join("app/spotify_explicit/recent.json")
def check_and_update():
@@ -16,26 +16,26 @@ def check_and_update():
with open(path) as file:
file = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
- file = {'last_generated': -1}
+ file = {"last_generated": -1}
- if file['last_generated'] == -1:
+ if file["last_generated"] == -1:
return True
else:
- dif = time.time() - file['last_generated']
+ dif = time.time() - file["last_generated"]
# print('dif', dif)
if dif >= Config.SPOTIFY_CACHE_TIME:
return True
else:
- ideal = file['last_generated'] + Config.SPOTIFY_CACHE_TIME
+ ideal = file["last_generated"] + Config.SPOTIFY_CACHE_TIME
# print(f'Waiting another {int(ideal - time.time())} seconds')
return False
-@app.route('/spotify/')
+@app.route("/spotify/")
def spotify():
if check_and_update():
- print('Graph out of date - running update command')
- with open(path, 'w+') as file:
- file = json.dump({'last_generated': int(time.time())}, file)
+ print("Graph out of date - running update command")
+ with open(path, "w+") as file:
+ file = json.dump({"last_generated": int(time.time())}, file)
main.main()
- return send_file('spotify_explicit/export/export.png')
+ return send_file("spotify_explicit/export/export.png")
diff --git a/app/spotify_explicit/auth.py b/app/spotify_explicit/auth.py
index ca7a6a1..4066424 100644
--- a/app/spotify_explicit/auth.py
+++ b/app/spotify_explicit/auth.py
@@ -4,34 +4,36 @@ import os
import sys
# Path to API Credentials file
-PATH = os.path.join(sys.path[0], 'auth.json')
+PATH = os.path.join(sys.path[0], "auth.json")
# Ensure the file exists, if not, generate one and error with a reason
if not os.path.exists(PATH):
- with open(PATH, 'w') as file:
+ with open(PATH, "w") as file:
# Dump a pretty-printed dictionary with default values
json.dump(
{
- 'USERNAME': 'Your Username Here',
- 'CLIENT_ID': 'Your Client ID Here',
- 'CLIENT_SECRET': 'Your Client Secret Here',
- 'REDIRECT_URI': 'Your Redirect URI Callback Here',
- 'SCOPE': ['Your Scopes Here']
+ "USERNAME": "Your Username Here",
+ "CLIENT_ID": "Your Client ID Here",
+ "CLIENT_SECRET": "Your Client Secret Here",
+ "REDIRECT_URI": "Your Redirect URI Callback Here",
+ "SCOPE": ["Your Scopes Here"],
},
file,
- indent=3
+ indent=3,
)
# Error critically, then exit
- logging.critical("No \'auth.json\' file detected, one has been created for you")
- logging.critical("Please fill out with your Spotify credentials, and then restart the program")
+ logging.critical("No 'auth.json' file detected, one has been created for you")
+ logging.critical(
+ "Please fill out with your Spotify credentials, and then restart the program"
+ )
sys.exit()
# Open and parse file
-FILE = json.load(open(PATH, 'r'))
+FILE = json.load(open(PATH, "r"))
# Load all configuration variables
-USERNAME = FILE['USERNAME']
-CLIENT_ID = FILE['CLIENT_ID']
-CLIENT_SECRET = FILE['CLIENT_SECRET']
-REDIRECT_URI = FILE['REDIRECT_URI']
-SCOPE = ' '.join(FILE['SCOPE'])
+USERNAME = FILE["USERNAME"]
+CLIENT_ID = FILE["CLIENT_ID"]
+CLIENT_SECRET = FILE["CLIENT_SECRET"]
+REDIRECT_URI = FILE["REDIRECT_URI"]
+SCOPE = " ".join(FILE["SCOPE"])
diff --git a/app/spotify_explicit/main.py b/app/spotify_explicit/main.py
index 9bfe1b5..46af0bf 100644
--- a/app/spotify_explicit/main.py
+++ b/app/spotify_explicit/main.py
@@ -11,7 +11,7 @@ from . import pull
def main():
logging.basicConfig(level=logging.INFO)
- logging.info('Pulling data from Spotify')
+ logging.info("Pulling data from Spotify")
refresh()
process.main()
@@ -19,14 +19,20 @@ def main():
# Refreshes tracks from files if the token from Spotipy has expired,
# thus keeping us up to date in most cases while keeping rate limits
def refresh():
- file_path = os.path.join(sys.path[0], f'.cache-{auth.USERNAME}')
+ file_path = os.path.join(sys.path[0], f".cache-{auth.USERNAME}")
if os.path.exists(file_path):
- cache = json.load(open(file_path, 'r'))
- if True or time.time() > cache['expires_at']:
- logging.info('Refreshing Spotify data by pulling tracks, this may take a moment.')
+ cache = json.load(open(file_path, "r"))
+ if True or time.time() > cache["expires_at"]:
+ logging.info(
+ "Refreshing Spotify data by pulling tracks, this may take a moment."
+ )
pull.main()
else:
- logging.info('Spotify data deemed to be recent enough (under {} seconds old)'.format(cache['expires_in']))
+ logging.info(
+ "Spotify data deemed to be recent enough (under {} seconds old)".format(
+ cache["expires_in"]
+ )
+ )
else:
pull.main()
diff --git a/app/spotify_explicit/process.py b/app/spotify_explicit/process.py
index e408c3c..ffe5518 100644
--- a/app/spotify_explicit/process.py
+++ b/app/spotify_explicit/process.py
@@ -9,13 +9,11 @@ import numpy as np
# Gets all files in tracks folder, returns them in parsed JSON
def get_files():
- folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tracks')
+ folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tracks")
files = []
for file in os.listdir(folder):
with open(os.path.join(os.path.join(folder, file))) as file:
- files.append(
- json.load(file)
- )
+ files.append(json.load(file))
return files
@@ -23,17 +21,17 @@ def get_files():
def combine_files(files):
items = []
for file in files:
- items.extend(file['items'])
+ items.extend(file["items"])
return items
# Prints the data in a interesting format
def print_data(data):
for i, item in enumerate(data):
- date = dateutil.parser.parse(item['added_at'])
- explicit = '!' if item['track']['explicit'] else ' '
- track_name = item['track']['name']
- artists = ' & '.join(artist['name'] for artist in item['track']['artists'])
+ date = dateutil.parser.parse(item["added_at"])
+ explicit = "!" if item["track"]["explicit"] else " "
+ track_name = item["track"]["name"]
+ artists = " & ".join(artist["name"] for artist in item["track"]["artists"])
print('[{}] {} "{}" by {}'.format(date, explicit, track_name, artists))
@@ -41,10 +39,10 @@ def process_data(data):
# Process the data by Month/Year, then by Clean/Explicit
scores = {}
for item in data:
- date = dateutil.parser.parse(item['added_at']).strftime('%b %Y')
+ date = dateutil.parser.parse(item["added_at"]).strftime("%b %Y")
if date not in scores.keys():
scores[date] = [0, 0]
- scores[date][1 if item['track']['explicit'] else 0] += 1
+ scores[date][1 if item["track"]["explicit"] else 0] += 1
# Create simplified arrays for each piece of data
months = list(scores.keys())[::-1]
@@ -54,7 +52,7 @@ def process_data(data):
explicit.append(item[1])
# Done processing date properly, start plotting work
- logging.info('Processed data, creating plot from data')
+ logging.info("Processed data, creating plot from data")
# Weird numpy stuff
n = len(scores.values())
ind = np.arange(n)
@@ -63,33 +61,35 @@ def process_data(data):
plt.figure(figsize=(10.0, 6.0))
# Stacked Bars
p1 = plt.bar(ind, explicit, width)
- p2 = plt.bar(ind, clean, width, bottom=explicit) # bottom= just has the bar sit on top of the explicit
+ p2 = plt.bar(
+ ind, clean, width, bottom=explicit
+ ) # bottom= just has the bar sit on top of the explicit
# Plot labeling
- plt.title('Song Count by Clean/Explicit')
- plt.ylabel('Song Count')
- plt.xlabel('Month')
+ plt.title("Song Count by Clean/Explicit")
+ plt.ylabel("Song Count")
+ plt.xlabel("Month")
plt.xticks(ind, months, rotation=270) # Rotation 90 will have the
- plt.legend((p1[0], p2[0]), ('Explicit', 'Clean'))
+ plt.legend((p1[0], p2[0]), ("Explicit", "Clean"))
fig = plt.gcf() # Magic to save to image and then show
# Save the figure, overwriting anything in your way
- logging.info('Saving the figure to the \'export\' folder')
- export_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'export')
+ logging.info("Saving the figure to the 'export' folder")
+ export_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "export")
if not os.path.exists(export_folder):
os.makedirs(export_folder)
plt.tight_layout()
fig.savefig(
os.path.join(
export_folder,
- 'export'
+ "export"
# datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
),
dpi=100,
- quality=95
+ quality=95,
)
- # Finally show the figure to
- logging.info('Showing plot to User')
+ # Finally show the figure to
+ logging.info("Showing plot to User")
# plt.show()
# Copy the figure to your clipboard to paste in Excel
@@ -101,10 +101,17 @@ def process_data(data):
# Will paste into Excel very easily
def copy(months, clean, explicit):
from pyperclip import copy
- top = 'Period\tClean\tExplicit\n'
- copy(top + '\n'.join([
- f'{item[0]}\t{item[1]}\t{item[2]}' for item in zip(months, clean, explicit)
- ]))
+
+ top = "Period\tClean\tExplicit\n"
+ copy(
+ top
+ + "\n".join(
+ [
+ f"{item[0]}\t{item[1]}\t{item[2]}"
+ for item in zip(months, clean, explicit)
+ ]
+ )
+ )
def main():
@@ -114,7 +121,10 @@ def main():
logging.info(f"Read and parse {len(files)} track files")
logging.info("Combining into single track file for ease of access")
data = combine_files(files)
- data.sort(key=lambda item: dateutil.parser.parse(item['added_at']).timestamp(), reverse=True)
- logging.info(f'File combined with {len(data)} items')
- logging.info('Processing file...')
+ data.sort(
+ key=lambda item: dateutil.parser.parse(item["added_at"]).timestamp(),
+ reverse=True,
+ )
+ logging.info(f"File combined with {len(data)} items")
+ logging.info("Processing file...")
process_data(data)
diff --git a/app/spotify_explicit/pull.py b/app/spotify_explicit/pull.py
index 2a82776..5be60e7 100644
--- a/app/spotify_explicit/pull.py
+++ b/app/spotify_explicit/pull.py
@@ -13,54 +13,58 @@ from . import auth
def main():
# Get Authorization
logging.basicConfig(level=logging.INFO)
- logging.info('Authorizing with Spotify via Spotipy')
- logging.warning('May require User Interaction to authenticate properly!')
+ logging.info("Authorizing with Spotify via Spotipy")
+ logging.warning("May require User Interaction to authenticate properly!")
token = util.prompt_for_user_token(
username=auth.USERNAME,
scope=auth.SCOPE,
client_id=auth.CLIENT_ID,
client_secret=auth.CLIENT_SECRET,
- redirect_uri=auth.REDIRECT_URI
+ redirect_uri=auth.REDIRECT_URI,
)
sp = spotipy.Spotify(auth=token)
- logging.info('Authorized with Spotify via Spotipy')
+ logging.info("Authorized with Spotify via Spotipy")
- tracks_folder = os.path.join(os.path.dirname(__file__), 'tracks')
- logging.warning('Clearing all files in tracks folder for new files')
+ tracks_folder = os.path.join(os.path.dirname(__file__), "tracks")
+ logging.warning("Clearing all files in tracks folder for new files")
if os.path.exists(tracks_folder):
shutil.rmtree(tracks_folder) # Delete folder and all contents (old track files)
os.makedirs(tracks_folder) # Recreate the folder just deleted
- logging.info('Cleared folder, ready to download new track files')
+ logging.info("Cleared folder, ready to download new track files")
curoffset, curlimit = 0, 50
while curoffset >= 0:
# Request and identify what was received
- logging.info('Requesting {} to {}'.format(curoffset, curoffset + curlimit))
+ logging.info("Requesting {} to {}".format(curoffset, curoffset + curlimit))
response = sp.current_user_saved_tracks(limit=curlimit, offset=curoffset)
- received = len(response['items'])
- logging.info('Received {} to {}'.format(curoffset, curoffset + received))
+ received = len(response["items"])
+ logging.info("Received {} to {}".format(curoffset, curoffset + received))
# Create path/filename
- filename = f'saved-tracks-{curoffset}-{curoffset + received}.json'
+ filename = f"saved-tracks-{curoffset}-{curoffset + received}.json"
filepath = os.path.join(tracks_folder, filename)
# Save track file
- with open(filepath, 'w+') as file:
+ with open(filepath, "w+") as file:
json.dump(response, file)
- logging.info('Saved at "{}" ({})'.format(
- f'\\tracks\\{filename}',
- size(os.path.getsize(filepath)))
+ logging.info(
+ 'Saved at "{}" ({})'.format(
+ f"\\tracks\\{filename}", size(os.path.getsize(filepath))
+ )
)
# Decide whether we have received all possible tracks
if received < curlimit:
- logging.info('Requested and saved {} tracks split over {} files ({})'.format(
- curoffset + received,
- len(os.listdir(tracks_folder)),
- size(
- sum(
- os.path.getsize(os.path.join(tracks_folder, file)) for file in os.listdir(tracks_folder)
+ logging.info(
+ "Requested and saved {} tracks split over {} files ({})".format(
+ curoffset + received,
+ len(os.listdir(tracks_folder)),
+ size(
+ sum(
+ os.path.getsize(os.path.join(tracks_folder, file))
+ for file in os.listdir(tracks_folder)
+ ),
+ system=alternative,
),
- system=alternative
)
- ))
+ )
break
# Continuing, so increment offset
curoffset += curlimit