Coverage for chatgpt_proxy / app.py: 90%

366 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-12 16:19 +0000

1# MIT License 

2# 

3# Copyright (c) 2025 Tuomo Kriikkula 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy 

6# of this software and associated documentation files (the "Software"), to deal 

7# in the Software without restriction, including without limitation the rights 

8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 

9# copies of the Software, and to permit persons to whom the Software is 

10# furnished to do so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23""" 

24Implements a simple proxy server for communication between an UnrealScript 

25client and OpenAI servers. 

26""" 

27 

28import asyncio 

29import dataclasses 

30import datetime 

31import multiprocessing as mp 

32import os 

33import secrets 

34from http import HTTPStatus 

35from multiprocessing.synchronize import Event as EventType 

36from pprint import pprint 

37from typing import Any 

38from typing import Awaitable 

39 

40import asyncpg 

41import httpx 

42import openai 

43import sanic 

44from py_markdown_table.markdown_table import markdown_table 

45from sanic import Blueprint 

46from sanic.response import HTTPResponse 

47 

48from chatgpt_proxy.auth import auth 

49from chatgpt_proxy.auth import check_and_inject_game 

50from chatgpt_proxy.auth import is_real_game_server 

51from chatgpt_proxy.cache import app_cache 

52from chatgpt_proxy.cache import db_cache 

53from chatgpt_proxy.db import pool_acquire 

54from chatgpt_proxy.db import pool_acquire_many 

55from chatgpt_proxy.db import queries 

56from chatgpt_proxy.db.models import GameObjectiveState 

57from chatgpt_proxy.db.models import SayType 

58from chatgpt_proxy.db.models import Team 

59from chatgpt_proxy.db.models import max_ast_literal_eval_size 

60from chatgpt_proxy.log import logger 

61from chatgpt_proxy.types import App 

62from chatgpt_proxy.types import Context 

63from chatgpt_proxy.types import Request 

64from chatgpt_proxy.utils import get_remote_addr 

65from chatgpt_proxy.utils import is_prod_env 

66from chatgpt_proxy.utils import utcnow 

67 

68# TODO: This breaks with nest_asyncio, which is also unmaintained! 

69# if platform.system() == "Windows": 

70# # noinspection PyUnresolvedReferences 

71# import winloop # type: ignore[import-not-found] 

72# 

73# winloop.install() 

74 

75# TODO: need to come up with a more consistent way for logging errors 

76# from request validation, etc.: 

77# - Log basic error info at info level. 

78# - Log the stack trace at debug level (use Sentry for this in prod?) 

79# - Consider Sentry for logging all other unhandled errors! 

80 

81api_v1 = Blueprint("api", version_prefix="/api/v", version=1) 

82 

83 

84def db_maintenance_process(stop_event: EventType) -> None: 

85 logger.debug("db_maintenance_process starting") 

86 asyncio.run(db_maintenance(stop_event)) 

87 asyncio.run(_suppress(app_cache.close())) # TODO: this doesn't actually run ever? 

88 logger.debug("db_maintenance_process done") 

89 

90 

91def refresh_steam_web_api_cache_process(stop_event: EventType) -> None: 

92 asyncio.run(refresh_steam_web_api_cache(stop_event)) 

93 

94 

95def make_api_v1_app(name: str = "ChatGPTProxy", **kwargs: Any) -> App: 

96 _app: App = sanic.Sanic( 

97 name, 

98 ctx=Context(), 

99 request_class=Request, # type: ignore[arg-type] 

100 **kwargs, 

101 ) 

102 # We don't expect UScript side to send large requests. 

103 _app.config.REQUEST_MAX_SIZE = 1500 

104 _app.config.REQUEST_MAX_HEADER_SIZE = 1500 

105 _app.config.OAS = not is_prod_env 

106 _app.config.SECRET = os.environ["SANIC_SECRET"] 

107 _app.config.JWT_ISSUER = auth.jwt_issuer 

108 _app.config.JWT_AUDIENCE = auth.jwt_audience 

109 

110 @_app.main_process_ready 

111 async def main_process_ready(app_: App): 

112 app_.manager.manage( 

113 name="DatabaseMaintenanceProcess", 

114 func=db_maintenance_process, 

115 kwargs={"stop_event": app_.shared_ctx.bg_process_event}, 

116 transient=True, 

117 ) 

118 app_.manager.manage( 

119 "SteamWebAPICacheRefreshProcess", 

120 func=refresh_steam_web_api_cache_process, 

121 kwargs={"stop_event": app_.shared_ctx.bg_process_event}, 

122 transient=True, 

123 ) 

124 

125 @_app.main_process_start 

126 async def main_process_start(app_: App): 

127 bg_process_event = mp.Event() 

128 app_.shared_ctx.bg_process_event = bg_process_event 

129 

130 @_app.main_process_stop 

131 async def main_process_stop(app_: App): 

132 app_.shared_ctx.bg_process_event.set() 

133 

134 @_app.before_server_start 

135 async def before_server_start(app_: App): 

136 api_key = os.environ.get("OPENAI_API_KEY") 

137 client = openai.AsyncOpenAI(api_key=api_key) 

138 app_.ctx.client = client 

139 app_.ext.dependency(client) 

140 

141 db_url = os.environ.get("DATABASE_URL") 

142 pool = await asyncpg.create_pool(dsn=db_url) 

143 app_.ctx.pg_pool = pool 

144 app_.ext.dependency(pool) 

145 

146 app_.ctx.http_client = httpx.AsyncClient() 

147 app_.ext.dependency(app_.ctx.http_client) 

148 

149 # noinspection PyProtectedMember 

150 @_app.before_server_stop 

151 async def before_server_stop(app_: App): 

152 logger.debug("before_server_stop") 

153 

154 # TODO: cleanup should have timeouts! 

155 # If timed out, ignore it but log warning! 

156 

157 if app_.ctx._client: 

158 logger.debug("closing OpenAI client") 

159 await _suppress(app_.ctx._client.close()) 

160 if app_.ctx._pg_pool: 

161 logger.debug("closing pool") 

162 await _suppress(app_.ctx._pg_pool.close()) 

163 if app_.ctx._http_client: 

164 logger.debug("closing http client") 

165 await _suppress(app_.ctx._http_client.aclose()) 

166 

167 logger.debug("closing app cache") 

168 await _suppress(app_cache.close()) 

169 logger.debug("closing DB cache") 

170 await _suppress(db_cache.close()) 

171 

172 _app.blueprint(api_v1) 

173 

174 return _app 

175 

176 

177async def _suppress(f: Awaitable[Any]) -> Any: 

178 try: 

179 return await f 

180 except Exception as e: 

181 logger.exception("{}: error: {}", f, type(e).__name__) 

182 

183 

184# TODO: dynamic model selection? 

185openai_model = "gpt-5-nano" 

186openai_timeout = 60.0 # TODO: this might be way too low? 

187 

188prompt_max_game_chat_msgs = 30 

189prompt_max_game_kills = 30 

190 

191default_llm_task = """ 

192Your task is to roleplay as a 1970s retro-futuristic AI. 

193Your name is MAINFRAME CHARLIE. You are provided with new 

194information updates regularly and your job is to react to them, 

195while staying in role. You are given data from an on-going real-life 

196multiplayer video game match (Rising Storm 2: Vietnam) regularly. 

197While writing responses, do not acknowledge the fact that you are 

198reacting to a video game match. Pretend it is a real 1960s-1970s 

199Vietnam war scenario. Keep your tone semi-serious, while using 

200era and scenario appropriate humor every now and then. 

201""" 

202 

203base_prompt_initial = """ 

204{llm_task} 

205 

206This is the beginning of a new game. The current level is {level_sanitized}. 

207 

208The game currently contains the following players: 

209{markdown_scoreboard_table} 

210 

211The last kills scored during the game are the 

212following (in ascending chronological order): 

213{markdown_kills_table} 

214 

215The last chat messages sent during the game are the 

216following (ascending in chronological order): 

217{markdown_chat_msgs_table} 

218 

219{initial_instruction} 

220""" 

221 

222# TODO: we should include the time since the last prompt here? 

223base_prompt_consecutive = """ 

224The current game scoreboard: 

225{markdown_scoreboard_table} 

226 

227Since the last update, the following kills 

228have been scored (last {num_kills} kills): 

229{markdown_kills_table} 

230 

231Since the last update, the following chat 

232messages have been sent (last {num_messages} messages): 

233{markdown_chat_msgs_table} 

234 

235{instruction} 

236""" 

237 

238# TODO: set these lower for debug/dev env? 

239# We prune all matches that have ended or are older 

240# than 5 hours during database maintenance. 

241game_expiration = datetime.timedelta(hours=5) 

242api_key_deletion_leeway = datetime.timedelta(minutes=5) 

243db_maintenance_interval = datetime.timedelta(minutes=30).total_seconds() 

244steam_web_api_cache_refresh_interval = datetime.timedelta(minutes=30).total_seconds() 

245 

246game_id_length = 24 

247 

248# TODO: should this be parametrized? Sent in from the UScript side? 

249max_message_length = 200 

250 

251 

252def format_base_prompt_initial( 

253 llm_task: str, 

254 level_sanitized: str, 

255 markdown_scoreboard_table: str, 

256 markdown_kills_table: str, 

257 markdown_chat_msgs_table: str, 

258 initial_instruction: str, 

259) -> str: 

260 return base_prompt_initial.format( 

261 llm_task=llm_task, 

262 level_sanitized=level_sanitized, 

263 markdown_scoreboard_table=markdown_scoreboard_table, 

264 markdown_kills_table=markdown_kills_table, 

265 markdown_chat_msgs_table=markdown_chat_msgs_table, 

266 initial_instruction=initial_instruction, 

267 ) 

268 

269 

270def format_base_prompt_consecutive( 

271 markdown_scoreboard_table: str, 

272 markdown_kills_table: str, 

273 num_kills: int, 

274 markdown_chat_msgs_table: str, 

275 num_msgs: int, 

276 instruction: str, 

277) -> str: 

278 return base_prompt_consecutive.format( 

279 markdown_scoreboard_table=markdown_scoreboard_table, 

280 markdown_kills_table=markdown_kills_table, 

281 num_kills=num_kills, 

282 markdown_chat_msgs_table=markdown_chat_msgs_table, 

283 num_messages=num_msgs, 

284 instruction=instruction, 

285 ) 

286 

287 

288async def get_scoreboard_markdown_table( 

289 conn: asyncpg.Connection, 

290 game_id: str, 

291) -> tuple[int, str]: 

292 players = await queries.select_game_players(conn, game_id) 

293 pprint(players) 

294 

295 scoreboard = "" 

296 if players: 

297 scoreboard = markdown_table([ 

298 player.as_markdown_dict() 

299 for player in players 

300 ]).get_markdown() 

301 pprint(scoreboard) 

302 

303 return len(players), scoreboard 

304 

305 

306async def get_kills_markdown_table( 

307 conn: asyncpg.Connection, 

308 game_id: str, 

309 from_time: datetime.datetime, 

310) -> tuple[int, str]: 

311 candidate_kills = await queries.select_game_kills( 

312 conn=conn, 

313 game_id=game_id, 

314 kill_time_from=from_time, 

315 limit=prompt_max_game_kills, 

316 ) 

317 pprint(candidate_kills) 

318 

319 kills_table = "" 

320 if candidate_kills: 

321 kills_table = markdown_table([ 

322 kill.as_markdown_dict() 

323 for kill in candidate_kills 

324 ]).get_markdown() 

325 pprint(kills_table) 

326 

327 return len(candidate_kills), kills_table 

328 

329 

330async def get_chat_messages_markdown_table( 

331 conn: asyncpg.Connection, 

332 game_id: str, 

333 from_time: datetime.datetime, 

334) -> tuple[int, str]: 

335 candidate_msgs = await queries.select_game_chat_messages( 

336 conn=conn, 

337 game_id=game_id, 

338 send_time_from=from_time, 

339 limit=prompt_max_game_chat_msgs, 

340 ) 

341 pprint(candidate_msgs) 

342 

343 msgs_table = "" 

344 if candidate_msgs: 

345 msgs_table = markdown_table([ 

346 msg.as_markdown_dict() 

347 for msg in candidate_msgs 

348 ]).get_markdown() 

349 pprint(msgs_table) 

350 

351 return len(candidate_msgs), msgs_table 

352 

353 

354def sanitize_level_name(level: str) -> str: 

355 level = level.split("-")[-1] 

356 level = level.replace("_", " ") 

357 return level 

358 

359 

360@api_v1.get("/game/<game_id:str>") 

361async def get_game( 

362 _: Request, 

363 game_id: str, 

364 pg_pool: asyncpg.Pool, 

365) -> HTTPResponse: 

366 # TODO: do we need these for anything other than testing? 

367 if is_prod_env: 

368 return HTTPResponse(status=HTTPStatus.NOT_FOUND) 

369 

370 async with pool_acquire(pg_pool) as conn: 

371 db_game = await queries.select_game(conn=conn, game_id=game_id) 

372 

373 if not db_game: 

374 return HTTPResponse(status=HTTPStatus.NOT_FOUND) 

375 

376 game_dict = dataclasses.asdict(db_game) 

377 game_dict["start_time"] = db_game.start_time.isoformat() 

378 stop_time = db_game.stop_time 

379 if stop_time: 

380 game_dict["stop_time"] = stop_time.isoformat() 

381 game_dict["game_server_address"] = str(db_game.game_server_address) 

382 

383 return sanic.json(game_dict) 

384 

385 

386@api_v1.post("/game") 

387async def post_game( 

388 request: Request, 

389 pg_pool: asyncpg.Pool, 

390 client: openai.AsyncOpenAI, 

391) -> HTTPResponse: 

392 try: 

393 data = request.body.decode("utf-8") 

394 level, friendly_level_name, port = data.split("\n") 

395 game_port = int(port) 

396 except Exception as e: 

397 logger.info("error parsing game data: {}: {}", type(e).__name__, e) 

398 logger.opt(exception=e).debug("") 

399 return HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

400 

401 now = utcnow() 

402 game_id = secrets.token_hex(game_id_length) 

403 addr = get_remote_addr(request) 

404 

405 # TODO: this would be so much better with functools.Placeholder! 

406 tasks_args = ( 

407 (get_scoreboard_markdown_table, (game_id,)), 

408 (get_kills_markdown_table, (game_id, now)), 

409 (get_chat_messages_markdown_table, (game_id, now)), 

410 ) 

411 async with pool_acquire_many( 

412 pool=pg_pool, 

413 count=len(tasks_args), 

414 ) as conns: 

415 tasks = ( 

416 # TODO: fix this type error with Python 3.14 and 

417 # functools.Placeholder! Upgrade to 3.14! 

418 task(conns[i], *args) # type: ignore[operator] 

419 for i, (task, args) in enumerate(tasks_args) 

420 ) 

421 ((_, scoreboard_table), 

422 (_, kills_table), 

423 (_, chat_msgs_table)) = await asyncio.gather(*tasks) 

424 

425 async with pool_acquire(pg_pool) as conn: 

426 async with conn.transaction(): 

427 await queries.insert_game( 

428 conn=conn, 

429 game_id=game_id, 

430 level=level, 

431 game_server_address=addr, 

432 game_server_port=game_port, 

433 start_time=now, 

434 stop_time=None, 

435 openai_previous_response_id=None, 

436 ) 

437 

438 # TODO: should this be parametrized? At least take in the name? 

439 llm_task = default_llm_task 

440 

441 if friendly_level_name: 

442 level_sanitized = friendly_level_name 

443 else: 

444 level_sanitized = sanitize_level_name(level) 

445 

446 # TODO: should we parametrize this? 

447 initial_instruction = ( 

448 f"Provide a short greeting/boot-up message " 

449 f"(maximum length {max_message_length} characters)." 

450 ) 

451 

452 prompt = format_base_prompt_initial( 

453 llm_task=llm_task, 

454 level_sanitized=level_sanitized, 

455 markdown_scoreboard_table=scoreboard_table, 

456 markdown_kills_table=kills_table, 

457 markdown_chat_msgs_table=chat_msgs_table, 

458 initial_instruction=initial_instruction, 

459 ) 

460 

461 async with conn.transaction(): 

462 openai_resp = await client.responses.create( 

463 model=openai_model, 

464 input=prompt, 

465 timeout=openai_timeout, 

466 ) 

467 await queries.insert_openai_query( 

468 game_id=game_id, 

469 conn=conn, 

470 time=utcnow(), 

471 game_server_address=addr, 

472 game_server_port=game_port, 

473 request_length=len(prompt), 

474 response_length=len(openai_resp.output_text), 

475 openai_response_id=openai_resp.id, 

476 ) 

477 

478 greeting = openai_resp.output_text 

479 

480 return sanic.text( 

481 f"{game_id}\n{greeting}", 

482 status=HTTPStatus.CREATED, 

483 # TODO: use url_for! 

484 headers={"Location": f"{api_v1.version_prefix}{api_v1.version}/game/{game_id}"}, 

485 # TODO: put the full domain here? 

486 ) 

487 

488 

489@api_v1.put("/game/<game_id:str>") 

490@check_and_inject_game 

491async def put_game( 

492 request: Request, 

493 game_id: str, 

494 pg_pool: asyncpg.Pool, 

495) -> HTTPResponse: 

496 """Update existing game. We break a REST principle here by 

497 allowing partial updates in PUT, mostly because we're lazy, 

498 due to the fact that the existing UScript HTTP client lacks 

499 PATCH support. 

500 """ 

501 # Assuming the only time we'll get a PUT on /game is when 

502 # it's marked as finished by the game server. 

503 # TODO: make this support other fields too if needed. 

504 

505 start_time = request.ctx.game.start_time 

506 

507 try: 

508 world_time = float(request.body.decode("utf-8")) 

509 stop_time = start_time + datetime.timedelta(seconds=world_time) 

510 except Exception as e: 

511 logger.debug("error parsing game data: {}: {}", type(e).__name__, e) 

512 return HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

513 

514 async with pool_acquire(pg_pool) as conn: 

515 async with conn.transaction(): 

516 await queries.update_game( 

517 conn=conn, 

518 game_id=game_id, 

519 stop_time=stop_time, 

520 ) 

521 

522 return HTTPResponse(status=HTTPStatus.NO_CONTENT) 

523 

524 

525@api_v1.post("/game/<game_id:str>/message") 

526@check_and_inject_game 

527async def post_game_message( 

528 request: Request, 

529 game_id: str, 

530 pg_pool: asyncpg.Pool, 

531 client: openai.AsyncOpenAI, 

532) -> HTTPResponse: 

533 # TODO: full implementation! Prompt building! 

534 

535 game = request.ctx.game 

536 

537 previous_response_id: str | None = game.openai_previous_response_id 

538 if previous_response_id is None: 

539 logger.warning("unable to handle request for game with no openai_previous_response_id") 

540 return HTTPResponse(status=HTTPStatus.SERVICE_UNAVAILABLE) 

541 

542 async with pool_acquire(pg_pool) as conn: 

543 previous_query = await queries.select_openai_query( 

544 conn=conn, 

545 openai_response_id=previous_response_id, 

546 ) 

547 if not previous_query: 

548 logger.warning("cannot find OpenAI query for id: {}", previous_response_id) 

549 return HTTPResponse(status=HTTPStatus.SERVICE_UNAVAILABLE) 

550 

551 try: 

552 data_in = request.body.decode("utf-8").split("\n") 

553 say_type = SayType(data_in[0]) 

554 say_team = Team(data_in[1]) 

555 say_name = data_in[2] 

556 prompt_in = data_in[3] 

557 except Exception as e: 

558 logger.info("error parsing game message data: {}: {}", type(e).__name__, e) 

559 # TODO: debug log stack trace or something? 

560 return HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

561 

562 # TODO: this would be so much better with functools.Placeholder! 

563 tasks_args = ( 

564 (get_scoreboard_markdown_table, (game_id,)), 

565 (get_kills_markdown_table, (game_id, previous_query.time)), 

566 (get_chat_messages_markdown_table, (game_id, previous_query.time)), 

567 ) 

568 async with pool_acquire_many( 

569 pool=pg_pool, 

570 count=len(tasks_args), 

571 ) as conns: 

572 tasks = ( 

573 # TODO: fix this type error with Python 3.14 and 

574 # functools.Placeholder! Upgrade to 3.14! 

575 task(conns[i], *args) # type: ignore[operator] 

576 for i, (task, args) in enumerate(tasks_args) 

577 ) 

578 ((_, scoreboard_table), 

579 (num_kills, kills_table), 

580 (num_msgs, chat_msgs_table)) = await asyncio.gather(*tasks) 

581 

582 # TODO: have some maximum upper limit for total prompt length? 

583 prompt = format_base_prompt_consecutive( 

584 markdown_scoreboard_table=scoreboard_table, 

585 markdown_kills_table=kills_table, 

586 num_kills=num_kills, 

587 markdown_chat_msgs_table=chat_msgs_table, 

588 num_msgs=num_msgs, 

589 instruction=prompt_in, # TODO! 

590 ) 

591 

592 async with pool_acquire(pg_pool) as conn: 

593 async with conn.transaction(): 

594 # TODO: how to best use instruction param here? 

595 resp = await client.responses.create( 

596 model=openai_model, 

597 input=prompt, 

598 previous_response_id=previous_response_id, 

599 timeout=openai_timeout, 

600 ) 

601 await queries.insert_openai_query( 

602 conn=conn, 

603 game_id=game_id, 

604 time=utcnow(), 

605 game_server_address=game.game_server_address, 

606 game_server_port=game.game_server_port, 

607 request_length=len(prompt), 

608 response_length=len(resp.output_text), 

609 openai_response_id=resp.id, 

610 ) 

611 

612 msg = resp.output_text.replace("\n", " ") 

613 resp_data = f"{say_type}\n{say_team}\n{say_name}\n{msg}" 

614 

615 return sanic.text( 

616 resp_data, 

617 status=HTTPStatus.OK, 

618 ) 

619 

620 

621@api_v1.post("/game/<game_id:str>/kill") 

622@check_and_inject_game 

623async def post_game_kill( 

624 request: Request, 

625 game_id: str, 

626 pg_pool: asyncpg.Pool, 

627) -> HTTPResponse: 

628 game = request.ctx.game 

629 

630 try: 

631 parts = request.body.decode("utf-8").split("\n") 

632 world_time = float(parts[0]) 

633 killer_name = parts[1] 

634 victim_name = parts[2] 

635 killer_team = Team(parts[3]) 

636 victim_team = Team(parts[4]) 

637 damage_type = parts[5] 

638 kill_distance_m = float(parts[6]) 

639 

640 kill_time = game.start_time + datetime.timedelta(seconds=world_time) 

641 except Exception as e: 

642 logger.debug("failed to parse game kill data: {}: {}", type(e).__name__, e) 

643 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

644 

645 async with pool_acquire(pg_pool) as conn: 

646 async with conn.transaction(): 

647 await queries.insert_game_kill( 

648 conn=conn, 

649 game_id=game_id, 

650 kill_time=kill_time, 

651 killer_name=killer_name, 

652 victim_name=victim_name, 

653 killer_team=killer_team, 

654 victim_team=victim_team, 

655 damage_type=damage_type, 

656 kill_distance_m=kill_distance_m, 

657 ) 

658 

659 return sanic.HTTPResponse(status=HTTPStatus.NO_CONTENT) 

660 

661 

662@api_v1.put("/game/<game_id:str>/player/<player_id:int>") 

663@check_and_inject_game 

664async def put_game_player( 

665 request: Request, 

666 game_id: str, 

667 player_id: int, 

668 pg_pool: asyncpg.Pool, 

669) -> HTTPResponse: 

670 _ = request.ctx.game # TODO: needed here? 

671 

672 try: 

673 parts = request.body.decode("utf-8").split("\n") 

674 name = parts[0] 

675 team = Team(parts[1]) 

676 score = int(parts[2]) 

677 except Exception as e: 

678 logger.debug("failed to parse game player data: {}: {}", type(e).__name__, e) 

679 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

680 

681 async with pool_acquire(pg_pool) as conn: 

682 async with conn.transaction(): 

683 created = await queries.upsert_game_player( 

684 conn=conn, 

685 game_id=game_id, 

686 player_id=player_id, 

687 name=name, 

688 team_index=int(team), 

689 score=score, 

690 ) 

691 

692 status = HTTPStatus.CREATED if created else HTTPStatus.NO_CONTENT 

693 return sanic.HTTPResponse(status=status) 

694 

695 

696@api_v1.delete("/game/<game_id:str>/player/<player_id:int>") 

697@check_and_inject_game 

698async def delete_game_player( 

699 _: Request, 

700 game_id: str, 

701 player_id: int, 

702 pg_pool: asyncpg.Pool, 

703) -> HTTPResponse: 

704 async with pool_acquire(pg_pool) as conn: 

705 if not await queries.game_player_exists( 

706 conn=conn, 

707 game_id=game_id, 

708 player_id=player_id, 

709 ): 

710 return HTTPResponse(status=HTTPStatus.NOT_FOUND) 

711 

712 async with conn.transaction(): 

713 await queries.delete_game_player( 

714 conn=conn, 

715 game_id=game_id, 

716 player_id=player_id, 

717 ) 

718 

719 return HTTPResponse(status=HTTPStatus.NO_CONTENT) 

720 

721 

722@api_v1.post("/game/<game_id:str>/chat_message") 

723@check_and_inject_game 

724async def post_game_chat_message( 

725 request: Request, 

726 game_id: str, 

727 pg_pool: asyncpg.Pool, 

728) -> HTTPResponse: 

729 _ = request.ctx.game # TODO: needed here? 

730 

731 try: 

732 parts = request.body.decode("utf-8").split("\n") 

733 player_name = parts[0] 

734 player_team = Team(parts[1]) 

735 say_type = SayType(parts[2]) 

736 msg = parts[3] 

737 async with pool_acquire(pg_pool) as conn: 

738 async with conn.transaction(): 

739 await queries.insert_game_chat_message( 

740 conn=conn, 

741 game_id=game_id, 

742 message=msg, 

743 send_time=utcnow(), 

744 sender_name=player_name, 

745 sender_team=player_team, 

746 channel=say_type, 

747 ) 

748 except Exception as e: 

749 logger.debug("failed to parse chat message data: {}: {}", type(e).__name__, e) 

750 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

751 

752 return sanic.HTTPResponse( 

753 status=HTTPStatus.NO_CONTENT, 

754 # TODO: do even want to do this? Do we need getters for these resources? 

755 # headers={"Location": f"/game/{game_id}/{chat_message_id}"}, # TODO: put full domain here? 

756 ) 

757 

758 

759@api_v1.put("/game/<game_id:str>/objective_state") 

760@check_and_inject_game 

761async def put_game_objective_state( 

762 request: Request, 

763 game_id: str, 

764 pg_pool: asyncpg.Pool, 

765) -> HTTPResponse: 

766 # Defensive check to avoid passing long strings to literal_eval. 

767 if len(request.body) > max_ast_literal_eval_size: 

768 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

769 

770 _ = request.ctx.game # TODO: needed here? 

771 

772 # TODO: maybe do proper relative DB design for this if needed? 

773 # Right now it is done the quick and dirty way on purpose. 

774 # [("Objective A",0),("Objective B",1),...] 

775 try: 

776 data = request.body.decode("utf-8") 

777 if not data: 

778 raise ValueError("no objective state data") 

779 

780 obj_state = GameObjectiveState.from_wire_format( 

781 game_id=game_id, 

782 wire_format_data=data, 

783 ) 

784 

785 except Exception as e: 

786 logger.debug("error parsing objectives data: {}: {}", type(e).__name__, e) 

787 logger.opt(exception=e).debug("") # TODO: should use this more! 

788 return HTTPResponse(status=HTTPStatus.BAD_REQUEST) 

789 

790 async with pool_acquire(pg_pool) as conn: 

791 async with conn.transaction(): 

792 created = await queries.upsert_game_objective_state( 

793 conn=conn, 

794 state=obj_state, 

795 ) 

796 

797 status = HTTPStatus.CREATED if created else HTTPStatus.NO_CONTENT 

798 return sanic.HTTPResponse(status=status) 

799 

800 

801async def db_maintenance(stop_event: EventType) -> None: 

802 pool: asyncpg.Pool | None = None 

803 

804 try: 

805 logger.debug("db_maintenance starting") 

806 

807 db_url = os.environ.get("DATABASE_URL") 

808 pool = await asyncpg.create_pool(dsn=db_url, min_size=1, max_size=1) 

809 

810 while not stop_event.wait(db_maintenance_interval): 

811 async with pool_acquire(pool) as conn: 

812 async with conn.transaction(): 

813 result = await queries.delete_completed_games(conn, game_expiration) 

814 logger.info("delete_completed_games: {}", result) 

815 

816 async with conn.transaction(): 

817 result = await queries.delete_old_api_keys( 

818 conn, 

819 leeway=api_key_deletion_leeway, 

820 ) 

821 logger.info("delete_old_api_keys: {}", result) 

822 

823 except KeyboardInterrupt: 

824 logger.debug("db_maintenance stopping") 

825 if pool: 

826 await pool.close() 

827 

828 

829async def refresh_steam_web_api_cache(stop_event: EventType) -> None: 

830 pool: asyncpg.Pool | None = None 

831 

832 try: 

833 db_url = os.environ.get("DATABASE_URL") 

834 pool = await asyncpg.create_pool(dsn=db_url, min_size=1, max_size=1) 

835 

836 while not stop_event.wait(steam_web_api_cache_refresh_interval): 

837 async with pool_acquire(pool) as conn: 

838 api_keys = await queries.select_game_server_api_keys(conn) 

839 logger.info("refreshing Steam Web API cache for {} keys", len(api_keys)) 

840 async with httpx.AsyncClient(timeout=30.0) as client: 

841 tasks = [ 

842 is_real_game_server( 

843 game_server_address=api_key["game_server_address"], 

844 game_server_port=api_key["game_server_port"], 

845 pg_pool=pool, 

846 http_client=client, 

847 ) 

848 for api_key in api_keys 

849 ] 

850 await asyncio.gather(*tasks) 

851 

852 except KeyboardInterrupt: 

853 if pool: 

854 await pool.close() 

855 

856 

857@api_v1.on_request 

858async def api_v1_on_request(request: Request) -> HTTPResponse | None: 

859 authenticated = await auth.check_token(request, request.app.ctx.pg_pool) 

860 if not authenticated: 

861 return sanic.text("Unauthorized.", status=HTTPStatus.UNAUTHORIZED) 

862 

863 return None 

864 

865 

866app = make_api_v1_app() 

867 

868if __name__ == "__main__": 

869 app.config.INSPECTOR = True 

870 logger.level("DEBUG") 

871 app.run(host="0.0.0.0", port=8080, debug=True, dev=True, access_log=True)