1/* (c) Magnus Auvinen. See licence.txt in the root of the distribution for more information. */
2/* If you are missing that file, acquire a complete release at teeworlds.com. */
3#include "jobs.h"
4
5#include <base/dbg.h>
6#include <base/str.h>
7#include <base/thread.h>
8
9#include <algorithm>
10
11IJob::IJob() :
12 m_pNext(nullptr),
13 m_State(STATE_QUEUED),
14 m_Abortable(false)
15{
16}
17
18IJob::~IJob() = default;
19
20IJob::EJobState IJob::State() const
21{
22 return m_State;
23}
24
25bool IJob::Done() const
26{
27 EJobState State = m_State;
28 return State != STATE_QUEUED && State != STATE_RUNNING;
29}
30
31bool IJob::Abort()
32{
33 if(!IsAbortable())
34 return false;
35
36 m_State = STATE_ABORTED;
37 return true;
38}
39
40void IJob::Abortable(bool Abortable)
41{
42 m_Abortable = Abortable;
43}
44
45bool IJob::IsAbortable() const
46{
47 return m_Abortable;
48}
49
50CJobPool::CJobPool()
51{
52 m_Shutdown = true;
53}
54
55CJobPool::~CJobPool()
56{
57 if(!m_Shutdown)
58 {
59 Shutdown();
60 }
61}
62
63void CJobPool::WorkerThread(void *pUser)
64{
65 static_cast<CJobPool *>(pUser)->RunLoop();
66}
67
68void CJobPool::RunLoop()
69{
70 while(true)
71 {
72 // wait for job to become available
73 sphore_wait(sem: &m_Semaphore);
74
75 // fetch job from queue
76 std::shared_ptr<IJob> pJob = nullptr;
77 {
78 const CLockScope LockScope(m_Lock);
79 if(m_pFirstJob)
80 {
81 pJob = m_pFirstJob;
82 m_pFirstJob = m_pFirstJob->m_pNext;
83 // allow remaining objects in list to destruct, even when current object stays alive
84 pJob->m_pNext = nullptr;
85 if(!m_pFirstJob)
86 m_pLastJob = nullptr;
87 }
88 }
89
90 if(pJob)
91 {
92 IJob::EJobState OldStateQueued = IJob::STATE_QUEUED;
93 if(!pJob->m_State.compare_exchange_strong(e&: OldStateQueued, i: IJob::STATE_RUNNING))
94 {
95 if(OldStateQueued == IJob::STATE_ABORTED)
96 {
97 // job was aborted before it was started
98 pJob->m_State = IJob::STATE_ABORTED;
99 continue;
100 }
101 dbg_assert_failed("Job state invalid. Job was reused or uninitialized.");
102 }
103
104 // remember running jobs so we can abort them
105 {
106 const CLockScope LockScope(m_LockRunning);
107 m_RunningJobs.push_back(x: pJob);
108 }
109 pJob->Run();
110 {
111 const CLockScope LockScope(m_LockRunning);
112 m_RunningJobs.erase(position: std::find(first: m_RunningJobs.begin(), last: m_RunningJobs.end(), val: pJob));
113 }
114
115 // do not change state to done if job was not completed successfully
116 IJob::EJobState OldStateRunning = IJob::STATE_RUNNING;
117 if(!pJob->m_State.compare_exchange_strong(e&: OldStateRunning, i: IJob::STATE_DONE))
118 {
119 if(OldStateRunning != IJob::STATE_ABORTED)
120 {
121 dbg_assert_failed("Job state invalid, must be either running or aborted");
122 }
123 }
124 }
125 else if(m_Shutdown)
126 {
127 // shut down worker thread when pool is shutting down and no more jobs are left
128 break;
129 }
130 }
131}
132
133void CJobPool::Init(int NumThreads)
134{
135 dbg_assert(m_Shutdown, "Job pool already running");
136 m_Shutdown = false;
137
138 const CLockScope LockScope(m_Lock);
139 sphore_init(sem: &m_Semaphore);
140 m_pFirstJob = nullptr;
141 m_pLastJob = nullptr;
142
143 // start worker threads
144 char aName[16]; // unix kernel length limit
145 m_vpThreads.reserve(n: NumThreads);
146 for(int i = 0; i < NumThreads; i++)
147 {
148 str_format(buffer: aName, buffer_size: sizeof(aName), format: "CJobPool W%d", i);
149 m_vpThreads.push_back(x: thread_init(threadfunc: WorkerThread, user: this, name: aName));
150 }
151}
152
153void CJobPool::Shutdown()
154{
155 dbg_assert(!m_Shutdown, "Job pool already shut down");
156 m_Shutdown = true;
157
158 // abort queued jobs
159 {
160 const CLockScope LockScope(m_Lock);
161 std::shared_ptr<IJob> pJob = m_pFirstJob;
162 std::shared_ptr<IJob> pPrev = nullptr;
163 while(pJob != nullptr)
164 {
165 std::shared_ptr<IJob> pNext = pJob->m_pNext;
166 if(pJob->Abort())
167 {
168 // only remove abortable jobs from queue
169 pJob->m_pNext = nullptr;
170 if(pPrev)
171 {
172 pPrev->m_pNext = pNext;
173 }
174 else
175 {
176 m_pFirstJob = pNext;
177 }
178 }
179 else
180 {
181 pPrev = pJob;
182 }
183 pJob = pNext;
184 }
185 m_pLastJob = pPrev;
186 }
187
188 // abort running jobs
189 {
190 const CLockScope LockScope(m_LockRunning);
191 for(const std::shared_ptr<IJob> &pJob : m_RunningJobs)
192 {
193 pJob->Abort();
194 }
195 }
196
197 // wake up all worker threads
198 for(size_t i = 0; i < m_vpThreads.size(); i++)
199 {
200 sphore_signal(sem: &m_Semaphore);
201 }
202
203 // wait for all worker threads to finish
204 for(void *pThread : m_vpThreads)
205 {
206 thread_wait(thread: pThread);
207 }
208
209 m_vpThreads.clear();
210 sphore_destroy(sem: &m_Semaphore);
211}
212
213void CJobPool::Add(std::shared_ptr<IJob> pJob)
214{
215 if(m_Shutdown)
216 {
217 // no jobs are accepted when the job pool is already shutting down
218 pJob->Abort();
219 return;
220 }
221
222 // add job to queue
223 {
224 const CLockScope LockScope(m_Lock);
225 if(m_pLastJob)
226 m_pLastJob->m_pNext = pJob;
227 m_pLastJob = std::move(pJob);
228 if(!m_pFirstJob)
229 m_pFirstJob = m_pLastJob;
230 }
231
232 // signal a worker thread that a job is available
233 sphore_signal(sem: &m_Semaphore);
234}
235