263 lines
11 KiB
Python
263 lines
11 KiB
Python
from flask import Blueprint, render_template, request, redirect, url_for, session, flash, redirect, make_response, abort
|
|
from app.models.user import User
|
|
from app.models.invite_key import InviteKey
|
|
from app.models.usereconomy import UserEconomy
|
|
from app.models.user_avatar import UserAvatar
|
|
from app.models.login_records import LoginRecord
|
|
from app.util.textfilter import FilterText, TextNotAllowedException
|
|
from app.models.past_usernames import PastUsername
|
|
from app.pages.login.login import CreateLoginRecord
|
|
from app.extensions import db, limiter, csrf, redis_controller, get_remote_address
|
|
from app.pages.avatar.avatar import AllowedBodyColors
|
|
from app.routes.thumbnailer import TakeUserThumbnail
|
|
import logging
|
|
import hashlib
|
|
from datetime import datetime
|
|
from app.util import websiteFeatures, auth, turnstile, redislock
|
|
from app.services import invitekeys, proxydetection
|
|
from sqlalchemy import func
|
|
import random
|
|
from datetime import datetime, timedelta
|
|
from config import Config
|
|
import string
|
|
|
|
config = Config()
|
|
signup = Blueprint("signup", __name__, template_folder="pages")
|
|
|
|
@signup.route("/signup", methods=["GET"])
|
|
@limiter.exempt()
|
|
def signup_page():
|
|
if auth.isAuthenticated():
|
|
return redirect("/home")
|
|
SignupEnabled = True
|
|
if not websiteFeatures.GetWebsiteFeature("WebSignup"):
|
|
flash("Signups are temporarily disabled")
|
|
SignupEnabled = False
|
|
return render_template("signup/signup.html", SignupEnabled=SignupEnabled)
|
|
|
|
def RateLimitReached(e):
|
|
flash("You are being rate limited. Please try again later.")
|
|
return redirect("/signup")
|
|
|
|
def isUsernameAllowed( username : str ):
|
|
if len(username) < 3 or len(username) > 20:
|
|
return False, "Usernames must be between 3 and 20 characters long"
|
|
|
|
if username[0] == "_" or username[-1] == "_":
|
|
return False, "Usernames must not start or end with an underscore"
|
|
|
|
if username.count("_") > 1:
|
|
return False, "Usernames must not contain more than one underscore"
|
|
|
|
if username.count(" ") > 0:
|
|
return False, "Usernames must not contain spaces"
|
|
|
|
allowedCharacters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
|
|
hasLetter = False
|
|
for char in username:
|
|
if char not in allowedCharacters:
|
|
return False, "Usernames must only contain letters, numbers and underscores"
|
|
if char in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
|
|
hasLetter = True
|
|
|
|
if not hasLetter:
|
|
return False, "Usernames must contain at least one letter"
|
|
|
|
return True, "Allowed"
|
|
|
|
@signup.route("/signup", methods=["POST"])
|
|
@limiter.limit("10/minute", on_breach=RateLimitReached)
|
|
def signup_post():
|
|
if auth.isAuthenticated():
|
|
return redirect("/home")
|
|
username = request.form.get("username", default=None, type=str)
|
|
password = request.form.get("password", default=None, type=str)
|
|
agreedtoTerms = request.form.get("agreetoTermsandPrivacy", default="off", type=str) == "on"
|
|
#invitekey = request.form.get("invite-key", default=None, type=str)
|
|
|
|
if not websiteFeatures.GetWebsiteFeature("WebSignup"):
|
|
flash("Signups are temporarily disabled")
|
|
return redirect("/signup")
|
|
if 'cf-turnstile-response' not in request.form or request.form.get('cf-turnstile-response') == '':
|
|
flash("Please complete the captcha", "error")
|
|
return redirect("/signup")
|
|
|
|
if username is None or password is None: #or invitekey is None:
|
|
flash("Please fill in all fields")
|
|
return redirect("/signup")
|
|
|
|
if not agreedtoTerms:
|
|
flash("You must agree to the Terms of service and privacy policy")
|
|
return redirect("/signup")
|
|
|
|
allowed, reason = isUsernameAllowed(username)
|
|
if not allowed:
|
|
flash(reason)
|
|
return redirect("/signup")
|
|
|
|
if len(password) < 8:
|
|
flash("Password must be at least 8 characters long")
|
|
return redirect("/signup")
|
|
|
|
try:
|
|
FilterText( Text = username, ThrowException = True, UseExtendedBadWords=True)
|
|
except TextNotAllowedException:
|
|
flash("Username is not friendly for Syntax")
|
|
return redirect("/signup")
|
|
|
|
RequestAddressRisk : int = proxydetection.fetch_address_risk( get_remote_address() )
|
|
if RequestAddressRisk == 1:
|
|
flash("Please disable your VPN or proxy and try again")
|
|
return redirect("/signup")
|
|
|
|
if RequestAddressRisk == 3:
|
|
flash("Error 255, please report in our discord.") # We want to be as vague as possible
|
|
return redirect("/signup")
|
|
redisResultIP = redis_controller.get("signupcooldown:" + get_remote_address())
|
|
redisResultToken = None
|
|
if request.cookies.get("t", default=None, type=str) is not None:
|
|
redisResultToken = redis_controller.get("signupcooldown:token:" + str(request.cookies.get(key="t", default=None, type=str)))
|
|
|
|
if redisResultIP is not None or redisResultToken is not None:
|
|
if redisResultIP is None:
|
|
redis_controller.set("signupcooldown:" + get_remote_address(), "1", ex=60*60*24*7)
|
|
if redisResultToken is None and request.cookies.get("t", default=None, type=str) is not None:
|
|
redis_controller.set("signupcooldown:token:" + str(request.cookies.get(key="t", default=None, type=str)), "1", ex=60*60*24*7)
|
|
|
|
flash("You have recently made an account in the past 7 days.")
|
|
return redirect("/signup")
|
|
|
|
SessionToken = request.cookies.get(key="t", default=None, type=str)
|
|
if SessionToken is None:
|
|
flash("An issue occured while creating your account. Please try again later.")
|
|
return redirect("/signup")
|
|
if len(SessionToken) != 128:
|
|
flash("An issue occured while creating your account. Please try again later.")
|
|
return redirect("/signup")
|
|
for char in SessionToken:
|
|
if char not in (string.ascii_letters + string.digits):
|
|
flash("An issue occured while creating your account. Please try again later.")
|
|
return redirect("/signup")
|
|
|
|
UserIPHash = hashlib.md5(get_remote_address().encode("utf-8")).hexdigest()
|
|
LoginRecords : list[LoginRecord] = LoginRecord.query.filter(LoginRecord.ip == UserIPHash).distinct(LoginRecord.userid).all()
|
|
for record in LoginRecords:
|
|
if record.User.accountstatus != 1:
|
|
if record.session_token != SessionToken:
|
|
NewLoginRecord = LoginRecord(
|
|
userid = record.userid,
|
|
ip = UserIPHash,
|
|
session_token = SessionToken,
|
|
useragent = request.headers.get("User-Agent"),
|
|
timestamp = datetime.utcnow()
|
|
)
|
|
db.session.add(NewLoginRecord)
|
|
db.session.commit()
|
|
|
|
flash("An issue occured while creating your account. Please try again later.")
|
|
return redirect("/signup")
|
|
LoginRecords : list[LoginRecord] = LoginRecord.query.filter(LoginRecord.session_token == SessionToken).distinct(LoginRecord.userid).all()
|
|
for record in LoginRecords:
|
|
if record.User.accountstatus != 1:
|
|
NewLoginRecord = LoginRecord(
|
|
userid = record.userid,
|
|
ip = UserIPHash,
|
|
session_token = SessionToken,
|
|
useragent = request.headers.get("User-Agent"),
|
|
timestamp = datetime.utcnow()
|
|
)
|
|
db.session.add(NewLoginRecord)
|
|
db.session.commit()
|
|
|
|
flash("An issue occured while creating your account. Please try again later.")
|
|
return redirect("/signup")
|
|
|
|
UserSignupLock = redislock.acquire_lock("UserSignupLock", acquire_timeout = 20, lock_timeout=1)
|
|
if not UserSignupLock:
|
|
flash("An issue occured while creating your account. Please try again later.")
|
|
return redirect("/signup")
|
|
|
|
user = User.query.filter(func.lower(User.username) == func.lower(username)).first()
|
|
if user is not None:
|
|
redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
flash("Username already taken")
|
|
return redirect("/signup")
|
|
|
|
pastUsername = PastUsername.query.filter(func.lower(PastUsername.username) == func.lower(username)).first()
|
|
if pastUsername is not None:
|
|
redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
flash("Username already taken")
|
|
return redirect("/signup")
|
|
|
|
# try:
|
|
# inviteKey : InviteKey = invitekeys.GetInviteKey(invitekey)
|
|
# except invitekeys.InviteExceptions.InvalidInviteKey:
|
|
# redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
# flash("Invalid invite key")
|
|
# return redirect("/signup")
|
|
# if inviteKey.used_by is not None:
|
|
# redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
# flash("Invite key already used")
|
|
# return redirect("/signup")
|
|
# InviteKeyCreator : User = User.query.filter_by(id=inviteKey.created_by).first()
|
|
# if InviteKeyCreator is not None:
|
|
# if InviteKeyCreator.accountstatus != 1:
|
|
# redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
# flash("Invite key creator's account is banned")
|
|
# return redirect("/signup")
|
|
|
|
if not turnstile.VerifyToken(request.form.get('cf-turnstile-response')):
|
|
redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
flash("Invalid captcha", "error")
|
|
|
|
if request.cookies.get(key="t", default=None, type=str) is None:
|
|
abort(400)
|
|
|
|
redis_controller.set("signupcooldown:" + get_remote_address(), "1", ex=60*60*24*7)
|
|
if request.cookies.get("t", default=None, type=str) is not None:
|
|
redis_controller.set("signupcooldown:token:" + str(request.cookies.get(key="t", default=None, type=str)), "1", ex=60*60*24*7)
|
|
|
|
NewRegisteredUser : User = User(
|
|
username = username,
|
|
password = "",
|
|
created = datetime.utcnow(),
|
|
lastonline = datetime.utcnow(),
|
|
)
|
|
|
|
db.session.add(NewRegisteredUser)
|
|
db.session.commit()
|
|
|
|
auth.SetPassword(NewRegisteredUser, password)
|
|
|
|
userEconomy = UserEconomy(
|
|
userid = NewRegisteredUser.id,
|
|
robux = 0,
|
|
tix = 0
|
|
)
|
|
db.session.add(userEconomy)
|
|
|
|
userAvatar : UserAvatar = UserAvatar(
|
|
user_id=NewRegisteredUser.id
|
|
)
|
|
userAvatar.torso_color_id = random.choice(AllowedBodyColors)
|
|
db.session.add(userAvatar)
|
|
|
|
#invitekeys.UseInviteKey(invitekey, NewRegisteredUser)
|
|
|
|
db.session.commit()
|
|
redislock.release_lock("UserSignupLock", UserSignupLock)
|
|
logging.info(f"New user registered: {NewRegisteredUser.username} ({NewRegisteredUser.id})")
|
|
|
|
TakeUserThumbnail(NewRegisteredUser.id)
|
|
sessionToken = auth.CreateToken(userid=NewRegisteredUser.id, ip=get_remote_address())
|
|
resp = make_response(redirect("/home"))
|
|
resp.set_cookie(".ROBLOSECURITY", sessionToken, expires=datetime.utcnow() + timedelta(days=365), domain=f".{config.BaseDomain}")
|
|
|
|
if request.cookies.get("t", default=None, type=str) is not None:
|
|
NewToken = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(128))
|
|
resp.set_cookie("t", NewToken, expires=datetime.utcnow() + timedelta(days=365), domain=f".{config.BaseDomain}")
|
|
CreateLoginRecord( NewRegisteredUser.id, NewToken )
|
|
else:
|
|
CreateLoginRecord( NewRegisteredUser.id )
|
|
|
|
return resp |