SyntaxGameServer/UDPProxy.py

52 lines
2.0 KiB
Python

import logging
import socket
import os
import subprocess
def IsPortInUse( port : int ) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.1)
return s.connect_ex(('127.0.0.1', port)) == 0
class UDPProxy():
"""
A wrapper for socat ( Linux ) and netcat ( Windows ) to forward UDP packets
"""
UDPProxyProcess = None
UDPProxyPort = 0
UDPProxyTargetPort = 0
UDPProxyTargetHost = "127.0.0.1"
isRunning = False
def __init__(self, UDPProxyPort : int, UDPProxyTargetPort : int, UDPProxyTargetHost : str = "127.0.0.1"):
self.UDPProxyPort = UDPProxyPort
self.UDPProxyTargetPort = UDPProxyTargetPort
self.UDPProxyTargetHost = UDPProxyTargetHost
def StartUDPProxy(self):
if self.isRunning:
logging.warning("UDPProxy is already running")
return
if IsPortInUse(self.UDPProxyPort):
logging.warning(f"UDPProxyPort {self.UDPProxyPort} is already in use")
return
logging.info(f"Starting UDPProxy on port {self.UDPProxyPort} forwarding to {self.UDPProxyTargetHost}:{self.UDPProxyTargetPort}")
# We need to handle multiple clients at once so we use fork
if os.name == "nt":
self.UDPProxyProcess = subprocess.Popen(["quilkin", "--no-admin", "proxy", "-p", str(self.UDPProxyPort), "-t", f"{self.UDPProxyTargetHost}:{self.UDPProxyTargetPort}"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
self.UDPProxyProcess = subprocess.Popen(["socat", "udp-listen:{},fork".format(self.UDPProxyPort), "udp:127.0.0.1:{}".format(self.UDPProxyTargetPort)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
self.isRunning = True
def StopUDPProxy(self):
if self.isRunning is False:
logging.warning("UDPProxy is not running")
return
logging.info(f"Stopping UDPProxy on port {self.UDPProxyPort}")
self.UDPProxyProcess.kill()
self.isRunning = False
def __del__(self):
self.StopUDPProxy()