import xmltodict
import json
class RCCSOAPMessages():
GetAllJobsMsg = """
"""
OpenJobMsg = """
{JobId}
{JobExpiration}
{JobCores}
{ScriptName}
{Arguments}
"""
BatchJobMsg = """
{JobId}
{JobExpiration}
{JobCores}
{ScriptName}
{Arguments}
"""
CloseJobMsg = """
{JobId}
"""
ExecuteScriptMsg = """
{JobId}
{ScriptName}
{Script}
{Arguments}
"""
def ParseXMLResponse(self, response : str):
return xmltodict.parse(response.strip())
def GenerateArguments(self, Arguments) -> str:
"""
Converts the list of arguments into XML to be passed into the script
"""
ArgumentsText = ""
for arg in Arguments:
# Types
# LUA_TNIL
# LUA_TBOOLEAN
# LUA_TNUMBER
# LUA_TSTRING
# LUA_TTABLE
# We need to convert the arg
if type(arg) == bool:
argType = "LUA_TBOOLEAN"
argValue = str(arg).lower()
elif type(arg) == int or type(arg) == float:
argType = "LUA_TNUMBER"
argValue = str(arg)
elif type(arg) == str:
argType = "LUA_TSTRING"
argValue = arg
ArgumentsText += f"""
{argType}
{argValue}
"""
return ArgumentsText
def FormatOpenJobMessage( self, JobId : str, Expiration : int = 20, Cores : int = 1, ScriptName : str = "RunScript", RunScript : str = "", Arguments = []) -> str:
"""
Formats the OpenJobMsg with the given parameters
"""
ParsedArguments : str = self.GenerateArguments(Arguments)
return self.OpenJobMsg.format(
JobId = JobId,
JobExpiration = str(Expiration),
JobCores = str(Cores),
ScriptName = ScriptName,
RunScript = RunScript,
Arguments = ParsedArguments
)
def FormatBatchJobMessage( self, JobId : str, Expiration : int = 20, Cores : int = 1, ScriptName : str = "RunScript", RunScript : str = "", Arguments = [] ) -> str:
"""
Formats the BatchJobMsg with the given parameters
"""
ParsedArguments : str = self.GenerateArguments(Arguments)
return self.BatchJobMsg.format(
JobId = JobId,
JobExpiration = str(Expiration),
JobCores = str(Cores),
ScriptName = ScriptName,
RunScript = RunScript,
Arguments = ParsedArguments
)
def FormatCloseJobMessage( self, JobId : str ) -> str:
"""
Formats the CloseJobMsg with the given parameters
"""
return self.CloseJobMsg.format(
JobId = JobId
)
def FormatExecuteScriptMessage( self, JobId : str, ScriptName : str = "Script", Script : str = "", Arguments = []) -> str:
"""
Formats the ExecuteScriptMsg with the given parameters
"""
ParsedArguments : str = self.GenerateArguments(Arguments)
return self.ExecuteScriptMsg.format(
JobId = JobId,
ScriptName = ScriptName,
Script = Script,
Arguments = ParsedArguments
)
def FormatGameOpenJSON( self, PlaceId : int, CreatorId : int, JobId : str, ApiKey : str, MaxPlayers : int = 10, GsmInterval : int = 20, PortNumber : int = 53640, CreatorType : str = "User", PlaceVersion : int = 1, MachineAddress : str = "127.0.0.1", UniverseId : int | None = 0):
"""
Formats the GameOpen JSON with the given parameters used for 2017L+ RCC
"""
return json.dumps({
"Mode": "GameServer",
"Settings": {
"PlaceId": PlaceId,
"CreatorId": CreatorId,
"GameId": JobId,
"MachineAddress": MachineAddress,
"MaxPlayers": MaxPlayers,
"GsmInterval": GsmInterval,
"MaxGameInstances": 1,
"PreferredPlayerCapacity": MaxPlayers,
"UniverseId": PlaceId if UniverseId is None else UniverseId,
"BaseUrl": "syntax.eco",
"MatchmakingContextId": 1,
"CreatorType": CreatorType,
"PlaceVersion": PlaceVersion,
"JobId": JobId,
"PreferredPort": PortNumber,
"ApiKey": ApiKey,
"PlaceVisitAccessKey": "None",
"PlaceFetchUrl": f"https://www.syntax.eco/asset/?id={str(PlaceId)}"
}
})
def ParseGetAllJobsResponse( self, ResponseText : str ):
"""
Parses the GetAllJobs response into a list of dictionaries
"""
ParsedResponse = self.ParseXMLResponse(ResponseText)
JobsList = ParsedResponse["SOAP-ENV:Envelope"]["SOAP-ENV:Body"]["ns1:GetAllJobsResponse"]
if JobsList is not None and "ns1:GetAllJobsResult" in JobsList:
JobsList = JobsList["ns1:GetAllJobsResult"]
else:
return []
if type(JobsList) == dict:
# Only one job
# Format it into a dict without ns1:
JobDict = {
"id" : JobsList["ns1:id"],
"expirationInSeconds" : JobsList["ns1:expirationInSeconds"],
"category" : JobsList["ns1:category"],
"cores" : JobsList["ns1:cores"]
}
return [JobDict]
elif type(JobsList) == list:
# Multiple jobs
# Format them into a list of dicts without ns1:
JobsDictList = []
for Job in JobsList:
JobDict = {
"id" : Job["ns1:id"],
"expirationInSeconds" : Job["ns1:expirationInSeconds"],
"category" : Job["ns1:category"],
"cores" : Job["ns1:cores"]
}
JobsDictList.append(JobDict)
return JobsDictList
else:
# No jobs
return []