1#include "connection_pool.h"
2
3#include "connection.h"
4
5#include <base/dbg.h>
6#include <base/mem.h>
7#include <base/str.h>
8#include <base/thread.h>
9
10#include <engine/console.h>
11#include <engine/shared/config.h>
12
13#include <chrono>
14#include <cstring>
15#include <iterator>
16#include <memory>
17#include <thread>
18#include <vector>
19
20using namespace std::chrono_literals;
21
22// helper struct to hold thread data
23struct CSqlExecData
24{
25 CSqlExecData(
26 CDbConnectionPool::FRead pFunc,
27 std::unique_ptr<const ISqlData> pThreadData,
28 const char *pName);
29 CSqlExecData(
30 CDbConnectionPool::FWrite pFunc,
31 std::unique_ptr<const ISqlData> pThreadData,
32 const char *pName);
33 CSqlExecData(
34 CDbConnectionPool::Mode m,
35 const char aFilename[64]);
36 CSqlExecData(
37 CDbConnectionPool::Mode m,
38 const CMysqlConfig *pMysqlConfig);
39 CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m);
40 ~CSqlExecData() = default;
41
42 enum
43 {
44 READ_ACCESS,
45 WRITE_ACCESS,
46 ADD_MYSQL,
47 ADD_SQLITE,
48 PRINT,
49 } m_Mode;
50 union
51 {
52 CDbConnectionPool::FRead m_pReadFunc;
53 CDbConnectionPool::FWrite m_pWriteFunc;
54 struct
55 {
56 CDbConnectionPool::Mode m_Mode;
57 CMysqlConfig m_Config;
58 } m_Mysql;
59 struct
60 {
61 CDbConnectionPool::Mode m_Mode;
62 char m_Filename[64];
63 } m_Sqlite;
64 struct
65 {
66 IConsole *m_pConsole;
67 CDbConnectionPool::Mode m_Mode;
68 } m_Print;
69 } m_Ptr;
70
71 std::unique_ptr<const ISqlData> m_pThreadData;
72 const char *m_pName;
73};
74
75CSqlExecData::CSqlExecData(
76 CDbConnectionPool::FRead pFunc,
77 std::unique_ptr<const ISqlData> pThreadData,
78 const char *pName) :
79 m_Mode(READ_ACCESS),
80 m_pThreadData(std::move(pThreadData)),
81 m_pName(pName)
82{
83 m_Ptr.m_pReadFunc = pFunc;
84}
85
86CSqlExecData::CSqlExecData(
87 CDbConnectionPool::FWrite pFunc,
88 std::unique_ptr<const ISqlData> pThreadData,
89 const char *pName) :
90 m_Mode(WRITE_ACCESS),
91 m_pThreadData(std::move(pThreadData)),
92 m_pName(pName)
93{
94 m_Ptr.m_pWriteFunc = pFunc;
95}
96
97CSqlExecData::CSqlExecData(
98 CDbConnectionPool::Mode m,
99 const char aFilename[64]) :
100 m_Mode(ADD_SQLITE),
101 m_pThreadData(nullptr),
102 m_pName("add sqlite server")
103{
104 m_Ptr.m_Sqlite.m_Mode = m;
105 str_copy(dst&: m_Ptr.m_Sqlite.m_Filename, src: aFilename);
106}
107CSqlExecData::CSqlExecData(CDbConnectionPool::Mode m,
108 const CMysqlConfig *pMysqlConfig) :
109 m_Mode(ADD_MYSQL),
110 m_pThreadData(nullptr),
111 m_pName("add mysql server")
112{
113 m_Ptr.m_Mysql.m_Mode = m;
114 mem_copy(dest: &m_Ptr.m_Mysql.m_Config, source: pMysqlConfig, size: sizeof(m_Ptr.m_Mysql.m_Config));
115}
116
117CSqlExecData::CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m) :
118 m_Mode(PRINT),
119 m_pThreadData(nullptr),
120 m_pName("print database server")
121{
122 m_Ptr.m_Print.m_pConsole = pConsole;
123 m_Ptr.m_Print.m_Mode = m;
124}
125
126void CDbConnectionPool::Print(IConsole *pConsole, Mode DatabaseMode)
127{
128 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pConsole, args&: DatabaseMode);
129 m_InsertIdx %= std::size(m_pShared->m_aQueries);
130 m_pShared->m_NumBackup.Signal();
131}
132
133void CDbConnectionPool::RegisterSqliteDatabase(Mode DatabaseMode, const char aFilename[64])
134{
135 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: aFilename);
136 m_InsertIdx %= std::size(m_pShared->m_aQueries);
137 m_pShared->m_NumBackup.Signal();
138}
139
140void CDbConnectionPool::RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig)
141{
142 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: pMysqlConfig);
143 m_InsertIdx %= std::size(m_pShared->m_aQueries);
144 m_pShared->m_NumBackup.Signal();
145}
146
147void CDbConnectionPool::Execute(
148 FRead pFunc,
149 std::unique_ptr<const ISqlData> pSqlRequestData,
150 const char *pName)
151{
152 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
153 m_InsertIdx %= std::size(m_pShared->m_aQueries);
154 m_pShared->m_NumBackup.Signal();
155}
156
157void CDbConnectionPool::ExecuteWrite(
158 FWrite pFunc,
159 std::unique_ptr<const ISqlData> pSqlRequestData,
160 const char *pName)
161{
162 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
163 m_InsertIdx %= std::size(m_pShared->m_aQueries);
164 m_pShared->m_NumBackup.Signal();
165}
166
167void CDbConnectionPool::OnShutdown()
168{
169 if(m_Shutdown)
170 return;
171 m_Shutdown = true;
172 m_pShared->m_Shutdown.store(i: true);
173 m_pShared->m_NumBackup.Signal();
174 int i = 0;
175 while(m_pShared->m_Shutdown.load())
176 {
177 // print a log about every two seconds
178 if(i % 20 == 0 && i > 0)
179 {
180 dbg_msg(sys: "sql", fmt: "Waiting for score threads to complete (%ds)", i / 10);
181 }
182 ++i;
183 std::this_thread::sleep_for(rtime: 100ms);
184 }
185}
186
// The backup worker thread looks at write queries and stores them
// in the sqlite database (WRITE_BACKUP). It skips over read queries.
// After processing a query, it passes it on to the worker thread.
// This is done to not lose ranks when the server shuts down before all
// queries are executed on the mysql server.
192class CBackup
193{
194public:
195 CBackup(std::shared_ptr<CDbConnectionPool::CSharedData> pShared, int DebugSql) :
196 m_DebugSql(DebugSql), m_pShared(std::move(pShared)) {}
197 static void Start(void *pUser);
198
199private:
200 bool m_DebugSql;
201
202 void ProcessQueries();
203
204 std::unique_ptr<IDbConnection> m_pWriteBackup;
205
206 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
207};
208
209/* static */
210void CBackup::Start(void *pUser)
211{
212 CBackup *pThis = (CBackup *)pUser;
213 pThis->ProcessQueries();
214 delete pThis;
215}
216
217void CBackup::ProcessQueries()
218{
219 for(int JobNum = 0;; JobNum++)
220 {
221 m_pShared->m_NumBackup.Wait();
222 CSqlExecData *pThreadData = m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)].get();
223
224 // work through all database jobs after OnShutdown is called before exiting the thread
225 if(pThreadData == nullptr)
226 {
227 m_pShared->m_NumWorker.Signal();
228 return;
229 }
230
231 if(pThreadData->m_Mode == CSqlExecData::ADD_SQLITE &&
232 pThreadData->m_Ptr.m_Sqlite.m_Mode == CDbConnectionPool::Mode::WRITE_BACKUP)
233 {
234 m_pWriteBackup = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_Filename, Setup: true);
235 }
236 else if(pThreadData->m_Mode == CSqlExecData::WRITE_ACCESS && m_pWriteBackup.get())
237 {
238 bool Success = CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData, w: Write::BACKUP_FIRST);
239 if(m_DebugSql || !Success)
240 dbg_msg(sys: "sql", fmt: "[%i] %s done on write backup database, Success=%i", JobNum, pThreadData->m_pName, Success);
241 }
242 m_pShared->m_NumWorker.Signal();
243 }
244}
245
// The worker thread executes queries on mysql or sqlite. If we write on
// a mysql server and have a backup server configured, we'll remove the
// entry from the backup server after completing it on the write server.
// static void Worker(void *pUser);
250class CWorker
251{
252public:
253 CWorker(std::shared_ptr<CDbConnectionPool::CSharedData> pShared, int DebugSql) :
254 m_DebugSql(DebugSql), m_pShared(std::move(pShared)) {}
255 static void Start(void *pUser);
256 void ProcessQueries();
257
258private:
259 void Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode);
260
261 bool m_DebugSql;
262
263 // There are two possible configurations
264 // * sqlite mode: There exists exactly one READ and the same WRITE server
265 // with no WRITE_BACKUP server
266 // * mysql mode: there can exist multiple READ server, there must be at
267 // most one WRITE server. The WRITE server for all DDNet
268 // Servers must be the same (to counteract double loads).
269 // There may be one WRITE_BACKUP sqlite server.
270 // This variable should only change, before the worker threads
271 std::vector<std::unique_ptr<IDbConnection>> m_vpReadConnections;
272 std::unique_ptr<IDbConnection> m_pWriteConnection;
273 std::unique_ptr<IDbConnection> m_pWriteBackup;
274
275 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
276};
277
278/* static */
279void CWorker::Start(void *pUser)
280{
281 CWorker *pThis = (CWorker *)pUser;
282 pThis->ProcessQueries();
283 delete pThis;
284}
285
286void CWorker::ProcessQueries()
287{
288 // remember last working server and try to connect to it first
289 int ReadServer = 0;
290 // enter fail mode when a sql request fails, skip read request during it and
291 // write to the backup database until all requests are handled
292 bool FailMode = false;
293 for(int JobNum = 0;; JobNum++)
294 {
295 if(FailMode && m_pShared->m_NumWorker.GetApproximateValue() == 0)
296 {
297 FailMode = false;
298 }
299 m_pShared->m_NumWorker.Wait();
300 auto pThreadData = std::move(m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)]);
301 // work through all database jobs after OnShutdown is called before exiting the thread
302 if(pThreadData == nullptr)
303 {
304 m_pShared->m_Shutdown.store(i: false);
305 return;
306 }
307 bool Success = false;
308 switch(pThreadData->m_Mode)
309 {
310 case CSqlExecData::READ_ACCESS:
311 {
312 for(size_t i = 0; i < m_vpReadConnections.size(); i++)
313 {
314 if(m_pShared->m_Shutdown)
315 {
316 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during shutdown", JobNum, pThreadData->m_pName);
317 break;
318 }
319 if(FailMode)
320 {
321 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during FailMode", JobNum, pThreadData->m_pName);
322 break;
323 }
324 int CurServer = (ReadServer + i) % (int)m_vpReadConnections.size();
325 if(CDbConnectionPool::ExecSqlFunc(pConnection: m_vpReadConnections[CurServer].get(), pData: pThreadData.get(), w: Write::NORMAL))
326 {
327 ReadServer = CurServer;
328 if(m_DebugSql)
329 dbg_msg(sys: "sql", fmt: "[%i] %s done on read database %d", JobNum, pThreadData->m_pName, CurServer);
330 Success = true;
331 break;
332 }
333 }
334 if(!Success)
335 {
336 FailMode = true;
337 }
338 }
339 break;
340 case CSqlExecData::WRITE_ACCESS:
341 {
342 if(m_pShared->m_Shutdown && m_pWriteBackup != nullptr)
343 {
344 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during shutdown", JobNum, pThreadData->m_pName);
345 }
346 else if(FailMode && m_pWriteBackup != nullptr)
347 {
348 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during FailMode", JobNum, pThreadData->m_pName);
349 }
350 else if(CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteConnection.get(), pData: pThreadData.get(), w: Write::NORMAL))
351 {
352 if(m_DebugSql)
353 dbg_msg(sys: "sql", fmt: "[%i] %s done on write database", JobNum, pThreadData->m_pName);
354 Success = true;
355 }
356 // enter fail mode if not successful
357 FailMode = FailMode || !Success;
358 const Write w = Success ? Write::NORMAL_SUCCEEDED : Write::NORMAL_FAILED;
359 if(m_pWriteBackup && CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData.get(), w))
360 {
361 if(m_DebugSql)
362 dbg_msg(sys: "sql", fmt: "[%i] %s done move write on backup database to non-backup table", JobNum, pThreadData->m_pName);
363 Success = true;
364 }
365 }
366 break;
367 case CSqlExecData::ADD_MYSQL:
368 {
369 auto pMysql = CreateMysqlConnection(Config: pThreadData->m_Ptr.m_Mysql.m_Config);
370 switch(pThreadData->m_Ptr.m_Mysql.m_Mode)
371 {
372 case CDbConnectionPool::Mode::READ:
373 m_vpReadConnections.push_back(x: std::move(pMysql));
374 break;
375 case CDbConnectionPool::Mode::WRITE:
376 m_pWriteConnection = std::move(pMysql);
377 break;
378 case CDbConnectionPool::Mode::WRITE_BACKUP:
379 m_pWriteBackup = std::move(pMysql);
380 break;
381 case CDbConnectionPool::Mode::NUM_MODES:
382 break;
383 }
384 Success = true;
385 break;
386 }
387 case CSqlExecData::ADD_SQLITE:
388 {
389 auto pSqlite = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_Filename, Setup: true);
390 switch(pThreadData->m_Ptr.m_Sqlite.m_Mode)
391 {
392 case CDbConnectionPool::Mode::READ:
393 m_vpReadConnections.push_back(x: std::move(pSqlite));
394 break;
395 case CDbConnectionPool::Mode::WRITE:
396 m_pWriteConnection = std::move(pSqlite);
397 break;
398 case CDbConnectionPool::Mode::WRITE_BACKUP:
399 m_pWriteBackup = std::move(pSqlite);
400 break;
401 case CDbConnectionPool::Mode::NUM_MODES:
402 break;
403 }
404 Success = true;
405 break;
406 }
407 case CSqlExecData::PRINT:
408 Print(pConsole: pThreadData->m_Ptr.m_Print.m_pConsole, DatabaseMode: pThreadData->m_Ptr.m_Print.m_Mode);
409 Success = true;
410 break;
411 }
412 if(!Success)
413 dbg_msg(sys: "sql", fmt: "[%i] %s failed on all databases", JobNum, pThreadData->m_pName);
414 if(pThreadData->m_pThreadData != nullptr && pThreadData->m_pThreadData->m_pResult != nullptr)
415 {
416 pThreadData->m_pThreadData->m_pResult->m_Success = Success;
417 pThreadData->m_pThreadData->m_pResult->m_Completed.store(i: true);
418 }
419 }
420}
421
422void CWorker::Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode)
423{
424 if(DatabaseMode == CDbConnectionPool::Mode::READ)
425 {
426 for(auto &pReadConnection : m_vpReadConnections)
427 pReadConnection->Print(pConsole, pMode: "Read");
428 if(m_vpReadConnections.empty())
429 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no read databases");
430 }
431 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE)
432 {
433 if(m_pWriteConnection)
434 m_pWriteConnection->Print(pConsole, pMode: "Write");
435 else
436 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write databases");
437 }
438 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE_BACKUP)
439 {
440 if(m_pWriteBackup)
441 m_pWriteBackup->Print(pConsole, pMode: "WriteBackup");
442 else
443 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write backup databases");
444 }
445}
446
447/* static */
448bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, Write w)
449{
450 if(pConnection == nullptr)
451 {
452 dbg_msg(sys: "sql", fmt: "No database given");
453 return false;
454 }
455 char aError[256] = "unknown error";
456 if(!pConnection->Connect(pError: aError, ErrorSize: sizeof(aError)))
457 {
458 dbg_msg(sys: "sql", fmt: "failed connecting to db: %s", aError);
459 return false;
460 }
461 bool Success = false;
462 switch(pData->m_Mode)
463 {
464 case CSqlExecData::READ_ACCESS:
465 Success = pData->m_Ptr.m_pReadFunc(pConnection, pData->m_pThreadData.get(), aError, sizeof(aError));
466 break;
467 case CSqlExecData::WRITE_ACCESS:
468 Success = pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), w, aError, sizeof(aError));
469 break;
470 default:
471 dbg_assert_failed("unreachable");
472 }
473 pConnection->Disconnect();
474 if(!Success)
475 {
476 dbg_msg(sys: "sql", fmt: "%s failed: %s", pData->m_pName, aError);
477 }
478 return Success;
479}
480
481CDbConnectionPool::CDbConnectionPool()
482{
483 m_pShared = std::make_shared<CSharedData>();
484 m_pWorkerThread = thread_init(threadfunc: CWorker::Start, user: new CWorker(m_pShared, g_Config.m_DbgSql), name: "database worker thread");
485 m_pBackupThread = thread_init(threadfunc: CBackup::Start, user: new CBackup(m_pShared, g_Config.m_DbgSql), name: "database backup worker thread");
486}
487
488CDbConnectionPool::~CDbConnectionPool()
489{
490 OnShutdown();
491 if(m_pWorkerThread)
492 thread_wait(thread: m_pWorkerThread);
493 if(m_pBackupThread)
494 thread_wait(thread: m_pBackupThread);
495}
496