1/* (c) Magnus Auvinen. See licence.txt in the root of the distribution for more information. */
2/* If you are missing that file, acquire a complete release at teeworlds.com. */
3#ifndef ENGINE_SHARED_JOBS_H
4#define ENGINE_SHARED_JOBS_H
5
6#include <base/lock.h>
7#include <base/system.h>
8
9#include <atomic>
10#include <deque>
11#include <memory>
12#include <vector>
13
14/**
15 * A job which runs in a worker thread of a job pool.
16 *
17 * @see CJobPool
18 */
19class IJob
20{
21 friend class CJobPool;
22
23public:
24 /**
25 * The state of a job in the job pool.
26 */
27 enum EJobState
28 {
29 /**
30 * Job has been created/queued but not started on a worker thread yet.
31 */
32 STATE_QUEUED = 0,
33
34 /**
35 * Job is currently running on a worker thread.
36 */
37 STATE_RUNNING,
38
39 /**
40 * Job was completed successfully.
41 */
42 STATE_DONE,
43
44 /**
45 * Job was aborted. Note the job may or may not still be running while
46 * in this state.
47 *
48 * @see IsAbortable
49 */
50 STATE_ABORTED,
51 };
52
53private:
54 std::shared_ptr<IJob> m_pNext;
55 std::atomic<EJobState> m_State;
56 std::atomic<bool> m_Abortable;
57
58protected:
59 /**
60 * Performs tasks in a worker thread.
61 */
62 virtual void Run() = 0;
63
64 /**
65 * Sets whether this job can be aborted.
66 *
67 * @remark Has no effect if the job has already been aborted.
68 *
69 * @see IsAbortable
70 */
71 void Abortable(bool Abortable);
72
73public:
74 IJob();
75 virtual ~IJob();
76
77 IJob(const IJob &Other) = delete;
78 IJob &operator=(const IJob &Other) = delete;
79
80 /**
81 * Returns the state of the job.
82 *
83 * @remark Accessing jobs in any other way that with the base functions of `IJob`
84 * is generally not thread-safe unless the job is in @link STATE_DONE @endlink
85 * or has not been enqueued yet.
86 *
87 * @return State of the job.
88 */
89 EJobState State() const;
90
91 /**
92 * Returns whether the job was completed, i.e. whether it's not still queued
93 * or running.
94 *
95 * @return `true` if the job is done, `false` otherwise.
96 */
97 bool Done() const;
98
99 /**
100 * Aborts the job, if it can be aborted.
101 *
102 * @return `true` if abort was accepted, `false` otherwise.
103 *
104 * @remark May be overridden to delegate abort to other jobs. Note that this
105 * function may be called from any thread and should be thread-safe.
106 */
107 virtual bool Abort();
108
109 /**
110 * Returns whether the job can be aborted. Jobs that are abortable may have
111 * their state set to `STATE_ABORTED` at any point if the job was aborted.
112 * The job state should be checked periodically in the `Run` function and the
113 * job should terminate at the earliest, safe opportunity when aborted.
114 * Scheduled jobs which are not abortable are guaranteed to fully complete
115 * before the job pool is shut down.
116 *
117 * @return `true` if the job can be aborted, `false` otherwise.
118 */
119 bool IsAbortable() const;
120};
121
122/**
123 * A job pool which runs jobs in one or more worker threads.
124 *
125 * @see IJob
126 */
127class CJobPool
128{
129 std::vector<void *> m_vpThreads;
130 std::atomic<bool> m_Shutdown;
131
132 CLock m_Lock;
133 SEMAPHORE m_Semaphore;
134 std::shared_ptr<IJob> m_pFirstJob GUARDED_BY(m_Lock);
135 std::shared_ptr<IJob> m_pLastJob GUARDED_BY(m_Lock);
136
137 CLock m_LockRunning;
138 std::deque<std::shared_ptr<IJob>> m_RunningJobs GUARDED_BY(m_LockRunning);
139
140 static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS;
141 void RunLoop() NO_THREAD_SAFETY_ANALYSIS;
142
143public:
144 CJobPool();
145 ~CJobPool();
146
147 /**
148 * Initializes the job pool with the given number of worker threads.
149 *
150 * @param NumTheads The number of worker threads.
151 *
152 * @remark Must be called on the main thread.
153 */
154 void Init(int NumThreads) REQUIRES(!m_Lock);
155
156 /**
157 * Shuts down the job pool. Aborts all abortable jobs. Then waits for all
158 * worker threads to complete all remaining queued jobs and terminate.
159 *
160 * @remark Must be called on the main thread.
161 */
162 void Shutdown() REQUIRES(!m_Lock) REQUIRES(!m_LockRunning);
163
164 /**
165 * Adds a job to the queue of the job pool.
166 *
167 * @param pJob The job to enqueue.
168 *
169 * @remark If the job pool is already shutting down, no additional jobs
170 * will be enqueue anymore. Abortable jobs will immediately be aborted.
171 */
172 void Add(std::shared_ptr<IJob> pJob) REQUIRES(!m_Lock);
173};
174#endif
175