• XSS.stack #1 – первый литературный журнал от юзеров форума

Как реализовать?

SlowMan

(L3) cache
Забанен
Регистрация
22.07.2023
Сообщения
167
Реакции
6
Пожалуйста, обратите внимание, что пользователь заблокирован
Привет.

Нужно реализовать следующую задачу:
Есть список IP.txt содержащий строки ip:port
Код должен отправлять пустой пакет(с 1 байтом 0x00) на каждый ip:port и проверять ответ. Если первый байт ответа == 0x55 то записывать в файл Good иначе в файл Bads.txt или если ошибка то в файл Error.txt
Нужна большая скорость работы.(Списки минимум от миллиарда строк)
C++/C#
 
А зачем тебе писать на плюсах ради скорости, если эта скорость не критична? Какая разница просканит чекер за сутки, или за двое? Ты во всяком случае скорее упрешься в ограничения сети, а не нагрузки на цп. Также тебе нужны socks5 либо антиабуз хост, иначе в бан дедик улетит через час за скан. Вот тебе аутпут ллм на питоне

Python:
import asyncio
import socket

async def check_ip(ip, port):
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=1)
        writer.write(b'\x00')  # Отправка пустого пакета (1 байт 0x00)
        await writer.drain()

        response = await asyncio.wait_for(reader.read(1), timeout=1)
        writer.close()
        await writer.wait_closed()

        if response == b'\x55':
            return "Good", f"{ip}:{port}\n"
        else:
            return "Bads", f"{ip}:{port}\n"
    except Exception as e:
        return "Error", f"{ip}:{port} - {e}\n"

async def process_file():
    with open("IP.txt", "r") as ip_file:
        lines = ip_file.readlines()

    # Открытие файлов заранее для записи
    good_file = open("Good.txt", "a")
    bad_file = open("Bads.txt", "a")
    error_file = open("Error.txt", "a")

    tasks = []
    for line in lines:
        ip, port = line.strip().split(':')
        tasks.append(check_ip(ip, int(port)))

        if len(tasks) >= 10000:  # Ограничение количества одновременных задач
            results = await asyncio.gather(*tasks)
            for result, data in results:
                if result == "Good":
                    good_file.write(data)
                elif result == "Bads":
                    bad_file.write(data)
                elif result == "Error":
                    error_file.write(data)
            tasks = []

    # Обработка оставшихся задач
    if tasks:
        results = await asyncio.gather(*tasks)
        for result, data in results:
            if result == "Good":
                good_file.write(data)
            elif result == "Bads":
                bad_file.write(data)
            elif result == "Error":
                error_file.write(data)

    # Закрытие файлов
    good_file.close()
    bad_file.close()
    error_file.close()

if __name__ == "__main__":
    asyncio.run(process_file())

Описание кода:​

  1. Асинхронные функции:
    • check_ip — отправляет пустой пакет на указанный IP и порт, затем проверяет первый байт ответа. Результат записывается в один из трёх файлов.
    • process_file — читает файл IP.txt, обрабатывает строки и вызывает функцию check_ip для каждой пары IP

      .
  2. Асинхронное выполнение:
    • Используется asyncio для параллельного выполнения запросов к большим спискам IP-адресов.
  3. Ограничение числа задач:
    • Для эффективного использования ресурсов и предотвращения переполнения памяти, добавлено ограничение на количество одновременно выполняемых задач (например, 10,000).
  4. Обработка ошибок:
    • Все ошибки при подключении или получении данных записываются в файл Error.txt.
Этот скрипт будет обрабатывать список IP-адресов параллельно, что значительно увеличит скорость работы при обработке больших списков.
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Мне кажется под эту задачу лучше подойдет Rust, чем C++.
Или я ошибаюсь? Хмм. Эксперты, подскажите.
waahoo DildoFagins Whisper Quake3
 
А зачем тебе писать на плюсах ради скорости, если эта скорость не критична? Какая разница просканит чекер за сутки, или за двое? Ты во всяком случае скорее упрешься в ограничения сети, а не нагрузки на цп. Также тебе нужны socks5 либо антиабуз хост, иначе в бан дедик улетит через час за скан. Вот тебе аутпут ллм на питоне

Python:
import asyncio
import socket

async def check_ip(ip, port):
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=1)
        writer.write(b'\x00')  # Отправка пустого пакета (1 байт 0x00)
        await writer.drain()

        response = await asyncio.wait_for(reader.read(1), timeout=1)
        writer.close()
        await writer.wait_closed()

        if response == b'\x55':
            return "Good", f"{ip}:{port}\n"
        else:
            return "Bads", f"{ip}:{port}\n"
    except Exception as e:
        return "Error", f"{ip}:{port} - {e}\n"

async def process_file():
    with open("IP.txt", "r") as ip_file:
        lines = ip_file.readlines()

    # Открытие файлов заранее для записи
    good_file = open("Good.txt", "a")
    bad_file = open("Bads.txt", "a")
    error_file = open("Error.txt", "a")

    tasks = []
    for line in lines:
        ip, port = line.strip().split(':')
        tasks.append(check_ip(ip, int(port)))

        if len(tasks) >= 10000:  # Ограничение количества одновременных задач
            results = await asyncio.gather(*tasks)
            for result, data in results:
                if result == "Good":
                    good_file.write(data)
                elif result == "Bads":
                    bad_file.write(data)
                elif result == "Error":
                    error_file.write(data)
            tasks = []

    # Обработка оставшихся задач
    if tasks:
        results = await asyncio.gather(*tasks)
        for result, data in results:
            if result == "Good":
                good_file.write(data)
            elif result == "Bads":
                bad_file.write(data)
            elif result == "Error":
                error_file.write(data)

    # Закрытие файлов
    good_file.close()
    bad_file.close()
    error_file.close()

if __name__ == "__main__":
    asyncio.run(process_file())

Описание кода:​

  1. Асинхронные функции:
    • check_ip — отправляет пустой пакет на указанный IP и порт, затем проверяет первый байт ответа. Результат записывается в один из трёх файлов.
    • process_file — читает файл IP.txt, обрабатывает строки и вызывает функцию check_ip для каждой пары IP

      .
  2. Асинхронное выполнение:
    • Используется asyncio для параллельного выполнения запросов к большим спискам IP-адресов.
  3. Ограничение числа задач:
    • Для эффективного использования ресурсов и предотвращения переполнения памяти, добавлено ограничение на количество одновременно выполняемых задач (например, 10,000).
  4. Обработка ошибок:
    • Все ошибки при подключении или получении данных записываются в файл Error.txt.
Этот скрипт будет обрабатывать список IP-адресов параллельно, что значительно увеличит скорость работы при обработке больших списков.
lines = ip_file.readlines() - если много строк будет, то скрипт может до 10 минут это обрабатывать, лучше сразу по ip_file форироваться
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Нужен код на С++/C# потому что эти ЯП мне привычнее. Я могу реализовать эту задачу, но проблема именно в скорости. У меня много гигабайтные списки. Я не понимаю как сделать так что бы можно было быстрее обрабатывать. У меня нету столько опыта.
 
Нужен код на С++/C# потому что эти ЯП мне привычнее. Я могу реализовать эту задачу, но проблема именно в скорости. У меня много гигабайтные списки. Я не понимаю как сделать так что бы можно было быстрее обрабатывать. У меня нету столько опыта.
Что у тебя за "Списки минимум от миллиарда строк", если всего адресов 4 миллиарда? Если тебе надо просканить весь интернет, просто укажи в скрипте маску 0.0.0.0/0 и нужные порты
 
Нужен код на С++/C# потому что эти ЯП мне привычнее.

Самый быстрый паблик чекер - masscan. Смотри исходники на гитхабе, меняй в коде payload на 0x00, собирай под себя
Но лучше подумать и уменьшать списки
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Что у тебя за "Списки минимум от миллиарда строк", если всего адресов 4 миллиарда? Если тебе надо просканить весь интернет, просто укажи в скрипте маску 0.0.0.0/0 и нужные порты
Например порт может быть не один

Самый быстрый паблик чекер - masscan. Смотри исходники на гитхабе, меняй в коде payload на 0x00, собирай под себя
Но лучше подумать и уменьшать списки
Masscan к сожалению очень плохо работает с большими списками. Пробовал грузануть в него миллион. Он очень очень долго грузился. Если миллиард загрузить то он неделю будет запускатся
 
Согласен с dunkel, тебе для чекера не нужна сверх высокая скорость, потому что упрёшься в ширину пропускного канала. Поэтому возможностей питона хватило бы с головой. Но раз уж тебе нужен скрипт именно на плюсах, то вот, набросал за несколько минут. Там могут быть баги, так что, имей ввиду, он достаточно костыльный.

C++:
#include <winsock2.h>
#include <windows.h>
#include <cstdio>
#include <iostream>
#include <fstream>
#include <string>

#pragma comment(lib, "ws2_32.lib")

#define SEND_BYTE    0x00 // байт для отправки
#define CHECK_BYTE    0x55 // ожидаемый байт
#define MAX_THREADS 100 // макс. кол-во потоков в один момент

LONG prevSeamphore = 0;

HANDLE hProcHeap = NULL;
HANDLE hSemaphore = NULL;
CRITICAL_SECTION cs;

std::ofstream goods;

bool WSAInit()
{
    WSADATA wsa;
    if (WSAStartup(MAKEWORD(2, 2), &wsa) == 0)
        return true;

    return false;
}

bool WSAClean()
{
    if (WSACleanup() == 0)
        return true;

    return false;
}

bool sockInit(SOCKET* sock, SOCKADDR_IN* si, const char* ip, int port)
{
    *sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (!(*sock))
        return false;

    ZeroMemory(si, sizeof(SOCKADDR_IN));
    si->sin_family = AF_INET;
    si->sin_port = htons(port);
    si->sin_addr.S_un.S_addr = inet_addr(ip);

    return true;
}

int readSock(SOCKET sock, char* buffer, int cbBuffSize)
{
    if (!sock || !buffer || !cbBuffSize)
        return -1;

    return (recv(sock, (char*)buffer, cbBuffSize, 0));

}

int writeSock(SOCKET sock, const char* buffer, int cbBuffSize)
{
    if (!sock || !buffer || !cbBuffSize)
        return 1;

    return (send(sock, (const char*)buffer, cbBuffSize, 0) == cbBuffSize) ? 0 : WSAGetLastError();
}

typedef struct _entry
{
    char ip[16];
    int port;
}ENTRY;

// чекер
DWORD WINAPI checkHost(LPVOID _listEntry)
{
    ENTRY* listEntry = (ENTRY*)_listEntry;

    SOCKET sock = INVALID_SOCKET;

    SOCKADDR_IN si;
    if (sockInit(&sock, &si, listEntry->ip, listEntry->port)){
        int res;
        res = connect(sock, (sockaddr*)&si, sizeof(si));
        if (res != SOCKET_ERROR){
            char buff = SEND_BYTE;
            res = writeSock(sock, &buff, 1);
            if (res != SOCKET_ERROR){
                res = readSock(sock, &buff, 1);
                if ((buff & 0xff) == CHECK_BYTE){
                    EnterCriticalSection(&cs);
                    goods << listEntry->ip << ':' << listEntry->port <<"\r\n";
                    goods.flush();
                    LeaveCriticalSection(&cs);
                }
            }
        }
        closesocket(sock);
    }

    HeapFree(hProcHeap, 0, _listEntry);
    ReleaseSemaphore(hSemaphore, 1, &prevSeamphore);

    return GetCurrentThreadId();
}

// создаём поток
void createTask(const char* ip, int ipLen, int port)
{
    WaitForSingleObject(hSemaphore, INFINITE); // semaphore gate
    ENTRY* entry = (ENTRY*)HeapAlloc(hProcHeap, 0, sizeof(ENTRY));
    memcpy(entry->ip, ip, ipLen);
    entry->port = port;
    HANDLE hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)checkHost, (LPVOID)entry, 0, NULL);
    CloseHandle(hThread);
}

int main(int argc, char** argv)
{
    if (argc != 3){
        printf("\nUsing:\t%s ips.txt good.txt\n\n", *argv); // пример использования
        return 0;
    }

    hProcHeap = GetProcessHeap();

    std::ifstream ips(argv[1]);
    if (!ips.is_open()) {
        printf("Open file %s error!\n", argv[1]);
        return 0;
    }

    goods.open(argv[2]);
    if (!goods.is_open()) {
        printf("Open file %s error!\n", argv[2]);
        return 0;
    }

    if (!WSAInit()){
        printf("WSAStartup error!\n");
        return 0;
    }

    hSemaphore = CreateSemaphoreA(NULL, MAX_THREADS, MAX_THREADS, NULL);
    if (!hSemaphore){
        printf("Create semaphore error!\n");
        return 0;
    }

    InitializeCriticalSection(&cs);

    std::string line;
    while (std::getline(ips, line)){

        // ищем разделитель':'
        size_t pos = line.find(':');
        if (pos == std::string::npos) {
            continue;
        }

        // извлекаем подстроки
        std::string strIP = line.substr(0, pos);
        std::string strPort = line.substr(pos + 1);

        createTask(strIP.c_str(), strIP.length(), atoi(strPort.c_str()));
    }

    do{ // костыль =P
        Sleep(100);
    } while (prevSeamphore != (MAX_THREADS - 1));

    ips.close();
    goods.close();

    DeleteCriticalSection(&cs);
    CloseHandle(hSemaphore);

    WSAClean();
}

Ты не уточнял, под какую ось. Этот скрипт под винду. Надеюсь, поможет.
 
Последнее редактирование:
Например порт может быть не один
Ну так укажи эти несколько портов, или диапазон. В чем смысл генерировать по известному алгоритму текстовик со строками и обрабатывать эти терабайты строк, если можно вставить этот алгоритм в сам скрипт?
 
boost asio тебе в помощь
опишу алгоритм, реализацию - как будет время сделаю.
1) не надо читать файл целиком. Выбери количество строк, которое ты хочешь обработать за раз.
2) возьми нормальный пул потоков, лучше уже готовый, я здесь выкладывал где-то. Количество потоков бери равное удвоенному количеству железных потоков.
3) реализуй асинхронную обработку - у тебя должны одновременно работать, условно говоря, 3 модуля - первый читает поблочно из файла строки, валидирует их, второй пингует нужные тебе ипы, третий - обрабатывает результаты. std::async тебе в помощь.
 
Там могут быть баги, так что, имей ввиду, он достаточно костыльный.
ну он впринципе не рабочий)
пересоздавать потоки - ошибка
создавать 100 потоков - ошибка
не дожидаться потоков - фатальная ошибка, поэтому и не работает.
Ты не можешь вот так взять и закрыть поток через CloseHandle. который еще работает. Ты должен дождать его через WaitForSingleObject.
Поэтому для таких задач нужно делать тред пул и в конце ждать потоки.
 
пересоздавать потоки - ошибка
Не ошибка, но не оптимально.

создавать 100 потоков - ошибка
Почему?

Ты не можешь вот так взять и закрыть поток через CloseHandle. который еще работает. Ты должен дождать его через WaitForSingleObject.
Могу и он продолжит работу.

не дожидаться потоков - фатальная ошибка, поэтому и не работает.
Каждый поток после отработки освобождает семафор. После цикла есть "костыль", который проверяет освобождён ли весь семафор.

Поэтому для таких задач нужно делать тред пул и в конце ждать потоки.
Тут согласен.

Код далеко не оптимальный, я согласен. Но рабочий))
 
пулл потоков
C++:
#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
namespace tools::multithreading
{
    using ConcurrencyT = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;


    template<typename T>
    class [[nodiscard]] MultiFuture
    {
    private:
        std::vector<std::future<T>> m_futures;
    public:
        explicit MultiFuture(const size_t numFutures = 0) : m_futures(numFutures)
        {}

        [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>> get()
        {
            if constexpr (std::is_void_v<T>)
            {
                for (size_t i = 0; i < m_futures.size(); ++i)
                    m_futures[i].get();
                return;
            }
            else
            {
                std::vector<T> results(m_futures.size());
                for (size_t i = 0; i < m_futures.size(); ++i)
                    results[i] = m_futures[i].get();
                return results;
            }
        }

        [[nodiscard]] std::future<T> &operator[](const size_t i)
        {
            return m_futures[i];
        }

        void pushBack(std::future<T> future)
        {
            m_futures.push_back(std::move(future));
        }

        [[nodiscard]] size_t size() const
        {
            return m_futures.size();
        }

        void wait() const
        {
            for (size_t i = 0; i < m_futures.size(); ++i)
                m_futures[i].wait();
        }

    };

    template<typename T1, typename T2, typename T = std::common_type_t<T1, T2>>
    class [[nodiscard]] Blocks
    {
    private:
        size_t m_blockSize = 0;
        T m_firstIndex = 0;
        T m_indexAfterLast = 0;
        size_t m_numBlocks = 0;
        size_t m_totalSize = 0;

    public:

        Blocks(const T1 firstIndex, const T2 indexAfterLast, const size_t numBlocks) : m_firstIndex(static_cast<T>(firstIndex)), m_indexAfterLast(
                static_cast<T>(indexAfterLast)), m_numBlocks(numBlocks)
        {
            if (m_indexAfterLast < m_firstIndex)
                std::swap(m_indexAfterLast, m_firstIndex);
            m_totalSize = static_cast<size_t>(m_indexAfterLast - m_firstIndex);
            m_blockSize = static_cast<size_t>(m_totalSize / m_numBlocks);
            if (m_blockSize == 0)
            {
                m_blockSize = 1;
                m_numBlocks = (m_totalSize > 1) ? m_totalSize : 1;
            }
        }

        [[nodiscard]] T start(const size_t i) const
        {
            return static_cast<T>(i * m_blockSize) + m_firstIndex;
        }

        [[nodiscard]] T end(const size_t i) const
        {
            return (i == m_numBlocks - 1) ? m_indexAfterLast : (static_cast<T>((i + 1) * m_blockSize) + m_firstIndex);
        }

        [[nodiscard]] size_t getNumBlocks() const
        {
            return m_numBlocks;
        }

        [[nodiscard]] size_t getTotalSize() const
        {
            return m_totalSize;
        }

    };

    class [[nodiscard]] ThreadPool
    {
        std::atomic<bool> m_isPaused = false;
        std::atomic<bool> m_running = false;
        std::condition_variable m_taskAvailableCv = {};
        std::condition_variable m_taskDoneCv = {};
        std::queue<std::function<void()>> m_tasks = {};
        std::atomic<size_t> m_tasksTotal = 0;
        mutable std::mutex m_tasksMutex = {};
        ConcurrencyT m_threadCount = 0;
        std::unique_ptr<std::thread[]> m_threads = nullptr;
        std::atomic<bool> m_waiting = false;

    public:

        explicit ThreadPool(const ConcurrencyT threadCount = 0) : m_threadCount(determineThreadCount(threadCount)), m_threads(
                std::make_unique<std::thread[]>(determineThreadCount(threadCount)))
        {
            createThreads();
        }

        ~ThreadPool()
        {
            waitForTasks();
            destroyThreads();
        }


        [[nodiscard]] size_t getTasksQueued() const
        {
            const std::scoped_lock tasksLock(m_tasksMutex);
            return m_tasks.size();
        }


        [[nodiscard]] size_t getTasksRunning() const
        {
            const std::scoped_lock tasks_lock(m_tasksMutex);
            return m_tasksTotal - m_tasks.size();
        }


        [[nodiscard]] size_t getTasksTotal() const
        {
            return m_tasksTotal;
        }


        [[nodiscard]] ConcurrencyT getThreadCount() const
        {
            return m_threadCount;
        }


        [[nodiscard]] bool isPaused() const
        {
            return m_isPaused;
        }


        template<typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
        [[nodiscard]] MultiFuture<R> parallelizeLoop(const T1 firstIndex, const T2 indexAfterLast, F &&loop, const size_t numBlocks = 0)
        {
            Blocks blocks(firstIndex, indexAfterLast, numBlocks ? numBlocks : m_threadCount);
            if (blocks.getTotalSize() > 0)
            {
                MultiFuture<R> mf(blocks.getNumBlocks());
                for (size_t i = 0; i < blocks.getNumBlocks(); ++i)
                {
                    mf[i] = submit(std::forward<F>(loop), blocks.start(i), blocks.end(i));
                }
                return mf;
            } else
            {
                return MultiFuture<R>();
            }
        }


        template<typename F, typename T, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
        [[nodiscard]] MultiFuture<R> parallelizeLoop(const T indexAfterLast, F &&loop, const size_t numBlocks = 0)
        {
            return parallelizeLoop(0, indexAfterLast, std::forward<F>(loop), numBlocks);
        }


        void pause()
        {
            m_isPaused = true;
        }


        template<typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>>
        void pushLoop(const T1 firstIndex, const T2 indexAfterLast, F &&loop, const size_t numBlocks = 0)
        {
            Blocks blocks(firstIndex, indexAfterLast, numBlocks ? numBlocks : m_threadCount);
            if (blocks.getTotalSize() > 0)
            {
                for (size_t i = 0; i < blocks.getNumBlocks(); ++i)
                    pushTask(std::forward<F>(loop), blocks.start(i), blocks.end(i));
            }
        }


        template<typename F, typename T>
        void pushLoop(const T indexAfterLast, F &&loop, const size_t numBlocks = 0)
        {
            pushLoop(0, indexAfterLast, std::forward<F>(loop), numBlocks);
        }


        template<typename F, typename... A>
        void pushTask(F &&task, A &&... args)
        {
            std::function<void()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
            {
                const std::scoped_lock tasks_lock(m_tasksMutex);
                m_tasks.push(task_function);
            }
            ++m_tasksTotal;
            m_taskAvailableCv.notify_one();
        }


        void reset(const ConcurrencyT threadCount = 0)
        {
            const bool wasPaused = m_isPaused;
            m_isPaused = true;
            waitForTasks();
            destroyThreads();
            m_threadCount = determineThreadCount(threadCount);
            m_threads = std::make_unique<std::thread[]>(m_threadCount);
            m_isPaused = wasPaused;
            createThreads();
        }


        template<typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
        [[nodiscard]] std::future<R> submit(F &&task, A &&... args)
        {
            std::function<R()> taskFunction = std::bind(std::forward<F>(task), std::forward<A>(args)...);
            std::shared_ptr<std::promise<R>> taskPromise = std::make_shared<std::promise<R>>();
            pushTask(
                    [taskFunction, taskPromise] {
                        try
                        {
                            if constexpr (std::is_void_v<R>)
                            {
                                std::invoke(taskFunction);
                                taskPromise->set_value();
                            } else
                            {
                                taskPromise->set_value(std::invoke(taskFunction));
                            }
                        }
                        catch (...)
                        {
                            try
                            {
                                taskPromise->set_exception(std::current_exception());
                            }
                            catch (...)
                            {
                            }
                        }
                    });
            return taskPromise->get_future();
        }


        void unpause()
        {
            m_isPaused = false;
        }


        void waitForTasks()
        {
            m_waiting = true;
            std::unique_lock tasks_lock(m_tasksMutex);
            m_taskDoneCv.wait(tasks_lock, [this] { return (m_tasksTotal == (m_isPaused ? m_tasks.size() : 0)); });
            m_waiting = false;
        }

    private:
        void createThreads()
        {
            m_running = true;
            for (ConcurrencyT i = 0; i < m_threadCount; ++i)
            {
                m_threads[i] = std::thread(&ThreadPool::worker, this);
            }
        }


        void destroyThreads()
        {
            m_running = false;
            m_taskAvailableCv.notify_all();
            for (ConcurrencyT i = 0; i < m_threadCount; ++i)
            {
                m_threads[i].join();
            }
        }


        [[nodiscard]] static ConcurrencyT determineThreadCount(const ConcurrencyT threadCount)
        {
            if (threadCount > 0)
                return threadCount;
            else
                return std::thread::hardware_concurrency() > 0 ? std::thread::hardware_concurrency() : 1;
        }


        void worker()
        {
            while (m_running)
            {
                std::function<void()> task;
                std::unique_lock<std::mutex> tasks_lock(m_tasksMutex);
                m_taskAvailableCv.wait(tasks_lock, [this] { return !m_tasks.empty() || !m_running; });
                if (m_running && !m_isPaused)
                {
                    task = std::move(m_tasks.front());
                    m_tasks.pop();
                    tasks_lock.unlock();
                    task();
                    tasks_lock.lock();
                    --m_tasksTotal;
                    if (m_waiting)
                        m_taskDoneCv.notify_one();
                }
            }
        }

    };


    class [[nodiscard]] Timer
    {
    public:
        void start()
        {
            m_startTime = std::chrono::steady_clock::now();
        }

        void stop()
        {
            m_elapsedTime = std::chrono::steady_clock::now() - m_startTime;
        }

        [[nodiscard]] std::chrono::milliseconds::rep ms() const
        {
            return (std::chrono::duration_cast<std::chrono::milliseconds>(m_elapsedTime)).count();
        }
        [[nodiscard]] std::chrono::milliseconds::rep seconds() const
        {
            return (std::chrono::duration_cast<std::chrono::seconds>(m_elapsedTime)).count();
        }

        [[nodiscard]] std::string getTime() const
        {
            auto time = seconds();
            if(time < 10)
            {
                time = ms();
                const auto seconds = time / 1000;
                const auto ms = time % 1000;
                return std::to_string(seconds) + "s : " + std::to_string(ms) + "ms";
            }
            if(time < 60) return std::to_string(time ) + "s";
            if(time < 3600)
            {
                const auto minutes = time / 60;
                const auto seconds = time % 60;
                return std::to_string(minutes) + "m :" + std::to_string(seconds) + "s";
            }
            const auto hours = time / 3600;
            const auto minutes = (time % 3600) / 60;
            const auto seconds = minutes / 60;
            return std::to_string(hours) + "h : " + std::to_string(minutes) + "m" + std::to_string(seconds) + "s";
        }
    private:
        std::chrono::time_point<std::chrono::steady_clock> m_startTime = std::chrono::steady_clock::now();

        std::chrono::duration<double> m_elapsedTime = std::chrono::duration<double>::zero();
    };
}


#endif //THREAD_POOL_HPP
 
Но рабочий))
просто проведи тест.
На вход ips.txt с содержанием 242000 строк.
первые 241999 строк 127.0.0.1:1337
последняя 127.0.0.1:1338
Запусти wireshark, запусти свою программу, подожди конца, останови захват трафика.
Примени фильтр tcp.port == 1338. А ты увидишь, что нет пакетов, которые пытались отослаться на 1338.
 
Мне кажется под эту задачу лучше подойдет Rust, чем C++.
Или я ошибаюсь? Хмм. Эксперты, подскажите.
waahoo DildoFagins Whisper Quake3
Я бы эту задачу под винду решал на голанге, асинхронно из коробки. Если хочется Си, то в этом плане очень хорошо работает epool (Linux).
Миллиарды строк этож ведь совсем не значит миллиарды запросов одновременных, типа сначала читаем весь файло, потом запрашиваем...
Потихоньку, помаленьку считывать с файла 50-100 строк на тредпул из 50-100 "потоков" (горутин) и дочитываем новые строки в процессе отвала "потоков" попутно наполняя три файла, good, bad, error.

 
Мне кажется под эту задачу лучше подойдет Rust, чем C++.
Или я ошибаюсь? Хмм. Эксперты, подскажите.
waahoo DildoFagins Whisper Quake3
Согласно теории ограничений Голдрата(https://ru.wikipedia.org/wiki/Теория_ограничений) надо узнать где узкое место. Потоки тут точно не нужны. Разберись на чем тебе проще сделать асинки(логичным выглядит собрать на го для разных систем и попробовать на разных что бы сделать вывод). Вероятно для самого лушего результата нужно писать код для ядра и делать скан в ядре это ускорит дело. Ну и кеширование результатов делай что не писать в файл каждый раз когда будешь получать результат а делать это блоками.
Лучше всего сделать распределенную систему и сканить с нескольких дедов и без реальной надобности не возиться с кодом ядра.
 
Последнее редактирование:


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх