1#include "connection_pool.h"
2
3#include "connection.h"
4
5#include <base/system.h>
6#include <base/thread.h>
7
8#include <engine/console.h>
9#include <engine/shared/config.h>
10
11#include <chrono>
12#include <cstring>
13#include <iterator>
14#include <memory>
15#include <thread>
16#include <vector>
17
18using namespace std::chrono_literals;
19
20// helper struct to hold thread data
21struct CSqlExecData
22{
23 CSqlExecData(
24 CDbConnectionPool::FRead pFunc,
25 std::unique_ptr<const ISqlData> pThreadData,
26 const char *pName);
27 CSqlExecData(
28 CDbConnectionPool::FWrite pFunc,
29 std::unique_ptr<const ISqlData> pThreadData,
30 const char *pName);
31 CSqlExecData(
32 CDbConnectionPool::Mode m,
33 const char aFilename[64]);
34 CSqlExecData(
35 CDbConnectionPool::Mode m,
36 const CMysqlConfig *pMysqlConfig);
37 CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m);
38 ~CSqlExecData() = default;
39
40 enum
41 {
42 READ_ACCESS,
43 WRITE_ACCESS,
44 ADD_MYSQL,
45 ADD_SQLITE,
46 PRINT,
47 } m_Mode;
48 union
49 {
50 CDbConnectionPool::FRead m_pReadFunc;
51 CDbConnectionPool::FWrite m_pWriteFunc;
52 struct
53 {
54 CDbConnectionPool::Mode m_Mode;
55 CMysqlConfig m_Config;
56 } m_Mysql;
57 struct
58 {
59 CDbConnectionPool::Mode m_Mode;
60 char m_Filename[64];
61 } m_Sqlite;
62 struct
63 {
64 IConsole *m_pConsole;
65 CDbConnectionPool::Mode m_Mode;
66 } m_Print;
67 } m_Ptr;
68
69 std::unique_ptr<const ISqlData> m_pThreadData;
70 const char *m_pName;
71};
72
73CSqlExecData::CSqlExecData(
74 CDbConnectionPool::FRead pFunc,
75 std::unique_ptr<const ISqlData> pThreadData,
76 const char *pName) :
77 m_Mode(READ_ACCESS),
78 m_pThreadData(std::move(pThreadData)),
79 m_pName(pName)
80{
81 m_Ptr.m_pReadFunc = pFunc;
82}
83
84CSqlExecData::CSqlExecData(
85 CDbConnectionPool::FWrite pFunc,
86 std::unique_ptr<const ISqlData> pThreadData,
87 const char *pName) :
88 m_Mode(WRITE_ACCESS),
89 m_pThreadData(std::move(pThreadData)),
90 m_pName(pName)
91{
92 m_Ptr.m_pWriteFunc = pFunc;
93}
94
95CSqlExecData::CSqlExecData(
96 CDbConnectionPool::Mode m,
97 const char aFilename[64]) :
98 m_Mode(ADD_SQLITE),
99 m_pThreadData(nullptr),
100 m_pName("add sqlite server")
101{
102 m_Ptr.m_Sqlite.m_Mode = m;
103 str_copy(dst&: m_Ptr.m_Sqlite.m_Filename, src: aFilename);
104}
105CSqlExecData::CSqlExecData(CDbConnectionPool::Mode m,
106 const CMysqlConfig *pMysqlConfig) :
107 m_Mode(ADD_MYSQL),
108 m_pThreadData(nullptr),
109 m_pName("add mysql server")
110{
111 m_Ptr.m_Mysql.m_Mode = m;
112 mem_copy(dest: &m_Ptr.m_Mysql.m_Config, source: pMysqlConfig, size: sizeof(m_Ptr.m_Mysql.m_Config));
113}
114
115CSqlExecData::CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m) :
116 m_Mode(PRINT),
117 m_pThreadData(nullptr),
118 m_pName("print database server")
119{
120 m_Ptr.m_Print.m_pConsole = pConsole;
121 m_Ptr.m_Print.m_Mode = m;
122}
123
124void CDbConnectionPool::Print(IConsole *pConsole, Mode DatabaseMode)
125{
126 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pConsole, args&: DatabaseMode);
127 m_InsertIdx %= std::size(m_pShared->m_aQueries);
128 m_pShared->m_NumBackup.Signal();
129}
130
131void CDbConnectionPool::RegisterSqliteDatabase(Mode DatabaseMode, const char aFilename[64])
132{
133 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: aFilename);
134 m_InsertIdx %= std::size(m_pShared->m_aQueries);
135 m_pShared->m_NumBackup.Signal();
136}
137
138void CDbConnectionPool::RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig)
139{
140 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: pMysqlConfig);
141 m_InsertIdx %= std::size(m_pShared->m_aQueries);
142 m_pShared->m_NumBackup.Signal();
143}
144
145void CDbConnectionPool::Execute(
146 FRead pFunc,
147 std::unique_ptr<const ISqlData> pSqlRequestData,
148 const char *pName)
149{
150 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
151 m_InsertIdx %= std::size(m_pShared->m_aQueries);
152 m_pShared->m_NumBackup.Signal();
153}
154
155void CDbConnectionPool::ExecuteWrite(
156 FWrite pFunc,
157 std::unique_ptr<const ISqlData> pSqlRequestData,
158 const char *pName)
159{
160 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
161 m_InsertIdx %= std::size(m_pShared->m_aQueries);
162 m_pShared->m_NumBackup.Signal();
163}
164
165void CDbConnectionPool::OnShutdown()
166{
167 if(m_Shutdown)
168 return;
169 m_Shutdown = true;
170 m_pShared->m_Shutdown.store(i: true);
171 m_pShared->m_NumBackup.Signal();
172 int i = 0;
173 while(m_pShared->m_Shutdown.load())
174 {
175 // print a log about every two seconds
176 if(i % 20 == 0 && i > 0)
177 {
178 dbg_msg(sys: "sql", fmt: "Waiting for score threads to complete (%ds)", i / 10);
179 }
180 ++i;
181 std::this_thread::sleep_for(rtime: 100ms);
182 }
183}
184
185// The backup worker thread looks at write queries and stores them
186// in the sqlite database (WRITE_BACKUP). It skips over read queries.
187// After processing the query, it gets passed on to the Worker thread.
188// This is done to not loose ranks when the server shuts down before all
189// queries are executed on the mysql server
190class CBackup
191{
192public:
193 CBackup(std::shared_ptr<CDbConnectionPool::CSharedData> pShared, int DebugSql) :
194 m_DebugSql(DebugSql), m_pShared(std::move(pShared)) {}
195 static void Start(void *pUser);
196
197private:
198 bool m_DebugSql;
199
200 void ProcessQueries();
201
202 std::unique_ptr<IDbConnection> m_pWriteBackup;
203
204 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
205};
206
207/* static */
208void CBackup::Start(void *pUser)
209{
210 CBackup *pThis = (CBackup *)pUser;
211 pThis->ProcessQueries();
212 delete pThis;
213}
214
215void CBackup::ProcessQueries()
216{
217 for(int JobNum = 0;; JobNum++)
218 {
219 m_pShared->m_NumBackup.Wait();
220 CSqlExecData *pThreadData = m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)].get();
221
222 // work through all database jobs after OnShutdown is called before exiting the thread
223 if(pThreadData == nullptr)
224 {
225 m_pShared->m_NumWorker.Signal();
226 return;
227 }
228
229 if(pThreadData->m_Mode == CSqlExecData::ADD_SQLITE &&
230 pThreadData->m_Ptr.m_Sqlite.m_Mode == CDbConnectionPool::Mode::WRITE_BACKUP)
231 {
232 m_pWriteBackup = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_Filename, Setup: true);
233 }
234 else if(pThreadData->m_Mode == CSqlExecData::WRITE_ACCESS && m_pWriteBackup.get())
235 {
236 bool Success = CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData, w: Write::BACKUP_FIRST);
237 if(m_DebugSql || !Success)
238 dbg_msg(sys: "sql", fmt: "[%i] %s done on write backup database, Success=%i", JobNum, pThreadData->m_pName, Success);
239 }
240 m_pShared->m_NumWorker.Signal();
241 }
242}
243
244// the worker threads executes queries on mysql or sqlite. If we write on
245// a mysql server and have a backup server configured, we'll remove the
246// entry from the backup server after completing it on the write server.
247// static void Worker(void *pUser);
248class CWorker
249{
250public:
251 CWorker(std::shared_ptr<CDbConnectionPool::CSharedData> pShared, int DebugSql) :
252 m_DebugSql(DebugSql), m_pShared(std::move(pShared)) {}
253 static void Start(void *pUser);
254 void ProcessQueries();
255
256private:
257 void Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode);
258
259 bool m_DebugSql;
260
261 // There are two possible configurations
262 // * sqlite mode: There exists exactly one READ and the same WRITE server
263 // with no WRITE_BACKUP server
264 // * mysql mode: there can exist multiple READ server, there must be at
265 // most one WRITE server. The WRITE server for all DDNet
266 // Servers must be the same (to counteract double loads).
267 // There may be one WRITE_BACKUP sqlite server.
268 // This variable should only change, before the worker threads
269 std::vector<std::unique_ptr<IDbConnection>> m_vpReadConnections;
270 std::unique_ptr<IDbConnection> m_pWriteConnection;
271 std::unique_ptr<IDbConnection> m_pWriteBackup;
272
273 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
274};
275
276/* static */
277void CWorker::Start(void *pUser)
278{
279 CWorker *pThis = (CWorker *)pUser;
280 pThis->ProcessQueries();
281 delete pThis;
282}
283
284void CWorker::ProcessQueries()
285{
286 // remember last working server and try to connect to it first
287 int ReadServer = 0;
288 // enter fail mode when a sql request fails, skip read request during it and
289 // write to the backup database until all requests are handled
290 bool FailMode = false;
291 for(int JobNum = 0;; JobNum++)
292 {
293 if(FailMode && m_pShared->m_NumWorker.GetApproximateValue() == 0)
294 {
295 FailMode = false;
296 }
297 m_pShared->m_NumWorker.Wait();
298 auto pThreadData = std::move(m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)]);
299 // work through all database jobs after OnShutdown is called before exiting the thread
300 if(pThreadData == nullptr)
301 {
302 m_pShared->m_Shutdown.store(i: false);
303 return;
304 }
305 bool Success = false;
306 switch(pThreadData->m_Mode)
307 {
308 case CSqlExecData::READ_ACCESS:
309 {
310 for(size_t i = 0; i < m_vpReadConnections.size(); i++)
311 {
312 if(m_pShared->m_Shutdown)
313 {
314 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during shutdown", JobNum, pThreadData->m_pName);
315 break;
316 }
317 if(FailMode)
318 {
319 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during FailMode", JobNum, pThreadData->m_pName);
320 break;
321 }
322 int CurServer = (ReadServer + i) % (int)m_vpReadConnections.size();
323 if(CDbConnectionPool::ExecSqlFunc(pConnection: m_vpReadConnections[CurServer].get(), pData: pThreadData.get(), w: Write::NORMAL))
324 {
325 ReadServer = CurServer;
326 if(m_DebugSql)
327 dbg_msg(sys: "sql", fmt: "[%i] %s done on read database %d", JobNum, pThreadData->m_pName, CurServer);
328 Success = true;
329 break;
330 }
331 }
332 if(!Success)
333 {
334 FailMode = true;
335 }
336 }
337 break;
338 case CSqlExecData::WRITE_ACCESS:
339 {
340 if(m_pShared->m_Shutdown && m_pWriteBackup != nullptr)
341 {
342 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during shutdown", JobNum, pThreadData->m_pName);
343 }
344 else if(FailMode && m_pWriteBackup != nullptr)
345 {
346 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during FailMode", JobNum, pThreadData->m_pName);
347 }
348 else if(CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteConnection.get(), pData: pThreadData.get(), w: Write::NORMAL))
349 {
350 if(m_DebugSql)
351 dbg_msg(sys: "sql", fmt: "[%i] %s done on write database", JobNum, pThreadData->m_pName);
352 Success = true;
353 }
354 // enter fail mode if not successful
355 FailMode = FailMode || !Success;
356 const Write w = Success ? Write::NORMAL_SUCCEEDED : Write::NORMAL_FAILED;
357 if(m_pWriteBackup && CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData.get(), w))
358 {
359 if(m_DebugSql)
360 dbg_msg(sys: "sql", fmt: "[%i] %s done move write on backup database to non-backup table", JobNum, pThreadData->m_pName);
361 Success = true;
362 }
363 }
364 break;
365 case CSqlExecData::ADD_MYSQL:
366 {
367 auto pMysql = CreateMysqlConnection(Config: pThreadData->m_Ptr.m_Mysql.m_Config);
368 switch(pThreadData->m_Ptr.m_Mysql.m_Mode)
369 {
370 case CDbConnectionPool::Mode::READ:
371 m_vpReadConnections.push_back(x: std::move(pMysql));
372 break;
373 case CDbConnectionPool::Mode::WRITE:
374 m_pWriteConnection = std::move(pMysql);
375 break;
376 case CDbConnectionPool::Mode::WRITE_BACKUP:
377 m_pWriteBackup = std::move(pMysql);
378 break;
379 case CDbConnectionPool::Mode::NUM_MODES:
380 break;
381 }
382 Success = true;
383 break;
384 }
385 case CSqlExecData::ADD_SQLITE:
386 {
387 auto pSqlite = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_Filename, Setup: true);
388 switch(pThreadData->m_Ptr.m_Sqlite.m_Mode)
389 {
390 case CDbConnectionPool::Mode::READ:
391 m_vpReadConnections.push_back(x: std::move(pSqlite));
392 break;
393 case CDbConnectionPool::Mode::WRITE:
394 m_pWriteConnection = std::move(pSqlite);
395 break;
396 case CDbConnectionPool::Mode::WRITE_BACKUP:
397 m_pWriteBackup = std::move(pSqlite);
398 break;
399 case CDbConnectionPool::Mode::NUM_MODES:
400 break;
401 }
402 Success = true;
403 break;
404 }
405 case CSqlExecData::PRINT:
406 Print(pConsole: pThreadData->m_Ptr.m_Print.m_pConsole, DatabaseMode: pThreadData->m_Ptr.m_Print.m_Mode);
407 Success = true;
408 break;
409 }
410 if(!Success)
411 dbg_msg(sys: "sql", fmt: "[%i] %s failed on all databases", JobNum, pThreadData->m_pName);
412 if(pThreadData->m_pThreadData != nullptr && pThreadData->m_pThreadData->m_pResult != nullptr)
413 {
414 pThreadData->m_pThreadData->m_pResult->m_Success = Success;
415 pThreadData->m_pThreadData->m_pResult->m_Completed.store(i: true);
416 }
417 }
418}
419
420void CWorker::Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode)
421{
422 if(DatabaseMode == CDbConnectionPool::Mode::READ)
423 {
424 for(auto &pReadConnection : m_vpReadConnections)
425 pReadConnection->Print(pConsole, pMode: "Read");
426 if(m_vpReadConnections.empty())
427 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no read databases");
428 }
429 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE)
430 {
431 if(m_pWriteConnection)
432 m_pWriteConnection->Print(pConsole, pMode: "Write");
433 else
434 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write databases");
435 }
436 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE_BACKUP)
437 {
438 if(m_pWriteBackup)
439 m_pWriteBackup->Print(pConsole, pMode: "WriteBackup");
440 else
441 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write backup databases");
442 }
443}
444
445/* static */
446bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, Write w)
447{
448 if(pConnection == nullptr)
449 {
450 dbg_msg(sys: "sql", fmt: "No database given");
451 return false;
452 }
453 char aError[256] = "unknown error";
454 if(!pConnection->Connect(pError: aError, ErrorSize: sizeof(aError)))
455 {
456 dbg_msg(sys: "sql", fmt: "failed connecting to db: %s", aError);
457 return false;
458 }
459 bool Success = false;
460 switch(pData->m_Mode)
461 {
462 case CSqlExecData::READ_ACCESS:
463 Success = pData->m_Ptr.m_pReadFunc(pConnection, pData->m_pThreadData.get(), aError, sizeof(aError));
464 break;
465 case CSqlExecData::WRITE_ACCESS:
466 Success = pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), w, aError, sizeof(aError));
467 break;
468 default:
469 dbg_assert_failed("unreachable");
470 }
471 pConnection->Disconnect();
472 if(!Success)
473 {
474 dbg_msg(sys: "sql", fmt: "%s failed: %s", pData->m_pName, aError);
475 }
476 return Success;
477}
478
479CDbConnectionPool::CDbConnectionPool()
480{
481 m_pShared = std::make_shared<CSharedData>();
482 m_pWorkerThread = thread_init(threadfunc: CWorker::Start, user: new CWorker(m_pShared, g_Config.m_DbgSql), name: "database worker thread");
483 m_pBackupThread = thread_init(threadfunc: CBackup::Start, user: new CBackup(m_pShared, g_Config.m_DbgSql), name: "database backup worker thread");
484}
485
486CDbConnectionPool::~CDbConnectionPool()
487{
488 OnShutdown();
489 if(m_pWorkerThread)
490 thread_wait(thread: m_pWorkerThread);
491 if(m_pBackupThread)
492 thread_wait(thread: m_pBackupThread);
493}
494