1/* (c) Magnus Auvinen. See licence.txt in the root of the distribution for more information. */
2/* If you are missing that file, acquire a complete release at teeworlds.com. */
3#include "jobs.h"
4
5#include <base/thread.h>
6
7#include <algorithm>
8
9IJob::IJob() :
10 m_pNext(nullptr),
11 m_State(STATE_QUEUED),
12 m_Abortable(false)
13{
14}
15
16IJob::~IJob() = default;
17
18IJob::EJobState IJob::State() const
19{
20 return m_State;
21}
22
23bool IJob::Done() const
24{
25 EJobState State = m_State;
26 return State != STATE_QUEUED && State != STATE_RUNNING;
27}
28
29bool IJob::Abort()
30{
31 if(!IsAbortable())
32 return false;
33
34 m_State = STATE_ABORTED;
35 return true;
36}
37
38void IJob::Abortable(bool Abortable)
39{
40 m_Abortable = Abortable;
41}
42
43bool IJob::IsAbortable() const
44{
45 return m_Abortable;
46}
47
48CJobPool::CJobPool()
49{
50 m_Shutdown = true;
51}
52
53CJobPool::~CJobPool()
54{
55 if(!m_Shutdown)
56 {
57 Shutdown();
58 }
59}
60
61void CJobPool::WorkerThread(void *pUser)
62{
63 static_cast<CJobPool *>(pUser)->RunLoop();
64}
65
66void CJobPool::RunLoop()
67{
68 while(true)
69 {
70 // wait for job to become available
71 sphore_wait(sem: &m_Semaphore);
72
73 // fetch job from queue
74 std::shared_ptr<IJob> pJob = nullptr;
75 {
76 const CLockScope LockScope(m_Lock);
77 if(m_pFirstJob)
78 {
79 pJob = m_pFirstJob;
80 m_pFirstJob = m_pFirstJob->m_pNext;
81 // allow remaining objects in list to destruct, even when current object stays alive
82 pJob->m_pNext = nullptr;
83 if(!m_pFirstJob)
84 m_pLastJob = nullptr;
85 }
86 }
87
88 if(pJob)
89 {
90 IJob::EJobState OldStateQueued = IJob::STATE_QUEUED;
91 if(!pJob->m_State.compare_exchange_strong(e&: OldStateQueued, i: IJob::STATE_RUNNING))
92 {
93 if(OldStateQueued == IJob::STATE_ABORTED)
94 {
95 // job was aborted before it was started
96 pJob->m_State = IJob::STATE_ABORTED;
97 continue;
98 }
99 dbg_assert_failed("Job state invalid. Job was reused or uninitialized.");
100 }
101
102 // remember running jobs so we can abort them
103 {
104 const CLockScope LockScope(m_LockRunning);
105 m_RunningJobs.push_back(x: pJob);
106 }
107 pJob->Run();
108 {
109 const CLockScope LockScope(m_LockRunning);
110 m_RunningJobs.erase(position: std::find(first: m_RunningJobs.begin(), last: m_RunningJobs.end(), val: pJob));
111 }
112
113 // do not change state to done if job was not completed successfully
114 IJob::EJobState OldStateRunning = IJob::STATE_RUNNING;
115 if(!pJob->m_State.compare_exchange_strong(e&: OldStateRunning, i: IJob::STATE_DONE))
116 {
117 if(OldStateRunning != IJob::STATE_ABORTED)
118 {
119 dbg_assert_failed("Job state invalid, must be either running or aborted");
120 }
121 }
122 }
123 else if(m_Shutdown)
124 {
125 // shut down worker thread when pool is shutting down and no more jobs are left
126 break;
127 }
128 }
129}
130
131void CJobPool::Init(int NumThreads)
132{
133 dbg_assert(m_Shutdown, "Job pool already running");
134 m_Shutdown = false;
135
136 const CLockScope LockScope(m_Lock);
137 sphore_init(sem: &m_Semaphore);
138 m_pFirstJob = nullptr;
139 m_pLastJob = nullptr;
140
141 // start worker threads
142 char aName[16]; // unix kernel length limit
143 m_vpThreads.reserve(n: NumThreads);
144 for(int i = 0; i < NumThreads; i++)
145 {
146 str_format(buffer: aName, buffer_size: sizeof(aName), format: "CJobPool W%d", i);
147 m_vpThreads.push_back(x: thread_init(threadfunc: WorkerThread, user: this, name: aName));
148 }
149}
150
151void CJobPool::Shutdown()
152{
153 dbg_assert(!m_Shutdown, "Job pool already shut down");
154 m_Shutdown = true;
155
156 // abort queued jobs
157 {
158 const CLockScope LockScope(m_Lock);
159 std::shared_ptr<IJob> pJob = m_pFirstJob;
160 std::shared_ptr<IJob> pPrev = nullptr;
161 while(pJob != nullptr)
162 {
163 std::shared_ptr<IJob> pNext = pJob->m_pNext;
164 if(pJob->Abort())
165 {
166 // only remove abortable jobs from queue
167 pJob->m_pNext = nullptr;
168 if(pPrev)
169 {
170 pPrev->m_pNext = pNext;
171 }
172 else
173 {
174 m_pFirstJob = pNext;
175 }
176 }
177 else
178 {
179 pPrev = pJob;
180 }
181 pJob = pNext;
182 }
183 m_pLastJob = pPrev;
184 }
185
186 // abort running jobs
187 {
188 const CLockScope LockScope(m_LockRunning);
189 for(const std::shared_ptr<IJob> &pJob : m_RunningJobs)
190 {
191 pJob->Abort();
192 }
193 }
194
195 // wake up all worker threads
196 for(size_t i = 0; i < m_vpThreads.size(); i++)
197 {
198 sphore_signal(sem: &m_Semaphore);
199 }
200
201 // wait for all worker threads to finish
202 for(void *pThread : m_vpThreads)
203 {
204 thread_wait(thread: pThread);
205 }
206
207 m_vpThreads.clear();
208 sphore_destroy(sem: &m_Semaphore);
209}
210
211void CJobPool::Add(std::shared_ptr<IJob> pJob)
212{
213 if(m_Shutdown)
214 {
215 // no jobs are accepted when the job pool is already shutting down
216 pJob->Abort();
217 return;
218 }
219
220 // add job to queue
221 {
222 const CLockScope LockScope(m_Lock);
223 if(m_pLastJob)
224 m_pLastJob->m_pNext = pJob;
225 m_pLastJob = std::move(pJob);
226 if(!m_pFirstJob)
227 m_pFirstJob = m_pLastJob;
228 }
229
230 // signal a worker thread that a job is available
231 sphore_signal(sem: &m_Semaphore);
232}
233