• XSS.stack #1 – первый литературный журнал от юзеров форума

Алгоритмы сортировки и удаления дублей строк в больших TXT файлах объемом от 1ГБ

вообще попробуй переписать чтоб ты сверял реально по базе, самая оптимальное что наверное могу посоветовать и без установки доп бд и всяких пакетов это sqlite и лимиты вроде позволяют https://www.sqlite.org/limits.html
Посмотреть вложение 95179
Попробовал такой подход реализовать на питоне, да всё работает, но увы очень медленно пишет в БД строки....такой подход мне не понравился, может я что-то не так делал. Но код даже показывать не охото, ибо стыдно =)
Я ищу вариант где можно и рыбку съесть (быстрая работа), и косточкой не подавится (экономия ресурсов ОЗУ).
 
Последнее редактирование:
Пожалуйста, обратите внимание, что пользователь заблокирован
попробуй не построчно писать в бд а огромными sql запросами
Попробовал такой подход реализовать на питоне, да всё работает, но увы очень медленно пишет в БД строки....такой подход мне не понравился, может я что-то не так делал. Но код даже показывать не охото, ибо стыдно =)
Я ищу вариант где можно и рыбку съесть (быстрая работа), и косточкой не подавится (экономия ресурсов ОЗУ).
 
ссылки по теме:
Вот это классный проект, чуть доработать и будет бомба:
Основной смысл в том чтобы сортировать не строки а их индексы.
 
Последнее редактирование:
ссылки по теме:
Вот это классный проект, чуть доработать и будет бомба:
Основной смысл в том чтобы сортировать не строки а их индексы.
Это не питон.
 
Тебе шашечки или ехать?
Интересуют опыты только с питоном. Что-то подобное я и реализовал в последнем решении, пока тестирую. В любом случае спасибо за наводки.
 
В общем сгенерил 100 миллионов строк вида xxx@xxx.xxx, немножко разбавил файл дублями. На выходе для тестов получился файл размером в 2 гигабайта. Написал скрипт который делит 1 большой файл на несколько маленьких частей в папке TEMP (корень скрипта), выполняет в них сортировку и удаление дублей и объединяет в 1 файл, ОЗУ по диспетчеру задач кушает в районе 20-30 мегабайт при выполнении скрипта, процессор грузит хорошо.
Результат:
1726747663167.png

Python:
import os  # Импортируем модуль для работы с файловой системой
import tempfile  # Импортируем модуль для создания временных файлов и каталогов
import time  # Импортируем модуль для работы с временем (измерение времени выполнения)
from multiprocessing import Pool  # Импортируем Pool для параллельной обработки данных
import heapq  # Импортируем модуль для работы с очередями слияния (сортировка)

# Создаем путь к папке TEMP в текущей директории для хранения временных файлов
temp_dir = os.path.join(os.getcwd(), 'TEMP')
if not os.path.exists(temp_dir):  # Если папка TEMP не существует
    os.makedirs(temp_dir)  # Создаем папку TEMP

# Функция для обработки чанка (части данных)
def process_chunk(chunk, temp_dir):
    chunk = sorted(set(chunk))  # Удаляем дубликаты в чанке и сортируем данные
    temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', dir=temp_dir)  # Создаем временный файл
    with open(temp_file.name, 'w') as f:  # Открываем временный файл для записи
        for item in chunk:  # Проходим по каждому элементу чанка
            clean_item = ' '.join(item.split())  # Удаляем лишние пробелы из строки
            f.write(clean_item + '\n')  # Записываем строку в файл
    return temp_file.name, len(chunk)  # Возвращаем имя временного файла и количество уникальных строк

# Функция для слияния данных из временных файлов
def merge_files(temp_files, output_file):
    with open(output_file, 'w') as outfile:  # Открываем выходной файл для записи
        file_iters = [open(f, 'r') for f in temp_files]  # Открываем все временные файлы для чтения
        merged_iter = heapq.merge(*file_iters)  # Используем heapq.merge для слияния отсортированных файлов
        prev_line = ""  # Объявляем переменную для хранения предыдущей строки
        unique_count = 0  # Объявляем счётчик уникальных строк
        for line in merged_iter:  # Проходим по каждой строке в объединённом потоке данных
            clean_line = ' '.join(line.split())  # Очищаем строку от лишних пробелов
            if clean_line != prev_line:  # Если строка не является дубликатом предыдущей
                outfile.write(clean_line + '\n')  # Записываем строку в выходной файл
                prev_line = clean_line  # Обновляем переменную предыдущей строки
                unique_count += 1  # Увеличиваем счётчик уникальных строк
        for f in file_iters:  # Закрываем все временные файлы
            f.close()
    return unique_count  # Возвращаем количество уникальных строк

# Функция для сортировки и удаления дубликатов из входного файла
def sort_and_uniq_streaming(input_file, output_file):
    temp_files = []  # Список для временных файлов
    chunk = []  # Текущий чанк данных
    original_count = 0  # Счётчик исходных строк

    # Чтение входного файла по частям (чанками)
    with open(input_file, 'r') as infile:  # Открываем входной файл для чтения
        for line in infile:  # Проходим по каждой строке файла
            chunk.append(line.strip())  # Добавляем строку в чанк, удаляя лишние пробелы
            original_count += 1  # Увеличиваем счётчик исходных строк
            if len(chunk) >= 100000:  # Если чанк достиг 100000 строк
                with Pool() as pool:  # Создаём пул процессов для параллельной обработки
                    temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])  # Обрабатываем чанк в параллельных процессах
                    temp_files += [t[0] for t in temp_data]  # Добавляем имена временных файлов в список
                chunk = []  # Очищаем чанк

        # Обработка оставшегося чанка (если есть)
        if chunk:  # Если чанк не пустой
            with Pool() as pool:  # Создаём пул процессов
                temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])  # Обрабатываем оставшийся чанк
                temp_files += [t[0] for t in temp_data]  # Добавляем имена временных файлов

    # Слияние временных файлов и удаление дубликатов
    unique_count = merge_files(temp_files, output_file)  # Сливаем временные файлы в выходной файл

    # Удаление временных файлов
    for temp_file in temp_files:  # Проходим по каждому временному файлу
        os.remove(temp_file)  # Удаляем временный файл

    return original_count, unique_count  # Возвращаем исходное и уникальное количество строк

# Точка входа
if __name__ == '__main__':
    tic = time.perf_counter()  # Засекаем начальное время выполнения скрипта

    # Вызов функции сортировки и удаления дубликатов
    original_count, unique_count = sort_and_uniq_streaming('large_random_emails.txt', 'output.txt')

    tac = time.perf_counter()  # Засекаем конечное время выполнения скрипта

    duplicates_removed = original_count - unique_count  # Вычисляем количество удалённых дублей
    print(f"Удаление дублей заняло {tac - tic:0.4f} секунд")  # Выводим время выполнения
    print(f"Всего строк: {original_count}")  # Выводим общее количество строк в исходном файле
    print(f"Уникальных строк: {unique_count}")  # Выводим количество уникальных строк
    print(f"Удалено дублей: {duplicates_removed}")  # Выводим количество удалённых дублей
 
В общем сгенерил 100 миллионов строк вида xxx@xxx.xxx, немножко разбавил файл дублями. На выходе для тестов получился файл размером в 2 гигабайта. Написал скрипт который делит 1 большой файл на несколько маленьких частей в папке TEMP (корень скрипта), выполняет в них сортировку и удаление дублей и объединяет в 1 файл, ОЗУ по диспетчеру задач кушает в районе 20-30 мегабайт при выполнении скрипта, процессор грузит хорошо.
Результат:
Посмотреть вложение 95279
Python:
import os  # Импортируем модуль для работы с файловой системой
import tempfile  # Импортируем модуль для создания временных файлов и каталогов
import time  # Импортируем модуль для работы с временем (измерение времени выполнения)
from multiprocessing import Pool  # Импортируем Pool для параллельной обработки данных
import heapq  # Импортируем модуль для работы с очередями слияния (сортировка)

# Создаем путь к папке TEMP в текущей директории
temp_dir = os.path.join(os.getcwd(), 'TEMP')
if not os.path.exists(temp_dir):  # Если папка TEMP не существует
    os.makedirs(temp_dir)  # Создаем папку TEMP

# Функция для обработки чанка (части данных)
def process_chunk(chunk, temp_dir):
    chunk = sorted(set(chunk))  # Удаляем дубликаты в чанке и сортируем данные
    temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', dir=temp_dir)  # Создаем временный файл
    with open(temp_file.name, 'w') as f:  # Открываем временный файл для записи
        for item in chunk:  # Проходим по каждому элементу чанка
            clean_item = ' '.join(item.split())  # Удаляем лишние пробелы из строки
            f.write(clean_item + '\n')  # Записываем строку в файл
    return temp_file.name, len(chunk)  # Возвращаем имя временного файла и количество уникальных строк

# Функция для слияния данных из временных файлов
def merge_files(temp_files, output_file):
    with open(output_file, 'w') as outfile:  # Открываем выходной файл для записи
        file_iters = [open(f, 'r') for f in temp_files]  # Открываем все временные файлы для чтения
        merged_iter = heapq.merge(*file_iters)  # Используем heapq.merge для слияния отсортированных файлов
        prev_line = ""  # Объявляем переменную для хранения предыдущей строки
        unique_count = 0  # Объявляем счётчик уникальных строк
        for line in merged_iter:  # Проходим по каждой строке в объединённом потоке данных
            clean_line = ' '.join(line.split())  # Очищаем строку от лишних пробелов
            if clean_line != prev_line:  # Если строка не является дубликатом предыдущей
                outfile.write(clean_line + '\n')  # Записываем строку в выходной файл
                prev_line = clean_line  # Обновляем переменную предыдущей строки
                unique_count += 1  # Увеличиваем счётчик уникальных строк
        for f in file_iters:  # Закрываем все временные файлы
            f.close()
    return unique_count  # Возвращаем количество уникальных строк

# Функция для сортировки и удаления дубликатов из входного файла
def sort_and_uniq_streaming(input_file, output_file):
    temp_files = []  # Список для временных файлов
    chunk = []  # Текущий чанк данных
    original_count = 0  # Счётчик исходных строк

    # Чтение входного файла по частям (чанками)
    with open(input_file, 'r') as infile:  # Открываем входной файл для чтения
        for line in infile:  # Проходим по каждой строке файла
            chunk.append(line.strip())  # Добавляем строку в чанк, удаляя лишние пробелы
            original_count += 1  # Увеличиваем счётчик исходных строк
            if len(chunk) >= 100000:  # Если чанк достиг 100000 строк
                with Pool() as pool:  # Создаём пул процессов для параллельной обработки
                    temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])  # Обрабатываем чанк в параллельных процессах
                    temp_files += [t[0] for t in temp_data]  # Добавляем имена временных файлов в список
                chunk = []  # Очищаем чанк

        # Обработка оставшегося чанка (если есть)
        if chunk:  # Если чанк не пустой
            with Pool() as pool:  # Создаём пул процессов
                temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])  # Обрабатываем оставшийся чанк
                temp_files += [t[0] for t in temp_data]  # Добавляем имена временных файлов

    # Слияние временных файлов и удаление дубликатов
    unique_count = merge_files(temp_files, output_file)  # Сливаем временные файлы в выходной файл

    # Удаление временных файлов
    for temp_file in temp_files:  # Проходим по каждому временному файлу
        os.remove(temp_file)  # Удаляем временный файл

    return original_count, unique_count  # Возвращаем исходное и уникальное количество строк

# Точка входа
if __name__ == '__main__':
    tic = time.perf_counter()  # Засекаем начальное время выполнения скрипта

    # Вызов функции сортировки и удаления дубликатов
    original_count, unique_count = sort_and_uniq_streaming('large_random_emails.txt', 'output.txt')

    tac = time.perf_counter()  # Засекаем конечное время выполнения скрипта

    duplicates_removed = original_count - unique_count  # Вычисляем количество удалённых дублей
    print(f"Удаление дублей заняло {tac - tic:0.4f} секунд")  # Выводим время выполнения
    print(f"Всего строк: {original_count}")  # Выводим общее количество строк в исходном файле
    print(f"Уникальных строк: {unique_count}")  # Выводим количество уникальных строк
    print(f"Удалено дублей: {duplicates_removed}")  # Выводим количество удалённых дублей
Ну да это External Merge Sort, главное за HDD следи чтобы он не отъехал.
Я бы определял колличество доступной ОЗУ, читал файл не построчно, а побайтово, по размеру ОЗУ корректируя чтобы чанк кончался концом строки.
Когда то делал красивый итератор для чтения по блокам.
 
Последнее редактирование:
Ну да это External Merge Sort, главное за HDD следи чтобы он не отъехал.
Я бы определял колличество доступной ОЗУ, читал файл не построчно, а побайтово, по размеру ОЗУ корректируя чтобы чанк кончался концом строки.
Когда то делал красивый итератор для чтения по блокам.
Интересная идея, попробую попозже. Но пока устал. В принципе данный алгоритм удовлетворил мою любопытность реализации такого на петухоне. :D
Правда да, недостаток с нагрузкой на ПЗУ очень смущает. Ваш вариант показался более перспективным.
 
Bash:
vim filename.txt
:sort u
2 гига съел без проблем.
думал пару минут, не засекал время.
 
Чуть-чуть ускорил работу предыдущего алгоритма, процентов на 10-15, тесты производил на виртуалке в 4 ядра, 4 гига ОЗУ.
До изменений:
Код:
Удаление дублей заняло 385.0045 секунд
Всего строк: 86008184
Уникальных строк: 85570684
Удалено дублей: 437500
После изменений в функции process_chunk:
Код:
Удаление дублей заняло 349.8931 секунд
Всего строк: 86008184
Уникальных строк: 85570684
Удалено дублей: 437500
Python:
import os
import tempfile
import time
from multiprocessing import Pool
import heapq

# Создаем путь к папке TEMP в текущей директории для хранения временных файлов
temp_dir = os.path.join(os.getcwd(), 'TEMP')
if not os.path.exists(temp_dir):  # Если папка TEMP не существует
    os.makedirs(temp_dir)  # Создаем папку TEMP

# Функция для обработки чанка (части данных)
def process_chunk(chunk, temp_dir):
    print("Начата обработка чанка...")

    # Используем временное множество для удаления дубликатов
    unique_items = set(chunk)

    # Используем heapq для одновременного удаления дубликатов и сортировки
    sorted_chunk = list(heapq.merge(sorted(unique_items)))

    # Используем пакетную запись
    temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', dir=temp_dir)
    with open(temp_file.name, 'w', errors='ignore') as f:
        f.write('\n'.join(sorted_chunk) + '\n')  # Записываем данные в файл

    print(f"Чанк обработан и сохранён во временный файл: {temp_file.name}")
    return temp_file.name, len(sorted_chunk)

# Функция для слияния данных из временных файлов
def merge_files(temp_files, output_file):
    print(f"Начинается слияние файлов. Всего временных файлов: {len(temp_files)}")
    with open(output_file, 'w', errors='ignore') as outfile:
        file_iters = [open(f, 'r', errors='ignore') for f in temp_files]  # Открываем все временные файлы для чтения
        merged_iter = heapq.merge(*file_iters)  # Используем heapq.merge для слияния отсортированных файлов
        prev_line = ""
        unique_count = 0
        for line in merged_iter:
            clean_line = ' '.join(line.split())  # Очищаем строку от лишних пробелов
            if clean_line != prev_line:  # Если строка не является дубликатом предыдущей
                outfile.write(clean_line + '\n')  # Записываем строку в выходной файл
                prev_line = clean_line
                unique_count += 1  # Увеличиваем счётчик уникальных строк
        for f in file_iters:
            f.close()
    print(f"Слияние завершено. Уникальных строк: {unique_count}")
    return unique_count  # Возвращаем количество уникальных строк

# Функция для сортировки и удаления дубликатов из входного файла
def sort_and_uniq_streaming(input_file, output_file):
    print(f"Начинается обработка файла: {input_file}")
    temp_files = []  # Список для временных файлов
    chunk = []  # Текущий чанк данных
    original_count = 0  # Счётчик исходных строк

    with open(input_file, 'r', errors='ignore') as infile:  # Открываем входной файл для чтения
        for line in infile:
            chunk.append(line.strip())  # Добавляем строку в чанк, удаляя лишние пробелы
            original_count += 1
            if len(chunk) >= 200000:  # Если чанк достиг 200000 строк, для ускорения можно поиграться с этим условием, например если в моем случае добавить чанк на 500000 строк, будет работать чуть побыстрее.
                print(f"Обработка чанка из {len(chunk)} строк")
                with Pool() as pool:
                    temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])  # Обрабатываем чанк в параллельных процессах
                    temp_files += [t[0] for t in temp_data]
                chunk = []  # Очищаем чанк

        if chunk:  # Если чанк не пустой
            print(f"Обработка последнего чанка из {len(chunk)} строк")
            with Pool() as pool:
                temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])
                temp_files += [t[0] for t in temp_data]

    print("Все чанки обработаны. Начинается слияние временных файлов...")
    unique_count = merge_files(temp_files, output_file)  # Сливаем временные файлы в выходной файл

    for temp_file in temp_files:
        os.remove(temp_file)  # Удаляем временные файлы
    print("Все временные файлы удалены")

    return original_count, unique_count  # Возвращаем исходное и уникальное количество строк

# Точка входа
if __name__ == '__main__':
    tic = time.perf_counter()  # Засекаем начальное время выполнения скрипта

    original_count, unique_count = sort_and_uniq_streaming('large_random_emails.txt', 'output.txt')

    tac = time.perf_counter()  # Засекаем конечное время выполнения скрипта

    duplicates_removed = original_count - unique_count
    print(f"Удаление дублей заняло {tac - tic:0.4f} секунд")
    print(f"Всего строк: {original_count}")
    print(f"Уникальных строк: {unique_count}")
    print(f"Удалено дублей: {duplicates_removed}")


Bash:
vim filename.txt
:sort u
2 гига съел без проблем.
думал пару минут, не засекал время.
Понятно что это можно сделать как встроенными утилитами Linux, так и VIM. Конкретно в этом случае я написал в первом посте интересует только Python. Но это не подойдет для файлов, если их размер к примеру будет 100 гигов, при условии недостаточного количества ОЗУ в системе. Ведь вим грузит твой файл сначала в буфер перед тем как делать сортировку и удаление дублей.

P.S. При тестировании алгоритма на размере файла в 70 гигов обнаружен баг, проблема понятна. Я уже внёс исправления, и алгоритм тестируется. Пока что самый большой файл которы отработали в ветке был в районе 10 гигабайт.
 
Последнее редактирование:
Понятно что это можно сделать как встроенными утилитами Linux, так и VIM. Конкретно в этом случае я написал в первом посте интересует только Python. Но это не подойдет для файлов, если их размер к примеру будет 100 гигов, при условии недостаточного количества ОЗУ в системе. Ведь вим грузит твой файл сначала в буфер перед тем как делать сортировку и удаление дублей.
А Python возьмёт 100 гигов файл?
 
А Python возьмёт 100 гигов файл?
На данной реализации алгоритма можно попробовать, только учтите что для его отработки нужно будет еще столько же свободного места
 
Последнее редактирование:
10 гб файл

Удаление дублей заняло 484.0911 секунд
Всего строк: 125156483
Уникальных строк: 68249415
Удалено дублей: 56907068
 
10 гб файл

Удаление дублей заняло 484.0911 секунд
Всего строк: 125156483
Уникальных строк: 68249415
Удалено дублей: 56907068
Спасибо за тест, нормальный результат. Я еще попозже попробую оптимизировать код, добавив динамическое вычисление чанка в зависимости от объема ОЗУ, должно быть быстрее.
P.S. В алгоритме есть логическая ошибка. Сейчас тружусь над её исправлением, дубли чистятся не все.
 
Последнее редактирование:
Чуть-чуть ускорил работу предыдущего алгоритма, процентов на 10-15, тесты производил на виртуалке в 4 ядра, 4 гига ОЗУ.
До изменений:
Код:
Удаление дублей заняло 385.0045 секунд
Всего строк: 86008184
Уникальных строк: 85570684
Удалено дублей: 437500
После изменений в функции process_chunk:
Код:
Удаление дублей заняло 349.8931 секунд
Всего строк: 86008184
Уникальных строк: 85570684
Удалено дублей: 437500
Python:
import os
import tempfile
import time
from multiprocessing import Pool
import heapq

# Создаем путь к папке TEMP в текущей директории для хранения временных файлов
temp_dir = os.path.join(os.getcwd(), 'TEMP')
if not os.path.exists(temp_dir):  # Если папка TEMP не существует
    os.makedirs(temp_dir)  # Создаем папку TEMP

# Функция для обработки чанка (части данных)
def process_chunk(chunk, temp_dir):
    print("Начата обработка чанка...")

    # Используем временное множество для удаления дубликатов
    unique_items = set(chunk)

    # Используем heapq для одновременного удаления дубликатов и сортировки
    sorted_chunk = list(heapq.merge(sorted(unique_items)))

    # Используем пакетную запись
    temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', dir=temp_dir)
    with open(temp_file.name, 'w', errors='ignore') as f:
        f.write('\n'.join(sorted_chunk) + '\n')  # Записываем данные в файл

    print(f"Чанк обработан и сохранён во временный файл: {temp_file.name}")
    return temp_file.name, len(sorted_chunk)

# Функция для слияния данных из временных файлов
def merge_files(temp_files, output_file):
    print(f"Начинается слияние файлов. Всего временных файлов: {len(temp_files)}")
    with open(output_file, 'w', errors='ignore') as outfile:
        file_iters = [open(f, 'r', errors='ignore') for f in temp_files]  # Открываем все временные файлы для чтения
        merged_iter = heapq.merge(*file_iters)  # Используем heapq.merge для слияния отсортированных файлов
        prev_line = ""
        unique_count = 0
        for line in merged_iter:
            clean_line = ' '.join(line.split())  # Очищаем строку от лишних пробелов
            if clean_line != prev_line:  # Если строка не является дубликатом предыдущей
                outfile.write(clean_line + '\n')  # Записываем строку в выходной файл
                prev_line = clean_line
                unique_count += 1  # Увеличиваем счётчик уникальных строк
        for f in file_iters:
            f.close()
    print(f"Слияние завершено. Уникальных строк: {unique_count}")
    return unique_count  # Возвращаем количество уникальных строк

# Функция для сортировки и удаления дубликатов из входного файла
def sort_and_uniq_streaming(input_file, output_file):
    print(f"Начинается обработка файла: {input_file}")
    temp_files = []  # Список для временных файлов
    chunk = []  # Текущий чанк данных
    original_count = 0  # Счётчик исходных строк

    with open(input_file, 'r', errors='ignore') as infile:  # Открываем входной файл для чтения
        for line in infile:
            chunk.append(line.strip())  # Добавляем строку в чанк, удаляя лишние пробелы
            original_count += 1
            if len(chunk) >= 200000:  # Если чанк достиг 200000 строк, для ускорения можно поиграться с этим условием, например если в моем случае добавить чанк на 500000 строк, будет работать чуть побыстрее.
                print(f"Обработка чанка из {len(chunk)} строк")
                with Pool() as pool:
                    temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])  # Обрабатываем чанк в параллельных процессах
                    temp_files += [t[0] for t in temp_data]
                chunk = []  # Очищаем чанк

        if chunk:  # Если чанк не пустой
            print(f"Обработка последнего чанка из {len(chunk)} строк")
            with Pool() as pool:
                temp_data = pool.starmap(process_chunk, [(chunk, temp_dir)])
                temp_files += [t[0] for t in temp_data]

    print("Все чанки обработаны. Начинается слияние временных файлов...")
    unique_count = merge_files(temp_files, output_file)  # Сливаем временные файлы в выходной файл

    for temp_file in temp_files:
        os.remove(temp_file)  # Удаляем временные файлы
    print("Все временные файлы удалены")

    return original_count, unique_count  # Возвращаем исходное и уникальное количество строк

# Точка входа
if __name__ == '__main__':
    tic = time.perf_counter()  # Засекаем начальное время выполнения скрипта

    original_count, unique_count = sort_and_uniq_streaming('large_random_emails.txt', 'output.txt')

    tac = time.perf_counter()  # Засекаем конечное время выполнения скрипта

    duplicates_removed = original_count - unique_count
    print(f"Удаление дублей заняло {tac - tic:0.4f} секунд")
    print(f"Всего строк: {original_count}")
    print(f"Уникальных строк: {unique_count}")
    print(f"Удалено дублей: {duplicates_removed}")



Понятно что это можно сделать как встроенными утилитами Linux, так и VIM. Конкретно в этом случае я написал в первом посте интересует только Python. Но это не подойдет для файлов, если их размер к примеру будет 100 гигов, при условии недостаточного количества ОЗУ в системе. Ведь вим грузит твой файл сначала в буфер перед тем как делать сортировку и удаление дублей.

P.S. При тестировании алгоритма на размере файла в 70 гигов обнаружен баг, проблема понятна. Я уже внёс исправления, и алгоритм тестируется. Пока что самый большой файл которы отработали в ветке был в районе 10 гигабайт.
Эта версия "умерла" на файле в 77 гигов с такой ошибкой:
Traceback (most recent call last):
File "c:\test\sort.py", line 86, in <module>
original_count, unique_count = sort_and_uniq_streaming('lines.txt', 'output.txt')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\test\sort.py", line 74, in sort_and_uniq_streaming
unique_count = merge_files(temp_files, output_file) # Сливаем временные файлы в выходной файл
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\test\sort.py", line 34, in merge_files
file_iters = [open(f, 'r', errors='ignore') for f in temp_files] # Открываем все временные файлы для чтения
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files: 'c:\\test\\TEMP\\tmpdaz2dlgy'
 
Эта версия "умерла" на файле в 77 гигов с такой ошибкой:

https://xss.pro/threads/123192/ , там обновленный скрипт, файл в 400 гб съел без проблем
 
Эта версия "умерла" на файле в 77 гигов с такой ошибкой:
Это я не учел когда писал скрипт, что есть определенное ограничение у операционных систем на открытое количество дескрипторов файлов (его костылем в линуксе и винде можно исправлять или даже обходить питоном используя один метод), поэтому я эту проблему поборол пакетным слиянием батчей))))
 
Последнее редактирование:
https://xss.pro/threads/123192/ , там обновленный скрипт, файл в 400 гб съел без проблем
Этот же файл мультипроцессорный обновленный скрипт не съел.
2024-09-24 13:26:03,465 - INFO - Параллельное слияние 10300 временных файлов с использованием 24 потоков
2024-09-24 13:26:03,714 - ERROR - Ошибка при слиянии файлов: [Errno 24] Too many open files: 'c:\\test\\TEMP\\tmptb4mp7st'
2024-09-24 13:26:03,766 - INFO - Финальное слияние завершено. Уникальных строк: 0, дублей удалено: 0
2024-09-24 13:26:07,570 - INFO - Временная папка c:\test\TEMP успешно удалена.
2024-09-24 13:26:07,571 - INFO - Все временные файлы удалены. Уникальных строк: 0, Дублей удалено: 1024293093
2024-09-24 13:26:07,571 - INFO - Всего обработано строк: 1024293093
2024-09-24 13:26:07,572 - INFO - Удаление дублей и сортировка заняли 1958.85
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх