1 | /* (c) Magnus Auvinen. See licence.txt in the root of the distribution for more information. */ |
2 | /* If you are missing that file, acquire a complete release at teeworlds.com. */ |
3 | #include "jobs.h" |
4 | #include <algorithm> |
5 | |
6 | IJob::IJob() : |
7 | m_pNext(nullptr), |
8 | m_State(STATE_QUEUED), |
9 | m_Abortable(false) |
10 | { |
11 | } |
12 | |
13 | IJob::~IJob() = default; |
14 | |
15 | IJob::EJobState IJob::State() const |
16 | { |
17 | return m_State; |
18 | } |
19 | |
20 | bool IJob::Done() const |
21 | { |
22 | EJobState State = m_State; |
23 | return State != STATE_QUEUED && State != STATE_RUNNING; |
24 | } |
25 | |
26 | bool IJob::Abort() |
27 | { |
28 | if(!IsAbortable()) |
29 | return false; |
30 | |
31 | m_State = STATE_ABORTED; |
32 | return true; |
33 | } |
34 | |
35 | void IJob::Abortable(bool Abortable) |
36 | { |
37 | m_Abortable = Abortable; |
38 | } |
39 | |
40 | bool IJob::IsAbortable() const |
41 | { |
42 | return m_Abortable; |
43 | } |
44 | |
45 | CJobPool::CJobPool() |
46 | { |
47 | m_Shutdown = true; |
48 | } |
49 | |
50 | CJobPool::~CJobPool() |
51 | { |
52 | if(!m_Shutdown) |
53 | { |
54 | Shutdown(); |
55 | } |
56 | } |
57 | |
58 | void CJobPool::WorkerThread(void *pUser) |
59 | { |
60 | static_cast<CJobPool *>(pUser)->RunLoop(); |
61 | } |
62 | |
63 | void CJobPool::RunLoop() |
64 | { |
65 | while(true) |
66 | { |
67 | // wait for job to become available |
68 | sphore_wait(sem: &m_Semaphore); |
69 | |
70 | // fetch job from queue |
71 | std::shared_ptr<IJob> pJob = nullptr; |
72 | { |
73 | const CLockScope LockScope(m_Lock); |
74 | if(m_pFirstJob) |
75 | { |
76 | pJob = m_pFirstJob; |
77 | m_pFirstJob = m_pFirstJob->m_pNext; |
78 | // allow remaining objects in list to destruct, even when current object stays alive |
79 | pJob->m_pNext = nullptr; |
80 | if(!m_pFirstJob) |
81 | m_pLastJob = nullptr; |
82 | } |
83 | } |
84 | |
85 | if(pJob) |
86 | { |
87 | IJob::EJobState OldStateQueued = IJob::STATE_QUEUED; |
88 | if(!pJob->m_State.compare_exchange_strong(e&: OldStateQueued, i: IJob::STATE_RUNNING)) |
89 | { |
90 | if(OldStateQueued == IJob::STATE_ABORTED) |
91 | { |
92 | // job was aborted before it was started |
93 | pJob->m_State = IJob::STATE_ABORTED; |
94 | continue; |
95 | } |
96 | dbg_assert(false, "Job state invalid. Job was reused or uninitialized." ); |
97 | dbg_break(); |
98 | } |
99 | |
100 | // remember running jobs so we can abort them |
101 | { |
102 | const CLockScope LockScope(m_LockRunning); |
103 | m_RunningJobs.push_back(x: pJob); |
104 | } |
105 | pJob->Run(); |
106 | { |
107 | const CLockScope LockScope(m_LockRunning); |
108 | m_RunningJobs.erase(position: std::find(first: m_RunningJobs.begin(), last: m_RunningJobs.end(), val: pJob)); |
109 | } |
110 | |
111 | // do not change state to done if job was not completed successfully |
112 | IJob::EJobState OldStateRunning = IJob::STATE_RUNNING; |
113 | if(!pJob->m_State.compare_exchange_strong(e&: OldStateRunning, i: IJob::STATE_DONE)) |
114 | { |
115 | if(OldStateRunning != IJob::STATE_ABORTED) |
116 | { |
117 | dbg_assert(false, "Job state invalid, must be either running or aborted" ); |
118 | } |
119 | } |
120 | } |
121 | else if(m_Shutdown) |
122 | { |
123 | // shut down worker thread when pool is shutting down and no more jobs are left |
124 | break; |
125 | } |
126 | } |
127 | } |
128 | |
129 | void CJobPool::Init(int NumThreads) |
130 | { |
131 | dbg_assert(m_Shutdown, "Job pool already running" ); |
132 | m_Shutdown = false; |
133 | |
134 | const CLockScope LockScope(m_Lock); |
135 | sphore_init(sem: &m_Semaphore); |
136 | m_pFirstJob = nullptr; |
137 | m_pLastJob = nullptr; |
138 | |
139 | // start worker threads |
140 | char aName[16]; // unix kernel length limit |
141 | m_vpThreads.reserve(n: NumThreads); |
142 | for(int i = 0; i < NumThreads; i++) |
143 | { |
144 | str_format(buffer: aName, buffer_size: sizeof(aName), format: "CJobPool W%d" , i); |
145 | m_vpThreads.push_back(x: thread_init(threadfunc: WorkerThread, user: this, name: aName)); |
146 | } |
147 | } |
148 | |
149 | void CJobPool::Shutdown() |
150 | { |
151 | dbg_assert(!m_Shutdown, "Job pool already shut down" ); |
152 | m_Shutdown = true; |
153 | |
154 | // abort queued jobs |
155 | { |
156 | const CLockScope LockScope(m_Lock); |
157 | std::shared_ptr<IJob> pJob = m_pFirstJob; |
158 | std::shared_ptr<IJob> pPrev = nullptr; |
159 | while(pJob != nullptr) |
160 | { |
161 | std::shared_ptr<IJob> pNext = pJob->m_pNext; |
162 | if(pJob->Abort()) |
163 | { |
164 | // only remove abortable jobs from queue |
165 | pJob->m_pNext = nullptr; |
166 | if(pPrev) |
167 | { |
168 | pPrev->m_pNext = pNext; |
169 | } |
170 | else |
171 | { |
172 | m_pFirstJob = pNext; |
173 | } |
174 | } |
175 | else |
176 | { |
177 | pPrev = pJob; |
178 | } |
179 | pJob = pNext; |
180 | } |
181 | m_pLastJob = pPrev; |
182 | } |
183 | |
184 | // abort running jobs |
185 | { |
186 | const CLockScope LockScope(m_LockRunning); |
187 | for(const std::shared_ptr<IJob> &pJob : m_RunningJobs) |
188 | { |
189 | pJob->Abort(); |
190 | } |
191 | } |
192 | |
193 | // wake up all worker threads |
194 | for(size_t i = 0; i < m_vpThreads.size(); i++) |
195 | { |
196 | sphore_signal(sem: &m_Semaphore); |
197 | } |
198 | |
199 | // wait for all worker threads to finish |
200 | for(void *pThread : m_vpThreads) |
201 | { |
202 | thread_wait(thread: pThread); |
203 | } |
204 | |
205 | m_vpThreads.clear(); |
206 | sphore_destroy(sem: &m_Semaphore); |
207 | } |
208 | |
209 | void CJobPool::Add(std::shared_ptr<IJob> pJob) |
210 | { |
211 | if(m_Shutdown) |
212 | { |
213 | // no jobs are accepted when the job pool is already shutting down |
214 | pJob->Abort(); |
215 | return; |
216 | } |
217 | |
218 | // add job to queue |
219 | { |
220 | const CLockScope LockScope(m_Lock); |
221 | if(m_pLastJob) |
222 | m_pLastJob->m_pNext = pJob; |
223 | m_pLastJob = std::move(pJob); |
224 | if(!m_pFirstJob) |
225 | m_pFirstJob = m_pLastJob; |
226 | } |
227 | |
228 | // signal a worker thread that a job is available |
229 | sphore_signal(sem: &m_Semaphore); |
230 | } |
231 | |