This commit is contained in:
2026-05-08 21:01:38 +03:00
parent b4f889def2
commit 834fb0eda9
+540 -141
View File
@@ -6,7 +6,7 @@ import logging
import base64 import base64
import re import re
from datetime import datetime from datetime import datetime
from telegram import Update from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ( from telegram.ext import (
Application, Application,
CommandHandler, CommandHandler,
@@ -14,7 +14,8 @@ from telegram.ext import (
ConversationHandler, ConversationHandler,
filters, filters,
ContextTypes, ContextTypes,
ApplicationBuilder ApplicationBuilder,
CallbackQueryHandler
) )
from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE
@@ -29,7 +30,7 @@ logging.basicConfig(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Состояния # Состояния
AWAITING_PROMPT = 1 AWAITING_CUSTOM_PROMPT = 1
# Директории # Директории
UPLOAD_DIR = "uploads" UPLOAD_DIR = "uploads"
@@ -37,6 +38,96 @@ PROCESSED_DIR = "processed"
os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True) os.makedirs(PROCESSED_DIR, exist_ok=True)
# Доступные стили для 9 мая
VICTORY_STYLES = {
"vintage_photo": {
"name": "Винтажное фото",
"icon": "📷",
"prompt": """CRITICAL: Keep the person's face, facial features, and identity EXACTLY as in the original image. Do not change or replace the face.
Convert this photo to vintage 1945 style black and white photo with grain texture, aged paper effect, subtle sepia tone, and soft vignette. Add authentic WWII era photo characteristics: slight blur, classic film grain, and historical atmosphere reminiscent of May 9, 1945 Victory Day celebration. The face must be preserved.""",
"description": "Эффект старой фотографии 1945 года"
},
"soviet_poster": {
"name": "Советский плакат",
"icon": "🎨",
"prompt": """CRITICAL: Keep the person's face, facial features, and identity EXACTLY as in the original image. Do not change or replace the face.
Transform this photo into a Soviet propaganda poster style from May 9 Victory Day. Use bold red and gold colors, dramatic socialist realism art style, heroic composition with triumphant pose. Add stylized fireworks in background, Order of Victory ribbon elements, and text '9 МАЯ' in classic Soviet typography. The face must be preserved.""",
"description": "Стиль советского агитационного плаката"
},
"military_uniform": {
"name": "Военная форма",
"icon": "🎖️",
"prompt": """CRITICAL: Keep the person's face, facial features, eyes, nose, mouth, and hairstyle EXACTLY as in the original image. Do NOT change or replace the face.
Transform this person into wearing authentic Soviet WWII military uniform from 1945 Victory Day. Add officer's uniform with medals: Order of the Red Star, Order of the Patriotic War, Victory medal. Include garrison cap with cockade, shoulder boards with lieutenant/epaulette ranks. Background should show Moscow or Berlin 1945 victory celebration with fireworks and waving flags. The face must remain identical.""",
"description": "Примерка военной формы с наградами"
},
"george_ribbon": {
"name": "Георгиевская лента",
"icon": "🎗️",
"prompt": """CRITICAL: Keep the person's face, facial features, and identity EXACTLY as in the original image. Do not change or replace the face.
Transform this photo into a composition with the iconic St. George ribbon (Georgievskaya lenta). The person should be decorated with a flowing orange and black ribbon across their chest. Add traditional Russian birch trees, a field of red carnations, and a dramatic sunset sky. Style reminiscent of classic Soviet war photography with modern patriotic elements. The face must be preserved and recognizable.""",
"description": "Портрет с Георгиевской лентой"
},
"victory_parade": {
"name": "Парад Победы",
"icon": "🎆",
"prompt": """CRITICAL: Keep the person's face, all facial features, and identity EXACTLY as in the original image. Do not alter the face.
Place this person in the middle of Red Square during Victory Parade. Add military parade background with marching soldiers, T-34 tanks, and Russian military vehicles. Include tribunes with veterans wearing medals, fireworks in sky, St. George's ribbons, and huge '9 МАЯ' banners. The person should be watching the parade or saluting. Their original face must be perfectly preserved.""",
"description": "На фоне Парада Победы"
},
"front_letter": {
"name": "Письмо с фронта",
"icon": "✉️",
"prompt": """CRITICAL: Keep the person's face and facial features EXACTLY as in the original image. Do not change or replace the face.
Transform this photo into a stylized old war letter from WWII. The image should appear like a vintage triangular folded letter with sepia tones, handwritten Cyrillic text visible, coffee stain effects, and a black and white photo attached to the letter. Add elements like field post stamps, dried flower from battlefield, and patriotic phrases like 'С Победой!'. The person's face must match the original exactly.""",
"description": "Стилизация под письмо с фронта"
},
"custom": {
"name": "Свой вариант",
"icon": "",
"prompt": "",
"description": "Напишите свой промпт для обработки"
}
}
async def get_telegram_file_url(file_id: str) -> str:
"""Получение прямого URL файла из Telegram"""
get_file_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile?file_id={file_id}"
from aiohttp_socks import ProxyConnector
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_file_url) as response:
if response.status != 200:
raise Exception(f"Не удалось получить информацию о файле: {response.status}")
data = await response.json()
if not data.get('ok'):
raise Exception(f"Ошибка API Telegram: {data}")
file_path = data['result']['file_path']
file_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
return file_url
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик /start""" """Обработчик /start"""
@@ -50,87 +141,205 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
last_name=user.last_name last_name=user.last_name
) )
context.user_data.clear()
keyboard = [
[InlineKeyboardButton("🎖️ Обработать фото к 9 мая", callback_data="process_victory")],
[InlineKeyboardButton("📸 Обычная обработка фото", callback_data="process_normal")],
[InlineKeyboardButton("📊 История обработок", callback_data="history")],
[InlineKeyboardButton("❓ Помощь", callback_data="help")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text( await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n" f"👋 Привет, {user.first_name}! 🇷🇺\n\n"
f"Я бот для обработки фотографий через нейросеть.\n\n" f"✨ *Добро пожаловать в Фотохудожник Победы!* ✨\n\n"
f"📸 Отправь мне фото и напиши, как его изменить!\n\n" f"Я превращаю твои фотографии в уникальные артефакты военной эпохи.\n\n"
f"Команды:\n" f"🎖️ *Доступные стили:*\n"
f"/start - Приветствие\n" f"📷 Винтажное фото 1945 года\n"
f"/test - Проверка работы\n" f"🎨 Советский агитационный плакат\n"
f"/history - История обработок" f"🎖️ Военная форма с наградами\n"
f"🎗️ Георгиевская лента\n"
f"🎆 Парад Победы\n"
f"✉️ Письмо с фронта\n\n"
f"🖼️ *Формат результата:* 9:16 (вертикальный)\n\n"
f"📸 *Выбери действие ниже:*",
parse_mode='Markdown',
reply_markup=reply_markup
) )
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Тестовая команда""" """Обработка нажатий на кнопки"""
logger.info(f"Test от {update.effective_user.id}") query = update.callback_query
await update.message.reply_text("✅ Бот работает через SOCKS5 прокси!") await query.answer()
data = query.data
logger.info(f"Нажата кнопка: {data}")
if data == "process_victory":
context.user_data['processing_mode'] = 'victory'
await query.message.reply_text(
"🎖️ *Обработка фото к 9 мая*\n\n"
"📸 Отправь мне фото, и я предложу выбрать стиль обработки!\n\n"
"*Доступные стили:*\n"
"📷 Винтажное фото 1945 года\n"
"🎨 Советский агитационный плакат\n"
"🎖️ Военная форма с наградами\n"
"🎗️ Георгиевская лента\n"
"🎆 Парад Победы\n"
"✉️ Письмо с фронта\n\n"
"🖼️ *Формат результата:* 9:16 (вертикальный)\n\n"
"📤 *Отправь фото:*",
parse_mode='Markdown'
)
elif data == "process_normal":
context.user_data['processing_mode'] = 'normal'
await query.message.reply_text(
"📸 *Обычная обработка фото*\n\n"
"Отправь мне фото, а затем напиши текстовое описание того, "
"как ты хочешь изменить изображение.\n\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"📤 *Отправь фото:*",
parse_mode='Markdown'
)
elif data == "history":
await history_command(update, context)
elif data == "help":
await help_command(update, context)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def show_style_keyboard(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Помощь""" """Показать клавиатуру выбора стиля для 9 мая с иконками"""
keyboard = []
for style_id, style_info in VICTORY_STYLES.items():
button_text = f"{style_info['icon']} {style_info['name']}"
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"style_{style_id}")])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text( await update.message.reply_text(
"📖 Инструкция:\n\n" "🎖️ *Выбери стиль обработки к 9 мая:*\n\n"
"1. Отправь фото\n" "• Винтажное фото - эффект старой фотографии 1945 года\n"
"2. Напиши промпт\n" "• Советский плакат - стиль агитационного плаката\n"
"3. Получи результат!\n\n" "• Военная форма - примерка формы с наградами\n"
"Команды: /start, /test, /history" "• Георгиевская лента - портрет с символом Победы\n"
"• Парад Победы - на фоне военного парада\n"
"• Письмо с фронта - стилизация под старое письмо\n"
"• Свой вариант - напиши свой промпт\n\n"
"🖼️ *Формат результата:* 9:16 (вертикальный)\n\n"
"👇 *Выбери стиль:*",
parse_mode='Markdown',
reply_markup=reply_markup
) )
async def style_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка выбора стиля"""
query = update.callback_query
await query.answer()
style_id = query.data.replace("style_", "")
logger.info(f"Выбран стиль: {style_id}")
photo_url = context.user_data.get('photo_url')
photo_path = context.user_data.get('photo_path')
image_id = context.user_data.get('image_id')
if (not photo_url and not photo_path) or not image_id:
await query.message.reply_text("❌ Ошибка: фото не найдено. Пожалуйста, отправьте фото заново.")
return
style_info = VICTORY_STYLES.get(style_id, {})
style_icon = style_info.get('icon', '🎖️')
if style_id == "custom":
context.user_data['selected_style'] = 'custom'
context.user_data['awaiting_custom_prompt'] = True
await query.message.reply_text(
f"{style_icon} *Напиши свой промпт*\n\n"
"Опиши, как ты хочешь изменить фото.\n\n"
"💡 *Важно:* добавь в промпт 'сохрани лицо человека неизменным'\n\n"
"📝 *Напиши свой промпт:*",
parse_mode='Markdown'
)
else:
context.user_data['selected_style'] = style_id
context.user_data['custom_prompt'] = style_info["prompt"]
await query.message.reply_text(
f"{style_icon} Выбран стиль: *{style_info['name']}*\n\n"
f"📖 {style_info['description']}\n\n"
f"🎨 Начинаю обработку... Пожалуйста, подожди 15-30 секунд.",
parse_mode='Markdown'
)
await process_image_with_style(query.message, context, style_id)
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка фото""" """Обработка фото"""
user = update.effective_user user = update.effective_user
logger.info(f"Фото от {user.id}") logger.info(f"Фото от {user.id}")
processing_mode = context.user_data.get('processing_mode', 'normal')
photo_file = await update.message.photo[-1].get_file() photo_file = await update.message.photo[-1].get_file()
file_id = photo_file.file_id file_id = photo_file.file_id
# Сохраняем фото
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg") local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg")
await photo_file.download_to_drive(local_path) await photo_file.download_to_drive(local_path)
# Запись в БД try:
image_id = await save_image_record( telegram_url = await get_telegram_file_url(file_id)
user_id=user.id, logger.info(f"Получен прямой URL из Telegram")
original_file_id=file_id, except Exception as e:
original_url=local_path logger.error(f"Не удалось получить URL из Telegram: {e}")
) telegram_url = None
try:
image_id = await save_image_record(
user_id=user.id,
original_file_id=file_id,
original_url=local_path
)
logger.info(f"Создана запись в БД: image_id={image_id}")
except Exception as e:
logger.error(f"Ошибка при сохранении в БД: {e}")
await update.message.reply_text("❌ Ошибка при сохранении фото. Попробуйте еще раз.")
return
context.user_data['photo_path'] = local_path
context.user_data['photo_file_id'] = file_id
context.user_data['photo_url'] = telegram_url
context.user_data['image_id'] = image_id context.user_data['image_id'] = image_id
context.user_data['image_path'] = local_path context.user_data['processing_mode'] = processing_mode
await update.message.reply_text( if processing_mode == 'victory':
"📸 Фото получено!\n\n" await show_style_keyboard(update, context)
"Напиши, как изменить это фото.\n" else:
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n" await update.message.reply_text(
"/cancel - отмена" "📸 Фото получено!\n\n"
) "Напиши, как изменить это фото.\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
return AWAITING_PROMPT "/cancel - отмена"
)
return AWAITING_CUSTOM_PROMPT
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE): async def process_with_nano_banana(image_url: str, prompt: str) -> str:
"""Отмена""" """Обработка изображения через Nano Banana Pro"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'image_path' in context.user_data:
path = context.user_data['image_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def call_piapi_api(image_path: str, prompt: str) -> str:
"""Вызов PiAPI через SOCKS5 прокси"""
if not PIApi_API_KEY: if not PIApi_API_KEY:
raise Exception("PIApi_API_KEY не настроен") raise Exception("PIApi_API_KEY не настроен")
with open(image_path, "rb") as f: if "face" not in prompt.lower() and "сохрани" not in prompt.lower():
image_base64 = base64.b64encode(f.read()).decode('utf-8') prompt = f"""CRITICAL: Keep the person's face exactly as in the original image. Do not change or replace the face.
{prompt}"""
headers = { headers = {
"x-api-key": PIApi_API_KEY, "x-api-key": PIApi_API_KEY,
@@ -138,21 +347,20 @@ async def call_piapi_api(image_path: str, prompt: str) -> str:
} }
payload = { payload = {
"model": "black-forest-labs/FLUX.1-dev", "model": "gemini",
"task_type": "image-to-image", "task_type": "nano-banana-pro",
"input": { "input": {
"image": f"data:image/jpeg;base64,{image_base64}", "image_urls": [image_url],
"prompt": prompt, "prompt": prompt,
"num_inference_steps": 28, "output_format": "png",
"guidance_scale": 7.5, "aspect_ratio": "9:16",
"strength": 0.8 "strength": 0.65,
"safety_level": "medium"
} }
} }
# Используем aiohttp-socks для запросов
from aiohttp_socks import ProxyConnector from aiohttp_socks import ProxyConnector
# Парсим прокси
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL) match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match: if match:
username = match.group(1) username = match.group(1)
@@ -166,90 +374,123 @@ async def call_piapi_api(image_path: str, prompt: str) -> str:
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}") connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session: async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=30) as response: async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=90) as response:
if response.status != 200: if response.status != 200:
text = await response.text() text = await response.text()
raise Exception(f"HTTP {response.status}: {text[:100]}") raise Exception(f"HTTP {response.status}: {text[:200]}")
data = await response.json() data = await response.json()
if data.get('code') != 200: if data.get('code') != 200:
raise Exception(f"API Error: {data.get('message')}") raise Exception(f"API Error: {data.get('message')}")
task_id = data['data']['task_id'] task_id = data.get('data', {}).get('task_id')
if not task_id:
raise Exception(f"Не получен task_id")
logger.info(f"Создана задача: {task_id}") logger.info(f"Создана задача: {task_id}")
return await poll_task(task_id, connector) return await poll_nano_banana_task(task_id)
async def poll_task(task_id: str, connector, max_attempts: int = 60) -> str: async def poll_nano_banana_task(task_id: str, max_attempts: int = 90) -> str:
"""Ожидание завершения задачи""" """Ожидание завершения задачи"""
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}" get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIApi_API_KEY} headers = {"x-api-key": PIApi_API_KEY}
async with aiohttp.ClientSession(connector=connector) as session: for attempt in range(max_attempts):
for attempt in range(max_attempts): try:
try: from aiohttp_socks import ProxyConnector
async with session.get(get_url, headers=headers) as response:
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_url, headers=headers, timeout=30) as response:
if response.status != 200: if response.status != 200:
await asyncio.sleep(2) await asyncio.sleep(3)
continue continue
data = await response.json() data = await response.json()
status = data.get('data', {}).get('status') data_info = data.get('data', {}) or {}
status = data_info.get('status', 'unknown')
output = data_info.get('output')
logger.info(f"Задача {task_id}: {status} (попытка {attempt + 1})") logger.info(f"Задача {task_id}: {status} (попытка {attempt + 1})")
if status == 'completed': if output and isinstance(output, dict):
output = data.get('data', {}).get('output', {}) if output.get('image_urls') and len(output['image_urls']) > 0:
result_url = output.get('url') or output.get('image') result_url = output['image_urls'][0]
if result_url: if status == 'completed' and result_url:
logger.info(f"Задача {task_id} выполнена") logger.info(f"Задача выполнена")
return result_url return result_url
elif status == 'failed':
raise Exception("Задача не выполнена")
await asyncio.sleep(2) if status == 'failed':
except Exception as e: error = data_info.get('error', {})
logger.error(f"Ошибка опроса: {e}") error_msg = error.get('message', 'Unknown error') if error else 'Unknown error'
await asyncio.sleep(2) raise Exception(f"Задача не выполнена: {error_msg}")
await asyncio.sleep(3)
except Exception as e:
logger.error(f"Ошибка опроса: {e}")
await asyncio.sleep(3)
raise Exception("Превышено время ожидания") raise Exception("Превышено время ожидания")
async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE): async def process_image_with_style(message, context: ContextTypes.DEFAULT_TYPE, style_id: str):
"""Обработка через PiAPI""" """Обработка фото с выбранным стилем"""
prompt = update.message.text user = message.from_user
user = update.effective_user
logger.info(f"Промпт от {user.id}: {prompt[:50]}")
photo_url = context.user_data.get('photo_url')
photo_path = context.user_data.get('photo_path')
image_id = context.user_data.get('image_id') image_id = context.user_data.get('image_id')
image_path = context.user_data.get('image_path')
if not image_id or not image_path: style_info = VICTORY_STYLES.get(style_id, {})
await update.message.reply_text("❌ Ошибка: фото не найдено") style_icon = style_info.get('icon', '🎖️')
return ConversationHandler.END style_name = style_info.get('name', 'обработка')
status_msg = await update.message.reply_text( if style_id == "custom":
f"🎨 Обрабатываю...\n\n" custom_prompt = context.user_data.get('custom_prompt', '')
f"📝 Промпт: {prompt}\n\n" else:
f"⏳ Это займет 10-30 секунд" custom_prompt = style_info.get("prompt", "")
if not photo_url and not photo_path:
await message.reply_text("❌ Ошибка: фото не найдено")
return
if not photo_url:
with open(photo_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
photo_url = f"data:image/jpeg;base64,{image_base64}"
status_msg = await message.reply_text(
f"{style_icon} Обрабатываю через Nano Banana Pro...\n\n"
f"🎖️ Стиль: *{style_name}*\n"
f"🖼️ Формат: 9:16 (вертикальный)\n\n"
f"⏳ Это займет 15-30 секунд",
parse_mode='Markdown'
) )
try: try:
async with aiosqlite.connect("bot_database.db") as db: result_url = await process_with_nano_banana(photo_url, custom_prompt)
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await call_piapi_api(image_path, prompt)
if result_url: if result_url:
result_path = os.path.join( result_path = os.path.join(
PROCESSED_DIR, PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.jpg" f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.png"
) )
# Скачиваем результат
from aiohttp_socks import ProxyConnector from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL) match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match: if match:
@@ -270,10 +511,110 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
f.write(await resp.read()) f.write(await resp.read())
await status_msg.delete() await status_msg.delete()
caption = f"{style_icon} *{style_name}*\n\n✅ Готово!\n\n🇷🇺 С Днем Победы!"
with open(result_path, "rb") as photo:
await message.reply_photo(photo=photo, caption=caption, parse_mode='Markdown')
await update_image_record(
image_id=image_id,
processed_file_id=result_path,
processed_url=result_path,
status='completed'
)
logger.info(f"Обработка успешна для {user.id}")
else:
raise Exception(f"HTTP {resp.status}")
else:
raise Exception("Не получен URL результата")
except Exception as e:
logger.error(f"Ошибка обработки: {e}")
await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.")
await update_image_record(image_id=image_id, status='failed')
if photo_path and os.path.exists(photo_path):
os.remove(photo_path)
context.user_data.clear()
async def process_with_custom_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка с кастомным промптом"""
prompt = update.message.text
user = update.effective_user
if context.user_data.get('awaiting_custom_prompt'):
context.user_data['awaiting_custom_prompt'] = False
context.user_data['custom_prompt'] = prompt
await update.message.reply_text(
f"✅ Промпт сохранен\n\n🎨 Начинаю обработку..."
)
await process_image_with_style(update.message, context, 'custom')
return ConversationHandler.END
image_id = context.user_data.get('image_id')
photo_path = context.user_data.get('photo_path')
photo_url = context.user_data.get('photo_url')
if not image_id or (not photo_path and not photo_url):
await update.message.reply_text("❌ Ошибка: фото не найдено")
return ConversationHandler.END
if not photo_url:
with open(photo_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
photo_url = f"data:image/jpeg;base64,{image_base64}"
status_msg = await update.message.reply_text(
f"🎨 Обрабатываю через Nano Banana Pro...\n\n"
f"📝 Промпт: {prompt[:100]}\n\n"
f"⏳ Это займет 15-30 секунд"
)
try:
async with aiosqlite.connect("bot_database.db") as db:
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await process_with_nano_banana(photo_url, prompt)
if result_url:
result_path = os.path.join(
PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.png"
)
from aiohttp_socks import ProxyConnector
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(result_url) as resp:
if resp.status == 200:
with open(result_path, "wb") as f:
f.write(await resp.read())
await status_msg.delete()
with open(result_path, "rb") as photo: with open(result_path, "rb") as photo:
await update.message.reply_photo( await update.message.reply_photo(
photo=photo, photo=photo,
caption=f"✅ Готово!\n\n{prompt}" caption=f"✅ Готово!\n\n{prompt[:200]}"
) )
await update_image_record( await update_image_record(
@@ -294,9 +635,8 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.") await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.")
await update_image_record(image_id=image_id, status='failed') await update_image_record(image_id=image_id, status='failed')
# Очистка if photo_path and os.path.exists(photo_path):
if os.path.exists(image_path): os.remove(photo_path)
os.remove(image_path)
context.user_data.clear() context.user_data.clear()
return ConversationHandler.END return ConversationHandler.END
@@ -304,66 +644,127 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""История обработок""" """История обработок"""
user = update.effective_user if hasattr(update, 'callback_query'):
images = await get_user_images(user.id, limit=5) user_id = update.callback_query.from_user.id
message = update.callback_query.message
else:
user_id = update.effective_user.id
message = update.message
await message.reply_text("📊 Загружаю историю...")
images = await get_user_images(user_id, limit=5)
if not images: if not images:
await update.message.reply_text("📭 Нет обработанных изображений") await message.reply_text("📭 Нет обработанных изображений")
return return
text = "🖼 **История обработок:**\n\n" text = "🖼 *История обработок:*\n\n"
for i, img in enumerate(images, 1): for i, img in enumerate(images, 1):
status = "" if img['status'] == 'completed' else "" status = "" if img['status'] == 'completed' else ""
date = img['created_at'][:16].replace('T', ' ') date = img['created_at'][:16].replace('T', ' ')
prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img[ prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img.get('prompt',
'prompt'] or "без промпта" 'без промпта')
text += f"{status} **{i}.** {date}\n 📝 {prompt}\n\n" text += f"{status} {i}. {date}\n 📝 {prompt}\n\n"
await update.message.reply_text(text, parse_mode='Markdown') await message.reply_text(text, parse_mode='Markdown')
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Помощь"""
help_text = (
"📖 *ПОМОЩЬ И ПОДДЕРЖКА*\n\n"
"🇷🇺 *Обработка к 9 мая:*\n"
"1. Нажми 'Обработать фото к 9 мая'\n"
"2. Отправь фото\n"
"3. Выбери стиль\n"
"4. Получи результат!\n\n"
"📸 *Обычная обработка:*\n"
"1. Нажми 'Обычная обработка фото'\n"
"2. Отправь фото\n"
"3. Напиши промпт\n"
"4. Получи результат!\n\n"
"🎖️ *Стили для 9 мая:*\n"
"📷 Винтажное фото 1945 года\n"
"🎨 Советский плакат\n"
"🎖️ Военная форма\n"
"🎗️ Георгиевская лента\n"
"🎆 Парад Победы\n"
"✉️ Письмо с фронта\n\n"
"📞 *По всем вопросам:* @kirill_bouko",
parse_mode='Markdown'
)
if hasattr(update, 'callback_query'):
await update.callback_query.message.reply_text(help_text, parse_mode='Markdown')
else:
await update.message.reply_text(help_text, parse_mode='Markdown')
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отмена"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'photo_path' in context.user_data:
path = context.user_data['photo_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик ошибок""" """Обработчик ошибок"""
logger.error(f"Ошибка: {context.error}") logger.error(f"Ошибка: {context.error}")
if update and update.effective_message:
await update.effective_message.reply_text("😵 Произошла ошибка. Попробуйте позже.") error_message = "😵 Произошла техническая ошибка.\n\n📞 По всем вопросам: @kirill_bouko"
try:
if update and update.effective_message:
await update.effective_message.reply_text(error_message)
elif update and hasattr(update, 'callback_query') and update.callback_query:
await update.callback_query.message.reply_text(error_message)
except Exception as e:
logger.error(f"Ошибка при отправке сообщения: {e}")
async def main(): async def main():
"""Запуск бота с прокси как в примере""" """Запуск бота"""
print("\n" + "=" * 50) print("\n" + "=" * 50)
print("🤖 ЗАПУСК TELEGRAM БОТА") print("🇷🇺 БОТ ДЛЯ ОБРАБОТКИ ФОТО К 9 МАЯ 🇷🇺")
print("=" * 50)
print("Модель: Nano Banana Pro")
print("Формат: 9:16")
print("=" * 50) print("=" * 50)
# Инициализация БД
await init_db() await init_db()
# Создаем приложение с прокси (как в вашем примере) builder = ApplicationBuilder().token(TELEGRAM_BOT_TOKEN)
TOKEN = TELEGRAM_BOT_TOKEN
PROXY = PROXY_URL
builder = ApplicationBuilder().token(TOKEN) if PROXY_URL:
builder = builder.proxy(PROXY_URL).get_updates_proxy(PROXY_URL)
if PROXY: print(f"🔌 Прокси: {PROXY_URL.split('@')[-1] if '@' in PROXY_URL else PROXY_URL}")
builder = builder.proxy(PROXY).get_updates_proxy(PROXY)
print(f"🔌 Прокси: {PROXY.split('@')[-1] if '@' in PROXY else PROXY}")
else: else:
print("🔌 Прокси не используется") print("🔌 Прокси не используется")
application = builder.build() application = builder.build()
print("=" * 50)
# Регистрируем обработчики
application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("test", test_command))
application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("history", history_command)) application.add_handler(CommandHandler("history", history_command))
application.add_handler(CommandHandler("cancel", cancel))
application.add_handler(
CallbackQueryHandler(button_callback, pattern="^(process_victory|process_normal|history|help)$"))
application.add_handler(CallbackQueryHandler(style_callback, pattern="^style_"))
conv_handler = ConversationHandler( conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.PHOTO, handle_photo)], entry_points=[MessageHandler(filters.PHOTO, handle_photo)],
states={AWAITING_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_image)]}, states={
fallbacks=[CommandHandler("cancel", cancel), CommandHandler("start", start)], AWAITING_CUSTOM_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_with_custom_prompt)],
},
fallbacks=[CommandHandler("cancel", cancel)],
) )
application.add_handler(conv_handler) application.add_handler(conv_handler)
application.add_error_handler(error_handler) application.add_error_handler(error_handler)
@@ -371,17 +772,15 @@ async def main():
print("\n✅ Бот запускается...") print("\n✅ Бот запускается...")
try: try:
# Запускаем бота
await application.initialize() await application.initialize()
await application.start() await application.start()
await application.updater.start_polling() await application.updater.start_polling()
print("\n" + "=" * 50) print("\n" + "=" * 50)
print("🤖 БОТ УСПЕШНО ЗАПУЩЕН!") print("🇷🇺 БОТ УСПЕШНО ЗАПУЩЕН! 🇷🇺")
print("=" * 50) print("=" * 50)
print("\n📱 Отправь команду /test в Telegram\n") print("\n📱 Отправь /start в Telegram\n")
# Держим бота запущенным
await asyncio.Event().wait() await asyncio.Event().wait()
except KeyboardInterrupt: except KeyboardInterrupt: