Coverage for chatgpt_proxy / tests / test_maintenance.py: 95%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-12 16:19 +0000

1# MIT License 

2# 

3# Copyright (c) 2025 Tuomo Kriikkula 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy 

6# of this software and associated documentation files (the "Software"), to deal 

7# in the Software without restriction, including without limitation the rights 

8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 

9# copies of the Software, and to permit persons to whom the Software is 

10# furnished to do so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in all 

13# copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

21# SOFTWARE. 

22 

23import asyncio 

24import datetime 

25import hashlib 

26import ipaddress 

27import logging 

28import random 

29import threading 

30from typing import AsyncGenerator 

31from typing import Callable 

32from typing import Coroutine 

33 

34import aiocache 

35import asyncpg 

36import nest_asyncio 

37import pytest 

38import pytest_asyncio 

39from pytest_loguru.plugin import caplog # noqa: F401 

40 

41from chatgpt_proxy.auth.auth import is_real_game_server_key_builder 

42from chatgpt_proxy.db import queries 

43from chatgpt_proxy.tests import setup # noqa: E402 

44from chatgpt_proxy.utils import utils 

45 

46setup.common_test_setup() 

47 

48import chatgpt_proxy.app # noqa: E402 

49from chatgpt_proxy.app import db_maintenance # noqa: E402 

50from chatgpt_proxy.app import refresh_steam_web_api_cache # noqa: E402 

51from chatgpt_proxy.db import pool_acquire # noqa: E402 

52from chatgpt_proxy.log import logger # noqa: E402 

53 

54logger.level("DEBUG") 

55 

56_db_timeout = setup.default_test_db_timeout 

57 

58_background_tasks = set() 

59 

60 

61@pytest_asyncio.fixture 

62async def maintenance_fixture( 

63) -> AsyncGenerator[asyncpg.Connection]: 

64 global _task_exception 

65 _task_exception = None 

66 

67 loop = asyncio.get_running_loop() 

68 nest_asyncio.apply(loop=loop) 

69 

70 db_fixture_pool = await asyncpg.create_pool( 

71 dsn=setup.db_base_url, 

72 min_size=1, 

73 max_size=1, 

74 timeout=_db_timeout, 

75 loop=loop, 

76 ) 

77 

78 async with pool_acquire(db_fixture_pool, timeout=_db_timeout) as conn: 

79 await setup.drop_test_db(conn, timeout=_db_timeout) 

80 await setup.create_test_db(conn, timeout=_db_timeout) 

81 

82 test_db_pool = await asyncpg.create_pool( 

83 dsn=setup.db_test_url, 

84 min_size=1, 

85 max_size=1, 

86 timeout=_db_timeout, 

87 loop=loop, 

88 ) 

89 

90 async with pool_acquire( 

91 test_db_pool, 

92 timeout=_db_timeout, 

93 ) as conn: 

94 async with conn.transaction(): 

95 await setup.initialize_test_db(conn, timeout=_db_timeout) 

96 await setup.seed_test_db(conn, timeout=_db_timeout) 

97 

98 yield conn 

99 

100 async with pool_acquire(db_fixture_pool, timeout=_db_timeout) as conn: 

101 await setup.drop_test_db(conn, timeout=_db_timeout) 

102 

103 await db_fixture_pool.close() 

104 await test_db_pool.close() 

105 

106 

107async def delayed_check_task( 

108 stop_event: threading.Event, 

109 check_result_coro: Callable[[], Coroutine[None, None, bool]], 

110 timeout: float, 

111): 

112 try: 

113 now = datetime.datetime.now() 

114 to = now + datetime.timedelta(seconds=timeout) 

115 while not await check_result_coro(): 

116 now = datetime.datetime.now() 

117 if now > to: 

118 # TODO: would be nice to communicate the reason here, 

119 # for example by somehow returning a 'context' 

120 # from check_result_coro! 

121 raise TimeoutError("timed out") 

122 await asyncio.sleep(0.05) 

123 finally: 

124 stop_event.set() 

125 

126 

127_task_exception: BaseException | None = None 

128 

129 

130def schedule_delayed_check( 

131 stop_event: threading.Event, 

132 check_result_coro: Callable[[], Coroutine[None, None, bool]], 

133 timeout: float, 

134): 

135 task = asyncio.create_task( 

136 delayed_check_task(stop_event, check_result_coro, timeout)) 

137 _background_tasks.add(task) 

138 

139 def check_task(task_: asyncio.Task[None]): 

140 global _task_exception 

141 ex = task_.exception() 

142 if ex: 

143 logger.error("task failed: {}: {}", task_, type(ex).__name__) 

144 _task_exception = ex 

145 

146 task.add_done_callback(check_task) 

147 task.add_done_callback(_background_tasks.discard) 

148 

149 

150@pytest.mark.timeout(10) 

151@pytest.mark.asyncio 

152async def test_db_maintenance(maintenance_fixture, caplog) -> None: 

153 caplog.set_level(logging.DEBUG) 

154 db_conn = maintenance_fixture 

155 

156 chatgpt_proxy.app.db_maintenance_interval = 0.5 

157 

158 old_game_ids = [ 

159 "OLD_GAME_0", 

160 "OLD_GAME_1", 

161 "OLD_GAME_2", 

162 ] 

163 

164 expr = chatgpt_proxy.app.game_expiration 

165 expr_hours = expr.total_seconds() // 3600 

166 

167 await db_conn.execute( 

168 f""" 

169 INSERT INTO "game" (id, 

170 level, 

171 start_time, 

172 stop_time, 

173 game_server_address, 

174 game_server_port, 

175 openai_previous_response_id) 

176 VALUES ('{old_game_ids[0]}', 

177 'VNTE-Mapperino', 

178 NOW() - INTERVAL '{expr_hours + 10} hours', 

179 NOW() - INTERVAL '{expr_hours + 9} hours', 

180 INET '127.0.1.1', 

181 7777, 

182 'openai_dummy_id_1'), 

183 ('{old_game_ids[1]}', 

184 'RRTE-DogCat', 

185 NOW() - INTERVAL '{expr_hours + 9} hours', 

186 NOW() - INTERVAL '{expr_hours + 8} hours', 

187 INET '127.0.1.1', 

188 7777, 

189 'openai_dummy_id_2'), 

190 ('{old_game_ids[2]}', 

191 'VNSU-AnLaoValleyXD', 

192 NOW() - INTERVAL '{expr_hours + 2} hours', 

193 NOW() - INTERVAL '{expr_hours + 1} hours', 

194 INET '127.0.1.1', 

195 7777, 

196 'openai_dummy_id_2') 

197 ; 

198 """, 

199 timeout=_db_timeout, 

200 ) 

201 

202 now = utils.utcnow() 

203 creations = [ 

204 now - datetime.timedelta(hours=x * 2, minutes=random.randint(0, 59)) 

205 for x in range(5) 

206 ] 

207 expirations = [ 

208 now - datetime.timedelta(hours=x + 1) 

209 for x in range(5) 

210 ] 

211 key_hashes = [ 

212 hashlib.sha256(random.randbytes(64)).digest() 

213 for _ in range(5) 

214 ] 

215 

216 for x, (created_at, expires_at, key_hash) in enumerate(zip(creations, expirations, key_hashes)): 

217 await db_conn.execute( 

218 f""" 

219 INSERT INTO "game_server_api_key" 

220 (created_at, expires_at, api_key_hash, game_server_address, game_server_port, name) 

221 VALUES 

222 ($1, $2, $3, INET '127.0.1.1', 7777, 'test_maintenance API key {x}'); 

223 """, 

224 created_at, 

225 expires_at, 

226 key_hash, 

227 timeout=_db_timeout, 

228 ) 

229 

230 async def check_result() -> bool: 

231 games = await queries.select_games(db_conn) 

232 old_games_deleted = not any( 

233 game.id in old_game_ids 

234 for game in games 

235 ) 

236 keys = await queries.select_game_server_api_keys(db_conn) 

237 old_api_keys_deleted = not any( 

238 key["api_key_hash"] in key_hashes 

239 for key in keys 

240 ) 

241 return old_games_deleted and old_api_keys_deleted 

242 

243 stop_event = threading.Event() 

244 schedule_delayed_check(stop_event, check_result_coro=check_result, timeout=5.0) 

245 await db_maintenance(stop_event) # type: ignore[arg-type] 

246 if _task_exception: 

247 raise _task_exception 

248 

249 

250@pytest.mark.timeout(10) 

251@pytest.mark.asyncio 

252async def test_refresh_steam_web_api_cache(maintenance_fixture, caplog) -> None: 

253 caplog.set_level(logging.DEBUG) 

254 _ = maintenance_fixture 

255 

256 # TODO: fill database with API keys and ???. 

257 

258 chatgpt_proxy.app.steam_web_api_cache_refresh_interval = 0.5 

259 

260 async def check_result() -> bool: 

261 # TODO: change this when aiocache is updated! 

262 _ = aiocache.Cache.MEMORY 

263 _ = is_real_game_server_key_builder( 

264 game_server_address=ipaddress.IPv4Address("127.0.0.1"), 

265 game_server_port=7777, 

266 ) 

267 # await cache.get(key) 

268 

269 return True 

270 

271 stop_event = threading.Event() 

272 schedule_delayed_check(stop_event, check_result_coro=check_result, timeout=5.0) 

273 # TODO: this needs mocked Steam Web API? See test_api.py. 

274 # TODO: how can we even test this? Check that the cache object 

275 # has the keys directly? 

276 await refresh_steam_web_api_cache(stop_event) # type: ignore[arg-type] 

277 if _task_exception: 

278 raise _task_exception