1/* (c) Magnus Auvinen. See licence.txt in the root of the distribution for more information. */
2/* If you are missing that file, acquire a complete release at teeworlds.com. */
3#include "jobs.h"
4
5#include <algorithm>
6
7IJob::IJob() :
8 m_pNext(nullptr),
9 m_State(STATE_QUEUED),
10 m_Abortable(false)
11{
12}
13
14IJob::~IJob() = default;
15
16IJob::EJobState IJob::State() const
17{
18 return m_State;
19}
20
21bool IJob::Done() const
22{
23 EJobState State = m_State;
24 return State != STATE_QUEUED && State != STATE_RUNNING;
25}
26
27bool IJob::Abort()
28{
29 if(!IsAbortable())
30 return false;
31
32 m_State = STATE_ABORTED;
33 return true;
34}
35
36void IJob::Abortable(bool Abortable)
37{
38 m_Abortable = Abortable;
39}
40
41bool IJob::IsAbortable() const
42{
43 return m_Abortable;
44}
45
46CJobPool::CJobPool()
47{
48 m_Shutdown = true;
49}
50
51CJobPool::~CJobPool()
52{
53 if(!m_Shutdown)
54 {
55 Shutdown();
56 }
57}
58
59void CJobPool::WorkerThread(void *pUser)
60{
61 static_cast<CJobPool *>(pUser)->RunLoop();
62}
63
64void CJobPool::RunLoop()
65{
66 while(true)
67 {
68 // wait for job to become available
69 sphore_wait(sem: &m_Semaphore);
70
71 // fetch job from queue
72 std::shared_ptr<IJob> pJob = nullptr;
73 {
74 const CLockScope LockScope(m_Lock);
75 if(m_pFirstJob)
76 {
77 pJob = m_pFirstJob;
78 m_pFirstJob = m_pFirstJob->m_pNext;
79 // allow remaining objects in list to destruct, even when current object stays alive
80 pJob->m_pNext = nullptr;
81 if(!m_pFirstJob)
82 m_pLastJob = nullptr;
83 }
84 }
85
86 if(pJob)
87 {
88 IJob::EJobState OldStateQueued = IJob::STATE_QUEUED;
89 if(!pJob->m_State.compare_exchange_strong(e&: OldStateQueued, i: IJob::STATE_RUNNING))
90 {
91 if(OldStateQueued == IJob::STATE_ABORTED)
92 {
93 // job was aborted before it was started
94 pJob->m_State = IJob::STATE_ABORTED;
95 continue;
96 }
97 dbg_assert_failed("Job state invalid. Job was reused or uninitialized.");
98 }
99
100 // remember running jobs so we can abort them
101 {
102 const CLockScope LockScope(m_LockRunning);
103 m_RunningJobs.push_back(x: pJob);
104 }
105 pJob->Run();
106 {
107 const CLockScope LockScope(m_LockRunning);
108 m_RunningJobs.erase(position: std::find(first: m_RunningJobs.begin(), last: m_RunningJobs.end(), val: pJob));
109 }
110
111 // do not change state to done if job was not completed successfully
112 IJob::EJobState OldStateRunning = IJob::STATE_RUNNING;
113 if(!pJob->m_State.compare_exchange_strong(e&: OldStateRunning, i: IJob::STATE_DONE))
114 {
115 if(OldStateRunning != IJob::STATE_ABORTED)
116 {
117 dbg_assert_failed("Job state invalid, must be either running or aborted");
118 }
119 }
120 }
121 else if(m_Shutdown)
122 {
123 // shut down worker thread when pool is shutting down and no more jobs are left
124 break;
125 }
126 }
127}
128
129void CJobPool::Init(int NumThreads)
130{
131 dbg_assert(m_Shutdown, "Job pool already running");
132 m_Shutdown = false;
133
134 const CLockScope LockScope(m_Lock);
135 sphore_init(sem: &m_Semaphore);
136 m_pFirstJob = nullptr;
137 m_pLastJob = nullptr;
138
139 // start worker threads
140 char aName[16]; // unix kernel length limit
141 m_vpThreads.reserve(n: NumThreads);
142 for(int i = 0; i < NumThreads; i++)
143 {
144 str_format(buffer: aName, buffer_size: sizeof(aName), format: "CJobPool W%d", i);
145 m_vpThreads.push_back(x: thread_init(threadfunc: WorkerThread, user: this, name: aName));
146 }
147}
148
149void CJobPool::Shutdown()
150{
151 dbg_assert(!m_Shutdown, "Job pool already shut down");
152 m_Shutdown = true;
153
154 // abort queued jobs
155 {
156 const CLockScope LockScope(m_Lock);
157 std::shared_ptr<IJob> pJob = m_pFirstJob;
158 std::shared_ptr<IJob> pPrev = nullptr;
159 while(pJob != nullptr)
160 {
161 std::shared_ptr<IJob> pNext = pJob->m_pNext;
162 if(pJob->Abort())
163 {
164 // only remove abortable jobs from queue
165 pJob->m_pNext = nullptr;
166 if(pPrev)
167 {
168 pPrev->m_pNext = pNext;
169 }
170 else
171 {
172 m_pFirstJob = pNext;
173 }
174 }
175 else
176 {
177 pPrev = pJob;
178 }
179 pJob = pNext;
180 }
181 m_pLastJob = pPrev;
182 }
183
184 // abort running jobs
185 {
186 const CLockScope LockScope(m_LockRunning);
187 for(const std::shared_ptr<IJob> &pJob : m_RunningJobs)
188 {
189 pJob->Abort();
190 }
191 }
192
193 // wake up all worker threads
194 for(size_t i = 0; i < m_vpThreads.size(); i++)
195 {
196 sphore_signal(sem: &m_Semaphore);
197 }
198
199 // wait for all worker threads to finish
200 for(void *pThread : m_vpThreads)
201 {
202 thread_wait(thread: pThread);
203 }
204
205 m_vpThreads.clear();
206 sphore_destroy(sem: &m_Semaphore);
207}
208
209void CJobPool::Add(std::shared_ptr<IJob> pJob)
210{
211 if(m_Shutdown)
212 {
213 // no jobs are accepted when the job pool is already shutting down
214 pJob->Abort();
215 return;
216 }
217
218 // add job to queue
219 {
220 const CLockScope LockScope(m_Lock);
221 if(m_pLastJob)
222 m_pLastJob->m_pNext = pJob;
223 m_pLastJob = std::move(pJob);
224 if(!m_pFirstJob)
225 m_pFirstJob = m_pLastJob;
226 }
227
228 // signal a worker thread that a job is available
229 sphore_signal(sem: &m_Semaphore);
230}
231