Files
9may/bot.py
T
apuc b4f889def2 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	bot.py
#	database.py
2026-05-08 20:56:53 +03:00

401 lines
14 KiB
Python

import asyncio
import aiohttp
import os
import sys
import logging
import base64
import re
from datetime import datetime
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
ConversationHandler,
filters,
ContextTypes,
ApplicationBuilder
)
from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE
from database import init_db, save_user, save_image_record, update_image_record, get_user_images
import aiosqlite
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Состояния
AWAITING_PROMPT = 1
# Директории
UPLOAD_DIR = "uploads"
PROCESSED_DIR = "processed"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик /start"""
user = update.effective_user
logger.info(f"Start от {user.id}")
await save_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name
)
await update.message.reply_text(
f"👋 Привет, {user.first_name}!\n\n"
f"Я бот для обработки фотографий через нейросеть.\n\n"
f"📸 Отправь мне фото и напиши, как его изменить!\n\n"
f"Команды:\n"
f"/start - Приветствие\n"
f"/test - Проверка работы\n"
f"/history - История обработок"
)
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Тестовая команда"""
logger.info(f"Test от {update.effective_user.id}")
await update.message.reply_text("✅ Бот работает через SOCKS5 прокси!")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Помощь"""
await update.message.reply_text(
"📖 Инструкция:\n\n"
"1. Отправь фото\n"
"2. Напиши промпт\n"
"3. Получи результат!\n\n"
"Команды: /start, /test, /history"
)
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка фото"""
user = update.effective_user
logger.info(f"Фото от {user.id}")
photo_file = await update.message.photo[-1].get_file()
file_id = photo_file.file_id
# Сохраняем фото
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg")
await photo_file.download_to_drive(local_path)
# Запись в БД
image_id = await save_image_record(
user_id=user.id,
original_file_id=file_id,
original_url=local_path
)
context.user_data['image_id'] = image_id
context.user_data['image_path'] = local_path
await update.message.reply_text(
"📸 Фото получено!\n\n"
"Напиши, как изменить это фото.\n"
"Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n"
"/cancel - отмена"
)
return AWAITING_PROMPT
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отмена"""
logger.info(f"Cancel от {update.effective_user.id}")
if 'image_path' in context.user_data:
path = context.user_data['image_path']
if os.path.exists(path):
os.remove(path)
await update.message.reply_text("Операция отменена.")
context.user_data.clear()
return ConversationHandler.END
async def call_piapi_api(image_path: str, prompt: str) -> str:
"""Вызов PiAPI через SOCKS5 прокси"""
if not PIApi_API_KEY:
raise Exception("PIApi_API_KEY не настроен")
with open(image_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
headers = {
"x-api-key": PIApi_API_KEY,
"Content-Type": "application/json"
}
payload = {
"model": "black-forest-labs/FLUX.1-dev",
"task_type": "image-to-image",
"input": {
"image": f"data:image/jpeg;base64,{image_base64}",
"prompt": prompt,
"num_inference_steps": 28,
"guidance_scale": 7.5,
"strength": 0.8
}
}
# Используем aiohttp-socks для запросов
from aiohttp_socks import ProxyConnector
# Парсим прокси
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=30) as response:
if response.status != 200:
text = await response.text()
raise Exception(f"HTTP {response.status}: {text[:100]}")
data = await response.json()
if data.get('code') != 200:
raise Exception(f"API Error: {data.get('message')}")
task_id = data['data']['task_id']
logger.info(f"Создана задача: {task_id}")
return await poll_task(task_id, connector)
async def poll_task(task_id: str, connector, max_attempts: int = 60) -> str:
"""Ожидание завершения задачи"""
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIApi_API_KEY}
async with aiohttp.ClientSession(connector=connector) as session:
for attempt in range(max_attempts):
try:
async with session.get(get_url, headers=headers) as response:
if response.status != 200:
await asyncio.sleep(2)
continue
data = await response.json()
status = data.get('data', {}).get('status')
logger.info(f"Задача {task_id}: {status} (попытка {attempt + 1})")
if status == 'completed':
output = data.get('data', {}).get('output', {})
result_url = output.get('url') or output.get('image')
if result_url:
logger.info(f"Задача {task_id} выполнена")
return result_url
elif status == 'failed':
raise Exception("Задача не выполнена")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Ошибка опроса: {e}")
await asyncio.sleep(2)
raise Exception("Превышено время ожидания")
async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка через PiAPI"""
prompt = update.message.text
user = update.effective_user
logger.info(f"Промпт от {user.id}: {prompt[:50]}")
image_id = context.user_data.get('image_id')
image_path = context.user_data.get('image_path')
if not image_id or not image_path:
await update.message.reply_text("❌ Ошибка: фото не найдено")
return ConversationHandler.END
status_msg = await update.message.reply_text(
f"🎨 Обрабатываю...\n\n"
f"📝 Промпт: {prompt}\n\n"
f"⏳ Это займет 10-30 секунд"
)
try:
async with aiosqlite.connect("bot_database.db") as db:
await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id))
await db.commit()
result_url = await call_piapi_api(image_path, prompt)
if result_url:
result_path = os.path.join(
PROCESSED_DIR,
f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.jpg"
)
# Скачиваем результат
from aiohttp_socks import ProxyConnector
import re
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(result_url) as resp:
if resp.status == 200:
with open(result_path, "wb") as f:
f.write(await resp.read())
await status_msg.delete()
with open(result_path, "rb") as photo:
await update.message.reply_photo(
photo=photo,
caption=f"✅ Готово!\n\n{prompt}"
)
await update_image_record(
image_id=image_id,
processed_file_id=result_path,
processed_url=result_path,
status='completed'
)
logger.info(f"Обработка успешна для {user.id}")
else:
raise Exception(f"HTTP {resp.status}")
else:
raise Exception("Не получен URL результата")
except Exception as e:
logger.error(f"Ошибка: {e}")
await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.")
await update_image_record(image_id=image_id, status='failed')
# Очистка
if os.path.exists(image_path):
os.remove(image_path)
context.user_data.clear()
return ConversationHandler.END
async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""История обработок"""
user = update.effective_user
images = await get_user_images(user.id, limit=5)
if not images:
await update.message.reply_text("📭 Нет обработанных изображений")
return
text = "🖼 **История обработок:**\n\n"
for i, img in enumerate(images, 1):
status = "" if img['status'] == 'completed' else ""
date = img['created_at'][:16].replace('T', ' ')
prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img[
'prompt'] or "без промпта"
text += f"{status} **{i}.** {date}\n 📝 {prompt}\n\n"
await update.message.reply_text(text, parse_mode='Markdown')
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик ошибок"""
logger.error(f"Ошибка: {context.error}")
if update and update.effective_message:
await update.effective_message.reply_text("😵 Произошла ошибка. Попробуйте позже.")
async def main():
"""Запуск бота с прокси как в примере"""
print("\n" + "=" * 50)
print("🤖 ЗАПУСК TELEGRAM БОТА")
print("=" * 50)
# Инициализация БД
await init_db()
# Создаем приложение с прокси (как в вашем примере)
TOKEN = TELEGRAM_BOT_TOKEN
PROXY = PROXY_URL
builder = ApplicationBuilder().token(TOKEN)
if PROXY:
builder = builder.proxy(PROXY).get_updates_proxy(PROXY)
print(f"🔌 Прокси: {PROXY.split('@')[-1] if '@' in PROXY else PROXY}")
else:
print("🔌 Прокси не используется")
application = builder.build()
print("=" * 50)
# Регистрируем обработчики
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("test", test_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("history", history_command))
conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.PHOTO, handle_photo)],
states={AWAITING_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_image)]},
fallbacks=[CommandHandler("cancel", cancel), CommandHandler("start", start)],
)
application.add_handler(conv_handler)
application.add_error_handler(error_handler)
print("\n✅ Бот запускается...")
try:
# Запускаем бота
await application.initialize()
await application.start()
await application.updater.start_polling()
print("\n" + "=" * 50)
print("🤖 БОТ УСПЕШНО ЗАПУЩЕН!")
print("=" * 50)
print("\n📱 Отправь команду /test в Telegram\n")
# Держим бота запущенным
await asyncio.Event().wait()
except KeyboardInterrupt:
print("\n🛑 Бот остановлен")
except Exception as e:
logger.error(f"Ошибка: {e}")
print(f"\n❌ Ошибка: {e}")
finally:
await application.updater.stop()
await application.stop()
if __name__ == "__main__":
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())