1/* (c) Magnus Auvinen. See licence.txt in the root of the distribution for more information. */
2/* If you are missing that file, acquire a complete release at teeworlds.com. */
3#include "jobs.h"
4#include <algorithm>
5
6IJob::IJob() :
7 m_pNext(nullptr),
8 m_State(STATE_QUEUED),
9 m_Abortable(false)
10{
11}
12
// Defined out of line; the defaulted destructor performs no custom cleanup.
IJob::~IJob() = default;
14
15IJob::EJobState IJob::State() const
16{
17 return m_State;
18}
19
20bool IJob::Done() const
21{
22 EJobState State = m_State;
23 return State != STATE_QUEUED && State != STATE_RUNNING;
24}
25
26bool IJob::Abort()
27{
28 if(!IsAbortable())
29 return false;
30
31 m_State = STATE_ABORTED;
32 return true;
33}
34
35void IJob::Abortable(bool Abortable)
36{
37 m_Abortable = Abortable;
38}
39
40bool IJob::IsAbortable() const
41{
42 return m_Abortable;
43}
44
45CJobPool::CJobPool()
46{
47 m_Shutdown = true;
48}
49
50CJobPool::~CJobPool()
51{
52 if(!m_Shutdown)
53 {
54 Shutdown();
55 }
56}
57
58void CJobPool::WorkerThread(void *pUser)
59{
60 static_cast<CJobPool *>(pUser)->RunLoop();
61}
62
63void CJobPool::RunLoop()
64{
65 while(true)
66 {
67 // wait for job to become available
68 sphore_wait(sem: &m_Semaphore);
69
70 // fetch job from queue
71 std::shared_ptr<IJob> pJob = nullptr;
72 {
73 const CLockScope LockScope(m_Lock);
74 if(m_pFirstJob)
75 {
76 pJob = m_pFirstJob;
77 m_pFirstJob = m_pFirstJob->m_pNext;
78 // allow remaining objects in list to destruct, even when current object stays alive
79 pJob->m_pNext = nullptr;
80 if(!m_pFirstJob)
81 m_pLastJob = nullptr;
82 }
83 }
84
85 if(pJob)
86 {
87 IJob::EJobState OldStateQueued = IJob::STATE_QUEUED;
88 if(!pJob->m_State.compare_exchange_strong(e&: OldStateQueued, i: IJob::STATE_RUNNING))
89 {
90 if(OldStateQueued == IJob::STATE_ABORTED)
91 {
92 // job was aborted before it was started
93 pJob->m_State = IJob::STATE_ABORTED;
94 continue;
95 }
96 dbg_assert(false, "Job state invalid. Job was reused or uninitialized.");
97 dbg_break();
98 }
99
100 // remember running jobs so we can abort them
101 {
102 const CLockScope LockScope(m_LockRunning);
103 m_RunningJobs.push_back(x: pJob);
104 }
105 pJob->Run();
106 {
107 const CLockScope LockScope(m_LockRunning);
108 m_RunningJobs.erase(position: std::find(first: m_RunningJobs.begin(), last: m_RunningJobs.end(), val: pJob));
109 }
110
111 // do not change state to done if job was not completed successfully
112 IJob::EJobState OldStateRunning = IJob::STATE_RUNNING;
113 if(!pJob->m_State.compare_exchange_strong(e&: OldStateRunning, i: IJob::STATE_DONE))
114 {
115 if(OldStateRunning != IJob::STATE_ABORTED)
116 {
117 dbg_assert(false, "Job state invalid, must be either running or aborted");
118 }
119 }
120 }
121 else if(m_Shutdown)
122 {
123 // shut down worker thread when pool is shutting down and no more jobs are left
124 break;
125 }
126 }
127}
128
129void CJobPool::Init(int NumThreads)
130{
131 dbg_assert(m_Shutdown, "Job pool already running");
132 m_Shutdown = false;
133
134 const CLockScope LockScope(m_Lock);
135 sphore_init(sem: &m_Semaphore);
136 m_pFirstJob = nullptr;
137 m_pLastJob = nullptr;
138
139 // start worker threads
140 char aName[16]; // unix kernel length limit
141 m_vpThreads.reserve(n: NumThreads);
142 for(int i = 0; i < NumThreads; i++)
143 {
144 str_format(buffer: aName, buffer_size: sizeof(aName), format: "CJobPool W%d", i);
145 m_vpThreads.push_back(x: thread_init(threadfunc: WorkerThread, user: this, name: aName));
146 }
147}
148
149void CJobPool::Shutdown()
150{
151 dbg_assert(!m_Shutdown, "Job pool already shut down");
152 m_Shutdown = true;
153
154 // abort queued jobs
155 {
156 const CLockScope LockScope(m_Lock);
157 std::shared_ptr<IJob> pJob = m_pFirstJob;
158 std::shared_ptr<IJob> pPrev = nullptr;
159 while(pJob != nullptr)
160 {
161 std::shared_ptr<IJob> pNext = pJob->m_pNext;
162 if(pJob->Abort())
163 {
164 // only remove abortable jobs from queue
165 pJob->m_pNext = nullptr;
166 if(pPrev)
167 {
168 pPrev->m_pNext = pNext;
169 }
170 else
171 {
172 m_pFirstJob = pNext;
173 }
174 }
175 else
176 {
177 pPrev = pJob;
178 }
179 pJob = pNext;
180 }
181 m_pLastJob = pPrev;
182 }
183
184 // abort running jobs
185 {
186 const CLockScope LockScope(m_LockRunning);
187 for(const std::shared_ptr<IJob> &pJob : m_RunningJobs)
188 {
189 pJob->Abort();
190 }
191 }
192
193 // wake up all worker threads
194 for(size_t i = 0; i < m_vpThreads.size(); i++)
195 {
196 sphore_signal(sem: &m_Semaphore);
197 }
198
199 // wait for all worker threads to finish
200 for(void *pThread : m_vpThreads)
201 {
202 thread_wait(thread: pThread);
203 }
204
205 m_vpThreads.clear();
206 sphore_destroy(sem: &m_Semaphore);
207}
208
209void CJobPool::Add(std::shared_ptr<IJob> pJob)
210{
211 if(m_Shutdown)
212 {
213 // no jobs are accepted when the job pool is already shutting down
214 pJob->Abort();
215 return;
216 }
217
218 // add job to queue
219 {
220 const CLockScope LockScope(m_Lock);
221 if(m_pLastJob)
222 m_pLastJob->m_pNext = pJob;
223 m_pLastJob = std::move(pJob);
224 if(!m_pFirstJob)
225 m_pFirstJob = m_pLastJob;
226 }
227
228 // signal a worker thread that a job is available
229 sphore_signal(sem: &m_Semaphore);
230}
231