This commit is contained in:
2026-05-05 01:00:12 +03:00
parent 34d89c2245
commit d3862e6b97
2 changed files with 630 additions and 201 deletions
+625 -191
View File
@@ -6,7 +6,7 @@ import logging
import base64 import base64
import re import re
from datetime import datetime from datetime import datetime
from telegram import Update from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ( from telegram.ext import (
Application, Application,
CommandHandler, CommandHandler,
@@ -14,7 +14,8 @@ from telegram.ext import (
ConversationHandler, ConversationHandler,
filters, filters,
ContextTypes, ContextTypes,
ApplicationBuilder ApplicationBuilder,
CallbackQueryHandler
) )
from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE
@@ -29,7 +30,9 @@ logging.basicConfig(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Состояния # Состояния
AWAITING_PROMPT = 1 AWAITING_PHOTO = 0
AWAITING_CUSTOM_PROMPT = 1
AWAITING_STYLE_SELECTION = 2
# Директории # Директории
UPLOAD_DIR = "uploads" UPLOAD_DIR = "uploads"
@@ -37,6 +40,57 @@ PROCESSED_DIR = "processed"
os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True) os.makedirs(PROCESSED_DIR, exist_ok=True)
# Доступные стили для 9 мая
VICTORY_STYLES = {
"vintage_photo": {
"name": "📷 Винтажное фото",
"prompt": """IMPORTANT: Keep the person's face, facial features, and identity EXACTLY as in the original photo. Do not change the face, do not replace it with another face, do not distort facial features.
Convert this photo to vintage 1945 style black and white photo with grain texture, aged paper effect, subtle sepia tone, and soft vignette. Add authentic WWII era photo characteristics: slight blur, classic film grain, and historical atmosphere reminiscent of May 9, 1945 Victory Day celebration. Preserve all facial details of the person in the photo.""",
"description": "Эффект старой фотографии 1945 года (сохраняется лицо)"
},
"soviet_poster": {
"name": "🎨 Советский плакат",
"prompt": """IMPORTANT: Keep the person's face, facial features, and identity EXACTLY as in the original photo. Do not change the face, do not replace it with another face.
Transform this photo into a Soviet propaganda poster style from May 9 Victory Day. Use bold red and gold colors, dramatic socialist realism art style, heroic composition with triumphant pose. Add stylized fireworks in background, Order of Victory ribbon elements, and text '9 МАЯ' in classic Soviet typography. The person should look like a heroic figure from WWII victory celebration. Keep their original face unchanged.""",
"description": "Стиль советского агитационного плаката (сохраняется лицо)"
},
"military_uniform": {
"name": "🎖️ Военная форма",
"prompt": """CRITICAL: Keep the person's face, facial features, eyes, nose, mouth, and hairstyle EXACTLY as in the original photo. Do NOT change or replace the face under any circumstances.
Transform this person into wearing authentic Soviet WWII military uniform from 1945 Victory Day. Add officer's uniform with medals: Order of the Red Star, Order of the Patriotic War, Victory medal. Include garrison cap with cockade, shoulder boards with lieutenant/epaulette ranks. Background should show Moscow or Berlin 1945 victory celebration with fireworks and waving flags. The person's face must remain 100% identical to the original photo.""",
"description": "Примерка военной формы с наградами (сохраняется лицо)"
},
"monument": {
"name": "🗿 Памятник герою",
"prompt": """IMPORTANT: Keep the person's facial features EXACTLY as in the original photo. The face should be recognizable and unchanged.
Transform this person into a bronze monument statue in the style of Soviet war memorials. The statue should stand on a granite pedestal with stars and victory symbols. Background shows Eternal Flame and typical memorial architecture. Create realistic bronze texture with patina effects, dramatic lighting from below (uplighting), and majestic clouds in background. Style reminiscent of the Motherland Calls or Soviet War Memorial in Treptower Park. The face must preserve all original features of the person.""",
"description": "Превращение в памятник герою (сохраняется лицо)"
},
"victory_parade": {
"name": "🎆 Парад Победы",
"prompt": """CRITICAL: Keep the person's face, all facial features, and identity EXACTLY as in the original photo. Do not alter the face.
Place this person in the middle of Red Square during Victory Parade. Add military parade background with marching soldiers, T-34 tanks, and Russian military vehicles. Include tribunes with veterans wearing medals, fireworks in sky, St. George's ribbons, and huge '9 MАЯ' banners. The person should be watching the parade or saluting. Use bright, celebratory colors with patriotic atmosphere. Their original face must be perfectly preserved.""",
"description": "На фоне Парада Победы (сохраняется лицо)"
},
"front_letter": {
"name": "✉️ Письмо с фронта",
"prompt": """IMPORTANT: Keep the person's face and facial features EXACTLY as in the original photo. Do not change or replace the face.
Transform this photo into a stylized old war letter (frontovoe pismo) from WWII. Your image should appear like a vintage triangular folded letter with sepia tones, handwritten Cyrillic text visible, coffee stain effects, and a black and white photo attached to the letter. Add elements like field post stamps, dried flower from battlefield, and patriotic phrases like 'С Победой!' The overall mood should be nostalgic and emotional. The person's face in the attached photo must match the original face exactly.""",
"description": "Стилизация под письмо с фронта (сохраняется лицо)"
},
"custom": {
"name": "✨ Свой вариант",
"prompt": "",
"description": "Напишите свой промпт для обработки (не забудьте указать 'сохранить лицо')"
}
}
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик /start""" """Обработчик /start"""
@@ -50,204 +104,180 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
last_name=user.last_name last_name=user.last_name
) )
# Сброс состояния
context.user_data.clear()
# Создаем клавиатуру с выбором действия
keyboard = [
[InlineKeyboardButton("🎖️ Обработать фото к 9 мая", callback_data="process_victory")],
[InlineKeyboardButton("📸 Обычная обработка фото", callback_data="process_normal")],
[InlineKeyboardButton("📊 История обработок", callback_data="history")],
[InlineKeyboardButton("❓ Помощь", callback_data="help")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text( await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n" f"👋 Привет, {user.first_name}!\n\n"
f"Я бот для обработки фотографий через нейросеть.\n\n" f"🇷🇺 **С Днем Победы!** 🇷🇺\n\n"
f"📸 Отправь мне фото и напиши, как его изменить!\n\n" f"Я бот, который может обработать твои фото в тематике 9 мая.\n\n"
f"Команды:\n" f"📸 Отправь мне фото и выбери стиль обработки:\n"
f"/start - Приветствие\n" f"• Винтажное фото 1945 года\n"
f"/test - Проверка работы\n" f"• Советский агитационный плакат\n"
f"/history - История обработок" f"• Военная форма с наградами\n"
f"• Памятник герою\n"
f"• Парад Победы\n"
f"• Письмо с фронта\n\n"
f"Также доступна обычная обработка через нейросеть.\n\n"
f"Выбери действие ниже:",
parse_mode='Markdown',
reply_markup=reply_markup
) )
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Тестовая команда""" """Обработка нажатий на кнопки"""
logger.info(f"Test от {update.effective_user.id}") query = update.callback_query
await update.message.reply_text("✅ Бот работает через SOCKS5 прокси!") await query.answer()
data = query.data
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if data == "process_victory":
"""Помощь""" context.user_data['processing_mode'] = 'victory'
await update.message.reply_text( await query.message.reply_text(
"📖 Инструкция:\n\n" "🎖️ **Обработка фото к 9 мая**\n\n"
"1. Отправь фото\n" "📸 Отправь мне фото, и я предложу выбрать стиль обработки!\n\n"
"2. Напиши промпт\n" "Доступные стили:\n"
"3. Получи результат!\n\n" "• Винтажное фото 1945 года\n"
"Команды: /start, /test, /history" "• Советский агитационный плакат\n"
"• Военная форма с наградами\n"
"• Памятник герою\n"
"• Парад Победы\n"
"• Письмо с фронта\n\n"
"Отправь фото:",
parse_mode='Markdown'
) )
# Устанавливаем состояние ожидания фото
return AWAITING_PHOTO
elif data == "process_normal":
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): context.user_data['processing_mode'] = 'normal'
"""Обработка фото""" await query.message.reply_text(
user = update.effective_user "📸 **Обычная обработка фото**\n\n"
logger.info(f"Фото от {user.id}") "Отправь мне фото, а затем напиши текстовое описание того, "
"как ты хочешь изменить изображение.\n\n"
photo_file = await update.message.photo[-1].get_file()
file_id = photo_file.file_id
# Сохраняем фото
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg")
await photo_file.download_to_drive(local_path)
# Запись в БД
image_id = await save_image_record(
user_id=user.id,
original_file_id=file_id,
original_url=local_path
)
context.user_data['image_id'] = image_id
context.user_data['image_path'] = local_path
await update.message.reply_text(
"📸 Фото получено!\n\n"
"Напиши, как изменить это фото.\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n" "Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"/cancel - отмена" "Отправь фото:",
parse_mode='Markdown'
)
return AWAITING_PHOTO
elif data == "history":
await history_command(update, context)
elif data == "help":
await help_command(update, context)
return AWAITING_PHOTO
async def show_style_keyboard(update: Update, context: ContextTypes.DEFAULT_TYPE, photo_path: str, photo_file_id: str,
image_id: int):
"""Показать клавиатуру выбора стиля для 9 мая"""
# Сохраняем данные фото в контексте
context.user_data['photo_path'] = photo_path
context.user_data['photo_file_id'] = photo_file_id
context.user_data['image_id'] = image_id
keyboard = []
for style_id, style_info in VICTORY_STYLES.items():
keyboard.append([InlineKeyboardButton(style_info["name"], callback_data=f"style_{style_id}")])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"🎖️ **Выбери стиль обработки к 9 мая:**\n\n"
"• 📷 Винтажное фото - эффект старой фотографии 1945 года\n"
"• 🎨 Советский плакат - стиль агитационного плаката\n"
"• 🎖️ Военная форма - примерка формы с наградами\n"
"• 🗿 Памятник - превращение в бронзовый монумент\n"
"• 🎆 Парад Победы - на фоне военного парада\n"
"• ✉️ Письмо с фронта - стилизация под старое письмо\n"
"• ✨ Свой вариант - напиши свой промпт",
parse_mode='Markdown',
reply_markup=reply_markup
) )
return AWAITING_PROMPT
async def style_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка выбора стиля"""
query = update.callback_query
await query.answer()
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE): style_id = query.data.replace("style_", "")
"""Отмена"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'image_path' in context.user_data:
path = context.user_data['image_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def call_piapi_api(image_path: str, prompt: str) -> str:
"""Вызов PiAPI через SOCKS5 прокси"""
if not PIApi_API_KEY:
raise Exception("PIApi_API_KEY не настроен")
with open(image_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
headers = {
"x-api-key": PIApi_API_KEY,
"Content-Type": "application/json"
}
payload = {
"model": "black-forest-labs/FLUX.1-dev",
"task_type": "image-to-image",
"input": {
"image": f"data:image/jpeg;base64,{image_base64}",
"prompt": prompt,
"num_inference_steps": 28,
"guidance_scale": 7.5,
"strength": 0.8
}
}
# Используем aiohttp-socks для запросов
from aiohttp_socks import ProxyConnector
# Парсим прокси
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=30) as response:
if response.status != 200:
text = await response.text()
raise Exception(f"HTTP {response.status}: {text[:100]}")
data = await response.json()
if data.get('code') != 200:
raise Exception(f"API Error: {data.get('message')}")
task_id = data['data']['task_id']
logger.info(f"Создана задача: {task_id}")
return await poll_task(task_id, connector)
async def poll_task(task_id: str, connector, max_attempts: int = 60) -> str:
"""Ожидание завершения задачи"""
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIApi_API_KEY}
async with aiohttp.ClientSession(connector=connector) as session:
for attempt in range(max_attempts):
try:
async with session.get(get_url, headers=headers) as response:
if response.status != 200:
await asyncio.sleep(2)
continue
data = await response.json()
status = data.get('data', {}).get('status')
logger.info(f"Задача {task_id}: {status} (попытка {attempt + 1})")
if status == 'completed':
output = data.get('data', {}).get('output', {})
result_url = output.get('url') or output.get('image')
if result_url:
logger.info(f"Задача {task_id} выполнена")
return result_url
elif status == 'failed':
raise Exception("Задача не выполнена")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Ошибка опроса: {e}")
await asyncio.sleep(2)
raise Exception("Превышено время ожидания")
async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка через PiAPI"""
prompt = update.message.text
user = update.effective_user
logger.info(f"Промпт от {user.id}: {prompt[:50]}")
# Получаем сохраненные данные фото
photo_path = context.user_data.get('photo_path')
photo_file_id = context.user_data.get('photo_file_id')
image_id = context.user_data.get('image_id') image_id = context.user_data.get('image_id')
image_path = context.user_data.get('image_path')
if not image_id or not image_path: if not photo_path or not image_id:
await update.message.reply_text("❌ Ошибка: фото не найдено") await query.message.reply_text("❌ Ошибка: фото не найдено. Пожалуйста, отправьте фото заново.")
return ConversationHandler.END return
status_msg = await update.message.reply_text( if style_id == "custom":
f"🎨 Обрабатываю...\n\n" context.user_data['selected_style'] = 'custom'
f"📝 Промпт: {prompt}\n\n" context.user_data['awaiting_custom_prompt'] = True
f"⏳ Это займет 10-30 секунд" await query.message.reply_text(
"✏️ **Напиши свой промпт**\n\n"
"Опиши, как ты хочешь изменить фото. Например:\n"
"'Сделай фон Кремля с салютом'\n"
"'Добавь георгиевскую ленту и гвоздики'\n"
"'Преврати в черно-белое фото с эффектом старины'\n\n"
"Напиши свой промпт:",
parse_mode='Markdown'
)
else:
context.user_data['selected_style'] = style_id
context.user_data['custom_prompt'] = VICTORY_STYLES[style_id]["prompt"]
await query.message.reply_text(
f"✅ Выбран стиль: **{VICTORY_STYLES[style_id]['name']}**\n\n"
f"{VICTORY_STYLES[style_id]['description']}\n\n"
f"🎨 Начинаю обработку... Пожалуйста, подожди 10-20 секунд.",
parse_mode='Markdown'
)
# Запускаем обработку
await process_image_with_style(query.message, context, style_id)
async def process_image_with_style(message, context: ContextTypes.DEFAULT_TYPE, style_id: str):
"""Обработка фото с выбранным стилем"""
user = message.from_user
photo_path = context.user_data.get('photo_path')
image_id = context.user_data.get('image_id')
custom_prompt = context.user_data.get('custom_prompt', VICTORY_STYLES[style_id]["prompt"])
status_msg = await message.reply_text(
f"🎨 Обрабатываю через Nano Banana...\n\n"
f"🎖️ Стиль: **{VICTORY_STYLES[style_id]['name']}**\n\n"
f"⏳ Это займет 10-20 секунд",
parse_mode='Markdown'
) )
try: try:
async with aiosqlite.connect("bot_database.db") as db: result_url = await process_with_nano_banana(photo_path, custom_prompt)
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await call_piapi_api(image_path, prompt)
if result_url: if result_url:
logger.info(f"Скачиваем результат по URL: {result_url[:100]}...")
result_path = os.path.join( result_path = os.path.join(
PROCESSED_DIR, PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.jpg" f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.png"
) )
# Скачиваем результат # Скачиваем результат через прокси
from aiohttp_socks import ProxyConnector from aiohttp_socks import ProxyConnector
import re import re
@@ -270,10 +300,161 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
f.write(await resp.read()) f.write(await resp.read())
await status_msg.delete() await status_msg.delete()
caption = f"🎖️ **{VICTORY_STYLES[style_id]['name']}**\n\n✅ Готово!\n\n🇷🇺 С Днем Победы!"
with open(result_path, "rb") as photo:
await message.reply_photo(
photo=photo,
caption=caption,
parse_mode='Markdown'
)
await update_image_record(
image_id=image_id,
processed_file_id=result_path,
processed_url=result_path,
status='completed'
)
logger.info(f"Обработка Nano Banana успешна для {user.id}")
else:
raise Exception(f"HTTP {resp.status} при скачивании результата")
else:
raise Exception("Не получен URL результата")
except Exception as e:
logger.error(f"Ошибка обработки: {e}")
await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.")
await update_image_record(image_id=image_id, status='failed')
# Очистка временных данных
if os.path.exists(photo_path):
os.remove(photo_path)
context.user_data.clear()
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка фото"""
user = update.effective_user
logger.info(f"Фото от {user.id}")
processing_mode = context.user_data.get('processing_mode', 'normal')
photo_file = await update.message.photo[-1].get_file()
file_id = photo_file.file_id
# Сохраняем фото
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg")
await photo_file.download_to_drive(local_path)
# Запись в БД
image_id = await save_image_record(
user_id=user.id,
original_file_id=file_id,
original_url=local_path
)
if processing_mode == 'victory':
# Показываем выбор стиля для 9 мая
await show_style_keyboard(update, context, local_path, file_id, image_id)
else:
# Обычная обработка
context.user_data['image_id'] = image_id
context.user_data['image_path'] = local_path
await update.message.reply_text(
"📸 Фото получено!\n\n"
"Напиши, как изменить это фото.\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"/cancel - отмена"
)
return AWAITING_CUSTOM_PROMPT
return ConversationHandler.END
async def process_with_custom_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка с кастомным промптом (обычная обработка)"""
prompt = update.message.text
user = update.effective_user
# Проверяем, ожидаем ли мы кастомный промпт для стиля
if context.user_data.get('awaiting_custom_prompt'):
style_id = 'custom'
custom_prompt = prompt
context.user_data['awaiting_custom_prompt'] = False
context.user_data['selected_style'] = style_id
context.user_data['custom_prompt'] = custom_prompt
await update.message.reply_text(
f"✅ Промпт сохранен:\n\n\"{prompt}\"\n\n"
f"🎨 Начинаю обработку... Пожалуйста, подожди 10-20 секунд.",
parse_mode='Markdown'
)
# Запускаем обработку
await process_image_with_style(update.message, context, style_id)
return ConversationHandler.END
# Обычная обработка с кастомным промптом
logger.info(f"Промпт от {user.id}: {prompt[:50]}")
image_id = context.user_data.get('image_id')
image_path = context.user_data.get('image_path')
if not image_id or not image_path:
await update.message.reply_text("❌ Ошибка: фото не найдено")
return ConversationHandler.END
status_msg = await update.message.reply_text(
f"🎨 Обрабатываю через Nano Banana...\n\n"
f"📝 Промпт: {prompt[:100]}{'...' if len(prompt) > 100 else ''}\n\n"
f"⏳ Это займет 10-20 секунд"
)
try:
async with aiosqlite.connect("bot_database.db") as db:
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await process_with_nano_banana(image_path, prompt)
if result_url:
result_path = os.path.join(
PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.png"
)
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(result_url) as resp:
if resp.status == 200:
with open(result_path, "wb") as f:
f.write(await resp.read())
await status_msg.delete()
with open(result_path, "rb") as photo: with open(result_path, "rb") as photo:
await update.message.reply_photo( await update.message.reply_photo(
photo=photo, photo=photo,
caption=f"✅ Готово!\n\n{prompt}" caption=f"✅ Готово!\n\n{prompt[:100]}{'...' if len(prompt) > 100 else ''}"
) )
await update_image_record( await update_image_record(
@@ -302,24 +483,265 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
return ConversationHandler.END return ConversationHandler.END
async def process_with_nano_banana(image_path: str, prompt: str) -> str:
"""Обработка изображения через Nano Banana Pro (Google Gemini) API"""
if not PIApi_API_KEY:
raise Exception("PIApi_API_KEY не настроен")
# Проверяем, есть ли в промпте инструкция о сохранении лица
if "face" not in prompt.lower() and "сохрани" not in prompt.lower():
logger.warning("В промпте нет инструкции о сохранении лица! Добавляем автоматически.")
prompt = f"""IMPORTANT: Keep the person's face, facial features, and identity EXACTLY as in the original photo. Do not change or replace the face.
{prompt}"""
# Загружаем изображение на временный хостинг или используем base64
# Nano Banana Pro принимает image_urls массивом
with open(image_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
# Создаем data URL для изображения
image_data_url = f"data:image/jpeg;base64,{image_base64}"
headers = {
"x-api-key": PIApi_API_KEY,
"Content-Type": "application/json"
}
# Правильный payload для Nano Banana Pro
payload = {
"model": "gemini",
"task_type": "nano-banana-pro",
"input": {
"image_urls": [image_data_url], # Передаем массивом
"prompt": prompt,
"output_format": "png",
"aspect_ratio": "1:1",
"strength": 0.65,
"safety_level": "medium" # medium уровень безопасности
}
}
logger.info(f"Отправляем запрос в Nano Banana Pro...")
logger.info(f"Изображение передано в image_urls (base64, длина: {len(image_data_url)})")
logger.info(f"Промпт (первые 300 символов): {prompt[:300]}...")
logger.info(f"Параметры: strength=0.65, safety_level=medium, aspect_ratio=1:1")
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=90) as response:
response_text = await response.text()
logger.info(f"PiAPI Response status: {response.status}")
if response.status != 200:
raise Exception(f"HTTP {response.status}: {response_text[:200]}")
import json
data = json.loads(response_text)
if data.get('code') != 200:
raise Exception(f"API Error: {data.get('message')}")
task_id = data.get('data', {}).get('task_id')
if not task_id:
raise Exception(f"Не получен task_id: {data}")
logger.info(f"✅ Создана задача Nano Banana Pro: {task_id}")
# Ожидаем результат
result_url = await poll_nano_banana_task(task_id)
logger.info(f"✅ Получен результат: {result_url[:100]}...")
return result_url
async def poll_nano_banana_task(task_id: str, max_attempts: int = 90) -> str:
"""Ожидание завершения задачи Nano Banana Pro"""
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIApi_API_KEY}
for attempt in range(max_attempts):
try:
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_url, headers=headers, timeout=30) as response:
if response.status != 200:
logger.warning(f"HTTP {response.status}, повтор через 3 секунды...")
await asyncio.sleep(3)
continue
data = await response.json()
# Безопасное получение данных
data_info = data.get('data', {})
if data_info is None:
data_info = {}
status = data_info.get('status', 'unknown')
output = data_info.get('output')
logger.info(f"Задача {task_id}: статус {status} (попытка {attempt + 1})")
# Если output None, пропускаем
if output is None:
if status == 'completed':
logger.warning(f"Статус completed, но output = None. Ждем...")
await asyncio.sleep(3)
continue
elif status == 'pending' or status == 'processing':
await asyncio.sleep(3)
continue
elif status == 'failed':
error = data_info.get('error', {})
error_msg = error.get('message', 'Unknown error') if error else 'Unknown error'
raise Exception(f"Задача не выполнена: {error_msg}")
else:
await asyncio.sleep(3)
continue
# Безопасное получение URL из output
result_url = None
if isinstance(output, dict):
# Проверяем разные форматы URL
if output.get('image_urls') and isinstance(output['image_urls'], list) and len(
output['image_urls']) > 0:
result_url = output['image_urls'][0]
logger.info(f"Найден URL в image_urls[0]")
elif output.get('url'):
result_url = output['url']
logger.info(f"Найден URL в url")
elif output.get('image'):
result_url = output['image']
logger.info(f"Найден URL в image")
if status == 'completed' and result_url:
logger.info(f"✅ Задача {task_id} выполнена! URL получен.")
return result_url
elif status == 'completed' and not result_url:
logger.warning(f"Статус completed, но URL не найден. Output: {output}")
await asyncio.sleep(3)
continue
elif status == 'failed':
error = data_info.get('error', {})
error_msg = error.get('message', 'Unknown error') if error else 'Unknown error'
raise Exception(f"Задача не выполнена: {error_msg}")
await asyncio.sleep(3)
except asyncio.TimeoutError:
logger.error(f"Таймаут при опросе задачи (попытка {attempt + 1})")
await asyncio.sleep(3)
except Exception as e:
logger.error(f"Ошибка опроса (попытка {attempt + 1}): {e}")
await asyncio.sleep(3)
raise Exception("Превышено время ожидания")
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отмена"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'photo_path' in context.user_data:
path = context.user_data['photo_path']
if os.path.exists(path):
os.remove(path)
if 'image_path' in context.user_data:
path = context.user_data['image_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Тестовая команда"""
logger.info(f"Test от {update.effective_user.id}")
await update.message.reply_text("✅ Бот работает через SOCKS5 прокси!")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Помощь"""
help_text = (
"📖 **Инструкция:**\n\n"
"**🇷🇺 Обработка к 9 мая:**\n"
"1. Нажми 'Обработать фото к 9 мая'\n"
"2. Отправь фото\n"
"3. Выбери стиль из предложенных\n"
"4. Получи обработанное фото!\n\n"
"**📸 Обычная обработка:**\n"
"1. Нажми 'Обычная обработка фото'\n"
"2. Отправь фото\n"
"3. Напиши текстовое описание\n"
"4. Получи результат!\n\n"
"**Доступные стили для 9 мая:**\n"
"• Винтажное фото 1945 года\n"
"• Советский агитационный плакат\n"
"• Военная форма с наградами\n"
"• Памятник герою\n"
"• Парад Победы\n"
"• Письмо с фронта\n\n"
"**Команды:**\n"
"/start - Главное меню\n"
"/test - Проверка работы\n"
"/history - История обработок"
)
await update.message.reply_text(help_text, parse_mode='Markdown')
async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""История обработок""" """История обработок"""
user = update.effective_user user = update.effective_user
images = await get_user_images(user.id, limit=5)
if hasattr(update, 'callback_query'):
message = update.callback_query.message
await message.reply_text("📊 Загружаю историю...")
user_id = update.callback_query.from_user.id
else:
user_id = user.id
images = await get_user_images(user_id, limit=5)
if not images: if not images:
await update.message.reply_text("📭 Нет обработанных изображений") await update.effective_message.reply_text("📭 Нет обработанных изображений")
return return
text = "🖼 **История обработок:**\n\n" text = "🖼 **История обработок:**\n\n"
for i, img in enumerate(images, 1): for i, img in enumerate(images, 1):
status = "" if img['status'] == 'completed' else "" status = "" if img['status'] == 'completed' else ""
date = img['created_at'][:16].replace('T', ' ') date = img['created_at'][:16].replace('T', ' ')
prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img[ prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img.get('prompt',
'prompt'] or "без промпта" 'без промпта')
text += f"{status} **{i}.** {date}\n 📝 {prompt}\n\n" text += f"{status} **{i}.** {date}\n 📝 {prompt}\n\n"
await update.message.reply_text(text, parse_mode='Markdown') await update.effective_message.reply_text(text, parse_mode='Markdown')
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
@@ -330,15 +752,15 @@ async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def main(): async def main():
"""Запуск бота с прокси как в примере""" """Запуск бота"""
print("\n" + "=" * 50) print("\n" + "=" * 50)
print("🤖 ЗАПУСК TELEGRAM БОТА") print("🇷🇺 БОТ ДЛЯ ОБРАБОТКИ ФОТО К 9 МАЯ 🇷🇺")
print("=" * 50)
print("Используется модель: Nano Banana Pro (Google Gemini 2.5 Flash Image)")
print("=" * 50) print("=" * 50)
# Инициализация БД
await init_db() await init_db()
# Создаем приложение с прокси (как в вашем примере)
TOKEN = TELEGRAM_BOT_TOKEN TOKEN = TELEGRAM_BOT_TOKEN
PROXY = PROXY_URL PROXY = PROXY_URL
@@ -354,16 +776,25 @@ async def main():
print("=" * 50) print("=" * 50)
# Регистрируем обработчики # Регистрируем обработчики команд
application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("test", test_command)) application.add_handler(CommandHandler("test", test_command))
application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("history", history_command)) application.add_handler(CommandHandler("history", history_command))
application.add_handler(CommandHandler("cancel", cancel))
# Регистрируем callback обработчики для кнопок
application.add_handler(
CallbackQueryHandler(button_callback, pattern="^(process_victory|process_normal|history|help)$"))
application.add_handler(CallbackQueryHandler(style_callback, pattern="^style_"))
# ConversationHandler для обработки фото
conv_handler = ConversationHandler( conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.PHOTO, handle_photo)], entry_points=[MessageHandler(filters.PHOTO, handle_photo)],
states={AWAITING_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_image)]}, states={
fallbacks=[CommandHandler("cancel", cancel), CommandHandler("start", start)], AWAITING_CUSTOM_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_with_custom_prompt)],
},
fallbacks=[CommandHandler("cancel", cancel)],
) )
application.add_handler(conv_handler) application.add_handler(conv_handler)
application.add_error_handler(error_handler) application.add_error_handler(error_handler)
@@ -371,17 +802,20 @@ async def main():
print("\n✅ Бот запускается...") print("\n✅ Бот запускается...")
try: try:
# Запускаем бота
await application.initialize() await application.initialize()
await application.start() await application.start()
await application.updater.start_polling() await application.updater.start_polling()
print("\n" + "=" * 50) print("\n" + "=" * 50)
print("🤖 БОТ УСПЕШНО ЗАПУЩЕН!") print("🇷🇺 БОТ УСПЕШНО ЗАПУЩЕН! 🇷🇺")
print("=" * 50) print("=" * 50)
print("\n📱 Отправь команду /test в Telegram\n") print("\n📱 Отправь команду /start в Telegram\n")
print("🎖️ Доступны стили для обработки к 9 мая:")
for style_id, style_info in VICTORY_STYLES.items():
if style_id != 'custom':
print(f"{style_info['name']}")
print("\n")
# Держим бота запущенным
await asyncio.Event().wait() await asyncio.Event().wait()
except KeyboardInterrupt: except KeyboardInterrupt:
+3 -8
View File
@@ -8,10 +8,8 @@ DATABASE_PATH = "bot_database.db"
async def init_db(): async def init_db():
"""Инициализация базы данных и создание таблиц""" """Инициализация базы данных и создание таблиц"""
async with aiosqlite.connect(DATABASE_PATH) as db: async with aiosqlite.connect(DATABASE_PATH) as db:
# Включаем WAL режим для лучшей производительности
await db.execute("PRAGMA journal_mode=WAL;") await db.execute("PRAGMA journal_mode=WAL;")
# Таблица пользователей
await db.execute(""" await db.execute("""
CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY, user_id INTEGER PRIMARY KEY,
@@ -23,7 +21,6 @@ async def init_db():
) )
""") """)
# Таблица обработанных изображений
await db.execute(""" await db.execute("""
CREATE TABLE IF NOT EXISTS images ( CREATE TABLE IF NOT EXISTS images (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -71,11 +68,9 @@ async def save_image_record(
cursor = await db.execute(""" cursor = await db.execute("""
INSERT INTO images (user_id, original_file_id, original_url, prompt, created_at, status) INSERT INTO images (user_id, original_file_id, original_url, prompt, created_at, status)
VALUES (?, ?, ?, ?, ?, 'processing') VALUES (?, ?, ?, ?, ?, 'processing')
RETURNING id
""", (user_id, original_file_id, original_url, prompt, now)) """, (user_id, original_file_id, original_url, prompt, now))
await db.commit() await db.commit()
row = await cursor.fetchone() return cursor.lastrowid
return row[0] if row else None
async def update_image_record( async def update_image_record(
@@ -102,12 +97,12 @@ async def get_user_images(user_id: int, limit: int = 10) -> List[Dict[str, Any]]
"""Получение истории обработанных изображений пользователя""" """Получение истории обработанных изображений пользователя"""
async with aiosqlite.connect(DATABASE_PATH) as db: async with aiosqlite.connect(DATABASE_PATH) as db:
db.row_factory = aiosqlite.Row db.row_factory = aiosqlite.Row
cursor = await db.execute(""" async with db.execute("""
SELECT id, original_file_id, processed_file_id, prompt, status, created_at, completed_at SELECT id, original_file_id, processed_file_id, prompt, status, created_at, completed_at
FROM images FROM images
WHERE user_id = ? WHERE user_id = ?
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT ? LIMIT ?
""", (user_id, limit)) """, (user_id, limit)) as cursor:
rows = await cursor.fetchall() rows = await cursor.fetchall()
return [dict(row) for row in rows] return [dict(row) for row in rows]