• XSS.stack #1 – первый литературный журнал от юзеров форума

Статья Видеоредактор через CV, ASR, LLM.

raoulduke666

RAID-массив
Пользователь
Регистрация
08.10.2024
Сообщения
74
Реакции
102
Автор: raoulduke666
Написано специально для xss.pro (c)

Доброго времени суток.
В последнее время мне почему то было интерсно поработать с CV(Computer Vision) и ASR(Automated Speech Recognition). Соответственно в этом чтиве об этих двух инструментах на примере видеоредактора.

Раз уж речь про ИИ то хочется нажать кнопку и получить результат.

Видеомонтаж состоит из двух этапов:

  • Подготовка рабочего пространства.
Транскрибация аудиодорожки через Whisper(ASR)
Поиск интересующих моментов через промпт к языковой модели.

  • Видеомонтаж.
Нарезка по таймкодам, полученным от языковой модели
Работа с видеорядом через YOLO(CV) + ffmpeg.

Разделено на 2 этапа потому что 1 всегда работает одинаково, 2 меняется в зависимости от последнего модуля. Под каждую задачу свой модуль, например:

  • Нужен чтобы из горизонтального видео сделать вертикальное? так чтобы оригинальный видеоряд был расположен по центру + блюр сверху и снизу + субтитры.

  • Работает только со стримами. YOLO детектит область вэбки и выносит ее в верхнюю часть ролика, сама запись стрима остается снизу, но без вэбки чтобы не дублировать.

  • Накладывает другой видеоряд, его выбирает llm, но в статье будут только первые 2 примера.

Whisper.
Whisper (asr-система автоматического распознавания речи). Я не хочу писать какие то выдержки из документации, просто свои наблюдения.
Поддержка языков. У Whisper есть 2 модели которые поддерживают несколько языков, включая русский, Whisper-large-v3/Whisper-large-v3-turbo. Модели обучены на одинаковых датасетах, но обе имеют разное количество параметров и слоев.
  • Whisper-large-v3: 1.54 миллиарда параметров и 32 декодирующих слоя.
  • Whisper-large-v3-turbo: 809 миллиона параметров и 4 декодирующих слоя.
Так же обе модели поддерживают автоматический перевод на английский язык с любого другого языка. Условно аудио на немецком, значит whisper может переводить сразу на английский язык, в обратную сторону такой мув не работает.

Это говорит только о том что обе модели знают одни и те же слова и в "хороших" условиях должны дать одинаково хороший результат. По своему личному опыту заметил turbo нужен для транскрибации нормальной речи взрослого человека, большая модель нужна для транскрибации речи почти всех стримеров, детей и просто людей которые по каким то причинам не научились нормально разговаривать.

Модель умеет "додумывать", если не услышала слово до конца или речь слышно плохо, соответственно Whisper-large-v3 справляется с этой задачей лучше чем Whisper-large-v3-turbo.

Субтитры. Модель может транскрибировать с таймкодами на уровне предложений и слов. Это самый жирный и весомый плюс, решает сразу кучу проблем с субтитрами в дальнейшем. Но модель не может определять спикера, поэтому очень хорошо подходит только для одного спикера, если каким то другим образом получить таймкоды речи разных спикеров, то эту проблему можно решить, но только при условии если это какое то интервью или подкаст, гду у людей нормальная речь.

Whisper можно запустить на CPU. Работает очень медленно, поэтому такой вариант подходит только на случай чтобы транскрибировать короткое аудио с субтитрами на уровне слов, чтобы в дальнейшем во время рендера наложить субтитры.

HuggingFace.
Репозиторий модели:
Снимок экрана 2025-04-18 172721.png


Ниже в карточке модели можно ознакомиться с ее локальным запуском. Но там нам предлагают скачать еще какой то датасэт... Мы конечно этого делать не будем. Поэтому слегка отредактируем код который нам предлагают.

Снимок экрана 2025-04-18 174949.png


Но перед тем как скачать модель нужно установить Transformers - либа для работы с моделями на huggingface + установить PyTorch и CUDA.

Код:
pip install torch
Код:
pip install transformers

Отсюда скачиваем CUDA https://developer.nvidia.com/Cuda-downloads
CUDA дает возможность использовать GPU для вычислений.

Выбираем параметры под систему и скачиваем, это необходимо чтобы запустить на GPU https://pytorch.org/get-started/locally/
PyTorch позволяет работать с вычислениями на GPU за счет CUDA.

Теперь надо убедиться что CUDA работает и видит GPU.

Python:
import torch

if torch.cuda.is_available():
    print("CUDA доступен")
    print(f"Количество доступных GPU: {torch.cuda.device_count()}")
    for i in range(torch.cuda.device_count()):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
else:
    print("CUDA не доступен.")

print(torch.__version__)  # версия pytorch

Должен быть такой результат:

CUDA доступен
Количество доступных GPU: 1
GPU 0: NVIDIA GeForce RTX 4060
2.5.1+cu124

Код для загрузки модели.
Python:
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3-turbo"

# загрузка модели
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True
)
model.to(device)

processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)

print("Whisper загружен")

После загрузки модели код можно оставить таким, он будет загружать модель из кэша. Или можно напрямую указать путь к модели, .../.cache/huggingface/hub/models--openai--whisper-large-v3/snapshots/06f233fe06e710322aca913c1bc4249a0d71fce1

Параметры запуска Whisper.
Как упоминал ранее в проекте два модуля с asr, они выполняют разные задачи и соответственно настройка пайплайна для каждого отличается.

Первый пайплайн:

pipe = pipeline(
"automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
chunk_length_s=28,
batch_size=8,
stride_length_s=3,
torch_dtype=torch.float16,
device="cuda:0",
)

chunk_length_s: Длина чанка, на который Whisper разбивает аудио (максимум 30 секунд). Чем больше, тем лучше модель улавливает контекст, паузы и пунктуацию, это очень важно для llm. Но при chunk_length_s=30 на выходе иногда получаются очень длинные чанки. То есть вместо таймкодов предложений я получаю таймкоды целого абзаца, в котором может подняться куча разных тем, в таком случае llm не может понять что нарезать и весь абзац целиком выделит как интересный момент, а его таймкоды могут быть 0-100 секунд. Это может быть не критично если на выходе нужно получать длинное видео, например получасовой отрывок какого то интервью/подкаста/стрима. Но при значении chunk_length_s=28 данная проблема решается
  • chunk_length_s=30
"...Бедные люди всегда будут среди нас, потому что это начинается здесь. Это в их словах, и их слова становятся реальностью. Когда они говорят «Я не могу себе этого позволить», или «Я не могу этого сделать», в этот момент они сдаются...| (25.74, 99.22)"
  • chunk_length_s=28
"Бедные люди всегда будут среди нас, потому что это начинается здесь.| (48.0, 52.0)
Это в их словах, и их слова становятся реальностью.| (52.0, 56.0)
Когда они говорят «Я не могу себе этого позволить», или «Я не могу этого сделать»,| (56.0, 60.0)"

Так на много лучше :)

Кто то внимательный заметил не соответствие таймкодов. При chunk_length_s=30 еще возникают проблемы с полученными временными метками, возможно я чего то не знаю и это моя ошибка, но при 28 эта проблема исчезает.

stride_length_s: Перекрытие между чанками. Без перекрытия слова на стыках чанков могут обрезаться слова, теряется контекст, что ломает пунктуацию и смысл. Я ставлю 2-3 секунды — этого хватает, чтобы Whisper "услышал" начало следующего чанка и подстроился под темп речи. Большее значение может привести к дублированию слов на стыках чанков, особенно если спикер говорит быстро.

batch_size: Количество чанков, обрабатываемых за один проход через модель. В первом пайплайне указываю batch_size=8, что оптимально для 2-3 минутных файлов на моей видеокарте.

Параметры low_cpu_mem_usage=True и use_safetensors=True уменьшают потребление памяти.

low_cpu_mem_usage=True не относится к транскрибации на CPU или GPU напрямую. Этот параметр скорее про инициализацию модели. А так этот параметр просто изменяет процесс загрузки модели, что позволяет Whisper меньше срать в оперативную память на этапе загрузки.

use_safetensors=True указывает, что веса модели должны загружаться в формате safetensors, а не в стандартном формате pytorch(.bin). safetensors это оптимизированный формат, разработанный huggingface который быстрее, компактнее и безопаснее, чем условно файлы .bin. Он используется для хранения весов моделей. В целом этот параметр можно всегда прописывать всегда если модель его поддерживает.

Второй пайплайн:

pipe = pipeline(
"automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
chunk_length_s=20,
batch_size=2,
stride_length_s=2
torch_dtype=torch_dtype,
device=device,
)

Таймкоды на уровне слов требуют больше вычислений, так как Whisper анализирует аудио с большей гранулярностью. Соответственно 2 пайплайн имеет более скромные параметры.

YOLO.
YOLO11(cv - computer vision). В моем проекте он отвечает за работу с оригинальным видеорядом. Сейчас YOLO обучен детектить вэбку стримера. Данные о положении вэбки используются для редактирования видеоряда.

Сперва нужно настроить окружение под разметку. В моем случае используется CVAT(computer vision annotation tool)

На официальном сайте CVAT есть туториал по установке под разные системы. https://docs.cvat.ai/docs/administration/basics/installation/
Работает на Windows11*

Снимок экрана 2025-04-18 185623.png


Чтобы запустить CVAT нужен WSL2, doker desktop и git.

Если все вышеописанное имеется, то клонируем репозиторий

Код:
git clone https://github.com/cvat-ai/cvat
cd cvat

После перехода в репозиторий нужно запустить команду по сбору всех контейнеров.

Код:
docker compose up -d

После этого будет доступен localhost:8080, но CVAT будет требовать логин и пароль.

Поэтому перемещаемся в WSL и прописываем эту команду, перед этим указав свой логин и пароль. В итоге внутри cvat_server будет запущен питон файл с командой createsuperuser

Код:
sudo docker exec -it cvat_server bash -ic 'python3 ~/manage.py createsuperuser'

Пример
Снимок экрана 2025-04-18 192526.png


WSL можно больше не запускать. Чтобы перейти в CVAT нужно запустить doker и localhost:8080 станет доступен

CVAT(computer vision annotation tool).

CVAT используется для разметки и создания data.yaml файла, который необходим для обучения.
Все интуитивно понятно, сложного ничего нет.

Так выглядит раздел с проектами, у вас будет пусто. Нажимаете на плюсик и создаете новый.
Снимок экрана 2025-04-18 194520.png


Указываете названия проекта и класс который будем детектить.
Снимок экрана 2025-04-18 194548.png

Снимок экрана 2025-04-18 194605.png


Далее заходим в наш проект и создаем таску.
Снимок экрана 2025-04-18 194622.png


train таска. Указывается название таски, ее тип "train" и загружаем скриншоты стримов с вэбкой. Так как таска отвечает за тренировку, то тут должно быть больше всего изображений. 70% train и 30% validation.

Снимок экрана 2025-04-18 194659.png

Снимок экрана 2025-04-18 194732.png


Теперь в разделе task отображается задача под тренировку.
Снимок экрана 2025-04-18 194829.png


Работаем руками, разметка. Я загружал по несколько скриншотов из каждого стрима. Так как вэбка всегда на одном и том же месте можно немного упростить задачу с разметкой. Перед тем как разметить изображение я указываю тип "track", это позволит разметке остаться на том же месте на следующих изображениях.
Снимок экрана 2025-04-18 194837.png

Снимок экрана 2025-04-18 194859.png


Разметку я почему то решил сделать именно таким образом, без обводки самой рамки. Такая рамка есть не везде и модель может запутаться из за маленького датасета. Да и мне во время итогового монтажа эта рамка не нужна, и скорее только все будет портить
Снимок экрана 2025-04-18 194929.png


Допустим ваша пачка из 3 изображений одного стрима подошла к концу и далее начинается пачка из другого стрима. Но предыдущая разметка типа "track" будет на текущем скриншоте, чтобы ее убрать нажмите на эту кнопку, ненужная разетка исчезнет и делайте новую.
photo_2025-04-18_20-43-28.jpg


Когда разметку для train сделали, то сохраняйте, заканчивайте работу.
Снимок экрана 2025-04-18 195148.png


Нужно указать статус таски(выполнено).
Снимок экрана 2025-04-18 195220.png


Как с train закончили создавайте новую таску, но не train а validation и загружайте пачку изображений под validation (ИЗОБРАЖЕНИЯ ДОЛЖНЫ БЫТЬ РАЗНЫЕ).
Пропускаю момент с созданием таски под валидацию и разметку, тут реально ничего сложного + я забыл скрины сделать.
Снимок экрана 2025-04-18 195230.png


И вот когда вся разметка выполнена, таски готовы, можно экспортировать yaml файл.
Снимок экрана 2025-04-18 195328.png


Формат YOLOv8 Detection 1.0 и любое название.
Снимок экрана 2025-04-18 195340.png


Обучение.
Теперь когда yaml файл получен нужно создать следующую структуру:

onyxChlenix :smile10: /
├── data.yaml #yaml из cvat
├── yolo11n.pt #нейронка которую будем обучать
└── datasets/
├── train.txt # Список путей к train изображениям
├── val.txt # Список путей к val изображениям
├── images/
│ ├── train/ # Обучающие изображения
│ └── val/ # Валидационные изображения
└── labels/
├── train/ # Разметка для train
└── val/ # Разметка для val

Нужно убедиться, что в yaml файле указаны правильные пути к датасету.

Осталось скачать нейронку
Оф сайт ultralytics https://docs.ultralytics.com/ru/models/yolo11/#performance-metrics

Нужна YOLO11n, она самая легкая и лучше всех подходит под маленькие датасеты, Хорошо справляется на CPU

+

Теперь когда окружение готово можно приступить к обучению.


Python:
from ultralytics import YOLO
import multiprocessing

def main():
    # Загрузка модели
    model = YOLO("yolo11n.pt")  # или ваша модель "yolo11n.pt"

    # Обучение модели
    results = model.train(data="data.yaml", epochs=60, imgsz=640)

if __name__ == '__main__':
    multiprocessing.freeze_support()  # Для поддержки замороженных приложений (можно опустить если не создаете exe)
    main()

GPU будет выбран автоматически если доступен. И в самом начале обучения, перед его началом будет указан GPU или CPU. Должен быть GPU или придется учить на CPU (займет минут 15 -30, при условии 100 изображений в датасете). Так же замечал что если датасет слишком маленький, то можно в validation закидывать побольше изображений (60% train 40% validation)

После обучения будут доступны 2 нейронки, best.pt и last.pt. Для дальнейшей работы используем best.pt.
last.pt нужен для дообучения на сколько помню.

Структура проекта.

C:/├── work/ # Рабочее пространство
│ ├── [video_id_1]/ # Папка для каждого видео
│ │ ├── [video_id].mp4 # Оригинальное видео
│ │ ├── [video_id]_audio.mp3 # Оригинальная аудиодорожка
│ │ ├── 2mins/ # Нарезки аудио по 2 минуты
│ │ │ └── 2min_01_0.00_120.00.mp3...
│ │ ├── transcripts/ # Транскрбация Whisper
│ │ │ └── transcripts.txt
│ │ ├── transcriptsTimeSteps.txt # Результаты APIGPT
│ │ ├── shorts_audio/ # Короткие аудиофрагменты
│ │ │ ├── clip_1.mp3...
│ │ │ ├── durations.txt # Длительности фрагментов
│ │ │ └── transcripts_lvl_word/ # Транскрипция на уровне слов
│ │ │ └── clip_1_timeStepsLvlWord.txt...
│ │ └── (временные файлы DetectionGOVNISHE)
│ └── [video_id_2]/ # Аналогичная структура для других видео

├── shorts/ # Готовые ролики
│ ├── [video_id_1]/
│ │ └── final_with_subs_clip_1.mp4...
│ └── [video_id_2]/....

└── Users/
└── Administrator/
└── Desktop/
└── onyxChlenix1337/ # Исходные скрипты
├── Download.py
├── main_narezka2mins.py
├── mainWhisperLvlText.py
├── APIGPT.py
├── mainGenerateAudioShorts.py
├── audioToTextWORDS
├── DURATION.py
├── DetectionGOVNISHE.py
├── PizdecTitri3.py # Основной рендерер
├── best.pt
├── data.yaml
└── main3.py


main.py
Главный скрипт, вызывающий модули
Python:
import os
import subprocess
import sys
import time
import glob


def save_video_key3(video_id):
    filename = 'C:/task3.txt'
    with open(filename, 'w') as file:
        file.write(video_id)

def save_video_key3_1(video_id):
    filename = 'C:/task3_1.txt'
    with open(filename, 'w') as file:
        file.write(video_id)

def run_module(module_name, function_name, *args):
    """Запускает модуль как отдельный процесс с переданными аргументами."""
    command = [sys.executable, "-c", f"from {module_name} import {function_name}; {function_name}(*{args})"]
    subprocess.run(command, check=True)

def wait_for_any_file(file_paths, timeout=600):
    """Ожидает появления хотя бы одного файла в течение заданного времени."""
    start_time = time.time()
    print(f"Ожидание файлов: {file_paths}")
    while True:
        any_exist = any(os.path.exists(file_path) for file_path in file_paths)
        if any_exist:
            print("Хотя бы один файл появился.")
            return True
        if time.time() - start_time > timeout:
            print(f"Не удалось дождаться появления файлов в течение {timeout} секунд.")
            return False
        time.sleep(5)

def wait_for_transcript_files(audio_folder, timeout=600):
    """Ожидает создания папки и появления хотя бы одного файла транскрипции."""
    start_time = time.time()
    print(f"Ожидание создания папки {audio_folder} и файлов транскрипций...")
    while True:
        if os.path.exists(audio_folder):
            print(f"Папка {audio_folder} найдена, проверяем файлы...")
            transcript_files = glob.glob(os.path.join(audio_folder, "clip_*_timeStepsLvlWord.txt"))
            if transcript_files:
                print(f"Найдены файлы транскрипций: {transcript_files}")
                return transcript_files
            else:
                print(f"Папка {audio_folder} пуста, продолжаем ждать...")
        else:
            print(f"Папка {audio_folder} ещё не создана, ждём...")
        if time.time() - start_time > timeout:
            print(f"Не удалось дождаться создания папки или файлов в {audio_folder} за {timeout} секунд.")
            return None
        time.sleep(5)

def main():
    output_dir = "C:/work"
    youtube_url = "https://www.youtube.com/watch?v=huVDE_nABOQ"

    # 1. Загрузка видео и аудио
    run_module("Download", "download_youtube_video_and_audio", youtube_url, output_dir)
    output_folder = os.path.join(output_dir, youtube_url.split("v=")[1].split("&")[0])
    video_id = os.path.basename(output_folder)
    audio_file_path = os.path.join(output_folder, f"{video_id}_audio.mp3")

    # 2. Нарезка на 2-минутные фрагменты
    run_module("main_narezka2mins", "narezka2mins", audio_file_path)

    # 3. Транскрипция текста
    run_module("mainWhisperLvlText", "lvlText", video_id)

    # 4. Анализ текста через GPT
    run_module("APIGPT", "apgpt", video_id)

    # 5. Генерация коротких аудиофрагментов
    run_module("mainGenerateAudioShorts", "generateAudioShorts", video_id)

    # 6. Определение длительности фрагментов
    run_module("DURATION", "duration", video_id)

    # 7. Транскрипция на уровне слов
    run_module("audioToTextWORDS", "lvlWord", video_id)

    # 9. Анализ и коррекция аудио
    run_module("DetectionGOVNISHE", "DetectionGovnishe", video_id)

    # 10. Генерация видео
    run_module("VideoWordBlur", "process_video", video_id)

    print("Все этапы обработки завершены!")

if __name__ == "__main__":
    main()



Download.
Этот модуль получает ссылку на ютаб видео и скачивает его в разрешении до 1080p, плюс отдельно аудиодорожку в формате mp3. Для скачивания используется библиотека yt-dlp.
Модуль создает папку work/[video_id], где video_id — уникальный идентификатор видео из ссылки. В эту папку загружаются два файла: [video_id].mp4 (видео) и [video_id]_audio.mp3 (аудио). yt-dlp настроен так, чтобы видео скачивалось с лучшим качеством до 1080p (параметр -f bestvideo[height<=1080]+bestaudio), а аудио — в формате mp3 с максимальным битрейтом.

Python:
import os
import re
import subprocess
from yt_dlp import YoutubeDL

def download_youtube_video_and_audio(url, output_dir):
    """
    Скачивание ютаб аудио, создание папки с именем, равным ключу видео.
    """

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,
    }
    with YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=False)
        video_id = info_dict.get('id')
        if not video_id:
            raise ValueError("Не удалось извлечь ID видео из ссылки")

    # создание папки с именем, равным ключу ютаб видео
    output_folder = os.path.join(output_dir, video_id)
    os.makedirs(output_folder, exist_ok=True)

    video_command = [
        'yt-dlp',
        '-f', 'bestvideo[height<=1080]+bestaudio/best[height<=1080]',  # Только до 1080p
        '--merge-output-format', 'mp4',
        '-o', os.path.join(output_folder, f'{video_id}.%(ext)s'),
        url
    ]

    audio_command = [
        'yt-dlp',
        '-f', 'bestaudio',
        '--extract-audio',
        '--audio-format', 'mp3',
        '-o', os.path.join(output_folder, f'{video_id}_audio.%(ext)s'),
        url
    ]

    #Запускаем команды
    print("Скачивание видео...")
    subprocess.run(video_command)

    print("Скачивание аудио...")
    subprocess.run(audio_command)

    return output_folder, video_id

mainNarezka2mins.
Модуль нарезает аудиодорожку [video_id]_audio.mp3 на 2-минутные фрагменты, чтобы видеокарте не было плохо во время транскрибации. Две-три минуты нормальная длительность для GPU с 8 ГБ видеопамяти, так как Whisper жрет ресурсы, особенно на больших чанках. Нарезка делается через ffmpeg с параметром -c copy, который копирует аудиопоток без перекодирования.

Фрагменты сохраняются в папку work/[video_id]/2mins с именами вида 2min_01_0.00_120.00.mp3, где:

2min — индикатор длительности.
01 — порядковый номер фрагмента.
0.00_120.00 — таймкоды относительно оригинального аудио.

Модуль сначала запрашивает длительность аудиофайла через ffprobe (команда -show_entries format=duration), затем итеративно режет аудио с шагом 120 секунд.

Python:
import os
import subprocess

def narezka2mins(audio_file_path):
 
    output_dir = os.path.join(os.path.dirname(audio_file_path), "2mins")
    os.makedirs(output_dir, exist_ok=True)

    command = [
        'ffprobe', '-i', audio_file_path, '-show_entries', 'format=duration', '-v', 'quiet', '-of', 'csv=%s' % ("p=0")
    ]
    duration = float(subprocess.check_output(command).decode('utf-8').strip())

    segment_duration = 120
    start_time = 0
    segment_number = 1

    while start_time < duration:
        end_time = min(start_time + segment_duration, duration)
        output_file = os.path.join(output_dir, f"2min_{segment_number:02d}_{start_time:.2f}_{end_time:.2f}.mp3")
 
        command = [
            'ffmpeg', '-i', audio_file_path, '-ss', str(start_time), '-to', str(end_time),
            '-c', 'copy', output_file
        ]
        subprocess.run(command, check=True)
 
        start_time += segment_duration
        segment_number += 1

    print(f"Аудиофайл успешно нарезан на фрагменты по 2 минуты в папке {output_dir}")

mainWhisperLvlText.
Прогоняет 2-минутные аудиофрагменты из work/[video_id]/2mins через Whisper для транскрибации с таймкодами на уровне предложений. Результат сохраняется в файл work/[video_id]/transcripts/transcripts.txt в формате:

Файл: 2min_01_0.00_120.00.mp3
Понятно? Жалость это хуже удара по яйцам.| (1.0, 3.84)
Когда на вас давят на жалость, поймите, вам бьют по яйцам, буквально.| (3.92, 7.48)
Понижают ваш тестостерон и повышают пролактин.| (7.6, 9.8)
Подходит тебе баба и говорит, меня ебало 40 человек, мне 20 лет.| (10.48, 15.22)
Ты такой, члюха.| (15.48, 16.52)
Подходит баба и говорит, я всю жизнь не могла найти любимого, я скиталась от одного к другому в поисках людей.| (17.26, 23.7)
Католог был твой путь!| (25.1, 27.0)
Сорок хуёв!| (27.38, 28.36)
Дай тебя обогрею, моя родная!| (29.78, 31.82)
Я сниму с тебя всю эту печаль!| (32.88, 34.86)
Ведь это чисто мужская функция!| (35.12, 36.9)
Терпеть!| (37.44, 38.08)
И во что бы это ни стало!| (38.48, 40.04)
Любить!| (40.78, 41.46)
Уметь прощать!| (41.78, 43.24)
Принимать такой, какой есть!| (43.38, 44.88)
Не требовать лучшего!| (45.16, 46.48)
Не брать лучше! Не рвать пасть за свою добычу а принимать ну и что с кем ты спала ты достойна| (46.58, 59.98)

Файл: 2min_02_120.00_240.00.mp3
Любви. Ведь каждый в этой вселенной, в этом мире достойный.| (0.0, 5.0)
Любви. Это правда.| (5.0, 10.0)
Какая разница, сколько у него было мужчин.| (10.0, 14.0)
Это не уменьшает того, что она хорошая женщина.| (14.0, 18.0)
Никак не уменьшает.| (18.0, 20.0)
Уменьшает! Уменьшает! Уменьшает! Это уменьшает уменьшает это уменьшает| (20.0, 27.0)
уменьшает| (27.0, 32.0)
понимаешь ты не папа римский ты не мама| (32.0, 34.0)
тарана бл#ть чтобы решать ее проблемы| (34.0, 37.0)
где она была когда ты хотел любви| (37.0, 39.0)
где она была когда ты хотел любви| (39.0, 42.0)
где она была когда тебе было тяжело| (42.0, 44.0)
она ебалась с гопником| (44.0, 46.0)
который работал на заводе бл#ть или с бандитом с уголовником ездила в тюрьму ему| (46.0, 53.84)
отдавать передачки пока ты как еблан пить у лимон и удалил| (53.84, 60.0)
...

Используется пайплайн Whisper с параметрами

chunk_length_s=28,
batch_size=8,
stride_length_s=3,

Оптимизированный для дальнейшей обработки LLM. Чанк в 28 секунд дает достаточно контекста для точной пунктуации и пауз, но не перегружает модель, как при 30 секундах, когда таймкоды могут растягиваться на 100+ секунд. Перекрытие чанков (stride_length_s=3) предотвращает обрезку слов на стыках.
Модуль работает на CPU, чтобы не нагружать GPU, которая нужна для других задач. Если транскрипция уже есть, модуль пропускает обработку, чтобы не тратить время. Whisper настроен на русский язык.

Python:
import os
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import gc

def lvlText(video_key):
 
        base_dir = "C:/work"
 
        transcripts_path = os.path.join(base_dir, video_key, "transcripts", "transcripts.txt")
        if os.path.exists(transcripts_path):
            print("Транскрипция уже выполнена. GoHome.")
            return

        audio_folder = os.path.join(base_dir, video_key, "2mins")
        if not os.path.exists(audio_folder):
            raise FileNotFoundError(f"Папка с аудиофрагментами не найдена: {audio_folder}")
 
        audio_files = [os.path.join(audio_folder, f) for f in os.listdir(audio_folder) if f.endswith(".mp3")]
        audio_files.sort()

        # Инициализация модели с явным управлением памятью
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

        model_id = "openai/whisper-large-v3-turbo"
 
        # Загрузка модели с оптимизацией памяти
        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True
        ).to(device)

        processor = AutoProcessor.from_pretrained(model_id)

        # Конфигурация пайплайна
        pipe = pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            chunk_length_s=28,
            batch_size=8,
            stride_length_s=3,
            torch_dtype=torch_dtype,
            device=device,
        )

        # Создание директории для результатов
        output_dir = os.path.join(base_dir, video_key, "transcripts")
        os.makedirs(output_dir, exist_ok=True)

        # Обработка аудиофайлов
        with open(transcripts_path, "w", encoding="utf-8") as f:
            for audio_file in audio_files:
                result = pipe(audio_file, return_timestamps=True)
 
                f.write(f"Файл: {os.path.basename(audio_file)}\n")
                if "chunks" in result:
                    for chunk in result["chunks"]:
                        f.write(f"{chunk['text']}| {chunk['timestamp']}\n")
                else:
                    f.write(f"Текст: {result['text']}\n")
                f.write("\n")
 

        print(f"Транскрипция завершена. Результат сохранен в {transcripts_path}")

APIGPT.
Модуль для работы с llm через api. Да можно конечно локально запустить llm до кучи. но проще заплатить 5$
Ничего интересного кроме промпта тут нет. Слишком большой промпт лучше не писать, а то llm крышу сносит и начинает срать таймкодами длительность по 2-5 секунд.
Сохраняет таймкоды в один файл "transcriptsTimeSteps.txt" в следующем формате


"Файл: 2min_01_0.00_60.00.mp3 (1.0-9.8)
Файл: 2min_01_0.00_60.00.mp3 (10.48-16.52)
Файл: 2min_01_0.00_60.00.mp3 (17.26-27.0)
Файл: 2min_01_0.00_60.00.mp3 (29.78-36.9)
Файл: 2min_01_0.00_60.00.mp3 (37.44-46.48)
Файл: 2min_02_60.00_120.00.mp3 (20.0-46.0)
Файл: 2min_02_60.00_120.00.mp3 (37.0-60.0)
Файл: 2min_03_120.00_180.00.mp3 (0.0-7.22)
Файл: 2min_03_120.00_180.00.mp3 (7.58-19.0)
Файл: 2min_03_120.00_180.00.mp3 (23.82-27.4)
Файл: 2min_03_120.00_180.00.mp3 (27.66-31.56)
Файл: 2min_03_120.00_180.00.mp3 (33.16-38.68)..."

Модуль парсит транскрибацию по блокам (каждый блок — один аудиофрагмент), отправляет их в LLM и собирает ответы в единый файл. Если папка или файл не найдены, модуль кидает ошибку.

Python:
from openai import OpenAI
from pathlib import Path

client = OpenAI(
    api_key="sk-IXQTSZc61eGtp2FAr3FGhw",
    base_url="https://hubai.loe.gg/v1"
)

SYSTEM_PROMPT = """
Этот промт должен быть применен к каждому моему следующему сообщению.

Моя роль: я буду отправлять тебе текстовик в котором содержится текст и тайм коды.

Твоя роль: Быть гением SMM, создания коротких роликов(reels, shorts,tt...).
Ты акула. Тебе найти в этом тексте интересные моменты

Длина интересного момента которые ты ищешь: 20-60 секунд. Это ОЧЕНЬ важно!
Интересые моменты: моменты, которые можно использовать независимо
друг от друга. Это самодостаточные тексты, которые могут существовать
сами по себе. Имеют логическое начало и логическое завершение.
В начале интересного момента должна быть ключевая, цепляющая
фраза/слово или интригующая, которая быстро раскрывается к середине
интересного фрагмента.

Пример твоего ответа:
Файл: 2min_01_0.00_120.00.mp3 (19.86-48.3)
Файл: 2min_02_0.00_120.00.mp3 (89.0-112.82)
Файл: 2min_03_120.00_240.00.mp3 (23.92-48.9)
Файл: 2min_04_120.00_240.00.mp3 (93.74-113.36)
...

не забывай, длительность таймкодов которые ты ищешь 20-60 секунд! и ответ должен содержать только таймкоды и ничего лишнего.
"""

def parse_content(text: str) -> list:
    """Разбивает текст на блоки по файлам"""
    return [f"Файл: {block.strip()}" for block in text.split("Файл: ") if block.strip()]

def analyze_text(file_block: str) -> str:
    """Анализирует текст через API"""
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": file_block}
        ]
    )
    return response.choices[0].message.content

def apgpt(key: str):
    """Основная функция для обработки транскрипции по ключу"""
    BASE_PATH = "C:/work"
    INPUT_FILE = f"{BASE_PATH}/{key}/transcripts.txt"
    OUTPUT_FILE = f"{BASE_PATH}/{key}/transcriptsTimeSteps.txt"

    # Проверяем существование папки с ключом
    if not Path(f"{BASE_PATH}/{key}").exists():
        print(f"Папка с ключом {key} не найдена")
        return False

    # Читаем файл
    if not Path(INPUT_FILE).exists():
        print(f"Файл {INPUT_FILE} не найден")
        return False

    with open(INPUT_FILE, 'r', encoding='utf-8') as f:
        content = f.read()

    # Обрабатываем каждый файл из текста и собираем результаты
    results = []
    for block in parse_content(content):
        result = analyze_text(block)
        print(result)
        results.append(result)
        print("-" * 50)

    # Сохраняем все результаты в файл
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        f.write("\n".join(results))
 
    print(f"Результаты сохранены в {OUTPUT_FILE}")
    return True

mainGenerateAudioShorts.
Модуль нарезает оригинальную аудиодорожку [video_id]_audio.mp3 по таймкодам из transcriptsTimeSteps.txt. Функция pars_timesteps парсит файл через регулярные выражения, извлекая start_offset (начало 2-минутного фрагмента), start и end (таймкоды внутри фрагмента). Абсолютные таймкоды высчитываются:

absolute_start = start_offset + start
absolute_end = start_offset + end

Если формат файла не совпадает, парсинг ломается — пришлось добавить кучу проверок, чтобы отловить косяки.
Нарезка делается через ffmpeg с -c copy для скорости, фрагменты сохраняются в work/[video_id]/shorts_audio как clip_1.mp3, clip_2.mp3 и т.д. Перед нарезкой старые файлы и транскрипции удаляются, чтобы не было бардака. Если число созданных файлов не совпадает с числом таймкодов, модуль выдает предупреждение.

Python:
import os
import subprocess
import re

def pars_timesteps(file_path):
    timestamps = []
    with open(file_path, "r", encoding="utf-8") as f:
        print(f"Чтение файла: {file_path}")
        for line_number, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                print(f"Пропущена пустая строка {line_number}")
                continue
            print(f"Обрабатываем строку {line_number}: '{line}'")
            match = re.match(r'Файл: 2min_\d+_(\d+\.\d+)_(\d+\.\d+)\.mp3 \((\d+\.\d+)-(\d+\.\d+)\).*', line)
            if match:
                start_offset = float(match.group(1))
                start = float(match.group(3))
                end = float(match.group(4))
                absolute_start = start_offset + start
                absolute_end = start_offset + end
                duration = absolute_end - absolute_start
                if absolute_end <= absolute_start:
                    print(f"Ошибка в строке {line_number}: Конец ({absolute_end}) <= Начало ({absolute_start}): {line}")
                    continue
                timestamps.append((absolute_start, absolute_end))
                print(f"Извлечено в строке {line_number}: start_offset={start_offset}, start={start}, end={end}, абсолютные: ({absolute_start}, {absolute_end}), длительность: {duration} сек")
            else:
                print(f"Строка {line_number} не соответствует формату: '{line}'")
    print(f"Всего извлечено таймкодов: {len(timestamps)}")
    return timestamps

def generateAudioShorts(video_key):
    base_dir = "C:/work"
    print(f"Используется base_dir: {base_dir}")
 
    original_audio_path = os.path.join(base_dir, video_key, f"{video_key}_audio.mp3")
    if not os.path.exists(original_audio_path):
        raise FileNotFoundError(f"Оригинальный аудиофайл не найден: {original_audio_path}")

    timestamps_file_path = os.path.join(base_dir, video_key, "transcriptsTimeSteps.txt")
    if not os.path.exists(timestamps_file_path):
        raise FileNotFoundError(f"Файл с таймкодами не найден: {timestamps_file_path}")

    timestamps = pars_timesteps(timestamps_file_path)
    print(f"Извлечённые таймкоды: {timestamps}")

    output_dir = os.path.join(base_dir, video_key, "shorts_audio")
    os.makedirs(output_dir, exist_ok=True)

    # Очистка старых файлов
    for old_file in os.listdir(output_dir):
        if old_file.startswith("clip_") and old_file.endswith(".mp3"):
            os.remove(os.path.join(output_dir, old_file))
            print(f"Удалён старый аудиофайл: {old_file}")

    # Очистка старых транскрипций
    transcript_dir = os.path.join(base_dir, video_key, "transcripts_lvl_word")
    if os.path.exists(transcript_dir):
        for old_file in os.listdir(transcript_dir):
            if old_file.startswith("clip_") and old_file.endswith("_timeStepsLvlWord.txt"):
                os.remove(os.path.join(transcript_dir, old_file))
                print(f"Удалён старый файл транскрипции: {old_file}")

    for i, (start_time, end_time) in enumerate(timestamps):
        output_file = os.path.join(output_dir, f"clip_{i+1}.mp3")
        duration = end_time - start_time
        print(f"Создание клипа {i+1}: {output_file}, таймкоды: ({start_time}, {end_time}), длительность: {duration} сек")
        command = [
            'ffmpeg', '-y', '-i', original_audio_path, '-ss', str(start_time), '-to', str(end_time),
            '-c', 'copy', output_file
        ]
        try:
            result = subprocess.run(command, check=True, capture_output=True, text=True)
            print(f"Фрагмент {i+1} сохранён в {output_file}")
            print(f"ffmpeg stdout: {result.stdout}")
            print(f"ffmpeg stderr: {result.stderr}")
        except subprocess.CalledProcessError as e:
            print(f"Ошибка при создании клипа {i+1}: {e}")
            print(f"ffmpeg stderr: {e.stderr}")
            continue

    created_files = [f for f in os.listdir(output_dir) if f.startswith("clip_") and f.endswith(".mp3")]
    print(f"Нарезка аудио завершена. Создано файлов: {len(created_files)} ({created_files})")
    if len(created_files) != len(timestamps):
        print(f"ВНИМАНИЕ: Ожидалось {len(timestamps)} файлов, создано {len(created_files)}")

audioToTextWORDS.
Второй модуль с Whisper, который транскрибирует аудиофрагменты из shorts_audio с таймкодами на уровне слов для субтитров. Использует пайплайн с chunk_length_s=20, batch_size=1, чтобы добиться высокой гранулярности. Работает на CPU, так как словесная транскрибация грузит память.
Результат сохраняется в work/[video_id]/shorts_audio/transcripts_lvl_word как clip_1_timeStepsLvlWord.txt и т.д., в формате:

"Выходя| (0.0, 1.04)
из| (1.04, 1.46)
автобуса,| (1.46, 2.2)
подавая| (2.2, 2.54)
руку| (2.54, 2.76)
женщине.| (2.76, 3.52)
Я| (3.52, 3.74)
до| (3.74, 3.92)
13| (3.92, 4.3)
лет| (4.3, 4.62)
подавал| (4.62, 5.16)
женщинам| (5.16, 5.68)
и| (5.68, 5.76)
знакомым| (5.76, 6.14)
руку,| (6.14, 6.42)
выходя| (6.42, 6.72)
из| (6.72, 6.82)
маршрутки.| (6.82, 7.36)
Знаешь| (7.36, 7.6)
до| (7.6, 7.72)
какого| (7.72, 8.02)
дня?| (8.02, 8.72)
Когда| (8.72, 8.98)
папа| (8.98, 9.32)
отбил| (9.32, 9.62)
мне| (9.62, 9.76)
руку| (9.76, 9.96)
и| (9.96, 10.06)
сказал,| (10.06, 10.34)
иди| (10.34, 10.5)
отсюда,| (10.5, 10.88)
чурка.| (10.88, 12.4)
И| (12.4, 13.1)
тогда| (13.1, 13.28)
я| (13.28, 13.48)
понял,| (13.48, 13.9)
что| (13.9, 14.04)
мама| (14.04, 14.32)
была| (14.32, 14.56)
не| (14.56, 14.74)
права.| (14.74, None)"

Часто последнее слово обрывается, и end получает значение None. Whisper пытается его додумать, что обычно работает, но звучит плохо из за резкого обрыва слова. Эту проблему решают следующие модули.

Python:
import os
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

def lvlWord(video_id):
    base_dir = "C:/work"
    audio_folder = os.path.join(base_dir, video_id, "shorts_audio")
    output_folder = os.path.join(audio_folder, "transcripts_lvl_word")
    os.makedirs(output_folder, exist_ok=True)

    # Инициализация модели
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        "openai/whisper-large-v3-turbo",
        torch_dtype=torch_dtype,
        low_cpu_mem_usage=True,
        use_safetensors=True
    ).to(device)

    processor = AutoProcessor.from_pretrained("openai/whisper-large-v3-turbo")

    # Конфигурация пайплайна (как вы указали)
    pipe = pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        chunk_length_s=28,
        batch_size=1,
        torch_dtype=torch_dtype,
        device=device,
    )

    for audio_file in os.listdir(audio_folder):
        if not audio_file.endswith(".mp3"):
            continue

        audio_file_path = os.path.join(audio_folder, audio_file)
        output_file_path = os.path.join(
            output_folder,
            f"{os.path.splitext(audio_file)[0]}_timeStepsLvlWord.txt"
        )

        # Обработка без torch.no_grad() как вы просили
        result = pipe(audio_file_path, return_timestamps="word")

        # Сохранение результатов
        with open(output_file_path, "w", encoding="utf-8") as f:
            if "chunks" in result:
                for chunk in result["chunks"]:
                    f.write(f"{chunk['text']}| {chunk['timestamp']}\n")

        print(f"Таймкоды для {audio_file} сохранены в {output_file_path}")

DURATION + DetectionGOVNISHE.
Эти модули работают в паре, чтобы исправить обрезанные слова в аудиофрагментах.
DURATION использует pydub для подсчета длительности каждого аудиофайла в shorts_audio. Результат сохраняется в durations.txt в формате:

clip_1.mp3: 2,28 секунд

Это нужно, чтобы знать точное время окончания фрагмента, особенно для последнего слова, где Whisper ставит None.
DetectionGOVNISHE анализирует последнее слово в каждом файле clip_X_timeStepsLvlWord.txt. Он считает длительность произношения, исходя из количества букв:
  • Гласные: 0.1 секунды.
  • Согласные: 0.08 секунды.
Если длительность последнего слова меньше ожидаемой, фрагмент удаляется, и на его место нарезается новый с увеличенным end через ffmpeg. Метод примитивный, так как темп речи у всех разный. Лучше бы анализировать среднюю длительность слов спикера по другим таймкодам, но я не стал заморачиваться.

Python:
from pydub import AudioSegment
import os

def duration(video_key):

    base_dir="C:/work"
 
    audio_folder = os.path.join(base_dir, video_key, "shorts_audio")  # Замените на ваш путь

    output_file_path = os.path.join(audio_folder, "durations.txt")

    with open(output_file_path, "w", encoding="utf-8") as f:
        for audio_file in os.listdir(audio_folder):
            if audio_file.endswith(".mp3"):
                audio_file_path = os.path.join(audio_folder, audio_file)

                audio = AudioSegment.from_file(audio_file_path)

                duration_ms = len(audio)

                duration_seconds = round(duration_ms / 1000, 2)

                f.write(f"{audio_file}: {duration_seconds} секунд\n")

    print(f"Длительности аудиофрагментов сохранены в {output_file_path}")

Python:
import os
import subprocess
import re
import gc
from concurrent.futures import ThreadPoolExecutor, as_completed

# Константы для длительности звуков
VOWEL_DURATION = 0.10  # гласные
CONSONANT_DURATION = 0.08  # согласные
MAX_WORKERS = 4  # Количество потоков для параллельной обработки

def read_durations(durations_file):
    """Чтение файла с длительностями с обработкой ошибок"""
    durations = {}
    try:
        with open(durations_file, "r", encoding="utf-8") as f:
            for line in f:
                if ":" in line:
                    parts = line.strip().split(":")
                    if len(parts) == 2:
                        file_name, duration = parts
                        try:
                            durations[file_name.strip()] = float(duration.strip().replace("секунд", ""))
                        except ValueError:
                            continue
    except Exception as e:
        print(f"Ошибка чтения файла длительностей: {e}")
    return durations

def read_transcript(transcript_file):
    """Безопасное чтение файла транскрипции"""
    transcript = []
    try:
        with open(transcript_file, "r", encoding="utf-8") as f:
            for line in f:
                if "|" in line:
                    parts = line.strip().split("|")
                    if len(parts) == 2:
                        word, timestamps = parts
                        try:
                            start, end = eval(timestamps.strip())
                            transcript.append((word.strip(), (start, end)))
                        except:
                            continue
    except Exception as e:
        print(f"Ошибка чтения транскрипции {transcript_file}: {e}")
    return transcript

def count_vowels_consonants(word):
    """Подсчет гласных и согласных с поддержкой Unicode"""
    vowels = "аеёиоуыэюяaeiou"
    consonants = "бвгджзйклмнпрстфхцчшщbcdfghjklmnpqrstvwxyz"
    vowel_count = sum(1 for char in word.lower() if char in vowels)
    consonant_count = sum(1 for char in word.lower() if char in consonants)
    return vowel_count, consonant_count

def get_expected_duration(word):
    """Вычисление ожидаемой длительности слова"""
    vowel_count, consonant_count = count_vowels_consonants(word)
    return (vowel_count * VOWEL_DURATION) + (consonant_count * CONSONANT_DURATION)

def read_timecodes(file_path):
    """Чтение таймкодов с обработкой ошибок"""
    timecodes = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line in file:
                match = re.match(r'Файл: 2min_\d+_(\d+\.\d+)_(\d+\.\d+)\.mp3 \((\d+\.\d+)-(\d+\.\d+)\) .*', line)
                if match:
                    try:
                        start_offset = float(match.group(1))
                        start = float(match.group(3))
                        end = float(match.group(4))
                        absolute_start = start_offset + start
                        absolute_end = start_offset + end
                        timecodes.append((absolute_start, absolute_end))
                    except ValueError:
                        continue
    except Exception as e:
        print(f"Ошибка чтения таймкодов: {e}")
    return timecodes

def cut_audio_with_extra_time(audio_path, start, end, output_file, extra_time):
    """Нарезка аудио с дополнительным временем с контролем ресурсов"""
    new_end = min(end + extra_time, start + 60)  # Ограничение максимальной длительности
    try:
        subprocess.run([
            'ffmpeg', '-y', '-i', audio_path, '-ss', str(start), '-to', str(new_end),
            '-c:a', 'libmp3lame', '-b:a', '192k', output_file
        ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return True
    except subprocess.CalledProcessError as e:
        print(f"Ошибка нарезки аудио: {e.stderr.decode('utf-8')[:200]}")
        return False
    except Exception as e:
        print(f"Неожиданная ошибка при нарезке аудио: {e}")
        return False

def process_audio_file(args):
    """Обработка одного аудиофайла (для многопоточной обработки)"""
    audio_file, duration, transcript, original_audio_path, start, end, output_folder = args
 
    if not transcript:
        return audio_file, False, "Отсутствует транскрипция"

    last_word = transcript[-1]
    last_word_text = last_word[0]
    last_word_start = last_word[1][0]
    last_word_end = last_word[1][1] if last_word[1][1] is not None else duration

    last_word_duration = last_word_end - last_word_start
    expected_duration = get_expected_duration(last_word_text)

    if last_word_duration < expected_duration:
        extra_time = expected_duration - last_word_duration
        output_file = os.path.join(output_folder, audio_file)
 
        if cut_audio_with_extra_time(original_audio_path, start, end, output_file, extra_time):
            return audio_file, True, f"Скорректировано (+{extra_time:.2f} сек)"
        else:
            return audio_file, False, "Ошибка коррекции"
 
    # Если аудио не требует коррекции, просто сохраняем его
    output_file = os.path.join(output_folder, audio_file)
    if cut_audio_with_extra_time(original_audio_path, start, end, output_file, 0):
        return audio_file, True, "OK"
    else:
        return audio_file, False, "Ошибка сохранения"

def DetectionGovnishe(video_key):
    """Основная функция для анализа и коррекции аудиофрагментов"""
    base_dir = "C:\work"
 
    try:
        # Инициализация путей
        audio_folder = os.path.join(base_dir, video_key, "shorts_audio")
        transcripts_folder = os.path.join(audio_folder, "transcripts_lvl_word")
        durations_file = os.path.join(audio_folder, "durations.txt")

        # Проверка исходного аудио
        original_audio_path = os.path.join(base_dir, video_key, f"{video_key}_audio.mp3")
        if not os.path.exists(original_audio_path):
            raise FileNotFoundError(f"Оригинальный аудиофайл не найден: {original_audio_path}")

        # Чтение данных
        durations = read_durations(durations_file)
        timecodes = read_timecodes(os.path.join(base_dir, video_key, "transcriptsTimeSteps.txt"))

        # Подготовка задач для многопоточной обработки
        tasks = []
        for i, (start, end) in enumerate(timecodes):
            audio_file = f"clip_{i + 1}.mp3"
            duration = durations.get(audio_file, 0)
            transcript_file = os.path.join(transcripts_folder, audio_file.replace(".mp3", "_timeStepsLvlWord.txt"))
 
            if os.path.exists(transcript_file):
                transcript = read_transcript(transcript_file)
                tasks.append((audio_file, duration, transcript, original_audio_path, start, end, audio_folder))  # Сохраняем сразу в shorts_audio

        # Многопоточная обработка
        results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [executor.submit(process_audio_file, task) for task in tasks]
            for future in as_completed(futures):
                results.append(future.result())
                gc.collect()  # Очистка памяти после каждой задачи

        # Анализ результатов
        good_audio = [r[0] for r in results if r[1]]
        bad_audio = [(r[0], r[2]) for r in results if not r[1]]

        # Вывод отчета
        print("\n=== Итоговый отчет ===")
        print(f"Успешно обработано: {len(good_audio)} файлов")
        print(f"Проблемные файлы ({len(bad_audio)}):")
        for file, reason in bad_audio:
            print(f"  {file}: {reason}")

        return {
            'good': good_audio,
            'bad': bad_audio,
            'total': len(results)
        }

    except Exception as e:
        print(f"Критическая ошибка в DetectionGovnishe: {e}")
        return None
    finally:
        gc.collect()

VideoWordBlur.

Основной рендерер для вертикальных видео. Реализован как класс VideoProcessor, который делает ролики 1080x1920 (9:16) с субтитрами и блюром сверху/снизу.
Ключевые шаги:

Обрезка черных полос: Метод detect_black_bars анализирует первый кадр через OpenCV. Кадр конвертируется в градации серого (cv2.cvtColor), затем применяется бинарный порог (cv2.threshold, интенсивность <= 20 — черный, >20 — белый). Суммирование пикселей по строкам и столбцам (np.sum(thresh, axis=1)/255) определяет границы полос. Если полос нет, видео не обрезается.

Создание вертикального видео: Видео масштабируется до ширины 1080 пикселей, центрируется, сверху иснизу добавляется размытый фон через ffmpeg (boxblur=10).

Субтитры: Таймкоды из transcripts_lvl_word используются для наложения текста через moviepy (TextClip). moviepy нужен только для субтитров, всё остальное делает ffmpeg (нарезка, масштабирование).

Результат — файлы final_with_subs_clip_X.mp4 в shorts/[video_id]. Обработка идет параллельно через ProcessPoolExecutor для ускорения.

Python:
import os
import re
import subprocess
import concurrent.futures
from moviepy import TextClip, VideoFileClip, CompositeVideoClip
import cv2
import numpy as np

class VideoProcessor:
    # ======================== КОНСТАНТЫ ========================
    BASE_WORK_DIR = "C:/work"
    BASE_OUTPUT_DIR = "C:/shorts"
 
    # Настройки текста
    TEXT_SETTINGS = {
        'font_size': 65,
        'color': 'white',
        'font': "C:/Windows/Fonts/sylfaen.ttf",
        'position': ('center', 1400)  # Сдвигаем текст ниже для вертикального формата
    }

    # Настройки вертикального видео
    OUTPUT_RESOLUTION = (1080, 1920)  # 9:16

    def __init__(self, video_key):
        self.video_key = video_key
        self.original_video = os.path.join(self.BASE_WORK_DIR, video_key, f"{video_key}.mp4")
        self.timecodes_file = os.path.join(self.BASE_WORK_DIR, video_key, "transcriptsTimeSteps.txt")
        self.transcripts_dir = os.path.join(self.BASE_WORK_DIR, video_key, "transcripts_lvl_word")
        self.output_dir = os.path.join(self.BASE_OUTPUT_DIR, video_key)
 
        os.makedirs(self.output_dir, exist_ok=True)
 
        # Определяем координаты для обрезки черных полос
        self.crop_params = self.detect_black_bars()

    def detect_black_bars(self):
        """Определяет координаты для обрезки черных полос"""
        cap = cv2.VideoCapture(self.original_video)
        ret, frame = cap.read()
        cap.release()
        if not ret:
            print("Ошибка чтения видео")
            return None
 
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        _, thresh = cv2.threshold(gray, 20, 255, cv2.THRESH_BINARY)
        row_sums = np.sum(thresh, axis=1) / 255
        col_sums = np.sum(thresh, axis=0) / 255
 
        frame_h, frame_w = frame.shape[:2]
        threshold_pixels = 10
        top_bar = 0
        for i, row_sum in enumerate(row_sums):
            if row_sum > threshold_pixels:
                top_bar = i
                break
        bottom_bar = frame_h
        for i, row_sum in enumerate(row_sums[::-1]):
            if row_sum > threshold_pixels:
                bottom_bar = frame_h - i
                break
        left_bar = 0
        for i, col_sum in enumerate(col_sums):
            if col_sum > threshold_pixels:
                left_bar = i
                break
        right_bar = frame_w
        for i, col_sum in enumerate(col_sums[::-1]):
            if col_sum > threshold_pixels:
                right_bar = frame_w - i
                break
 
        if top_bar == 0 and bottom_bar == frame_h and left_bar == 0 and right_bar == frame_w:
            print("Чёрные полосы не найдены")
            return None
 
        crop_params = {
            'x_min': left_bar,
            'y_min': top_bar,
            'width': right_bar - left_bar,
            'height': bottom_bar - top_bar
        }
        print(f"Обрезка: x={left_bar}, y={top_bar}, w={right_bar-left_bar}, h={bottom_bar-top_bar}")
        return crop_params

    def parse_timecodes(self):
        """Парсит временные отрезки из файла с таймкодами"""
        time_segments = []
        with open(self.timecodes_file, 'r', encoding='utf-8') as f:
            for line in f:
                if match := re.match(r'Файл: 2min_\d+_(\d+\.\d+)_(\d+\.\d+)\.mp3 \((\d+\.\d+)-(\d+\.\d+)\)', line.strip()):
                    segment_start = float(match.group(1))
                    time_segments.append((
                        segment_start + float(match.group(3)),
                        segment_start + float(match.group(4))
                    ))
        return time_segments

    def read_timestamps(self, clip_num):
        """Читает временные метки для конкретного клипа"""
        timestamp_file = os.path.join(self.transcripts_dir, f"clip_{clip_num}_timeStepsLvlWord.txt")
        if not os.path.exists(timestamp_file):
            return []
 
        timestamps = []
        with open(timestamp_file, 'r', encoding='utf-8') as file:
            for line in file:
                line = line.strip()
                if not line:
                    continue
                parts = line.split("|")
                if len(parts) != 2:
                    continue
                word, times = parts
                start_end = times.strip().strip("()").split(", ")
                if len(start_end) != 2:
                    continue
                start, end = start_end
                timestamps.append((word, float(start), float(end) if end != "None" else None))
        return timestamps

    def create_text_clips(self, timestamps, video_duration, video_width):
        """Создает текстовые клипы для наложения"""
        clips = []
        for word, start, end in timestamps:
            end_time = end if end is not None else video_duration
            txt_clip = TextClip(
                text=word,
                font_size=self.TEXT_SETTINGS['font_size'],
                color=self.TEXT_SETTINGS['color'],
                font=self.TEXT_SETTINGS['font'],
                size=(video_width, self.TEXT_SETTINGS['font_size'])
            ).with_position(self.TEXT_SETTINGS['position']).with_start(start).with_end(end_time)
            clips.append(txt_clip)
        return clips

    def process_single_clip(self, clip_info):
        """Обрабатывает один клип - центрирование, добавление заблюренного фона и текста"""
        clip_num, start_time, end_time = clip_info
 
        # Временные файлы
        temp_path = os.path.join(self.output_dir, f"temp_clip_{clip_num}.mp4")
        final_path = os.path.join(self.output_dir, f"final_with_subs_clip_{clip_num}.mp4")
 
        try:
            # 1. Обрезка видео по времени с учётом черных полос
            crop_filter = ""
            if self.crop_params:
                crop_filter = f"crop={self.crop_params['width']}:{self.crop_params['height']}:{self.crop_params['x_min']}:{self.crop_params['y_min']},"
 
            subprocess.run([
                'ffmpeg',
                '-y',
                '-ss', str(start_time),
                '-to', str(end_time),
                '-i', self.original_video,
                '-vf', f'{crop_filter}scale=iw:ih',  # Обрезаем черные полосы, если есть
                '-c:v', 'libx264',
                '-preset', 'ultrafast',
                '-c:a', 'copy',
                temp_path
            ], check=True)

            # 2. Создание вертикального видео с заблюренным фоном
            blurred_path = os.path.join(self.output_dir, f"temp_blurred_{clip_num}.mp4")
            subprocess.run([
                'ffmpeg',
                '-y',
                '-i', temp_path,
                '-filter_complex',
                f'[0:v]scale={self.OUTPUT_RESOLUTION[0]}:-1[main];'  # Основное видео: ширина 1080, высота пропорционально
                f'[0:v]scale={self.OUTPUT_RESOLUTION[0]}:{self.OUTPUT_RESOLUTION[1]},boxblur=10[blurred];'  # Фон: полный размер 1080x1920 с размытием
                f'[blurred][main]overlay=0:(main_h-overlay_h)/2:shortest=1',  # Накладываем основное видео по центру
                '-c:v', 'libx264',
                '-preset', 'fast',
                '-c:a', 'copy',
                '-s', f'{self.OUTPUT_RESOLUTION[0]}x{self.OUTPUT_RESOLUTION[1]}',  # Явно задаём разрешение
                blurred_path
            ], check=True)

            # 3. Добавление текста с помощью moviepy
            video = VideoFileClip(blurred_path)
            timestamps = self.read_timestamps(clip_num)
            if timestamps:
                text_clips = self.create_text_clips(timestamps, video.duration, video.size[0])
                final_clip = CompositeVideoClip([video] + text_clips)
            else:
                final_clip = video
 
            final_clip.write_videofile(
                final_path,
                codec="libx264",
                audio_codec="aac",
                fps=24,
                threads=4,
                preset='ultrafast'
            )
 
            video.close()
            final_clip.close()
            os.remove(temp_path)
            os.remove(blurred_path)
            return True
 
        except Exception as e:
            print(f"Ошибка в клипе {clip_num}: {e}")
            if os.path.exists(temp_path):
                os.remove(temp_path)
            if os.path.exists(blurred_path):
                os.remove(blurred_path)
            return False

    def process_all_clips(self):
        """Обрабатывает все клипы параллельно"""
        time_segments = self.parse_timecodes()
        if not time_segments:
            print("Не найдены временные отрезки для обработки")
            return
 
        tasks = [(i+1, start, end) for i, (start, end) in enumerate(time_segments)]
 
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
            results = list(executor.map(self.process_single_clip, tasks))
 
        print(f"Обработка завершена. Успешно: {sum(results)} из {len(results)}")

def process_video(video_key):
    """Функция для вызова из основного скрипта"""
    processor = VideoProcessor(video_key)
    processor.process_all_clips()

Оригинал
IMG_8145.jpeg


Результат. 56 роликов из 16 минутного оригинала за 43 минуты. Мой любимый «16», про историю с рукой и маршруткой.
Можно заметить жирные полосы со всех сторон на оригинале, все обрезалось успешно.
IMG_8146.png

IMG_8147.jpeg


VideoStreamDetection.
Модуль для стримов с одной вебкой, работает отдельно от общей структуры проекта, путь к видео задается вручную. Создает вертикальное видео 1080x1920: вебка — 30% высоты (576 пикселей), стрим — 70% (1344 пикселя).
Код нацелен только на стримы где у стримера обязательно есть вэбка.
YOLO11n детектит вебку, собирая 10 детектов с интервалом 3 секунды. Коорды сглаживаются через np.median, чтобы убрать разброс (±2 пикселя из-за движений стримера).
Далее создается вертикальный ролик, где верхнюю часть занимает вэбка стримера и нижнюю запись стрима.
Вэбка и запись стрима рендерятся отдельно в разных потоках, я это делал для отладки чтобы решить проблемы с масштабом записи стрима. Далее полученные результаты соединяются через ffmpeg в одно видео.

Python:
import cv2
import numpy as np
from ultralytics import YOLO
import ffmpeg
import sys

# Загрузка модели YOLO
model = YOLO('best.pt')

# Путь к видео
video_path = 'C:/work/-t_nEsRWEiU/jopa1.mp4'
cap = cv2.VideoCapture(video_path)

# Получение FPS
fps = cap.get(cv2.CAP_PROP_FPS)
frame_interval = int(fps * 3)

# Список для хранения координат вебки
webcam_bboxes = []
frame_count = 0
max_detections = 10

print("Собираем детекты вебки...")

# Первый проход: собираем координаты вебки
while cap.isOpened() and len(webcam_bboxes) < max_detections:
    ret, frame = cap.read()
    if not ret:
        break

    if frame_count % frame_interval == 0:
        results = model(frame)
        for box in results[0].boxes:
            if box.cls == 0:  # Класс вебки (chel)
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                webcam_bboxes.append((x1, y1, x2, y2))
                print(f"Детект {len(webcam_bboxes)}: ({x1}, {y1}, {x2}, {y2})")
                break

    frame_count += 1

cap.release()

if not webcam_bboxes:
    print("Вебка не найдена в видео!")
    sys.exit(1)

# Берем медиану координат
x1_vals, y1_vals, x2_vals, y2_vals = zip(*webcam_bboxes)
x1, y1, x2, y2 = (
    int(np.median(x1_vals)),
    int(np.median(y1_vals)),
    int(np.median(x2_vals)),
    int(np.median(y2_vals))
)
print(f"Финальные координаты вебки: ({x1}, {y1}, {x2}, {y2})")

# Определяем размеры для вертикального видео
final_width = 1080
final_height = 1920
webcam_height = int(final_height * 0.3)  # 30% высоты для вебки
screen_height = final_height - webcam_height

# Получаем исходное разрешение видео
try:
    probe = ffmpeg.probe(video_path)
    video_stream = next(s for s in probe['streams'] if s['codec_type'] == 'video')
    input_width = int(video_stream['width'])
    input_height = int(video_stream['height'])
    print(f"Исходное разрешение видео: {input_width}x{input_height}")
except ffmpeg.Error as e:
    print(f"Ошибка при получении разрешения видео: {e.stderr.decode()}")
    sys.exit(1)

# Параметры для ffmpeg
webcam_w = x2 - x1
webcam_h = y2 - y1

# Проверяем валидность координат
if webcam_w <= 0 or webcam_h <= 0 or x1 < 0 or y1 < 0 or x2 > input_width or y2 > input_height:
    print(f"Невалидные координаты вебки: ({x1}, {y1}, {x2}, {y2})")
    sys.exit(1)

# Вычисляем размеры для масштабирования экрана
aspect_ratio = input_width / input_height
scaled_width = int(screen_height * aspect_ratio)
scaled_height = screen_height

# Если масштабированная ширина меньше final_width, масштабируем по ширине
if scaled_width < final_width:
    scaled_width = final_width
    scaled_height = int(final_width / aspect_ratio)

# Проверяем, что размеры валидны
if scaled_width <= 0 or scaled_height <= 0:
    print(f"Невалидные размеры масштабирования: {scaled_width}x{scaled_height}")
    sys.exit(1)

print(f"Масштабированные размеры экрана: {scaled_width}x{scaled_height}")

# Вычисляем смещение для центрирования кропа экрана
crop_x = (scaled_width - final_width) // 2 if scaled_width > final_width else 0
crop_y = (scaled_height - screen_height) // 2 if scaled_height > screen_height else 0

# Сохраняем промежуточные видео для отладки
try:
    # Сохраняем только область веб-камеры
    video_input = ffmpeg.input(video_path)
    output = ffmpeg.output(
        video_input,
        'debug_webcam.mp4',
        **{
            'vf': f'crop={webcam_w}:{webcam_h}:{x1}:{y1},scale={final_width}:{webcam_height},setsar=1',
            'c:v': 'libx264',
            'c:a': 'aac',
            'preset': 'ultrafast'
        }
    )
    output.run(overwrite_output=True)
    print("Сохранено debug_webcam.mp4")

    # Сохраняем только экран
    output = ffmpeg.output(
        video_input,
        'debug_screen.mp4',
        **{
            'vf': f'scale={scaled_width}:{scaled_height},crop={final_width}:{screen_height}:{crop_x}:{crop_y},setsar=1',
            'c:v': 'libx264',
            'c:a': 'aac',
            'preset': 'ultrafast'
        }
    )
    output.run(overwrite_output=True)
    print("Сохранено debug_screen.mp4")
except ffmpeg.Error as e:
    print(f"Ошибка при сохранении отладочных видео: {e.stderr.decode()}")
    sys.exit(1)

# Основная обработка
try:
    video_input = ffmpeg.input(video_path)

    # Создаем потоки для веб-камеры и экрана
    webcam = (
        video_input['v']
        .filter('crop', webcam_w, webcam_h, x1, y1)
        .filter('scale', final_width, webcam_height)
        .filter('setsar', 1)
    )
    screen = (
        video_input['v']
        .filter('scale', scaled_width, scaled_height)
        .filter('crop', final_width, screen_height, crop_x, crop_y)
        .filter('setsar', 1)
    )

    # Объединяем потоки вертикально
    output_video = ffmpeg.filter([webcam, screen], 'vstack', inputs=2)

    # Объединяем с аудиопотоком
    output = ffmpeg.output(
        output_video,
        video_input['a'],
        'output_video.mp4',
        **{
            'c:v': 'libx264',
            'c:a': 'aac',
            'preset': 'ultrafast',
            'threads': 2,
            'pix_fmt': 'yuv420p'
        }
    )

    print("Команда ffmpeg:", ffmpeg.compile(output))
    output.run(overwrite_output=True)
except ffmpeg.Error as e:
    err = e.stderr.decode() if e.stderr else 'Нет подробностей ошибки'
    print('Ошибка при обработке видео:', err)
    sys.exit(1)

Оригинал
IMG_8148.jpeg


Результат
IMG_8149.jpeg

Вэбку лучше выставлять побольше по высоте, чтобы занимала 40%, так запись стрима можно будет не так сильно приближать и обзор увеличится.

Финалочка.
Все работает как заявлено и полностью соответствует описанию из начала статьи, работает через нажатие одной кнопки и составления промпта.

В сутки можно спокойно создавать от 1к до 3к коротких роликов. Видеокарта сильно не нагревается, на Whisper turbo выше 70 не поднималась, держалась в районе 63-68. На большой модели работа становится явно горячее, 76 градусов могло быть. У меня 4060 8гб.

На самом деле может быть хорошим инструментом при условии ручного монтажа. Нейронка может просто нарезать и дать субтитры, а все остальное самому клепать. 3к конечно не сделать, но работу облегчает точно. Было бы не плохо что то похожее сделать в виде плагина для какого нибудь adobe premiere.

Пока что можно использовать просто для нарезки стримов, с такой задачей справится не плохо если докрутить какие то визуальные моменты и переписать все полностью. Вертикальные ролики достойные, но скучно смотрится, от оригинала сильно зависит. Хотя бы к субтитрам анимацию как то сделать и этого уже будет достаточно чтобы заливать. Может CV еще дообучать чтобы трекать лица и следить за ними в кадре, но об этом может быть в следующий раз.
 
Последнее редактирование:
Пожалуйста, обратите внимание, что пользователь заблокирован
Забыл статью указать, как теперь префикс поставить ?
Просто напиши любому модератору. Например мне =) Поставил
 
Статья вроде полностью соответсвует тематике раздела.

«Искусственный интеллект (AI), машинное обучение (ML).
Утилиты и инструменты для управления данными в cybercrime и других отраслях...»

Обучение видеомонтажу, а какое отношение это имеет к хакингу и тематике форума? Вам на вк или ютуб с таким надо
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Вам на вк или ютуб с таким надо
А вам, с такими ответами, пожалуй нужно в раздел "ПРАВИЛА", после это на главную форума, для изучения раздело. Желаю удачи в это не лёгком деле
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх