|
| 1 | +from socket import socket, create_connection, AF_INET, SOCK_DGRAM |
| 2 | +from struct import unpack |
| 3 | +from time import time |
| 4 | +from dataclasses import dataclass |
| 5 | +from typing import Tuple |
| 6 | + |
| 7 | +# * Dataclass |
| 8 | +@dataclass |
| 9 | +class Status: |
| 10 | + name: str |
| 11 | + map: str |
| 12 | + players: int |
| 13 | + wave: int |
| 14 | + version: float |
| 15 | + vertype: str |
| 16 | + ping: float |
| 17 | + |
| 18 | +# * Главный класс |
| 19 | +class Server: |
| 20 | + def __init__( |
| 21 | + self, |
| 22 | + server_host: str, |
| 23 | + server_port: int=6567, |
| 24 | + input_port: int=6859 |
| 25 | + ) -> None: |
| 26 | + self.server: Tuple[str, int] = (server_host, server_port) |
| 27 | + self.input_server: Tuple[str, int] = (server_host, input_port) |
| 28 | + |
| 29 | + def get_status(self, timeout: float=10.0) -> Status: |
| 30 | + # * Инициализация сервера |
| 31 | + s = socket(AF_INET, SOCK_DGRAM) |
| 32 | + s.connect(self.server) |
| 33 | + s.settimeout(timeout) |
| 34 | + |
| 35 | + # * Создание и так понятно чего для чего |
| 36 | + info = {} |
| 37 | + |
| 38 | + # * Получение данных и замер |
| 39 | + s_time = time() |
| 40 | + s.send(b"\xfe\x01") |
| 41 | + data = s.recv(1024) |
| 42 | + e_time = time() |
| 43 | + |
| 44 | + # * Парсинг |
| 45 | + info["name"] = data[1:data[0]+1].decode("utf-8") |
| 46 | + data = data[data[0]+1:] |
| 47 | + info["map"] = data[1:data[0]+1].decode("utf-8") |
| 48 | + data = data[data[0]+1:] |
| 49 | + info["players"] = unpack(">i", data[:4])[0] |
| 50 | + data = data[4:] |
| 51 | + info["wave"] = unpack(">i", data[:4])[0] |
| 52 | + data = data[4:] |
| 53 | + info["version"] = unpack(">i", data[:4])[0] |
| 54 | + data = data[4:] |
| 55 | + info["vertype"] = data[1:data[0]+1].decode("utf-8") |
| 56 | + info["ping"] = round((e_time - s_time) * 1000) |
| 57 | + |
| 58 | + return Status(**info) |
| 59 | + |
| 60 | + def send_command(self, command: str) -> None: |
| 61 | + s = create_connection(self.input_server) |
| 62 | + s.sendall( |
| 63 | + bytes( |
| 64 | + command.encode(errors="ignore") |
| 65 | + ) |
| 66 | + ) |
| 67 | + s.close() |
| 68 | + |
| 69 | + def ping(self, timeout: float=10.0) -> int: |
| 70 | + # * Инициализация сервера |
| 71 | + s = socket(AF_INET, SOCK_DGRAM) |
| 72 | + s.connect(self.server) |
| 73 | + s.settimeout(timeout) |
| 74 | + |
| 75 | + # * Получение данных и замер |
| 76 | + s_time = time() |
| 77 | + s.send(b"\xfe\x01") |
| 78 | + s.recv(1024) |
| 79 | + e_time = time() |
| 80 | + |
| 81 | + return round((e_time - s_time) * 1000) |
0 commit comments