1#include "connection_pool.h"
2#include "connection.h"
3
4#include <base/system.h>
5#include <cstring>
6#include <engine/console.h>
7
8#include <chrono>
9#include <iterator>
10#include <memory>
11#include <thread>
12#include <vector>
13
14using namespace std::chrono_literals;
15
16// helper struct to hold thread data
17struct CSqlExecData
18{
19 CSqlExecData(
20 CDbConnectionPool::FRead pFunc,
21 std::unique_ptr<const ISqlData> pThreadData,
22 const char *pName);
23 CSqlExecData(
24 CDbConnectionPool::FWrite pFunc,
25 std::unique_ptr<const ISqlData> pThreadData,
26 const char *pName);
27 CSqlExecData(
28 CDbConnectionPool::Mode m,
29 const char aFileName[64]);
30 CSqlExecData(
31 CDbConnectionPool::Mode m,
32 const CMysqlConfig *pMysqlConfig);
33 CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m);
34 ~CSqlExecData() = default;
35
36 enum
37 {
38 READ_ACCESS,
39 WRITE_ACCESS,
40 ADD_MYSQL,
41 ADD_SQLITE,
42 PRINT,
43 } m_Mode;
44 union
45 {
46 CDbConnectionPool::FRead m_pReadFunc;
47 CDbConnectionPool::FWrite m_pWriteFunc;
48 struct
49 {
50 CDbConnectionPool::Mode m_Mode;
51 CMysqlConfig m_Config;
52 } m_Mysql;
53 struct
54 {
55 CDbConnectionPool::Mode m_Mode;
56 char m_FileName[64];
57 } m_Sqlite;
58 struct
59 {
60 IConsole *m_pConsole;
61 CDbConnectionPool::Mode m_Mode;
62 } m_Print;
63 } m_Ptr;
64
65 std::unique_ptr<const ISqlData> m_pThreadData;
66 const char *m_pName;
67};
68
69CSqlExecData::CSqlExecData(
70 CDbConnectionPool::FRead pFunc,
71 std::unique_ptr<const ISqlData> pThreadData,
72 const char *pName) :
73 m_Mode(READ_ACCESS),
74 m_pThreadData(std::move(pThreadData)),
75 m_pName(pName)
76{
77 m_Ptr.m_pReadFunc = pFunc;
78}
79
80CSqlExecData::CSqlExecData(
81 CDbConnectionPool::FWrite pFunc,
82 std::unique_ptr<const ISqlData> pThreadData,
83 const char *pName) :
84 m_Mode(WRITE_ACCESS),
85 m_pThreadData(std::move(pThreadData)),
86 m_pName(pName)
87{
88 m_Ptr.m_pWriteFunc = pFunc;
89}
90
91CSqlExecData::CSqlExecData(
92 CDbConnectionPool::Mode m,
93 const char aFileName[64]) :
94 m_Mode(ADD_SQLITE),
95 m_pThreadData(nullptr),
96 m_pName("add sqlite server")
97{
98 m_Ptr.m_Sqlite.m_Mode = m;
99 mem_copy(dest: m_Ptr.m_Sqlite.m_FileName, source: aFileName, size: sizeof(m_Ptr.m_Sqlite.m_FileName));
100}
101CSqlExecData::CSqlExecData(CDbConnectionPool::Mode m,
102 const CMysqlConfig *pMysqlConfig) :
103 m_Mode(ADD_MYSQL),
104 m_pThreadData(nullptr),
105 m_pName("add mysql server")
106{
107 m_Ptr.m_Mysql.m_Mode = m;
108 mem_copy(dest: &m_Ptr.m_Mysql.m_Config, source: pMysqlConfig, size: sizeof(m_Ptr.m_Mysql.m_Config));
109}
110
111CSqlExecData::CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m) :
112 m_Mode(PRINT),
113 m_pThreadData(nullptr),
114 m_pName("print database server")
115{
116 m_Ptr.m_Print.m_pConsole = pConsole;
117 m_Ptr.m_Print.m_Mode = m;
118}
119
120void CDbConnectionPool::Print(IConsole *pConsole, Mode DatabaseMode)
121{
122 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pConsole, args&: DatabaseMode);
123 m_InsertIdx %= std::size(m_pShared->m_aQueries);
124 m_pShared->m_NumBackup.Signal();
125}
126
127void CDbConnectionPool::RegisterSqliteDatabase(Mode DatabaseMode, const char aFileName[64])
128{
129 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: aFileName);
130 m_InsertIdx %= std::size(m_pShared->m_aQueries);
131 m_pShared->m_NumBackup.Signal();
132}
133
134void CDbConnectionPool::RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig)
135{
136 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: pMysqlConfig);
137 m_InsertIdx %= std::size(m_pShared->m_aQueries);
138 m_pShared->m_NumBackup.Signal();
139}
140
141void CDbConnectionPool::Execute(
142 FRead pFunc,
143 std::unique_ptr<const ISqlData> pSqlRequestData,
144 const char *pName)
145{
146 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
147 m_InsertIdx %= std::size(m_pShared->m_aQueries);
148 m_pShared->m_NumBackup.Signal();
149}
150
151void CDbConnectionPool::ExecuteWrite(
152 FWrite pFunc,
153 std::unique_ptr<const ISqlData> pSqlRequestData,
154 const char *pName)
155{
156 m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName);
157 m_InsertIdx %= std::size(m_pShared->m_aQueries);
158 m_pShared->m_NumBackup.Signal();
159}
160
161void CDbConnectionPool::OnShutdown()
162{
163 if(m_Shutdown)
164 return;
165 m_Shutdown = true;
166 m_pShared->m_Shutdown.store(i: true);
167 m_pShared->m_NumBackup.Signal();
168 int i = 0;
169 while(m_pShared->m_Shutdown.load())
170 {
171 // print a log about every two seconds
172 if(i % 20 == 0 && i > 0)
173 {
174 dbg_msg(sys: "sql", fmt: "Waiting for score threads to complete (%ds)", i / 10);
175 }
176 ++i;
177 std::this_thread::sleep_for(rtime: 100ms);
178 }
179}
180
181// The backup worker thread looks at write queries and stores them
182// in the sqlite database (WRITE_BACKUP). It skips over read queries.
183// After processing the query, it gets passed on to the Worker thread.
184// This is done to not loose ranks when the server shuts down before all
185// queries are executed on the mysql server
186class CBackup
187{
188public:
189 CBackup(std::shared_ptr<CDbConnectionPool::CSharedData> pShared) :
190 m_pShared(std::move(pShared)) {}
191 static void Start(void *pUser);
192
193private:
194 void ProcessQueries();
195
196 std::unique_ptr<IDbConnection> m_pWriteBackup;
197
198 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
199};
200
201/* static */
202void CBackup::Start(void *pUser)
203{
204 CBackup *pThis = (CBackup *)pUser;
205 pThis->ProcessQueries();
206 delete pThis;
207}
208
209void CBackup::ProcessQueries()
210{
211 for(int JobNum = 0;; JobNum++)
212 {
213 m_pShared->m_NumBackup.Wait();
214 CSqlExecData *pThreadData = m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)].get();
215
216 // work through all database jobs after OnShutdown is called before exiting the thread
217 if(pThreadData == nullptr)
218 {
219 m_pShared->m_NumWorker.Signal();
220 return;
221 }
222
223 if(pThreadData->m_Mode == CSqlExecData::ADD_SQLITE &&
224 pThreadData->m_Ptr.m_Sqlite.m_Mode == CDbConnectionPool::Mode::WRITE_BACKUP)
225 {
226 m_pWriteBackup = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_FileName, Setup: true);
227 }
228 else if(pThreadData->m_Mode == CSqlExecData::WRITE_ACCESS && m_pWriteBackup.get())
229 {
230 bool Success = CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData, w: Write::BACKUP_FIRST);
231 dbg_msg(sys: "sql", fmt: "[%i] %s done on write backup database, Success=%i", JobNum, pThreadData->m_pName, Success);
232 }
233 m_pShared->m_NumWorker.Signal();
234 }
235}
236
237// the worker threads executes queries on mysql or sqlite. If we write on
238// a mysql server and have a backup server configured, we'll remove the
239// entry from the backup server after completing it on the write server.
240// static void Worker(void *pUser);
241class CWorker
242{
243public:
244 CWorker(std::shared_ptr<CDbConnectionPool::CSharedData> pShared) :
245 m_pShared(std::move(pShared)) {}
246 static void Start(void *pUser);
247 void ProcessQueries();
248
249private:
250 void Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode);
251
252 // There are two possible configurations
253 // * sqlite mode: There exists exactly one READ and the same WRITE server
254 // with no WRITE_BACKUP server
255 // * mysql mode: there can exist multiple READ server, there must be at
256 // most one WRITE server. The WRITE server for all DDNet
257 // Servers must be the same (to counteract double loads).
258 // There may be one WRITE_BACKUP sqlite server.
259 // This variable should only change, before the worker threads
260 std::vector<std::unique_ptr<IDbConnection>> m_vpReadConnections;
261 std::unique_ptr<IDbConnection> m_pWriteConnection;
262 std::unique_ptr<IDbConnection> m_pWriteBackup;
263
264 std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
265};
266
267/* static */
268void CWorker::Start(void *pUser)
269{
270 CWorker *pThis = (CWorker *)pUser;
271 pThis->ProcessQueries();
272 delete pThis;
273}
274
275void CWorker::ProcessQueries()
276{
277 // remember last working server and try to connect to it first
278 int ReadServer = 0;
279 // enter fail mode when a sql request fails, skip read request during it and
280 // write to the backup database until all requests are handled
281 bool FailMode = false;
282 for(int JobNum = 0;; JobNum++)
283 {
284 if(FailMode && m_pShared->m_NumWorker.GetApproximateValue() == 0)
285 {
286 FailMode = false;
287 }
288 m_pShared->m_NumWorker.Wait();
289 auto pThreadData = std::move(m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)]);
290 // work through all database jobs after OnShutdown is called before exiting the thread
291 if(pThreadData == nullptr)
292 {
293 m_pShared->m_Shutdown.store(i: false);
294 return;
295 }
296 bool Success = false;
297 switch(pThreadData->m_Mode)
298 {
299 case CSqlExecData::READ_ACCESS:
300 {
301 for(size_t i = 0; i < m_vpReadConnections.size(); i++)
302 {
303 if(m_pShared->m_Shutdown)
304 {
305 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during shutdown", JobNum, pThreadData->m_pName);
306 break;
307 }
308 if(FailMode)
309 {
310 dbg_msg(sys: "sql", fmt: "[%i] %s dismissed read request during FailMode", JobNum, pThreadData->m_pName);
311 break;
312 }
313 int CurServer = (ReadServer + i) % (int)m_vpReadConnections.size();
314 if(CDbConnectionPool::ExecSqlFunc(pConnection: m_vpReadConnections[CurServer].get(), pData: pThreadData.get(), w: Write::NORMAL))
315 {
316 ReadServer = CurServer;
317 dbg_msg(sys: "sql", fmt: "[%i] %s done on read database %d", JobNum, pThreadData->m_pName, CurServer);
318 Success = true;
319 break;
320 }
321 }
322 if(!Success)
323 {
324 FailMode = true;
325 }
326 }
327 break;
328 case CSqlExecData::WRITE_ACCESS:
329 {
330 if(m_pShared->m_Shutdown && m_pWriteBackup != nullptr)
331 {
332 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during shutdown", JobNum, pThreadData->m_pName);
333 }
334 else if(FailMode && m_pWriteBackup != nullptr)
335 {
336 dbg_msg(sys: "sql", fmt: "[%i] %s skipped to backup database during FailMode", JobNum, pThreadData->m_pName);
337 }
338 else if(CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteConnection.get(), pData: pThreadData.get(), w: Write::NORMAL))
339 {
340 dbg_msg(sys: "sql", fmt: "[%i] %s done on write database", JobNum, pThreadData->m_pName);
341 Success = true;
342 }
343 // enter fail mode if not successful
344 FailMode = FailMode || !Success;
345 const Write w = Success ? Write::NORMAL_SUCCEEDED : Write::NORMAL_FAILED;
346 if(m_pWriteBackup && CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData.get(), w))
347 {
348 dbg_msg(sys: "sql", fmt: "[%i] %s done move write on backup database to non-backup table", JobNum, pThreadData->m_pName);
349 Success = true;
350 }
351 }
352 break;
353 case CSqlExecData::ADD_MYSQL:
354 {
355 auto pMysql = CreateMysqlConnection(Config: pThreadData->m_Ptr.m_Mysql.m_Config);
356 switch(pThreadData->m_Ptr.m_Mysql.m_Mode)
357 {
358 case CDbConnectionPool::Mode::READ:
359 m_vpReadConnections.push_back(x: std::move(pMysql));
360 break;
361 case CDbConnectionPool::Mode::WRITE:
362 m_pWriteConnection = std::move(pMysql);
363 break;
364 case CDbConnectionPool::Mode::WRITE_BACKUP:
365 m_pWriteBackup = std::move(pMysql);
366 break;
367 case CDbConnectionPool::Mode::NUM_MODES:
368 break;
369 }
370 Success = true;
371 break;
372 }
373 case CSqlExecData::ADD_SQLITE:
374 {
375 auto pSqlite = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_FileName, Setup: true);
376 switch(pThreadData->m_Ptr.m_Sqlite.m_Mode)
377 {
378 case CDbConnectionPool::Mode::READ:
379 m_vpReadConnections.push_back(x: std::move(pSqlite));
380 break;
381 case CDbConnectionPool::Mode::WRITE:
382 m_pWriteConnection = std::move(pSqlite);
383 break;
384 case CDbConnectionPool::Mode::WRITE_BACKUP:
385 m_pWriteBackup = std::move(pSqlite);
386 break;
387 case CDbConnectionPool::Mode::NUM_MODES:
388 break;
389 }
390 Success = true;
391 break;
392 }
393 case CSqlExecData::PRINT:
394 Print(pConsole: pThreadData->m_Ptr.m_Print.m_pConsole, DatabaseMode: pThreadData->m_Ptr.m_Print.m_Mode);
395 Success = true;
396 break;
397 }
398 if(!Success)
399 dbg_msg(sys: "sql", fmt: "[%i] %s failed on all databases", JobNum, pThreadData->m_pName);
400 if(pThreadData->m_pThreadData != nullptr && pThreadData->m_pThreadData->m_pResult != nullptr)
401 {
402 pThreadData->m_pThreadData->m_pResult->m_Success = Success;
403 pThreadData->m_pThreadData->m_pResult->m_Completed.store(i: true);
404 }
405 }
406}
407
408void CWorker::Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode)
409{
410 if(DatabaseMode == CDbConnectionPool::Mode::READ)
411 {
412 for(auto &pReadConnection : m_vpReadConnections)
413 pReadConnection->Print(pConsole, pMode: "Read");
414 if(m_vpReadConnections.empty())
415 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no read databases");
416 }
417 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE)
418 {
419 if(m_pWriteConnection)
420 m_pWriteConnection->Print(pConsole, pMode: "Write");
421 else
422 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write databases");
423 }
424 else if(DatabaseMode == CDbConnectionPool::Mode::WRITE_BACKUP)
425 {
426 if(m_pWriteBackup)
427 m_pWriteBackup->Print(pConsole, pMode: "WriteBackup");
428 else
429 pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server", pStr: "There are no write backup databases");
430 }
431}
432
433/* static */
434bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, Write w)
435{
436 if(pConnection == nullptr)
437 {
438 dbg_msg(sys: "sql", fmt: "No database given");
439 return false;
440 }
441 char aError[256] = "unknown error";
442 if(pConnection->Connect(pError: aError, ErrorSize: sizeof(aError)))
443 {
444 dbg_msg(sys: "sql", fmt: "failed connecting to db: %s", aError);
445 return false;
446 }
447 bool Success = false;
448 switch(pData->m_Mode)
449 {
450 case CSqlExecData::READ_ACCESS:
451 Success = !pData->m_Ptr.m_pReadFunc(pConnection, pData->m_pThreadData.get(), aError, sizeof(aError));
452 break;
453 case CSqlExecData::WRITE_ACCESS:
454 Success = !pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), w, aError, sizeof(aError));
455 break;
456 default:
457 dbg_assert(false, "unreachable");
458 }
459 pConnection->Disconnect();
460 if(!Success)
461 {
462 dbg_msg(sys: "sql", fmt: "%s failed: %s", pData->m_pName, aError);
463 }
464 return Success;
465}
466
467CDbConnectionPool::CDbConnectionPool()
468{
469 m_pShared = std::make_shared<CSharedData>();
470 m_pWorkerThread = thread_init(threadfunc: CWorker::Start, user: new CWorker(m_pShared), name: "database worker thread");
471 m_pBackupThread = thread_init(threadfunc: CBackup::Start, user: new CBackup(m_pShared), name: "database backup worker thread");
472}
473
474CDbConnectionPool::~CDbConnectionPool()
475{
476 OnShutdown();
477 if(m_pWorkerThread)
478 thread_wait(thread: m_pWorkerThread);
479 if(m_pBackupThread)
480 thread_wait(thread: m_pBackupThread);
481}
482