syntaxwebsite/app/pages/signup/signup.py

263 lines
11 KiB
Python

from flask import Blueprint, render_template, request, redirect, url_for, session, flash, redirect, make_response, abort
from app.models.user import User
from app.models.invite_key import InviteKey
from app.models.usereconomy import UserEconomy
from app.models.user_avatar import UserAvatar
from app.models.login_records import LoginRecord
from app.util.textfilter import FilterText, TextNotAllowedException
from app.models.past_usernames import PastUsername
from app.pages.login.login import CreateLoginRecord
from app.extensions import db, limiter, csrf, redis_controller, get_remote_address
from app.pages.avatar.avatar import AllowedBodyColors
from app.routes.thumbnailer import TakeUserThumbnail
import logging
import hashlib
from datetime import datetime
from app.util import websiteFeatures, auth, turnstile, redislock
from app.services import invitekeys, proxydetection
from sqlalchemy import func
import random
from datetime import datetime, timedelta
from config import Config
import string
config = Config()
signup = Blueprint("signup", __name__, template_folder="pages")
@signup.route("/signup", methods=["GET"])
@limiter.exempt()
def signup_page():
if auth.isAuthenticated():
return redirect("/home")
SignupEnabled = True
if not websiteFeatures.GetWebsiteFeature("WebSignup"):
flash("Signups are temporarily disabled")
SignupEnabled = False
return render_template("signup/signup.html", SignupEnabled=SignupEnabled)
def RateLimitReached(e):
flash("You are being rate limited. Please try again later.")
return redirect("/signup")
def isUsernameAllowed( username : str ):
if len(username) < 3 or len(username) > 20:
return False, "Usernames must be between 3 and 20 characters long"
if username[0] == "_" or username[-1] == "_":
return False, "Usernames must not start or end with an underscore"
if username.count("_") > 1:
return False, "Usernames must not contain more than one underscore"
if username.count(" ") > 0:
return False, "Usernames must not contain spaces"
allowedCharacters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
hasLetter = False
for char in username:
if char not in allowedCharacters:
return False, "Usernames must only contain letters, numbers and underscores"
if char in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
hasLetter = True
if not hasLetter:
return False, "Usernames must contain at least one letter"
return True, "Allowed"
@signup.route("/signup", methods=["POST"])
@limiter.limit("10/minute", on_breach=RateLimitReached)
def signup_post():
if auth.isAuthenticated():
return redirect("/home")
username = request.form.get("username", default=None, type=str)
password = request.form.get("password", default=None, type=str)
agreedtoTerms = request.form.get("agreetoTermsandPrivacy", default="off", type=str) == "on"
#invitekey = request.form.get("invite-key", default=None, type=str)
if not websiteFeatures.GetWebsiteFeature("WebSignup"):
flash("Signups are temporarily disabled")
return redirect("/signup")
if 'cf-turnstile-response' not in request.form or request.form.get('cf-turnstile-response') == '':
flash("Please complete the captcha", "error")
return redirect("/signup")
if username is None or password is None: #or invitekey is None:
flash("Please fill in all fields")
return redirect("/signup")
if not agreedtoTerms:
flash("You must agree to the Terms of service and privacy policy")
return redirect("/signup")
allowed, reason = isUsernameAllowed(username)
if not allowed:
flash(reason)
return redirect("/signup")
if len(password) < 8:
flash("Password must be at least 8 characters long")
return redirect("/signup")
try:
FilterText( Text = username, ThrowException = True, UseExtendedBadWords=True)
except TextNotAllowedException:
flash("Username is not friendly for Syntax")
return redirect("/signup")
RequestAddressRisk : int = proxydetection.fetch_address_risk( get_remote_address() )
if RequestAddressRisk == 1:
flash("Please disable your VPN or proxy and try again")
return redirect("/signup")
if RequestAddressRisk == 3:
flash("Error 255, please report in our discord.") # We want to be as vague as possible
return redirect("/signup")
redisResultIP = redis_controller.get("signupcooldown:" + get_remote_address())
redisResultToken = None
if request.cookies.get("t", default=None, type=str) is not None:
redisResultToken = redis_controller.get("signupcooldown:token:" + str(request.cookies.get(key="t", default=None, type=str)))
if redisResultIP is not None or redisResultToken is not None:
if redisResultIP is None:
redis_controller.set("signupcooldown:" + get_remote_address(), "1", ex=60*60*24*7)
if redisResultToken is None and request.cookies.get("t", default=None, type=str) is not None:
redis_controller.set("signupcooldown:token:" + str(request.cookies.get(key="t", default=None, type=str)), "1", ex=60*60*24*7)
flash("You have recently made an account in the past 7 days.")
return redirect("/signup")
SessionToken = request.cookies.get(key="t", default=None, type=str)
if SessionToken is None:
flash("An issue occured while creating your account. Please try again later.")
return redirect("/signup")
if len(SessionToken) != 128:
flash("An issue occured while creating your account. Please try again later.")
return redirect("/signup")
for char in SessionToken:
if char not in (string.ascii_letters + string.digits):
flash("An issue occured while creating your account. Please try again later.")
return redirect("/signup")
UserIPHash = hashlib.md5(get_remote_address().encode("utf-8")).hexdigest()
LoginRecords : list[LoginRecord] = LoginRecord.query.filter(LoginRecord.ip == UserIPHash).distinct(LoginRecord.userid).all()
for record in LoginRecords:
if record.User.accountstatus != 1:
if record.session_token != SessionToken:
NewLoginRecord = LoginRecord(
userid = record.userid,
ip = UserIPHash,
session_token = SessionToken,
useragent = request.headers.get("User-Agent"),
timestamp = datetime.utcnow()
)
db.session.add(NewLoginRecord)
db.session.commit()
flash("An issue occured while creating your account. Please try again later.")
return redirect("/signup")
LoginRecords : list[LoginRecord] = LoginRecord.query.filter(LoginRecord.session_token == SessionToken).distinct(LoginRecord.userid).all()
for record in LoginRecords:
if record.User.accountstatus != 1:
NewLoginRecord = LoginRecord(
userid = record.userid,
ip = UserIPHash,
session_token = SessionToken,
useragent = request.headers.get("User-Agent"),
timestamp = datetime.utcnow()
)
db.session.add(NewLoginRecord)
db.session.commit()
flash("An issue occured while creating your account. Please try again later.")
return redirect("/signup")
UserSignupLock = redislock.acquire_lock("UserSignupLock", acquire_timeout = 20, lock_timeout=1)
if not UserSignupLock:
flash("An issue occured while creating your account. Please try again later.")
return redirect("/signup")
user = User.query.filter(func.lower(User.username) == func.lower(username)).first()
if user is not None:
redislock.release_lock("UserSignupLock", UserSignupLock)
flash("Username already taken")
return redirect("/signup")
pastUsername = PastUsername.query.filter(func.lower(PastUsername.username) == func.lower(username)).first()
if pastUsername is not None:
redislock.release_lock("UserSignupLock", UserSignupLock)
flash("Username already taken")
return redirect("/signup")
# try:
# inviteKey : InviteKey = invitekeys.GetInviteKey(invitekey)
# except invitekeys.InviteExceptions.InvalidInviteKey:
# redislock.release_lock("UserSignupLock", UserSignupLock)
# flash("Invalid invite key")
# return redirect("/signup")
# if inviteKey.used_by is not None:
# redislock.release_lock("UserSignupLock", UserSignupLock)
# flash("Invite key already used")
# return redirect("/signup")
# InviteKeyCreator : User = User.query.filter_by(id=inviteKey.created_by).first()
# if InviteKeyCreator is not None:
# if InviteKeyCreator.accountstatus != 1:
# redislock.release_lock("UserSignupLock", UserSignupLock)
# flash("Invite key creator's account is banned")
# return redirect("/signup")
if not turnstile.VerifyToken(request.form.get('cf-turnstile-response')):
redislock.release_lock("UserSignupLock", UserSignupLock)
flash("Invalid captcha", "error")
if request.cookies.get(key="t", default=None, type=str) is None:
abort(400)
redis_controller.set("signupcooldown:" + get_remote_address(), "1", ex=60*60*24*7)
if request.cookies.get("t", default=None, type=str) is not None:
redis_controller.set("signupcooldown:token:" + str(request.cookies.get(key="t", default=None, type=str)), "1", ex=60*60*24*7)
NewRegisteredUser : User = User(
username = username,
password = "",
created = datetime.utcnow(),
lastonline = datetime.utcnow(),
)
db.session.add(NewRegisteredUser)
db.session.commit()
auth.SetPassword(NewRegisteredUser, password)
userEconomy = UserEconomy(
userid = NewRegisteredUser.id,
robux = 0,
tix = 0
)
db.session.add(userEconomy)
userAvatar : UserAvatar = UserAvatar(
user_id=NewRegisteredUser.id
)
userAvatar.torso_color_id = random.choice(AllowedBodyColors)
db.session.add(userAvatar)
#invitekeys.UseInviteKey(invitekey, NewRegisteredUser)
db.session.commit()
redislock.release_lock("UserSignupLock", UserSignupLock)
logging.info(f"New user registered: {NewRegisteredUser.username} ({NewRegisteredUser.id})")
TakeUserThumbnail(NewRegisteredUser.id)
sessionToken = auth.CreateToken(userid=NewRegisteredUser.id, ip=get_remote_address())
resp = make_response(redirect("/home"))
resp.set_cookie(".ROBLOSECURITY", sessionToken, expires=datetime.utcnow() + timedelta(days=365), domain=f".{config.BaseDomain}")
if request.cookies.get("t", default=None, type=str) is not None:
NewToken = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(128))
resp.set_cookie("t", NewToken, expires=datetime.utcnow() + timedelta(days=365), domain=f".{config.BaseDomain}")
CreateLoginRecord( NewRegisteredUser.id, NewToken )
else:
CreateLoginRecord( NewRegisteredUser.id )
return resp