Coverage for chatgpt_proxy / app.py: 90%
366 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-12 16:19 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-12 16:19 +0000
1# MIT License
2#
3# Copyright (c) 2025 Tuomo Kriikkula
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23"""
24Implements a simple proxy server for communication between an UnrealScript
25client and OpenAI servers.
26"""
28import asyncio
29import dataclasses
30import datetime
31import multiprocessing as mp
32import os
33import secrets
34from http import HTTPStatus
35from multiprocessing.synchronize import Event as EventType
36from pprint import pprint
37from typing import Any
38from typing import Awaitable
40import asyncpg
41import httpx
42import openai
43import sanic
44from py_markdown_table.markdown_table import markdown_table
45from sanic import Blueprint
46from sanic.response import HTTPResponse
48from chatgpt_proxy.auth import auth
49from chatgpt_proxy.auth import check_and_inject_game
50from chatgpt_proxy.auth import is_real_game_server
51from chatgpt_proxy.cache import app_cache
52from chatgpt_proxy.cache import db_cache
53from chatgpt_proxy.db import pool_acquire
54from chatgpt_proxy.db import pool_acquire_many
55from chatgpt_proxy.db import queries
56from chatgpt_proxy.db.models import GameObjectiveState
57from chatgpt_proxy.db.models import SayType
58from chatgpt_proxy.db.models import Team
59from chatgpt_proxy.db.models import max_ast_literal_eval_size
60from chatgpt_proxy.log import logger
61from chatgpt_proxy.types import App
62from chatgpt_proxy.types import Context
63from chatgpt_proxy.types import Request
64from chatgpt_proxy.utils import get_remote_addr
65from chatgpt_proxy.utils import is_prod_env
66from chatgpt_proxy.utils import utcnow
68# TODO: This breaks with nest_asyncio, which is also unmaintained!
69# if platform.system() == "Windows":
70# # noinspection PyUnresolvedReferences
71# import winloop # type: ignore[import-not-found]
72#
73# winloop.install()
75# TODO: need to come up with a more consistent way for logging errors
76# from request validation, etc.:
77# - Log basic error info at info level.
78# - Log the stack trace at debug level (use Sentry for this in prod?)
79# - Consider Sentry for logging all other unhandled errors!
81api_v1 = Blueprint("api", version_prefix="/api/v", version=1)
84def db_maintenance_process(stop_event: EventType) -> None:
85 logger.debug("db_maintenance_process starting")
86 asyncio.run(db_maintenance(stop_event))
87 asyncio.run(_suppress(app_cache.close())) # TODO: this doesn't actually run ever?
88 logger.debug("db_maintenance_process done")
91def refresh_steam_web_api_cache_process(stop_event: EventType) -> None:
92 asyncio.run(refresh_steam_web_api_cache(stop_event))
95def make_api_v1_app(name: str = "ChatGPTProxy", **kwargs: Any) -> App:
96 _app: App = sanic.Sanic(
97 name,
98 ctx=Context(),
99 request_class=Request, # type: ignore[arg-type]
100 **kwargs,
101 )
102 # We don't expect UScript side to send large requests.
103 _app.config.REQUEST_MAX_SIZE = 1500
104 _app.config.REQUEST_MAX_HEADER_SIZE = 1500
105 _app.config.OAS = not is_prod_env
106 _app.config.SECRET = os.environ["SANIC_SECRET"]
107 _app.config.JWT_ISSUER = auth.jwt_issuer
108 _app.config.JWT_AUDIENCE = auth.jwt_audience
110 @_app.main_process_ready
111 async def main_process_ready(app_: App):
112 app_.manager.manage(
113 name="DatabaseMaintenanceProcess",
114 func=db_maintenance_process,
115 kwargs={"stop_event": app_.shared_ctx.bg_process_event},
116 transient=True,
117 )
118 app_.manager.manage(
119 "SteamWebAPICacheRefreshProcess",
120 func=refresh_steam_web_api_cache_process,
121 kwargs={"stop_event": app_.shared_ctx.bg_process_event},
122 transient=True,
123 )
125 @_app.main_process_start
126 async def main_process_start(app_: App):
127 bg_process_event = mp.Event()
128 app_.shared_ctx.bg_process_event = bg_process_event
130 @_app.main_process_stop
131 async def main_process_stop(app_: App):
132 app_.shared_ctx.bg_process_event.set()
134 @_app.before_server_start
135 async def before_server_start(app_: App):
136 api_key = os.environ.get("OPENAI_API_KEY")
137 client = openai.AsyncOpenAI(api_key=api_key)
138 app_.ctx.client = client
139 app_.ext.dependency(client)
141 db_url = os.environ.get("DATABASE_URL")
142 pool = await asyncpg.create_pool(dsn=db_url)
143 app_.ctx.pg_pool = pool
144 app_.ext.dependency(pool)
146 app_.ctx.http_client = httpx.AsyncClient()
147 app_.ext.dependency(app_.ctx.http_client)
149 # noinspection PyProtectedMember
150 @_app.before_server_stop
151 async def before_server_stop(app_: App):
152 logger.debug("before_server_stop")
154 # TODO: cleanup should have timeouts!
155 # If timed out, ignore it but log warning!
157 if app_.ctx._client:
158 logger.debug("closing OpenAI client")
159 await _suppress(app_.ctx._client.close())
160 if app_.ctx._pg_pool:
161 logger.debug("closing pool")
162 await _suppress(app_.ctx._pg_pool.close())
163 if app_.ctx._http_client:
164 logger.debug("closing http client")
165 await _suppress(app_.ctx._http_client.aclose())
167 logger.debug("closing app cache")
168 await _suppress(app_cache.close())
169 logger.debug("closing DB cache")
170 await _suppress(db_cache.close())
172 _app.blueprint(api_v1)
174 return _app
177async def _suppress(f: Awaitable[Any]) -> Any:
178 try:
179 return await f
180 except Exception as e:
181 logger.exception("{}: error: {}", f, type(e).__name__)
184# TODO: dynamic model selection?
185openai_model = "gpt-5-nano"
186openai_timeout = 60.0 # TODO: this might be way too low?
188prompt_max_game_chat_msgs = 30
189prompt_max_game_kills = 30
191default_llm_task = """
192Your task is to roleplay as a 1970s retro-futuristic AI.
193Your name is MAINFRAME CHARLIE. You are provided with new
194information updates regularly and your job is to react to them,
195while staying in role. You are given data from an on-going real-life
196multiplayer video game match (Rising Storm 2: Vietnam) regularly.
197While writing responses, do not acknowledge the fact that you are
198reacting to a video game match. Pretend it is a real 1960s-1970s
199Vietnam war scenario. Keep your tone semi-serious, while using
200era and scenario appropriate humor every now and then.
201"""
203base_prompt_initial = """
204{llm_task}
206This is the beginning of a new game. The current level is {level_sanitized}.
208The game currently contains the following players:
209{markdown_scoreboard_table}
211The last kills scored during the game are the
212following (in ascending chronological order):
213{markdown_kills_table}
215The last chat messages sent during the game are the
216following (ascending in chronological order):
217{markdown_chat_msgs_table}
219{initial_instruction}
220"""
222# TODO: we should include the time since the last prompt here?
223base_prompt_consecutive = """
224The current game scoreboard:
225{markdown_scoreboard_table}
227Since the last update, the following kills
228have been scored (last {num_kills} kills):
229{markdown_kills_table}
231Since the last update, the following chat
232messages have been sent (last {num_messages} messages):
233{markdown_chat_msgs_table}
235{instruction}
236"""
238# TODO: set these lower for debug/dev env?
239# We prune all matches that have ended or are older
240# than 5 hours during database maintenance.
241game_expiration = datetime.timedelta(hours=5)
242api_key_deletion_leeway = datetime.timedelta(minutes=5)
243db_maintenance_interval = datetime.timedelta(minutes=30).total_seconds()
244steam_web_api_cache_refresh_interval = datetime.timedelta(minutes=30).total_seconds()
246game_id_length = 24
248# TODO: should this be parametrized? Sent in from the UScript side?
249max_message_length = 200
252def format_base_prompt_initial(
253 llm_task: str,
254 level_sanitized: str,
255 markdown_scoreboard_table: str,
256 markdown_kills_table: str,
257 markdown_chat_msgs_table: str,
258 initial_instruction: str,
259) -> str:
260 return base_prompt_initial.format(
261 llm_task=llm_task,
262 level_sanitized=level_sanitized,
263 markdown_scoreboard_table=markdown_scoreboard_table,
264 markdown_kills_table=markdown_kills_table,
265 markdown_chat_msgs_table=markdown_chat_msgs_table,
266 initial_instruction=initial_instruction,
267 )
270def format_base_prompt_consecutive(
271 markdown_scoreboard_table: str,
272 markdown_kills_table: str,
273 num_kills: int,
274 markdown_chat_msgs_table: str,
275 num_msgs: int,
276 instruction: str,
277) -> str:
278 return base_prompt_consecutive.format(
279 markdown_scoreboard_table=markdown_scoreboard_table,
280 markdown_kills_table=markdown_kills_table,
281 num_kills=num_kills,
282 markdown_chat_msgs_table=markdown_chat_msgs_table,
283 num_messages=num_msgs,
284 instruction=instruction,
285 )
288async def get_scoreboard_markdown_table(
289 conn: asyncpg.Connection,
290 game_id: str,
291) -> tuple[int, str]:
292 players = await queries.select_game_players(conn, game_id)
293 pprint(players)
295 scoreboard = ""
296 if players:
297 scoreboard = markdown_table([
298 player.as_markdown_dict()
299 for player in players
300 ]).get_markdown()
301 pprint(scoreboard)
303 return len(players), scoreboard
306async def get_kills_markdown_table(
307 conn: asyncpg.Connection,
308 game_id: str,
309 from_time: datetime.datetime,
310) -> tuple[int, str]:
311 candidate_kills = await queries.select_game_kills(
312 conn=conn,
313 game_id=game_id,
314 kill_time_from=from_time,
315 limit=prompt_max_game_kills,
316 )
317 pprint(candidate_kills)
319 kills_table = ""
320 if candidate_kills:
321 kills_table = markdown_table([
322 kill.as_markdown_dict()
323 for kill in candidate_kills
324 ]).get_markdown()
325 pprint(kills_table)
327 return len(candidate_kills), kills_table
330async def get_chat_messages_markdown_table(
331 conn: asyncpg.Connection,
332 game_id: str,
333 from_time: datetime.datetime,
334) -> tuple[int, str]:
335 candidate_msgs = await queries.select_game_chat_messages(
336 conn=conn,
337 game_id=game_id,
338 send_time_from=from_time,
339 limit=prompt_max_game_chat_msgs,
340 )
341 pprint(candidate_msgs)
343 msgs_table = ""
344 if candidate_msgs:
345 msgs_table = markdown_table([
346 msg.as_markdown_dict()
347 for msg in candidate_msgs
348 ]).get_markdown()
349 pprint(msgs_table)
351 return len(candidate_msgs), msgs_table
354def sanitize_level_name(level: str) -> str:
355 level = level.split("-")[-1]
356 level = level.replace("_", " ")
357 return level
360@api_v1.get("/game/<game_id:str>")
361async def get_game(
362 _: Request,
363 game_id: str,
364 pg_pool: asyncpg.Pool,
365) -> HTTPResponse:
366 # TODO: do we need these for anything other than testing?
367 if is_prod_env:
368 return HTTPResponse(status=HTTPStatus.NOT_FOUND)
370 async with pool_acquire(pg_pool) as conn:
371 db_game = await queries.select_game(conn=conn, game_id=game_id)
373 if not db_game:
374 return HTTPResponse(status=HTTPStatus.NOT_FOUND)
376 game_dict = dataclasses.asdict(db_game)
377 game_dict["start_time"] = db_game.start_time.isoformat()
378 stop_time = db_game.stop_time
379 if stop_time:
380 game_dict["stop_time"] = stop_time.isoformat()
381 game_dict["game_server_address"] = str(db_game.game_server_address)
383 return sanic.json(game_dict)
386@api_v1.post("/game")
387async def post_game(
388 request: Request,
389 pg_pool: asyncpg.Pool,
390 client: openai.AsyncOpenAI,
391) -> HTTPResponse:
392 try:
393 data = request.body.decode("utf-8")
394 level, friendly_level_name, port = data.split("\n")
395 game_port = int(port)
396 except Exception as e:
397 logger.info("error parsing game data: {}: {}", type(e).__name__, e)
398 logger.opt(exception=e).debug("")
399 return HTTPResponse(status=HTTPStatus.BAD_REQUEST)
401 now = utcnow()
402 game_id = secrets.token_hex(game_id_length)
403 addr = get_remote_addr(request)
405 # TODO: this would be so much better with functools.Placeholder!
406 tasks_args = (
407 (get_scoreboard_markdown_table, (game_id,)),
408 (get_kills_markdown_table, (game_id, now)),
409 (get_chat_messages_markdown_table, (game_id, now)),
410 )
411 async with pool_acquire_many(
412 pool=pg_pool,
413 count=len(tasks_args),
414 ) as conns:
415 tasks = (
416 # TODO: fix this type error with Python 3.14 and
417 # functools.Placeholder! Upgrade to 3.14!
418 task(conns[i], *args) # type: ignore[operator]
419 for i, (task, args) in enumerate(tasks_args)
420 )
421 ((_, scoreboard_table),
422 (_, kills_table),
423 (_, chat_msgs_table)) = await asyncio.gather(*tasks)
425 async with pool_acquire(pg_pool) as conn:
426 async with conn.transaction():
427 await queries.insert_game(
428 conn=conn,
429 game_id=game_id,
430 level=level,
431 game_server_address=addr,
432 game_server_port=game_port,
433 start_time=now,
434 stop_time=None,
435 openai_previous_response_id=None,
436 )
438 # TODO: should this be parametrized? At least take in the name?
439 llm_task = default_llm_task
441 if friendly_level_name:
442 level_sanitized = friendly_level_name
443 else:
444 level_sanitized = sanitize_level_name(level)
446 # TODO: should we parametrize this?
447 initial_instruction = (
448 f"Provide a short greeting/boot-up message "
449 f"(maximum length {max_message_length} characters)."
450 )
452 prompt = format_base_prompt_initial(
453 llm_task=llm_task,
454 level_sanitized=level_sanitized,
455 markdown_scoreboard_table=scoreboard_table,
456 markdown_kills_table=kills_table,
457 markdown_chat_msgs_table=chat_msgs_table,
458 initial_instruction=initial_instruction,
459 )
461 async with conn.transaction():
462 openai_resp = await client.responses.create(
463 model=openai_model,
464 input=prompt,
465 timeout=openai_timeout,
466 )
467 await queries.insert_openai_query(
468 game_id=game_id,
469 conn=conn,
470 time=utcnow(),
471 game_server_address=addr,
472 game_server_port=game_port,
473 request_length=len(prompt),
474 response_length=len(openai_resp.output_text),
475 openai_response_id=openai_resp.id,
476 )
478 greeting = openai_resp.output_text
480 return sanic.text(
481 f"{game_id}\n{greeting}",
482 status=HTTPStatus.CREATED,
483 # TODO: use url_for!
484 headers={"Location": f"{api_v1.version_prefix}{api_v1.version}/game/{game_id}"},
485 # TODO: put the full domain here?
486 )
489@api_v1.put("/game/<game_id:str>")
490@check_and_inject_game
491async def put_game(
492 request: Request,
493 game_id: str,
494 pg_pool: asyncpg.Pool,
495) -> HTTPResponse:
496 """Update existing game. We break a REST principle here by
497 allowing partial updates in PUT, mostly because we're lazy,
498 due to the fact that the existing UScript HTTP client lacks
499 PATCH support.
500 """
501 # Assuming the only time we'll get a PUT on /game is when
502 # it's marked as finished by the game server.
503 # TODO: make this support other fields too if needed.
505 start_time = request.ctx.game.start_time
507 try:
508 world_time = float(request.body.decode("utf-8"))
509 stop_time = start_time + datetime.timedelta(seconds=world_time)
510 except Exception as e:
511 logger.debug("error parsing game data: {}: {}", type(e).__name__, e)
512 return HTTPResponse(status=HTTPStatus.BAD_REQUEST)
514 async with pool_acquire(pg_pool) as conn:
515 async with conn.transaction():
516 await queries.update_game(
517 conn=conn,
518 game_id=game_id,
519 stop_time=stop_time,
520 )
522 return HTTPResponse(status=HTTPStatus.NO_CONTENT)
525@api_v1.post("/game/<game_id:str>/message")
526@check_and_inject_game
527async def post_game_message(
528 request: Request,
529 game_id: str,
530 pg_pool: asyncpg.Pool,
531 client: openai.AsyncOpenAI,
532) -> HTTPResponse:
533 # TODO: full implementation! Prompt building!
535 game = request.ctx.game
537 previous_response_id: str | None = game.openai_previous_response_id
538 if previous_response_id is None:
539 logger.warning("unable to handle request for game with no openai_previous_response_id")
540 return HTTPResponse(status=HTTPStatus.SERVICE_UNAVAILABLE)
542 async with pool_acquire(pg_pool) as conn:
543 previous_query = await queries.select_openai_query(
544 conn=conn,
545 openai_response_id=previous_response_id,
546 )
547 if not previous_query:
548 logger.warning("cannot find OpenAI query for id: {}", previous_response_id)
549 return HTTPResponse(status=HTTPStatus.SERVICE_UNAVAILABLE)
551 try:
552 data_in = request.body.decode("utf-8").split("\n")
553 say_type = SayType(data_in[0])
554 say_team = Team(data_in[1])
555 say_name = data_in[2]
556 prompt_in = data_in[3]
557 except Exception as e:
558 logger.info("error parsing game message data: {}: {}", type(e).__name__, e)
559 # TODO: debug log stack trace or something?
560 return HTTPResponse(status=HTTPStatus.BAD_REQUEST)
562 # TODO: this would be so much better with functools.Placeholder!
563 tasks_args = (
564 (get_scoreboard_markdown_table, (game_id,)),
565 (get_kills_markdown_table, (game_id, previous_query.time)),
566 (get_chat_messages_markdown_table, (game_id, previous_query.time)),
567 )
568 async with pool_acquire_many(
569 pool=pg_pool,
570 count=len(tasks_args),
571 ) as conns:
572 tasks = (
573 # TODO: fix this type error with Python 3.14 and
574 # functools.Placeholder! Upgrade to 3.14!
575 task(conns[i], *args) # type: ignore[operator]
576 for i, (task, args) in enumerate(tasks_args)
577 )
578 ((_, scoreboard_table),
579 (num_kills, kills_table),
580 (num_msgs, chat_msgs_table)) = await asyncio.gather(*tasks)
582 # TODO: have some maximum upper limit for total prompt length?
583 prompt = format_base_prompt_consecutive(
584 markdown_scoreboard_table=scoreboard_table,
585 markdown_kills_table=kills_table,
586 num_kills=num_kills,
587 markdown_chat_msgs_table=chat_msgs_table,
588 num_msgs=num_msgs,
589 instruction=prompt_in, # TODO!
590 )
592 async with pool_acquire(pg_pool) as conn:
593 async with conn.transaction():
594 # TODO: how to best use instruction param here?
595 resp = await client.responses.create(
596 model=openai_model,
597 input=prompt,
598 previous_response_id=previous_response_id,
599 timeout=openai_timeout,
600 )
601 await queries.insert_openai_query(
602 conn=conn,
603 game_id=game_id,
604 time=utcnow(),
605 game_server_address=game.game_server_address,
606 game_server_port=game.game_server_port,
607 request_length=len(prompt),
608 response_length=len(resp.output_text),
609 openai_response_id=resp.id,
610 )
612 msg = resp.output_text.replace("\n", " ")
613 resp_data = f"{say_type}\n{say_team}\n{say_name}\n{msg}"
615 return sanic.text(
616 resp_data,
617 status=HTTPStatus.OK,
618 )
621@api_v1.post("/game/<game_id:str>/kill")
622@check_and_inject_game
623async def post_game_kill(
624 request: Request,
625 game_id: str,
626 pg_pool: asyncpg.Pool,
627) -> HTTPResponse:
628 game = request.ctx.game
630 try:
631 parts = request.body.decode("utf-8").split("\n")
632 world_time = float(parts[0])
633 killer_name = parts[1]
634 victim_name = parts[2]
635 killer_team = Team(parts[3])
636 victim_team = Team(parts[4])
637 damage_type = parts[5]
638 kill_distance_m = float(parts[6])
640 kill_time = game.start_time + datetime.timedelta(seconds=world_time)
641 except Exception as e:
642 logger.debug("failed to parse game kill data: {}: {}", type(e).__name__, e)
643 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST)
645 async with pool_acquire(pg_pool) as conn:
646 async with conn.transaction():
647 await queries.insert_game_kill(
648 conn=conn,
649 game_id=game_id,
650 kill_time=kill_time,
651 killer_name=killer_name,
652 victim_name=victim_name,
653 killer_team=killer_team,
654 victim_team=victim_team,
655 damage_type=damage_type,
656 kill_distance_m=kill_distance_m,
657 )
659 return sanic.HTTPResponse(status=HTTPStatus.NO_CONTENT)
662@api_v1.put("/game/<game_id:str>/player/<player_id:int>")
663@check_and_inject_game
664async def put_game_player(
665 request: Request,
666 game_id: str,
667 player_id: int,
668 pg_pool: asyncpg.Pool,
669) -> HTTPResponse:
670 _ = request.ctx.game # TODO: needed here?
672 try:
673 parts = request.body.decode("utf-8").split("\n")
674 name = parts[0]
675 team = Team(parts[1])
676 score = int(parts[2])
677 except Exception as e:
678 logger.debug("failed to parse game player data: {}: {}", type(e).__name__, e)
679 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST)
681 async with pool_acquire(pg_pool) as conn:
682 async with conn.transaction():
683 created = await queries.upsert_game_player(
684 conn=conn,
685 game_id=game_id,
686 player_id=player_id,
687 name=name,
688 team_index=int(team),
689 score=score,
690 )
692 status = HTTPStatus.CREATED if created else HTTPStatus.NO_CONTENT
693 return sanic.HTTPResponse(status=status)
696@api_v1.delete("/game/<game_id:str>/player/<player_id:int>")
697@check_and_inject_game
698async def delete_game_player(
699 _: Request,
700 game_id: str,
701 player_id: int,
702 pg_pool: asyncpg.Pool,
703) -> HTTPResponse:
704 async with pool_acquire(pg_pool) as conn:
705 if not await queries.game_player_exists(
706 conn=conn,
707 game_id=game_id,
708 player_id=player_id,
709 ):
710 return HTTPResponse(status=HTTPStatus.NOT_FOUND)
712 async with conn.transaction():
713 await queries.delete_game_player(
714 conn=conn,
715 game_id=game_id,
716 player_id=player_id,
717 )
719 return HTTPResponse(status=HTTPStatus.NO_CONTENT)
722@api_v1.post("/game/<game_id:str>/chat_message")
723@check_and_inject_game
724async def post_game_chat_message(
725 request: Request,
726 game_id: str,
727 pg_pool: asyncpg.Pool,
728) -> HTTPResponse:
729 _ = request.ctx.game # TODO: needed here?
731 try:
732 parts = request.body.decode("utf-8").split("\n")
733 player_name = parts[0]
734 player_team = Team(parts[1])
735 say_type = SayType(parts[2])
736 msg = parts[3]
737 async with pool_acquire(pg_pool) as conn:
738 async with conn.transaction():
739 await queries.insert_game_chat_message(
740 conn=conn,
741 game_id=game_id,
742 message=msg,
743 send_time=utcnow(),
744 sender_name=player_name,
745 sender_team=player_team,
746 channel=say_type,
747 )
748 except Exception as e:
749 logger.debug("failed to parse chat message data: {}: {}", type(e).__name__, e)
750 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST)
752 return sanic.HTTPResponse(
753 status=HTTPStatus.NO_CONTENT,
754 # TODO: do even want to do this? Do we need getters for these resources?
755 # headers={"Location": f"/game/{game_id}/{chat_message_id}"}, # TODO: put full domain here?
756 )
759@api_v1.put("/game/<game_id:str>/objective_state")
760@check_and_inject_game
761async def put_game_objective_state(
762 request: Request,
763 game_id: str,
764 pg_pool: asyncpg.Pool,
765) -> HTTPResponse:
766 # Defensive check to avoid passing long strings to literal_eval.
767 if len(request.body) > max_ast_literal_eval_size:
768 return sanic.HTTPResponse(status=HTTPStatus.BAD_REQUEST)
770 _ = request.ctx.game # TODO: needed here?
772 # TODO: maybe do proper relative DB design for this if needed?
773 # Right now it is done the quick and dirty way on purpose.
774 # [("Objective A",0),("Objective B",1),...]
775 try:
776 data = request.body.decode("utf-8")
777 if not data:
778 raise ValueError("no objective state data")
780 obj_state = GameObjectiveState.from_wire_format(
781 game_id=game_id,
782 wire_format_data=data,
783 )
785 except Exception as e:
786 logger.debug("error parsing objectives data: {}: {}", type(e).__name__, e)
787 logger.opt(exception=e).debug("") # TODO: should use this more!
788 return HTTPResponse(status=HTTPStatus.BAD_REQUEST)
790 async with pool_acquire(pg_pool) as conn:
791 async with conn.transaction():
792 created = await queries.upsert_game_objective_state(
793 conn=conn,
794 state=obj_state,
795 )
797 status = HTTPStatus.CREATED if created else HTTPStatus.NO_CONTENT
798 return sanic.HTTPResponse(status=status)
801async def db_maintenance(stop_event: EventType) -> None:
802 pool: asyncpg.Pool | None = None
804 try:
805 logger.debug("db_maintenance starting")
807 db_url = os.environ.get("DATABASE_URL")
808 pool = await asyncpg.create_pool(dsn=db_url, min_size=1, max_size=1)
810 while not stop_event.wait(db_maintenance_interval):
811 async with pool_acquire(pool) as conn:
812 async with conn.transaction():
813 result = await queries.delete_completed_games(conn, game_expiration)
814 logger.info("delete_completed_games: {}", result)
816 async with conn.transaction():
817 result = await queries.delete_old_api_keys(
818 conn,
819 leeway=api_key_deletion_leeway,
820 )
821 logger.info("delete_old_api_keys: {}", result)
823 except KeyboardInterrupt:
824 logger.debug("db_maintenance stopping")
825 if pool:
826 await pool.close()
829async def refresh_steam_web_api_cache(stop_event: EventType) -> None:
830 pool: asyncpg.Pool | None = None
832 try:
833 db_url = os.environ.get("DATABASE_URL")
834 pool = await asyncpg.create_pool(dsn=db_url, min_size=1, max_size=1)
836 while not stop_event.wait(steam_web_api_cache_refresh_interval):
837 async with pool_acquire(pool) as conn:
838 api_keys = await queries.select_game_server_api_keys(conn)
839 logger.info("refreshing Steam Web API cache for {} keys", len(api_keys))
840 async with httpx.AsyncClient(timeout=30.0) as client:
841 tasks = [
842 is_real_game_server(
843 game_server_address=api_key["game_server_address"],
844 game_server_port=api_key["game_server_port"],
845 pg_pool=pool,
846 http_client=client,
847 )
848 for api_key in api_keys
849 ]
850 await asyncio.gather(*tasks)
852 except KeyboardInterrupt:
853 if pool:
854 await pool.close()
857@api_v1.on_request
858async def api_v1_on_request(request: Request) -> HTTPResponse | None:
859 authenticated = await auth.check_token(request, request.app.ctx.pg_pool)
860 if not authenticated:
861 return sanic.text("Unauthorized.", status=HTTPStatus.UNAUTHORIZED)
863 return None
866app = make_api_v1_app()
868if __name__ == "__main__":
869 app.config.INSPECTOR = True
870 logger.level("DEBUG")
871 app.run(host="0.0.0.0", port=8080, debug=True, dev=True, access_log=True)