1#include "aio.h"
2
3#include "io.h"
4#include "lock.h"
5#include "mem.h"
6#include "sphore.h"
7#include "thread.h"
8
9#include <cstdlib>
10
// Capacity, in bytes, of the ring buffer that aio_new() allocates initially.
#define ASYNC_BUFSIZE (8 * 1024)
// Size, in bytes, of the staging array the worker thread copies queued data
// into before writing; this caps how much one io_write() call receives.
#define ASYNC_LOCAL_BUFSIZE (64 * 1024)
13
14struct ASYNCIO
15{
16 CLock lock;
17 IOHANDLE io;
18 SEMAPHORE sphore;
19 void *thread;
20
21 unsigned char *buffer;
22 unsigned int buffer_size;
23 unsigned int read_pos;
24 unsigned int write_pos;
25
26 int error;
27 unsigned char finish;
28 unsigned char refcount;
29};
30
// Values stored in ASYNCIO::finish; the worker thread acts on them once the
// queue is empty.
enum
{
	// Keep running: wait on the semaphore for more data.
	ASYNCIO_RUNNING = 0,
	// Close the I/O handle, drop the worker's reference and exit.
	ASYNCIO_CLOSE = 1,
	// Drop the worker's reference and exit without closing the handle.
	ASYNCIO_EXIT = 2,
};
37
// Describes the queued bytes of a ring buffer as at most two contiguous
// spans, in the order they must be consumed.
struct BUFFERS
{
	// First span and its length in bytes; null when nothing is queued.
	unsigned char *buf1;
	unsigned len1;
	// Second span, set only when the queued data wraps around the end of
	// the ring buffer.
	unsigned char *buf2;
	unsigned len2;
};
45
46static void buffer_ptrs(ASYNCIO *aio, struct BUFFERS *buffers)
47{
48 mem_zero(block: buffers, size: sizeof(*buffers));
49 if(aio->read_pos < aio->write_pos)
50 {
51 buffers->buf1 = aio->buffer + aio->read_pos;
52 buffers->len1 = aio->write_pos - aio->read_pos;
53 }
54 else if(aio->read_pos > aio->write_pos)
55 {
56 buffers->buf1 = aio->buffer + aio->read_pos;
57 buffers->len1 = aio->buffer_size - aio->read_pos;
58 buffers->buf2 = aio->buffer;
59 buffers->len2 = aio->write_pos;
60 }
61}
62
63static void aio_handle_free_and_unlock(ASYNCIO *aio) RELEASE(aio->lock)
64{
65 int do_free;
66 aio->refcount--;
67
68 do_free = aio->refcount == 0;
69 aio->lock.unlock();
70 if(do_free)
71 {
72 free(ptr: aio->buffer);
73 sphore_destroy(sem: &aio->sphore);
74 delete aio;
75 }
76}
77
78static void aio_thread(void *user)
79{
80 ASYNCIO *aio = (ASYNCIO *)user;
81
82 aio->lock.lock();
83 while(true)
84 {
85 struct BUFFERS buffers;
86 int result_io_error;
87 unsigned char local_buffer[ASYNC_LOCAL_BUFSIZE];
88 unsigned int local_buffer_len = 0;
89
90 if(aio->read_pos == aio->write_pos)
91 {
92 if(aio->finish != ASYNCIO_RUNNING)
93 {
94 if(aio->finish == ASYNCIO_CLOSE)
95 {
96 io_close(io: aio->io);
97 }
98 aio_handle_free_and_unlock(aio);
99 break;
100 }
101 aio->lock.unlock();
102 sphore_wait(sem: &aio->sphore);
103 aio->lock.lock();
104 continue;
105 }
106
107 buffer_ptrs(aio, buffers: &buffers);
108 if(buffers.buf1)
109 {
110 if(buffers.len1 > sizeof(local_buffer) - local_buffer_len)
111 {
112 buffers.len1 = sizeof(local_buffer) - local_buffer_len;
113 }
114 mem_copy(dest: local_buffer + local_buffer_len, source: buffers.buf1, size: buffers.len1);
115 local_buffer_len += buffers.len1;
116 if(buffers.buf2)
117 {
118 if(buffers.len2 > sizeof(local_buffer) - local_buffer_len)
119 {
120 buffers.len2 = sizeof(local_buffer) - local_buffer_len;
121 }
122 mem_copy(dest: local_buffer + local_buffer_len, source: buffers.buf2, size: buffers.len2);
123 local_buffer_len += buffers.len2;
124 }
125 }
126 aio->read_pos = (aio->read_pos + buffers.len1 + buffers.len2) % aio->buffer_size;
127 aio->lock.unlock();
128
129 io_write(io: aio->io, buffer: local_buffer, size: local_buffer_len);
130 io_flush(io: aio->io);
131 result_io_error = io_error(io: aio->io);
132
133 aio->lock.lock();
134 aio->error = result_io_error;
135 }
136}
137
138ASYNCIO *aio_new(IOHANDLE io)
139{
140 ASYNCIO *aio = new ASYNCIO;
141 if(!aio)
142 {
143 return nullptr;
144 }
145 aio->io = io;
146 sphore_init(sem: &aio->sphore);
147 aio->thread = nullptr;
148
149 aio->buffer = (unsigned char *)malloc(ASYNC_BUFSIZE);
150 if(!aio->buffer)
151 {
152 sphore_destroy(sem: &aio->sphore);
153 delete aio;
154 return nullptr;
155 }
156 aio->buffer_size = ASYNC_BUFSIZE;
157 aio->read_pos = 0;
158 aio->write_pos = 0;
159 aio->error = 0;
160 aio->finish = ASYNCIO_RUNNING;
161 aio->refcount = 2;
162
163 aio->thread = thread_init(threadfunc: aio_thread, user: aio, name: "aio");
164 if(!aio->thread)
165 {
166 free(ptr: aio->buffer);
167 sphore_destroy(sem: &aio->sphore);
168 delete aio;
169 return nullptr;
170 }
171 return aio;
172}
173
174static unsigned int buffer_len(ASYNCIO *aio)
175{
176 if(aio->write_pos >= aio->read_pos)
177 {
178 return aio->write_pos - aio->read_pos;
179 }
180 else
181 {
182 return aio->buffer_size + aio->write_pos - aio->read_pos;
183 }
184}
185
// Returns the capacity to grow to: `cur_size` doubled as often as needed to
// reach at least `need_size`.
//
// If doubling can make no further progress (the product would wrap around
// the range of unsigned int, or `cur_size` is 0), `need_size` itself is
// returned instead of looping forever.
static unsigned int next_buffer_size(unsigned int cur_size, unsigned int need_size)
{
	while(cur_size < need_size)
	{
		const unsigned int doubled = cur_size * 2;
		if(doubled <= cur_size)
		{
			// Wrapped around (or stuck at 0): fall back to the exact size.
			return need_size;
		}
		cur_size = doubled;
	}
	return cur_size;
}
194
195void aio_lock(ASYNCIO *aio) ACQUIRE(aio->lock)
196{
197 aio->lock.lock();
198}
199
200void aio_unlock(ASYNCIO *aio) RELEASE(aio->lock)
201{
202 aio->lock.unlock();
203 sphore_signal(sem: &aio->sphore);
204}
205
206void aio_write_unlocked(ASYNCIO *aio, const void *buffer, unsigned size)
207{
208 unsigned int remaining;
209 remaining = aio->buffer_size - buffer_len(aio);
210
211 // Don't allow full queue to distinguish between empty and full queue.
212 if(size < remaining)
213 {
214 unsigned int remaining_contiguous = aio->buffer_size - aio->write_pos;
215 if(size > remaining_contiguous)
216 {
217 mem_copy(dest: aio->buffer + aio->write_pos, source: buffer, size: remaining_contiguous);
218 size -= remaining_contiguous;
219 buffer = ((unsigned char *)buffer) + remaining_contiguous;
220 aio->write_pos = 0;
221 }
222 mem_copy(dest: aio->buffer + aio->write_pos, source: buffer, size);
223 aio->write_pos = (aio->write_pos + size) % aio->buffer_size;
224 }
225 else
226 {
227 // Add 1 so the new buffer isn't completely filled.
228 unsigned int new_written = buffer_len(aio) + size + 1;
229 unsigned int next_size = next_buffer_size(cur_size: aio->buffer_size, need_size: new_written);
230 unsigned int next_len = 0;
231 unsigned char *next_buffer = (unsigned char *)malloc(size: next_size);
232
233 struct BUFFERS buffers;
234 buffer_ptrs(aio, buffers: &buffers);
235 if(buffers.buf1)
236 {
237 mem_copy(dest: next_buffer + next_len, source: buffers.buf1, size: buffers.len1);
238 next_len += buffers.len1;
239 if(buffers.buf2)
240 {
241 mem_copy(dest: next_buffer + next_len, source: buffers.buf2, size: buffers.len2);
242 next_len += buffers.len2;
243 }
244 }
245 mem_copy(dest: next_buffer + next_len, source: buffer, size);
246 next_len += size;
247
248 free(ptr: aio->buffer);
249 aio->buffer = next_buffer;
250 aio->buffer_size = next_size;
251 aio->read_pos = 0;
252 aio->write_pos = next_len;
253 }
254}
255
256void aio_write(ASYNCIO *aio, const void *buffer, unsigned size)
257{
258 aio_lock(aio);
259 aio_write_unlocked(aio, buffer, size);
260 aio_unlock(aio);
261}
262
263void aio_write_newline_unlocked(ASYNCIO *aio)
264{
265#if defined(CONF_FAMILY_WINDOWS)
266 aio_write_unlocked(aio, "\r\n", 2);
267#else
268 aio_write_unlocked(aio, buffer: "\n", size: 1);
269#endif
270}
271
272void aio_write_newline(ASYNCIO *aio)
273{
274 aio_lock(aio);
275 aio_write_newline_unlocked(aio);
276 aio_unlock(aio);
277}
278
279int aio_error(ASYNCIO *aio)
280{
281 CLockScope ls(aio->lock);
282 return aio->error;
283}
284
285void aio_close(ASYNCIO *aio)
286{
287 {
288 CLockScope ls(aio->lock);
289 aio->finish = ASYNCIO_CLOSE;
290 }
291 sphore_signal(sem: &aio->sphore);
292}
293
294void aio_wait(ASYNCIO *aio)
295{
296 void *thread;
297 {
298 CLockScope ls(aio->lock);
299 thread = aio->thread;
300 aio->thread = nullptr;
301 if(aio->finish == ASYNCIO_RUNNING)
302 {
303 aio->finish = ASYNCIO_EXIT;
304 }
305 }
306 sphore_signal(sem: &aio->sphore);
307 thread_wait(thread);
308}
309
310void aio_free(ASYNCIO *aio)
311{
312 aio->lock.lock();
313 if(aio->thread)
314 {
315 thread_detach(thread: aio->thread);
316 aio->thread = nullptr;
317 }
318 aio_handle_free_and_unlock(aio);
319}
320