Files
2026-05-07 00:17:25 +03:00

259 lines
10 KiB
Python

import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector
import json
import re
import os
from dotenv import load_dotenv
import logging
from datetime import datetime
import base64
from PIL import Image
import io
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
# Конфигурация
PIAPI_API_KEY = os.getenv("PIAPI_API_KEY")
PIAPI_API_URL = os.getenv("PIAPI_API_URL", "https://api.piapi.ai/api/v1/task")
PROXY_URL = os.getenv("PROXY_URL")
USE_PROXY = os.getenv("USE_PROXY", "true").lower() == "true"
MODEL_NAME = os.getenv("MODEL_NAME", "gemini")
TASK_TYPE = os.getenv("TASK_TYPE", "nano-banana-pro")
ASPECT_RATIO = os.getenv("ASPECT_RATIO", "9:16")
STRENGTH = float(os.getenv("STRENGTH", "0.65"))
SAFETY_LEVEL = os.getenv("SAFETY_LEVEL", "medium")
OUTPUT_FORMAT = os.getenv("OUTPUT_FORMAT", "png")
IMGBB_API_KEY = os.getenv("IMGBB_API_KEY", "")
def get_proxy_connector():
"""Создает ProxyConnector для SOCKS5"""
if not USE_PROXY or not PROXY_URL:
return None
try:
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
if match:
username = match.group(1)
password = match.group(2)
host = match.group(3)
port = int(match.group(4))
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
logger.info(f"✅ Прокси настроен: {host}:{port}")
return connector
else:
from urllib.parse import urlparse
parsed = urlparse(PROXY_URL)
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
logger.info(f"✅ Прокси настроен (без авторизации): {parsed.hostname}:{parsed.port}")
return connector
except Exception as e:
logger.error(f"❌ Ошибка настройки прокси: {e}")
return None
async def poll_task_result(task_id: str, max_attempts: int = 60) -> str:
"""Ожидание результата задачи с правильным управлением сессией"""
print("\n" + "=" * 60)
print("⏳ Ожидание результата обработки")
print("=" * 60)
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIAPI_API_KEY}
print(f"📋 ID задачи: {task_id}")
print(f"🔄 Ожидание результата (максимум {max_attempts * 3} секунд)...")
connector = get_proxy_connector()
for attempt in range(max_attempts):
try:
# Создаем новую сессию для каждой попытки
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_url, headers=headers, timeout=30) as response:
if response.status != 200:
print(f"⚠️ HTTP {response.status}, повтор через 3 секунды...")
await asyncio.sleep(3)
continue
data = await response.json()
data_info = data.get('data', {})
status = data_info.get('status', 'unknown')
output = data_info.get('output')
print(f" Попытка {attempt + 1}: статус = {status}")
if status == 'completed' and output:
result_url = None
if isinstance(output, dict):
if output.get('image_urls') and len(output['image_urls']) > 0:
result_url = output['image_urls'][0]
print(f" ✅ Найден URL в image_urls[0]")
elif output.get('url'):
result_url = output['url']
print(f" ✅ Найден URL в url")
elif output.get('image'):
result_url = output['image']
print(f" ✅ Найден URL в image")
if result_url:
print(f"\n✅ ЗАДАЧА ВЫПОЛНЕНА!")
print(f"📸 URL результата: {result_url[:100]}...")
return result_url
else:
print(f"⚠️ Статус completed, но URL не найден")
print(f" Output: {output}")
elif status == 'failed':
error = data_info.get('error', {})
error_msg = error.get('message', 'Unknown error')
print(f"❌ Задача не выполнена: {error_msg}")
return None
elif status in ['pending', 'processing']:
# Продолжаем ожидание
pass
else:
print(f" Неизвестный статус: {status}")
except asyncio.TimeoutError:
print(f"⚠️ Таймаут при опросе (попытка {attempt + 1})")
except aiohttp.ClientError as e:
print(f"⚠️ Клиентская ошибка (попытка {attempt + 1}): {e}")
except Exception as e:
print(f"⚠️ Ошибка (попытка {attempt + 1}): {e}")
# Ждем перед следующей попыткой
await asyncio.sleep(3)
print("❌ Превышено время ожидания")
return None
async def download_result(url: str):
"""Скачивание результата"""
print("\n" + "=" * 60)
print("💾 Скачивание результата")
print("=" * 60)
connector = get_proxy_connector()
try:
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(url, timeout=30) as response:
if response.status == 200:
content = await response.read()
print(f"✅ Изображение скачано! Размер: {len(content)} байт")
os.makedirs("test_results", exist_ok=True)
filename = f"test_results/test_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
with open(filename, "wb") as f:
f.write(content)
print(f"💾 Сохранено в: {filename}")
return True
else:
print(f"❌ Ошибка скачивания: HTTP {response.status}")
return False
except Exception as e:
print(f"❌ Ошибка: {e}")
return False
async def check_status(task_id: str):
"""Проверка статуса задачи"""
print("\n" + "=" * 60)
print("🔍 ПРОВЕРКА СТАТУСА ЗАДАЧИ")
print("=" * 60)
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
headers = {"x-api-key": PIAPI_API_KEY}
connector = get_proxy_connector()
try:
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(get_url, headers=headers, timeout=30) as response:
if response.status == 200:
data = await response.json()
print(json.dumps(data, indent=2, ensure_ascii=False))
# Проверяем статус
data_info = data.get('data', {})
status = data_info.get('status')
output = data_info.get('output')
if status == 'completed' and output:
print("\n✅ Задача выполнена!")
if isinstance(output, dict):
if output.get('image_urls'):
print(f"📸 URL: {output['image_urls'][0]}")
elif status == 'failed':
error = data_info.get('error', {})
print(f"\n❌ Задача не выполнена: {error.get('message')}")
else:
print(f"\n⏳ Статус: {status}")
return data
else:
print(f"❌ HTTP {response.status}")
return None
except Exception as e:
print(f"❌ Ошибка: {e}")
return None
async def full_test(task_id: str):
"""Полный тест с ожиданием результата"""
print("\n" + "=" * 60)
print("🚀 ПОЛНЫЙ ТЕСТ")
print("=" * 60)
# Проверяем статус задачи
data = await check_status(task_id)
if not data:
print("❌ Не удалось получить статус задачи")
return
data_info = data.get('data', {})
status = data_info.get('status')
# Если задача уже выполнена, сразу скачиваем
if status == 'completed':
output = data_info.get('output', {})
if isinstance(output, dict):
result_url = output.get('image_urls', [None])[0] or output.get('url')
if result_url:
await download_result(result_url)
return
# Иначе ждем
result_url = await poll_task_result(task_id)
if result_url:
await download_result(result_url)
else:
print("\n❌ Не удалось получить результат")
if __name__ == "__main__":
import sys
if len(sys.argv) > 2 and sys.argv[1] == "--full":
task_id = sys.argv[2]
asyncio.run(full_test(task_id))
elif len(sys.argv) > 2 and sys.argv[1] == "--status":
task_id = sys.argv[2]
asyncio.run(check_status(task_id))
else:
print("Использование:")
print(" python test_api.py --status TASK_ID - проверить статус")
print(" python test_api.py --full TASK_ID - дождаться результата и скачать")