import asyncio import aiohttp import os import sys import logging import base64 from datetime import datetime from telegram import Update from telegram.ext import ( Application, CommandHandler, MessageHandler, ConversationHandler, filters, ContextTypes ) from config import TELEGRAM_BOT_TOKEN, PIApi_API_KEY, PIApi_BASE_URL, PROXY_URL, PROXY_TYPE from database import init_db, save_user, save_image_record, update_image_record, get_user_images import aiosqlite # Настройка логирования logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Состояния AWAITING_PROMPT = 1 # Директории UPLOAD_DIR = "uploads" PROCESSED_DIR = "processed" os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(PROCESSED_DIR, exist_ok=True) async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработчик /start""" user = update.effective_user logger.info(f"Start от {user.id}") await save_user( user_id=user.id, username=user.username, first_name=user.first_name, last_name=user.last_name ) await update.message.reply_text( f"👋 Привет, {user.first_name}!\n\n" f"Я бот для обработки фотографий через нейросеть.\n\n" f"📸 Отправь мне фото и напиши, как его изменить!\n\n" f"Команды:\n" f"/start - Приветствие\n" f"/test - Проверка работы\n" f"/history - История обработок" ) async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Тестовая команда""" logger.info(f"Test от {update.effective_user.id}") await update.message.reply_text("✅ Бот работает через SOCKS5 прокси!") async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Помощь""" await update.message.reply_text( "📖 Инструкция:\n\n" "1. Отправь фото\n" "2. Напиши промпт\n" "3. Получи результат!\n\n" "Команды: /start, /test, /history" ) async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка фото""" user = update.effective_user logger.info(f"Фото от {user.id}") photo_file = await update.message.photo[-1].get_file() file_id = photo_file.file_id # Сохраняем фото timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') local_path = os.path.join(UPLOAD_DIR, f"user_{user.id}_{timestamp}.jpg") await photo_file.download_to_drive(local_path) # Запись в БД image_id = await save_image_record( user_id=user.id, original_file_id=file_id, original_url=local_path ) context.user_data['image_id'] = image_id context.user_data['image_path'] = local_path await update.message.reply_text( "📸 Фото получено!\n\n" "Напиши, как изменить это фото.\n" "Например: 'сделай фон пляжем' или 'преврати в рисунок'\n\n" "/cancel - отмена" ) return AWAITING_PROMPT async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE): """Отмена""" logger.info(f"Cancel от {update.effective_user.id}") if 'image_path' in context.user_data: path = context.user_data['image_path'] if os.path.exists(path): os.remove(path) await update.message.reply_text("Операция отменена.") context.user_data.clear() return ConversationHandler.END async def call_piapi_api(image_path: str, prompt: str) -> str: """Вызов PiAPI через SOCKS5 прокси""" if not PIApi_API_KEY: raise Exception("PIApi_API_KEY не настроен") with open(image_path, "rb") as f: image_base64 = base64.b64encode(f.read()).decode('utf-8') headers = { "x-api-key": PIApi_API_KEY, "Content-Type": "application/json" } payload = { "model": "black-forest-labs/FLUX.1-dev", "task_type": "image-to-image", "input": { "image": f"data:image/jpeg;base64,{image_base64}", "prompt": prompt, "num_inference_steps": 28, "guidance_scale": 7.5, "strength": 0.8 } } # Используем aiohttp-socks для запросов from aiohttp_socks import ProxyConnector import re # Парсим прокси match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL) if match: username = match.group(1) password = match.group(2) host = match.group(3) port = int(match.group(4)) connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}") else: from urllib.parse import urlparse parsed = urlparse(PROXY_URL) connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}") async with aiohttp.ClientSession(connector=connector) as session: async with session.post(PIApi_BASE_URL, headers=headers, json=payload, timeout=30) as response: if response.status != 200: text = await response.text() raise Exception(f"HTTP {response.status}: {text[:100]}") data = await response.json() if data.get('code') != 200: raise Exception(f"API Error: {data.get('message')}") task_id = data['data']['task_id'] logger.info(f"Создана задача: {task_id}") return await poll_task(task_id, connector) async def poll_task(task_id: str, connector, max_attempts: int = 60) -> str: """Ожидание завершения задачи""" get_url = f"https://api.piapi.ai/api/v1/task/{task_id}" headers = {"x-api-key": PIApi_API_KEY} async with aiohttp.ClientSession(connector=connector) as session: for attempt in range(max_attempts): try: async with session.get(get_url, headers=headers) as response: if response.status != 200: await asyncio.sleep(2) continue data = await response.json() status = data.get('data', {}).get('status') logger.info(f"Задача {task_id}: {status} (попытка {attempt + 1})") if status == 'completed': output = data.get('data', {}).get('output', {}) result_url = output.get('url') or output.get('image') if result_url: logger.info(f"Задача {task_id} выполнена") return result_url elif status == 'failed': raise Exception("Задача не выполнена") await asyncio.sleep(2) except Exception as e: logger.error(f"Ошибка опроса: {e}") await asyncio.sleep(2) raise Exception("Превышено время ожидания") async def process_image(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка через PiAPI""" prompt = update.message.text user = update.effective_user logger.info(f"Промпт от {user.id}: {prompt[:50]}") image_id = context.user_data.get('image_id') image_path = context.user_data.get('image_path') if not image_id or not image_path: await update.message.reply_text("❌ Ошибка: фото не найдено") return ConversationHandler.END status_msg = await update.message.reply_text( f"🎨 Обрабатываю...\n\n" f"📝 Промпт: {prompt}\n\n" f"⏳ Это займет 10-30 секунд" ) try: async with aiosqlite.connect("bot_database.db") as db: await db.execute("UPDATE images SET prompt = ? WHERE id = ?", (prompt, image_id)) await db.commit() result_url = await call_piapi_api(image_path, prompt) if result_url: result_path = os.path.join( PROCESSED_DIR, f"user_{user.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_result.jpg" ) # Скачиваем результат from aiohttp_socks import ProxyConnector import re match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL) if match: username = match.group(1) password = match.group(2) host = match.group(3) port = int(match.group(4)) connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}") else: from urllib.parse import urlparse parsed = urlparse(PROXY_URL) connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}") async with aiohttp.ClientSession(connector=connector) as session: async with session.get(result_url) as resp: if resp.status == 200: with open(result_path, "wb") as f: f.write(await resp.read()) await status_msg.delete() with open(result_path, "rb") as photo: await update.message.reply_photo( photo=photo, caption=f"✅ Готово!\n\n✨ {prompt}" ) await update_image_record( image_id=image_id, processed_file_id=result_path, processed_url=result_path, status='completed' ) logger.info(f"Обработка успешна для {user.id}") else: raise Exception(f"HTTP {resp.status}") else: raise Exception("Не получен URL результата") except Exception as e: logger.error(f"Ошибка: {e}") await status_msg.edit_text(f"❌ Ошибка: {str(e)}\n\nПопробуйте еще раз.") await update_image_record(image_id=image_id, status='failed') # Очистка if os.path.exists(image_path): os.remove(image_path) context.user_data.clear() return ConversationHandler.END async def history_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """История обработок""" user = update.effective_user images = await get_user_images(user.id, limit=5) if not images: await update.message.reply_text("📭 Нет обработанных изображений") return text = "🖼 **История обработок:**\n\n" for i, img in enumerate(images, 1): status = "✅" if img['status'] == 'completed' else "❌" date = img['created_at'][:16].replace('T', ' ') prompt = img['prompt'][:35] + "..." if img['prompt'] and len(img['prompt']) > 35 else img[ 'prompt'] or "без промпта" text += f"{status} **{i}.** {date}\n 📝 {prompt}\n\n" await update.message.reply_text(text, parse_mode='Markdown') async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработчик ошибок""" logger.error(f"Ошибка: {context.error}") if update and update.effective_message: await update.effective_message.reply_text("😵 Произошла ошибка. Попробуйте позже.") async def main(): """Запуск бота""" print("\n" + "=" * 50) print("🤖 ЗАПУСК TELEGRAM БОТА") print("=" * 50) if not PROXY_URL: print("❌ Прокси не настроен!") return print(f"🔌 Прокси: {PROXY_URL.split('@')[-1] if '@' in PROXY_URL else PROXY_URL}") print("=" * 50) # Инициализация БД await init_db() # Создаем приложение с поддержкой SOCKS5 # python-telegram-bot[socks] автоматически поддерживает SOCKS5 application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() # Для исходящих запросов бота используем прокси # Устанавливаем прокси через переменные окружения os.environ['ALL_PROXY'] = PROXY_URL os.environ['HTTP_PROXY'] = PROXY_URL os.environ['HTTPS_PROXY'] = PROXY_URL # Регистрируем обработчики application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("test", test_command)) application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("history", history_command)) conv_handler = ConversationHandler( entry_points=[MessageHandler(filters.PHOTO, handle_photo)], states={AWAITING_PROMPT: [MessageHandler(filters.TEXT & ~filters.COMMAND, process_image)]}, fallbacks=[CommandHandler("cancel", cancel)], ) application.add_handler(conv_handler) application.add_error_handler(error_handler) print("\n✅ Бот запускается через SOCKS5 прокси...") try: await application.initialize() await application.start() await application.updater.start_polling() print("\n" + "=" * 50) print("🤖 БОТ УСПЕШНО ЗАПУЩЕН!") print("=" * 50) print("\n📱 Отправь команду /test в Telegram\n") await asyncio.Event().wait() except KeyboardInterrupt: print("\n🛑 Бот остановлен") except Exception as e: logger.error(f"Ошибка: {e}") print(f"\n❌ Ошибка: {e}") finally: await application.updater.stop() await application.stop() if __name__ == "__main__": if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(main())