syntaxwebsite/app/routes/usersapi.py

219 lines
7.8 KiB
Python

# users.roblox.com
from flask import Blueprint, jsonify, request, make_response
from flask_wtf.csrf import CSRFError, generate_csrf
from app.extensions import db, redis_controller, limiter, csrf
from app.models.user import User
from app.models.userassets import UserAsset
from app.models.past_usernames import PastUsername
from app.models.asset import Asset
from app.util import membership, auth
from app.enums.AssetType import AssetType
from app.enums.MembershipType import MembershipType
from sqlalchemy import func
# Blueprint that carries every users-API route defined in this module,
# registered with url_prefix '/'.
UsersAPI = Blueprint('UsersAPI', __name__, url_prefix='/')
# Exempt the whole blueprint from the CSRF extension; the before_request hook
# defined in this module calls csrf.protect() itself for the requests it
# decides to check.
csrf.exempt(UsersAPI)
@UsersAPI.errorhandler(CSRFError)
def handle_csrf_error(e):
    """Convert a CSRFError raised in this blueprint into a JSON 403 response.

    The body uses the ``errors`` envelope (code 0, "Token Validation Failed")
    and the ``x-csrf-token`` response header is set to a freshly generated
    CSRF token.
    """
    payload = {
        "errors": [
            {
                "code": 0,
                "message": "Token Validation Failed"
            }
        ]
    }
    response = make_response(jsonify(payload), 403)
    # Hand out a new token alongside the rejection.
    response.headers["x-csrf-token"] = generate_csrf()
    return response
@UsersAPI.errorhandler(429)
def handle_ratelimit_reached(e):
    """Answer HTTP 429 errors in this blueprint with the JSON error envelope.

    Returns a (response, 429) pair whose single error entry has code 9.
    """
    flood_error = {
        "code": 9,
        "message": "The flood limit has been exceeded."
    }
    return jsonify({"errors": [flood_error]}), 429
# Hook registered via UsersAPI.before_request. It looks at the request's
# User-Agent string (for the substring "Roblox/") and at the request path
# (the two bulk-lookup routes "/v1/usernames/users" and "/v1/users") to decide
# whether to return early or to call csrf.protect().
# NOTE(review): indentation was lost in this copy of the file, so it cannot be
# determined from here whether csrf.protect() sits inside the User-Agent check
# (requests whose agent contains "Roblox/" would skip CSRF validation entirely)
# or after it (only non-"Roblox/" callers of the two lookup routes would skip
# it). Confirm against the original file before changing this function.
@UsersAPI.before_request
def before_request():
if "Roblox/" not in request.user_agent.string:
# The two bulk-lookup POST routes return without any CSRF validation here.
if request.path == "/v1/usernames/users" or request.path == "/v1/users":
return
# Explicit validation call; the blueprint itself is registered as CSRF-exempt.
csrf.protect()
@UsersAPI.route('/v1/users/<int:userId>', methods=['GET'])
@limiter.limit("60/minute")
def get_user(userId: int):
    """Return the public profile of the user with id ``userId`` as JSON.

    Responds with a 404 error envelope (code 3) when no such user exists.
    The same stored username is reported as ``name``, ``displayName`` and
    ``externalAppDisplayName``; ``isBanned`` is true whenever the account
    status is anything other than 1.
    """
    user: User = User.query.filter_by(id=userId).first()
    if user is None:
        not_found = {"errors": [{"code": 3, "message": "The user id is invalid."}]}
        return jsonify(not_found), 404

    username = user.username
    profile = {
        "description": user.description,
        "created": user.created.isoformat(),
        "isBanned": user.accountstatus != 1,
        "externalAppDisplayName": username,
        "hasVerifiedBadge": False,
        "id": user.id,
        "name": username,
        "displayName": username,
    }
    return jsonify(profile)
@UsersAPI.route('/v1/users/authenticated', methods=['GET'])
@limiter.limit("60/minute")
@auth.authenticated_required_api
def get_authenticated_user():
    """Return the id, name and display name of the current user as JSON.

    The user is obtained from ``auth.GetCurrentUser()``; the username is
    reported for both ``name`` and ``displayName``.
    """
    current: User = auth.GetCurrentUser()
    summary = dict(
        id=current.id,
        name=current.username,
        displayName=current.username,
    )
    return jsonify(summary)
@UsersAPI.route("/v1/users/authenticated/roles", methods=["GET"])
@limiter.limit("60/minute")
@auth.authenticated_required_api
def get_authenticated_user_roles():
    """Return the roles payload, which is always an empty list."""
    payload = {"roles": []}
    return jsonify(payload), 200
@UsersAPI.route("/v1/usernames/users", methods=["POST"])
@limiter.limit("60/minute")
def multi_username_lookup():
    """Resolve a batch of usernames (current or past) to user summaries.

    Expects a JSON object body with a ``usernames`` list of at most 100
    entries. Each entry is coerced to ``str`` and matched case-insensitively,
    first against current usernames and then against past usernames. Names
    that repeat (ignoring case) are looked up only once, and names that match
    nobody are omitted from the result.

    Returns:
        ``({"data": [...]}, 200)`` on success, or a 400 error envelope when
        the body is not a JSON object, ``usernames`` is missing or not a
        list (code 0), or more than 100 names are supplied (code 2).
    """
    # silent=True yields None for a non-JSON or unparsable body; the dict
    # check also rejects valid JSON that is not an object (null, list, ...),
    # which previously raised TypeError and surfaced as a 500.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify( { "errors": [ { "code": 0, "message": "The request is invalid." } ] } ), 400
    usernames = body.get("usernames")
    if not isinstance(usernames, list):
        return jsonify( { "errors": [ { "code": 0, "message": "The request is invalid." } ] } ), 400
    if len(usernames) > 100:
        return jsonify( { "errors": [ { "code": 2, "message": "Too many usernames." } ] } ), 400

    seen_lowered = set()
    data_result = []
    for raw_username in usernames:
        username = str(raw_username)
        lowered = username.lower()
        if lowered in seen_lowered:
            continue
        seen_lowered.add(lowered)

        user : User = User.query.filter(func.lower(User.username) == lowered).first()
        if user is None:
            # Fall back to a previous username and resolve it to its owner.
            past_entry : PastUsername = PastUsername.query.filter(
                func.lower(PastUsername.username) == lowered
            ).first()
            if past_entry is not None:
                user = User.query.filter_by(id=past_entry.user_id).first()
        if user is None:
            continue

        data_result.append({
            "requestedUsername": username,
            "hasVerifiedBadge": False,
            "id": user.id,
            "name": user.username,
            "displayName": user.username
        })

    return jsonify({
        "data": data_result
    }), 200
@UsersAPI.route("/v1/users", methods=["POST"])
@limiter.limit("60/minute")
def multi_user_lookup():
    """Resolve a batch of user ids to user summaries.

    Expects a JSON object body with a ``userIds`` list of at most 100
    integers. Duplicate ids are reported once, results follow the order in
    which ids first appear in the request, and ids that match no user are
    omitted.

    Returns:
        ``({"data": [...]}, 200)`` on success, or a 400 error envelope when
        the body is not a JSON object, ``userIds`` is missing or not a list,
        any element is not an integer (code 0), or more than 100 ids are
        supplied (code 2).
    """
    # silent=True yields None for a non-JSON or unparsable body; the dict
    # check also rejects valid JSON that is not an object (null, list, ...),
    # which previously raised TypeError and surfaced as a 500.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify( { "errors": [ { "code": 0, "message": "The request is invalid." } ] } ), 400
    userIds = body.get("userIds")
    if not isinstance(userIds, list):
        return jsonify( { "errors": [ { "code": 0, "message": "The request is invalid." } ] } ), 400
    if len(userIds) > 100:
        # NOTE(review): the numeric code is kept as it was; only the message,
        # which had been copied from the usernames endpoint, is corrected.
        return jsonify( { "errors": [ { "code": 2, "message": "Too many ids." } ] } ), 400
    # Validate every element before touching the database. An exact type
    # comparison is used on purpose: isinstance() would accept JSON booleans,
    # because bool is a subclass of int.
    if any(type(candidate) is not int for candidate in userIds):
        return jsonify( { "errors": [ { "code": 0, "message": "The request is invalid." } ] } ), 400

    # dict.fromkeys de-duplicates while keeping first-seen order.
    unique_ids = list(dict.fromkeys(userIds))
    users_by_id = {}
    if unique_ids:
        # One IN query instead of one query per requested id.
        matched_users = User.query.filter(User.id.in_(unique_ids)).all()
        users_by_id = {user.id: user for user in matched_users}

    data_result = [
        {
            "id": users_by_id[user_id].id,
            "name": users_by_id[user_id].username,
            "displayName": users_by_id[user_id].username,
            "hasVerifiedBadge": False
        }
        for user_id in unique_ids
        if user_id in users_by_id
    ]

    return jsonify({
        "data": data_result
    }), 200
@UsersAPI.route("/v1/users/<int:userId>/username-history", methods=["GET"])
@limiter.limit("60/minute")
def past_username_history_lookup(userId: int):
    """Return one page of the past usernames recorded for ``userId``.

    Query parameters:
        cursor: 1-based page number (default 1); values below 1 give a 400
            with code 4.
        limit: page size, one of 10, 25, 50 or 100 (default 10); anything
            else gives a 400 with code 5.
        sortOrder: "Asc" or "Desc", case-insensitive (default "Asc");
            anything else gives a 400 with code 6.

    Responds with a 404 (code 3) when the user does not exist. On success the
    body holds ``data`` (a list of ``{"name": ...}`` entries ordered by the
    past-username row id) plus ``previousPageCursor`` / ``nextPageCursor``,
    each a page number as a string or ``None`` when there is no such page.
    """
    if User.query.filter_by(id=userId).first() is None:
        return jsonify( { "errors": [ { "code": 3, "message": "The user id is invalid." } ] } ), 404

    page_number: int = request.args.get("cursor", default=1, type=int)
    if page_number < 1:
        return jsonify( { "errors": [ { "code": 4, "message": "The specified cursor is invalid!" } ] } ), 400

    page_size: int = request.args.get("limit", default=10, type=int)
    if page_size not in (10, 25, 50, 100):
        return jsonify( { "errors": [ { "code": 5, "message": "The specified limit is invalid!" } ] } ), 400

    direction: str = request.args.get("sortOrder", default="Asc", type=str).lower()
    if direction not in ("asc", "desc"):
        return jsonify( { "errors": [ { "code": 6, "message": "The specified sort order is invalid!" } ] } ), 400

    if direction == "desc":
        ordering = PastUsername.id.desc()
    else:
        ordering = PastUsername.id.asc()

    history_query = PastUsername.query.filter_by(user_id=userId).order_by(ordering)
    page = history_query.paginate(page=page_number, per_page=page_size, error_out=False)

    previous_cursor = str(page.prev_num) if page.has_prev else None
    next_cursor = str(page.next_num) if page.has_next else None
    names = [{"name": entry.username} for entry in page.items]

    return jsonify({
        "previousPageCursor": previous_cursor,
        "nextPageCursor": next_cursor,
        "data": names
    }), 200