WaitForMultipleObjects 是我以 pthread 移植 MFC 架構 至 Android 的一環; 因為覺得還蠻實用的, 也因為蠻簡單的, 所以放到部落格上供讀者參考. 有錯誤之處, 請讀者不吝指出. 但切記, 在應用時一定要先測試其效能. Android NDK 裡面的 pthread 並不支援 pthread_testcancel(),pthread_cancel().. , 因此下文內的 THREAD_CANCEL 並沒有用上, 只是我縱情想像力寫的.
Global 函數
所需要的靜態函數有 2 . 我解釋一下 TimespecAdd() ; 它需傳入兩個 timespec 結構, 然後將此 2 個 timespec 結構相加計算, 然後傳出此相加之後的 timespec 結構. timespec 結構是用來表示高精確度時間的 C 結構; 至於真正的精確度為何? 則與 kernel, platform 有關.
/*
 * Cleanup handler meant for pthread_cleanup_push(): unlocks the mutex whose
 * address arrives through `param`.
 */
void CleanupLock(void *param)
{
    pthread_mutex_t *held = (pthread_mutex_t *)param;

    pthread_mutex_unlock(held);
}

/*
 * Stores a + b in *out.
 *
 * The nanosecond fields are summed first; whole seconds contained in that sum
 * are carried into tv_sec and the remainder is kept in tv_nsec.  Both inputs
 * are read completely before *out is written, so `out` may point at `a` or `b`.
 */
void TimespecAdd(const struct timespec* a, const struct timespec* b, struct timespec* out)
{
    const long nsec_per_sec = 1000000000L;
    long nsec_total = a->tv_nsec + b->tv_nsec;
    time_t sec_total = a->tv_sec + b->tv_sec;

    sec_total += nsec_total / nsec_per_sec;

    out->tv_sec = sec_total;
    out->tv_nsec = nsec_total % nsec_per_sec;
}
API 是這樣的:
/* Creates an event object; returns its handle, or 0 when none could be created. */
HANDLE CreateEvent(void *pAttr, int bManualReset, int bInitialState, const char *szName);
/* Releases an event obtained from CreateEvent(); returns 0. */
int CloseEvent(HANDLE hObject);
/* Marks the event as set and signals its condition variable; returns 1 on success, 0 on failure or for a null handle. */
int SetEvent(HANDLE hEvent);
/* Marks the event as reset; returns 1, or 0 for a null handle. */
int ResetEvent(HANDLE hEvent);
/* Marks the event as set and broadcasts on its condition variable; returns 1 on success, 0 on failure or for a null handle. */
int PulseEvent(HANDLE hEvent);
/* Waits on one event for up to dwMilliseconds (INFINITE = no timeout); returns WAIT_OBJECT_0, WAIT_TIMEOUT or WAIT_FAILED. */
unsigned int WaitForSingleObject(HANDLE hEvent, unsigned int dwMilliseconds);
/* Waits on nCount events: all of them when bWaitAll is non-zero, otherwise any one of them.
 * Returns WAIT_OBJECT_0 + index of the signalled event (wait-any), WAIT_OBJECT_0 (wait-all),
 * WAIT_TIMEOUT or WAIT_FAILED. */
unsigned int WaitForMultipleObjects(unsigned int nCount, const HANDLE *lpHandles, int bWaitAll, unsigned int dwMilliseconds);
#if !defined(AFX_EVENT_H__285CFF59_8261_4DE7_84F3_43E209FEF60C__INCLUDED_) #define AFX_EVENT_H__285CFF59_8261_4DE7_84F3_43E209FEF60C__INCLUDED_ #if _MSC_VER > 1000 #pragma once #endif // _MSC_VER > 1000 #ifndef INFINITE #define INFINITE 0xFFFFFFFF #endif #ifdef ETIMEDOUT #define WAIT_TIMEOUT ETIMEDOUT #else #define WAIT_TIMEOUT 0x00000102L #endif #define WAIT_FAILED 0xFFFFFFFFL #define WAIT_OBJECT_0 0x00000000L #define WAIT_ABANDONED 0x00000080L #define WAIT_FOR_ANY_ONE_EVENTS 0 #define WAIT_FOR_ALL_EVENTS 1 #define MANUAL_RESET 1 #define AUTO_RESET 0 #define SET 1 #define RESET 0 ////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////// struct event_t { int id; int type; char name[256]; int manual_reset; pthread_cond_t event; }; ////////////////////////////////////////////////////////////////////// // extern ////////////////////////////////////////////////////////////////////// #if defined(__cplusplus) || defined(__CPLUSPLUS__) extern "C" { #endif HANDLE CreateEvent(void *pAttr, int bManualReset, int bInitialState, const char *szName); int CloseEvent(HANDLE hObject); int SetEvent(HANDLE hEvent); int ResetEvent(HANDLE hEvent); int PulseEvent(HANDLE hEvent); unsigned int WaitForSingleObject(HANDLE hEvent, unsigned int dwMilliseconds); unsigned int WaitForMultipleObjects(unsigned int nCount, const HANDLE *lpHandles, int bWaitAll, unsigned int dwMilliseconds); #if defined(__cplusplus) || defined(__CPLUSPLUS__) } #endif #endif // !defined(AFX_EVENT_H__285CFF59_8261_4DE7_84F3_43E209FEF60C__INCLUDED_)
// StdAfx.h : include file for standard system include files,
// or project specific include files that are used frequently, but
// are changed infrequently
//
// Outside WIN32 this header only supplies Win32-style type names, a few
// fixed-name integer typedefs and the BITn masks used by the ported code.

#if !defined(AFX_STDAFX_H__3F41C39C_B21A_4F9F_BD57_0061B53E9B86__INCLUDED_)
#define AFX_STDAFX_H__3F41C39C_B21A_4F9F_BD57_0061B53E9B86__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#ifdef WIN32
#define VC_EXTRALEAN        // Exclude rarely-used stuff from Windows headers

#include <afxwin.h>         // MFC core and standard components
#include <afxext.h>         // MFC extensions
#include <afxdtctl.h>       // MFC support for Internet Explorer 4 Common Controls
#ifndef _AFX_NO_AFXCMN_SUPPORT
#include <afxcmn.h>         // MFC support for Windows Common Controls
#endif // _AFX_NO_AFXCMN_SUPPORT
#endif // WIN32

//{{AFX_INSERT_LOCATION}}

#ifndef NULL
#define NULL 0
#endif

#ifndef BOOL
#define BOOL int
#endif

#ifndef INT
#define INT int
#endif

#ifndef UINT
#define UINT unsigned int
#endif

// NOTE(review): `unsigned long` is 64 bits wide on LP64 targets, whereas the
// Win32 DWORD is 32 bits -- confirm no caller depends on a 32-bit DWORD.
#ifndef DWORD
#define DWORD unsigned long
#endif

#ifndef LONG
#define LONG long
#endif

#ifndef WORD
#define WORD unsigned short
#endif

#ifndef BYTE
#define BYTE unsigned char
#endif

#ifndef TRUE
#define TRUE 1
#define FALSE 0
#endif

#ifndef WPARAM
#define WPARAM unsigned int
#endif

#ifndef LPARAM
#define LPARAM unsigned int
#endif

#ifndef LRESULT
#define LRESULT unsigned int
#endif

#ifndef INFINITE
#define INFINITE 0XFFFFFFFF  // Infinite timeout
#endif

#ifndef SIGNED
#define SIGNED int
#endif

#ifndef UCHAR
#define UCHAR unsigned char
#endif

// A second "#ifndef LONG / #define LONG int" used to sit here.  LONG is always
// defined by the block above, so it could never take effect and only
// contradicted the `long` definition; it has been removed.

#ifndef INT64
#define INT64 signed long long
#endif

#ifndef UINT64
#define UINT64 unsigned long long
#endif

#ifndef FLOAT
#define FLOAT float
#endif

typedef void *HANDLE;

typedef unsigned char U8_T;
typedef unsigned char U8_B_T;
typedef unsigned char U8_S_T;
typedef unsigned short U16_T;
// NOTE(review): the *32* names below are `long`-based and therefore 64 bits
// wide on LP64 targets -- confirm before relying on them for 32-bit layouts.
typedef unsigned long U32_T;
typedef unsigned long U32_IP_T;
typedef char I8_T;
typedef short I16_T;
typedef long I32_T;

#define ULONGLONGINT unsigned long long int

// Single-bit masks.
#define BIT0 (1<<0)
#define BIT1 (1<<1)
#define BIT2 (1<<2)
#define BIT3 (1<<3)
#define BIT4 (1<<4)
#define BIT5 (1<<5)
#define BIT6 (1<<6)
#define BIT7 (1<<7)

#endif // !defined(AFX_STDAFX_H__3F41C39C_B21A_4F9F_BD57_0061B53E9B86__INCLUDED_)
Global 變數
/* Per-event record; a HANDLE returned by CreateEvent() points at one of these. */
struct event_t
{
    int id;                 /* slot index in pso[]; also the bit position of this event's state in cs */
    int type;               /* set to 1 by CreateEvent() */
    char name[256];         /* copy of the szName argument given to CreateEvent() */
    int manual_reset;       /* 1 = manual reset, 0 = auto reset */
    pthread_cond_t event;   /* condition variable that waiting threads block on */
};
/* Slot table: each entry is either 0 (free) or a pointer to a live struct event_t. */
static HANDLE pso[] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
/* Event state bits: bit N holds the set (1) / reset (0) state of the event whose id is N. */
static unsigned long long int cs = 0;
/* Guards cs; it is also the mutex handed to pthread_cond_wait()/pthread_cond_timedwait(). */
static pthread_mutex_t cm = PTHREAD_MUTEX_INITIALIZER;
/* Guards the slot table pso[] during CreateEvent()/CloseEvent(). */
static pthread_mutex_t si = PTHREAD_MUTEX_INITIALIZER;
cm 是一個 mutex . pso 是 Events 陣列 , 陣列中每一個元素可能指向一個 event_t 結構, 而這結構則包含了一個 pthread 條件變數 pthread_cond_t event . cs 後面會提到. event_t 結構裡有一成員 manual_reset, 1 的時候是手動 reset, 0 的時候是自動 reset .
CreateEvent / CloseEvent
CreateEvent() 用來產生一個 Event 物件; 而 CloseEvent() 則用來刪除這個物件. si 這個 mutex 則做為執行緒間 create/close 之間的互斥機制. CreateEvent() 有幾個重要的參數:
1. bManualReset: 設為1 是手動 reset ; 設為 0 是自動 reset .
2. bInitialState: bInitialState 是指這個要產生的 Event 物件它的初始狀態是 set 或是 reset . 這狀態如果是 reset, 則執行緒呼叫 WaitFor.... 等函數會被 block; 如果是 set, 則會 unblock . Event 物件的狀態是儲存在全域變數 cs 的其中 1個 bit.
HANDLE CreateEvent(void *pAttr, int bManualReset, int bInitialState, const char *szName) { int i; struct event_t *p = 0; unsigned long long int mask = 0; #ifdef THREAD_CANCEL int last_type; pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type); #endif // THREAD_CANCEL pthread_mutex_lock(&si); pthread_cleanup_push(CleanupLock, (void *)&si); for (i = 0; i < sizeof(unsigned long long int); i++) { if (pso[i] == 0) { pso[i] = malloc(sizeof(struct event_t)); p = (struct event_t *)pso[i], assert(p); memset(p, 0, sizeof(struct event_t)); p->id = i; mask = 1 << i; break; } } pthread_cleanup_pop(1); #ifdef THREAD_CANCEL pthread_testcancel(); pthread_setcanceltype(last_type, NULL); #endif // THREAD_CANCEL if(p == 0) return 0; strcpy(p->name, szName); p->type = 1; p->manual_reset = bManualReset; pthread_cond_init(&p->event, NULL); pthread_mutex_lock(&cm); pthread_cleanup_push(CleanupLock, (void *)&cm); if (bInitialState) cs |= mask; else cs &= ~mask; pthread_cleanup_pop(1); return (void *)p; }
/*
 * Destroys an event created by CreateEvent().
 *
 * Clears the event's state bit in `cs` (under `cm`), frees its slot in pso[]
 * (under `si`), destroys its condition variable and releases the record.
 * A null handle is ignored.  Always returns 0.
 *
 * The caller must make sure no thread is still waiting on the event.
 */
int CloseEvent(HANDLE hObject)
{
    struct event_t *p;
    unsigned long long int mask;
    int i;
#ifdef THREAD_CANCEL
    int last_type;
#endif

    if (hObject == 0)
        return 0;

    p = (struct event_t *)hObject;
    i = p->id;
    mask = 1ULL << i;   /* 64-bit shift: a plain `1 << i` is undefined for i >= 31 */

#ifdef THREAD_CANCEL
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
#endif

    pthread_mutex_lock(&cm);
    pthread_cleanup_push(CleanupLock, (void *)&cm);
    cs &= ~mask;
    pthread_cleanup_pop(1);

#ifdef THREAD_CANCEL
    pthread_testcancel();
#endif

    pthread_mutex_lock(&si);
    pthread_cleanup_push(CleanupLock, (void *)&si);
    pso[i] = 0;
    pthread_cleanup_pop(1);

#ifdef THREAD_CANCEL
    pthread_testcancel();
    pthread_setcanceltype(last_type, NULL);
#endif

    /* Release the condition variable initialised in CreateEvent() before freeing its storage. */
    pthread_cond_destroy(&p->event);
    free(hObject);
    return 0;
}
SetEvent / ResetEvent / PulseEvent
SetEvent() / ResetEvent() 是用來 set 或是 reset 產生的 Event 物件它內部的狀態. 這 Event 物件的狀態是儲存在全域變數 cs 的其中 1個 bit. 我們用 cm 這個 mutex 則做為執行緒間所有 Events set/reset 之間的互斥機制. 這邊要提到 SetEvent() 與 PulseEvent() 有什麼不同呢? 假設執行緒 T1, T2 阻斷 (block, 是動詞) 在 E 這個 Event 物件, 如果 E 是 自動 reset 的話, SetEvent() 只能 unblock T1 或是 T2, 而 PulseEvent() 則能同時 unblock T1 以及 T2.
/*
 * Puts the event into the set state and signals its condition variable.
 *
 * The state bit in `cs` is updated and pthread_cond_signal() is called while
 * holding `cm`.
 *
 * Returns 1 when the signal call succeeded, 0 when it failed or hEvent is null.
 */
int SetEvent(HANDLE hEvent)
{
    struct event_t *p;
    unsigned long long int mask;
    int res;
#ifdef THREAD_CANCEL
    int last_type;
#endif

    if (hEvent == 0)
        return 0;

    p = (struct event_t *)hEvent;
    mask = 1ULL << p->id;   /* 64-bit shift: a plain `1 << id` is undefined for id >= 31 */

#ifdef THREAD_CANCEL
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
#endif

    pthread_mutex_lock(&cm);
    pthread_cleanup_push(CleanupLock, (void *)&cm);
    cs |= mask;
    res = pthread_cond_signal(&p->event);
    pthread_cleanup_pop(1);

#ifdef THREAD_CANCEL
    pthread_testcancel();
    pthread_setcanceltype(last_type, NULL);
#endif

    return (res == 0) ? 1 : 0;
}
/*
 * Puts the event into the reset state by clearing its bit in `cs` (under `cm`).
 * No waiter is woken.
 *
 * Returns 1, or 0 when hEvent is null.
 */
int ResetEvent(HANDLE hEvent)
{
    struct event_t *p;
    unsigned long long int mask;
#ifdef THREAD_CANCEL
    int last_type;
#endif

    if (hEvent == 0)
        return 0;

    p = (struct event_t *)hEvent;
    mask = 1ULL << p->id;   /* 64-bit shift: a plain `1 << id` is undefined for id >= 31 */

#ifdef THREAD_CANCEL
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
#endif

    pthread_mutex_lock(&cm);
    pthread_cleanup_push(CleanupLock, (void *)&cm);
    cs &= ~mask;
    pthread_cleanup_pop(1);

#ifdef THREAD_CANCEL
    pthread_testcancel();
    pthread_setcanceltype(last_type, NULL);
#endif

    return 1;
}
/*
 * Puts the event into the set state and broadcasts on its condition variable,
 * waking every thread currently blocked on it.
 *
 * The state bit in `cs` is left set after the broadcast; this function does
 * not reset it.
 *
 * Returns 1 when the broadcast call succeeded, 0 when it failed or hEvent is null.
 */
int PulseEvent(HANDLE hEvent)
{
    struct event_t *p;
    unsigned long long int mask;
    int res;
#ifdef THREAD_CANCEL
    int last_type;
#endif

    if (hEvent == 0)
        return 0;

    p = (struct event_t *)hEvent;
    mask = 1ULL << p->id;   /* 64-bit shift: a plain `1 << id` is undefined for id >= 31 */

#ifdef THREAD_CANCEL
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
#endif

    pthread_mutex_lock(&cm);
    pthread_cleanup_push(CleanupLock, (void *)&cm);
    cs |= mask;
    res = pthread_cond_broadcast(&p->event);
    pthread_cleanup_pop(1);

#ifdef THREAD_CANCEL
    pthread_testcancel();
    pthread_setcanceltype(last_type, NULL);
#endif

    return (res == 0) ? 1 : 0;
}
直接來看重點吧 (WaitForMultipleObjects) . 關於 Events 我使用一個 64位元的變數 cs 來代表它們的 set / reset, set為1 reset為0 . 也就是說 WaitForMultipleObjects 最多只能 wait 64 個 Events 物件. 一般來說, 這是夠用了. WaitForMultipleObjects() 參數1 傳入 Events 數目; 參數2 傳入我們關注的 Events 陣列; 參數3 傳入是否為 WaitAll; 參數4 則傳入 TimeOut 的 milliseconds (毫秒) . 這與 MSDN 上的 prototype 一致, https://msdn.microsoft.com/zh-tw/library/windows/desktop/ms687025(v=vs.85).aspx .
WaitForMultipleObjects 作用是 Wait for the thread to signal one of the event objects . 返回值則是 WAIT_TIMEOUT, WAIT_FAILED, 以及 WAIT_OBJECT_0, +1 +2 ....
unsigned int WaitForMultipleObjects(unsigned int nCount, const HANDLE *lpHandles, int bWaitAll, unsigned int dwMilliseconds) { unsigned int r; int i; int res = EINVAL; HANDLE handle = 0; struct event_t *p; unsigned long long int mask = 0; struct timespec ts; struct timespec dt = { 0 }, at = { 0 }; unsigned long long int id = 0; #ifdef THREAD_CANCEL int last_type; #endif assert(nCount > 0), assert(nCount <= sizeof(unsigned long long int)), assert(lpHandles != 0); clock_gettime(CLOCK_REALTIME, &ts); if(dwMilliseconds != INFINITE) { memset(&dt, 0, sizeof(dt)); dt.tv_sec = dwMilliseconds / 1000; dt.tv_nsec = (dwMilliseconds % 1000) * 1000000; TimespecAdd(&ts, &dt, &at); } #ifdef THREAD_CANCEL pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type); #endif pthread_mutex_lock(&cm); pthread_cleanup_push(CleanupLock, (void *)&cm); mask = 0; for (i = 0; i < (int)nCount; i++) { handle = lpHandles[i]; p = (struct event_t *)handle; mask |= (1 << p->id); } /////////////////////////////////////////////////////////// // wait for any one of the events if (!bWaitAll) { #if 0 while (!(mask & cs)) { #else if (!(mask & cs)) { #endif if (dwMilliseconds == INFINITE) res = pthread_cond_wait(&p->event, &cm); else res = pthread_cond_timedwait(&p->event, &cm, &at); if(res == 0) break; if(res == ETIMEDOUT) { r = WAIT_TIMEOUT; goto EXIT; } if(res == EINVAL) { r = WAIT_FAILED; goto EXIT; } } for (i = 0; i < (int)nCount; i++) { handle = lpHandles[i]; p = (struct event_t *)handle; id = p->id; if (cs & (1 << id)) { if(!p->manual_reset) cs &= ~(1 << id); break; } } r = WAIT_OBJECT_0 + (unsigned int)i; goto EXIT; } /////////////////////////////////////////////////////////// // wait for all of the events #if 0 while ((mask & cs) != cs) { #else if ((mask & cs) != cs) { #endif if (dwMilliseconds == INFINITE) res = pthread_cond_wait(&p->event, &cm); else res = pthread_cond_timedwait(&p->event, &cm, &at); if(res == 0) break; if(res == ETIMEDOUT) { r = WAIT_TIMEOUT; goto EXIT; } if(res == EINVAL) { r 
= WAIT_FAILED; goto EXIT; } } for (i = 0; i < (int)nCount; i++) { handle = lpHandles[i]; p = (struct event_t *)handle; id = p->id; if (cs & (1 << id)) { if(!p->manual_reset) cs &= ~(1 << id); } } r = WAIT_OBJECT_0; goto EXIT; /////////////////////////////////////////////////////////// // returns EXIT: pthread_cleanup_pop(1); #ifdef THREAD_CANCEL pthread_testcancel(); pthread_setcanceltype(last_type, NULL); #endif return r; }
文中的 mutual exclusion (互斥) 機制, 我使用的是 clean-up stack. pthread_cleanup_push() / pthread_cleanup_pop() 互為 counter-part 函式. 由於 pthread_cleanup_push() / pthread_cleanup_pop() 實做上是 macro, 兩者必須成對出現在同一個語法區塊 (lexical scope) 內, 不能在中途直接 return, 因此文中我用了 goto 跳到共同的出口.
先來看 wait for any one of the events ; 通常我們會用 wait for any one of the events. 觀察的重點是 cs 這個 64位元的全域變數 (靜態變數, 因只有 Event.cpp 這個模組使用) . 通常執行緒 T 會 block 在 pthread_cond_wait() 或 pthread_cond_timedwait(), 如果 WaitForMultipleObjects() 最後一個參數是 INFINITE, 則執行緒會 block 在 pthread_cond_wait(), 否則執行緒會 block 在 pthread_cond_timedwait() . 這最後一個參數的單位是 milliseconds (毫秒). 當有其他執行緒呼叫 SetEvent() 後, SetEvent() 會準確地 set cs 這個 64位元的全域變數, 造成執行緒 T 自 pthread_cond_wait() 返回. 之後我們計算是哪一個 Event 物件造成 unblock, 並告知執行緒 T. WAIT_OBJECT_0 就是我們關注的 Events 中的第1個, (WAIT_OBJECT_0+1) 就是我們關注的 Events 中的第2個, ...
wait for all of the events 就請讀者自行參閱. 要注意的一點是: 如果要 wait for all of the events , 這些關注的 events 都要 manual reset. 原因是如果不全是 manual reset 的話, 執行緒 T 會 block forever.
/*
 * Waits until the event is in the set state or the timeout expires.
 *
 * hEvent          handle returned by CreateEvent().
 * dwMilliseconds  timeout in milliseconds, or INFINITE.
 *
 * Returns
 *   WAIT_OBJECT_0  the event is set (this includes an event that was already
 *                  set on entry); an auto-reset event is reset before returning
 *   WAIT_TIMEOUT   the timeout expired and the event was still not set
 *   WAIT_FAILED    hEvent is null, or the underlying wait failed
 *
 * The state bit is re-checked after every wakeup, because a condition wait may
 * return without the event having been set, and once more after a timeout so
 * that a SetEvent() that raced with the deadline is not lost.  The bit of an
 * auto-reset event is cleared only when the wait actually succeeds.
 */
unsigned int WaitForSingleObject(HANDLE hEvent, unsigned int dwMilliseconds)
{
    struct event_t *p;
    unsigned long long int mask;
    unsigned int result;
    int res = 0;
    struct timespec ts;
    struct timespec dt = { 0 }, at = { 0 };
#ifdef THREAD_CANCEL
    int last_type;
#endif // THREAD_CANCEL

    if (hEvent == 0)
        return WAIT_FAILED;

    p = (struct event_t *)hEvent;
    mask = 1ULL << p->id;   /* 64-bit shift: a plain `1 << id` is undefined for id >= 31 */

    if (dwMilliseconds != INFINITE)
    {
        /* Absolute deadline on CLOCK_REALTIME, as pthread_cond_timedwait() expects. */
        clock_gettime(CLOCK_REALTIME, &ts);
        dt.tv_sec = dwMilliseconds / 1000;
        dt.tv_nsec = (dwMilliseconds % 1000) * 1000000;
        TimespecAdd(&ts, &dt, &at);
    }

#ifdef THREAD_CANCEL
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
#endif // THREAD_CANCEL

    pthread_mutex_lock(&cm);
    pthread_cleanup_push(CleanupLock, (void *)&cm);

    while ((cs & mask) == 0)
    {
        if (dwMilliseconds == INFINITE)
            res = pthread_cond_wait(&p->event, &cm);
        else
            res = pthread_cond_timedwait(&p->event, &cm, &at);

        if (res != 0)
            break;
    }

    if ((cs & mask) != 0)
    {
        // auto reset
        if (!p->manual_reset)
            cs &= ~mask;
        result = (unsigned int)WAIT_OBJECT_0;
    }
    else if (res == ETIMEDOUT)
    {
        result = WAIT_TIMEOUT;
    }
    else
    {
        result = WAIT_FAILED;
    }

    pthread_cleanup_pop(1);

#ifdef THREAD_CANCEL
    pthread_testcancel();
    pthread_setcanceltype(last_type, NULL);
#endif // THREAD_CANCEL

    return result;
}
還有一個函數 WaitForSingleObject(), 我把它列出來. 細節我就不多做說明了.
Example 我先使用 MSDN 裡面的範例,
// MSDN sample: main() creates two auto-reset events, starts a thread that
// sets the first one, and waits up to five seconds for either event.

#include <windows.h>
#include <stdio.h>

HANDLE ghEvents[2];

DWORD WINAPI ThreadProc( LPVOID );

int main( void )
{
    HANDLE hThread;
    DWORD i, dwEvent, dwThreadID;

    // Create two event objects

    for (i = 0; i < 2; i++)
    {
        ghEvents[i] = CreateEvent(
            NULL,   // default security attributes
            FALSE,  // auto-reset event object
            FALSE,  // initial state is nonsignaled
            NULL);  // unnamed object

        if (ghEvents[i] == NULL)
        {
            printf("CreateEvent error: %d\n", GetLastError() );
            ExitProcess(0);
        }
    }

    // Create a thread

    hThread = CreateThread(
                 NULL,         // default security attributes
                 0,            // default stack size
                 (LPTHREAD_START_ROUTINE) ThreadProc,
                 NULL,         // no thread function arguments
                 0,            // default creation flags
                 &dwThreadID); // receive thread identifier

    if( hThread == NULL )
    {
        printf("CreateThread error: %d\n", GetLastError());
        return 1;
    }

    // Wait for the thread to signal one of the event objects

    dwEvent = WaitForMultipleObjects(
        2,           // number of objects in array
        ghEvents,    // array of objects
        FALSE,       // wait for any object
        5000);       // five-second wait

    // The return value indicates which event is signaled

    switch (dwEvent)
    {
        // ghEvents[0] was signaled
        case WAIT_OBJECT_0 + 0:
            // TODO: Perform tasks required by this event
            printf("First event was signaled.\n");
            break;

        // ghEvents[1] was signaled
        case WAIT_OBJECT_0 + 1:
            // TODO: Perform tasks required by this event
            printf("Second event was signaled.\n");
            break;

        case WAIT_TIMEOUT:
            printf("Wait timed out.\n");
            break;

        // Return value is invalid.
        default:
            printf("Wait error: %d\n", GetLastError());
            ExitProcess(0);
    }

    // Close event handles

    for (i = 0; i < 2; i++)
        CloseHandle(ghEvents[i]);

    return 0;
}

// Thread entry point: sets ghEvents[0]; returns 1 if SetEvent fails, otherwise 0.
DWORD WINAPI ThreadProc( LPVOID lpParam )
{
    // lpParam not used in this example
    UNREFERENCED_PARAMETER( lpParam);

    // Set one event to the signaled state

    if ( !SetEvent(ghEvents[0]) )
    {
        printf("SetEvent failed (%d)\n", GetLastError());
        return 1;
    }
    return 0;
}
