import asyncio import aiohttp from aiohttp_socks import ProxyConnector import json import re import os from dotenv import load_dotenv import logging from datetime import datetime import base64 from PIL import Image import io # Настройка логирования logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) load_dotenv() # Конфигурация PIAPI_API_KEY = os.getenv("PIAPI_API_KEY") PIAPI_API_URL = os.getenv("PIAPI_API_URL", "https://api.piapi.ai/api/v1/task") PROXY_URL = os.getenv("PROXY_URL") USE_PROXY = os.getenv("USE_PROXY", "true").lower() == "true" MODEL_NAME = os.getenv("MODEL_NAME", "gemini") TASK_TYPE = os.getenv("TASK_TYPE", "nano-banana-pro") ASPECT_RATIO = os.getenv("ASPECT_RATIO", "9:16") STRENGTH = float(os.getenv("STRENGTH", "0.65")) SAFETY_LEVEL = os.getenv("SAFETY_LEVEL", "medium") OUTPUT_FORMAT = os.getenv("OUTPUT_FORMAT", "png") IMGBB_API_KEY = os.getenv("IMGBB_API_KEY", "") def get_proxy_connector(): """Создает ProxyConnector для SOCKS5""" if not USE_PROXY or not PROXY_URL: return None try: match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL) if match: username = match.group(1) password = match.group(2) host = match.group(3) port = int(match.group(4)) connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}") logger.info(f"✅ Прокси настроен: {host}:{port}") return connector else: from urllib.parse import urlparse parsed = urlparse(PROXY_URL) connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}") logger.info(f"✅ Прокси настроен (без авторизации): {parsed.hostname}:{parsed.port}") return connector except Exception as e: logger.error(f"❌ Ошибка настройки прокси: {e}") return None async def poll_task_result(task_id: str, max_attempts: int = 60) -> str: """Ожидание результата задачи с правильным управлением сессией""" print("\n" + "=" * 60) print("⏳ Ожидание результата обработки") print("=" * 60) get_url = f"https://api.piapi.ai/api/v1/task/{task_id}" headers = {"x-api-key": PIAPI_API_KEY} print(f"📋 ID задачи: {task_id}") print(f"🔄 Ожидание результата (максимум {max_attempts * 3} секунд)...") connector = get_proxy_connector() for attempt in range(max_attempts): try: # Создаем новую сессию для каждой попытки async with aiohttp.ClientSession(connector=connector) as session: async with session.get(get_url, headers=headers, timeout=30) as response: if response.status != 200: print(f"⚠️ HTTP {response.status}, повтор через 3 секунды...") await asyncio.sleep(3) continue data = await response.json() data_info = data.get('data', {}) status = data_info.get('status', 'unknown') output = data_info.get('output') print(f" Попытка {attempt + 1}: статус = {status}") if status == 'completed' and output: result_url = None if isinstance(output, dict): if output.get('image_urls') and len(output['image_urls']) > 0: result_url = output['image_urls'][0] print(f" ✅ Найден URL в image_urls[0]") elif output.get('url'): result_url = output['url'] print(f" ✅ Найден URL в url") elif output.get('image'): result_url = output['image'] print(f" ✅ Найден URL в image") if result_url: print(f"\n✅ ЗАДАЧА ВЫПОЛНЕНА!") print(f"📸 URL результата: {result_url[:100]}...") return result_url else: print(f"⚠️ Статус completed, но URL не найден") print(f" Output: {output}") elif status == 'failed': error = data_info.get('error', {}) error_msg = error.get('message', 'Unknown error') print(f"❌ Задача не выполнена: {error_msg}") return None elif status in ['pending', 'processing']: # Продолжаем ожидание pass else: print(f" Неизвестный статус: {status}") except asyncio.TimeoutError: print(f"⚠️ Таймаут при опросе (попытка {attempt + 1})") except aiohttp.ClientError as e: print(f"⚠️ Клиентская ошибка (попытка {attempt + 1}): {e}") except Exception as e: print(f"⚠️ Ошибка (попытка {attempt + 1}): {e}") # Ждем перед следующей попыткой await asyncio.sleep(3) print("❌ Превышено время ожидания") return None async def download_result(url: str): """Скачивание результата""" print("\n" + "=" * 60) print("💾 Скачивание результата") print("=" * 60) connector = get_proxy_connector() try: async with aiohttp.ClientSession(connector=connector) as session: async with session.get(url, timeout=30) as response: if response.status == 200: content = await response.read() print(f"✅ Изображение скачано! Размер: {len(content)} байт") os.makedirs("test_results", exist_ok=True) filename = f"test_results/test_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" with open(filename, "wb") as f: f.write(content) print(f"💾 Сохранено в: {filename}") return True else: print(f"❌ Ошибка скачивания: HTTP {response.status}") return False except Exception as e: print(f"❌ Ошибка: {e}") return False async def check_status(task_id: str): """Проверка статуса задачи""" print("\n" + "=" * 60) print("🔍 ПРОВЕРКА СТАТУСА ЗАДАЧИ") print("=" * 60) get_url = f"https://api.piapi.ai/api/v1/task/{task_id}" headers = {"x-api-key": PIAPI_API_KEY} connector = get_proxy_connector() try: async with aiohttp.ClientSession(connector=connector) as session: async with session.get(get_url, headers=headers, timeout=30) as response: if response.status == 200: data = await response.json() print(json.dumps(data, indent=2, ensure_ascii=False)) # Проверяем статус data_info = data.get('data', {}) status = data_info.get('status') output = data_info.get('output') if status == 'completed' and output: print("\n✅ Задача выполнена!") if isinstance(output, dict): if output.get('image_urls'): print(f"📸 URL: {output['image_urls'][0]}") elif status == 'failed': error = data_info.get('error', {}) print(f"\n❌ Задача не выполнена: {error.get('message')}") else: print(f"\n⏳ Статус: {status}") return data else: print(f"❌ HTTP {response.status}") return None except Exception as e: print(f"❌ Ошибка: {e}") return None async def full_test(task_id: str): """Полный тест с ожиданием результата""" print("\n" + "=" * 60) print("🚀 ПОЛНЫЙ ТЕСТ") print("=" * 60) # Проверяем статус задачи data = await check_status(task_id) if not data: print("❌ Не удалось получить статус задачи") return data_info = data.get('data', {}) status = data_info.get('status') # Если задача уже выполнена, сразу скачиваем if status == 'completed': output = data_info.get('output', {}) if isinstance(output, dict): result_url = output.get('image_urls', [None])[0] or output.get('url') if result_url: await download_result(result_url) return # Иначе ждем result_url = await poll_task_result(task_id) if result_url: await download_result(result_url) else: print("\n❌ Не удалось получить результат") if __name__ == "__main__": import sys if len(sys.argv) > 2 and sys.argv[1] == "--full": task_id = sys.argv[2] asyncio.run(full_test(task_id)) elif len(sys.argv) > 2 and sys.argv[1] == "--status": task_id = sys.argv[2] asyncio.run(check_status(task_id)) else: print("Использование:") print(" python test_api.py --status TASK_ID - проверить статус") print(" python test_api.py --full TASK_ID - дождаться результата и скачать")