• XSS.stack #1 – первый литературный журнал от юзеров форума

Генератор Мемов (Python, Gemini Flash 2.5, Telegram).

rand

CooL-Lamer
Эксперт
Регистрация
24.05.2023
Сообщения
581
Реакции
1 152
Депозит
0.07 Ł и др.
Написал: rand
Специально для: xss.pro

Всем привет, написал крутого бота генератора мемов и стендапов через обработку нейросети Gemini Flash (Api ключи регайте у Gemini фришные, реализована ротация ключей, 1 ключ 60 запросов (чем больше ключей, тем больше мемов за сутки и можете сгенерить).

Для запуска требуется поставить следующие либы:
Bash:
pip install google-generativeai
pip install aiogram
pip install pillow


Код:
Python:
"""
Телеграм-бот для генерации мемов и стендапов на основе загруженных изображений.
Использует модель Gemini 2.5 Flash для анализа изображений и создания текста.
"""
import time
import os
import logging
from pathlib import Path
import shutil
import google.generativeai as genai
import gc
import itertools
from collections import deque

# Библиотеки для работы с Telegram
from aiogram import Bot, Dispatcher, F
from aiogram.types import Message, FSInputFile, ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.utils.markdown import hbold, hcode
from aiogram.enums import ChatType

# Библиотеки для асинхронной работы
import asyncio

# Библиотеки для обработки изображений
from PIL import Image, ImageDraw, ImageFont

# API ключи Gemini
GEMINI_API_KEYS = [
    "AIzaSyA2FItlFG0FDlSOpz8VMv6Agg-555555",
    "AIzaSyBzJ8Ix-hn0IurdK3ov3fmnbKsb8555555",
]

# Глобальные переменные для модели Gemini
gemini_model = None
current_key_index = 0
api_key_iterator = None
failed_keys = set()

# Глобальные переменные для очереди
processing_queue = deque()
is_processing = False
queue_lock = asyncio.Lock()
queue_task = None
bot_user_id = None

# Список путей к возможным шрифтам
possible_fonts = [
    Path("fonts/ImpactRegular.ttf"),
    Path("C:/Windows/Fonts/Arial.ttf"),
    Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
    Path("/usr/share/fonts/truetype/msttcorefonts/Arial.ttf"),
    Path("/Library/Fonts/Arial.ttf"),
]

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Конфигурация путей
BASE_DIR = Path(__file__).parent
TEMP_DIR = BASE_DIR / "tmp"
OUTPUT_DIR = BASE_DIR / "output"
FONTS_DIR = BASE_DIR / "fonts"

# Создаем директории
TEMP_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
FONTS_DIR.mkdir(exist_ok=True)

# Константы
BOT_TOKEN = "7569029224:AAFiqMbSQxBtf9HdDjNuR1o0W4555555555"
MIN_IMAGE_SIZE = 512

# Эмодзи для интерфейса
EMOJI = {
    "start": "🚀",
    "meme": "😂",
    "standup": "🎭",
    "photo": "📷",
    "processing": "⚙️",
    "success": "✅",
    "error": "❌",
    "warning": "⚠️",
    "create": "🎨",
}

# Определение состояний FSM
class MemeStates(StatesGroup):
    awaiting_choice = State()

# Инициализация бота и диспетчера
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()

# Словарь для хранения данных пользователей
user_data = {}

# Класс для обновления прогресса
class ProgressMessageUpdater:
    def __init__(self, message: Message, initial_text: str, emoji: str):
        self.message = message
        self.initial_text = initial_text
        self.emoji = emoji
        self.running = False
        self.task = None
        self.seconds = 0
        self.sent_message = None

    async def start(self):
        self.running = True
        self.seconds = 0
        self.sent_message = await self.message.answer(f"{self.emoji} {self.initial_text} (0с)")
        self.task = asyncio.create_task(self._update_progress())
        return self.sent_message

    async def stop(self):
        if self.running:
            self.running = False
            if self.task:
                self.task.cancel()
            return self.seconds
        return 0

    async def _update_progress(self):
        try:
            while self.running:
                self.seconds += 1
                dots = "." * (1 + (self.seconds % 3))
                time_str = f"{self.seconds}с"

                encouragement = ""
                if self.seconds == 15:
                    encouragement = f"\n⏳ Пожалуйста, подождите..."
                elif self.seconds == 30:
                    encouragement = f"\n⏳ Почти готово..."
                elif self.seconds >= 45 and self.seconds % 15 == 0:
                    encouragement = f"\n⏳ Процесс занимает больше времени, чем обычно, но мы работаем!"

                new_text = f"{self.emoji} {self.initial_text}{dots} ({time_str}){encouragement}"

                try:
                    await self.sent_message.edit_text(new_text)
                except Exception:
                    pass

                await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Ошибка при обновлении прогресса: {e}")

def is_group_chat(message: Message) -> bool:
    return message.chat.type in [ChatType.GROUP, ChatType.SUPERGROUP]

def get_user_key(message: Message) -> str:
    if is_group_chat(message):
        return f"group_{message.chat.id}_user_{message.from_user.id}"
    return str(message.from_user.id)

async def initialize_gemini_model():
    global gemini_model, api_key_iterator, current_key_index
 
    if not GEMINI_API_KEYS:
        raise Exception("Не указаны API ключи Gemini!")
 
    api_key_iterator = itertools.cycle(GEMINI_API_KEYS)
 
    for attempt in range(len(GEMINI_API_KEYS)):
        current_key = next(api_key_iterator)
        current_key_index = GEMINI_API_KEYS.index(current_key)
 
        if current_key in failed_keys:
            logger.warning(f"Пропускаем заблокированный ключ #{current_key_index + 1}")
            continue
     
        try:
            logger.info(f"Инициализация Gemini 2.5 Flash с ключом #{current_key_index + 1}")
     
            genai.configure(api_key=current_key)
            gemini_model = genai.GenerativeModel('gemini-2.5-flash-preview-05-20')
     
            test_response = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: gemini_model.generate_content("Test")
            )
     
            if test_response:
                logger.info(f"Gemini 2.5 Flash успешно инициализирован с ключом #{current_key_index + 1}")
                return
         
        except Exception as e:
            logger.error(f"Ошибка при инициализации с ключом #{current_key_index + 1}: {e}")
            if "API_KEY_INVALID" in str(e) or "CONSUMER_SUSPENDED" in str(e):
                failed_keys.add(current_key)
                logger.warning(f"Ключ #{current_key_index + 1} помечен как недоступный")
            continue
 
    raise Exception("Не удалось инициализировать Gemini ни с одним из ключей!")

async def rotate_api_key():
    global gemini_model, current_key_index
 
    if not api_key_iterator:
        logger.error("Итератор ключей не инициализирован!")
        return False
 
    next_key = next(api_key_iterator)
    new_index = GEMINI_API_KEYS.index(next_key)
 
    attempts = 0
    while next_key in failed_keys and attempts < len(GEMINI_API_KEYS):
        logger.warning(f"Пропускаем заблокированный ключ #{new_index + 1}")
        next_key = next(api_key_iterator)
        new_index = GEMINI_API_KEYS.index(next_key)
        attempts += 1
 
    if attempts >= len(GEMINI_API_KEYS):
        logger.error("Все ключи заблокированы!")
        return False
 
    try:
        logger.info(f"Переключение с ключа #{current_key_index + 1} на ключ #{new_index + 1}")
 
        genai.configure(api_key=next_key)
        gemini_model = genai.GenerativeModel('gemini-2.5-flash-preview-05-20')
 
        current_key_index = new_index
        logger.info(f"Успешно переключились на ключ #{current_key_index + 1}")
        return True
 
    except Exception as e:
        logger.error(f"Ошибка при переключении на ключ #{new_index + 1}: {e}")
        failed_keys.add(next_key)
        return False

async def analyze_and_generate_with_gemini(image_path, content_type):
    temp_path = image_path.parent / f"temp_{image_path.name}"
    max_retries = len(GEMINI_API_KEYS) - len(failed_keys)
 
    for retry in range(max_retries):
        try:
            shutil.copy2(image_path, temp_path)
            logger.info(f"Создана копия изображения для анализа (попытка {retry + 1})")
     
            if gemini_model is None:
                raise Exception("Модель не инициализирована")
     
            with Image.open(temp_path) as image:
                logger.info(f"Изображение загружено: {image.size}")
         
                if image.mode != 'RGB':
                    image = image.convert('RGB')
         
                try:
                    logger.info(f"Отправляю запрос к Gemini API для {content_type} (ключ #{current_key_index + 1})...")
             
                    if content_type == "Мем":
                        prompt = """Проанализируй это изображение и создай для него смешную подпись в стиле мема.

Требования для мема:
- Максимум 15-20 слов
- Используй классический мем-формат ("Когда...", "Мое лицо когда...", "Я пытаюсь... Но...")
- Текст должен быть ироничным, саркастичным или релейтабельным
- Обязательно на русском языке
- Только текст мема, никаких пояснений

Примеры хороших мемов:
- "Когда сказал что пойдешь спать в 10 вечера"
- "Я: буду продуктивным сегодня. Также я:"
- "Мое лицо когда будильник звонит в понедельник"

Создай подобный мем на основе изображения:"""
             
                    else:  # Стендап
                        prompt = """Ты стендап-комик на сцене. Проанализируй это изображение и создай смешную шутку в стиле стендап-комедии.

Требования для стендапа:
- Максимум 3000 символов
- Используй наблюдательный юмор, как настоящие комики
- Можешь добавить эмодзи для выразительности (но умеренно)
- Делай неожиданные сравнения и находи смешные детали
- Можешь использовать формат: "А вы замечали что...", "Смотрю на это и думаю...", "Это же когда..."
- Обязательно на русском языке
- Только текст шутки, без лишних пояснений

Создай смешную стендап-шутку на основе изображения:"""
             
                    response = await asyncio.get_event_loop().run_in_executor(
                        None,
                        lambda: gemini_model.generate_content([prompt, image])
                    )
             
                    if response and response.text:
                        generated_text = response.text.strip()
                        generated_text = generated_text.strip('"\'')
                 
                        if content_type == "Мем" and len(generated_text) > 100:
                            generated_text = generated_text[:97] + "..."
                        elif content_type == "Стендап" and len(generated_text) > 4000:
                            generated_text = generated_text[:3997] + "..."
                 
                        logger.info(f"Получен {content_type} от Gemini (ключ #{current_key_index + 1}): {generated_text[:100]}...")
                 
                        await rotate_api_key()
                        return generated_text
                    else:
                        logger.error("Gemini вернул пустой ответ")
                        raise Exception("Сервис временно недоступен")
                 
                except Exception as api_error:
                    logger.error(f"Ошибка при вызове Gemini API (ключ #{current_key_index + 1}): {api_error}")
             
                    error_str = str(api_error)
             
                    if any(err in error_str for err in ["API_KEY_INVALID", "CONSUMER_SUSPENDED", "PERMISSION_DENIED"]):
                        logger.critical(f"Ключ #{current_key_index + 1} заблокирован или недействителен!")
                        failed_keys.add(GEMINI_API_KEYS[current_key_index])
                 
                        if await rotate_api_key():
                            continue
                        else:
                            raise Exception("Все API ключи недоступны. Обратитесь к администратору.")
             
                    elif "RATE_LIMIT_EXCEEDED" in error_str or "quota" in error_str.lower():
                        logger.warning(f"Превышен лимит для ключа #{current_key_index + 1}")
                        if await rotate_api_key():
                            continue
                        else:
                            raise Exception("Превышен лимит запросов на всех ключах.")
             
                    elif "RESOURCE_EXHAUSTED" in error_str:
                        if await rotate_api_key():
                            continue
                        else:
                            raise Exception("Сервис перегружен. Попробуйте позже.")
             
                    elif "timeout" in error_str.lower():
                        raise Exception("Время ожидания истекло. Попробуйте еще раз.")
             
                    else:
                        raise Exception("Не удалось сгенерировать контент. Попробуйте позже.")
             
        except Exception as e:
            if "Сервис" in str(e) or "Превышен" in str(e) or "Время" in str(e) or "Все API" in str(e):
                raise e
            else:
                logger.error(f"Неожиданная ошибка при генерации {content_type}: {e}", exc_info=True)
         
                if retry < max_retries - 1 and await rotate_api_key():
                    continue
             
                raise Exception(f"Произошла ошибка при создании {content_type}")
        finally:
            if temp_path.exists():
                try:
                    os.remove(temp_path)
                    logger.info("Временная копия для Gemini удалена")
                except:
                    pass
 
    logger.warning(f"Не удалось сгенерировать {content_type} после всех попыток, используем fallback")
    if content_type == "Мем":
        fallbacks = [
            "Когда понимаешь что завтра понедельник",
            "Я: буду продуктивным сегодня\nТакже я:",
            "Мое лицо когда вижу это"
        ]
    else:
        fallbacks = [
            "Смотрю на эту картинку и понимаю - это же я пытаюсь быть взрослым! 😅\n\nСерьезно, кто-то когда-то сказал, что взрослая жизнь - это просто детство, но с налогами. И вот я смотрю на это изображение и думаю: 'Да, это точно про меня!' 🤔",
            "Эта картинка - это буквально моё лицо, когда я вижу счета за коммунальные услуги! 💸\n\nВсегда удивляюсь, как электричество может стоить дороже чем мой обед. Что там происходит? Мой холодильник тайно майнит биткоины? 🤖⚡",
            "Понедельник? Не, не слышал такого слова! 🙈\n\nЭто как когда тебе говорят 'давай встретимся завтра в 8 утра', а ты такой: 'В каком часовом поясе? На какой планете?' 🌍⏰"
        ]
 
    import random
    return random.choice(fallbacks)

@dp.message(Command("api_status"))
async def cmd_api_status(message: Message):
    ADMIN_IDS = [5555805555]
 
    if message.from_user.id not in ADMIN_IDS:
        await message.answer("У вас нет доступа к этой команде.")
        return
 
    total_keys = len(GEMINI_API_KEYS)
    active_keys = total_keys - len(failed_keys)
 
    status_text = (
        f"📊 **Статус API ключей Gemini:**\n\n"
        f"Всего ключей: {total_keys}\n"
        f"Активных: {active_keys}\n"
        f"Заблокированных: {len(failed_keys)}\n"
        f"Текущий ключ: #{current_key_index + 1}\n\n"
    )
 
    for i, key in enumerate(GEMINI_API_KEYS):
        if key in failed_keys:
            status_text += f"Ключ #{i + 1}: ❌ Заблокирован\n"
        elif i == current_key_index:
            status_text += f"Ключ #{i + 1}: ✅ Активен (текущий)\n"
        else:
            status_text += f"Ключ #{i + 1}: ✅ Активен\n"
 
    await message.answer(status_text, parse_mode="Markdown")

@dp.message(Command("reset_keys"))
async def cmd_reset_keys(message: Message):
    ADMIN_IDS = [6955805799]
 
    if message.from_user.id not in ADMIN_IDS:
        await message.answer("У вас нет доступа к этой команде.")
        return
 
    global failed_keys
    failed_keys.clear()
 
    await message.answer(
        f"✅ Список заблокированных ключей очищен.\n"
        f"Все {len(GEMINI_API_KEYS)} ключей снова доступны для использования."
    )

@dp.message(Command("start"))
async def cmd_start(message: Message, state: FSMContext):
    if is_group_chat(message):
        keyboard = InlineKeyboardMarkup(
            inline_keyboard=[
                [InlineKeyboardButton(text=f"{EMOJI['meme']} Мем", callback_data="choose_meme")],
                [InlineKeyboardButton(text=f"{EMOJI['standup']} Стендап", callback_data="choose_standup")]
            ]
        )

        welcome_text = (
            f"{EMOJI['start']} {hbold('Привет! Я генератор мемов и стендапов!')} \n\n"
            f"Работаю в групповых чатах! Нажмите на кнопку ниже, чтобы выбрать тип контента, "
            f"а затем отправьте изображение в ответ на это сообщение.\n\n"
            f"{EMOJI['meme']} {hbold('Мем')} - классический формат с текстом внизу картинки\n"
            f"{EMOJI['standup']} {hbold('Стендап')} - забавный комментарий к вашему изображению\n\n"
            f"Что хотите создать, {message.from_user.first_name}?"
        )
    else:
        keyboard = ReplyKeyboardMarkup(
            keyboard=[
                [KeyboardButton(text=f"{EMOJI['meme']} Мем")],
                [KeyboardButton(text=f"{EMOJI['standup']} Стендап")]
            ],
            resize_keyboard=True
        )

        welcome_text = (
            f"{EMOJI['start']} {hbold('Добро пожаловать в генератор мемов и стендапов!')} \n\n"
            f"Я использую нейросети для создания забавного контента на основе ваших изображений.\n\n"
            f"{EMOJI['meme']} {hbold('Мем')} - классический формат с текстом внизу картинки\n"
            f"{EMOJI['standup']} {hbold('Стендап')} - забавный комментарий к вашему изображению\n\n"
            f"Что вы хотите создать?"
        )

    await message.answer(
        welcome_text,
        reply_markup=keyboard,
        parse_mode="HTML"
    )

    await state.set_state(MemeStates.awaiting_choice)

@dp.callback_query(F.data.in_(["choose_meme", "choose_standup"]))
async def process_inline_choice(callback_query, state: FSMContext):
    content_type = "Мем" if callback_query.data == "choose_meme" else "Стендап"
    emoji_type = EMOJI['meme'] if content_type == "Мем" else EMOJI['standup']

    if is_group_chat(callback_query.message):
        user_key = f"group_{callback_query.message.chat.id}_user_{callback_query.from_user.id}"
    else:
        user_key = str(callback_query.from_user.id)

    user_data[user_key] = {
        "content_type": content_type,
        "message_id": callback_query.message.message_id,
        "chat_id": callback_query.message.chat.id,
        "timestamp": time.time(),
        "bot_id": bot_user_id
    }

    await callback_query.answer(f"Выбран тип: {content_type}")

    await callback_query.message.edit_text(
        f"{emoji_type} {hbold(f'{callback_query.from_user.first_name} выбрал: {content_type}')}!\n\n"
        f"{EMOJI['photo']} Теперь отправьте изображение в ответ на это сообщение "
        f"(размером от {hcode(f'{MIN_IMAGE_SIZE}x{MIN_IMAGE_SIZE}')} пикселей).",
        parse_mode="HTML"
    )

    await state.clear()

@dp.message(StateFilter(MemeStates.awaiting_choice))
async def process_content_choice(message: Message, state: FSMContext):
    if is_group_chat(message):
        return

    choice = message.text
    if f"{EMOJI['meme']} Мем" in choice:
        content_type = "Мем"
        emoji_type = EMOJI['meme']
    elif f"{EMOJI['standup']} Стендап" in choice:
        content_type = "Стендап"
        emoji_type = EMOJI['standup']
    else:
        await message.answer(f"{EMOJI['error']} Пожалуйста, выберите один из предложенных вариантов.")
        return

    user_key = get_user_key(message)
    user_data[user_key] = {"content_type": content_type}

    await message.answer(
        f"{emoji_type} {hbold(f'Вы выбрали: {content_type}')}!\n\n"
        f"{EMOJI['photo']} Теперь загрузите изображение размером от "
        f"{hcode(f'{MIN_IMAGE_SIZE}x{MIN_IMAGE_SIZE}')} пикселей.",
        parse_mode="HTML"
    )

    await state.clear()

@dp.message(F.photo)
async def process_photo(message: Message):
    try:
        user_id = message.from_user.id
        user_key = get_user_key(message)
 
        if is_group_chat(message):
            if user_key not in user_data:
                logger.debug(f"Игнорируем фото от {user_id} в группе - нет активной сессии")
                return
     
            if not message.reply_to_message:
                logger.debug(f"Игнорируем фото от {user_id} в группе - не является ответом")
                return
     
            if message.reply_to_message.from_user.id != bot.id:
                logger.debug(f"Игнорируем фото от {user_id} в группе - ответ не на сообщение бота")
                return
     
            expected_message_id = user_data[user_key].get("message_id")
            expected_chat_id = user_data[user_key].get("chat_id")
     
            if not expected_message_id or not expected_chat_id:
                logger.warning(f"Нет expected_message_id или expected_chat_id для {user_key}")
                return
     
            if (message.reply_to_message.message_id != expected_message_id or
                    message.chat.id != expected_chat_id):
                await message.answer(
                    f"{EMOJI['warning']} {message.from_user.first_name}, пожалуйста, отправьте изображение "
                    f"в ответ на ВАШЕ сообщение с выбором типа контента!",
                    parse_mode="HTML"
                )
                return
 
        else:
            if user_key not in user_data:
                error_text = f"{EMOJI['error']} Сначала выберите тип контента, используя команду /start"
                await message.answer(error_text, parse_mode="HTML")
                return
 
        logger.info(f"Обрабатываем фото от пользователя {user_id}")
 
        position, already_in_queue = await add_to_queue(user_id, message)
 
        if already_in_queue:
            await message.answer(
                f"⏳ Вы уже в очереди на позиции {position}. Пожалуйста, подождите.",
                reply_to_message_id=message.message_id
            )
            logger.info(f"Пользователь {user_id} уже в очереди на позиции {position}")
        else:
            if position == 1:
                queue_msg = await message.answer(
                    f"🚀 Вы первый в очереди! Начинаю обработку...",
                    reply_to_message_id=message.message_id
                )
            else:
                queue_msg = await message.answer(
                    f"⏳ Вы добавлены в очередь. Ваша позиция: {position}. Ожидайте...",
                    reply_to_message_id=message.message_id
                )
     
            logger.info(f"Пользователь {user_id} добавлен в очередь на позицию {position}")
     
            await asyncio.sleep(5)
            try:
                await queue_msg.delete()
            except:
                pass
 
    except Exception as e:
        logger.error(f"Ошибка при добавлении в очередь: {e}", exc_info=True)
        await message.answer(
            f"{EMOJI['error']} Произошла ошибка. Попробуйте позже.",
            parse_mode="HTML"
        )

@dp.message(Command("queue"))
async def cmd_queue(message: Message):
    async with queue_lock:
        if not processing_queue:
            await message.answer("Очередь пуста.")
            return
 
        queue_text = "📋 Текущая очередь:\n\n"
        for i, item in enumerate(processing_queue, 1):
            user = item['message'].from_user
            queue_text += f"{i}. {user.first_name} (ID: {user.id})\n"
 
        queue_text += f"\n🔄 Обработка: {'Да' if is_processing else 'Нет'}"
        await message.answer(queue_text)

async def process_photo_internal(message: Message):
    photo_path = None
    output_path = None
    messages_to_delete = []
 
    try:
        logger.info(f"Начало внутренней обработки фото от пользователя {message.from_user.id}")
 
        user_key = get_user_key(message)
 
        if user_key not in user_data:
            logger.error(f"Данные пользователя {user_key} не найдены!")
            await message.answer(
                f"{EMOJI['error']} Данные сессии устарели. Пожалуйста, начните заново с /start",
                reply_to_message_id=message.message_id
            )
            return
 
        content_type = user_data[user_key]["content_type"]
        emoji_type = EMOJI['meme'] if content_type == "Мем" else EMOJI['standup']
 
        logger.info(f"Тип контента: {content_type}")
 
        if is_group_chat(message):
            if not message.reply_to_message:
                logger.warning("Нет reply_to_message в групповом чате")
                await message.answer(
                    f"{EMOJI['warning']} {message.from_user.first_name}, пожалуйста, отправьте изображение "
                    f"в ответ на сообщение с выбором типа контента!",
                    parse_mode="HTML"
                )
                return

            if message.reply_to_message.from_user.id != bot_user_id:
                logger.debug(f"Игнорируем фото от {message.from_user.id} в группе - ответ не на сообщение бота")
                return

            expected_message_id = user_data[user_key].get("message_id")
            expected_chat_id = user_data[user_key].get("chat_id")

            if (message.reply_to_message.message_id != expected_message_id or
                    message.chat.id != expected_chat_id):
                await message.answer(
                    f"{EMOJI['warning']} {message.from_user.first_name}, пожалуйста, отправьте изображение "
                    f"в ответ на ВАШЕ сообщение с выбором типа контента!",
                    parse_mode="HTML"
                )
                return

        username = message.from_user.first_name if is_group_chat(message) else "ваше"
        initial_text = f"Обрабатываю изображение {username} для создания {content_type.lower()}а"

        initial_progress = ProgressMessageUpdater(
            message,
            initial_text,
            EMOJI['processing']
        )
        await initial_progress.start()
        messages_to_delete.append(initial_progress.sent_message)

        try:
            photo_id = message.photo[-1].file_id
            photo_path = TEMP_DIR / f"{photo_id}.jpg"
            logger.info(f"Загружаю изображение в {photo_path}")
     
            await bot.download(message.photo[-1], destination=photo_path)
            logger.info(f"Изображение успешно загружено")

            processing_time = await initial_progress.stop()
            await initial_progress.sent_message.edit_text(
                f"{EMOJI['success']} Изображение успешно загружено за {processing_time}с!"
            )

            with Image.open(photo_path) as check_image:
                width, height = check_image.size
                logger.info(f"Размер изображения: {width}x{height}")
         
                if width < MIN_IMAGE_SIZE or height < MIN_IMAGE_SIZE:
                    logger.warning(f"Изображение слишком маленькое: {width}x{height}")
                    await message.answer(
                        f"{EMOJI['warning']} {hbold('Ошибка')}: изображение должно быть не менее "
                        f"{hcode(f'{MIN_IMAGE_SIZE}x{MIN_IMAGE_SIZE}')} пикселей.",
                        parse_mode="HTML"
                    )
                    for msg in messages_to_delete:
                        try:
                            await msg.delete()
                        except:
                            pass
                    return

            gemini_msg = await message.answer(
                f"🤖 Анализирую изображение и создаю {content_type.lower()} с помощью Gemini 2.5 Flash..."
            )
            messages_to_delete.append(gemini_msg)

            try:
                logger.info(f"Начинаю анализ и генерацию {content_type} через Gemini")
                generated_text = await analyze_and_generate_with_gemini(photo_path, content_type)
                logger.info(f"Генерация завершена успешно. Текст: {generated_text[:100]}...")
         
                await gemini_msg.edit_text(
                    f"{EMOJI['success']} {content_type} успешно создан!"
                )
            except Exception as e:
                logger.error(f"Ошибка при генерации через Gemini: {e}", exc_info=True)
                safe_error_message = str(e)
                await gemini_msg.edit_text(f"{EMOJI['error']} {safe_error_message}")
         
                await asyncio.sleep(2)
                for msg in messages_to_delete:
                    try:
                        await msg.delete()
                    except:
                        pass
                return

            if content_type == "Мем":
                create_msg = await message.answer(
                    f"{EMOJI['create']} Создаю финальное изображение..."
                )
                messages_to_delete.append(create_msg)
         
                try:
                    logger.info("Создаю финальное изображение")
                    output_path = await create_final_image(photo_path, generated_text, content_type, photo_id)
                    logger.info(f"Изображение создано: {output_path}")
             
                    await create_msg.edit_text(
                        f"{EMOJI['success']} Изображение готово!"
                    )
                except Exception as e:
                    logger.error(f"Ошибка при создании изображения: {e}", exc_info=True)
                    await create_msg.edit_text(
                        f"{EMOJI['error']} Не удалось создать изображение."
                    )
             
                    await asyncio.sleep(2)
                    for msg in messages_to_delete:
                        try:
                            await msg.delete()
                        except:
                            pass
                    return
            else:
                output_path = photo_path

            logger.info("Отправляю результат пользователю")
            await send_result(message, output_path, generated_text, emoji_type)

            await asyncio.sleep(1)
            for msg in messages_to_delete:
                try:
                    await msg.delete()
                    logger.debug(f"Удалено промежуточное сообщение")
                except Exception as e:
                    logger.warning(f"Не удалось удалить сообщение: {e}")

            if user_key in user_data:
                del user_data[user_key]
                logger.info("Данные пользователя очищены")

            clean_msg = f"{EMOJI['success']} Готово!"
            if not is_group_chat(message):
                clean_msg += " Для создания нового контента используйте /start"
     
            final_msg = await message.answer(clean_msg)
     
            await asyncio.sleep(5)
            try:
                await final_msg.delete()
                logger.debug("Финальное сообщение удалено")
            except:
                pass

            logger.info(f"Обработка завершена успешно для пользователя {message.from_user.id}")

        except Exception as e:
            logger.error(f"Ошибка в процессе обработки: {e}", exc_info=True)
     
            for msg in messages_to_delete:
                try:
                    await msg.delete()
                except:
                    pass
     
            error_msg = await message.answer(
                f"{EMOJI['error']} Произошла ошибка при обработке изображения.",
                parse_mode="HTML"
            )
     
            await asyncio.sleep(5)
            try:
                await error_msg.delete()
            except:
                pass

    except Exception as critical_error:
        logger.error(f"Критическая ошибка при обработке фото: {critical_error}", exc_info=True)
        try:
            error_msg = await message.answer(
                f"{EMOJI['error']} Произошла критическая ошибка. Попробуйте позже."
            )
            await asyncio.sleep(5)
            try:
                await error_msg.delete()
            except:
                pass
        except:
            pass
 
    finally:
        logger.info("Начинаю очистку временных файлов")
        gc.collect()
 
        if photo_path:
            deleted = await safe_delete_file(photo_path)
            if deleted:
                logger.info(f"Временный файл {photo_path} удален")
            else:
                logger.warning(f"Не удалось удалить {photo_path}")
 
        if output_path and output_path != photo_path:
            deleted = await safe_delete_file(output_path)
            if deleted:
                logger.info(f"Выходной файл {output_path} удален")
            else:
                logger.warning(f"Не удалось удалить {output_path}")
 
        logger.info(f"Завершена обработка для пользователя {message.from_user.id}")

async def create_final_image(image_path, text, content_type, photo_id):
    output_path = OUTPUT_DIR / f"{photo_id}_output.jpg"
 
    try:
        with Image.open(image_path) as original_image:
            working_image = original_image.copy()
 
        if content_type == "Мем":
            words = text.split()
            half_point = len(words) // 2
            if len(words) > 1:
                first_line = " ".join(words[:half_point])
                second_line = " ".join(words[half_point:])
                formatted_text = f"{first_line}\n{second_line}" if second_line else first_line
            else:
                formatted_text = text

            draw = ImageDraw.Draw(working_image)

            initial_font_size = int(min(working_image.width, working_image.height) * 0.08)
            max_width = working_image.width * 0.9
            max_height = working_image.height * 0.2

            font_size = initial_font_size
            font = None
            font_path = None
     
            for potential_font_path in possible_fonts:
                try:
                    temp_font = ImageFont.truetype(str(potential_font_path), size=font_size)
                    font = temp_font
                    font_path = str(potential_font_path)
                    logger.info(f"Шрифт '{potential_font_path}' успешно загружен.")
                    break
                except IOError:
                    logger.warning(f"Не удалось загрузить шрифт '{potential_font_path}'.")
                    continue
                except Exception as e:
                    logger.error(f"Ошибка при загрузке шрифта '{potential_font_path}': {e}")
                    continue

            if font_path is None:
                logger.warning("Не удалось загрузить масштабируемый шрифт, используем дефолтный.")
                font = ImageFont.load_default()
            else:
                while font_size > 10:
                    try:
                        current_font = ImageFont.truetype(font_path, size=font_size)
                        bbox = draw.textbbox((0, 0), formatted_text, font=current_font)
                        text_width = bbox[2] - bbox[0]
                        text_height = bbox[3] - bbox[1]
                        num_lines = formatted_text.count('\n') + 1

                        if text_width <= max_width and text_height <= max_height and num_lines <= 3:
                            font = current_font
                            break
                        else:
                            font_size -= 2
                    except Exception as e:
                        logger.warning(f"Ошибка при подборе размера шрифта {font_size}: {e}")
                        font_size -= 2
                        font = ImageFont.load_default()
                        break

            if hasattr(font, 'getbbox'):
                bbox = draw.textbbox((0, 0), formatted_text, font=font)
                text_width = bbox[2] - bbox[0]
                text_height = bbox[3] - bbox[1]
            else:
                text_width = len(max(formatted_text.split('\n'), key=len)) * font_size * 0.6
                text_height = len(formatted_text.split('\n')) * font_size * 1.2

            x = (working_image.width - text_width) // 2
            y = working_image.height - text_height - int(working_image.height * 0.05)
            min_y = working_image.height - max_height
            y = max(y, min_y)

            for adj_x in [-2, -1, 0, 1, 2]:
                for adj_y in [-2, -1, 0, 1, 2]:
                    draw.text(
                        (x + adj_x, y + adj_y),
                        formatted_text,
                        font=font,
                        fill='black',
                        align="center"
                    )

            draw.text(
                (x, y),
                formatted_text,
                font=font,
                fill='white',
                align="center"
            )

        working_image.save(output_path, quality=95)
 
        working_image.close()
        del working_image
 
        return output_path
 
    except Exception as e:
        logger.error(f"Ошибка при создании изображения: {e}")
        if output_path.exists():
            try:
                os.remove(output_path)
            except:
                pass
        raise Exception("Не удалось создать изображение")

async def safe_delete_file(file_path, max_attempts=5):
    if not file_path or not file_path.exists():
        return True
 
    for attempt in range(max_attempts):
        try:
            gc.collect()
            await asyncio.sleep(0.1 * (attempt + 1))
     
            os.remove(file_path)
            logger.info(f"Файл {file_path} успешно удален с попытки {attempt + 1}")
            return True
        except Exception as e:
            if attempt < max_attempts - 1:
                logger.warning(f"Попытка {attempt + 1} удалить {file_path} не удалась: {e}")
            else:
                logger.error(f"Не удалось удалить {file_path} после {max_attempts} попыток: {e}")
                return False
 
    return False

async def send_result(message: Message, image_path: str, text: str, emoji_type: str):
    try:
        image = FSInputFile(image_path)

        if is_group_chat(message):
            if emoji_type == EMOJI['meme']:
                caption = f"{emoji_type} Мем создан для {message.from_user.first_name}!"
                await message.answer_photo(image, caption=caption)
            else:
                caption = f"{emoji_type} Стендап для {message.from_user.first_name}:"
                await message.answer_photo(image, caption=caption)
         
                standup_message = f"🎭 {text}"
                await message.answer(standup_message, parse_mode="HTML")
     
            final_message = f"✅ Готово, {message.from_user.first_name}! Чтобы создать еще один мем или стендап, используйте /start"
        else:
            if emoji_type == EMOJI['meme']:
                caption = f"{emoji_type} Ваш мем готов!"
                await message.answer_photo(image, caption=caption)
            else:
                caption = f"{emoji_type} Ваш стендап готов:"
                await message.answer_photo(image, caption=caption)
         
                standup_message = f"🎭 {text}"
                await message.answer(standup_message, parse_mode="HTML")
     
            final_message = "✅ Готово! Если хотите создать еще один мем или стендап, используйте команду /start"

        await message.answer(final_message)

        try:
            os.remove(image_path)
        except:
            pass

    except Exception as e:
        logger.error(f"Ошибка при отправке результата: {e}")
        await message.answer("❌ Произошла ошибка при отправке результата")

async def add_to_queue(user_id: int, message: Message):
    async with queue_lock:
        for item in processing_queue:
            if item['user_id'] == user_id:
                position = list(processing_queue).index(item) + 1
                return position, True
 
        processing_queue.append({
            'user_id': user_id,
            'message': message,
            'timestamp': time.time()
        })
 
        position = len(processing_queue)
        return position, False

async def remove_from_queue(user_id: int):
    async with queue_lock:
        for item in list(processing_queue):
            if item['user_id'] == user_id:
                processing_queue.remove(item)
                break

async def get_queue_position(user_id: int):
    async with queue_lock:
        for i, item in enumerate(processing_queue):
            if item['user_id'] == user_id:
                return i + 1
        return 0

async def process_queue():
    global is_processing
 
    logger.info("Запущен обработчик очереди")
 
    while True:
        try:
            async with queue_lock:
                if not processing_queue:
                    is_processing = False
                    await asyncio.sleep(0.5)
                    continue
         
                current_item = processing_queue[0]
                is_processing = True
                logger.info(f"Обработка пользователя {current_item['user_id']} из очереди")
     
            try:
                async with queue_lock:
                    for i, item in enumerate(list(processing_queue)[1:], 1):
                        try:
                            await bot.send_message(
                                item['message'].chat.id,
                                f"⏳ Вы в очереди: {i} из {len(processing_queue)-1}. Ожидайте...",
                                reply_to_message_id=item['message'].message_id
                            )
                        except Exception as e:
                            logger.warning(f"Не удалось отправить уведомление о позиции: {e}")
         
                await process_photo_internal(current_item['message'])
         
            except Exception as e:
                logger.error(f"Ошибка при обработке из очереди: {e}", exc_info=True)
                try:
                    await current_item['message'].answer(
                        f"{EMOJI['error']} Произошла ошибка при обработке. Попробуйте снова.",
                        reply_to_message_id=current_item['message'].message_id
                    )
                except:
                    pass
            finally:
                async with queue_lock:
                    if processing_queue and processing_queue[0]['user_id'] == current_item['user_id']:
                        processing_queue.popleft()
                        logger.info(f"Пользователь {current_item['user_id']} удален из очереди")
         
                await asyncio.sleep(0.5)
         
        except Exception as e:
            logger.error(f"Критическая ошибка в обработчике очереди: {e}", exc_info=True)
            await asyncio.sleep(1)

async def cleanup_old_user_data():
    while True:
        try:
            current_time = time.time()
            to_delete = []
     
            for key, data in user_data.items():
                if "timestamp" in data and current_time - data["timestamp"] > 3600:
                    to_delete.append(key)
     
            for key in to_delete:
                del user_data[key]
     
            if to_delete:
                logger.info(f"Очищено {len(to_delete)} устаревших записей пользователей")
     
            await asyncio.sleep(300)
     
        except Exception as e:
            logger.error(f"Ошибка при очистке данных: {e}")
            await asyncio.sleep(60)

async def main():
    global queue_task, bot, bot_user_id
 
    queue_task = None
    cleanup_task = None
 
    try:
        logger.info("Запуск бота...")
 
        bot_info = await bot.get_me()
        bot_user_id = bot_info.id
        logger.info(f"Бот запущен: @{bot_info.username} (ID: {bot_user_id})")
 
        try:
            await initialize_gemini_model()
        except Exception as e:
            logger.error(f"Ошибка инициализации Gemini: {e}")
            logger.warning("Бот будет работать без анализа изображений")
 
        queue_task = asyncio.create_task(process_queue())
        cleanup_task = asyncio.create_task(cleanup_old_user_data())
 
        logger.info("Фоновые задачи запущены")
 
        await dp.start_polling(bot, drop_pending_updates=True)
 
    except Exception as e:
        logger.error(f"Ошибка при запуске бота: {e}", exc_info=True)
        raise
    finally:
        if queue_task and not queue_task.done():
            queue_task.cancel()
            try:
                await queue_task
            except asyncio.CancelledError:
                pass
         
        if cleanup_task and not cleanup_task.done():
            cleanup_task.cancel()
            try:
                await cleanup_task
            except asyncio.CancelledError:
                pass
 
        await bot.session.close()

if __name__ == "__main__":
    try:
        print("Бот запущен.")
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Бот остановлен.")
    except Exception as e:
        print(f"Произошла ошибка при запуске бота: {e}")

Команды бота:
/start@vash_bot
/api_status@vash_bot - для работы этой команды не забудьте вписать свой админский ID в переменную списка ADMIN_IDS = [6955805799]

Внимание! Бот работает как в групповых чатах, так и в ЛС.

Пример работы:
1749888678059.png

1749888714670.png

Бота потестить можете тут (мне не жалко):

P.S. Полный исходник со структурой проекта прикрепил в zip.
P.S.S. Рекомендую использовать OVPN для подключения бота (Не Российский) - Все известные маски айпишников VDS/VPS забанены гуглом.
 

Вложения

  • membot.zip
    412.3 КБ · Просмотры: 10
Последнее редактирование:
Пожалуйста, обратите внимание, что пользователь заблокирован
Кстати если не знаешь, чем заняться можешь сделать фейковый генератор персональных данных. Иногда требуется где то зарегистрироваться и там требуют такие данные как: First name, Last name, Address, City, State, Zip, Birthday, Phone, Email итд Я обычно гуглю какие то данные но не плохо иметь подобный инструмент под рукой.
 
Кстати если не знаешь, чем заняться можешь сделать фейковый генератор персональных данных. Иногда требуется где то зарегистрироваться и там требуют такие данные как: First name, Last name, Address, City, State, Zip, Birthday, Phone, Email итд Я обычно гуглю какие то данные но не плохо иметь подобный инструмент под рукой.
Так делал же. На форуме лежит.

/threads/124971/
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Так делал же. На форуме лежит.

/threads/124971/
Имел введу с применением ИИ тоже через API . Там реализовать какие-то промпты чтобы генерировало правдоподобные данные. Поинтереснее бы выглядело чем мемы =) Но мемы тоже хорошо...
 
Имел введу с применением ИИ тоже через API . Там реализовать какие-то промпты чтобы генерировало правдоподобные данные. Поинтереснее бы выглядело чем мемы =) Но мемы тоже хорошо...
Ок, сделаем=) Ну попозже.
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Имел введу с применением ИИ тоже через API . Там реализовать какие-то промпты чтобы генерировало правдоподобные данные. Поинтереснее бы выглядело чем мемы =) Но мемы тоже хорошо...
Бот ИИ отрисовщик
 
Предлагаю потестить бота кто может и если мемы прикольные, повыкладывать тут: /threads/40972/

P.S. Была и первая версия мемгенератора от меня если кто не помнит, но она слишком медленная и не точная в работе: /threads/117407/
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Предлагаю потестить бота кто может и если мемы прикольные, повыкладывать тут: /threads/40972/

P.S. Была и первая версия мемгенератора от меня если кто не помнит, но она слишком медленная и не точная в работе: /threads/117407/
тележки нету
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх