From 2ac57a64340013d326473a2f2921e2b64d1e7c21 Mon Sep 17 00:00:00 2001 From: Andreew Gregory Date: Sun, 29 Mar 2026 13:49:00 +0300 Subject: [PATCH] server.py fixed --- dedicated_ai_server/config.py | 44 +++++++++++++++++++++ dedicated_ai_server/server.py | 72 +++++++++++++++++++++++------------ 2 files changed, 92 insertions(+), 24 deletions(-) create mode 100644 dedicated_ai_server/config.py diff --git a/dedicated_ai_server/config.py b/dedicated_ai_server/config.py new file mode 100644 index 0000000..5ae7b19 --- /dev/null +++ b/dedicated_ai_server/config.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Mapping + +import tomllib + + +@dataclass(frozen=True) +class Config: + listening_addr: str + listening_port: int + secret: str + + +def _read_toml(path: Path) -> Mapping[str, Any]: + with path.open("rb") as handle: + return tomllib.load(handle) + + +def read_config( + config_path: str = "config.toml", + secret_config_path: str = "secret-config.toml", +) -> Config: + config_data = _read_toml(Path(config_path)) + secret_data = _read_toml(Path(secret_config_path)) + + addr = str(config_data.get("listening_addr", "127.0.0.1")) + port_raw = config_data.get("listening_port", 9000) + try: + port = int(port_raw) + except (TypeError, ValueError) as exc: + raise ValueError(f"Invalid listening_port value {port_raw!r}") from exc + + secret = secret_data.get("secret") + if not secret: + raise ValueError("Missing secret in secret-config.toml") + + return Config( + listening_addr=addr, + listening_port=port, + secret=str(secret), + ) diff --git a/dedicated_ai_server/server.py b/dedicated_ai_server/server.py index 57072d4..4cff69d 100644 --- a/dedicated_ai_server/server.py +++ b/dedicated_ai_server/server.py @@ -10,16 +10,12 @@ import torch from transformers import AutoModelForCausalLM, AutoTokenizer from api import Request, Response +from config import Config, read_config from secret_stream_socket import ProtocolError, SecretStreamSocket, wrap_connection_socket -SERVER_HOST = "127.0.0.1" -SERVER_PORT = 9000 -SHARED_SECRET = "change-me" - MODEL_ID = "zai-org/GLM-4.7-Flash" MAX_NEW_TOKENS = 256 -PIECE_CHUNK_SIZE = 64 @dataclass @@ -35,9 +31,6 @@ class PendingChatCompletionRecord: was_cancelled: bool = False _lock: threading.Lock = field(default_factory=threading.Lock, repr=False) - - - def mark_cancelled(self) -> None: with self._lock: self.was_cancelled = True @@ -54,6 +47,7 @@ class WorkItem: record: PendingChatCompletionRecord + @dataclass class ModelBundle: tokenizer: Any @@ -114,21 +108,46 @@ def generate_llm_pieces(bundle: ModelBundle, messages: list) -> Iterable[str]: inputs = {name: tensor.to(bundle.model.device) for name, tensor in inputs.items()} input_ids = inputs.get("input_ids") - input_len = int(input_ids.shape[1]) if input_ids is not None else 0 + attention_mask = inputs.get("attention_mask") + past_key_values = None - with torch.inference_mode(): - output_ids = bundle.model.generate( - **inputs, - max_new_tokens=MAX_NEW_TOKENS, - do_sample=False, - ) + eos_token_id = bundle.model.config.eos_token_id + if eos_token_id is None: + eos_token_id = bundle.tokenizer.eos_token_id + if eos_token_id is None: + eos_token_ids = set() + elif isinstance(eos_token_id, (list, tuple, set)): + eos_token_ids = set(int(x) for x in eos_token_id) + else: + eos_token_ids = {int(eos_token_id)} - generated_ids = output_ids[0][input_len:] - text = bundle.tokenizer.decode(generated_ids, skip_special_tokens=True) - if not text: - return [] + for _ in range(MAX_NEW_TOKENS): + with torch.inference_mode(): + outputs = bundle.model( + input_ids=input_ids, + attention_mask=attention_mask, + past_key_values=past_key_values, + use_cache=True, + ) - return [text[i:i + PIECE_CHUNK_SIZE] for i in range(0, len(text), PIECE_CHUNK_SIZE)] + logits = outputs.logits[:, -1, :] + next_token_id = torch.argmax(logits, dim=-1) + token_id = int(next_token_id.item()) + + if token_id in eos_token_ids: + break + + token_text = bundle.tokenizer.decode([token_id], skip_special_tokens=True) + if token_text: + yield token_text +- + past_key_values = outputs.past_key_values + input_ids = next_token_id.unsqueeze(1) + if attention_mask is not None: + attention_mask = torch.cat( + [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], + dim=1, + ) def worker_loop( @@ -313,7 +332,7 @@ async def handle_client( await transport.close() -async def run_server(model_bundle: ModelBundle) -> None: +async def run_server(model_bundle: ModelBundle, config: Config) -> None: pending: Dict[int, PendingChatCompletionRecord] = {} pending_lock = threading.Lock() work_queue: queue.Queue[WorkItem | None] = queue.Queue() @@ -330,13 +349,17 @@ async def run_server(model_bundle: ModelBundle) -> None: await handle_client( reader, writer, - SHARED_SECRET, + config.secret, work_queue, pending, pending_lock, ) - server = await asyncio.start_server(client_handler, SERVER_HOST, SERVER_PORT) + server = await asyncio.start_server( + client_handler, + config.listening_addr, + config.listening_port, + ) addr = ", ".join(str(sock.getsockname()) for sock in server.sockets or []) print(f"[listening] {addr}") @@ -345,9 +368,10 @@ async def run_server(model_bundle: ModelBundle) -> None: def main() -> None: + config = read_config() model_bundle = load_local_model() print("[model] loaded", flush=True) - asyncio.run(run_server(model_bundle)) + asyncio.run(run_server(model_bundle, config)) if __name__ == "__main__":