DtkCore
DTK Core module
dasync.h
1/*
2 * Copyright (C) 2021 ~ 2021 UnionTech Technology Co., Ltd.
3 *
4 * Author: Wang Peng <[email protected]>
5 *
6 * Maintainer: Wang Peng <[email protected]>
7 *
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 */
21#ifndef DASYNC_H
22#define DASYNC_H
#include <dtkcore_global.h>

#include <QQueue>
#include <QMutex>
#include <QThread>
#include <QMutexLocker>
#include <QCoreApplication>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
33
34DCORE_BEGIN_NAMESPACE
35
// Documentation-only annotation: expands to nothing and merely records which
// mutex is meant to protect the annotated member.
#define GUARDED_BY(...)
// Evaluates to true when a QCoreApplication instance exists and the calling
// thread is the thread that instance lives in.
#define D_THREAD_IN_MAIN() \
    (QCoreApplication::instance() != nullptr \
     && QCoreApplication::instance()->thread() == QThread::currentThread())
38
39// TODO: 添加 DtkCorePrivate 到 dtkcore_global.h
40namespace DtkCorePrivate
41{
42 // 本类是继承实现的,只有子类方法是安全的,暂不对外提供接口
43 template<class T>
44 class DSafeQueue : public QQueue<T> {
45 public:
46 inline void enqueue(const T &t) {
47 QMutexLocker lkc(&m_mtx);
48 QQueue<T>::enqueue(t);
49 }
50 inline T dequeue() {
51 QMutexLocker lkc(&m_mtx);
52 return QQueue<T>::dequeue();
53 }
54 inline int size() {
55 QMutexLocker lkc(&m_mtx);
56 return QQueue<T>::size();
57 }
58 inline T &head() {
59 QMutexLocker lkc(&m_mtx);
60 return QQueue<T>::head();
61 }
62 inline const T &head() const {
63 QMutexLocker lkc(&m_mtx);
64 return QQueue<T>::head();
65 }
66 private:
67 QMutex m_mtx;
68 };
69
70 // 内部使用,不对外提供接口
71 class MainWorker : public QObject {
72 Q_OBJECT
73 std::function<void(void *)> m_handle;
74 std::function<void(void *)> m_handleProxy;
75
76 std::function<void(void)> m_handleV;
77 std::function<void(void)> m_handleVProxy;
78
79 bool m_dasyncDestroyed = false;
80 char __padding[7];
81 public:
82 void setDAsyncDestroyed() {
83 m_dasyncDestroyed = true;
84 }
85 bool dasyncDestroyed() {
86 return m_dasyncDestroyed;
87 }
88 public:
89 MainWorker(QObject *parent = nullptr)
90 : QObject (parent)
91 {
92 // Ensure that QApplication is initialized
93 Q_ASSERT(qApp->instance() && qApp->instance()->thread());
94 moveToThread(qApp->instance()->thread());
95
96 bool isStartInMain = D_THREAD_IN_MAIN();
97
98 QObject::connect(this, &MainWorker::sigRunInMain,
99 this, &MainWorker::slotRunInMain,
100 isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
101
102 QObject::connect(this, &MainWorker::sigRunInMainVoid,
103 this, &MainWorker::slotRunInMainVoid,
104 isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
105 }
106
107 // 1. handle arg is non void
108 template <typename FUNC, typename ArgType>
109 typename std::enable_if<!std::is_void<ArgType>::value>::type
110 setHandle(FUNC &&func) {
111 m_handle = [&] (void *arg) {
112 DSafeQueue<ArgType> *q = static_cast<DSafeQueue<ArgType>*>(arg);
113 while (q && q->size()) {
114 // 这里是 then 回调真正执行到的地方
115 func(q->dequeue());
116 }
117 };
118
119 m_handleProxy = [this] (void *arg) {
120 if (m_handle) {
121 m_handle(arg);
122 }
123 };
124 }
125
126 // 2. handle arg is void
127 template <typename FUNC, typename ArgType>
128 typename std::enable_if<std::is_void<ArgType>::value>::type
129 setHandle(FUNC &&func) {
130 m_handleV = [&] (void) {
131 // 这里是 then 回调真正执行到的地方
132 func();
133 };
134
135 m_handleVProxy = [this] (void) {
136 if (m_handleV) {
137 m_handleV();
138 }
139 };
140 }
141 Q_SIGNALS:
142 void sigRunInMain(void *arg);
143 void sigRunInMainVoid();
144 public Q_SLOTS:
145 void slotRunInMain(void *arg) {
146 Q_ASSERT(D_THREAD_IN_MAIN());
147 if (m_handleProxy && !m_dasyncDestroyed) {
148 m_handleProxy(arg);
149 }
150 }
151 void slotRunInMainVoid(void) {
152 Q_ASSERT(D_THREAD_IN_MAIN());
153 if (m_handleVProxy && !m_dasyncDestroyed) {
154 m_handleVProxy();
155 }
156 }
157 };
158}
159
160class DAsyncState : public QObject {
161 Q_OBJECT
162public:
163 explicit DAsyncState(QObject *parent = nullptr) noexcept
164 : QObject (parent)
165 {
166 }
167 enum AsyncTaskState {
168 NotReady = 0x00, // initial state
169 Ready = 0x02, // deffered = false
170 Running = 0x04, // thread started
171 Pending = Ready | Running, // condition wait
172 Cancel = 0x08, // set thread canceled
173 WaitFinished = 0x10, // wiaitForFinished
174 Finished = 0x20, // thread exit
175 Forever = 0x30, // TODO: DAsync<void, xxx>::post execute forever
176 };
177 Q_DECLARE_FLAGS(AsyncTaskStatus, AsyncTaskState)
178};
179
180// Template classes not supported by Q_OBJECT, so class MainWorker is independent
181template <typename DataTypeIn, typename DataTypeOut>
182class DAsync : public QObject {
183
184 class Helper;
185
186 std::mutex m_mtxIn;
187 std::condition_variable m_cvIn;
188
189 std::mutex m_mtxForWaitTask;
190 std::condition_variable m_cvForWaitTask;
191
192 class Guard {
193 DAsync *m_as;
194 // 如果 DAsync 已经析构了,工作线程还没结束
195 // DAsync 中的有些数据就不能在 guard 的析构里面访问了
196 bool m_dasDestructed = false;
197 public:
198 bool destructed() {
199 return m_dasDestructed;
200 }
201 void setDestructed() {
202 m_dasDestructed = true;
203 }
204 public:
205 explicit Guard(DAsync *as) noexcept : m_as (as)
206 {
207 m_as->m_status.setFlag(DAsyncState::Ready);
208 m_as->m_status.setFlag(DAsyncState::Finished, false); // 防止重入
209 }
210 ~Guard() {
211 if (destructed()) {
212 return;
213 }
214 m_as->m_threadGuard = nullptr;
215 m_as->m_status.setFlag(DAsyncState::Finished);
216 m_as->m_status.setFlag(DAsyncState::Ready, false); // 防止重入
217 if (m_as->m_status.testFlag(DAsyncState::WaitFinished)) {
218 m_as->m_cvForWaitTask.notify_one();
219 }
220 setPending(false);
221 }
222 void setPending(bool isPending) {
223 if (!destructed()) {
224 m_as->m_status.setFlag(DAsyncState::Pending, isPending);
225 }
226 }
227 };
228 Guard *m_threadGuard = nullptr;
229
230 /*
231 * m_QueueIn 的作用是存储 PostData 传进来的数据
232 * m_QueueOut 的作用是将 post 处理完的结果暂存起来然后传入到 then 中
233 * 在 emitHelper 中调用 post 进来的任务,然后将结果传到主线程中处理
234 * 数据传递使用 void * 做转换,对于复合类型避免了使用 qRegisterMetaType
235 */
236 template<typename T, typename Enable = void>
237 struct DataQueueType { DtkCorePrivate::DSafeQueue<T> m_queue; };
238 template<class T>
239 struct DataQueueType<T, typename std::enable_if<std::is_void<T>::value>::type> { };
240 using DataInQueue = DataQueueType<DataTypeIn>;
241 using DataOutQueue = DataQueueType<DataTypeOut>;
242 // Queue 中处理完的结果经由 m_QueueIn 变量暂存,然后经由 signal、slot 传给 then 中的回调函数做参数
243 DataInQueue m_QueueIn;
244 DataOutQueue m_QueueOut;
245
246 // 存储不同类型的输入函数
247 template<typename T1, typename T2, typename Enable1 = void, typename Enable2 = void>
248 struct FuncType {
249 };
250 template<typename T1, typename T2>
251 struct FuncType<T1, T2,
252 typename std::enable_if<std::is_void<T1>::value>::type,
253 typename std::enable_if<std::is_void<T2>::value>::type> {
254 std::function <void(void)> cbp;
255 };
256 template<typename T1, typename T2>
257 struct FuncType<T1, T2,
258 typename std::enable_if<!std::is_void<T1>::value>::type,
259 typename std::enable_if<!std::is_void<T2>::value>::type> {
260 std::function <T2(T1)> cbp;
261 };
262 template<typename T1, typename T2>
263 struct FuncType<T1, T2,
264 typename std::enable_if<std::is_void<T1>::value>::type,
265 typename std::enable_if<!std::is_void<T2>::value>::type> {
266 std::function <T2(void)> cbp;
267 };
268 template<typename T1, typename T2>
269 struct FuncType<T1, T2,
270 typename std::enable_if<!std::is_void<T1>::value>::type,
271 typename std::enable_if<std::is_void<T2>::value>::type> {
272 std::function <void(T1)> cbp;
273 };
274
275 std::mutex m_mtxFunc;
276 FuncType<DataTypeIn, DataTypeOut> m_func GUARDED_BY(m_mtxFunc);
277 DAsyncState::AsyncTaskStatus m_status;
278
279public:
280 explicit DAsync(QObject *parent = nullptr) noexcept
281 : QObject (parent)
282 , m_func ({nullptr})
283 , m_status (DAsyncState::NotReady)
284 {
285 m_mainWorker = new DtkCorePrivate::MainWorker();
286 m_helper = new Helper(this, this);
287 }
288 ~DAsync() {
289 if (m_threadGuard) {
290 m_threadGuard->setDestructed();
291 }
292 m_status.setFlag(DAsyncState::Cancel);
293 if (m_status.testFlag(DAsyncState::Pending)) {
294 m_cvIn.notify_one();
295 }
296 if (m_mainWorker) {
297 m_mainWorker->setDAsyncDestroyed();
298 m_mainWorker->deleteLater();
299 m_mainWorker = nullptr;
300 }
301 }
302
303private:
304 // 1. input void & emit void
305 template <typename PostInType, typename EmitInType>
306 typename std::enable_if<std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type
307 emitHelper() {
308 m_func.cbp();
309 Q_EMIT m_mainWorker->sigRunInMainVoid();
310 }
311 // 2. input non void & emit non void
312 template <typename PostInType, typename EmitInType>
313 typename std::enable_if<!std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type
314 emitHelper() {
315 m_QueueOut.m_queue.enqueue(m_func.cbp(m_QueueIn.m_queue.dequeue()));
316 Q_EMIT m_mainWorker->sigRunInMain(static_cast<void *>(&(m_QueueOut.m_queue)));
317 }
318 // 3. input non void & emit void
319 template <typename PostInType, typename EmitInType>
320 typename std::enable_if<!std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type
321 emitHelper() {
322 m_func.cbp(m_QueueIn.m_queue.dequeue());
323 Q_EMIT m_mainWorker->sigRunInMainVoid();
324 }
325 // 4. input void & emit non void
326 template <typename PostInType, typename EmitInType>
327 typename std::enable_if<std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type
328 emitHelper() {
329 m_QueueOut.m_queue.enqueue(m_func.cbp());
330 Q_EMIT m_mainWorker->sigRunInMain(static_cast<void *>(&(m_QueueOut.m_queue)));
331 }
332
333public:
334 void startUp() {
335 if (m_status.testFlag(DAsyncState::Cancel)) {
336 return;
337 }
338 m_helper->start();
339 }
340 void cancelAll() {
341 m_status.setFlag(DAsyncState::Cancel);
342 if (m_status.testFlag(DAsyncState::Pending)) {
343 m_cvIn.notify_one();
344 }
345 }
346 bool isFinished() {
347 return m_status.testFlag(DAsyncState::Finished);
348 }
349 /*
350 * 不能在 QTimer 中使用 waitForFinished,防止阻塞主线程
351 * 也不能在主线程执行前使用 waitForFinished()
352 * 它的默认参数为 true,等同于 waitForFinished(false) +
353 * cancelAll, 如果调用了后者, 会一直阻塞等待任务,直到
354 * cancelAll 被调用之后 waitForFinished 才会在任务完成完
355 * 成后退出,此时就可以删除DAsync了。最好的管理方式还是采用
356 * QObject 的内存托管。主线程中使用,可以采用托管的方式,
357 * 任务结束只要调用 cancelAll + isFinished 轮询判断就行了,
358 * DAsync 的工作线程就会在完成后自动退出。
359 */
360 void waitForFinished(bool cancelAllWorks = true) {
361 Q_ASSERT(!D_THREAD_IN_MAIN());
362 if (cancelAllWorks) {
363 cancelAll();
364 }
365 if (!m_status.testFlag(DAsyncState::Finished)) {
366 if (m_status.testFlag(DAsyncState::Pending)) {
367 m_cvIn.notify_one();
368 }
369 m_status.setFlag(DAsyncState::WaitFinished);
370 std::unique_lock <std::mutex> lck(m_mtxForWaitTask);
371 m_cvForWaitTask.wait(lck);
372 }
373 }
374 // 输入数据不是 void 类型则依赖于 m_QueueIn
375 template <typename FUNC, typename InputType = DataTypeIn>
376 typename std::enable_if<!std::is_void<InputType>::value, Helper *>::type
377 post(FUNC &&func) {
378 m_func.cbp = std::forward<FUNC>(func);
379 if (m_postProxy) {
380 return m_helper;
381 }
382 m_postProxy = [this] () {
383 std::thread thread([this] {
384 if (m_status.testFlag(DAsyncState::Cancel)) {
385 return;
386 }
387 Guard guard(this);
388 m_threadGuard = &guard;
389
390 std::unique_lock <std::mutex> lck(m_mtxIn);
391 while (true) {
392 while (!m_status.testFlag(DAsyncState::Ready) || !m_QueueIn.m_queue.size()) {
393 guard.setPending(true);
394 // 定时查询 flag,防止睡死的情况发生
395 m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
396 if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)) {
397 return;
398 }
399 }
400 guard.setPending(false);
401
402 while (m_func.cbp && m_QueueIn.m_queue.size()) {
403 emitHelper<DataTypeIn, DataTypeOut>();
404 }
405 }
406 });
407 thread.detach();
408 };
409
410 return m_helper;
411 }
412
413 template <typename FUNC, typename InputType = DataTypeIn>
414 typename std::enable_if<std::is_void<InputType>::value, Helper *>::type
415 post(FUNC &&func) {
416 {
417 std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
418 m_func.cbp = std::forward<FUNC>(func);
419 }
420 if (m_postProxy) {
421 return m_helper;
422 }
423 m_postProxy = [this] () {
424 std::thread thread([this] {
425 if (m_status.testFlag(DAsyncState::Cancel)) {
426 return;
427 }
428 Guard guard(this);
429 m_threadGuard = &guard;
430
431 std::unique_lock <std::mutex> lck(m_mtxIn);
432 while (true) {
433 if (!m_status.testFlag(DAsyncState::Ready)) {
434 guard.setPending(true);
435 // 定时查询 flag,防止睡死的情况发生
436 m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
437 if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)){
438 return;
439 }
440 }
441 guard.setPending(false);
442
443 if (m_func.cbp) {
444 std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
445 emitHelper<DataTypeIn, DataTypeOut>();
446 m_func.cbp = nullptr; // reset
447 }
448 }
449 });
450 thread.detach();
451 };
452
453 return m_helper;
454 }
455
456 // only support DAsync<non void type, ...>
457 template <typename InputType = DataTypeIn>
458 typename std::enable_if<!std::is_void<InputType>::value>::type
459 postData(const InputType &data) {
460 if (Q_UNLIKELY(!m_status.testFlag(DAsyncState::Cancel))) {
461 m_QueueIn.m_queue.enqueue(data);
462 if (m_status.testFlag(DAsyncState::Pending)) {
463 m_cvIn.notify_one();
464 }
465 }
466 }
467
468private:
469 std::function<void()> m_postProxy;
470 class Helper : public QObject {
471 DAsync *m_async;
472 public:
473 explicit Helper(DAsync *async, QObject *parent = nullptr) noexcept
474 : QObject (parent)
475 , m_async (async)
476 {
477 }
478
479 template <typename FUNC>
480 Helper *then(FUNC &&func) {
481 m_async->m_mainWorker->template setHandle<FUNC, DataTypeOut>(std::forward<FUNC>(func));
482 return this;
483 }
484 // 仅启动,非阻塞
485 void start(bool immediately = true) {
486 if (m_async->m_postProxy) {
487 m_async->m_postProxy();
488 }
489 if (!immediately) {
490 m_async->m_status.setFlag(DAsyncState::Ready, false);
491 } else {
492 m_async->m_status.setFlag(DAsyncState::Ready);
493 if (m_async->m_status.testFlag(DAsyncState::Pending)) {
494 m_async->m_cvIn.notify_one();
495 }
496 }
497 }
498 };
499
500 Helper *m_helper = nullptr;
501 DtkCorePrivate::MainWorker *m_mainWorker = nullptr;
502};
503
504DCORE_END_NAMESPACE
505#endif //DASYNC_H
Definition: dasync.h:160
Definition: dasync.h:182