1#include "connection_pool.h"
2
3#include "connection.h"
4
5#include <base/system.h>
6
7#include <engine/console.h>
8#include <engine/shared/config.h>
9
10#include <chrono>
11#include <cstring>
12#include <iterator>
13#include <memory>
14#include <thread>
15#include <vector>
16
17using namespace std::chrono_literals;
18
19// helper struct to hold thread data
20struct CSqlExecData
21{
22 CSqlExecData(
23 CDbConnectionPool::FRead pFunc,
24 std::unique_ptr<const ISqlData> pThreadData,
25 const char *pName);
26 CSqlExecData(
27 CDbConnectionPool::FWrite pFunc,
28 std::unique_ptr<const ISqlData> pThreadData,
29 const char *pName);
30 CSqlExecData(
31 CDbConnectionPool::Mode m,
32 const char aFilename[64]);
33 CSqlExecData(
34 CDbConnectionPool::Mode m,
35 const CMysqlConfig *pMysqlConfig);
36 CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m);
37 ~CSqlExecData() = default;
38
39 enum
40 {
41 READ_ACCESS,
42 WRITE_ACCESS,
43 ADD_MYSQL,
44 ADD_SQLITE,
45 PRINT,
46 } m_Mode;
47 union
48 {
49 CDbConnectionPool::FRead m_pReadFunc;
50 CDbConnectionPool::FWrite m_pWriteFunc;
51 struct
52 {
53 CDbConnectionPool::Mode m_Mode;
54 CMysqlConfig m_Config;
55 } m_Mysql;
56 struct
57 {
58 CDbConnectionPool::Mode m_Mode;
59 char m_Filename[64];
60 } m_Sqlite;
61 struct
62 {
63 IConsole *m_pConsole;
64 CDbConnectionPool::Mode m_Mode;
65 } m_Print;
66 } m_Ptr;
67
68 std::unique_ptr<const ISqlData> m_pThreadData;
69 const char *m_pName;
70};
71
72CSqlExecData::CSqlExecData(
73 CDbConnectionPool::FRead pFunc,
74 std::unique_ptr<const ISqlData> pThreadData,
75 const char *pName) :
76 m_Mode(READ_ACCESS),
77 m_pThreadData(std::move(pThreadData)),
78 m_pName(pName)
79{
80 m_Ptr.m_pReadFunc = pFunc;
81}
82
83CSqlExecData::CSqlExecData(
84 CDbConnectionPool::FWrite pFunc,
85 std::unique_ptr<const ISqlData> pThreadData,
86 const char *pName) :
87 m_Mode(WRITE_ACCESS),
88 m_pThreadData(std::move(pThreadData)),
89 m_pName(pName)
90{
91 m_Ptr.m_pWriteFunc = pFunc;
92}
93
94CSqlExecData::CSqlExecData(
95 CDbConnectionPool::Mode m,
96 const char aFilename[64]) :
97 m_Mode(ADD_SQLITE),
98 m_pThreadData(nullptr),
99 m_pName("add sqlite server")
100{
101 m_Ptr.m_Sqlite.m_Mode = m;
102 str_copy(dst&: m_Ptr.m_Sqlite.m_Filename, src: aFilename);
103}
104CSqlExecData::CSqlExecData(CDbConnectionPool::Mode m,
105 const CMysqlConfig *pMysqlConfig) :
106 m_Mode(ADD_MYSQL),
107 m_pThreadData(nullptr),
108 m_pName("add mysql server")
109{
110 m_Ptr.m_Mysql.m_Mode = m;
111 mem_copy(dest: &m_Ptr.m_Mysql.m_Config, source: pMysqlConfig, size: sizeof(m_Ptr.m_Mysql.m_Config));
112}
113
114CSqlExecData::CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m) :
115 m_Mode(PRINT),
116 m_pThreadData(nullptr),
117 m_pName("print database server")
118{
119 m_Ptr.m_Print.m_pConsole = pConsole;
120 m_Ptr.m_Print.m_Mode = m;
121}
122
123void CDbConnectionPool::Print(IConsole *pConsole, Mode DatabaseMode)
124{
125 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pConsole, args&: DatabaseMode);
126 m_InsertIdx %= std::size(m_pShared->m_aQueries);
127 m_pShared->m_NumBackup.Signal();
128}
129
130void CDbConnectionPool::RegisterSqliteDatabase(Mode DatabaseMode, const char aFilename[64])
131{
132 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: aFilename);
133 m_InsertIdx %= std::size(m_pShared->m_aQueries);
134 m_pShared->m_NumBackup.Signal();
135}
136
137void CDbConnectionPool::RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig)
138{
139 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: pMysqlConfig);
140 m_InsertIdx %= std::size(m_pShared->m_aQueries);
141 m_pShared->m_NumBackup.Signal();
142}
143
144void CDbConnectionPool::Execute(
145 FRead pFunc,
146 std::unique_ptr<const ISqlData> pSqlRequestData,
147 const char *pName)
148{
149 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
150 m_InsertIdx %= std::size(m_pShared->m_aQueries);
151 m_pShared->m_NumBackup.Signal();
152}
153
154void CDbConnectionPool::ExecuteWrite(
155 FWrite pFunc,
156 std::unique_ptr<const ISqlData> pSqlRequestData,
157 const char *pName)
158{
159 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
160 m_InsertIdx %= std::size(m_pShared->m_aQueries);
161 m_pShared->m_NumBackup.Signal();
162}
163
164void CDbConnectionPool::OnShutdown()
165{
166 if(m_Shutdown)
167 return;
168 m_Shutdown = true;
169 m_pShared->m_Shutdown.store(i: true);
170 m_pShared->m_NumBackup.Signal();
171 int i = 0;
172 while(m_pShared->m_Shutdown.load())
173 {
174 // print a log about every two seconds
175 if(i % 20 == 0 && i > 0)
176 {
177 dbg_msg(sys: "sql", fmt: "Waiting for score threads to complete (%ds)", i / 10);
178 }
179 ++i;
180 std::this_thread::sleep_for(rtime: 100ms);
181 }
182}
183
184// The backup worker thread looks at write queries and stores them
185// in the sqlite database (WRITE_BACKUP). It skips over read queries.
186// After processing the query, it gets passed on to the Worker thread.
187// This is done to not loose ranks when the server shuts down before all
188// queries are executed on the mysql server
189class CBackup
190{
191public:
192 CBackup(std::shared_ptr<CDbConnectionPool::CSharedData> pShared, int DebugSql) :
193 m_DebugSql(DebugSql), m_pShared(std::move(pShared)) {}
194 static void Start(void *pUser);
195
196private:
197 bool m_DebugSql;
198
199 void ProcessQueries();
200
201 std::unique_ptr<IDbConnection> m_pWriteBackup;
202
203 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
204};
205
206/* static */
207void CBackup::Start(void *pUser)
208{
209 CBackup *pThis = (CBackup *)pUser;
210 pThis->ProcessQueries();
211 delete pThis;
212}
213
214void CBackup::ProcessQueries()
215{
216 for(int JobNum = 0;; JobNum++)
217 {
218 m_pShared->m_NumBackup.Wait();
219 CSqlExecData *pThreadData = m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)].get();
220
221 // work through all database jobs after OnShutdown is called before exiting the thread
222 if(pThreadData == nullptr)
223 {
224 m_pShared->m_NumWorker.Signal();
225 return;
226 }
227
228 if(pThreadData->m_Mode == CSqlExecData::ADD_SQLITE &&
229 pThreadData->m_Ptr.m_Sqlite.m_Mode == CDbConnectionPool::Mode::WRITE_BACKUP)
230 {
231 m_pWriteBackup = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_Filename, Setup: true);
232 }
233 else if(pThreadData->m_Mode == CSqlExecData::WRITE_ACCESS && m_pWriteBackup.get())
234 {
235 bool Success = CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData, w: Write::BACKUP_FIRST);
236 if(m_DebugSql || !Success)
237 dbg_msg(sys: "sql", fmt: "[%i] %s done on write backup database, Success=%i", JobNum, pThreadData->m_pName, Success);
238 }
239 m_pShared->m_NumWorker.Signal();
240 }
241}
242
243// the worker threads executes queries on mysql or sqlite. If we write on
244// a mysql server and have a backup server configured, we'll remove the
245// entry from the backup server after completing it on the write server.
246// static void Worker(void *pUser);
247class CWorker
248{
249public:
250 CWorker(std::shared_ptr<CDbConnectionPool::CSharedData> pShared, int DebugSql) :
251 m_DebugSql(DebugSql), m_pShared(std::move(pShared)) {}
252 static void Start(void *pUser);
253 void ProcessQueries();
254
255private:
256 void Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode);
257
258 bool m_DebugSql;
259
260 // There are two possible configurations
261 // * sqlite mode: There exists exactly one READ and the same WRITE server
262 // with no WRITE_BACKUP server
263 // * mysql mode: there can exist multiple READ server, there must be at
264 // most one WRITE server. The WRITE server for all DDNet
265 // Servers must be the same (to counteract double loads).
266 // There may be one WRITE_BACKUP sqlite server.
267 // This variable should only change, before the worker threads
268 std::vector<std::unique_ptr<IDbConnection>> m_vpReadConnections;
269 std::unique_ptr<IDbConnection> m_pWriteConnection;
270 std::unique_ptr<IDbConnection> m_pWriteBackup;
271
272 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
273};
274
275/* static */
276void CWorker::Start(void *pUser)
277{
278 CWorker *pThis = (CWorker *)pUser;
279 pThis->ProcessQueries();
280 delete pThis;
281}
282
283void CWorker::ProcessQueries()
284{
285 // remember last working server and try to connect to it first
286 int ReadServer = 0;
287 // enter fail mode when a sql request fails, skip read request during it and
288 // write to the backup database until all requests are handled
289 bool FailMode = false;
290 for(int JobNum = 0;; JobNum++)
291 {
292 if(FailMode && m_pShared->m_NumWorker.GetApproximateValue() == 0)
293 {
294 FailMode = false;
295 }
296 m_pShared->m_NumWorker.Wait();
297 auto pThreadData = std::move(m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)]);
298 // work through all database jobs after OnShutdown is called before exiting the thread
299 if(pThreadData == nullptr)
300 {
301 m_pShared->m_Shutdown.store(i: false);
302 return;
303 }
304 bool Success = false;
305 switch(pThreadData->m_Mode)
306 {
307 case CSqlExecData::READ_ACCESS:
308 {
309 for(size_t i = 0; i < m_vpReadConnections.size(); i++)
310 {
311 if(m_pShared->m_Shutdown)
312 {
313 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during shutdown", JobNum, pThreadData->m_pName);
314 break;
315 }
316 if(FailMode)
317 {
318 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during FailMode", JobNum, pThreadData->m_pName);
319 break;
320 }
321 int CurServer = (ReadServer + i) % (int)m_vpReadConnections.size();
322 if(CDbConnectionPool::ExecSqlFunc(pConnection: m_vpReadConnections[CurServer].get(), pData: pThreadData.get(), w: Write::NORMAL))
323 {
324 ReadServer = CurServer;
325 if(m_DebugSql)
326 dbg_msg(sys: "sql", fmt: "[%i] %s done on read database %d", JobNum, pThreadData->m_pName, CurServer);
327 Success = true;
328 break;
329 }
330 }
331 if(!Success)
332 {
333 FailMode = true;
334 }
335 }
336 break;
337 case CSqlExecData::WRITE_ACCESS:
338 {
339 if(m_pShared->m_Shutdown && m_pWriteBackup != nullptr)
340 {
341 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during shutdown", JobNum, pThreadData->m_pName);
342 }
343 else if(FailMode && m_pWriteBackup != nullptr)
344 {
345 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during FailMode", JobNum, pThreadData->m_pName);
346 }
347 else if(CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteConnection.get(), pData: pThreadData.get(), w: Write::NORMAL))
348 {
349 if(m_DebugSql)
350 dbg_msg(sys: "sql", fmt: "[%i] %s done on write database", JobNum, pThreadData->m_pName);
351 Success = true;
352 }
353 // enter fail mode if not successful
354 FailMode = FailMode || !Success;
355 const Write w = Success ? Write::NORMAL_SUCCEEDED : Write::NORMAL_FAILED;
356 if(m_pWriteBackup && CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData.get(), w))
357 {
358 if(m_DebugSql)
359 dbg_msg(sys: "sql", fmt: "[%i] %s done move write on backup database to non-backup table", JobNum, pThreadData->m_pName);
360 Success = true;
361 }
362 }
363 break;
364 case CSqlExecData::ADD_MYSQL:
365 {
366 auto pMysql = CreateMysqlConnection(Config: pThreadData->m_Ptr.m_Mysql.m_Config);
367 switch(pThreadData->m_Ptr.m_Mysql.m_Mode)
368 {
369 case CDbConnectionPool::Mode::READ:
370 m_vpReadConnections.push_back(x: std::move(pMysql));
371 break;
372 case CDbConnectionPool::Mode::WRITE:
373 m_pWriteConnection = std::move(pMysql);
374 break;
375 case CDbConnectionPool::Mode::WRITE_BACKUP:
376 m_pWriteBackup = std::move(pMysql);
377 break;
378 case CDbConnectionPool::Mode::NUM_MODES:
379 break;
380 }
381 Success = true;
382 break;
383 }
384 case CSqlExecData::ADD_SQLITE:
385 {
386 auto pSqlite = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_Filename, Setup: true);
387 switch(pThreadData->m_Ptr.m_Sqlite.m_Mode)
388 {
389 case CDbConnectionPool::Mode::READ:
390 m_vpReadConnections.push_back(x: std::move(pSqlite));
391 break;
392 case CDbConnectionPool::Mode::WRITE:
393 m_pWriteConnection = std::move(pSqlite);
394 break;
395 case CDbConnectionPool::Mode::WRITE_BACKUP:
396 m_pWriteBackup = std::move(pSqlite);
397 break;
398 case CDbConnectionPool::Mode::NUM_MODES:
399 break;
400 }
401 Success = true;
402 break;
403 }
404 case CSqlExecData::PRINT:
405 Print(pConsole: pThreadData->m_Ptr.m_Print.m_pConsole, DatabaseMode: pThreadData->m_Ptr.m_Print.m_Mode);
406 Success = true;
407 break;
408 }
409 if(!Success)
410 dbg_msg(sys: "sql", fmt: "[%i] %s failed on all databases", JobNum, pThreadData->m_pName);
411 if(pThreadData->m_pThreadData != nullptr && pThreadData->m_pThreadData->m_pResult != nullptr)
412 {
413 pThreadData->m_pThreadData->m_pResult->m_Success = Success;
414 pThreadData->m_pThreadData->m_pResult->m_Completed.store(i: true);
415 }
416 }
417}
418
419void CWorker::Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode)
420{
421 if(DatabaseMode == CDbConnectionPool::Mode::READ)
422 {
423 for(auto &pReadConnection : m_vpReadConnections)
424 pReadConnection->Print(pConsole, pMode: "Read");
425 if(m_vpReadConnections.empty())
426 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no read databases");
427 }
428 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE)
429 {
430 if(m_pWriteConnection)
431 m_pWriteConnection->Print(pConsole, pMode: "Write");
432 else
433 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write databases");
434 }
435 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE_BACKUP)
436 {
437 if(m_pWriteBackup)
438 m_pWriteBackup->Print(pConsole, pMode: "WriteBackup");
439 else
440 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write backup databases");
441 }
442}
443
444/* static */
445bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, Write w)
446{
447 if(pConnection == nullptr)
448 {
449 dbg_msg(sys: "sql", fmt: "No database given");
450 return false;
451 }
452 char aError[256] = "unknown error";
453 if(!pConnection->Connect(pError: aError, ErrorSize: sizeof(aError)))
454 {
455 dbg_msg(sys: "sql", fmt: "failed connecting to db: %s", aError);
456 return false;
457 }
458 bool Success = false;
459 switch(pData->m_Mode)
460 {
461 case CSqlExecData::READ_ACCESS:
462 Success = pData->m_Ptr.m_pReadFunc(pConnection, pData->m_pThreadData.get(), aError, sizeof(aError));
463 break;
464 case CSqlExecData::WRITE_ACCESS:
465 Success = pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), w, aError, sizeof(aError));
466 break;
467 default:
468 dbg_assert_failed("unreachable");
469 }
470 pConnection->Disconnect();
471 if(!Success)
472 {
473 dbg_msg(sys: "sql", fmt: "%s failed: %s", pData->m_pName, aError);
474 }
475 return Success;
476}
477
478CDbConnectionPool::CDbConnectionPool()
479{
480 m_pShared = std::make_shared<CSharedData>();
481 m_pWorkerThread = thread_init(threadfunc: CWorker::Start, user: new CWorker(m_pShared, g_Config.m_DbgSql), name: "database worker thread");
482 m_pBackupThread = thread_init(threadfunc: CBackup::Start, user: new CBackup(m_pShared, g_Config.m_DbgSql), name: "database backup worker thread");
483}
484
485CDbConnectionPool::~CDbConnectionPool()
486{
487 OnShutdown();
488 if(m_pWorkerThread)
489 thread_wait(thread: m_pWorkerThread);
490 if(m_pBackupThread)
491 thread_wait(thread: m_pBackupThread);
492}
493