• XSS.stack #1 – первый литературный журнал от юзеров форума

Статья Merge File Sort или быстрая сортировка строк и удаление дублей интерпретатором в файле от 400 GB за счет ПЗУ

rand

CooL-Lamer
Эксперт
Регистрация
24.05.2023
Сообщения
581
Реакции
1 152
Депозит
0.07 Ł и др.
Написал: rand
Эксклюзивно для: xss.pro
Отдельная благодарность в помощи и тестировании: antikrya, celty, Bertor, Eject, Zeta

Всем привет, не так давно у меня появилась идея (после того как я прочитал этот тред https://xss.pro/threads/122300/) реализовать на питоне скрипт который делает быструю сортировку и очистку дублей строк в огромном текстовом файле без колоссального потребления ресурсов ОЗУ в операционной системе.
Минимальные системные требования: Python 3.12, 4 ядра CPU, 4 гигабайта ОЗУ, 200% свободного места на накопителе от размера импортируемого файла+10%. (При работе этого алгоритма приходится расплачиваться свободной памятью и ресурсом накопителя).

Скрины отработки 100-гигового ULP:
1726873664010.jpeg


1726873710348.jpeg


1726873749900.jpeg


1726873802860.jpeg


1726873834304.jpeg


1726873872951.jpeg


Скрипт работает на встроенных библиотеках питона, кроме одной библиотеки используемой для разметки выдачи цвета лога (в общем все подробно объяснил в комментариях к коду, может быть потом исправлю тут на полноценную статью, если настроение будет):
Bash:
pip install colorlog==6.8.2

main.py
Python:
import os
import tempfile
import heapq
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorlog import ColoredFormatter  # Для цветного логирования

# Настраиваем цветное логирование
formatter = ColoredFormatter(
    "%(log_color)s%(asctime)s - %(levelname)s - %(message)s",  # Формат вывода логов
    datefmt=None,  # Формат даты, по умолчанию
    reset=True,  # Сброс цветов после каждой строки
    log_colors={  # Задаем цвета для различных уровней логов
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
)

# Настройка обработчика логов с UTF-8
handler = logging.StreamHandler()  # Логи выводятся в консоль
handler.setFormatter(formatter)  # Применяем цветной формат
logger = logging.getLogger()  # Получаем объект логгера
logger.addHandler(handler)  # Добавляем к нему наш обработчик
logger.setLevel(logging.INFO)  # Устанавливаем уровень логирования INFO (можно изменять на DEBUG для более детальных логов)

# Создаем путь к папке TEMP для временных файлов
temp_dir = os.path.join(os.getcwd(), 'TEMP')  # Директория для хранения временных файлов
if not os.path.exists(temp_dir):  # Если папки нет, создаем её в корне скрипта
    os.makedirs(temp_dir)

# Функция для обработки чанков: удаление дублей и сортировка
def process_chunk(chunk, temp_dir):
    try:
        logger.info(f"Начало обработки чанка размером {len(chunk)} строк")  # Логируем количество строк в чанке
 
        unique_items = set(chunk)  # Превращаем список в множество для удаления дублей
        logger.info(f"Удалено {len(chunk) - len(unique_items)} дублей в чанке")  # Логируем, сколько дублей удалено
 
        sorted_chunk = sorted(unique_items)  # Сортируем уникальные строки

        # Создаем временный файл, который не будет удален автоматически (delete=False)
        with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', errors='ignore', dir=temp_dir) as temp_file:
            temp_file.write('\n'.join(sorted_chunk) + '\n')  # Записываем отсортированный чанк во временный файл
            logger.info(f"Чанк записан во временный файл {temp_file.name}")  # Логируем путь к временному файлу
            return temp_file.name  # Возвращаем имя временного файла

    except Exception as e:
        logger.error(f"Ошибка при обработке чанка: {e}")  # Логируем ошибку, если что-то пошло не так
        return None  # Возвращаем None, если произошла ошибка

# Функция для пакетного слияния временных файлов в один общий файл
def merge_files(temp_files, output_file):
    try:
        logger.info(f"Окончательное слияние {len(temp_files)} временных файлов")  # Логируем количество файлов для слияния

        unique_count = 0  # Счетчик уникальных строк
        duplicate_count = 0  # Счетчик дублей

        # Открываем выходной файл для записи
        with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
            # Открываем все временные файлы и создаем итераторы
            file_iters = [open(f, 'r', encoding='utf-8', errors='replace') for f in temp_files]
            merged_iter = heapq.merge(*file_iters)  # Сливаем отсортированные временные файлы в один поток
            prev_line = None  # Переменная для отслеживания предыдущей строки

            # Проходим по слитым строкам
            for line in merged_iter:
                if line != prev_line:  # Если строка не совпадает с предыдущей (уникальная)
                    outfile.write(line)  # Записываем строку в выходной файл
                    prev_line = line  # Обновляем предыдущую строку
                    unique_count += 1  # Увеличиваем счетчик уникальных строк
                else:
                    duplicate_count += 1  # Если строка дублируется, увеличиваем счетчик дублей

            # Закрываем временные файлы
            for f in file_iters:
                f.close()

        logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")  # Логируем результаты
        return unique_count, duplicate_count  # Возвращаем количество уникальных строк и дублей

    except Exception as e:
        logger.error(f"Ошибка при слиянии файлов: {e}")  # Логируем ошибку, если слияние не удалось
        return 0, 0  # Возвращаем нули в случае ошибки

# Функция для пакетного слияния временных файлов
def batch_merge(temp_files, batch_size, temp_dir):
    try:
        logger.info(f"Начало пакетного слияния с размером пакета {batch_size}")  # Логируем начало пакетного слияния
        merged_files = []  # Список для хранения результатов пакетного слияния
        total_unique_count = 0  # Общий счетчик уникальных строк
        total_duplicate_count = 0  # Общий счетчик дублей

        # Обрабатываем файлы по частям (батчами)
        for i in range(0, len(temp_files), batch_size):
            batch = temp_files[i:i + batch_size]  # Берем очередной пакет файлов
            logger.info(f"Слияние пакета с файлов {i+1} по {min(i + batch_size, len(temp_files))}")  # Логируем диапазон файлов

            # Создаем временный файл для результата слияния пакета
            with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', dir=temp_dir) as temp_merged_file:
                unique_count, duplicate_count = merge_files(batch, temp_merged_file.name)  # Сливаем пакет файлов
                merged_files.append(temp_merged_file.name)  # Добавляем результат слияния в список

                total_unique_count += unique_count  # Обновляем общий счетчик уникальных строк
                total_duplicate_count += duplicate_count  # Обновляем общий счетчик дублей

            # Удаляем временные файлы после слияния
            logger.info(f"Удаление временных файлов пакета с {i+1} по {min(i + batch_size, len(temp_files))}")
            for temp_file in batch:
                if os.path.exists(temp_file):
                    os.remove(temp_file)  # Удаляем временный файл

        return merged_files, total_unique_count, total_duplicate_count  # Возвращаем список объединенных файлов и итоговые счетчики

    except Exception as e:
        logger.error(f"Ошибка при пакетном слиянии: {e}")  # Логируем ошибку при пакетном слиянии
        return temp_files, 0, 0  # Возвращаем исходные файлы и нули в случае ошибки

# Основная функция сортировки и удаления дубликатов с параллельной обработкой чанков
def sort_and_uniq_streaming(input_file, output_file, chunk_size=2000000, batch_size=10):
    temp_files = []  # Список для хранения временных файлов
    chunk = []  # Текущий чанк строк
    original_count = 0  # Счетчик всех строк в исходном файле

    logger.info(f"Чтение файла {input_file}...")  # Логируем начало чтения файла
    try:
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile:  # Открываем файл для чтения
            with ThreadPoolExecutor() as executor:  # Создаем пул потоков для параллельной обработки
                futures = []  # Список задач для параллельной обработки

                for line in infile:
                    chunk.append(line.strip())  # Добавляем строку в чанк
                    original_count += 1  # Увеличиваем счетчик строк
                    if len(chunk) >= chunk_size:  # Если размер чанка достиг предела
                        futures.append(executor.submit(process_chunk, chunk, temp_dir))  # Запускаем обработку чанка в отдельном потоке
                        chunk = []  # Очищаем чанк для следующего набора строк

                if chunk:  # Если остались необработанные строки после завершения чтения файла
                    futures.append(executor.submit(process_chunk, chunk, temp_dir))  # Обрабатываем последний чанк

                for future in as_completed(futures):  # Ждем завершения всех задач
                    temp_file = future.result()  # Получаем результат обработки (имя временного файла)
                    if temp_file:
                        temp_files.append(temp_file)  # Добавляем временный файл в список

        logger.info(f"Все чанки обработаны. Начинается пакетное слияние временных файлов...")  # Логируем завершение обработки всех чанков

        total_unique_count = 0  # Общий счетчик уникальных строк
        total_duplicate_count = 0  # Общий счетчик дублей

        # Пока временных файлов больше, чем размер батча, продолжаем пакетное слияние
        while len(temp_files) > batch_size:
            temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir)  # Выполняем пакетное слияние
            total_unique_count += unique_count  # Увеличиваем общий счетчик уникальных строк
            total_duplicate_count += duplicate_count  # Увеличиваем общий счетчик дублей

        # Если остался больше одного временного файла, выполняем финальное слияние
        if len(temp_files) > 1:
            logger.info("Завершающий этап слияния крупных оставшихся файлов")  # Логируем финальный этап
            unique_count, duplicate_count = merge_files(temp_files, output_file)  # Финальное слияние временных файлов
            total_unique_count += unique_count  # Добавляем количество уникальных строк
            total_duplicate_count += duplicate_count  # Добавляем количество дублей
        else:
            # Если остался только один временный файл, переименовываем его в выходной файл
            os.rename(temp_files[0], output_file)  # Переименование файла

        # Удаляем все оставшиеся временные файлы
        for temp_file in temp_files:
            if os.path.exists(temp_file):  # Проверяем существование файла перед удалением
                os.remove(temp_file)  # Удаляем временный файл

        logger.info("Все временные файлы удалены")  # Логируем успешное удаление всех временных файлов
 
        unique_count = original_count - total_duplicate_count  # Рассчитываем количество уникальных строк
        logger.info(f"Итоговые результаты: Уникальных строк: {unique_count}, Дублей удалено: {total_duplicate_count}")  # Выводим результаты

        return original_count  # Возвращаем общее количество строк в исходном файле

    except Exception as e:
        logger.error(f"Ошибка при обработке файла: {e}")  # Логируем ошибку при обработке файла
        return 0  # Возвращаем 0 в случае ошибки

# Точка входа
if __name__ == '__main__':
    tic = time.perf_counter()  # Замеряем время начала выполнения программы

    input_file = "large_random_emails.txt"  # Исходный файл с данными
    output_file = "output-sorted-unique.txt"  # Выходной файл для сохранения результатов
    original_count = sort_and_uniq_streaming(input_file, output_file)  # Запускаем процесс сортировки и удаления дублей

    tac = time.perf_counter()  # Замеряем время завершения выполнения программы

    logging.info(f"Всего обработано строк: {original_count}")  # Логируем общее количество обработанных строк
    logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.4f} секунд")  # Логируем время выполнения программы

Мультипроцессорно-многопоточная версия, работает быстрее, но и к ресурсам требовательна по процу и памяти раз в 5-10 (пример отработки ".txt файл 100 миллионов строк, размером в 2GB (Ryzen 5600 4700mhz, 32GB DDR4 3600MHZ):
1727184165936.png


Скриншот отработки 300 гигового сгенерированного лога без дублей на 4.5 миллиарда строк, чем больше уникальных строк, тем ниже скорость обработки (AMD Ryzen 7 3700x, 64GB ОЗУ, 1TB NVME в Raid 1 из двух накопителей по 1TB, Linux Ubuntu 22.04):
1727442444363.png

main-multi.py:
Python:
# Импорт необходимых библиотек
import os  # Для работы с операционной системой и файловой системой
import heapq  # Для эффективного слияния отсортированных последовательностей
import time  # Для измерения времени выполнения скрипта
import logging  # Для ведения логов
import shutil  # Для удаления файлов и директории TEMP
import uuid  # Для генерации уникальных идентификаторов
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed  # Для параллельного выполнения задач
from colorlog import ColoredFormatter  # Для создания цветных логов

# Настройка цветного логирования
formatter = ColoredFormatter(
    "%(log_color)s%(asctime)s - %(levelname)s - %(message)s",
    datefmt=None,
    reset=True,
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
)

# Создание и настройка обработчика логов
handler = logging.StreamHandler()  # Создаем обработчик для вывода логов в консоль
handler.setFormatter(formatter)  # Устанавливаем форматтер для обработчика
logger = logging.getLogger()  # Получаем объект логгера
logger.addHandler(handler)  # Добавляем обработчик к логгеру
logger.setLevel(logging.INFO)  # Устанавливаем уровень логирования

# Создание временной директории
temp_dir = os.path.join(os.getcwd(), 'TEMP')  # Путь к временной директории в текущей рабочей директории
if not os.path.exists(temp_dir):  # Если директория не существует
    os.makedirs(temp_dir)  # Создаем её

def create_temp_merged_file(temp_dir):
    """
    Создает временный файл для слияния данных.
 
    :param temp_dir: Путь к временной директории
    """
    unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4().hex}.tmp")  # Генерируем уникальное имя файла
    temp_merged_file = open(unique_filename, 'w', encoding='utf-8')  # Открываем файл для записи
    return temp_merged_file, unique_filename  # Возвращаем объект файла и его имя

def heavy_computation(n):
    """
    Выполняет тяжелое вычисление (для симуляции нагрузки, процентов на 5 по моим замерам увеличивает скорость обработки лога).
 
    :param n: Число для вычислений
    :return: Результат вычисления
    """
    logger.debug(f"Запуск тяжелого вычисления с параметром: {n}")
    result = 0
    for i in range(n):
        result += i * i  # Выполняем сложение квадратов чисел
    logger.debug(f"Результат тяжелого вычисления: {result}")
    return result

def process_chunk_and_write_multiprocess(chunk, temp_dir):
    """
    Обрабатывает чанк данных и записывает результат во временный файл.
 
    :param chunk: Список строк для обработки
    :param temp_dir: Путь к временной директории
    :return: Имя созданного временного файла или None в случае ошибки
    """
    try:
        logger.info(f"Начало обработки чанка размером {len(chunk)} строк (процесс)")
        heavy_computation(10000000)  # Симуляция тяжелых вычислений
        unique_items = set(chunk)  # Убираем дубликаты на уровне чанка
        sorted_chunk = sorted(unique_items)  # Сортируем уникальные элементы
        unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4()}.tmp")  # Генерируем уникальное имя файла
        with open(unique_filename, 'w', encoding='utf-8') as temp_file:
            temp_file.write("\n".join(sorted_chunk) + "\n")  # Сохраняем только уникальные строки
        return unique_filename
    except Exception as e:
        logger.error(f"Ошибка при обработке чанка: {e}")
        return None

def merge_files_parallel(temp_files, output_file, num_threads=24):
    """
    Выполняет параллельное слияние временных файлов.
 
    :param temp_files: Список временных файлов для слияния
    :param output_file: Имя выходного файла
    :param num_threads: Количество потоков для использования
    :return: (количество уникальных строк, количество дубликатов, имя выходного файла)
    """
    try:
        logger.info(f"Параллельное слияние {len(temp_files)} временных файлов с использованием {num_threads} потоков")
        unique_count = 0
        duplicate_count = 0

        file_iters = []
        try:
            for temp_file in temp_files:
                file_iters.append(open(temp_file, 'r', encoding='utf-8', errors='replace'))  # Открываем все временные файлы

            merged_iter = heapq.merge(*[iter(f) for f in file_iters])  # Создаем итератор для слияния

            with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
                prev_line = None
                for line in merged_iter:
                    line = line.strip()
                    if line != prev_line:  # Если текущая строка отличается от предыдущей
                        outfile.write(line + '\n')  # Записываем её в выходной файл
                        prev_line = line
                        unique_count += 1
                    else:
                        duplicate_count += 1  # Увеличиваем счетчик дубликатов

        except Exception as e:
            logger.error(f"Ошибка при слиянии файлов: {e}")
            return 0, 0

        finally:
            for f in file_iters:
                f.close()  # Закрываем все открытые файлы

        logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
        return unique_count, duplicate_count, output_file

    except Exception as e:
        logger.error(f"Ошибка при параллельном слиянии файлов: {e}")
        return 0, 0

def batch_merge(temp_files, batch_size, temp_dir, num_merge_processes=24):
    """
    Выполняет пакетное слияние временных файлов.
 
    :param temp_files: Список временных файлов
    :param batch_size: Размер пакета для слияния
    :param temp_dir: Путь к временной директории
    :param num_merge_processes: Количество процессов для слияния
    :return: (список объединенных файлов, общее количество уникальных строк, общее количество дубликатов)
    """
    try:
        logger.info(f"Начало пакетного слияния с размером пакета {batch_size}")
        merged_files = []
        total_unique_count = 0
        total_duplicate_count = 0

        with ProcessPoolExecutor(max_workers=num_merge_processes) as merge_executor:
            futures = []

            for i in range(0, len(temp_files), batch_size):
                batch = temp_files[i:i + batch_size]  # Формируем пакет файлов
                logger.info(f"Слияние пакета с файлов {i + 1} по {min(i + batch_size, len(temp_files))}")

                temp_merged_file, unique_filename = create_temp_merged_file(temp_dir)
                temp_merged_file.close()
                futures.append(merge_executor.submit(merge_files_parallel, batch, unique_filename))

            for future in as_completed(futures):
                unique_count, duplicate_count, temp_file = future.result()  # Получаем результат слияния
                if unique_count or duplicate_count:
                    merged_files.append(temp_file)  # Добавляем временный файл в список
                total_unique_count += unique_count
                total_duplicate_count += duplicate_count

            for temp_file in temp_files:
                if os.path.exists(temp_file):
                    logger.info(f"Удаление временного файла: {temp_file}")
                    os.remove(temp_file)  # Удаляем обработанные временные файлы
                else:
                    logger.warning(f"Файл не найден для удаления: {temp_file}")

        return merged_files, total_unique_count, total_duplicate_count

    except Exception as e:
        logger.error(f"Ошибка при пакетном слиянии: {e}")
        return temp_files, 0, 0

def final_merge(temp_dir, output_file):
    """
    Выполняет финальное слияние всех оставшихся временных файлов.
 
    :param temp_dir: Путь к временной директории
    :param output_file: Имя выходного файла
    :return: (количество уникальных строк, количество удаленных дубликатов)
    """
    logger.info(f"Финальная стадия слияния временных файлов из папки {temp_dir}.")
    temp_files = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f))]  # Список временных файлов для финального слияния

    if len(temp_files) > 1:  # Если файлов больше одного, запускаем слияние
        unique_count, duplicate_count, output_file = merge_files_parallel(temp_files, output_file)  # Параллельно сливаем файлы
    elif len(temp_files) == 1:  # Если остался только один файл
        logger.info(f"Остался один файл. Переименование {temp_files[0]} в {output_file}")
        os.rename(temp_files[0], output_file)  # Переименовываем файл в выходной
        unique_count, duplicate_count = 0, 0  # Устанавливаем нулевые значения для счетчиков
    else:
        logger.error("Не осталось временных файлов для слияния!")
        return 0, 0  # Возвращаем нули в случае ошибки

    logger.info(f"Финальное слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")

    try:
        shutil.rmtree(temp_dir)  # Удаляем временную директорию
        logger.info(f"Временная папка {temp_dir} успешно удалена.")
    except Exception as e:
        logger.error(f"Ошибка при удалении временной папки {temp_dir}: {e}")

    return unique_count, duplicate_count  # Возвращаем количество уникальных строк и дубликатов

def read_and_process_chunks_multiprocess(input_file, chunk_size=2000000, num_processes=24):
    """
    Читает входной файл по чанкам и обрабатывает их в многопроцессорном режиме.
 
    :param input_file: Имя входного файла
    :param chunk_size: Размер чанка (количество строк)
    :param num_processes: Количество процессов для обработки
    :return: (список временных файлов, общее количество прочитанных строк)
    """
    temp_files = []  # Список для временных файлов
    chunk = []  # Буфер для хранения чанка строк
    original_count = 0  # Счетчик общего числа строк

    logger.info(f"Чтение и обработка файла {input_file} в {num_processes} процессах...")

    try:
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile:  # Открываем входной файл для чтения
            with ProcessPoolExecutor(max_workers=num_processes) as executor:  # Создаем процессный пул для обработки чанков
                futures = []  # Список задач для выполнения

                for line in infile:  # Читаем файл построчно
                    chunk.append(line.strip())  # Добавляем строку в чанк
                    original_count += 1  # Увеличиваем счетчик строк
                    if len(chunk) >= chunk_size:  # Если чанк достиг нужного размера
                        futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir))  # Отправляем чанк на обработку в пул процессов
                        chunk = []  # Очищаем чанк

                if chunk:  # Если остались строки после завершения цикла
                    futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir))  # Обрабатываем оставшийся чанк

                for future in as_completed(futures):  # Ожидаем завершения всех задач
                    temp_file = future.result()  # Получаем результат задачи
                    if temp_file:
                        temp_files.append(temp_file)  # Добавляем временный файл в список

        logger.info(f"Чтение и обработка завершены (процессами). Всего строк: {original_count}")  # Логируем завершение обработки файла

    except Exception as e:
        logger.error(f"Ошибка при чтении файла (процессы): {e}")  # Логируем ошибку в случае сбоя

    return temp_files, original_count  # Возвращаем список временных файлов и общее количество строк

def sort_and_uniq_streaming_multiprocess(input_file, output_file, chunk_size=2000000, batch_size=10, num_processes=24, num_merge_processes=24):
    """
    Основная функция для сортировки и удаления дубликатов из большого файла с использованием многопроцессорной обработки.
 
    :param input_file: Имя входного файла
    :param output_file: Имя выходного файла
    :param chunk_size: Размер чанка для обработки
    :param batch_size: Размер пакета для слияния
    :param num_processes: Количество процессов для обработки чанков
    :param num_merge_processes: Количество процессов для слияния
    :return: (общее количество строк, количество уникальных строк)
    """
    logger.info(f"Старт обработки файла {input_file}...")  # Логируем начало процесса

    temp_files, original_count = read_and_process_chunks_multiprocess(input_file, chunk_size, num_processes)  # Читаем и обрабатываем файл по чанкам
    logger.info(f"Начинается пакетное слияние временных файлов...")  # Логируем начало пакетного слияния

    total_unique_count = 0  # Инициализируем счетчик всех уникальных строк
    total_duplicate_count = 0  # Инициализируем счетчик всех дубликатов

    while len(temp_files) > batch_size:  # Пока временных файлов больше, чем размер пакета
        temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir, num_merge_processes)  # Выполняем пакетное слияние
        total_unique_count += unique_count  # Обновляем общий счетчик уникальных строк
        total_duplicate_count += duplicate_count  # Обновляем общий счетчик дубликатов

    unique_count, duplicate_count = final_merge(temp_dir, output_file)  # Выполняем финальное слияние
    total_unique_count += unique_count  # Обновляем счетчик уникальных строк
    total_duplicate_count += duplicate_count  # Обновляем счетчик дубликатов

    return original_count, unique_count  # Возвращаем общее количество строк и количество уникальных строк

# Точка входа
if __name__ == '__main__':  # Если этот файл запускается как основная программа
    tic = time.perf_counter()  # Начало отсчета времени

    input_file = "input.txt"  # Указываем импортируемый файл
    output_file = "output.txt"  # Указываем экспортируемый файл
    original_count, unique_count = sort_and_uniq_streaming_multiprocess(input_file, output_file, num_processes=24, num_merge_processes=24)  # Запускаем основную функцию обработки

    tac = time.perf_counter()  # Конец отсчета времени
    logging.info(f"Все временные файлы удалены. Уникальных строк: {unique_count}, Дублей удалено (на всех этапах процессов слияния): {original_count - unique_count}")  # Логируем результаты
    logging.info(f"Всего обработано строк: {original_count}")  # Логируем количество обработанных строк
    logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.2f} секунд")  # Логируем время выполнения
 
Последнее редактирование:
Долгие дни и недели я гуглил как же мне удалить дубликаты с тхтшников размером более 100гб, никакие софты не справлялись ибо они загружали весь текстовик в оперативку, и со временем вылетали. Я даже сервак со 128гб оперативки взял под это дело.
И тут rand вызвался написать свой скрипт на питоне, за что ему огромное спасибо!
Несколько дней и ночей мы его тестили, и в итоге у rand получилось сделать то, о чем я мечтал последние несколько недель)
Работает шустро, ресурсы не жрет от слова совсем.
 
Долгие дни и недели я гуглил как же мне удалить дубликаты с тхтшников размером более 100гб, никакие софты не справлялись ибо они загружали весь текстовик в оперативку, и со временем вылетали. Я даже сервак со 128гб оперативки взял под это дело.
И тут rand вызвался написать свой скрипт на питоне, за что ему огромное спасибо!
Несколько дней и ночей мы его тестили, и в итоге у rand получилось сделать то, о чем я мечтал последние несколько недель)
Работает шустро, ресурсы не жрет от слова совсем.
Отдельный респект тебе за помощь в тестировании, багов была куча. Но ПЗУ память твоего сервака пережила все этапы тестирования. :D На своем M2 накопителе я бы побоялся такие объемы прогонять. И не столкнулся бы с массой багов при обработке таких больших ULP =)
 
Последнее редактирование:
Идея с разбивкой файла на чанки и их сортировка очень гениальна))
Для средних ПК самое то! Баланс очень хорошо соблюден!
Респект!
 
Идея с разбивкой файла на чанки и их сортировка очень гениальна))
Для средних ПК самое то! Баланс очень хорошо соблюден!
Респект!
Тут вся фишка в нескольких этапах слияния файлов из сформированных мелких чанков по 2 миллиона строк, на каждом этапе происходит сортировка и удаление дублей, и чтение файлов распараллеливанием пула потоков. Если этого не сделать, скорость обработки будет низкая, дубли останутся или операционная система не сможет открыть большое количество дескрипторов файлов, что приведет к исключению в интерпретаторе. Хотя погрешность всё равно может быть, и не все дубли могут быть почищены с первого раза. Если хотите 100% очистку дублей без погрешности алгоритма на этапах слияния (хотя она должна быть очень низкая, меньше 0.001% при размере ULP в 100 гигов предположительно), тогда придется делать повторный проход уже по экспортируемому файлу.

При создании алгоритма, очень помогло это:

P.S. Когда тестировал, у меня был 2-х гиговый файл со 100 миллионами строк с небольшим количеством дублей как маячки по позициям начала, середины и конца в текстовом файле+образцовый очищенный лог от дублей этого файла, по нему как раз я и сверял выходящий файл после обработки скрипта интерпретатором. На таком размере, погрешность 0%. А вот на размерах побольше при тестировании с antikrya, проверить погрешность возможности не было.
 
Последнее редактирование:
Я тут еще хорошенько пересмотрел отладчиком работу этого скрипта, и понял что жестко ошибся в расчетах системных требований для размера свободного места на накопителе и не учел один критичный момент. В процессе окончательного слияния последних крупных временных файлов в output.txt до окончательного слияния в файл пишется весь объем временных файлов, что увеличивает размер занимаемого места в 2 раза до удаления временных файлов. Получается на обработку 100 гигового текстовика, нужно в районе 210-220 гигов свободного места на накопителе. Вот такая расплата за низкое потребление ОЗУ....
P.S. Идеи есть как сэкономить пространство на накопителе при выполнении скрипта, но пока только в теории, потом попробую, если получится, залью обнову.
 
Последнее редактирование:
Работает шустро, файл в 6.75GB прогнал за 444с / 7мин~
Единственное, под формат ULP не советую, так как работает не корректно, но если login:pass или mail:pass , то всё чётко будет! Неплохо реализовал систему с чанками и дальнейшей сортировкой ;)
1. как ты увидел хайд?
2. Почему под ULP не советуешь?
 
Ну вот
Работает шустро, файл в 6.75GB прогнал за 444с / 7мин~
Единственное, под формат ULP не советую, так как работает не корректно, но если login:pass или mail:pass , то всё чётко будет! Неплохо реализовал систему с чанками и дальнейшей сортировкой ;)
По найденному багу, это что-то из той-же оперы: "Помогите найти решение?", решение им найдено, персонаж пишет "Решение найдено можете закрывать тему". Строки они и в африке строки, есть будут совпадения, алгоритм должен пройтись по ним. Какой формат строк не принимает?
 
Последнее редактирование:
Бро, пишу коммент не ради критики, а для конструктива и всеобщего развития. У тебя классный алгоритм и идея с чанками, но для такой задачи есть еще одна крутая альтернатива - фильтр Блума. Это вероятностная структура данных, которая позволяет проверить принадлежность элемента множеству. Будет небольшой шанс на ложноположительное срабатывание, но даст сильный прирост по скорости и использованию места на диске.

Если предположить, что
  • Файл весит 100гб
  • При этом в нем 1млдр строк (примерно 100 байт на строку)
  • Уникальных строк - 50% (500 млн)
  • Целевая вероятность ложноположительных результатов - 0.1%
то получаем следующие результаты:

Текущий алгоритм

1) Память
  • Для обработки чанков: предположим, размер чанка 2 миллиона строк, 2 млн * 100 байт = 200 МБ на чанк
  • Для сортировки и удаления дубликатов может потребоваться до 400 МБ (удвоенный размер чанка)
  • Дополнительная память для буферов чтения/записи и служебных структур данных
Итого: около 500-600 МБ оперативной памяти

2) Дисковое пространство
  • Исходный файл: 100 ГБ
  • Временные файлы: до 100 ГБ (в худшем случае)
  • Выходной файл: около 50 ГБ (предполагая 50% уникальных строк)
Итого: до 250 ГБ дискового пространства

3) Время выполнения
  • Зависит от производительности диска и CPU, но может занять несколько часов из-за необходимости многократного чтения и записи данных.

Фильтр Блума

1) Память
  • Для расчета размера фильтра Блума используем формулу: m = -n * ln(p) / (ln(2))^2, где n - количество элементов, p - вероятность ложноположительных результатов: m = -500,000,000 * ln(0.001) / (ln(2))^2 ≈ 7,190,574,313 бит ≈ 899 МБ.
Округлим до 1 ГБ для учета дополнительных структур данных.

2) Дисковое пространство
  • Исходный файл: 100 ГБ
  • Выходной файл: около 50 ГБ (предполагая 50% уникальных строк)
Итого: около 150 ГБ дискового пространства

3) Время выполнения
  • Значительно меньше, чем у текущего алгоритма. Вероятно, несколько часов, но без необходимости многократного чтения и записи данных.

Итого получаем, что прирост производительности будет варьироваться в зависимости от характеристик входных данных, но в среднем можно ожидать:
  • Уменьшение времени выполнения на 40-60% для очень больших файлов
  • Сокращение использования места на диске на 80-90%
  • Значительное уменьшение количества операций ввода-вывода, так как не требуется создание и слияние временных файлов.
Надеюсь инфа будет полезной и успехов в кодинге ;)
 
Бро, пишу коммент не ради критики, а для конструктива и всеобщего развития. У тебя классный алгоритм и идея с чанками, но для такой задачи есть еще одна крутая альтернатива - фильтр Блума. Это вероятностная структура данных, которая позволяет проверить принадлежность элемента множеству. Будет небольшой шанс на ложноположительное срабатывание, но даст сильный прирост по скорости и использованию места на диске.

Если предположить, что
  • Файл весит 100гб
  • При этом в нем 1млдр строк (примерно 100 байт на строку)
  • Уникальных строк - 50% (500 млн)
  • Целевая вероятность ложноположительных результатов - 0.1%
то получаем следующие результаты:

Текущий алгоритм

1) Память
  • Для обработки чанков: предположим, размер чанка 2 миллиона строк, 2 млн * 100 байт = 200 МБ на чанк
  • Для сортировки и удаления дубликатов может потребоваться до 400 МБ (удвоенный размер чанка)
  • Дополнительная память для буферов чтения/записи и служебных структур данных
Итого: около 500-600 МБ оперативной памяти

2) Дисковое пространство
  • Исходный файл: 100 ГБ
  • Временные файлы: до 100 ГБ (в худшем случае)
  • Выходной файл: около 50 ГБ (предполагая 50% уникальных строк)
Итого: до 250 ГБ дискового пространства

3) Время выполнения
  • Зависит от производительности диска и CPU, но может занять несколько часов из-за необходимости многократного чтения и записи данных.

Фильтр Блума

1) Память
  • Для расчета размера фильтра Блума используем формулу: m = -n * ln(p) / (ln(2))^2, где n - количество элементов, p - вероятность ложноположительных результатов: m = -500,000,000 * ln(0.001) / (ln(2))^2 ≈ 7,190,574,313 бит ≈ 899 МБ.
Округлим до 1 ГБ для учета дополнительных структур данных.

2) Дисковое пространство
  • Исходный файл: 100 ГБ
  • Выходной файл: около 50 ГБ (предполагая 50% уникальных строк)
Итого: около 150 ГБ дискового пространства

3) Время выполнения
  • Значительно меньше, чем у текущего алгоритма. Вероятно, несколько часов, но без необходимости многократного чтения и записи данных.

Итого получаем, что прирост производительности будет варьироваться в зависимости от характеристик входных данных, но в среднем можно ожидать:
  • Уменьшение времени выполнения на 40-60% для очень больших файлов
  • Сокращение использования места на диске на 80-90%
  • Значительное уменьшение количества операций ввода-вывода, так как не требуется создание и слияние временных файлов.
Надеюсь инфа будет полезной и успехов в кодинге ;)
Спасибо, я где-то читал про это когда писал код, но мне не совсем это показалось понятным, я попробую учесть предложенный Вами способ. Но я пошел другим путем, и выжал из процессора все разделив выполнение скрипта питоном на многопроцессорность, разделив нагрузку на ядра по полной! Скорость на слабой виртуалке (4 ядра, 4 гига RAM) при обработке 2 гигового файла выросла в 2-2.5 раза в пределах погрешности, даже больше чем у меня получилось сишарпом, теперь скорость обработки зависит исключительно от мощности и количества ядер процессора, сегодня вечером попробую затестить на мощном камне:
1727078186447.png


Результат отработки 2 гигового файла, до всех изменений в коде выполнялось за 360 секунд, сейчас за 157 секунд. Но пришлось чутка поменять логику.
 
В общем тестирование 100 миллионов строк на одном файле размером 2GB (Ryzen 5600 4700mhz, 32GB DDR4 3600MHZ)

Версия которая сейчас под хайдом:
1727088321972.png


Многопроцессорная версия:
1727176610649.png



P.S. Добавил мультипроцессорную версию.
 
Последнее редактирование:
Попробовал мультипроцессорную версию, хороший результат, файл 8 gb

INFO - Все временные файлы удалены. Уникальных строк: 229390870, Дублей удалено: 206
INFO - Всего обработано строк: 114695683
INFO - Удаление дублей и сортировка заняли 187.4826 секунд

Вопрос, скорость работы диска как то регулируется в софте?
на ссд при обращении к файлам более 250 мб/сек не видел
 
Попробовал мультипроцессорную версию, хороший результат, файл 8 gb

INFO - Все временные файлы удалены. Уникальных строк: 229390870, Дублей удалено: 206
INFO - Всего обработано строк: 114695683
INFO - Удаление дублей и сортировка заняли 187.4826 секунд

Вопрос, скорость работы диска как то регулируется в софте?
на ссд при обращении к файлам более 250 мб/сек не видел
Количество строк уникальных неправильно рассчитано.=)
Можно попробовать размер чанков увеличить или уменьшить. И подобрать нужную скорость. Но все может быть ограничено I/O операционной системы или питона.
Также там есть функции с параметрами установки количества потоков и процессов.
Ещё можно попробовать в 2 отдельных процесса одновременно запустить на разных папках и файлах.
 
Последнее редактирование:
Пожалуйста, обратите внимание, что пользователь заблокирован
Решение найдено можете закрывать тему
это я только так писал xD
 
Можно добавить функционал проверки длины строки?Строки меньше 6 символов и больше 32 в выходном файле ненужны.Готов все словари с weakpass.com прогнать через ваш софт и выложить в этом топике
 
Можно добавить функционал проверки длины строки?Строки меньше 6 символов и больше 32 в выходном файле ненужны.Готов все словари с weakpass.com прогнать через ваш софт и выложить в этом топике
Потом. Я пока на перерыве от IDE. =)
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Можно добавить функционал проверки длины строки?Строки меньше 6 символов и больше 32 в выходном файле ненужны.Готов все словари с weakpass.com прогнать через ваш софт и выложить в этом топике
Легче добавить просто валидатор по regexp где юзер сам будет писать регулярку как ему проверять
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Легче добавить просто валидатор по regexp где юзер сам будет писать регулярку как ему проверять
Попробуйте это
Мне очень понравилось. Пользуюсь уже давно. Причём работает без сортировки.
И параметры самому можно выставить.
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх