syntaxwebsite/app/pages/settings/settings.py

556 lines
23 KiB
Python

import base64
import hashlib
import pyotp
import random
import re
import logging
import requests
import uuid
import json
from app.util.discord import DiscordUserInfo, ExchangeCodeForToken, UnexpectedStatusCode, MissingScope, GetUserInfoFromToken, GenerateAuthorizationURL
from app.models.linked_discord import LinkedDiscord
from app.models.user_email import UserEmail
from app.pages.signup.signup import isUsernameAllowed
from app.models.past_usernames import PastUsername
from app.util.redislock import acquire_lock, release_lock
from app.models.usereconomy import UserEconomy
from datetime import datetime, timedelta
from app.pages.messages.messages import CreateSystemMessage
from app.util.membership import GetUserMembership, GiveUserMembership, UserHasHigherMembershipException, UserDoesNotExistException, RemoveUserMembership
from app.enums.MembershipType import MembershipType
from flask import Blueprint, render_template, request, redirect, url_for, session, flash, redirect, make_response, abort
from app.util import auth, websiteFeatures, textfilter, turnstile, transactions
from app.extensions import db, limiter, redis_controller
from app.models.user import User
from requests.auth import HTTPBasicAuth
from config import Config
from app.models.asset import Asset
from app.models.userassets import UserAsset
from sqlalchemy import func
from app.services import economy
from app.enums.TransactionType import TransactionType
config = Config()
def generate_secret_key_from_string(secret_string):
secret_bytes = secret_string.encode('utf-8')
char_list = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
random.seed(secret_bytes)
secret_key = ''.join([random.choice(char_list) for _ in range(32)])
return secret_key
settings = Blueprint("settings", __name__, template_folder="pages")
@settings.route("/settings", methods=["GET"])
@auth.authenticated_required
def settings_page():
user : User = auth.GetCurrentUser()
LinkedDiscordObj : LinkedDiscord = LinkedDiscord.query.filter_by(user_id=user.id).first()
if LinkedDiscordObj is not None:
DiscordUserInfoObj : DiscordUserInfo = DiscordUserInfo(LinkedDiscordObj.discord_id, LinkedDiscordObj.discord_username, LinkedDiscordObj.discord_avatar, GlobalName = None, Discriminator = LinkedDiscordObj.discord_discriminator)
else:
DiscordUserInfoObj = None
LinkedEmailObj : UserEmail = UserEmail.query.filter_by(user_id=user.id).first()
HiddenEmail = None
if LinkedEmailObj:
emailParts = LinkedEmailObj.email.split("@")
FirstPart = emailParts[0][0] + "*" * (len(emailParts[0])-1)
SecondPart = emailParts[1]
HiddenEmail = FirstPart + "@" + SecondPart
return render_template("settings/settings.html", TOTPenabled = user.TOTPEnabled, description=user.description, DiscordUserInfoObj=DiscordUserInfoObj, LinkedEmailObj=LinkedEmailObj, HiddenEmail=HiddenEmail)
@settings.route("/settings/description", methods=["POST"])
@auth.authenticated_required
def settings_description():
description = request.form.get("description")
if description is None:
return redirect("/settings")
if len(description) > 256:
flash("Description is too long")
return redirect("/settings")
newlineCount = description.count("\n")
if newlineCount > 5:
flash("Description can only have 5 lines")
return redirect("/settings")
user : User = auth.GetCurrentUser()
user.description = textfilter.FilterText(description)
db.session.commit()
return redirect("/settings")
@settings.route("/settings/enableTOTP", methods=["GET"])
@auth.authenticated_required
def settings_enableTOTP():
return render_template("settings/enableTOTP.html")
@settings.route("/settings/enableTOTP", methods=["POST"])
@auth.authenticated_required
def settings_enableTOTP_post():
if "code" not in request.form:
flash("Please fill in all fields")
return redirect("/settings/enableTOTP")
code = request.form.get("code")
user : User = auth.GetCurrentUser()
if user.TOTPEnabled:
return redirect("/settings")
secret_key = generate_secret_key_from_string(str(user.id) + config.FLASK_SESSION_KEY)
totp = pyotp.TOTP(secret_key)
if not totp.verify(code):
flash("Invalid code")
return redirect("/settings/enableTOTP")
user.TOTPEnabled = True
db.session.commit()
return redirect("/settings")
@settings.route("/settings/TOTP/image", methods=["GET"])
@auth.authenticated_required
def settings_TOTP_image():
user : User = auth.GetCurrentUser()
if user.TOTPEnabled:
return redirect("/settings")
secret_key = generate_secret_key_from_string(str(user.id) + config.FLASK_SESSION_KEY)
import pyqrcode
totp = pyotp.TOTP(secret_key)
url = pyqrcode.create(totp.provisioning_uri(user.username, issuer_name="Syntax"))
resp = make_response(base64.b64decode(url.png_as_base64_str(scale=5)))
resp.headers['Content-Type'] = 'image/png'
return resp
import hashlib
def GenerateStateHash( Cookie : str ) -> str:
"""
GenerateStateHash generates a state hash for discord oauth2.
"""
salted_cookie = Cookie + config.FLASK_SESSION_KEY
return hashlib.md5(salted_cookie.encode('utf-8')).hexdigest()
@settings.route("/settings/unlink_discord", methods=["POST"])
@auth.authenticated_required
def settings_unlink_discord():
AuthenticatedUser : User = auth.GetCurrentUser()
LinkedDiscordObj : LinkedDiscord = LinkedDiscord.query.filter_by(user_id=AuthenticatedUser.id).first()
if LinkedDiscordObj is None:
flash("You do not have a linked discord account")
return redirect("/settings")
if LinkedDiscordObj.linked_on + timedelta(hours=24) > datetime.utcnow():
flash("You cannot unlink your discord account until 24 hours after linking")
return redirect("/settings")
db.session.delete(LinkedDiscordObj)
db.session.commit()
UserMembership : MembershipType = GetUserMembership(AuthenticatedUser)
if UserMembership == MembershipType.BuildersClub:
RemoveUserMembership(AuthenticatedUser)
flash("Discord account unlinked")
return redirect("/settings")
@settings.route("/settings/discord_link", methods=["GET"])
@auth.authenticated_required
def settings_discord_link():
if not websiteFeatures.GetWebsiteFeature("UsernameChange"):
flash("Discord Linking is temporarily disabled")
return redirect("/settings")
UserObj : User = auth.GetCurrentUser()
LinkedDiscordObj : LinkedDiscord = LinkedDiscord.query.filter_by(user_id=UserObj.id).first()
if LinkedDiscordObj is not None:
return redirect("/settings")
state = GenerateStateHash(request.cookies[".ROBLOSECURITY"])
AuthorizationURL = GenerateAuthorizationURL(state)
return redirect(AuthorizationURL)
@settings.route("/settings/discord_handler", methods=["GET"])
@auth.authenticated_required
def settings_discord_handler():
if not websiteFeatures.GetWebsiteFeature("UsernameChange"):
flash("Discord Linking is temporarily disabled")
return redirect("/settings")
UserObj : User = auth.GetCurrentUser()
LinkedDiscordObj : LinkedDiscord = LinkedDiscord.query.filter_by(user_id=UserObj.id).first()
if LinkedDiscordObj is not None:
flash("You already have a linked discord account")
return redirect("/settings")
expectedState = GenerateStateHash(request.cookies[".ROBLOSECURITY"])
givenState = request.args.get(key="state", default=None, type=str)
if givenState is None:
flash("Invalid state")
return redirect("/settings")
if givenState != expectedState:
flash("Invalid state")
return redirect("/settings")
givenCode = request.args.get(key="code", default=None, type=str)
if givenCode is None:
flash("Invalid code")
return redirect("/settings")
try:
DiscordOAuth2TokenExchangeResponseJSON = ExchangeCodeForToken(givenCode)
except UnexpectedStatusCode as e:
flash(f"UnexpectedStatusCodeException: {str(e)}")
return redirect("/settings")
except MissingScope as e:
flash(f"MissingScopeException: {str(e)}")
return redirect("/settings")
try:
DiscordUserInfoObj : DiscordUserInfo = GetUserInfoFromToken(DiscordOAuth2TokenExchangeResponseJSON["access_token"])
except UnexpectedStatusCode as e:
flash(f"UnexpectedStatusCodeException: {str(e)}")
return redirect("/settings")
DiscordAccountCreationDatetime = datetime.utcfromtimestamp( ( (int(DiscordUserInfoObj.UserId) >> 22) + 1420070400000) / 1000 )
if DiscordAccountCreationDatetime + timedelta(days=7) > datetime.utcnow():
flash("Discord account must be at least 7 days old")
return redirect("/settings")
LinkedDiscordObj : LinkedDiscord = LinkedDiscord.query.filter_by(discord_id=DiscordUserInfoObj.UserId).first()
if LinkedDiscordObj is not None:
if LinkedDiscordObj.linked_on + timedelta(hours=24) > datetime.utcnow():
flash("This discord account has recently been linked to another account, please try again later")
return redirect("/settings")
try:
CreateSystemMessage(
subject = "Discord Account unlinked",
message = f"Your discord account was unlinked from your account because \"Account linked to another user\", if you currently have a Builders Club membership it will be automatically removed from your account until you link your discord account again.",
userid = LinkedDiscordObj.user_id
)
OldUserMembership : MembershipType = GetUserMembership(LinkedDiscordObj.user_id)
if OldUserMembership == MembershipType.BuildersClub:
RemoveUserMembership(LinkedDiscordObj.user_id)
except:
pass
db.session.delete(LinkedDiscordObj)
db.session.commit()
NewLinkedDiscordObj = LinkedDiscord(
user_id=UserObj.id,
discord_id=DiscordUserInfoObj.UserId,
discord_username=DiscordUserInfoObj.Username,
discord_discriminator=DiscordUserInfoObj.Discriminator,
discord_avatar=DiscordUserInfoObj.AvatarHash,
discord_access_token=DiscordOAuth2TokenExchangeResponseJSON["access_token"],
discord_refresh_token=DiscordOAuth2TokenExchangeResponseJSON["refresh_token"],
discord_expiry=datetime.utcnow() + timedelta(seconds=DiscordOAuth2TokenExchangeResponseJSON["expires_in"])
)
db.session.add(NewLinkedDiscordObj)
db.session.commit()
CurrentUserMembership : MembershipType = GetUserMembership(UserObj)
if CurrentUserMembership == MembershipType.NonBuildersClub:
try:
GiveUserMembership(UserObj, MembershipType.BuildersClub, expiration=timedelta(days=31))
except Exception as e:
logging.error(f"Failed to give user builders club membership: {str(e)}")
return redirect("/settings")
return redirect("/settings")
@settings.route("/settings/update-password", methods=["GET"])
@auth.authenticated_required
def settings_update_password():
if not websiteFeatures.GetWebsiteFeature("PasswordChange"):
flash("Password changing is temporarily disabled")
return redirect("/settings")
AuthenticatedUser : User = auth.GetCurrentUser()
return render_template("settings/changepassword.html", is2FAEnabled = AuthenticatedUser.TOTPEnabled)
@settings.route("/settings/update-password", methods=["POST"])
@auth.authenticated_required
@limiter.limit("20/minute")
def settings_update_password_post():
if not websiteFeatures.GetWebsiteFeature("PasswordChange"):
flash("Password changing is temporarily disabled")
return redirect("/settings")
AuthenticatedUser : User = auth.GetCurrentUser()
CurrentPassword = request.form.get( key = "current-password", default = "", type = str )
NewPassword = request.form.get( key = "new-password", default = "", type = str )
NewPasswordConfirm = request.form.get( key = "confirm-password", default = "", type = str )
OTPCode = request.form.get( key = "2fa-code", default = None, type = str )
if AuthenticatedUser.TOTPEnabled:
if OTPCode is None:
flash("Please fill in all fields")
return redirect("/settings/update-password")
isValidCode = auth.Validate2FACode(AuthenticatedUser.id, OTPCode)
if not isValidCode:
flash("Invalid 2FA code")
return redirect("/settings/update-password")
if not auth.VerifyPassword(AuthenticatedUser, CurrentPassword):
flash("Invalid current password")
return redirect("/settings/update-password")
if NewPassword != NewPasswordConfirm:
flash("Passwords do not match")
return redirect("/settings/update-password")
if len(NewPassword) < 8:
flash("Password must be at least 8 characters")
return redirect("/settings/update-password")
if len(NewPassword) > 256:
flash("Password cannot be longer than 256 characters")
return redirect("/settings/update-password")
auth.SetPassword(AuthenticatedUser, NewPassword)
flash("Password updated")
return redirect("/settings")
@settings.route("/settings/update-username", methods=["GET"])
@auth.authenticated_required
def settings_update_username():
if not websiteFeatures.GetWebsiteFeature("UsernameChange"):
flash("Username changing is temporarily disabled")
return redirect("/settings")
return render_template("settings/changeusername.html")
@settings.route("/settings/update-username", methods=["POST"])
@auth.authenticated_required
@limiter.limit("20/minute")
def settings_update_username_post():
if not websiteFeatures.GetWebsiteFeature("UsernameChange"):
flash("Username changing is temporarily disabled")
return redirect("/settings")
AuthenticatedUser : User = auth.GetCurrentUser()
NewUsername = request.form.get( key = "new-username", default = "", type = str )
Password = request.form.get( key = "password", default = "", type = str )
if not auth.VerifyPassword(AuthenticatedUser, Password):
flash("Incorrect password")
return redirect("/settings/update-password")
isValidUsername , Reason = isUsernameAllowed(NewUsername)
if not isValidUsername:
flash(Reason)
return redirect("/settings/update-username")
try:
textfilter.FilterText(NewUsername, ThrowException=True, UseExtendedBadWords=True)
except Exception as e:
flash("Username contains inappropriate words")
return redirect("/settings/update-username")
if NewUsername == AuthenticatedUser.username:
flash("Username cannot be the same as your current username")
return redirect("/settings/update-username")
UsernameTaken = User.query.filter(func.lower(User.username) == func.lower(NewUsername)).first()
if UsernameTaken is not None:
flash("Username is taken")
return redirect("/settings/update-username")
PastUsernameObj : PastUsername = PastUsername.query.filter(func.lower(PastUsername.username) == func.lower(NewUsername)).first()
if PastUsernameObj is not None:
if PastUsernameObj.user_id != AuthenticatedUser.id:
flash("Username is taken")
return redirect("/settings/update-username")
UserRobuxBalance, _ = economy.GetUserBalance( TargetUser = AuthenticatedUser )
if UserRobuxBalance < 1000:
flash("You need at least 1000 robux to change your username")
return redirect("/settings/update-username")
try:
economy.DecrementTargetBalance(
Target = AuthenticatedUser,
Amount = 1000,
CurrencyType = 0
)
except economy.InsufficientFundsException:
flash("You need at least 1000 robux to change your username")
return redirect("/settings/update-username")
except economy.EconomyLockAcquireException:
flash("Failed to change username, please try again later no robux was deducted")
return redirect("/settings/update-username")
except Exception as e:
flash("Failed to change username, please try again later")
return redirect("/settings/update-username")
try:
NewPastUsernameObj = PastUsername(
user_id=AuthenticatedUser.id,
username=AuthenticatedUser.username
)
db.session.add(NewPastUsernameObj)
AuthenticatedUser.username = NewUsername
db.session.commit()
except Exception as e:
flash("Failed to change username, please try again later")
return redirect("/settings/update-username")
try:
transactions.CreateTransaction(
CurrencyAmount = 1000,
CurrencyType = 0,
TransactionType = TransactionType.Purchase,
AssetId = None,
CustomText = None,
Sender = AuthenticatedUser
)
except:
pass
flash("Username updated")
return redirect("/settings")
@settings.route("/settings/update-email", methods=["GET"])
@auth.authenticated_required
def settings_update_email():
if not websiteFeatures.GetWebsiteFeature("EmailChange"):
flash("Email changing is temporarily disabled")
return redirect("/settings")
return render_template("settings/changeemail.html")
EmailRegex = r"^[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?$" # RFC 5322 Compliant, because some fucker is gonna complain that he cant verify his email from his self-hosted shit
@settings.route("/settings/update-email", methods=["POST"])
@auth.authenticated_required
@limiter.limit("20/minute")
def settings_update_email_post():
AuthenticatedUser : User = auth.GetCurrentUser()
# We've been breached. Allow people to change their emails without cooldown
#if redis_controller.exists(f"update_email_v1_cooldown:{AuthenticatedUser.id}"):
#flash("Please wait 12 hours before updating your email again", "error")
#return redirect("/settings/update-email")
NewEmail = request.form.get( key = "new-email", default = "", type = str ).strip().lower()
Password = request.form.get( key = "password", default = "", type = str )
CloudflareTurnstileKey = request.form.get( key = "cf-turnstile-response", default = None, type=str )
if CloudflareTurnstileKey is None or CloudflareTurnstileKey == '':
flash("Please complete the captcha", "error")
return redirect("/settings/update-password")
if not turnstile.VerifyToken( CloudflareTurnstileKey ):
flash("Invalid captcha answer", "error")
return redirect("/settings/update-password")
if not auth.VerifyPassword(AuthenticatedUser, Password):
flash("Incorrect password", "error")
return redirect("/settings/update-password")
if not re.match( EmailRegex, NewEmail ):
flash("Invalid Email, Not RFC 5322 compliant", "error")
return redirect("/settings/update-password")
UserEmailObj : UserEmail = UserEmail.query.filter_by(user_id = AuthenticatedUser.id ).first()
if UserEmailObj:
if UserEmailObj.email == NewEmail and UserEmailObj.verified:
flash("Email is already verified to this account", "error")
return redirect("/settings/update-email")
UserEmailObj.email = NewEmail
UserEmailObj.verified = False
UserEmailObj.updated_at = datetime.utcnow()
else:
NewUserEmailObj = UserEmail(
user_id = AuthenticatedUser.id,
email = NewEmail,
verified = False
)
db.session.add(NewUserEmailObj)
db.session.commit()
VerificationUUID = str(uuid.uuid4())
VerificationURL = f"{config.BaseURL}/settings/update-email/verify?id={VerificationUUID}&user={AuthenticatedUser.id}"
redis_controller.set(f"update_email_v1:{VerificationUUID}", AuthenticatedUser.id, ex=60 * 60 * 2)
redis_controller.set(f"update_email_v1_cooldown:{AuthenticatedUser.id}", "1", ex=60 * 60 * 12)
# Send verification email
EmailData = {
"Messages": [
{
"From": {
"Email": Config.MAILJET_NOREPLY_SENDER,
"Name": "syntax.eco"
},
"To": [
{
"Email": NewEmail,
"Name": AuthenticatedUser.username
}
],
"TemplateID": Config.MAILJET_EMAILVERIFY_TEMPLATE_ID,
"TemplateLanguage": True,
"Subject": f"Verify your email on Syntax! - {AuthenticatedUser.username}",
"Variables": {
"username": AuthenticatedUser.username,
"verificationlink": VerificationURL
}
}
]
}
try:
EmailResponse = requests.post(
url="https://api.mailjet.com/v3.1/send",
data=json.dumps(EmailData),
headers={
"Content-Type": "application/json"
},
auth = HTTPBasicAuth(
Config.MAILJET_APIKEY,
Config.MAILJET_SECRETKEY
)
)
if EmailResponse.status_code != 200:
flash("Failed to send verification email", "error")
redis_controller.delete(f"update_email_v1:{VerificationUUID}")
redis_controller.delete(f"update_email_v1_cooldown:{AuthenticatedUser.id}")
return redirect("/settings/update-email")
except Exception as e:
flash("Failed to send verification email", "error")
redis_controller.delete(f"update_email_v1:{VerificationUUID}")
redis_controller.delete(f"update_email_v1_cooldown:{AuthenticatedUser.id}")
return redirect("/settings/update-email")
flash("Verification email sent", "success")
return redirect("/settings/update-email")
@settings.route("/settings/update-email/verify", methods=["GET"])
def VerifyEmailGet():
EmailVerificationUUID : str = request.args.get( key = "id", default = "", type = str )
ExpectedUserId : int = request.args.get( key = "user", default = None, type = int )
if EmailVerificationUUID == "" or ExpectedUserId is None:
return abort(404)
UserId = redis_controller.get(f"update_email_v1:{EmailVerificationUUID}")
if UserId is None:
return abort(404)
if int(UserId) != ExpectedUserId:
return abort(404)
UserObj : User = User.query.filter_by(id=UserId).first()
UserEmailObj : UserEmail = UserEmail.query.filter_by(user_id=UserObj.id).first()
if UserEmailObj is None:
return abort(404)
UserEmailObj.verified = True
UserEmailObj.updated_at = datetime.utcnow()
db.session.commit()
RewardObj = None
if config.VERIFIED_EMAIL_REWARD_ASSET > 0:
if not UserAsset.query.filter_by(userid=UserObj.id, assetid=config.VERIFIED_EMAIL_REWARD_ASSET).first():
NewUserAssetObj : UserAsset = UserAsset(
userid=UserObj.id,
assetid=config.VERIFIED_EMAIL_REWARD_ASSET
)
db.session.add(NewUserAssetObj)
db.session.commit()
RewardObj = Asset.query.filter_by(id=config.VERIFIED_EMAIL_REWARD_ASSET).first()
return render_template("settings/emailverify_success.html", UserObj = UserObj, RewardObj = RewardObj)