Coverage for chatgpt_proxy / tests / test_maintenance.py: 95%
111 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-12 16:19 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-12 16:19 +0000
1# MIT License
2#
3# Copyright (c) 2025 Tuomo Kriikkula
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import asyncio
24import datetime
25import hashlib
26import ipaddress
27import logging
28import random
29import threading
30from typing import AsyncGenerator
31from typing import Callable
32from typing import Coroutine
34import aiocache
35import asyncpg
36import nest_asyncio
37import pytest
38import pytest_asyncio
39from pytest_loguru.plugin import caplog # noqa: F401
41from chatgpt_proxy.auth.auth import is_real_game_server_key_builder
42from chatgpt_proxy.db import queries
43from chatgpt_proxy.tests import setup # noqa: E402
44from chatgpt_proxy.utils import utils
46setup.common_test_setup()
48import chatgpt_proxy.app # noqa: E402
49from chatgpt_proxy.app import db_maintenance # noqa: E402
50from chatgpt_proxy.app import refresh_steam_web_api_cache # noqa: E402
51from chatgpt_proxy.db import pool_acquire # noqa: E402
52from chatgpt_proxy.log import logger # noqa: E402
54logger.level("DEBUG")
56_db_timeout = setup.default_test_db_timeout
58_background_tasks = set()
61@pytest_asyncio.fixture
62async def maintenance_fixture(
63) -> AsyncGenerator[asyncpg.Connection]:
64 global _task_exception
65 _task_exception = None
67 loop = asyncio.get_running_loop()
68 nest_asyncio.apply(loop=loop)
70 db_fixture_pool = await asyncpg.create_pool(
71 dsn=setup.db_base_url,
72 min_size=1,
73 max_size=1,
74 timeout=_db_timeout,
75 loop=loop,
76 )
78 async with pool_acquire(db_fixture_pool, timeout=_db_timeout) as conn:
79 await setup.drop_test_db(conn, timeout=_db_timeout)
80 await setup.create_test_db(conn, timeout=_db_timeout)
82 test_db_pool = await asyncpg.create_pool(
83 dsn=setup.db_test_url,
84 min_size=1,
85 max_size=1,
86 timeout=_db_timeout,
87 loop=loop,
88 )
90 async with pool_acquire(
91 test_db_pool,
92 timeout=_db_timeout,
93 ) as conn:
94 async with conn.transaction():
95 await setup.initialize_test_db(conn, timeout=_db_timeout)
96 await setup.seed_test_db(conn, timeout=_db_timeout)
98 yield conn
100 async with pool_acquire(db_fixture_pool, timeout=_db_timeout) as conn:
101 await setup.drop_test_db(conn, timeout=_db_timeout)
103 await db_fixture_pool.close()
104 await test_db_pool.close()
107async def delayed_check_task(
108 stop_event: threading.Event,
109 check_result_coro: Callable[[], Coroutine[None, None, bool]],
110 timeout: float,
111):
112 try:
113 now = datetime.datetime.now()
114 to = now + datetime.timedelta(seconds=timeout)
115 while not await check_result_coro():
116 now = datetime.datetime.now()
117 if now > to:
118 # TODO: would be nice to communicate the reason here,
119 # for example by somehow returning a 'context'
120 # from check_result_coro!
121 raise TimeoutError("timed out")
122 await asyncio.sleep(0.05)
123 finally:
124 stop_event.set()
127_task_exception: BaseException | None = None
130def schedule_delayed_check(
131 stop_event: threading.Event,
132 check_result_coro: Callable[[], Coroutine[None, None, bool]],
133 timeout: float,
134):
135 task = asyncio.create_task(
136 delayed_check_task(stop_event, check_result_coro, timeout))
137 _background_tasks.add(task)
139 def check_task(task_: asyncio.Task[None]):
140 global _task_exception
141 ex = task_.exception()
142 if ex:
143 logger.error("task failed: {}: {}", task_, type(ex).__name__)
144 _task_exception = ex
146 task.add_done_callback(check_task)
147 task.add_done_callback(_background_tasks.discard)
150@pytest.mark.timeout(10)
151@pytest.mark.asyncio
152async def test_db_maintenance(maintenance_fixture, caplog) -> None:
153 caplog.set_level(logging.DEBUG)
154 db_conn = maintenance_fixture
156 chatgpt_proxy.app.db_maintenance_interval = 0.5
158 old_game_ids = [
159 "OLD_GAME_0",
160 "OLD_GAME_1",
161 "OLD_GAME_2",
162 ]
164 expr = chatgpt_proxy.app.game_expiration
165 expr_hours = expr.total_seconds() // 3600
167 await db_conn.execute(
168 f"""
169 INSERT INTO "game" (id,
170 level,
171 start_time,
172 stop_time,
173 game_server_address,
174 game_server_port,
175 openai_previous_response_id)
176 VALUES ('{old_game_ids[0]}',
177 'VNTE-Mapperino',
178 NOW() - INTERVAL '{expr_hours + 10} hours',
179 NOW() - INTERVAL '{expr_hours + 9} hours',
180 INET '127.0.1.1',
181 7777,
182 'openai_dummy_id_1'),
183 ('{old_game_ids[1]}',
184 'RRTE-DogCat',
185 NOW() - INTERVAL '{expr_hours + 9} hours',
186 NOW() - INTERVAL '{expr_hours + 8} hours',
187 INET '127.0.1.1',
188 7777,
189 'openai_dummy_id_2'),
190 ('{old_game_ids[2]}',
191 'VNSU-AnLaoValleyXD',
192 NOW() - INTERVAL '{expr_hours + 2} hours',
193 NOW() - INTERVAL '{expr_hours + 1} hours',
194 INET '127.0.1.1',
195 7777,
196 'openai_dummy_id_2')
197 ;
198 """,
199 timeout=_db_timeout,
200 )
202 now = utils.utcnow()
203 creations = [
204 now - datetime.timedelta(hours=x * 2, minutes=random.randint(0, 59))
205 for x in range(5)
206 ]
207 expirations = [
208 now - datetime.timedelta(hours=x + 1)
209 for x in range(5)
210 ]
211 key_hashes = [
212 hashlib.sha256(random.randbytes(64)).digest()
213 for _ in range(5)
214 ]
216 for x, (created_at, expires_at, key_hash) in enumerate(zip(creations, expirations, key_hashes)):
217 await db_conn.execute(
218 f"""
219 INSERT INTO "game_server_api_key"
220 (created_at, expires_at, api_key_hash, game_server_address, game_server_port, name)
221 VALUES
222 ($1, $2, $3, INET '127.0.1.1', 7777, 'test_maintenance API key {x}');
223 """,
224 created_at,
225 expires_at,
226 key_hash,
227 timeout=_db_timeout,
228 )
230 async def check_result() -> bool:
231 games = await queries.select_games(db_conn)
232 old_games_deleted = not any(
233 game.id in old_game_ids
234 for game in games
235 )
236 keys = await queries.select_game_server_api_keys(db_conn)
237 old_api_keys_deleted = not any(
238 key["api_key_hash"] in key_hashes
239 for key in keys
240 )
241 return old_games_deleted and old_api_keys_deleted
243 stop_event = threading.Event()
244 schedule_delayed_check(stop_event, check_result_coro=check_result, timeout=5.0)
245 await db_maintenance(stop_event) # type: ignore[arg-type]
246 if _task_exception:
247 raise _task_exception
250@pytest.mark.timeout(10)
251@pytest.mark.asyncio
252async def test_refresh_steam_web_api_cache(maintenance_fixture, caplog) -> None:
253 caplog.set_level(logging.DEBUG)
254 _ = maintenance_fixture
256 # TODO: fill database with API keys and ???.
258 chatgpt_proxy.app.steam_web_api_cache_refresh_interval = 0.5
260 async def check_result() -> bool:
261 # TODO: change this when aiocache is updated!
262 _ = aiocache.Cache.MEMORY
263 _ = is_real_game_server_key_builder(
264 game_server_address=ipaddress.IPv4Address("127.0.0.1"),
265 game_server_port=7777,
266 )
267 # await cache.get(key)
269 return True
271 stop_event = threading.Event()
272 schedule_delayed_check(stop_event, check_result_coro=check_result, timeout=5.0)
273 # TODO: this needs mocked Steam Web API? See test_api.py.
274 # TODO: how can we even test this? Check that the cache object
275 # has the keys directly?
276 await refresh_steam_web_api_cache(stop_event) # type: ignore[arg-type]
277 if _task_exception:
278 raise _task_exception