DtkCore
DTK Core module
载入中...
搜索中...
未找到
dasync.h
1// SPDX-FileCopyrightText: 2021 - 2022 UnionTech Software Technology Co., Ltd.
2//
3// SPDX-License-Identifier: LGPL-3.0-or-later
4
5#ifndef DASYNC_H
6#define DASYNC_H
#include <dtkcore_global.h>

#include <QCoreApplication>
#include <QMutex>
#include <QMutexLocker>
#include <QObject>
#include <QQueue>
#include <QThread>

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
17
18DCORE_BEGIN_NAMESPACE
19
// Annotation placeholder: expands to nothing, it only documents which mutex
// is meant to protect the annotated member.
#define GUARDED_BY(...)
// Evaluates to true when a QCoreApplication instance exists and the calling
// thread is the thread that instance lives in.
#define D_THREAD_IN_MAIN() \
    (QCoreApplication::instance() && QCoreApplication::instance()->thread() == QThread::currentThread())
22
23// TODO: 添加 DtkCorePrivate 到 dtkcore_global.h
24namespace DtkCorePrivate {
25// 本类是继承实现的,只有子类方法是安全的,暂不对外提供接口
26template <class T>
27class DSafeQueue : public QQueue<T>
28{
29public:
30 inline void enqueue(const T &t)
31 {
32 QMutexLocker lkc(&m_mtx);
33 QQueue<T>::enqueue(t);
34 }
35 inline T dequeue()
36 {
37 QMutexLocker lkc(&m_mtx);
38 return QQueue<T>::dequeue();
39 }
40 inline int size()
41 {
42 QMutexLocker lkc(&m_mtx);
43 return QQueue<T>::size();
44 }
45 inline T &head()
46 {
47 QMutexLocker lkc(&m_mtx);
48 return QQueue<T>::head();
49 }
50 inline const T &head() const
51 {
52 QMutexLocker lkc(&m_mtx);
53 return QQueue<T>::head();
54 }
55
56private:
57 mutable QMutex m_mtx;
58};
59
60// 内部使用,不对外提供接口
61class MainWorker : public QObject
62{
63 Q_OBJECT
64 std::function<void(void *)> m_handle;
65 std::function<void(void *)> m_handleProxy;
66
67 std::function<void(void)> m_handleV;
68 std::function<void(void)> m_handleVProxy;
69
70 bool m_dasyncDestroyed = false;
71 char __padding[7];
72
73public:
74 void setDAsyncDestroyed() { m_dasyncDestroyed = true; }
75 bool dasyncDestroyed() { return m_dasyncDestroyed; }
76
77public:
78 MainWorker(QObject *parent = nullptr)
79 : QObject(parent)
80 {
81 // Ensure that QApplication is initialized
82 Q_ASSERT(qApp->instance() && qApp->instance()->thread());
83 moveToThread(qApp->instance()->thread());
84
85 bool isStartInMain = D_THREAD_IN_MAIN();
86
87 QObject::connect(this,
88 &MainWorker::sigRunInMain,
89 this,
90 &MainWorker::slotRunInMain,
91 isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
92
93 QObject::connect(this,
94 &MainWorker::sigRunInMainVoid,
95 this,
96 &MainWorker::slotRunInMainVoid,
97 isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
98 }
99
100 // 1. handle arg is non void
101 template <typename FUNC, typename ArgType>
102 typename std::enable_if<!std::is_void<ArgType>::value>::type setHandle(FUNC &&func)
103 {
104 m_handle = [&](void *arg) {
105 DSafeQueue<ArgType> *q = static_cast<DSafeQueue<ArgType> *>(arg);
106 while (q && q->size()) {
107 // 这里是 then 回调真正执行到的地方
108 func(q->dequeue());
109 }
110 };
111
112 m_handleProxy = [this](void *arg) {
113 if (m_handle) {
114 m_handle(arg);
115 }
116 };
117 }
118
119 // 2. handle arg is void
120 template <typename FUNC, typename ArgType>
121 typename std::enable_if<std::is_void<ArgType>::value>::type setHandle(FUNC &&func)
122 {
123 m_handleV = [&](void) {
124 // 这里是 then 回调真正执行到的地方
125 func();
126 };
127
128 m_handleVProxy = [this](void) {
129 if (m_handleV) {
130 m_handleV();
131 }
132 };
133 }
134Q_SIGNALS:
135 void sigRunInMain(void *arg);
136 void sigRunInMainVoid();
137public Q_SLOTS:
138 void slotRunInMain(void *arg)
139 {
140 Q_ASSERT(D_THREAD_IN_MAIN());
141 if (m_handleProxy && !m_dasyncDestroyed) {
142 m_handleProxy(arg);
143 }
144 }
145 void slotRunInMainVoid(void)
146 {
147 Q_ASSERT(D_THREAD_IN_MAIN());
148 if (m_handleVProxy && !m_dasyncDestroyed) {
149 m_handleVProxy();
150 }
151 }
152};
153} // namespace DtkCorePrivate
154
155class DAsyncState : public QObject
156{
157 Q_OBJECT
158public:
159 explicit DAsyncState(QObject *parent = nullptr) noexcept
160 : QObject(parent)
161 {
162 }
163 enum AsyncTaskState {
164 NotReady = 0x00, // initial state
165 Ready = 0x02, // deffered = false
166 Running = 0x04, // thread started
167 Pending = Ready | Running, // condition wait
168 Cancel = 0x08, // set thread canceled
169 WaitFinished = 0x10, // wiaitForFinished
170 Finished = 0x20, // thread exit
171 Forever = 0x30, // TODO: DAsync<void, xxx>::post execute forever
172 };
173 Q_DECLARE_FLAGS(AsyncTaskStatus, AsyncTaskState)
174};
175
176// Template classes not supported by Q_OBJECT, so class MainWorker is independent
177template <typename DataTypeIn, typename DataTypeOut>
178class D_DECL_DEPRECATED DAsync : public QObject
179{
180 class Helper;
181
182 std::mutex m_mtxIn;
183 std::condition_variable m_cvIn;
184
185 std::mutex m_mtxForWaitTask;
186 std::condition_variable m_cvForWaitTask;
187
188 class Guard
189 {
190 DAsync *m_as;
191 // 如果 DAsync 已经析构了,工作线程还没结束
192 // DAsync 中的有些数据就不能在 guard 的析构里面访问了
193 bool m_dasDestructed = false;
194
195 public:
196 bool destructed() { return m_dasDestructed; }
197 void setDestructed() { m_dasDestructed = true; }
198
199 public:
200 explicit Guard(DAsync *as) noexcept
201 : m_as(as)
202 {
203 m_as->m_status.setFlag(DAsyncState::Ready);
204 m_as->m_status.setFlag(DAsyncState::Finished, false); // 防止重入
205 }
206 ~Guard()
207 {
208 if (destructed()) {
209 return;
210 }
211 m_as->m_threadGuard = nullptr;
212 m_as->m_status.setFlag(DAsyncState::Finished);
213 m_as->m_status.setFlag(DAsyncState::Ready, false); // 防止重入
214 if (m_as->m_status.testFlag(DAsyncState::WaitFinished)) {
215 m_as->m_cvForWaitTask.notify_one();
216 }
217 setPending(false);
218 }
219 void setPending(bool isPending)
220 {
221 if (!destructed()) {
222 m_as->m_status.setFlag(DAsyncState::Pending, isPending);
223 }
224 }
225 };
226 Guard *m_threadGuard = nullptr;
227
228 /*
229 * m_QueueIn 的作用是存储 PostData 传进来的数据
230 * m_QueueOut 的作用是将 post 处理完的结果暂存起来然后传入到 then 中
231 * 在 emitHelper 中调用 post 进来的任务,然后将结果传到主线程中处理
232 * 数据传递使用 void * 做转换,对于复合类型避免了使用 qRegisterMetaType
233 */
234 template <typename T, typename Enable = void>
235 struct DataQueueType
236 {
238 };
239 template <class T>
240 struct DataQueueType<T, typename std::enable_if<std::is_void<T>::value>::type>
241 {
242 };
243 using DataInQueue = DataQueueType<DataTypeIn>;
244 using DataOutQueue = DataQueueType<DataTypeOut>;
245 // Queue 中处理完的结果经由 m_QueueIn 变量暂存,然后经由 signal、slot 传给 then 中的回调函数做参数
246 DataInQueue m_QueueIn;
247 DataOutQueue m_QueueOut;
248
249 // 存储不同类型的输入函数
250 template <typename T1, typename T2, typename Enable1 = void, typename Enable2 = void>
251 struct FuncType
252 {
253 };
254 template <typename T1, typename T2>
255 struct FuncType<T1,
256 T2,
257 typename std::enable_if<std::is_void<T1>::value>::type,
258 typename std::enable_if<std::is_void<T2>::value>::type>
259 {
260 std::function<void(void)> cbp;
261 };
262 template <typename T1, typename T2>
263 struct FuncType<T1,
264 T2,
265 typename std::enable_if<!std::is_void<T1>::value>::type,
266 typename std::enable_if<!std::is_void<T2>::value>::type>
267 {
268 std::function<T2(T1)> cbp;
269 };
270 template <typename T1, typename T2>
271 struct FuncType<T1,
272 T2,
273 typename std::enable_if<std::is_void<T1>::value>::type,
274 typename std::enable_if<!std::is_void<T2>::value>::type>
275 {
276 std::function<T2(void)> cbp;
277 };
278 template <typename T1, typename T2>
279 struct FuncType<T1,
280 T2,
281 typename std::enable_if<!std::is_void<T1>::value>::type,
282 typename std::enable_if<std::is_void<T2>::value>::type>
283 {
284 std::function<void(T1)> cbp;
285 };
286
287 std::mutex m_mtxFunc;
288 FuncType<DataTypeIn, DataTypeOut> m_func GUARDED_BY(m_mtxFunc);
289 DAsyncState::AsyncTaskStatus m_status;
290
291public:
292 explicit DAsync(QObject *parent = nullptr) noexcept
293 : QObject(parent)
294 , m_func({nullptr})
295 , m_status(DAsyncState::NotReady)
296 {
297 m_mainWorker = new DtkCorePrivate::MainWorker();
298 m_helper = new Helper(this, this);
299 }
300 ~DAsync()
301 {
302 if (m_threadGuard) {
303 m_threadGuard->setDestructed();
304 }
305 m_status.setFlag(DAsyncState::Cancel);
306 if (m_status.testFlag(DAsyncState::Pending)) {
307 m_cvIn.notify_one();
308 }
309 if (m_mainWorker) {
310 m_mainWorker->setDAsyncDestroyed();
311 m_mainWorker->deleteLater();
312 m_mainWorker = nullptr;
313 }
314 }
315
316private:
317 // 1. input void & emit void
318 template <typename PostInType, typename EmitInType>
319 typename std::enable_if<std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type emitHelper()
320 {
321 m_func.cbp();
322 Q_EMIT m_mainWorker->sigRunInMainVoid();
323 }
324 // 2. input non void & emit non void
325 template <typename PostInType, typename EmitInType>
326 typename std::enable_if<!std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type emitHelper()
327 {
328 m_QueueOut.m_queue.enqueue(m_func.cbp(m_QueueIn.m_queue.dequeue()));
329 Q_EMIT m_mainWorker->sigRunInMain(static_cast<void *>(&(m_QueueOut.m_queue)));
330 }
331 // 3. input non void & emit void
332 template <typename PostInType, typename EmitInType>
333 typename std::enable_if<!std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type emitHelper()
334 {
335 m_func.cbp(m_QueueIn.m_queue.dequeue());
336 Q_EMIT m_mainWorker->sigRunInMainVoid();
337 }
338 // 4. input void & emit non void
339 template <typename PostInType, typename EmitInType>
340 typename std::enable_if<std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type emitHelper()
341 {
342 m_QueueOut.m_queue.enqueue(m_func.cbp());
343 Q_EMIT m_mainWorker->sigRunInMain(static_cast<void *>(&(m_QueueOut.m_queue)));
344 }
345
346public:
347 void startUp()
348 {
349 if (m_status.testFlag(DAsyncState::Cancel)) {
350 return;
351 }
352 m_helper->start();
353 }
354 void cancelAll()
355 {
356 m_status.setFlag(DAsyncState::Cancel);
357 if (m_status.testFlag(DAsyncState::Pending)) {
358 m_cvIn.notify_one();
359 }
360 }
361 bool isFinished() { return m_status.testFlag(DAsyncState::Finished); }
362 /*
363 * 不能在 QTimer 中使用 waitForFinished,防止阻塞主线程
364 * 也不能在主线程执行前使用 waitForFinished()
365 * 它的默认参数为 true,等同于 waitForFinished(false) +
366 * cancelAll, 如果调用了后者, 会一直阻塞等待任务,直到
367 * cancelAll 被调用之后 waitForFinished 才会在任务完成完
368 * 成后退出,此时就可以删除DAsync了。最好的管理方式还是采用
369 * QObject 的内存托管。主线程中使用,可以采用托管的方式,
370 * 任务结束只要调用 cancelAll + isFinished 轮询判断就行了,
371 * DAsync 的工作线程就会在完成后自动退出。
372 */
373 void waitForFinished(bool cancelAllWorks = true)
374 {
375 Q_ASSERT(!D_THREAD_IN_MAIN());
376 if (cancelAllWorks) {
377 cancelAll();
378 }
379 if (!m_status.testFlag(DAsyncState::Finished)) {
380 if (m_status.testFlag(DAsyncState::Pending)) {
381 m_cvIn.notify_one();
382 }
383 m_status.setFlag(DAsyncState::WaitFinished);
384 std::unique_lock<std::mutex> lck(m_mtxForWaitTask);
385 m_cvForWaitTask.wait(lck);
386 }
387 }
388 // 输入数据不是 void 类型则依赖于 m_QueueIn
389 template <typename FUNC, typename InputType = DataTypeIn>
390 typename std::enable_if<!std::is_void<InputType>::value, Helper *>::type post(FUNC &&func)
391 {
392 m_func.cbp = std::forward<FUNC>(func);
393 if (m_postProxy) {
394 return m_helper;
395 }
396 m_postProxy = [this]() {
397 std::thread thread([this] {
398 if (m_status.testFlag(DAsyncState::Cancel)) {
399 return;
400 }
401 Guard guard(this);
402 m_threadGuard = &guard;
403
404 std::unique_lock<std::mutex> lck(m_mtxIn);
405 while (true) {
406 while (!m_status.testFlag(DAsyncState::Ready) || !m_QueueIn.m_queue.size()) {
407 guard.setPending(true);
408 // 定时查询 flag,防止睡死的情况发生
409 m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
410 if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)) {
411 return;
412 }
413 }
414 guard.setPending(false);
415
416 while (m_func.cbp && m_QueueIn.m_queue.size()) {
417 emitHelper<DataTypeIn, DataTypeOut>();
418 }
419 }
420 });
421 thread.detach();
422 };
423
424 return m_helper;
425 }
426
427 template <typename FUNC, typename InputType = DataTypeIn>
428 typename std::enable_if<std::is_void<InputType>::value, Helper *>::type post(FUNC &&func)
429 {
430 {
431 std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
432 m_func.cbp = std::forward<FUNC>(func);
433 }
434 if (m_postProxy) {
435 return m_helper;
436 }
437 m_postProxy = [this]() {
438 std::thread thread([this] {
439 if (m_status.testFlag(DAsyncState::Cancel)) {
440 return;
441 }
442 Guard guard(this);
443 m_threadGuard = &guard;
444
445 std::unique_lock<std::mutex> lck(m_mtxIn);
446 while (true) {
447 if (!m_status.testFlag(DAsyncState::Ready)) {
448 guard.setPending(true);
449 // 定时查询 flag,防止睡死的情况发生
450 m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
451 if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)) {
452 return;
453 }
454 }
455 guard.setPending(false);
456
457 if (m_func.cbp) {
458 std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
459 emitHelper<DataTypeIn, DataTypeOut>();
460 m_func.cbp = nullptr; // reset
461 }
462 }
463 });
464 thread.detach();
465 };
466
467 return m_helper;
468 }
469
470 // only support DAsync<non void type, ...>
471 template <typename InputType = DataTypeIn>
472 typename std::enable_if<!std::is_void<InputType>::value>::type postData(const InputType &data)
473 {
474 if (Q_UNLIKELY(!m_status.testFlag(DAsyncState::Cancel))) {
475 m_QueueIn.m_queue.enqueue(data);
476 if (m_status.testFlag(DAsyncState::Pending)) {
477 m_cvIn.notify_one();
478 }
479 }
480 }
481
482private:
483 std::function<void()> m_postProxy;
484 class Helper : public QObject
485 {
486 DAsync *m_async;
487
488 public:
489 explicit Helper(DAsync *async, QObject *parent = nullptr) noexcept
490 : QObject(parent)
491 , m_async(async)
492 {
493 }
494
495 template <typename FUNC>
496 Helper *then(FUNC &&func)
497 {
498 m_async->m_mainWorker->template setHandle<FUNC, DataTypeOut>(std::forward<FUNC>(func));
499 return this;
500 }
501 // 仅启动,非阻塞
502 void start(bool immediately = true)
503 {
504 if (m_async->m_postProxy) {
505 m_async->m_postProxy();
506 }
507 if (!immediately) {
508 m_async->m_status.setFlag(DAsyncState::Ready, false);
509 } else {
510 m_async->m_status.setFlag(DAsyncState::Ready);
511 if (m_async->m_status.testFlag(DAsyncState::Pending)) {
512 m_async->m_cvIn.notify_one();
513 }
514 }
515 }
516 };
517
518 Helper *m_helper = nullptr;
519 DtkCorePrivate::MainWorker *m_mainWorker = nullptr;
520};
521
522DCORE_END_NAMESPACE
523#endif // DASYNC_H
Definition dasync.h:156
Definition dasync.h:179