From bc3cd82a95692d6d881521c57058be8b18e11df0 Mon Sep 17 00:00:00 2001 From: Xevion Date: Sun, 8 Mar 2020 20:49:06 -0500 Subject: [PATCH] PyCharm grand repo wide reformat with black formatter --- app/__init__.py | 3 +- app/dashboard.py | 28 +++--- app/forms.py | 53 +++++++---- app/ftbhot.py | 24 ++--- app/hidden.py | 85 +++++++++++------ app/models.py | 28 +++--- app/panzer.py | 26 +++--- app/routes.py | 161 +++++++++++++++++++------------- app/simple_routes.py | 23 +++-- app/sound.py | 106 ++++++++++++--------- app/sound_models.py | 99 +++++++++++++------- app/spotify.py | 20 ++-- app/spotify_explicit/auth.py | 34 +++---- app/spotify_explicit/main.py | 18 ++-- app/spotify_explicit/process.py | 70 ++++++++------ app/spotify_explicit/pull.py | 50 +++++----- 16 files changed, 487 insertions(+), 341 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 602b676..f5eca5f 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -4,6 +4,7 @@ from flask_limiter import Limiter from flask_limiter.util import get_remote_address from flask_login import LoginManager from flask_migrate import Migrate + # Flask Extensions from flask_sqlalchemy import SQLAlchemy @@ -15,7 +16,7 @@ app.config.from_object(Config) app.url_map.strict_slashes = False # App extension setup login = LoginManager(app) -login.login_view = 'login' +login.login_view = "login" db = SQLAlchemy(app) migrate = Migrate(app, db) limiter = Limiter(app, key_func=get_remote_address, default_limits=["10 per second"]) diff --git a/app/dashboard.py b/app/dashboard.py index 6e6a470..1fdcb50 100644 --- a/app/dashboard.py +++ b/app/dashboard.py @@ -6,41 +6,43 @@ from app.custom import require_role from app.forms import ProfileSettingsForm, ProfilePictureForm -@app.route('/dashboard') +@app.route("/dashboard") @login_required def dashboard(): - return render_template('/dashboard/dashboard.html') + return render_template("/dashboard/dashboard.html") -@app.route('/dashboard/profile_settings', methods=['GET']) +@app.route("/dashboard/profile_settings", methods=["GET"]) @login_required def profile_settings(): psform = ProfileSettingsForm() ppform = ProfilePictureForm() - return render_template('/dashboard/profile_settings.html', psform=psform, ppform=ppform) + return render_template( + "/dashboard/profile_settings.html", psform=psform, ppform=ppform + ) -@app.route('/dashboard/profile_settings/submit', methods=['POST']) +@app.route("/dashboard/profile_settings/submit", methods=["POST"]) @login_required def profile_settings_submit(): form = ProfileSettingsForm() if form.validate_on_submit(): data = { - 'show_email': form.show_email.data or None, - 'profile_picture_file': request.files + "show_email": form.show_email.data or None, + "profile_picture_file": request.files, } return jsonify(data=data) - return '{}' + return "{}" -@app.route('/dashboard/constants') +@app.route("/dashboard/constants") @login_required -@require_role(roles=['Admin']) +@require_role(roles=["Admin"]) def constants(): - return render_template('/dashboard/constants.html') + return render_template("/dashboard/constants.html") -@app.route('/dashboard/rbac') +@app.route("/dashboard/rbac") @login_required def rbac(): - return render_template('/dashboard/rbac.html') + return render_template("/dashboard/rbac.html") diff --git a/app/forms.py b/app/forms.py index 228f5a1..819dd37 100644 --- a/app/forms.py +++ b/app/forms.py @@ -1,43 +1,58 @@ from flask_wtf import FlaskForm -from wtforms import StringField, PasswordField, BooleanField, SubmitField, RadioField, FileField +from wtforms import ( + StringField, + PasswordField, + BooleanField, + SubmitField, + RadioField, + FileField, +) from wtforms.validators import ValidationError, DataRequired, EqualTo, Email, URL from app.models import User class LoginForm(FlaskForm): - username = StringField('Username', validators=[DataRequired()]) - password = PasswordField('Password', validators=[DataRequired()]) - remember_me = BooleanField('Remember Me') - submit = SubmitField('Sign in') + username = StringField("Username", validators=[DataRequired()]) + password = PasswordField("Password", validators=[DataRequired()]) + remember_me = BooleanField("Remember Me") + submit = SubmitField("Sign in") class RegistrationForm(FlaskForm): - username = StringField('Username', validators=[DataRequired()]) - email = StringField('Email', validators=[DataRequired(), Email()]) - password = PasswordField('Password', validators=[DataRequired()]) - password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) - submit = SubmitField('Register') + username = StringField("Username", validators=[DataRequired()]) + email = StringField("Email", validators=[DataRequired(), Email()]) + password = PasswordField("Password", validators=[DataRequired()]) + password2 = PasswordField( + "Repeat Password", validators=[DataRequired(), EqualTo("password")] + ) + submit = SubmitField("Register") def validate_username(self, username): user = User.query.filter_by(username=username.data).first() if user is not None: - raise ValidationError('That username is not available.') + raise ValidationError("That username is not available.") def validate_email(self, email): user = User.query.filter_by(email=email.data).first() if user is not None: - raise ValidationError('That email address is not available.') + raise ValidationError("That email address is not available.") class ProfileSettingsForm(FlaskForm): - show_email = RadioField('Show Email', default='registered', - choices=[('public', 'Public'), ('registered', 'Registered Users Only'), - ('hidden', 'Hidden')]) - submit = SubmitField('Save Profile Settings') + show_email = RadioField( + "Show Email", + default="registered", + choices=[ + ("public", "Public"), + ("registered", "Registered Users Only"), + ("hidden", "Hidden"), + ], + ) + submit = SubmitField("Save Profile Settings") class ProfilePictureForm(FlaskForm): - profile_picture_file = FileField('Upload Profile Picture') - profile_picture_url = StringField('Use URL for Profile Picture', validators=[URL()]) - submit = SubmitField('Submit Profile Picture') + profile_picture_file = FileField("Upload Profile Picture") + profile_picture_url = StringField("Use URL for Profile Picture", validators=[URL()]) + submit = SubmitField("Submit Profile Picture") diff --git a/app/ftbhot.py b/app/ftbhot.py index d71312e..c613086 100644 --- a/app/ftbhot.py +++ b/app/ftbhot.py @@ -3,25 +3,25 @@ import flask from app import app -@app.route('/ftbhot/about') -@app.route('/ftbhot/about/') +@app.route("/ftbhot/about") +@app.route("/ftbhot/about/") def ftbhot_about(): - return flask.render_template('/ftbhot/about.html') + return flask.render_template("/ftbhot/about.html") -@app.route('/ftbhot/auth') -@app.route('/ftbhot/auth/') +@app.route("/ftbhot/auth") +@app.route("/ftbhot/auth/") def ftbhot_auth(): - return 'WIP' + return "WIP" -@app.route('/ftbhot') -@app.route('/ftbhot/') +@app.route("/ftbhot") +@app.route("/ftbhot/") def ftbhot(): - return flask.render_template('/ftbhot/embed.html') + return flask.render_template("/ftbhot/embed.html") -@app.route('/ftbhot/json') -@app.route('/ftbhot/json/') +@app.route("/ftbhot/json") +@app.route("/ftbhot/json/") def ftbhot_embed(): - return flask.render_template('/ftbhot/current.json') + return flask.render_template("/ftbhot/current.json") diff --git a/app/hidden.py b/app/hidden.py index 43b023e..fb0c26e 100644 --- a/app/hidden.py +++ b/app/hidden.py @@ -11,23 +11,23 @@ from app.custom import require_role from app.models import Search -@app.route('/hidden/history') +@app.route("/hidden/history") @login_required -@require_role(roles=['Hidden', 'Admin']) +@require_role(roles=["Hidden", "Admin"]) def hidden_history(): - return render_template('hidden_history.html') + return render_template("hidden_history.html") -@app.route('/hidden/help') +@app.route("/hidden/help") @login_required -@require_role(roles=['Hidden']) +@require_role(roles=["Hidden"]) def hidden_help(): - return render_template('hidden_help.html') + return render_template("hidden_help.html") # Parses strings to test for "boolean-ness" def boolparse(string, default=False): - trues = ['true', '1'] + trues = ["true", "1"] if string is None: return default elif string.lower() in trues: @@ -35,26 +35,29 @@ def boolparse(string, default=False): return False -@app.route('/hidden/') +@app.route("/hidden/") @login_required -@require_role(roles=['Hidden']) +@require_role(roles=["Hidden"]) def hidden(): # Handled within request - tags = request.args.get('tags') or 'trap' + tags = request.args.get("tags") or "trap" try: - page = int(request.args.get('page') or 1) + page = int(request.args.get("page") or 1) except (TypeError, ValueError): - return '\"page\" parameter must be Integer.
Invalid \"page\" parameter: \"{}\"'.format( - request.args.get('page')) + return '"page" parameter must be Integer.
Invalid "page" parameter: "{}"'.format( + request.args.get("page") + ) # Handled within building try: - count = int(request.args.get('count') or 50) + count = int(request.args.get("count") or 50) except (TypeError, ValueError): - return '\"count\" parameter must be Integer.
Invalid \"count\": \"{}\"'.format(request.args.get('count')) - base64 = boolparse(request.args.get('base64')) + return '"count" parameter must be Integer.
Invalid "count": "{}"'.format( + request.args.get("count") + ) + base64 = boolparse(request.args.get("base64")) # Handled within Jinja template - showfull = boolparse(request.args.get('showfull')) - showtags = boolparse(request.args.get('showtags')) + showfull = boolparse(request.args.get("showfull")) + showtags = boolparse(request.args.get("showtags")) # Request, Parse & Build Data data = build_data(tags, page - 1, count, base64, showfull) # Handling for limiters @@ -63,18 +66,33 @@ def hidden(): count = min(25, count) else: count = min(50, count) - search = Search(user_id=current_user.id, exact_url=str(request.url), query_args=json.dumps(request.args.to_dict())) + search = Search( + user_id=current_user.id, + exact_url=str(request.url), + query_args=json.dumps(request.args.to_dict()), + ) db.session.add(search) db.session.commit() - return render_template('hidden.html', title='Gelbooru Browser', data=data, tags=tags, page=page, count=count, - base64=base64, showfull=showfull, showtags=showtags) + return render_template( + "hidden.html", + title="Gelbooru Browser", + data=data, + tags=tags, + page=page, + count=count, + base64=base64, + showfull=showfull, + showtags=showtags, + ) def base64ify(url): return base64.b64encode(requests.get(url).content).decode() -gelbooru_api_url = "https://gelbooru.com/index.php?page=dapi&s=post&q=index&tags={}&pid={}&limit={}" +gelbooru_api_url = ( + "https://gelbooru.com/index.php?page=dapi&s=post&q=index&tags={}&pid={}&limit={}" +) gelbooru_view_url = "https://gelbooru.com/index.php?page=post&s=view&id={}" @@ -87,24 +105,29 @@ def build_data(tags, page, count, base64, showfull): build = [] try: - parse['posts']['post'] + parse["posts"]["post"] except KeyError: return build - for index, element in enumerate(parse['posts']['post'][:count]): + for index, element in enumerate(parse["posts"]["post"][:count]): temp = { - 'index': str(index + 1), - 'real_url': element['@file_url'], - 'sample_url': element['@preview_url'], + "index": str(index + 1), + "real_url": element["@file_url"], + "sample_url": element["@preview_url"], # strips tags, ensures no empty tags (may be unnecessary) - 'tags': list(filter(lambda tag: tag != '', [tag.strip() for tag in element['@tags'].split(' ')])), - 'view': gelbooru_view_url.format(element['@id']) + "tags": list( + filter( + lambda tag: tag != "", + [tag.strip() for tag in element["@tags"].split(" ")], + ) + ), + "view": gelbooru_view_url.format(element["@id"]), } if base64: if not showfull: - temp['base64'] = base64ify(temp['sample_url']) + temp["base64"] = base64ify(temp["sample_url"]) else: - temp['base64'] = base64ify(temp['real_url']) + temp["base64"] = base64ify(temp["real_url"]) build.append(temp) return build diff --git a/app/models.py b/app/models.py index 580d771..380c465 100644 --- a/app/models.py +++ b/app/models.py @@ -16,9 +16,9 @@ class User(UserMixin, db.Model): email = db.Column(db.String(120), index=True, unique=True) register_timestamp = db.Column(db.DateTime, default=datetime.utcnow) password_hash = db.Column(db.String(64)) - posts = db.relationship('Post', backref='author', lazy='dynamic') - search_history = db.relationship('Search', backref='user', lazy='dynamic') - uroles = db.Column(db.String(80), default='') + posts = db.relationship("Post", backref="author", lazy="dynamic") + search_history = db.relationship("Search", backref="user", lazy="dynamic") + uroles = db.Column(db.String(80), default="") about_me = db.Column(db.String(320)) last_seen = db.Column(db.DateTime, default=datetime.utcnow) show_email = db.Column(db.Boolean, default=False) @@ -31,11 +31,11 @@ class User(UserMixin, db.Model): raise "{} has no password_hash set!".format(self.__repr__()) return check_password_hash(self.password_hash, password) - # Retains order while making sure that there are no duplicate role values and they are capitalized + # Retains order while making sure that there are no duplicate role values and they are capitalized def post_role_processing(self): user_roles = self.get_roles() user_roles = list(dict.fromkeys(user_roles)) - self.uroles = ' '.join([role.title() for role in user_roles]) + self.uroles = " ".join([role.title() for role in user_roles]) self.uroles = self.uroles.strip() def delete_role(self, role): @@ -55,7 +55,7 @@ class User(UserMixin, db.Model): return success def get_roles(self): - return self.uroles.split(' ') + return self.uroles.split(" ") def add_role(self, role): self.add_roles([role]) @@ -63,12 +63,12 @@ class User(UserMixin, db.Model): def add_roles(self, roles, postprocess=True): user_roles = self.get_roles() # Ensure whitespace is replaced with a underscore - roles = ['_'.join(role.split()) for role in roles] + roles = ["_".join(role.split()) for role in roles] if type(roles) == str: user_roles.append(roles) elif type(roles) == list: user_roles.extend(roles) - user_roles = ' '.join(user_roles) + user_roles = " ".join(user_roles) self.uroles = user_roles if postprocess: self.post_role_processing() @@ -92,7 +92,7 @@ class User(UserMixin, db.Model): return True def __repr__(self): - return ''.format(self.username) + return "".format(self.username) class Search(db.Model): @@ -100,20 +100,22 @@ class Search(db.Model): exact_url = db.Column(db.String(160)) query_args = db.Column(db.String(120)) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) - user_id = db.Column(db.Integer, db.ForeignKey('user.id')) + user_id = db.Column(db.Integer, db.ForeignKey("user.id")) def __repr__(self): - return ''.format(User.query.filter_by(id=self.user_id).first().username, self.timestamp) + return "".format( + User.query.filter_by(id=self.user_id).first().username, self.timestamp + ) class Post(db.Model): id = db.Column(db.Integer, primary_key=True) body = db.Column(db.String(140)) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) - user_id = db.Column(db.Integer, db.ForeignKey('user.id')) + user_id = db.Column(db.Integer, db.ForeignKey("user.id")) def __repr__(self): - return ''.format(self.body) + return "".format(self.body) @login.user_loader diff --git a/app/panzer.py b/app/panzer.py index c9f88f5..770f30a 100644 --- a/app/panzer.py +++ b/app/panzer.py @@ -7,13 +7,13 @@ from PIL import Image, ImageDraw, ImageFont from app import app -@app.route('/panzer/') -@app.route('/panzer') -@app.route('/panzer/') -@app.route('/panzer//') -def panzer(string='bionicles are cooler than sex'): - string = string.replace('+', ' ') - string = string.replace('\n', '%0A') +@app.route("/panzer/") +@app.route("/panzer") +@app.route("/panzer/") +@app.route("/panzer//") +def panzer(string="bionicles are cooler than sex"): + string = string.replace("+", " ") + string = string.replace("\n", "%0A") image = create_panzer(string) return serve_pil_image(image) @@ -21,12 +21,12 @@ def panzer(string='bionicles are cooler than sex'): def create_panzer(string): img = Image.open("./app/static/panzer.jpeg") draw = ImageDraw.Draw(img) - font1 = ImageFont.truetype('./app/static/arial.ttf', size=30) - draw.text((10, 20), 'Oh panzer of the lake, what is your wisdom?', font=font1) - font2 = ImageFont.truetype('./app/static/arial.ttf', size=30) + font1 = ImageFont.truetype("./app/static/arial.ttf", size=30) + draw.text((10, 20), "Oh panzer of the lake, what is your wisdom?", font=font1) + font2 = ImageFont.truetype("./app/static/arial.ttf", size=30) topleft = (250, 500) wrapped = wrap(string, width=25) - wrapped = [text.replace('%0A', '\n') for text in wrapped] + wrapped = [text.replace("%0A", "\n") for text in wrapped] for y, text in enumerate(wrapped): draw.text((topleft[0], topleft[1] + (y * 33)), text, font=font2) return img @@ -34,6 +34,6 @@ def create_panzer(string): def serve_pil_image(pil_img): img_io = BytesIO() - pil_img.save(img_io, 'JPEG', quality=50) + pil_img.save(img_io, "JPEG", quality=50) img_io.seek(0) - return flask.send_file(img_io, mimetype='image/jpeg') + return flask.send_file(img_io, mimetype="image/jpeg") diff --git a/app/routes.py b/app/routes.py index dbb23d9..8d6e8c3 100644 --- a/app/routes.py +++ b/app/routes.py @@ -16,39 +16,62 @@ from app.models import User print = pprint.PrettyPrinter().pprint fake = faker.Faker() -strgen = lambda length, charset=string.ascii_letters, weights=None: ''.join( - random.choices(list(charset), k=length, weights=weights)) +strgen = lambda length, charset=string.ascii_letters, weights=None: "".join( + random.choices(list(charset), k=length, weights=weights) +) -@app.route('/', subdomain='api') +@app.route("/", subdomain="api") def api_index(): return "api" -@app.route('/time/') +@app.route("/time/") def time(): - value = request.args.get('value') + value = request.args.get("value") if not value: - return '
'.join( - ['[int] value', '[int list] lengths', '[string list] strings', '[boolean] reverse', '[string] pluralappend', - '[boolean] synonym']) + return "
".join( + [ + "[int] value", + "[int list] lengths", + "[string list] strings", + "[boolean] reverse", + "[string] pluralappend", + "[boolean] synonym", + ] + ) value = int(value) - lengths = request.args.get('lengths') - if lengths: lengths = lengths.split(',') - strings = request.args.get('strings') - if strings: strings = strings.split(',') - if (len(lengths or []) + len(strings or []) > 0) and (len(lengths or []) + 1 != len(strings or [])): - return f'error: lengths ({len(lengths or [])}) and strings ({len(strings or [])}) arrays must be same length to process properly' - if lengths: lengths = list(map(int, lengths)) - reverse = request.args.get('reverse') - if reverse: reverse = bool(reverse) - return timeformat(value=value, lengths=lengths or [60, 60, 24, 365], - strings=strings or ['second', 'minute', 'hour', 'day', 'year'], - reverse=True if reverse is None else reverse) + lengths = request.args.get("lengths") + if lengths: + lengths = lengths.split(",") + strings = request.args.get("strings") + if strings: + strings = strings.split(",") + if (len(lengths or []) + len(strings or []) > 0) and ( + len(lengths or []) + 1 != len(strings or []) + ): + return f"error: lengths ({len(lengths or [])}) and strings ({len(strings or [])}) arrays must be same length to process properly" + if lengths: + lengths = list(map(int, lengths)) + reverse = request.args.get("reverse") + if reverse: + reverse = bool(reverse) + return timeformat( + value=value, + lengths=lengths or [60, 60, 24, 365], + strings=strings or ["second", "minute", "hour", "day", "year"], + reverse=True if reverse is None else reverse, + ) -def timeformat(value, lengths=[60, 60, 24, 365], strings=['second', 'minute', 'hour', 'day', 'year'], reverse=True, - pluralappend='s', synonym=False): +def timeformat( + value, + lengths=[60, 60, 24, 365], + strings=["second", "minute", "hour", "day", "year"], + reverse=True, + pluralappend="s", + synonym=False, +): converted = [value] for index, length in enumerate(lengths): temp = converted[-1] // length @@ -58,19 +81,23 @@ def timeformat(value, lengths=[60, 60, 24, 365], strings=['second', 'minute', 'h converted.append(temp) else: break - strings = strings[:len(converted)] - build = ['{} {}'.format(value, strings[i] + pluralappend if value > 1 or value == 0 else strings[i]) for i, value in - enumerate(converted)][::-1] - build = ', '.join(build) + strings = strings[: len(converted)] + build = [ + "{} {}".format( + value, strings[i] + pluralappend if value > 1 or value == 0 else strings[i] + ) + for i, value in enumerate(converted) + ][::-1] + build = ", ".join(build) return build -@app.route('/avatar/') -@app.route('/avatar//') -@app.route('/avatar/') -def getAvatar(id=''): +@app.route("/avatar/") +@app.route("/avatar//") +@app.route("/avatar/") +def getAvatar(id=""): # Constants - headers = {'Authorization': f'Bot {app.config["DISCORD_TOKEN"]}'} + headers = {"Authorization": f'Bot {app.config["DISCORD_TOKEN"]}'} api = "https://discordapp.com/api/v6/users/{}" cdn = "https://cdn.discordapp.com/avatars/{}/{}.png" # Get User Data which contains Avatar Hash @@ -78,74 +105,76 @@ def getAvatar(id=''): if response.status_code != 200: return response.text user = json.loads(response.text) - url = cdn.format(id, user['avatar']) - return "".format(url) + url = cdn.format(id, user["avatar"]) + return ''.format(url) -@app.route('/userinfo/') +@app.route("/userinfo/") @login_required -@require_role(roles=['Admin']) +@require_role(roles=["Admin"]) def user_info(): prepare = { - 'id': current_user.get_id(), - 'email': current_user.email, - 'username': current_user.username, - 'password_hash': current_user.password_hash, - 'is_active': current_user.is_active, - 'is_anonymous': current_user.is_anonymous, - 'is_authenticated': current_user.is_authenticated, - 'metadata': current_user.metadata.info, - 'uroles': current_user.get_roles() + "id": current_user.get_id(), + "email": current_user.email, + "username": current_user.username, + "password_hash": current_user.password_hash, + "is_active": current_user.is_active, + "is_anonymous": current_user.is_anonymous, + "is_authenticated": current_user.is_authenticated, + "metadata": current_user.metadata.info, + "uroles": current_user.get_roles(), } return jsonify(prepare) -@app.route('/') +@app.route("/") def index(): jobs = [ - 'Student Photographer', - 'Highschool Student', - 'Web Developer', - 'Python Developer', - 'Software Engineer', + "Student Photographer", + "Highschool Student", + "Web Developer", + "Python Developer", + "Software Engineer", ] - return render_template('index.html', job=random.choice(jobs)) + return render_template("index.html", job=random.choice(jobs)) -@app.route('/register/', methods=['GET', 'POST']) +@app.route("/register/", methods=["GET", "POST"]) def register(): if current_user.is_authenticated: - return redirect(url_for('dashboard')) + return redirect(url_for("dashboard")) form = RegistrationForm() if form.validate_on_submit(): user = User(username=form.username.data, email=form.email.data) user.set_password(form.password.data) db.session.add(user) db.session.commit() - flash('Registered Successfully!', 'info') - return redirect(url_for('login')) - return render_template('register.html', title='Register', form=form, hideRegister=True) + flash("Registered Successfully!", "info") + return redirect(url_for("login")) + return render_template( + "register.html", title="Register", form=form, hideRegister=True + ) -@app.route('/login/', methods=['GET', 'POST']) +@app.route("/login/", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: - return redirect(url_for('dashboard')) + return redirect(url_for("dashboard")) form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user is None or not user.check_password(form.password.data): - flash('Invalid username or password', 'error') - return redirect(url_for('login')) + flash("Invalid username or password", "error") + return redirect(url_for("login")) login_user(user, remember=form.remember_me.data) - next_page = request.args.get('next') - if not next_page or url_parse(next_page).netloc != '': - next_page = url_for('index') + next_page = request.args.get("next") + if not next_page or url_parse(next_page).netloc != "": + next_page = url_for("index") return redirect(next_page) - return render_template('login.html', title='Login', form=form, hideLogin=True) + return render_template("login.html", title="Login", form=form, hideLogin=True) -@app.route('/logout/') +@app.route("/logout/") def logout(): logout_user() - return redirect(url_for('index')) + return redirect(url_for("index")) diff --git a/app/simple_routes.py b/app/simple_routes.py index b08e36b..3475a45 100644 --- a/app/simple_routes.py +++ b/app/simple_routes.py @@ -8,28 +8,33 @@ from app import app markdown = mistune.Markdown() -@app.route('/keybase.txt') +@app.route("/keybase.txt") def keybase(): - return app.send_static_file('keybase.txt') + return app.send_static_file("keybase.txt") -@app.route('/modpacks') +@app.route("/modpacks") def modpacks(): - return markdown(open(os.path.join(app.root_path, 'static', 'MODPACKS.MD'), 'r').read()) + return markdown( + open(os.path.join(app.root_path, "static", "MODPACKS.MD"), "r").read() + ) -@app.route('/favicon.ico') +@app.route("/favicon.ico") def favicon(): - return send_from_directory(os.path.join(app.root_path, 'static'), 'favicon.ico', - mimetype='image/vnd.microsoft.icon') + return send_from_directory( + os.path.join(app.root_path, "static"), + "favicon.ico", + mimetype="image/vnd.microsoft.icon", + ) @app.errorhandler(401) def unauthorized(e): - return redirect(url_for('login')) + return redirect(url_for("login")) @app.errorhandler(404) def page_not_found(e): # note that we set the 404 status explicitly - return render_template('error.html', code=404, message='Content not found...'), 404 + return render_template("error.html", code=404, message="Content not found..."), 404 diff --git a/app/sound.py b/app/sound.py index 652bb23..3b42a3f 100644 --- a/app/sound.py +++ b/app/sound.py @@ -2,18 +2,27 @@ from flask import Response, send_file, request, jsonify from flask_login import current_user from app import app, db, limiter -from app.sound_models import YouTubeAudio, CouldNotDecode, CouldNotDownload, CouldNotProcess +from app.sound_models import ( + YouTubeAudio, + CouldNotDecode, + CouldNotDownload, + CouldNotProcess, +) # Selection of Lambdas for creating new responses # Not sure if Responses change based on Request Context, but it doesn't hurt. -getBadRequest = lambda: Response('Bad request', status=400, mimetype='text/plain') -getNotImplemented = lambda: Response('Not implemented', status=501, mimetype='text/plain') -getInvalidID = lambda: Response('Invalid ID', status=400, mimetype='text/plain') -getNotDownloaded = lambda: Response('Media not yet downloaded', status=400, mimetype='text/plain') +getBadRequest = lambda: Response("Bad request", status=400, mimetype="text/plain") +getNotImplemented = lambda: Response( + "Not implemented", status=501, mimetype="text/plain" +) +getInvalidID = lambda: Response("Invalid ID", status=400, mimetype="text/plain") +getNotDownloaded = lambda: Response( + "Media not yet downloaded", status=400, mimetype="text/plain" +) -# Retrieves the YouTubeAudio object relevant to the mediaid if available. If not, it facilitates the creation and writing of one. -# Also helps with access times. +# Retrieves the YouTubeAudio object relevant to the mediaid if available. If not, it facilitates the creation and +# writing of one. Also helps with access times. def get_youtube(mediaid): audio = YouTubeAudio.query.get(mediaid) if audio is not None: @@ -30,9 +39,9 @@ def get_youtube(mediaid): basic_responses = { - CouldNotDecode: 'Could not decode process response.', - CouldNotDownload: 'Could not download video.', - CouldNotProcess: 'Could not process.' + CouldNotDecode: "Could not decode process response.", + CouldNotDownload: "Could not download video.", + CouldNotProcess: "Could not process.", } @@ -41,55 +50,58 @@ basic_responses = { # Shows error in full context IF authenticated + admin, otherwise basic error description, OTHERWISE a basic error message. def errorCheck(e): if type(e) in basic_responses.keys(): - response = f'{basic_responses[type(e)]}' + response = f"{basic_responses[type(e)]}" else: raise e - if current_user.is_authenticated and current_user.has_role('Admin'): response = str(e) + '\n' + response - return Response(response, status=200, mimetype='text/plain') + if current_user.is_authenticated and current_user.has_role("Admin"): + response = str(e) + "\n" + response + return Response(response, status=200, mimetype="text/plain") # Under the request context, it grabs the same args needed to decide whether the stream has been downloaded previously # It applies rate limiting differently based on service, and whether the stream has been accessed previously def downloadLimiter(): - if request.view_args['service'] == 'youtube': - if YouTubeAudio.query.get(request.view_args['mediaid']) is not None: - return '5/minute' + if request.view_args["service"] == "youtube": + if YouTubeAudio.query.get(request.view_args["mediaid"]) is not None: + return "5/minute" else: - return '1/30seconds' + return "1/30seconds" else: - return '10/minute' + return "10/minute" # Streams back the specified media back to the client -@app.route('/stream//') -@limiter.limit(downloadLimiter, lambda: 'global', error_message='429 Too Many Requests') +@app.route("/stream//") +@limiter.limit(downloadLimiter, lambda: "global", error_message="429 Too Many Requests") def stream(service, mediaid): - if service == 'youtube': + if service == "youtube": if YouTubeAudio.isValid(mediaid): try: audio = get_youtube(mediaid) except Exception as e: return errorCheck(e) - return send_file(audio.getPath(alt=True), attachment_filename=audio.filename) + return send_file( + audio.getPath(alt=True), attachment_filename=audio.filename + ) else: return getInvalidID() - elif service == 'soundcloud': + elif service == "soundcloud": return getNotImplemented() - elif service == 'spotify': + elif service == "spotify": return getNotImplemented() else: return getBadRequest() # Returns the duration of a specific media -@app.route('/duration//') +@app.route("/duration//") def duration(service, mediaid): - if service == 'youtube': + if service == "youtube": duration = get_youtube(mediaid).duration - return Response(str(duration), status=200, mimetype='text/plain') - elif service == 'soundcloud': + return Response(str(duration), status=200, mimetype="text/plain") + elif service == "soundcloud": return getNotImplemented() - elif service == 'spotify': + elif service == "spotify": return getNotImplemented() else: return getBadRequest() @@ -97,9 +109,9 @@ def duration(service, mediaid): # Returns a detailed JSON export of a specific database entry. # Will not create a new database entry where one didn't exist before. -@app.route('/status//') +@app.route("/status//") def status(service, mediaid): - if service == 'youtube': + if service == "youtube": audio = YouTubeAudio.query.get(mediaid) if audio is None: if YouTubeAudio.isValid(mediaid): @@ -107,37 +119,43 @@ def status(service, mediaid): else: return getInvalidID() else: - return Response(audio.toJSON(), status=200, mimetype='application/json') - elif service == 'soundcloud': + return Response(audio.toJSON(), status=200, mimetype="application/json") + elif service == "soundcloud": return getNotImplemented() - elif service == 'spotify': + elif service == "spotify": return getNotImplemented() else: return getBadRequest() -@app.route('/list/') +@app.route("/list/") def list(service): - if service == 'youtube': + if service == "youtube": audios = YouTubeAudio.query.all() - return Response(','.join(audio.id for audio in audios), status=200, mimetype='text/plain') - elif service == 'soundcloud': + return Response( + ",".join(audio.id for audio in audios), status=200, mimetype="text/plain" + ) + elif service == "soundcloud": return getNotImplemented() - elif service == 'spotify': + elif service == "spotify": return getNotImplemented() else: return getBadRequest() -@app.route('/all/') +@app.route("/all/") def all(service): - if service == 'youtube': + if service == "youtube": audios = YouTubeAudio.query.all() return jsonify([audio.toJSON(True) for audio in audios]) - return Response(jsonify([audio.toJSON(True) for audio in audios]), status=200, mimetype='application/json') - elif service == 'soundcloud': + return Response( + jsonify([audio.toJSON(True) for audio in audios]), + status=200, + mimetype="application/json", + ) + elif service == "soundcloud": return getNotImplemented() - elif service == 'spotify': + elif service == "spotify": return getNotImplemented() else: return getBadRequest() diff --git a/app/sound_models.py b/app/sound_models.py index 5d885f3..c251979 100644 --- a/app/sound_models.py +++ b/app/sound_models.py @@ -30,11 +30,14 @@ class CouldNotDecode(Exception): # Stores basic information like Title/Uploader/URL etc. as well as holds methods useful # for manipulating, deleting, downloading, updating, and accessing the relevant information or file. class YouTubeAudio(db.Model): - id = db.Column(db.String(11), - primary_key=True) # 11 char id, presumed to stay the same for the long haul. Should be able to change to 12 chars. + id = db.Column( + db.String(11), primary_key=True + ) # 11 char id, presumed to stay the same for the long haul. Should be able to change to 12 chars. url = db.Column(db.String(64)) # 43 -> 64 title = db.Column(db.String(128)) # 120 > 128 - creator = db.Column(db.String(128)) # Seems to be Uploader set, so be careful with this + creator = db.Column( + db.String(128) + ) # Seems to be Uploader set, so be careful with this uploader = db.Column(db.String(32)) # 20 -> 32 filename = db.Column(db.String(156)) # 128 + 11 + 1 -> 156 duration = db.Column(db.Integer) @@ -44,7 +47,7 @@ class YouTubeAudio(db.Model): # Marks a database entry as accessed by updating timestamps and counts def access(self): - print(f'{self.id} was just accessed ') + print(f"{self.id} was just accessed ") self.access_count = (self.access_count or 0) + 1 self.last_access_timestamp = datetime.utcnow() db.session.commit() @@ -54,69 +57,95 @@ class YouTubeAudio(db.Model): # alt: sendfile() asks for a path originating from ./app/ def getPath(self, alt=False): if alt: - return os.path.join('sounds', 'youtube', self.filename) - return os.path.join('app', 'sounds', 'youtube', self.filename) + return os.path.join("sounds", "youtube", self.filename) + return os.path.join("app", "sounds", "youtube", self.filename) def file_exists(self): return os.path.exists(self.getPath()) # Fills in all metadata for a database entry def fill_metadata(self): - print(f'Filling out metadata for {self.id}') + print(f"Filling out metadata for {self.id}") # Use stdout=PIPE, [Python 3.6] production server support instead of 'capture_output=True' => 'process.stdout' - self.filename = self.id + '.mp3' - command = f'youtube-dl -4 -x --audio-format mp3 --restrict-filenames --dump-json {self.id}' - process = subprocess.Popen(command.split(' '), - encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.filename = self.id + ".mp3" + command = f"youtube-dl -4 -x --audio-format mp3 --restrict-filenames --dump-json {self.id}" + process = subprocess.Popen( + command.split(" "), + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) data = process.communicate() if process.returncode != 0: raise CouldNotProcess( - f'Command: {command}\n{data[1]}Exit Code: {process.returncode}') # process ends with a newline, not needed between + f"Command: {command}\n{data[1]}Exit Code: {process.returncode}" + ) # process ends with a newline, not needed between try: data = json.loads(data[0]) except json.JSONDecodeError: raise CouldNotDecode( - data) # We'll return the process data, figure out what to do with it higher up in stack (return/diagnose etc.) - print(f'JSON acquired for {self.id}, beginning to fill.') - self.duration = data['duration'] - self.url = data['webpage_url'] # Could be created, but we'll just infer from JSON response - self.creator = data['creator'] or data['uploader'] - self.uploader = data['uploader'] or data['creator'] - self.title = data['title'] or data[ - 'alt_title'] # Do not trust alt-title ; it is volatile and uploader set, e.x. https://i.imgur.com/Tgff4rI.png - print(f'Metadata filled for {self.id}') + data + ) # We'll return the process data, figure out what to do with it higher up in stack (return/diagnose etc.) + print(f"JSON acquired for {self.id}, beginning to fill.") + self.duration = data["duration"] + self.url = data[ + "webpage_url" + ] # Could be created, but we'll just infer from JSON response + self.creator = data["creator"] or data["uploader"] + self.uploader = data["uploader"] or data["creator"] + self.title = ( + data["title"] or data["alt_title"] + ) # Do not trust alt-title ; it is volatile and uploader set, e.x. https://i.imgur.com/Tgff4rI.png + print(f"Metadata filled for {self.id}") db.session.commit() # Begins the download process for a video def download(self): - print(f'Attempting download of {self.id}') - command = f'youtube-dl -x -4 --restrict-filenames --audio-quality 64K --audio-format mp3 -o ./app/sounds/youtube/%(id)s.%(ext)s {self.id}' - process = subprocess.Popen(command.split(' '), encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) - data = process.communicate() # Not the data for the mp3, just the output. We have to separate this in order to 'wait' for the process to complete fully. - print('Checking process return code...') + print(f"Attempting download of {self.id}") + command = f"youtube-dl -x -4 --restrict-filenames --audio-quality 64K --audio-format mp3 -o ./app/sounds/youtube/%(id)s.%(ext)s {self.id}" + process = subprocess.Popen( + command.split(" "), + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + data = ( + process.communicate() + ) # Not the data for the mp3, just the output. We have to separate this in order to 'wait' for the process to complete fully. + print("Checking process return code...") if process.returncode != 0: - raise CouldNotProcess(f'Command: {command}\n{data[1] or data[0]}Exit Code: {process.returncode}') - print('Checking for expected file...') + raise CouldNotProcess( + f"Command: {command}\n{data[1] or data[0]}Exit Code: {process.returncode}" + ) + print("Checking for expected file...") if not os.path.exists(self.getPath()): raise CouldNotDownload(data[1] or data[0]) - print(f'Download attempt for {self.id} finished successfully.') + print(f"Download attempt for {self.id} finished successfully.") # Validates whether the specified ID could be a valid YouTube video ID @staticmethod def isValid(id): - return re.match(r'^[A-Za-z0-9_-]{11}$', id) is not None + return re.match(r"^[A-Za-z0-9_-]{11}$", id) is not None # Returns a JSON serialization of the database entry def toJSON(self, noConvert=False): - data = {'id': self.id, 'url': self.url, 'title': self.title, 'creator': self.creator, - 'uploader': self.uploader, 'filename': self.filename, 'duration': self.duration, - 'access_count': self.access_count, 'download_timestamp': self.download_timestamp.isoformat(), - 'last_access_timestamp': self.last_access_timestamp.isoformat()} + data = { + "id": self.id, + "url": self.url, + "title": self.title, + "creator": self.creator, + "uploader": self.uploader, + "filename": self.filename, + "duration": self.duration, + "access_count": self.access_count, + "download_timestamp": self.download_timestamp.isoformat(), + "last_access_timestamp": self.last_access_timestamp.isoformat(), + } return data if noConvert else json.dumps(data) def delete(self): - path = os.path.join('app', 'sounds', 'youtube', self.filename) + path = os.path.join("app", "sounds", "youtube", self.filename) try: os.remove(path) except: diff --git a/app/spotify.py b/app/spotify.py index a7cbd80..cef0a3e 100644 --- a/app/spotify.py +++ b/app/spotify.py @@ -8,7 +8,7 @@ from app import app from config import Config from .spotify_explicit import main -path = os.path.join('app/spotify_explicit/recent.json') +path = os.path.join("app/spotify_explicit/recent.json") def check_and_update(): @@ -16,26 +16,26 @@ def check_and_update(): with open(path) as file: file = json.load(file) except (FileNotFoundError, json.JSONDecodeError): - file = {'last_generated': -1} + file = {"last_generated": -1} - if file['last_generated'] == -1: + if file["last_generated"] == -1: return True else: - dif = time.time() - file['last_generated'] + dif = time.time() - file["last_generated"] # print('dif', dif) if dif >= Config.SPOTIFY_CACHE_TIME: return True else: - ideal = file['last_generated'] + Config.SPOTIFY_CACHE_TIME + ideal = file["last_generated"] + Config.SPOTIFY_CACHE_TIME # print(f'Waiting another {int(ideal - time.time())} seconds') return False -@app.route('/spotify/') +@app.route("/spotify/") def spotify(): if check_and_update(): - print('Graph out of date - running update command') - with open(path, 'w+') as file: - file = json.dump({'last_generated': int(time.time())}, file) + print("Graph out of date - running update command") + with open(path, "w+") as file: + file = json.dump({"last_generated": int(time.time())}, file) main.main() - return send_file('spotify_explicit/export/export.png') + return send_file("spotify_explicit/export/export.png") diff --git a/app/spotify_explicit/auth.py b/app/spotify_explicit/auth.py index ca7a6a1..4066424 100644 --- a/app/spotify_explicit/auth.py +++ b/app/spotify_explicit/auth.py @@ -4,34 +4,36 @@ import os import sys # Path to API Credentials file -PATH = os.path.join(sys.path[0], 'auth.json') +PATH = os.path.join(sys.path[0], "auth.json") # Ensure the file exists, if not, generate one and error with a reason if not os.path.exists(PATH): - with open(PATH, 'w') as file: + with open(PATH, "w") as file: # Dump a pretty-printed dictionary with default values json.dump( { - 'USERNAME': 'Your Username Here', - 'CLIENT_ID': 'Your Client ID Here', - 'CLIENT_SECRET': 'Your Client Secret Here', - 'REDIRECT_URI': 'Your Redirect URI Callback Here', - 'SCOPE': ['Your Scopes Here'] + "USERNAME": "Your Username Here", + "CLIENT_ID": "Your Client ID Here", + "CLIENT_SECRET": "Your Client Secret Here", + "REDIRECT_URI": "Your Redirect URI Callback Here", + "SCOPE": ["Your Scopes Here"], }, file, - indent=3 + indent=3, ) # Error critically, then exit - logging.critical("No \'auth.json\' file detected, one has been created for you") - logging.critical("Please fill out with your Spotify credentials, and then restart the program") + logging.critical("No 'auth.json' file detected, one has been created for you") + logging.critical( + "Please fill out with your Spotify credentials, and then restart the program" + ) sys.exit() # Open and parse file -FILE = json.load(open(PATH, 'r')) +FILE = json.load(open(PATH, "r")) # Load all configuration variables -USERNAME = FILE['USERNAME'] -CLIENT_ID = FILE['CLIENT_ID'] -CLIENT_SECRET = FILE['CLIENT_SECRET'] -REDIRECT_URI = FILE['REDIRECT_URI'] -SCOPE = ' '.join(FILE['SCOPE']) +USERNAME = FILE["USERNAME"] +CLIENT_ID = FILE["CLIENT_ID"] +CLIENT_SECRET = FILE["CLIENT_SECRET"] +REDIRECT_URI = FILE["REDIRECT_URI"] +SCOPE = " ".join(FILE["SCOPE"]) diff --git a/app/spotify_explicit/main.py b/app/spotify_explicit/main.py index 9bfe1b5..46af0bf 100644 --- a/app/spotify_explicit/main.py +++ b/app/spotify_explicit/main.py @@ -11,7 +11,7 @@ from . import pull def main(): logging.basicConfig(level=logging.INFO) - logging.info('Pulling data from Spotify') + logging.info("Pulling data from Spotify") refresh() process.main() @@ -19,14 +19,20 @@ def main(): # Refreshes tracks from files if the token from Spotipy has expired, # thus keeping us up to date in most cases while keeping rate limits def refresh(): - file_path = os.path.join(sys.path[0], f'.cache-{auth.USERNAME}') + file_path = os.path.join(sys.path[0], f".cache-{auth.USERNAME}") if os.path.exists(file_path): - cache = json.load(open(file_path, 'r')) - if True or time.time() > cache['expires_at']: - logging.info('Refreshing Spotify data by pulling tracks, this may take a moment.') + cache = json.load(open(file_path, "r")) + if True or time.time() > cache["expires_at"]: + logging.info( + "Refreshing Spotify data by pulling tracks, this may take a moment." + ) pull.main() else: - logging.info('Spotify data deemed to be recent enough (under {} seconds old)'.format(cache['expires_in'])) + logging.info( + "Spotify data deemed to be recent enough (under {} seconds old)".format( + cache["expires_in"] + ) + ) else: pull.main() diff --git a/app/spotify_explicit/process.py b/app/spotify_explicit/process.py index e408c3c..ffe5518 100644 --- a/app/spotify_explicit/process.py +++ b/app/spotify_explicit/process.py @@ -9,13 +9,11 @@ import numpy as np # Gets all files in tracks folder, returns them in parsed JSON def get_files(): - folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tracks') + folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tracks") files = [] for file in os.listdir(folder): with open(os.path.join(os.path.join(folder, file))) as file: - files.append( - json.load(file) - ) + files.append(json.load(file)) return files @@ -23,17 +21,17 @@ def get_files(): def combine_files(files): items = [] for file in files: - items.extend(file['items']) + items.extend(file["items"]) return items # Prints the data in a interesting format def print_data(data): for i, item in enumerate(data): - date = dateutil.parser.parse(item['added_at']) - explicit = '!' if item['track']['explicit'] else ' ' - track_name = item['track']['name'] - artists = ' & '.join(artist['name'] for artist in item['track']['artists']) + date = dateutil.parser.parse(item["added_at"]) + explicit = "!" if item["track"]["explicit"] else " " + track_name = item["track"]["name"] + artists = " & ".join(artist["name"] for artist in item["track"]["artists"]) print('[{}] {} "{}" by {}'.format(date, explicit, track_name, artists)) @@ -41,10 +39,10 @@ def process_data(data): # Process the data by Month/Year, then by Clean/Explicit scores = {} for item in data: - date = dateutil.parser.parse(item['added_at']).strftime('%b %Y') + date = dateutil.parser.parse(item["added_at"]).strftime("%b %Y") if date not in scores.keys(): scores[date] = [0, 0] - scores[date][1 if item['track']['explicit'] else 0] += 1 + scores[date][1 if item["track"]["explicit"] else 0] += 1 # Create simplified arrays for each piece of data months = list(scores.keys())[::-1] @@ -54,7 +52,7 @@ def process_data(data): explicit.append(item[1]) # Done processing date properly, start plotting work - logging.info('Processed data, creating plot from data') + logging.info("Processed data, creating plot from data") # Weird numpy stuff n = len(scores.values()) ind = np.arange(n) @@ -63,33 +61,35 @@ def process_data(data): plt.figure(figsize=(10.0, 6.0)) # Stacked Bars p1 = plt.bar(ind, explicit, width) - p2 = plt.bar(ind, clean, width, bottom=explicit) # bottom= just has the bar sit on top of the explicit + p2 = plt.bar( + ind, clean, width, bottom=explicit + ) # bottom= just has the bar sit on top of the explicit # Plot labeling - plt.title('Song Count by Clean/Explicit') - plt.ylabel('Song Count') - plt.xlabel('Month') + plt.title("Song Count by Clean/Explicit") + plt.ylabel("Song Count") + plt.xlabel("Month") plt.xticks(ind, months, rotation=270) # Rotation 90 will have the - plt.legend((p1[0], p2[0]), ('Explicit', 'Clean')) + plt.legend((p1[0], p2[0]), ("Explicit", "Clean")) fig = plt.gcf() # Magic to save to image and then show # Save the figure, overwriting anything in your way - logging.info('Saving the figure to the \'export\' folder') - export_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'export') + logging.info("Saving the figure to the 'export' folder") + export_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "export") if not os.path.exists(export_folder): os.makedirs(export_folder) plt.tight_layout() fig.savefig( os.path.join( export_folder, - 'export' + "export" # datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S') ), dpi=100, - quality=95 + quality=95, ) - # Finally show the figure to - logging.info('Showing plot to User') + # Finally show the figure to + logging.info("Showing plot to User") # plt.show() # Copy the figure to your clipboard to paste in Excel @@ -101,10 +101,17 @@ def process_data(data): # Will paste into Excel very easily def copy(months, clean, explicit): from pyperclip import copy - top = 'Period\tClean\tExplicit\n' - copy(top + '\n'.join([ - f'{item[0]}\t{item[1]}\t{item[2]}' for item in zip(months, clean, explicit) - ])) + + top = "Period\tClean\tExplicit\n" + copy( + top + + "\n".join( + [ + f"{item[0]}\t{item[1]}\t{item[2]}" + for item in zip(months, clean, explicit) + ] + ) + ) def main(): @@ -114,7 +121,10 @@ def main(): logging.info(f"Read and parse {len(files)} track files") logging.info("Combining into single track file for ease of access") data = combine_files(files) - data.sort(key=lambda item: dateutil.parser.parse(item['added_at']).timestamp(), reverse=True) - logging.info(f'File combined with {len(data)} items') - logging.info('Processing file...') + data.sort( + key=lambda item: dateutil.parser.parse(item["added_at"]).timestamp(), + reverse=True, + ) + logging.info(f"File combined with {len(data)} items") + logging.info("Processing file...") process_data(data) diff --git a/app/spotify_explicit/pull.py b/app/spotify_explicit/pull.py index 2a82776..5be60e7 100644 --- a/app/spotify_explicit/pull.py +++ b/app/spotify_explicit/pull.py @@ -13,54 +13,58 @@ from . import auth def main(): # Get Authorization logging.basicConfig(level=logging.INFO) - logging.info('Authorizing with Spotify via Spotipy') - logging.warning('May require User Interaction to authenticate properly!') + logging.info("Authorizing with Spotify via Spotipy") + logging.warning("May require User Interaction to authenticate properly!") token = util.prompt_for_user_token( username=auth.USERNAME, scope=auth.SCOPE, client_id=auth.CLIENT_ID, client_secret=auth.CLIENT_SECRET, - redirect_uri=auth.REDIRECT_URI + redirect_uri=auth.REDIRECT_URI, ) sp = spotipy.Spotify(auth=token) - logging.info('Authorized with Spotify via Spotipy') + logging.info("Authorized with Spotify via Spotipy") - tracks_folder = os.path.join(os.path.dirname(__file__), 'tracks') - logging.warning('Clearing all files in tracks folder for new files') + tracks_folder = os.path.join(os.path.dirname(__file__), "tracks") + logging.warning("Clearing all files in tracks folder for new files") if os.path.exists(tracks_folder): shutil.rmtree(tracks_folder) # Delete folder and all contents (old track files) os.makedirs(tracks_folder) # Recreate the folder just deleted - logging.info('Cleared folder, ready to download new track files') + logging.info("Cleared folder, ready to download new track files") curoffset, curlimit = 0, 50 while curoffset >= 0: # Request and identify what was received - logging.info('Requesting {} to {}'.format(curoffset, curoffset + curlimit)) + logging.info("Requesting {} to {}".format(curoffset, curoffset + curlimit)) response = sp.current_user_saved_tracks(limit=curlimit, offset=curoffset) - received = len(response['items']) - logging.info('Received {} to {}'.format(curoffset, curoffset + received)) + received = len(response["items"]) + logging.info("Received {} to {}".format(curoffset, curoffset + received)) # Create path/filename - filename = f'saved-tracks-{curoffset}-{curoffset + received}.json' + filename = f"saved-tracks-{curoffset}-{curoffset + received}.json" filepath = os.path.join(tracks_folder, filename) # Save track file - with open(filepath, 'w+') as file: + with open(filepath, "w+") as file: json.dump(response, file) - logging.info('Saved at "{}" ({})'.format( - f'\\tracks\\{filename}', - size(os.path.getsize(filepath))) + logging.info( + 'Saved at "{}" ({})'.format( + f"\\tracks\\{filename}", size(os.path.getsize(filepath)) + ) ) # Decide whether we have received all possible tracks if received < curlimit: - logging.info('Requested and saved {} tracks split over {} files ({})'.format( - curoffset + received, - len(os.listdir(tracks_folder)), - size( - sum( - os.path.getsize(os.path.join(tracks_folder, file)) for file in os.listdir(tracks_folder) + logging.info( + "Requested and saved {} tracks split over {} files ({})".format( + curoffset + received, + len(os.listdir(tracks_folder)), + size( + sum( + os.path.getsize(os.path.join(tracks_folder, file)) + for file in os.listdir(tracks_folder) + ), + system=alternative, ), - system=alternative ) - )) + ) break # Continuing, so increment offset curoffset += curlimit