1 | #include "connection_pool.h" |
2 | #include "connection.h" |
3 | |
4 | #include <base/system.h> |
5 | #include <cstring> |
6 | #include <engine/console.h> |
7 | |
8 | #include <chrono> |
9 | #include <iterator> |
10 | #include <memory> |
11 | #include <thread> |
12 | #include <vector> |
13 | |
14 | using namespace std::chrono_literals; |
15 | |
16 | // helper struct to hold thread data |
17 | struct CSqlExecData |
18 | { |
19 | CSqlExecData( |
20 | CDbConnectionPool::FRead pFunc, |
21 | std::unique_ptr<const ISqlData> pThreadData, |
22 | const char *pName); |
23 | CSqlExecData( |
24 | CDbConnectionPool::FWrite pFunc, |
25 | std::unique_ptr<const ISqlData> pThreadData, |
26 | const char *pName); |
27 | CSqlExecData( |
28 | CDbConnectionPool::Mode m, |
29 | const char aFileName[64]); |
30 | CSqlExecData( |
31 | CDbConnectionPool::Mode m, |
32 | const CMysqlConfig *pMysqlConfig); |
33 | CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m); |
34 | ~CSqlExecData() = default; |
35 | |
36 | enum |
37 | { |
38 | READ_ACCESS, |
39 | WRITE_ACCESS, |
40 | ADD_MYSQL, |
41 | ADD_SQLITE, |
42 | PRINT, |
43 | } m_Mode; |
44 | union |
45 | { |
46 | CDbConnectionPool::FRead m_pReadFunc; |
47 | CDbConnectionPool::FWrite m_pWriteFunc; |
48 | struct |
49 | { |
50 | CDbConnectionPool::Mode m_Mode; |
51 | CMysqlConfig m_Config; |
52 | } m_Mysql; |
53 | struct |
54 | { |
55 | CDbConnectionPool::Mode m_Mode; |
56 | char m_FileName[64]; |
57 | } m_Sqlite; |
58 | struct |
59 | { |
60 | IConsole *m_pConsole; |
61 | CDbConnectionPool::Mode m_Mode; |
62 | } m_Print; |
63 | } m_Ptr; |
64 | |
65 | std::unique_ptr<const ISqlData> m_pThreadData; |
66 | const char *m_pName; |
67 | }; |
68 | |
69 | CSqlExecData::CSqlExecData( |
70 | CDbConnectionPool::FRead pFunc, |
71 | std::unique_ptr<const ISqlData> pThreadData, |
72 | const char *pName) : |
73 | m_Mode(READ_ACCESS), |
74 | m_pThreadData(std::move(pThreadData)), |
75 | m_pName(pName) |
76 | { |
77 | m_Ptr.m_pReadFunc = pFunc; |
78 | } |
79 | |
80 | CSqlExecData::CSqlExecData( |
81 | CDbConnectionPool::FWrite pFunc, |
82 | std::unique_ptr<const ISqlData> pThreadData, |
83 | const char *pName) : |
84 | m_Mode(WRITE_ACCESS), |
85 | m_pThreadData(std::move(pThreadData)), |
86 | m_pName(pName) |
87 | { |
88 | m_Ptr.m_pWriteFunc = pFunc; |
89 | } |
90 | |
91 | CSqlExecData::CSqlExecData( |
92 | CDbConnectionPool::Mode m, |
93 | const char aFileName[64]) : |
94 | m_Mode(ADD_SQLITE), |
95 | m_pThreadData(nullptr), |
96 | m_pName("add sqlite server" ) |
97 | { |
98 | m_Ptr.m_Sqlite.m_Mode = m; |
99 | str_copy(dst&: m_Ptr.m_Sqlite.m_FileName, src: aFileName); |
100 | } |
101 | CSqlExecData::CSqlExecData(CDbConnectionPool::Mode m, |
102 | const CMysqlConfig *pMysqlConfig) : |
103 | m_Mode(ADD_MYSQL), |
104 | m_pThreadData(nullptr), |
105 | m_pName("add mysql server" ) |
106 | { |
107 | m_Ptr.m_Mysql.m_Mode = m; |
108 | mem_copy(dest: &m_Ptr.m_Mysql.m_Config, source: pMysqlConfig, size: sizeof(m_Ptr.m_Mysql.m_Config)); |
109 | } |
110 | |
111 | CSqlExecData::CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m) : |
112 | m_Mode(PRINT), |
113 | m_pThreadData(nullptr), |
114 | m_pName("print database server" ) |
115 | { |
116 | m_Ptr.m_Print.m_pConsole = pConsole; |
117 | m_Ptr.m_Print.m_Mode = m; |
118 | } |
119 | |
120 | void CDbConnectionPool::Print(IConsole *pConsole, Mode DatabaseMode) |
121 | { |
122 | m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pConsole, args&: DatabaseMode); |
123 | m_InsertIdx %= std::size(m_pShared->m_aQueries); |
124 | m_pShared->m_NumBackup.Signal(); |
125 | } |
126 | |
127 | void CDbConnectionPool::RegisterSqliteDatabase(Mode DatabaseMode, const char aFileName[64]) |
128 | { |
129 | m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: aFileName); |
130 | m_InsertIdx %= std::size(m_pShared->m_aQueries); |
131 | m_pShared->m_NumBackup.Signal(); |
132 | } |
133 | |
134 | void CDbConnectionPool::RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig) |
135 | { |
136 | m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: DatabaseMode, args&: pMysqlConfig); |
137 | m_InsertIdx %= std::size(m_pShared->m_aQueries); |
138 | m_pShared->m_NumBackup.Signal(); |
139 | } |
140 | |
141 | void CDbConnectionPool::Execute( |
142 | FRead pFunc, |
143 | std::unique_ptr<const ISqlData> pSqlRequestData, |
144 | const char *pName) |
145 | { |
146 | m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName); |
147 | m_InsertIdx %= std::size(m_pShared->m_aQueries); |
148 | m_pShared->m_NumBackup.Signal(); |
149 | } |
150 | |
151 | void CDbConnectionPool::ExecuteWrite( |
152 | FWrite pFunc, |
153 | std::unique_ptr<const ISqlData> pSqlRequestData, |
154 | const char *pName) |
155 | { |
156 | m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(args&: pFunc, args: std::move(pSqlRequestData), args&: pName); |
157 | m_InsertIdx %= std::size(m_pShared->m_aQueries); |
158 | m_pShared->m_NumBackup.Signal(); |
159 | } |
160 | |
161 | void CDbConnectionPool::OnShutdown() |
162 | { |
163 | if(m_Shutdown) |
164 | return; |
165 | m_Shutdown = true; |
166 | m_pShared->m_Shutdown.store(i: true); |
167 | m_pShared->m_NumBackup.Signal(); |
168 | int i = 0; |
169 | while(m_pShared->m_Shutdown.load()) |
170 | { |
171 | // print a log about every two seconds |
172 | if(i % 20 == 0 && i > 0) |
173 | { |
174 | dbg_msg(sys: "sql" , fmt: "Waiting for score threads to complete (%ds)" , i / 10); |
175 | } |
176 | ++i; |
177 | std::this_thread::sleep_for(rtime: 100ms); |
178 | } |
179 | } |
180 | |
// The backup worker thread looks at write queries and stores them
// in the sqlite database (WRITE_BACKUP). It skips over read queries.
// After processing the query, it gets passed on to the Worker thread.
// This is done to not lose ranks when the server shuts down before all
// queries are executed on the mysql server.
186 | class CBackup |
187 | { |
188 | public: |
189 | CBackup(std::shared_ptr<CDbConnectionPool::CSharedData> pShared) : |
190 | m_pShared(std::move(pShared)) {} |
191 | static void Start(void *pUser); |
192 | |
193 | private: |
194 | void ProcessQueries(); |
195 | |
196 | std::unique_ptr<IDbConnection> m_pWriteBackup; |
197 | |
198 | std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared; |
199 | }; |
200 | |
201 | /* static */ |
202 | void CBackup::Start(void *pUser) |
203 | { |
204 | CBackup *pThis = (CBackup *)pUser; |
205 | pThis->ProcessQueries(); |
206 | delete pThis; |
207 | } |
208 | |
209 | void CBackup::ProcessQueries() |
210 | { |
211 | for(int JobNum = 0;; JobNum++) |
212 | { |
213 | m_pShared->m_NumBackup.Wait(); |
214 | CSqlExecData *pThreadData = m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)].get(); |
215 | |
216 | // work through all database jobs after OnShutdown is called before exiting the thread |
217 | if(pThreadData == nullptr) |
218 | { |
219 | m_pShared->m_NumWorker.Signal(); |
220 | return; |
221 | } |
222 | |
223 | if(pThreadData->m_Mode == CSqlExecData::ADD_SQLITE && |
224 | pThreadData->m_Ptr.m_Sqlite.m_Mode == CDbConnectionPool::Mode::WRITE_BACKUP) |
225 | { |
226 | m_pWriteBackup = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_FileName, Setup: true); |
227 | } |
228 | else if(pThreadData->m_Mode == CSqlExecData::WRITE_ACCESS && m_pWriteBackup.get()) |
229 | { |
230 | bool Success = CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData, w: Write::BACKUP_FIRST); |
231 | dbg_msg(sys: "sql" , fmt: "[%i] %s done on write backup database, Success=%i" , JobNum, pThreadData->m_pName, Success); |
232 | } |
233 | m_pShared->m_NumWorker.Signal(); |
234 | } |
235 | } |
236 | |
// The worker thread executes queries on mysql or sqlite. If we write on
// a mysql server and have a backup server configured, we'll remove the
// entry from the backup server after completing it on the write server.
240 | // static void Worker(void *pUser); |
241 | class CWorker |
242 | { |
243 | public: |
244 | CWorker(std::shared_ptr<CDbConnectionPool::CSharedData> pShared) : |
245 | m_pShared(std::move(pShared)) {} |
246 | static void Start(void *pUser); |
247 | void ProcessQueries(); |
248 | |
249 | private: |
250 | void Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode); |
251 | |
252 | // There are two possible configurations |
253 | // * sqlite mode: There exists exactly one READ and the same WRITE server |
254 | // with no WRITE_BACKUP server |
255 | // * mysql mode: there can exist multiple READ server, there must be at |
256 | // most one WRITE server. The WRITE server for all DDNet |
257 | // Servers must be the same (to counteract double loads). |
258 | // There may be one WRITE_BACKUP sqlite server. |
259 | // This variable should only change, before the worker threads |
260 | std::vector<std::unique_ptr<IDbConnection>> m_vpReadConnections; |
261 | std::unique_ptr<IDbConnection> m_pWriteConnection; |
262 | std::unique_ptr<IDbConnection> m_pWriteBackup; |
263 | |
264 | std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared; |
265 | }; |
266 | |
267 | /* static */ |
268 | void CWorker::Start(void *pUser) |
269 | { |
270 | CWorker *pThis = (CWorker *)pUser; |
271 | pThis->ProcessQueries(); |
272 | delete pThis; |
273 | } |
274 | |
275 | void CWorker::ProcessQueries() |
276 | { |
277 | // remember last working server and try to connect to it first |
278 | int ReadServer = 0; |
279 | // enter fail mode when a sql request fails, skip read request during it and |
280 | // write to the backup database until all requests are handled |
281 | bool FailMode = false; |
282 | for(int JobNum = 0;; JobNum++) |
283 | { |
284 | if(FailMode && m_pShared->m_NumWorker.GetApproximateValue() == 0) |
285 | { |
286 | FailMode = false; |
287 | } |
288 | m_pShared->m_NumWorker.Wait(); |
289 | auto pThreadData = std::move(m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)]); |
290 | // work through all database jobs after OnShutdown is called before exiting the thread |
291 | if(pThreadData == nullptr) |
292 | { |
293 | m_pShared->m_Shutdown.store(i: false); |
294 | return; |
295 | } |
296 | bool Success = false; |
297 | switch(pThreadData->m_Mode) |
298 | { |
299 | case CSqlExecData::READ_ACCESS: |
300 | { |
301 | for(size_t i = 0; i < m_vpReadConnections.size(); i++) |
302 | { |
303 | if(m_pShared->m_Shutdown) |
304 | { |
305 | dbg_msg(sys: "sql" , fmt: "[%i] %s dismissed read request during shutdown" , JobNum, pThreadData->m_pName); |
306 | break; |
307 | } |
308 | if(FailMode) |
309 | { |
310 | dbg_msg(sys: "sql" , fmt: "[%i] %s dismissed read request during FailMode" , JobNum, pThreadData->m_pName); |
311 | break; |
312 | } |
313 | int CurServer = (ReadServer + i) % (int)m_vpReadConnections.size(); |
314 | if(CDbConnectionPool::ExecSqlFunc(pConnection: m_vpReadConnections[CurServer].get(), pData: pThreadData.get(), w: Write::NORMAL)) |
315 | { |
316 | ReadServer = CurServer; |
317 | dbg_msg(sys: "sql" , fmt: "[%i] %s done on read database %d" , JobNum, pThreadData->m_pName, CurServer); |
318 | Success = true; |
319 | break; |
320 | } |
321 | } |
322 | if(!Success) |
323 | { |
324 | FailMode = true; |
325 | } |
326 | } |
327 | break; |
328 | case CSqlExecData::WRITE_ACCESS: |
329 | { |
330 | if(m_pShared->m_Shutdown && m_pWriteBackup != nullptr) |
331 | { |
332 | dbg_msg(sys: "sql" , fmt: "[%i] %s skipped to backup database during shutdown" , JobNum, pThreadData->m_pName); |
333 | } |
334 | else if(FailMode && m_pWriteBackup != nullptr) |
335 | { |
336 | dbg_msg(sys: "sql" , fmt: "[%i] %s skipped to backup database during FailMode" , JobNum, pThreadData->m_pName); |
337 | } |
338 | else if(CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteConnection.get(), pData: pThreadData.get(), w: Write::NORMAL)) |
339 | { |
340 | dbg_msg(sys: "sql" , fmt: "[%i] %s done on write database" , JobNum, pThreadData->m_pName); |
341 | Success = true; |
342 | } |
343 | // enter fail mode if not successful |
344 | FailMode = FailMode || !Success; |
345 | const Write w = Success ? Write::NORMAL_SUCCEEDED : Write::NORMAL_FAILED; |
346 | if(m_pWriteBackup && CDbConnectionPool::ExecSqlFunc(pConnection: m_pWriteBackup.get(), pData: pThreadData.get(), w)) |
347 | { |
348 | dbg_msg(sys: "sql" , fmt: "[%i] %s done move write on backup database to non-backup table" , JobNum, pThreadData->m_pName); |
349 | Success = true; |
350 | } |
351 | } |
352 | break; |
353 | case CSqlExecData::ADD_MYSQL: |
354 | { |
355 | auto pMysql = CreateMysqlConnection(Config: pThreadData->m_Ptr.m_Mysql.m_Config); |
356 | switch(pThreadData->m_Ptr.m_Mysql.m_Mode) |
357 | { |
358 | case CDbConnectionPool::Mode::READ: |
359 | m_vpReadConnections.push_back(x: std::move(pMysql)); |
360 | break; |
361 | case CDbConnectionPool::Mode::WRITE: |
362 | m_pWriteConnection = std::move(pMysql); |
363 | break; |
364 | case CDbConnectionPool::Mode::WRITE_BACKUP: |
365 | m_pWriteBackup = std::move(pMysql); |
366 | break; |
367 | case CDbConnectionPool::Mode::NUM_MODES: |
368 | break; |
369 | } |
370 | Success = true; |
371 | break; |
372 | } |
373 | case CSqlExecData::ADD_SQLITE: |
374 | { |
375 | auto pSqlite = CreateSqliteConnection(pFilename: pThreadData->m_Ptr.m_Sqlite.m_FileName, Setup: true); |
376 | switch(pThreadData->m_Ptr.m_Sqlite.m_Mode) |
377 | { |
378 | case CDbConnectionPool::Mode::READ: |
379 | m_vpReadConnections.push_back(x: std::move(pSqlite)); |
380 | break; |
381 | case CDbConnectionPool::Mode::WRITE: |
382 | m_pWriteConnection = std::move(pSqlite); |
383 | break; |
384 | case CDbConnectionPool::Mode::WRITE_BACKUP: |
385 | m_pWriteBackup = std::move(pSqlite); |
386 | break; |
387 | case CDbConnectionPool::Mode::NUM_MODES: |
388 | break; |
389 | } |
390 | Success = true; |
391 | break; |
392 | } |
393 | case CSqlExecData::PRINT: |
394 | Print(pConsole: pThreadData->m_Ptr.m_Print.m_pConsole, DatabaseMode: pThreadData->m_Ptr.m_Print.m_Mode); |
395 | Success = true; |
396 | break; |
397 | } |
398 | if(!Success) |
399 | dbg_msg(sys: "sql" , fmt: "[%i] %s failed on all databases" , JobNum, pThreadData->m_pName); |
400 | if(pThreadData->m_pThreadData != nullptr && pThreadData->m_pThreadData->m_pResult != nullptr) |
401 | { |
402 | pThreadData->m_pThreadData->m_pResult->m_Success = Success; |
403 | pThreadData->m_pThreadData->m_pResult->m_Completed.store(i: true); |
404 | } |
405 | } |
406 | } |
407 | |
408 | void CWorker::Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode) |
409 | { |
410 | if(DatabaseMode == CDbConnectionPool::Mode::READ) |
411 | { |
412 | for(auto &pReadConnection : m_vpReadConnections) |
413 | pReadConnection->Print(pConsole, pMode: "Read" ); |
414 | if(m_vpReadConnections.empty()) |
415 | pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server" , pStr: "There are no read databases" ); |
416 | } |
417 | else if(DatabaseMode == CDbConnectionPool::Mode::WRITE) |
418 | { |
419 | if(m_pWriteConnection) |
420 | m_pWriteConnection->Print(pConsole, pMode: "Write" ); |
421 | else |
422 | pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server" , pStr: "There are no write databases" ); |
423 | } |
424 | else if(DatabaseMode == CDbConnectionPool::Mode::WRITE_BACKUP) |
425 | { |
426 | if(m_pWriteBackup) |
427 | m_pWriteBackup->Print(pConsole, pMode: "WriteBackup" ); |
428 | else |
429 | pConsole->Print(Level: IConsole::OUTPUT_LEVEL_STANDARD, pFrom: "server" , pStr: "There are no write backup databases" ); |
430 | } |
431 | } |
432 | |
433 | /* static */ |
434 | bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, Write w) |
435 | { |
436 | if(pConnection == nullptr) |
437 | { |
438 | dbg_msg(sys: "sql" , fmt: "No database given" ); |
439 | return false; |
440 | } |
441 | char aError[256] = "unknown error" ; |
442 | if(pConnection->Connect(pError: aError, ErrorSize: sizeof(aError))) |
443 | { |
444 | dbg_msg(sys: "sql" , fmt: "failed connecting to db: %s" , aError); |
445 | return false; |
446 | } |
447 | bool Success = false; |
448 | switch(pData->m_Mode) |
449 | { |
450 | case CSqlExecData::READ_ACCESS: |
451 | Success = !pData->m_Ptr.m_pReadFunc(pConnection, pData->m_pThreadData.get(), aError, sizeof(aError)); |
452 | break; |
453 | case CSqlExecData::WRITE_ACCESS: |
454 | Success = !pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), w, aError, sizeof(aError)); |
455 | break; |
456 | default: |
457 | dbg_assert(false, "unreachable" ); |
458 | } |
459 | pConnection->Disconnect(); |
460 | if(!Success) |
461 | { |
462 | dbg_msg(sys: "sql" , fmt: "%s failed: %s" , pData->m_pName, aError); |
463 | } |
464 | return Success; |
465 | } |
466 | |
467 | CDbConnectionPool::CDbConnectionPool() |
468 | { |
469 | m_pShared = std::make_shared<CSharedData>(); |
470 | m_pWorkerThread = thread_init(threadfunc: CWorker::Start, user: new CWorker(m_pShared), name: "database worker thread" ); |
471 | m_pBackupThread = thread_init(threadfunc: CBackup::Start, user: new CBackup(m_pShared), name: "database backup worker thread" ); |
472 | } |
473 | |
474 | CDbConnectionPool::~CDbConnectionPool() |
475 | { |
476 | OnShutdown(); |
477 | if(m_pWorkerThread) |
478 | thread_wait(thread: m_pWorkerThread); |
479 | if(m_pBackupThread) |
480 | thread_wait(thread: m_pBackupThread); |
481 | } |
482 | |