This commit is contained in:
2026-05-08 20:56:12 +03:00
parent 34d89c2245
commit b748aeaabe
2 changed files with 548 additions and 146 deletions
+546 -140
View File
@@ -5,8 +5,9 @@ import sys
import logging
import base64
import re
import json
from datetime import datetime
from telegram import Update
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
@@ -14,7 +15,8 @@ from telegram.ext import (
ConversationHandler,
filters,
ContextTypes,
ApplicationBuilder
ApplicationBuilder,
CallbackQueryHandler
)
from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE
@@ -29,7 +31,9 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
# Состояния
AWAITING_PROMPT = 1
AWAITING_PHOTO = 0
AWAITING_CUSTOM_PROMPT = 1
AWAITING_STYLE_SELECTION = 2
# Директории
UPLOAD_DIR = "uploads"
@@ -37,6 +41,97 @@ PROCESSED_DIR = "processed"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
# Доступные стили для 9 мая
VICTORY_STYLES = {
"vintage_photo": {
"name": "Винтажное фото",
"icon": "📷",
"prompt": """CRITICAL: Keep the person's face, facial features, and identity EXACTLY as in the original image. Do not change or replace the face.
Convert this photo to vintage 1945 style black and white photo with grain texture, aged paper effect, subtle sepia tone, and soft vignette. Add authentic WWII era photo characteristics: slight blur, classic film grain, and historical atmosphere reminiscent of May 9, 1945 Victory Day celebration. The face must be preserved.""",
"description": "Эффект старой фотографии 1945 года"
},
"soviet_poster": {
"name": "Советский плакат",
"icon": "🎨",
"prompt": """CRITICAL: Keep the person's face, facial features, and identity EXACTLY as in the original image. Do not change or replace the face.
Transform this photo into a Soviet propaganda poster style from May 9 Victory Day. Use bold red and gold colors, dramatic socialist realism art style, heroic composition with triumphant pose. Add stylized fireworks in background, Order of Victory ribbon elements, and text '9 МАЯ' in classic Soviet typography. The face must be preserved.""",
"description": "Стиль советского агитационного плаката"
},
"military_uniform": {
"name": "Военная форма",
"icon": "🎖️",
"prompt": """CRITICAL: Keep the person's face, facial features, eyes, nose, mouth, and hairstyle EXACTLY as in the original image. Do NOT change or replace the face.
Transform this person into wearing authentic Soviet WWII military uniform from 1945 Victory Day. Add officer's uniform with medals: Order of the Red Star, Order of the Patriotic War, Victory medal. Include garrison cap with cockade, shoulder boards with lieutenant/epaulette ranks. Background should show Moscow or Berlin 1945 victory celebration with fireworks and waving flags. The face must remain identical.""",
"description": "Примерка военной формы с наградами"
},
"george_ribbon": {
"name": "Георгиевская лента",
"icon": "🎗️",
"prompt": """CRITICAL: Keep the person's face, facial features, and identity EXACTLY as in the original image. Do not change or replace the face.
Transform this photo into a composition with the iconic St. George ribbon (Georgievskaya lenta). The person should be decorated with a flowing orange and black ribbon across their chest. Add traditional Russian birch trees, a field of red carnations, and a dramatic sunset sky. Style reminiscent of classic Soviet war photography with modern patriotic elements. The face must be preserved and recognizable.""",
"description": "Портрет с Георгиевской лентой и цветами"
},
"victory_parade": {
"name": "Парад Победы",
"icon": "🎆",
"prompt": """CRITICAL: Keep the person's face, all facial features, and identity EXACTLY as in the original image. Do not alter the face.
Place this person in the middle of Red Square during Victory Parade. Add military parade background with marching soldiers, T-34 tanks, and Russian military vehicles. Include tribunes with veterans wearing medals, fireworks in sky, St. George's ribbons, and huge '9 МАЯ' banners. The person should be watching the parade or saluting. Their original face must be perfectly preserved.""",
"description": "На фоне Парада Победы"
},
"front_letter": {
"name": "Письмо с фронта",
"icon": "✉️",
"prompt": """CRITICAL: Keep the person's face and facial features EXACTLY as in the original image. Do not change or replace the face.
Transform this photo into a stylized old war letter from WWII. The image should appear like a vintage triangular folded letter with sepia tones, handwritten Cyrillic text visible, coffee stain effects, and a black and white photo attached to the letter. Add elements like field post stamps, dried flower from battlefield, and patriotic phrases like 'С Победой!'. The person's face must match the original exactly.""",
"description": "Стилизация под письмо с фронта"
},
"custom": {
"name": "Свой вариант",
"icon": "",
"prompt": "",
"description": "Напишите свой промпт для обработки"
}
}
async def get_telegram_file_url(file_id: str) -> str:
"""Получение прямого URL файла из Telegram"""
get_file_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile?file_id={file_id}"
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_file_url) as response:
if response.status != 200:
raise Exception(f"Не удалось получить информацию о файле: {response.status}")
data = await response.json()
if not data.get('ok'):
raise Exception(f"Ошибка API Telegram: {data}")
file_path = data['result']['file_path']
file_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
return file_url
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик /start"""
@@ -50,87 +145,205 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
last_name=user.last_name
)
context.user_data.clear()
keyboard = [
[InlineKeyboardButton("🎖️ Обработать фото к 9 мая", callback_data="process_victory")],
[InlineKeyboardButton("📸 Обычная обработка фото", callback_data="process_normal")],
[InlineKeyboardButton("📊 История обработок", callback_data="history")],
[InlineKeyboardButton("❓ Помощь", callback_data="help")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n"
f"Я бот для обработки фотографий через нейросеть.\n\n"
f"📸 Отправь мне фото и напиши, как его изменить!\n\n"
f"Команды:\n"
f"/start - Приветствие\n"
f"/test - Проверка работы\n"
f"/history - История обработок"
f"👋 Привет, {user.first_name}! 🇷🇺\n\n"
f"✨ *Добро пожаловать в Фотохудожник Победы!* ✨\n\n"
f"Я превращаю твои фотографии в уникальные артефакты военной эпохи.\n\n"
f"🎖️ *Доступные стили:*\n"
f"📷 Винтажное фото 1945 года\n"
f"🎨 Советский агитационный плакат\n"
f"🎖️ Военная форма с наградами\n"
f"🎗️ Георгиевская лента\n"
f"🎆 Парад Победы\n"
f"✉️ Письмо с фронта\n\n"
f"🖼️ *Формат результата:* 9:16 (вертикальный)\n\n"
f"📸 *Выбери действие ниже:*",
parse_mode='Markdown',
reply_markup=reply_markup
)
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Тестовая команда"""
logger.info(f"Test от {update.effective_user.id}")
await update.message.reply_text("✅ Бот работает через SOCKS5 прокси!")
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка нажатий на кнопки"""
query = update.callback_query
await query.answer()
data = query.data
logger.info(f"Нажата кнопка: {data}")
if data == "process_victory":
context.user_data['processing_mode'] = 'victory'
await query.message.reply_text(
"🎖️ *Обработка фото к 9 мая*\n\n"
"📸 Отправь мне фото, и я предложу выбрать стиль обработки!\n\n"
"*Доступные стили:*\n"
"📷 Винтажное фото 1945 года\n"
"🎨 Советский агитационный плакат\n"
"🎖️ Военная форма с наградами\n"
"🎗️ Георгиевская лента\n"
"🎆 Парад Победы\n"
"✉️ Письмо с фронта\n\n"
"🖼️ *Формат результата:* 9:16 (вертикальный)\n\n"
"📤 *Отправь фото:*",
parse_mode='Markdown'
)
elif data == "process_normal":
context.user_data['processing_mode'] = 'normal'
await query.message.reply_text(
"📸 Обычная обработка фото\n\n"
"Отправь мне фото, а затем напиши текстовое описание того, "
"как ты хочешь изменить изображение.\n\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"Отправь фото:"
)
elif data == "history":
await history_command(update, context)
elif data == "help":
await help_command(update, context)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Помощь"""
async def show_style_keyboard(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Показать клавиатуру выбора стиля для 9 мая с иконками"""
keyboard = []
for style_id, style_info in VICTORY_STYLES.items():
button_text = f"{style_info['icon']} {style_info['name']}"
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"style_{style_id}")])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"📖 Инструкция:\n\n"
"1. Отправь фото\n"
"2. Напиши промпт\n"
"3. Получи результат!\n\n"
"Команды: /start, /test, /history"
"🎖️ *Выбери стиль обработки к 9 мая:*\n\n"
"• Винтажное фото - эффект старой фотографии 1945 года\n"
"• Советский плакат - стиль агитационного плаката\n"
"• Военная форма - примерка формы с наградами\n"
"• Георгиевская лента - портрет с символом Победы\n"
"• Парад Победы - на фоне военного парада\n"
"• Письмо с фронта - стилизация под старое письмо\n"
"• Свой вариант - напиши свой промпт\n\n"
"🖼️ *Формат результата:* 9:16 (вертикальный)\n\n"
"👇 *Выбери стиль:*",
parse_mode='Markdown',
reply_markup=reply_markup
)
async def style_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка выбора стиля"""
query = update.callback_query
await query.answer()
style_id = query.data.replace("style_", "")
logger.info(f"Выбран стиль: {style_id}")
photo_url = context.user_data.get('photo_url')
photo_path = context.user_data.get('photo_path')
image_id = context.user_data.get('image_id')
if (not photo_url and not photo_path) or not image_id:
await query.message.reply_text("❌ Ошибка: фото не найдено. Пожалуйста, отправьте фото заново.")
return
style_info = VICTORY_STYLES.get(style_id, {})
style_icon = style_info.get('icon', '🎖️')
if style_id == "custom":
context.user_data['selected_style'] = 'custom'
context.user_data['awaiting_custom_prompt'] = True
await query.message.reply_text(
f"{style_icon} Напиши свой промпт\n\n"
"Опиши, как ты хочешь изменить фото.\n\n"
"💡 *Важно:* добавь в промпт 'сохрани лицо человека неизменным'\n\n"
"📝 Напиши свой промпт:",
parse_mode='Markdown'
)
else:
context.user_data['selected_style'] = style_id
context.user_data['custom_prompt'] = style_info["prompt"]
await query.message.reply_text(
f"{style_icon} Выбран стиль: *{style_info['name']}*\n\n"
f"📖 {style_info['description']}\n\n"
f"🎨 Начинаю обработку... Пожалуйста, подожди 15-30 секунд.",
parse_mode='Markdown'
)
await process_image_with_style(query.message, context, style_id)
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка фото"""
user = update.effective_user
logger.info(f"Фото от {user.id}")
processing_mode = context.user_data.get('processing_mode', 'normal')
photo_file = await update.message.photo[-1].get_file()
file_id = photo_file.file_id
# Сохраняем фото
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg")
await photo_file.download_to_drive(local_path)
# Запись в БД
image_id = await save_image_record(
user_id=user.id,
original_file_id=file_id,
original_url=local_path
)
try:
telegram_url = await get_telegram_file_url(file_id)
logger.info(f"Получен прямой URL из Telegram")
except Exception as e:
logger.error(f"Не удалось получить URL из Telegram: {e}")
telegram_url = None
# Сохраняем запись в БД с обработкой ошибок
try:
image_id = await save_image_record(
user_id=user.id,
original_file_id=file_id,
original_url=local_path
)
logger.info(f"Создана запись в БД: image_id={image_id}")
except Exception as e:
logger.error(f"Ошибка при сохранении в БД: {e}")
await update.message.reply_text("❌ Ошибка при сохранении фото. Попробуйте еще раз.")
return
context.user_data['photo_path'] = local_path
context.user_data['photo_file_id'] = file_id
context.user_data['photo_url'] = telegram_url
context.user_data['image_id'] = image_id
context.user_data['image_path'] = local_path
context.user_data['processing_mode'] = processing_mode
await update.message.reply_text(
"📸 Фото получено!\n\n"
"Напиши, как изменить это фото.\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"/cancel - отмена"
)
return AWAITING_PROMPT
if processing_mode == 'victory':
await show_style_keyboard(update, context)
else:
await update.message.reply_text(
"📸 Фото получено!\n\n"
"Напиши, как изменить это фото.\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"/cancel - отмена"
)
return AWAITING_CUSTOM_PROMPT
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отмена"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'image_path' in context.user_data:
path = context.user_data['image_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def call_piapi_api(image_path: str, prompt: str) -> str:
"""Вызов PiAPI через SOCKS5 прокси"""
async def process_with_nano_banana(image_url: str, prompt: str) -> str:
"""Обработка изображения через Nano Banana Pro"""
if not PIApi_API_KEY:
raise Exception("PIApi_API_KEY не настроен")
with open(image_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
if "face" not in prompt.lower() and "сохрани" not in prompt.lower():
prompt = f"""CRITICAL: Keep the person's face exactly as in the original image. Do not change or replace the face.
{prompt}"""
headers = {
"x-api-key": PIApi_API_KEY,
@@ -138,21 +351,21 @@ async def call_piapi_api(image_path: str, prompt: str) -> str:
}
payload = {
"model": "black-forest-labs/FLUX.1-dev",
"task_type": "image-to-image",
"model": "gemini",
"task_type": "nano-banana-pro",
"input": {
"image": f"data:image/jpeg;base64,{image_base64}",
"image_urls": [image_url],
"prompt": prompt,
"num_inference_steps": 28,
"guidance_scale": 7.5,
"strength": 0.8
"output_format": "png",
"aspect_ratio": "9:16",
"strength": 0.65,
"safety_level": "medium"
}
}
# Используем aiohttp-socks для запросов
from aiohttp_socks import ProxyConnector
import re
# Парсим прокси
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
@@ -166,88 +379,123 @@ async def call_piapi_api(image_path: str, prompt: str) -> str:
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=30) as response:
async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=90) as response:
if response.status != 200:
text = await response.text()
raise Exception(f"HTTP {response.status}: {text[:100]}")
raise Exception(f"HTTP {response.status}: {text[:200]}")
data = await response.json()
if data.get('code') != 200:
raise Exception(f"API Error: {data.get('message')}")
task_id = data['data']['task_id']
task_id = data.get('data', {}).get('task_id')
if not task_id:
raise Exception(f"Не получен task_id")
logger.info(f"Создана задача: {task_id}")
return await poll_task(task_id, connector)
return await poll_nano_banana_task(task_id)
async def poll_task(task_id: str, connector, max_attempts: int = 60) -> str:
async def poll_nano_banana_task(task_id: str, max_attempts: int = 90) -> str:
"""Ожидание завершения задачи"""
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIApi_API_KEY}
async with aiohttp.ClientSession(connector=connector) as session:
for attempt in range(max_attempts):
try:
async with session.get(get_url, headers=headers) as response:
for attempt in range(max_attempts):
try:
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_url, headers=headers, timeout=30) as response:
if response.status != 200:
await asyncio.sleep(2)
await asyncio.sleep(3)
continue
data = await response.json()
status = data.get('data', {}).get('status')
data_info = data.get('data', {}) or {}
status = data_info.get('status', 'unknown')
output = data_info.get('output')
logger.info(f"Задача {task_id}: {status} (попытка {attempt + 1})")
if status == 'completed':
output = data.get('data', {}).get('output', {})
result_url = output.get('url') or output.get('image')
if result_url:
logger.info(f"Задача {task_id} выполнена")
return result_url
elif status == 'failed':
raise Exception("Задача не выполнена")
if output and isinstance(output, dict):
if output.get('image_urls') and len(output['image_urls']) > 0:
result_url = output['image_urls'][0]
if status == 'completed' and result_url:
logger.info(f"Задача выполнена")
return result_url
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Ошибка опроса: {e}")
await asyncio.sleep(2)
if status == 'failed':
error = data_info.get('error', {})
error_msg = error.get('message', 'Unknown error') if error else 'Unknown error'
raise Exception(f"Задача не выполнена: {error_msg}")
await asyncio.sleep(3)
except Exception as e:
logger.error(f"Ошибка опроса: {e}")
await asyncio.sleep(3)
raise Exception("Превышено время ожидания")
async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка через PiAPI"""
prompt = update.message.text
user = update.effective_user
logger.info(f"Промпт от {user.id}: {prompt[:50]}")
async def process_image_with_style(message, context: ContextTypes.DEFAULT_TYPE, style_id: str):
"""Обработка фото с выбранным стилем"""
user = message.from_user
photo_url = context.user_data.get('photo_url')
photo_path = context.user_data.get('photo_path')
image_id = context.user_data.get('image_id')
image_path = context.user_data.get('image_path')
if not image_id or not image_path:
await update.message.reply_text("❌ Ошибка: фото не найдено")
return ConversationHandler.END
style_info = VICTORY_STYLES.get(style_id, {})
style_icon = style_info.get('icon', '🎖️')
style_name = style_info.get('name', 'обработка')
status_msg = await update.message.reply_text(
f"🎨 Обрабатываю...\n\n"
f"📝 Промпт: {prompt}\n\n"
f"⏳ Это займет 10-30 секунд"
if style_id == "custom":
custom_prompt = context.user_data.get('custom_prompt', '')
else:
custom_prompt = style_info.get("prompt", "")
if not photo_url and not photo_path:
await message.reply_text("❌ Ошибка: фото не найдено")
return
if not photo_url:
with open(photo_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
photo_url = f"data:image/jpeg;base64,{image_base64}"
status_msg = await message.reply_text(
f"{style_icon} Обрабатываю через Nano Banana Pro...\n\n"
f"🎖️ Стиль: *{style_name}*\n"
f"🖼️ Формат: 9:16 (вертикальный)\n\n"
f"⏳ Это займет 15-30 секунд",
parse_mode='Markdown'
)
try:
async with aiosqlite.connect("bot_database.db") as db:
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await call_piapi_api(image_path, prompt)
result_url = await process_with_nano_banana(photo_url, custom_prompt)
if result_url:
result_path = os.path.join(
PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.jpg"
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.png"
)
# Скачиваем результат
from aiohttp_socks import ProxyConnector
import re
@@ -270,10 +518,111 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
f.write(await resp.read())
await status_msg.delete()
caption = f"🎖️ {VICTORY_STYLES[style_id]['name']}\n\n✅ Готово!\n\n🇷🇺 С Днем Победы!"
with open(result_path, "rb") as photo:
await message.reply_photo(photo=photo, caption=caption)
await update_image_record(
image_id=image_id,
processed_file_id=result_path,
processed_url=result_path,
status='completed'
)
logger.info(f"Обработка успешна для {user.id}")
else:
raise Exception(f"HTTP {resp.status}")
else:
raise Exception("Не получен URL результата")
except Exception as e:
logger.error(f"Ошибка обработки: {e}")
await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.")
await update_image_record(image_id=image_id, status='failed')
if photo_path and os.path.exists(photo_path):
os.remove(photo_path)
context.user_data.clear()
async def process_with_custom_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка с кастомным промптом"""
prompt = update.message.text
user = update.effective_user
if context.user_data.get('awaiting_custom_prompt'):
context.user_data['awaiting_custom_prompt'] = False
context.user_data['custom_prompt'] = prompt
await update.message.reply_text(
f"✅ Промпт сохранен\n\n🎨 Начинаю обработку..."
)
await process_image_with_style(update.message, context, 'custom')
return ConversationHandler.END
image_id = context.user_data.get('image_id')
photo_path = context.user_data.get('photo_path')
photo_url = context.user_data.get('photo_url')
if not image_id or (not photo_path and not photo_url):
await update.message.reply_text("❌ Ошибка: фото не найдено")
return ConversationHandler.END
if not photo_url:
with open(photo_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
photo_url = f"data:image/jpeg;base64,{image_base64}"
status_msg = await update.message.reply_text(
f"🎨 Обрабатываю через Nano Banana Pro...\n\n"
f"📝 Промпт: {prompt[:100]}\n\n"
f"⏳ Это займет 15-30 секунд"
)
try:
async with aiosqlite.connect("bot_database.db") as db:
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await process_with_nano_banana(photo_url, prompt)
if result_url:
result_path = os.path.join(
PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.png"
)
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(result_url) as resp:
if resp.status == 200:
with open(result_path, "wb") as f:
f.write(await resp.read())
await status_msg.delete()
with open(result_path, "rb") as photo:
await update.message.reply_photo(
photo=photo,
caption=f"✅ Готово!\n\n{prompt}"
caption=f"✅ Готово!\n\n{prompt[:200]}"
)
await update_image_record(
@@ -294,9 +643,8 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.")
await update_image_record(image_id=image_id, status='failed')
# Очистка
if os.path.exists(image_path):
os.remove(image_path)
if photo_path and os.path.exists(photo_path):
os.remove(photo_path)
context.user_data.clear()
return ConversationHandler.END
@@ -304,66 +652,126 @@ async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""История обработок"""
user = update.effective_user
images = await get_user_images(user.id, limit=5)
if hasattr(update, 'callback_query'):
user_id = update.callback_query.from_user.id
message = update.callback_query.message
else:
user_id = update.effective_user.id
message = update.message
await message.reply_text("📊 Загружаю историю...")
images = await get_user_images(user_id, limit=5)
if not images:
await update.message.reply_text("📭 Нет обработанных изображений")
await message.reply_text("📭 Нет обработанных изображений")
return
text = "🖼 **История обработок:**\n\n"
text = "🖼 История обработок:\n\n"
for i, img in enumerate(images, 1):
status = "" if img['status'] == 'completed' else ""
date = img['created_at'][:16].replace('T', ' ')
prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img[
'prompt'] or "без промпта"
text += f"{status} **{i}.** {date}\n 📝 {prompt}\n\n"
prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img.get('prompt',
'без промпта')
text += f"{status} {i}. {date}\n 📝 {prompt}\n\n"
await update.message.reply_text(text, parse_mode='Markdown')
await message.reply_text(text)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Помощь"""
help_text = (
"📖 *ПОМОЩЬ И ПОДДЕРЖКА*\n\n"
"🇷🇺 *Обработка к 9 мая:*\n"
"1. Нажми 'Обработать фото к 9 мая'\n"
"2. Отправь фото\n"
"3. Выбери стиль\n"
"4. Получи результат!\n\n"
"📸 *Обычная обработка:*\n"
"1. Нажми 'Обычная обработка фото'\n"
"2. Отправь фото\n"
"3. Напиши промпт\n"
"4. Получи результат!\n\n"
"🎖️ *Стили для 9 мая:*\n"
"📷 Винтажное фото 1945 года\n"
"🎨 Советский плакат\n"
"🎖️ Военная форма\n"
"🎗️ Георгиевская лента\n"
"🎆 Парад Победы\n"
"✉️ Письмо с фронта\n\n"
"📞 *По всем вопросам:* @kirill_bouko",
)
if hasattr(update, 'callback_query'):
await update.callback_query.message.reply_text(help_text, parse_mode='Markdown')
else:
await update.message.reply_text(help_text, parse_mode='Markdown')
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отмена"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'photo_path' in context.user_data:
path = context.user_data['photo_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик ошибок"""
logger.error(f"Ошибка: {context.error}")
if update and update.effective_message:
await update.effective_message.reply_text("😵 Произошла ошибка. Попробуйте позже.")
error_message = "😵 Произошла техническая ошибка.\n\n📞 По всем вопросам: @kirill_bouko"
try:
if update and update.effective_message:
await update.effective_message.reply_text(error_message)
elif update and hasattr(update, 'callback_query') and update.callback_query:
await update.callback_query.message.reply_text(error_message)
except Exception as e:
logger.error(f"Ошибка при отправке сообщения: {e}")
async def main():
"""Запуск бота с прокси как в примере"""
"""Запуск бота"""
print("\n" + "=" * 50)
print("🤖 ЗАПУСК TELEGRAM БОТА")
print("🇷🇺 БОТ ДЛЯ ОБРАБОТКИ ФОТО К 9 МАЯ 🇷🇺")
print("=" * 50)
print("Модель: Nano Banana Pro")
print("Формат: 9:16")
print("=" * 50)
# Инициализация БД
await init_db()
# Создаем приложение с прокси (как в вашем примере)
TOKEN = TELEGRAM_BOT_TOKEN
PROXY = PROXY_URL
builder = ApplicationBuilder().token(TELEGRAM_BOT_TOKEN)
builder = ApplicationBuilder().token(TOKEN)
if PROXY:
builder = builder.proxy(PROXY).get_updates_proxy(PROXY)
print(f"🔌 Прокси: {PROXY.split('@')[-1] if '@' in PROXY else PROXY}")
if PROXY_URL:
builder = builder.proxy(PROXY_URL).get_updates_proxy(PROXY_URL)
print(f"🔌 Прокси: {PROXY_URL.split('@')[-1] if '@' in PROXY_URL else PROXY_URL}")
else:
print("🔌 Прокси не используется")
application = builder.build()
print("=" * 50)
# Регистрируем обработчики
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("test", test_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("history", history_command))
application.add_handler(CommandHandler("cancel", cancel))
application.add_handler(
CallbackQueryHandler(button_callback, pattern="^(process_victory|process_normal|history|help)$"))
application.add_handler(CallbackQueryHandler(style_callback, pattern="^style_"))
conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.PHOTO, handle_photo)],
states={AWAITING_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_image)]},
fallbacks=[CommandHandler("cancel", cancel), CommandHandler("start", start)],
states={
AWAITING_CUSTOM_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_with_custom_prompt)],
},
fallbacks=[CommandHandler("cancel", cancel)],
)
application.add_handler(conv_handler)
application.add_error_handler(error_handler)
@@ -371,17 +779,15 @@ async def main():
print("\n✅ Бот запускается...")
try:
# Запускаем бота
await application.initialize()
await application.start()
await application.updater.start_polling()
print("\n" + "=" * 50)
print("🤖 БОТ УСПЕШНО ЗАПУЩЕН!")
print("🇷🇺 БОТ УСПЕШНО ЗАПУЩЕН! 🇷🇺")
print("=" * 50)
print("\n📱 Отправь команду /test в Telegram\n")
print("\n📱 Отправь /start в Telegram\n")
# Держим бота запущенным
await asyncio.Event().wait()
except KeyboardInterrupt:
+2 -6
View File
@@ -8,10 +8,8 @@ DATABASE_PATH = "bot_database.db"
async def init_db():
"""Инициализация базы данных и создание таблиц"""
async with aiosqlite.connect(DATABASE_PATH) as db:
# Включаем WAL режим для лучшей производительности
await db.execute("PRAGMA journal_mode=WAL;")
# Таблица пользователей
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
@@ -23,7 +21,6 @@ async def init_db():
)
""")
# Таблица обработанных изображений
await db.execute("""
CREATE TABLE IF NOT EXISTS images (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -71,11 +68,9 @@ async def save_image_record(
cursor = await db.execute("""
INSERT INTO images (user_id, original_file_id, original_url, prompt, created_at, status)
VALUES (?, ?, ?, ?, ?, 'processing')
RETURNING id
""", (user_id, original_file_id, original_url, prompt, now))
await db.commit()
row = await cursor.fetchone()
return row[0] if row else None
return cursor.lastrowid
async def update_image_record(
@@ -110,4 +105,5 @@ async def get_user_images(user_id: int, limit: int = 10) -> List[Dict[str, Any]]
LIMIT ?
""", (user_id, limit))
rows = await cursor.fetchall()
await db.commit()
return [dict(row) for row in rows]