Написал: rand
Специально для: xss.pro
Всем привет, написал крутого бота генератора мемов и стендапов через обработку нейросети Gemini Flash (Api ключи регайте у Gemini фришные, реализована ротация ключей, 1 ключ 60 запросов (чем больше ключей, тем больше мемов за сутки и можете сгенерить).
Для запуска требуется поставить следующие либы:
Код:
Команды бота:
Внимание! Бот работает как в групповых чатах, так и в ЛС.
Пример работы:
Бота потестить можете тут (мне не жалко):
P.S. Полный исходник со структурой проекта прикрепил в zip.
P.S.S. Рекомендую использовать OVPN для подключения бота (Не Российский) - Все известные маски айпишников VDS/VPS забанены гуглом.
Специально для: xss.pro
Всем привет, написал крутого бота генератора мемов и стендапов через обработку нейросети Gemini Flash (Api ключи регайте у Gemini фришные, реализована ротация ключей, 1 ключ 60 запросов (чем больше ключей, тем больше мемов за сутки и можете сгенерить).
Для запуска требуется поставить следующие либы:
Bash:
pip install google-generativeai
pip install aiogram
pip install pillow
Код:
Python:
"""
Телеграм-бот для генерации мемов и стендапов на основе загруженных изображений.
Использует модель Gemini 2.5 Flash для анализа изображений и создания текста.
"""
import time
import os
import logging
from pathlib import Path
import shutil
import google.generativeai as genai
import gc
import itertools
from collections import deque
# Библиотеки для работы с Telegram
from aiogram import Bot, Dispatcher, F
from aiogram.types import Message, FSInputFile, ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.utils.markdown import hbold, hcode
from aiogram.enums import ChatType
# Библиотеки для асинхронной работы
import asyncio
# Библиотеки для обработки изображений
from PIL import Image, ImageDraw, ImageFont
# API ключи Gemini
GEMINI_API_KEYS = [
"AIzaSyA2FItlFG0FDlSOpz8VMv6Agg-555555",
"AIzaSyBzJ8Ix-hn0IurdK3ov3fmnbKsb8555555",
]
# Глобальные переменные для модели Gemini
gemini_model = None
current_key_index = 0
api_key_iterator = None
failed_keys = set()
# Глобальные переменные для очереди
processing_queue = deque()
is_processing = False
queue_lock = asyncio.Lock()
queue_task = None
bot_user_id = None
# Список путей к возможным шрифтам
possible_fonts = [
Path("fonts/ImpactRegular.ttf"),
Path("C:/Windows/Fonts/Arial.ttf"),
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
Path("/usr/share/fonts/truetype/msttcorefonts/Arial.ttf"),
Path("/Library/Fonts/Arial.ttf"),
]
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Конфигурация путей
BASE_DIR = Path(__file__).parent
TEMP_DIR = BASE_DIR / "tmp"
OUTPUT_DIR = BASE_DIR / "output"
FONTS_DIR = BASE_DIR / "fonts"
# Создаем директории
TEMP_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
FONTS_DIR.mkdir(exist_ok=True)
# Константы
BOT_TOKEN = "7569029224:AAFiqMbSQxBtf9HdDjNuR1o0W4555555555"
MIN_IMAGE_SIZE = 512
# Эмодзи для интерфейса
EMOJI = {
"start": "🚀",
"meme": "😂",
"standup": "🎭",
"photo": "📷",
"processing": "⚙️",
"success": "✅",
"error": "❌",
"warning": "⚠️",
"create": "🎨",
}
# Определение состояний FSM
class MemeStates(StatesGroup):
awaiting_choice = State()
# Инициализация бота и диспетчера
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
# Словарь для хранения данных пользователей
user_data = {}
# Класс для обновления прогресса
class ProgressMessageUpdater:
def __init__(self, message: Message, initial_text: str, emoji: str):
self.message = message
self.initial_text = initial_text
self.emoji = emoji
self.running = False
self.task = None
self.seconds = 0
self.sent_message = None
async def start(self):
self.running = True
self.seconds = 0
self.sent_message = await self.message.answer(f"{self.emoji} {self.initial_text} (0с)")
self.task = asyncio.create_task(self._update_progress())
return self.sent_message
async def stop(self):
if self.running:
self.running = False
if self.task:
self.task.cancel()
return self.seconds
return 0
async def _update_progress(self):
try:
while self.running:
self.seconds += 1
dots = "." * (1 + (self.seconds % 3))
time_str = f"{self.seconds}с"
encouragement = ""
if self.seconds == 15:
encouragement = f"\n⏳ Пожалуйста, подождите..."
elif self.seconds == 30:
encouragement = f"\n⏳ Почти готово..."
elif self.seconds >= 45 and self.seconds % 15 == 0:
encouragement = f"\n⏳ Процесс занимает больше времени, чем обычно, но мы работаем!"
new_text = f"{self.emoji} {self.initial_text}{dots} ({time_str}){encouragement}"
try:
await self.sent_message.edit_text(new_text)
except Exception:
pass
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Ошибка при обновлении прогресса: {e}")
def is_group_chat(message: Message) -> bool:
return message.chat.type in [ChatType.GROUP, ChatType.SUPERGROUP]
def get_user_key(message: Message) -> str:
if is_group_chat(message):
return f"group_{message.chat.id}_user_{message.from_user.id}"
return str(message.from_user.id)
async def initialize_gemini_model():
global gemini_model, api_key_iterator, current_key_index
if not GEMINI_API_KEYS:
raise Exception("Не указаны API ключи Gemini!")
api_key_iterator = itertools.cycle(GEMINI_API_KEYS)
for attempt in range(len(GEMINI_API_KEYS)):
current_key = next(api_key_iterator)
current_key_index = GEMINI_API_KEYS.index(current_key)
if current_key in failed_keys:
logger.warning(f"Пропускаем заблокированный ключ #{current_key_index + 1}")
continue
try:
logger.info(f"Инициализация Gemini 2.5 Flash с ключом #{current_key_index + 1}")
genai.configure(api_key=current_key)
gemini_model = genai.GenerativeModel('gemini-2.5-flash-preview-05-20')
test_response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: gemini_model.generate_content("Test")
)
if test_response:
logger.info(f"Gemini 2.5 Flash успешно инициализирован с ключом #{current_key_index + 1}")
return
except Exception as e:
logger.error(f"Ошибка при инициализации с ключом #{current_key_index + 1}: {e}")
if "API_KEY_INVALID" in str(e) or "CONSUMER_SUSPENDED" in str(e):
failed_keys.add(current_key)
logger.warning(f"Ключ #{current_key_index + 1} помечен как недоступный")
continue
raise Exception("Не удалось инициализировать Gemini ни с одним из ключей!")
async def rotate_api_key():
global gemini_model, current_key_index
if not api_key_iterator:
logger.error("Итератор ключей не инициализирован!")
return False
next_key = next(api_key_iterator)
new_index = GEMINI_API_KEYS.index(next_key)
attempts = 0
while next_key in failed_keys and attempts < len(GEMINI_API_KEYS):
logger.warning(f"Пропускаем заблокированный ключ #{new_index + 1}")
next_key = next(api_key_iterator)
new_index = GEMINI_API_KEYS.index(next_key)
attempts += 1
if attempts >= len(GEMINI_API_KEYS):
logger.error("Все ключи заблокированы!")
return False
try:
logger.info(f"Переключение с ключа #{current_key_index + 1} на ключ #{new_index + 1}")
genai.configure(api_key=next_key)
gemini_model = genai.GenerativeModel('gemini-2.5-flash-preview-05-20')
current_key_index = new_index
logger.info(f"Успешно переключились на ключ #{current_key_index + 1}")
return True
except Exception as e:
logger.error(f"Ошибка при переключении на ключ #{new_index + 1}: {e}")
failed_keys.add(next_key)
return False
async def analyze_and_generate_with_gemini(image_path, content_type):
temp_path = image_path.parent / f"temp_{image_path.name}"
max_retries = len(GEMINI_API_KEYS) - len(failed_keys)
for retry in range(max_retries):
try:
shutil.copy2(image_path, temp_path)
logger.info(f"Создана копия изображения для анализа (попытка {retry + 1})")
if gemini_model is None:
raise Exception("Модель не инициализирована")
with Image.open(temp_path) as image:
logger.info(f"Изображение загружено: {image.size}")
if image.mode != 'RGB':
image = image.convert('RGB')
try:
logger.info(f"Отправляю запрос к Gemini API для {content_type} (ключ #{current_key_index + 1})...")
if content_type == "Мем":
prompt = """Проанализируй это изображение и создай для него смешную подпись в стиле мема.
Требования для мема:
- Максимум 15-20 слов
- Используй классический мем-формат ("Когда...", "Мое лицо когда...", "Я пытаюсь... Но...")
- Текст должен быть ироничным, саркастичным или релейтабельным
- Обязательно на русском языке
- Только текст мема, никаких пояснений
Примеры хороших мемов:
- "Когда сказал что пойдешь спать в 10 вечера"
- "Я: буду продуктивным сегодня. Также я:"
- "Мое лицо когда будильник звонит в понедельник"
Создай подобный мем на основе изображения:"""
else: # Стендап
prompt = """Ты стендап-комик на сцене. Проанализируй это изображение и создай смешную шутку в стиле стендап-комедии.
Требования для стендапа:
- Максимум 3000 символов
- Используй наблюдательный юмор, как настоящие комики
- Можешь добавить эмодзи для выразительности (но умеренно)
- Делай неожиданные сравнения и находи смешные детали
- Можешь использовать формат: "А вы замечали что...", "Смотрю на это и думаю...", "Это же когда..."
- Обязательно на русском языке
- Только текст шутки, без лишних пояснений
Создай смешную стендап-шутку на основе изображения:"""
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: gemini_model.generate_content([prompt, image])
)
if response and response.text:
generated_text = response.text.strip()
generated_text = generated_text.strip('"\'')
if content_type == "Мем" and len(generated_text) > 100:
generated_text = generated_text[:97] + "..."
elif content_type == "Стендап" and len(generated_text) > 4000:
generated_text = generated_text[:3997] + "..."
logger.info(f"Получен {content_type} от Gemini (ключ #{current_key_index + 1}): {generated_text[:100]}...")
await rotate_api_key()
return generated_text
else:
logger.error("Gemini вернул пустой ответ")
raise Exception("Сервис временно недоступен")
except Exception as api_error:
logger.error(f"Ошибка при вызове Gemini API (ключ #{current_key_index + 1}): {api_error}")
error_str = str(api_error)
if any(err in error_str for err in ["API_KEY_INVALID", "CONSUMER_SUSPENDED", "PERMISSION_DENIED"]):
logger.critical(f"Ключ #{current_key_index + 1} заблокирован или недействителен!")
failed_keys.add(GEMINI_API_KEYS[current_key_index])
if await rotate_api_key():
continue
else:
raise Exception("Все API ключи недоступны. Обратитесь к администратору.")
elif "RATE_LIMIT_EXCEEDED" in error_str or "quota" in error_str.lower():
logger.warning(f"Превышен лимит для ключа #{current_key_index + 1}")
if await rotate_api_key():
continue
else:
raise Exception("Превышен лимит запросов на всех ключах.")
elif "RESOURCE_EXHAUSTED" in error_str:
if await rotate_api_key():
continue
else:
raise Exception("Сервис перегружен. Попробуйте позже.")
elif "timeout" in error_str.lower():
raise Exception("Время ожидания истекло. Попробуйте еще раз.")
else:
raise Exception("Не удалось сгенерировать контент. Попробуйте позже.")
except Exception as e:
if "Сервис" in str(e) or "Превышен" in str(e) or "Время" in str(e) or "Все API" in str(e):
raise e
else:
logger.error(f"Неожиданная ошибка при генерации {content_type}: {e}", exc_info=True)
if retry < max_retries - 1 and await rotate_api_key():
continue
raise Exception(f"Произошла ошибка при создании {content_type}")
finally:
if temp_path.exists():
try:
os.remove(temp_path)
logger.info("Временная копия для Gemini удалена")
except:
pass
logger.warning(f"Не удалось сгенерировать {content_type} после всех попыток, используем fallback")
if content_type == "Мем":
fallbacks = [
"Когда понимаешь что завтра понедельник",
"Я: буду продуктивным сегодня\nТакже я:",
"Мое лицо когда вижу это"
]
else:
fallbacks = [
"Смотрю на эту картинку и понимаю - это же я пытаюсь быть взрослым! 😅\n\nСерьезно, кто-то когда-то сказал, что взрослая жизнь - это просто детство, но с налогами. И вот я смотрю на это изображение и думаю: 'Да, это точно про меня!' 🤔",
"Эта картинка - это буквально моё лицо, когда я вижу счета за коммунальные услуги! 💸\n\nВсегда удивляюсь, как электричество может стоить дороже чем мой обед. Что там происходит? Мой холодильник тайно майнит биткоины? 🤖⚡",
"Понедельник? Не, не слышал такого слова! 🙈\n\nЭто как когда тебе говорят 'давай встретимся завтра в 8 утра', а ты такой: 'В каком часовом поясе? На какой планете?' 🌍⏰"
]
import random
return random.choice(fallbacks)
@dp.message(Command("api_status"))
async def cmd_api_status(message: Message):
ADMIN_IDS = [5555805555]
if message.from_user.id not in ADMIN_IDS:
await message.answer("У вас нет доступа к этой команде.")
return
total_keys = len(GEMINI_API_KEYS)
active_keys = total_keys - len(failed_keys)
status_text = (
f"📊 **Статус API ключей Gemini:**\n\n"
f"Всего ключей: {total_keys}\n"
f"Активных: {active_keys}\n"
f"Заблокированных: {len(failed_keys)}\n"
f"Текущий ключ: #{current_key_index + 1}\n\n"
)
for i, key in enumerate(GEMINI_API_KEYS):
if key in failed_keys:
status_text += f"Ключ #{i + 1}: ❌ Заблокирован\n"
elif i == current_key_index:
status_text += f"Ключ #{i + 1}: ✅ Активен (текущий)\n"
else:
status_text += f"Ключ #{i + 1}: ✅ Активен\n"
await message.answer(status_text, parse_mode="Markdown")
@dp.message(Command("reset_keys"))
async def cmd_reset_keys(message: Message):
ADMIN_IDS = [6955805799]
if message.from_user.id not in ADMIN_IDS:
await message.answer("У вас нет доступа к этой команде.")
return
global failed_keys
failed_keys.clear()
await message.answer(
f"✅ Список заблокированных ключей очищен.\n"
f"Все {len(GEMINI_API_KEYS)} ключей снова доступны для использования."
)
@dp.message(Command("start"))
async def cmd_start(message: Message, state: FSMContext):
if is_group_chat(message):
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text=f"{EMOJI['meme']} Мем", callback_data="choose_meme")],
[InlineKeyboardButton(text=f"{EMOJI['standup']} Стендап", callback_data="choose_standup")]
]
)
welcome_text = (
f"{EMOJI['start']} {hbold('Привет! Я генератор мемов и стендапов!')} \n\n"
f"Работаю в групповых чатах! Нажмите на кнопку ниже, чтобы выбрать тип контента, "
f"а затем отправьте изображение в ответ на это сообщение.\n\n"
f"{EMOJI['meme']} {hbold('Мем')} - классический формат с текстом внизу картинки\n"
f"{EMOJI['standup']} {hbold('Стендап')} - забавный комментарий к вашему изображению\n\n"
f"Что хотите создать, {message.from_user.first_name}?"
)
else:
keyboard = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text=f"{EMOJI['meme']} Мем")],
[KeyboardButton(text=f"{EMOJI['standup']} Стендап")]
],
resize_keyboard=True
)
welcome_text = (
f"{EMOJI['start']} {hbold('Добро пожаловать в генератор мемов и стендапов!')} \n\n"
f"Я использую нейросети для создания забавного контента на основе ваших изображений.\n\n"
f"{EMOJI['meme']} {hbold('Мем')} - классический формат с текстом внизу картинки\n"
f"{EMOJI['standup']} {hbold('Стендап')} - забавный комментарий к вашему изображению\n\n"
f"Что вы хотите создать?"
)
await message.answer(
welcome_text,
reply_markup=keyboard,
parse_mode="HTML"
)
await state.set_state(MemeStates.awaiting_choice)
@dp.callback_query(F.data.in_(["choose_meme", "choose_standup"]))
async def process_inline_choice(callback_query, state: FSMContext):
content_type = "Мем" if callback_query.data == "choose_meme" else "Стендап"
emoji_type = EMOJI['meme'] if content_type == "Мем" else EMOJI['standup']
if is_group_chat(callback_query.message):
user_key = f"group_{callback_query.message.chat.id}_user_{callback_query.from_user.id}"
else:
user_key = str(callback_query.from_user.id)
user_data[user_key] = {
"content_type": content_type,
"message_id": callback_query.message.message_id,
"chat_id": callback_query.message.chat.id,
"timestamp": time.time(),
"bot_id": bot_user_id
}
await callback_query.answer(f"Выбран тип: {content_type}")
await callback_query.message.edit_text(
f"{emoji_type} {hbold(f'{callback_query.from_user.first_name} выбрал: {content_type}')}!\n\n"
f"{EMOJI['photo']} Теперь отправьте изображение в ответ на это сообщение "
f"(размером от {hcode(f'{MIN_IMAGE_SIZE}x{MIN_IMAGE_SIZE}')} пикселей).",
parse_mode="HTML"
)
await state.clear()
@dp.message(StateFilter(MemeStates.awaiting_choice))
async def process_content_choice(message: Message, state: FSMContext):
if is_group_chat(message):
return
choice = message.text
if f"{EMOJI['meme']} Мем" in choice:
content_type = "Мем"
emoji_type = EMOJI['meme']
elif f"{EMOJI['standup']} Стендап" in choice:
content_type = "Стендап"
emoji_type = EMOJI['standup']
else:
await message.answer(f"{EMOJI['error']} Пожалуйста, выберите один из предложенных вариантов.")
return
user_key = get_user_key(message)
user_data[user_key] = {"content_type": content_type}
await message.answer(
f"{emoji_type} {hbold(f'Вы выбрали: {content_type}')}!\n\n"
f"{EMOJI['photo']} Теперь загрузите изображение размером от "
f"{hcode(f'{MIN_IMAGE_SIZE}x{MIN_IMAGE_SIZE}')} пикселей.",
parse_mode="HTML"
)
await state.clear()
@dp.message(F.photo)
async def process_photo(message: Message):
try:
user_id = message.from_user.id
user_key = get_user_key(message)
if is_group_chat(message):
if user_key not in user_data:
logger.debug(f"Игнорируем фото от {user_id} в группе - нет активной сессии")
return
if not message.reply_to_message:
logger.debug(f"Игнорируем фото от {user_id} в группе - не является ответом")
return
if message.reply_to_message.from_user.id != bot.id:
logger.debug(f"Игнорируем фото от {user_id} в группе - ответ не на сообщение бота")
return
expected_message_id = user_data[user_key].get("message_id")
expected_chat_id = user_data[user_key].get("chat_id")
if not expected_message_id or not expected_chat_id:
logger.warning(f"Нет expected_message_id или expected_chat_id для {user_key}")
return
if (message.reply_to_message.message_id != expected_message_id or
message.chat.id != expected_chat_id):
await message.answer(
f"{EMOJI['warning']} {message.from_user.first_name}, пожалуйста, отправьте изображение "
f"в ответ на ВАШЕ сообщение с выбором типа контента!",
parse_mode="HTML"
)
return
else:
if user_key not in user_data:
error_text = f"{EMOJI['error']} Сначала выберите тип контента, используя команду /start"
await message.answer(error_text, parse_mode="HTML")
return
logger.info(f"Обрабатываем фото от пользователя {user_id}")
position, already_in_queue = await add_to_queue(user_id, message)
if already_in_queue:
await message.answer(
f"⏳ Вы уже в очереди на позиции {position}. Пожалуйста, подождите.",
reply_to_message_id=message.message_id
)
logger.info(f"Пользователь {user_id} уже в очереди на позиции {position}")
else:
if position == 1:
queue_msg = await message.answer(
f"🚀 Вы первый в очереди! Начинаю обработку...",
reply_to_message_id=message.message_id
)
else:
queue_msg = await message.answer(
f"⏳ Вы добавлены в очередь. Ваша позиция: {position}. Ожидайте...",
reply_to_message_id=message.message_id
)
logger.info(f"Пользователь {user_id} добавлен в очередь на позицию {position}")
await asyncio.sleep(5)
try:
await queue_msg.delete()
except:
pass
except Exception as e:
logger.error(f"Ошибка при добавлении в очередь: {e}", exc_info=True)
await message.answer(
f"{EMOJI['error']} Произошла ошибка. Попробуйте позже.",
parse_mode="HTML"
)
@dp.message(Command("queue"))
async def cmd_queue(message: Message):
async with queue_lock:
if not processing_queue:
await message.answer("Очередь пуста.")
return
queue_text = "📋 Текущая очередь:\n\n"
for i, item in enumerate(processing_queue, 1):
user = item['message'].from_user
queue_text += f"{i}. {user.first_name} (ID: {user.id})\n"
queue_text += f"\n🔄 Обработка: {'Да' if is_processing else 'Нет'}"
await message.answer(queue_text)
async def process_photo_internal(message: Message):
photo_path = None
output_path = None
messages_to_delete = []
try:
logger.info(f"Начало внутренней обработки фото от пользователя {message.from_user.id}")
user_key = get_user_key(message)
if user_key not in user_data:
logger.error(f"Данные пользователя {user_key} не найдены!")
await message.answer(
f"{EMOJI['error']} Данные сессии устарели. Пожалуйста, начните заново с /start",
reply_to_message_id=message.message_id
)
return
content_type = user_data[user_key]["content_type"]
emoji_type = EMOJI['meme'] if content_type == "Мем" else EMOJI['standup']
logger.info(f"Тип контента: {content_type}")
if is_group_chat(message):
if not message.reply_to_message:
logger.warning("Нет reply_to_message в групповом чате")
await message.answer(
f"{EMOJI['warning']} {message.from_user.first_name}, пожалуйста, отправьте изображение "
f"в ответ на сообщение с выбором типа контента!",
parse_mode="HTML"
)
return
if message.reply_to_message.from_user.id != bot_user_id:
logger.debug(f"Игнорируем фото от {message.from_user.id} в группе - ответ не на сообщение бота")
return
expected_message_id = user_data[user_key].get("message_id")
expected_chat_id = user_data[user_key].get("chat_id")
if (message.reply_to_message.message_id != expected_message_id or
message.chat.id != expected_chat_id):
await message.answer(
f"{EMOJI['warning']} {message.from_user.first_name}, пожалуйста, отправьте изображение "
f"в ответ на ВАШЕ сообщение с выбором типа контента!",
parse_mode="HTML"
)
return
username = message.from_user.first_name if is_group_chat(message) else "ваше"
initial_text = f"Обрабатываю изображение {username} для создания {content_type.lower()}а"
initial_progress = ProgressMessageUpdater(
message,
initial_text,
EMOJI['processing']
)
await initial_progress.start()
messages_to_delete.append(initial_progress.sent_message)
try:
photo_id = message.photo[-1].file_id
photo_path = TEMP_DIR / f"{photo_id}.jpg"
logger.info(f"Загружаю изображение в {photo_path}")
await bot.download(message.photo[-1], destination=photo_path)
logger.info(f"Изображение успешно загружено")
processing_time = await initial_progress.stop()
await initial_progress.sent_message.edit_text(
f"{EMOJI['success']} Изображение успешно загружено за {processing_time}с!"
)
with Image.open(photo_path) as check_image:
width, height = check_image.size
logger.info(f"Размер изображения: {width}x{height}")
if width < MIN_IMAGE_SIZE or height < MIN_IMAGE_SIZE:
logger.warning(f"Изображение слишком маленькое: {width}x{height}")
await message.answer(
f"{EMOJI['warning']} {hbold('Ошибка')}: изображение должно быть не менее "
f"{hcode(f'{MIN_IMAGE_SIZE}x{MIN_IMAGE_SIZE}')} пикселей.",
parse_mode="HTML"
)
for msg in messages_to_delete:
try:
await msg.delete()
except:
pass
return
gemini_msg = await message.answer(
f"🤖 Анализирую изображение и создаю {content_type.lower()} с помощью Gemini 2.5 Flash..."
)
messages_to_delete.append(gemini_msg)
try:
logger.info(f"Начинаю анализ и генерацию {content_type} через Gemini")
generated_text = await analyze_and_generate_with_gemini(photo_path, content_type)
logger.info(f"Генерация завершена успешно. Текст: {generated_text[:100]}...")
await gemini_msg.edit_text(
f"{EMOJI['success']} {content_type} успешно создан!"
)
except Exception as e:
logger.error(f"Ошибка при генерации через Gemini: {e}", exc_info=True)
safe_error_message = str(e)
await gemini_msg.edit_text(f"{EMOJI['error']} {safe_error_message}")
await asyncio.sleep(2)
for msg in messages_to_delete:
try:
await msg.delete()
except:
pass
return
if content_type == "Мем":
create_msg = await message.answer(
f"{EMOJI['create']} Создаю финальное изображение..."
)
messages_to_delete.append(create_msg)
try:
logger.info("Создаю финальное изображение")
output_path = await create_final_image(photo_path, generated_text, content_type, photo_id)
logger.info(f"Изображение создано: {output_path}")
await create_msg.edit_text(
f"{EMOJI['success']} Изображение готово!"
)
except Exception as e:
logger.error(f"Ошибка при создании изображения: {e}", exc_info=True)
await create_msg.edit_text(
f"{EMOJI['error']} Не удалось создать изображение."
)
await asyncio.sleep(2)
for msg in messages_to_delete:
try:
await msg.delete()
except:
pass
return
else:
output_path = photo_path
logger.info("Отправляю результат пользователю")
await send_result(message, output_path, generated_text, emoji_type)
await asyncio.sleep(1)
for msg in messages_to_delete:
try:
await msg.delete()
logger.debug(f"Удалено промежуточное сообщение")
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
if user_key in user_data:
del user_data[user_key]
logger.info("Данные пользователя очищены")
clean_msg = f"{EMOJI['success']} Готово!"
if not is_group_chat(message):
clean_msg += " Для создания нового контента используйте /start"
final_msg = await message.answer(clean_msg)
await asyncio.sleep(5)
try:
await final_msg.delete()
logger.debug("Финальное сообщение удалено")
except:
pass
logger.info(f"Обработка завершена успешно для пользователя {message.from_user.id}")
except Exception as e:
logger.error(f"Ошибка в процессе обработки: {e}", exc_info=True)
for msg in messages_to_delete:
try:
await msg.delete()
except:
pass
error_msg = await message.answer(
f"{EMOJI['error']} Произошла ошибка при обработке изображения.",
parse_mode="HTML"
)
await asyncio.sleep(5)
try:
await error_msg.delete()
except:
pass
except Exception as critical_error:
logger.error(f"Критическая ошибка при обработке фото: {critical_error}", exc_info=True)
try:
error_msg = await message.answer(
f"{EMOJI['error']} Произошла критическая ошибка. Попробуйте позже."
)
await asyncio.sleep(5)
try:
await error_msg.delete()
except:
pass
except:
pass
finally:
logger.info("Начинаю очистку временных файлов")
gc.collect()
if photo_path:
deleted = await safe_delete_file(photo_path)
if deleted:
logger.info(f"Временный файл {photo_path} удален")
else:
logger.warning(f"Не удалось удалить {photo_path}")
if output_path and output_path != photo_path:
deleted = await safe_delete_file(output_path)
if deleted:
logger.info(f"Выходной файл {output_path} удален")
else:
logger.warning(f"Не удалось удалить {output_path}")
logger.info(f"Завершена обработка для пользователя {message.from_user.id}")
async def create_final_image(image_path, text, content_type, photo_id):
output_path = OUTPUT_DIR / f"{photo_id}_output.jpg"
try:
with Image.open(image_path) as original_image:
working_image = original_image.copy()
if content_type == "Мем":
words = text.split()
half_point = len(words) // 2
if len(words) > 1:
first_line = " ".join(words[:half_point])
second_line = " ".join(words[half_point:])
formatted_text = f"{first_line}\n{second_line}" if second_line else first_line
else:
formatted_text = text
draw = ImageDraw.Draw(working_image)
initial_font_size = int(min(working_image.width, working_image.height) * 0.08)
max_width = working_image.width * 0.9
max_height = working_image.height * 0.2
font_size = initial_font_size
font = None
font_path = None
for potential_font_path in possible_fonts:
try:
temp_font = ImageFont.truetype(str(potential_font_path), size=font_size)
font = temp_font
font_path = str(potential_font_path)
logger.info(f"Шрифт '{potential_font_path}' успешно загружен.")
break
except IOError:
logger.warning(f"Не удалось загрузить шрифт '{potential_font_path}'.")
continue
except Exception as e:
logger.error(f"Ошибка при загрузке шрифта '{potential_font_path}': {e}")
continue
if font_path is None:
logger.warning("Не удалось загрузить масштабируемый шрифт, используем дефолтный.")
font = ImageFont.load_default()
else:
while font_size > 10:
try:
current_font = ImageFont.truetype(font_path, size=font_size)
bbox = draw.textbbox((0, 0), formatted_text, font=current_font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
num_lines = formatted_text.count('\n') + 1
if text_width <= max_width and text_height <= max_height and num_lines <= 3:
font = current_font
break
else:
font_size -= 2
except Exception as e:
logger.warning(f"Ошибка при подборе размера шрифта {font_size}: {e}")
font_size -= 2
font = ImageFont.load_default()
break
if hasattr(font, 'getbbox'):
bbox = draw.textbbox((0, 0), formatted_text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
else:
text_width = len(max(formatted_text.split('\n'), key=len)) * font_size * 0.6
text_height = len(formatted_text.split('\n')) * font_size * 1.2
x = (working_image.width - text_width) // 2
y = working_image.height - text_height - int(working_image.height * 0.05)
min_y = working_image.height - max_height
y = max(y, min_y)
for adj_x in [-2, -1, 0, 1, 2]:
for adj_y in [-2, -1, 0, 1, 2]:
draw.text(
(x + adj_x, y + adj_y),
formatted_text,
font=font,
fill='black',
align="center"
)
draw.text(
(x, y),
formatted_text,
font=font,
fill='white',
align="center"
)
working_image.save(output_path, quality=95)
working_image.close()
del working_image
return output_path
except Exception as e:
logger.error(f"Ошибка при создании изображения: {e}")
if output_path.exists():
try:
os.remove(output_path)
except:
pass
raise Exception("Не удалось создать изображение")
async def safe_delete_file(file_path, max_attempts=5):
if not file_path or not file_path.exists():
return True
for attempt in range(max_attempts):
try:
gc.collect()
await asyncio.sleep(0.1 * (attempt + 1))
os.remove(file_path)
logger.info(f"Файл {file_path} успешно удален с попытки {attempt + 1}")
return True
except Exception as e:
if attempt < max_attempts - 1:
logger.warning(f"Попытка {attempt + 1} удалить {file_path} не удалась: {e}")
else:
logger.error(f"Не удалось удалить {file_path} после {max_attempts} попыток: {e}")
return False
return False
async def send_result(message: Message, image_path: str, text: str, emoji_type: str):
try:
image = FSInputFile(image_path)
if is_group_chat(message):
if emoji_type == EMOJI['meme']:
caption = f"{emoji_type} Мем создан для {message.from_user.first_name}!"
await message.answer_photo(image, caption=caption)
else:
caption = f"{emoji_type} Стендап для {message.from_user.first_name}:"
await message.answer_photo(image, caption=caption)
standup_message = f"🎭 {text}"
await message.answer(standup_message, parse_mode="HTML")
final_message = f"✅ Готово, {message.from_user.first_name}! Чтобы создать еще один мем или стендап, используйте /start"
else:
if emoji_type == EMOJI['meme']:
caption = f"{emoji_type} Ваш мем готов!"
await message.answer_photo(image, caption=caption)
else:
caption = f"{emoji_type} Ваш стендап готов:"
await message.answer_photo(image, caption=caption)
standup_message = f"🎭 {text}"
await message.answer(standup_message, parse_mode="HTML")
final_message = "✅ Готово! Если хотите создать еще один мем или стендап, используйте команду /start"
await message.answer(final_message)
try:
os.remove(image_path)
except:
pass
except Exception as e:
logger.error(f"Ошибка при отправке результата: {e}")
await message.answer("❌ Произошла ошибка при отправке результата")
async def add_to_queue(user_id: int, message: Message):
async with queue_lock:
for item in processing_queue:
if item['user_id'] == user_id:
position = list(processing_queue).index(item) + 1
return position, True
processing_queue.append({
'user_id': user_id,
'message': message,
'timestamp': time.time()
})
position = len(processing_queue)
return position, False
async def remove_from_queue(user_id: int):
async with queue_lock:
for item in list(processing_queue):
if item['user_id'] == user_id:
processing_queue.remove(item)
break
async def get_queue_position(user_id: int):
async with queue_lock:
for i, item in enumerate(processing_queue):
if item['user_id'] == user_id:
return i + 1
return 0
async def process_queue():
global is_processing
logger.info("Запущен обработчик очереди")
while True:
try:
async with queue_lock:
if not processing_queue:
is_processing = False
await asyncio.sleep(0.5)
continue
current_item = processing_queue[0]
is_processing = True
logger.info(f"Обработка пользователя {current_item['user_id']} из очереди")
try:
async with queue_lock:
for i, item in enumerate(list(processing_queue)[1:], 1):
try:
await bot.send_message(
item['message'].chat.id,
f"⏳ Вы в очереди: {i} из {len(processing_queue)-1}. Ожидайте...",
reply_to_message_id=item['message'].message_id
)
except Exception as e:
logger.warning(f"Не удалось отправить уведомление о позиции: {e}")
await process_photo_internal(current_item['message'])
except Exception as e:
logger.error(f"Ошибка при обработке из очереди: {e}", exc_info=True)
try:
await current_item['message'].answer(
f"{EMOJI['error']} Произошла ошибка при обработке. Попробуйте снова.",
reply_to_message_id=current_item['message'].message_id
)
except:
pass
finally:
async with queue_lock:
if processing_queue and processing_queue[0]['user_id'] == current_item['user_id']:
processing_queue.popleft()
logger.info(f"Пользователь {current_item['user_id']} удален из очереди")
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Критическая ошибка в обработчике очереди: {e}", exc_info=True)
await asyncio.sleep(1)
async def cleanup_old_user_data():
while True:
try:
current_time = time.time()
to_delete = []
for key, data in user_data.items():
if "timestamp" in data and current_time - data["timestamp"] > 3600:
to_delete.append(key)
for key in to_delete:
del user_data[key]
if to_delete:
logger.info(f"Очищено {len(to_delete)} устаревших записей пользователей")
await asyncio.sleep(300)
except Exception as e:
logger.error(f"Ошибка при очистке данных: {e}")
await asyncio.sleep(60)
async def main():
global queue_task, bot, bot_user_id
queue_task = None
cleanup_task = None
try:
logger.info("Запуск бота...")
bot_info = await bot.get_me()
bot_user_id = bot_info.id
logger.info(f"Бот запущен: @{bot_info.username} (ID: {bot_user_id})")
try:
await initialize_gemini_model()
except Exception as e:
logger.error(f"Ошибка инициализации Gemini: {e}")
logger.warning("Бот будет работать без анализа изображений")
queue_task = asyncio.create_task(process_queue())
cleanup_task = asyncio.create_task(cleanup_old_user_data())
logger.info("Фоновые задачи запущены")
await dp.start_polling(bot, drop_pending_updates=True)
except Exception as e:
logger.error(f"Ошибка при запуске бота: {e}", exc_info=True)
raise
finally:
if queue_task and not queue_task.done():
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass
if cleanup_task and not cleanup_task.done():
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
await bot.session.close()
if __name__ == "__main__":
try:
print("Бот запущен.")
asyncio.run(main())
except KeyboardInterrupt:
print("Бот остановлен.")
except Exception as e:
print(f"Произошла ошибка при запуске бота: {e}")
Команды бота:
/start@vash_bot/api_status@vash_bot - для работы этой команды не забудьте вписать свой админский ID в переменную списка ADMIN_IDS = [6955805799]Внимание! Бот работает как в групповых чатах, так и в ЛС.
Пример работы:
Бота потестить можете тут (мне не жалко):
P.S. Полный исходник со структурой проекта прикрепил в zip.
P.S.S. Рекомендую использовать OVPN для подключения бота (Не Российский) - Все известные маски айпишников VDS/VPS забанены гуглом.
Вложения
Последнее редактирование: