70 lines
2.3 KiB
Python

from __future__ import annotations
import socket
from typing import Final
from construct import ConstructError
from api import Request
from secret_stream_socket import ProtocolError, wrap_connection_socket
# Address the listening socket is bound to when this file runs as a script.
SERVER_HOST: Final = "127.0.0.1"
# TCP port the listening socket is bound to when this file runs as a script.
SERVER_PORT: Final = 9000
# Secret handed to wrap_connection_socket for every accepted connection.
# NOTE(review): "change-me" looks like a placeholder — presumably it must be
# replaced (and match the client's secret) before real use; confirm.
SHARED_SECRET: Final = "change-me"
def print_batch(peer: str, request_id: int, batch: object) -> None:
    """Print a summary of one chat batch, followed by one line per message.

    Args:
        peer: Label identifying the sender; it is echoed verbatim.
        request_id: Identifier of the request the batch arrived with.
        batch: Any object exposing a ``messages`` sequence whose items have
            ``role`` and ``content`` attributes.
    """
    entries = batch.messages
    header = f"[packet] {peer} sent request_id={request_id} with {len(entries)} message(s)"
    print(header, flush=True)

    position = 0
    for entry in entries:
        detail = f" [{position}] role={entry.role!r} content={entry.content!r}"
        print(detail, flush=True)
        position += 1
def handle_client(
    client_sock: socket.socket,
    address: tuple[str, int],
    shared_secret: str,
) -> None:
    """Serve one connected client until its receiving loop returns.

    The socket is wrapped with ``wrap_connection_socket`` and every received
    frame is parsed as a ``Request`` and logged according to its ``kind``.

    Args:
        client_sock: Connected socket for this client.
        address: Peer address; the first two items are used as the
            ``host:port`` label in log lines.
        shared_secret: Secret passed through to ``wrap_connection_socket``.

    Raises:
        ConstructError: Raised by the frame callback for a request whose
            ``kind`` is neither ``"chat"`` nor ``"cancel"``.
            NOTE(review): presumably ``run_receiving_loop`` lets callback
            errors propagate to the caller — confirm.
    """
    peer = f"{address[0]}:{address[1]}"

    def dispatch_frame(frame: bytes) -> None:
        # Decode a single frame and log it; unknown kinds are rejected.
        request = Request.parse(frame)
        kind = request.kind
        if kind == "chat":
            print_batch(peer, request.request_id, request.payload)
            return
        if kind == "cancel":
            print(f"[packet] {peer} sent request_id={request.request_id} cancel", flush=True)
            return
        raise ConstructError(f"Unknown request kind {kind!r}")

    with wrap_connection_socket(client_sock, shared_secret) as transport:
        print(f"[connected] {peer}", flush=True)
        transport.run_receiving_loop(dispatch_frame)

    # Only reached when the with-block exits without an exception propagating.
    print(f"[disconnected] {peer}", flush=True)
def run_server(host: str, port: int, shared_secret: str) -> None:
    """Listen on ``host:port`` and serve clients one at a time, forever.

    Each accepted connection is passed to ``handle_client`` and processed to
    completion before the next ``accept``. Failures while serving a client
    are logged and the loop moves on to the next connection.

    Args:
        host: IPv4 address to bind the listening socket to.
        port: TCP port to bind the listening socket to.
        shared_secret: Secret forwarded to ``handle_client`` for every
            connection.

    Raises:
        OSError: If creating, binding, listening on, or accepting from the
            server socket fails.

    This function does not return normally; it ends only when an exception
    that is not an ``Exception`` subclass handled below (for example
    ``KeyboardInterrupt``) or a server-socket error propagates.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
        # Allow quick restarts without waiting for the old binding to expire.
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind((host, port))
        server_sock.listen()
        print(f"[listening] {host}:{port}", flush=True)

        while True:
            client_sock, address = server_sock.accept()
            peer = f"{address[0]}:{address[1]}"
            # Own the accepted socket here so it is always closed, even when
            # handle_client fails before (or while) its transport takes over.
            # Closing an already-closed socket is a no-op, so this is safe if
            # the transport closed it first.
            with client_sock:
                try:
                    handle_client(client_sock, address, shared_secret)
                except (ConstructError, ProtocolError) as exc:
                    print(f"[protocol error] {peer}: {exc}", flush=True)
                except Exception as exc:
                    # Top-level boundary: one bad client must not stop the server.
                    print(f"[error] {peer}: {exc}", flush=True)
if __name__ == "__main__":
    # Script entry point: serve with the module-level defaults until Ctrl-C.
    try:
        run_server(
            host=SERVER_HOST,
            port=SERVER_PORT,
            shared_secret=SHARED_SECRET,
        )
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C instead of dumping a traceback.
        print("\n[stopped]", flush=True)