23#include <dtkcore_global.h>
28#include <QMutexLocker>
29#include <QCoreApplication>
36#define GUARDED_BY(...)
37#define D_THREAD_IN_MAIN() (qApp->instance() && qApp->instance()->thread() == QThread::currentThread())
40namespace DtkCorePrivate
46 inline void enqueue(
const T &t) {
47 QMutexLocker lkc(&m_mtx);
48 QQueue<T>::enqueue(t);
51 QMutexLocker lkc(&m_mtx);
52 return QQueue<T>::dequeue();
55 QMutexLocker lkc(&m_mtx);
56 return QQueue<T>::size();
59 QMutexLocker lkc(&m_mtx);
60 return QQueue<T>::head();
62 inline const T &head()
const {
63 QMutexLocker lkc(&m_mtx);
64 return QQueue<T>::head();
73 std::function<void(
void *)> m_handle;
74 std::function<void(
void *)> m_handleProxy;
76 std::function<void(
void)> m_handleV;
77 std::function<void(
void)> m_handleVProxy;
79 bool m_dasyncDestroyed =
false;
82 void setDAsyncDestroyed() {
83 m_dasyncDestroyed =
true;
85 bool dasyncDestroyed() {
86 return m_dasyncDestroyed;
93 Q_ASSERT(qApp->instance() && qApp->instance()->thread());
94 moveToThread(qApp->instance()->thread());
96 bool isStartInMain = D_THREAD_IN_MAIN();
98 QObject::connect(
this, &MainWorker::sigRunInMain,
99 this, &MainWorker::slotRunInMain,
100 isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
102 QObject::connect(
this, &MainWorker::sigRunInMainVoid,
103 this, &MainWorker::slotRunInMainVoid,
104 isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
108 template <
typename FUNC,
typename ArgType>
109 typename std::enable_if<!std::is_void<ArgType>::value>::type
110 setHandle(FUNC &&func) {
111 m_handle = [&] (
void *arg) {
113 while (q && q->size()) {
119 m_handleProxy = [
this] (
void *arg) {
127 template <
typename FUNC,
typename ArgType>
128 typename std::enable_if<std::is_void<ArgType>::value>::type
129 setHandle(FUNC &&func) {
130 m_handleV = [&] (void) {
135 m_handleVProxy = [
this] (void) {
142 void sigRunInMain(
void *arg);
143 void sigRunInMainVoid();
145 void slotRunInMain(
void *arg) {
146 Q_ASSERT(D_THREAD_IN_MAIN());
147 if (m_handleProxy && !m_dasyncDestroyed) {
151 void slotRunInMainVoid(
void) {
152 Q_ASSERT(D_THREAD_IN_MAIN());
153 if (m_handleVProxy && !m_dasyncDestroyed) {
163 explicit DAsyncState(QObject *parent =
nullptr) noexcept
167 enum AsyncTaskState {
171 Pending = Ready | Running,
177 Q_DECLARE_FLAGS(AsyncTaskStatus, AsyncTaskState)
181template <
typename DataTypeIn,
typename DataTypeOut>
187 std::condition_variable m_cvIn;
189 std::mutex m_mtxForWaitTask;
190 std::condition_variable m_cvForWaitTask;
196 bool m_dasDestructed =
false;
199 return m_dasDestructed;
201 void setDestructed() {
202 m_dasDestructed =
true;
205 explicit Guard(
DAsync *as) noexcept : m_as (as)
207 m_as->m_status.setFlag(DAsyncState::Ready);
208 m_as->m_status.setFlag(DAsyncState::Finished,
false);
214 m_as->m_threadGuard =
nullptr;
215 m_as->m_status.setFlag(DAsyncState::Finished);
216 m_as->m_status.setFlag(DAsyncState::Ready,
false);
217 if (m_as->m_status.testFlag(DAsyncState::WaitFinished)) {
218 m_as->m_cvForWaitTask.notify_one();
222 void setPending(
bool isPending) {
224 m_as->m_status.setFlag(DAsyncState::Pending, isPending);
228 Guard *m_threadGuard =
nullptr;
236 template<
typename T,
typename Enable =
void>
239 struct DataQueueType<T, typename std::enable_if<std::is_void<T>::value>::type> { };
240 using DataInQueue = DataQueueType<DataTypeIn>;
241 using DataOutQueue = DataQueueType<DataTypeOut>;
243 DataInQueue m_QueueIn;
244 DataOutQueue m_QueueOut;
247 template<
typename T1,
typename T2,
typename Enable1 =
void,
typename Enable2 =
void>
250 template<
typename T1,
typename T2>
251 struct FuncType<T1, T2,
252 typename std::enable_if<std::is_void<T1>::value>::type,
253 typename std::enable_if<std::is_void<T2>::value>::type> {
254 std::function <void(
void)> cbp;
256 template<
typename T1,
typename T2>
257 struct FuncType<T1, T2,
258 typename std::enable_if<!std::is_void<T1>::value>::type,
259 typename std::enable_if<!std::is_void<T2>::value>::type> {
260 std::function <T2(T1)> cbp;
262 template<
typename T1,
typename T2>
263 struct FuncType<T1, T2,
264 typename std::enable_if<std::is_void<T1>::value>::type,
265 typename std::enable_if<!std::is_void<T2>::value>::type> {
266 std::function <T2(
void)> cbp;
268 template<
typename T1,
typename T2>
269 struct FuncType<T1, T2,
270 typename std::enable_if<!std::is_void<T1>::value>::type,
271 typename std::enable_if<std::is_void<T2>::value>::type> {
272 std::function <void(T1)> cbp;
275 std::mutex m_mtxFunc;
276 FuncType<DataTypeIn, DataTypeOut> m_func GUARDED_BY(m_mtxFunc);
277 DAsyncState::AsyncTaskStatus m_status;
280 explicit DAsync(QObject *parent =
nullptr) noexcept
283 , m_status (DAsyncState::NotReady)
286 m_helper =
new Helper(
this,
this);
290 m_threadGuard->setDestructed();
292 m_status.setFlag(DAsyncState::Cancel);
293 if (m_status.testFlag(DAsyncState::Pending)) {
297 m_mainWorker->setDAsyncDestroyed();
298 m_mainWorker->deleteLater();
299 m_mainWorker =
nullptr;
305 template <
typename PostInType,
typename EmitInType>
306 typename std::enable_if<std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type
309 Q_EMIT m_mainWorker->sigRunInMainVoid();
312 template <
typename PostInType,
typename EmitInType>
313 typename std::enable_if<!std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type
315 m_QueueOut.m_queue.enqueue(m_func.cbp(m_QueueIn.m_queue.dequeue()));
316 Q_EMIT m_mainWorker->sigRunInMain(
static_cast<void *
>(&(m_QueueOut.m_queue)));
319 template <
typename PostInType,
typename EmitInType>
320 typename std::enable_if<!std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type
322 m_func.cbp(m_QueueIn.m_queue.dequeue());
323 Q_EMIT m_mainWorker->sigRunInMainVoid();
326 template <
typename PostInType,
typename EmitInType>
327 typename std::enable_if<std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type
329 m_QueueOut.m_queue.enqueue(m_func.cbp());
330 Q_EMIT m_mainWorker->sigRunInMain(
static_cast<void *
>(&(m_QueueOut.m_queue)));
335 if (m_status.testFlag(DAsyncState::Cancel)) {
341 m_status.setFlag(DAsyncState::Cancel);
342 if (m_status.testFlag(DAsyncState::Pending)) {
347 return m_status.testFlag(DAsyncState::Finished);
360 void waitForFinished(
bool cancelAllWorks =
true) {
361 Q_ASSERT(!D_THREAD_IN_MAIN());
362 if (cancelAllWorks) {
365 if (!m_status.testFlag(DAsyncState::Finished)) {
366 if (m_status.testFlag(DAsyncState::Pending)) {
369 m_status.setFlag(DAsyncState::WaitFinished);
370 std::unique_lock <std::mutex> lck(m_mtxForWaitTask);
371 m_cvForWaitTask.wait(lck);
375 template <
typename FUNC,
typename InputType = DataTypeIn>
376 typename std::enable_if<!std::is_void<InputType>::value, Helper *>::type
378 m_func.cbp = std::forward<FUNC>(func);
382 m_postProxy = [
this] () {
383 std::thread thread([
this] {
384 if (m_status.testFlag(DAsyncState::Cancel)) {
388 m_threadGuard = &guard;
390 std::unique_lock <std::mutex> lck(m_mtxIn);
392 while (!m_status.testFlag(DAsyncState::Ready) || !m_QueueIn.m_queue.size()) {
393 guard.setPending(
true);
395 m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
396 if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)) {
400 guard.setPending(
false);
402 while (m_func.cbp && m_QueueIn.m_queue.size()) {
403 emitHelper<DataTypeIn, DataTypeOut>();
413 template <
typename FUNC,
typename InputType = DataTypeIn>
414 typename std::enable_if<std::is_void<InputType>::value, Helper *>::type
417 std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
418 m_func.cbp = std::forward<FUNC>(func);
423 m_postProxy = [
this] () {
424 std::thread thread([
this] {
425 if (m_status.testFlag(DAsyncState::Cancel)) {
429 m_threadGuard = &guard;
431 std::unique_lock <std::mutex> lck(m_mtxIn);
433 if (!m_status.testFlag(DAsyncState::Ready)) {
434 guard.setPending(
true);
436 m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
437 if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)){
441 guard.setPending(
false);
444 std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
445 emitHelper<DataTypeIn, DataTypeOut>();
446 m_func.cbp =
nullptr;
457 template <
typename InputType = DataTypeIn>
458 typename std::enable_if<!std::is_void<InputType>::value>::type
459 postData(
const InputType &data) {
460 if (Q_UNLIKELY(!m_status.testFlag(DAsyncState::Cancel))) {
461 m_QueueIn.m_queue.enqueue(data);
462 if (m_status.testFlag(DAsyncState::Pending)) {
469 std::function<void()> m_postProxy;
470 class Helper :
public QObject {
473 explicit Helper(
DAsync *async, QObject *parent =
nullptr) noexcept
479 template <
typename FUNC>
480 Helper *then(FUNC &&func) {
481 m_async->m_mainWorker->template setHandle<FUNC, DataTypeOut>(std::forward<FUNC>(func));
485 void start(
bool immediately =
true) {
486 if (m_async->m_postProxy) {
487 m_async->m_postProxy();
490 m_async->m_status.setFlag(DAsyncState::Ready,
false);
492 m_async->m_status.setFlag(DAsyncState::Ready);
493 if (m_async->m_status.testFlag(DAsyncState::Pending)) {
494 m_async->m_cvIn.notify_one();
500 Helper *m_helper =
nullptr;