259 lines
10 KiB
Python
259 lines
10 KiB
Python
import asyncio
|
|
import aiohttp
|
|
from aiohttp_socks import ProxyConnector
|
|
import json
|
|
import re
|
|
import os
|
|
from dotenv import load_dotenv
|
|
import logging
|
|
from datetime import datetime
|
|
import base64
|
|
from PIL import Image
|
|
import io
|
|
|
|
# Настройка логирования
|
|
logging.basicConfig(
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
level=logging.INFO
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
load_dotenv()
|
|
|
|
# Конфигурация
|
|
PIAPI_API_KEY = os.getenv("PIAPI_API_KEY")
|
|
PIAPI_API_URL = os.getenv("PIAPI_API_URL", "https://api.piapi.ai/api/v1/task")
|
|
PROXY_URL = os.getenv("PROXY_URL")
|
|
USE_PROXY = os.getenv("USE_PROXY", "true").lower() == "true"
|
|
|
|
MODEL_NAME = os.getenv("MODEL_NAME", "gemini")
|
|
TASK_TYPE = os.getenv("TASK_TYPE", "nano-banana-pro")
|
|
ASPECT_RATIO = os.getenv("ASPECT_RATIO", "9:16")
|
|
STRENGTH = float(os.getenv("STRENGTH", "0.65"))
|
|
SAFETY_LEVEL = os.getenv("SAFETY_LEVEL", "medium")
|
|
OUTPUT_FORMAT = os.getenv("OUTPUT_FORMAT", "png")
|
|
|
|
IMGBB_API_KEY = os.getenv("IMGBB_API_KEY", "")
|
|
|
|
|
|
def get_proxy_connector():
|
|
"""Создает ProxyConnector для SOCKS5"""
|
|
if not USE_PROXY or not PROXY_URL:
|
|
return None
|
|
|
|
try:
|
|
match = re.search(r'socks5://([^:]+):([^@]+)@([^:]+):(\d+)', PROXY_URL)
|
|
if match:
|
|
username = match.group(1)
|
|
password = match.group(2)
|
|
host = match.group(3)
|
|
port = int(match.group(4))
|
|
connector = ProxyConnector.from_url(f"socks5://{username}:{password}@{host}:{port}")
|
|
logger.info(f"✅ Прокси настроен: {host}:{port}")
|
|
return connector
|
|
else:
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(PROXY_URL)
|
|
connector = ProxyConnector.from_url(f"socks5://{parsed.hostname}:{parsed.port}")
|
|
logger.info(f"✅ Прокси настроен (без авторизации): {parsed.hostname}:{parsed.port}")
|
|
return connector
|
|
except Exception as e:
|
|
logger.error(f"❌ Ошибка настройки прокси: {e}")
|
|
return None
|
|
|
|
|
|
async def poll_task_result(task_id: str, max_attempts: int = 60) -> str:
|
|
"""Ожидание результата задачи с правильным управлением сессией"""
|
|
print("\n" + "=" * 60)
|
|
print("⏳ Ожидание результата обработки")
|
|
print("=" * 60)
|
|
|
|
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
|
|
headers = {"x-api-key": PIAPI_API_KEY}
|
|
|
|
print(f"📋 ID задачи: {task_id}")
|
|
print(f"🔄 Ожидание результата (максимум {max_attempts * 3} секунд)...")
|
|
|
|
connector = get_proxy_connector()
|
|
|
|
for attempt in range(max_attempts):
|
|
try:
|
|
# Создаем новую сессию для каждой попытки
|
|
async with aiohttp.ClientSession(connector=connector) as session:
|
|
async with session.get(get_url, headers=headers, timeout=30) as response:
|
|
if response.status != 200:
|
|
print(f"⚠️ HTTP {response.status}, повтор через 3 секунды...")
|
|
await asyncio.sleep(3)
|
|
continue
|
|
|
|
data = await response.json()
|
|
data_info = data.get('data', {})
|
|
status = data_info.get('status', 'unknown')
|
|
output = data_info.get('output')
|
|
|
|
print(f" Попытка {attempt + 1}: статус = {status}")
|
|
|
|
if status == 'completed' and output:
|
|
result_url = None
|
|
if isinstance(output, dict):
|
|
if output.get('image_urls') and len(output['image_urls']) > 0:
|
|
result_url = output['image_urls'][0]
|
|
print(f" ✅ Найден URL в image_urls[0]")
|
|
elif output.get('url'):
|
|
result_url = output['url']
|
|
print(f" ✅ Найден URL в url")
|
|
elif output.get('image'):
|
|
result_url = output['image']
|
|
print(f" ✅ Найден URL в image")
|
|
|
|
if result_url:
|
|
print(f"\n✅ ЗАДАЧА ВЫПОЛНЕНА!")
|
|
print(f"📸 URL результата: {result_url[:100]}...")
|
|
return result_url
|
|
else:
|
|
print(f"⚠️ Статус completed, но URL не найден")
|
|
print(f" Output: {output}")
|
|
|
|
elif status == 'failed':
|
|
error = data_info.get('error', {})
|
|
error_msg = error.get('message', 'Unknown error')
|
|
print(f"❌ Задача не выполнена: {error_msg}")
|
|
return None
|
|
|
|
elif status in ['pending', 'processing']:
|
|
# Продолжаем ожидание
|
|
pass
|
|
|
|
else:
|
|
print(f" Неизвестный статус: {status}")
|
|
|
|
except asyncio.TimeoutError:
|
|
print(f"⚠️ Таймаут при опросе (попытка {attempt + 1})")
|
|
except aiohttp.ClientError as e:
|
|
print(f"⚠️ Клиентская ошибка (попытка {attempt + 1}): {e}")
|
|
except Exception as e:
|
|
print(f"⚠️ Ошибка (попытка {attempt + 1}): {e}")
|
|
|
|
# Ждем перед следующей попыткой
|
|
await asyncio.sleep(3)
|
|
|
|
print("❌ Превышено время ожидания")
|
|
return None
|
|
|
|
|
|
async def download_result(url: str):
|
|
"""Скачивание результата"""
|
|
print("\n" + "=" * 60)
|
|
print("💾 Скачивание результата")
|
|
print("=" * 60)
|
|
|
|
connector = get_proxy_connector()
|
|
|
|
try:
|
|
async with aiohttp.ClientSession(connector=connector) as session:
|
|
async with session.get(url, timeout=30) as response:
|
|
if response.status == 200:
|
|
content = await response.read()
|
|
print(f"✅ Изображение скачано! Размер: {len(content)} байт")
|
|
|
|
os.makedirs("test_results", exist_ok=True)
|
|
filename = f"test_results/test_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
|
|
with open(filename, "wb") as f:
|
|
f.write(content)
|
|
print(f"💾 Сохранено в: {filename}")
|
|
return True
|
|
else:
|
|
print(f"❌ Ошибка скачивания: HTTP {response.status}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Ошибка: {e}")
|
|
return False
|
|
|
|
|
|
async def check_status(task_id: str):
|
|
"""Проверка статуса задачи"""
|
|
print("\n" + "=" * 60)
|
|
print("🔍 ПРОВЕРКА СТАТУСА ЗАДАЧИ")
|
|
print("=" * 60)
|
|
|
|
get_url = f"https://api.piapi.ai/api/v1/task/{task_id}"
|
|
headers = {"x-api-key": PIAPI_API_KEY}
|
|
connector = get_proxy_connector()
|
|
|
|
try:
|
|
async with aiohttp.ClientSession(connector=connector) as session:
|
|
async with session.get(get_url, headers=headers, timeout=30) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
print(json.dumps(data, indent=2, ensure_ascii=False))
|
|
|
|
# Проверяем статус
|
|
data_info = data.get('data', {})
|
|
status = data_info.get('status')
|
|
output = data_info.get('output')
|
|
|
|
if status == 'completed' and output:
|
|
print("\n✅ Задача выполнена!")
|
|
if isinstance(output, dict):
|
|
if output.get('image_urls'):
|
|
print(f"📸 URL: {output['image_urls'][0]}")
|
|
elif status == 'failed':
|
|
error = data_info.get('error', {})
|
|
print(f"\n❌ Задача не выполнена: {error.get('message')}")
|
|
else:
|
|
print(f"\n⏳ Статус: {status}")
|
|
|
|
return data
|
|
else:
|
|
print(f"❌ HTTP {response.status}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"❌ Ошибка: {e}")
|
|
return None
|
|
|
|
|
|
async def full_test(task_id: str):
|
|
"""Полный тест с ожиданием результата"""
|
|
print("\n" + "=" * 60)
|
|
print("🚀 ПОЛНЫЙ ТЕСТ")
|
|
print("=" * 60)
|
|
|
|
# Проверяем статус задачи
|
|
data = await check_status(task_id)
|
|
if not data:
|
|
print("❌ Не удалось получить статус задачи")
|
|
return
|
|
|
|
data_info = data.get('data', {})
|
|
status = data_info.get('status')
|
|
|
|
# Если задача уже выполнена, сразу скачиваем
|
|
if status == 'completed':
|
|
output = data_info.get('output', {})
|
|
if isinstance(output, dict):
|
|
result_url = output.get('image_urls', [None])[0] or output.get('url')
|
|
if result_url:
|
|
await download_result(result_url)
|
|
return
|
|
|
|
# Иначе ждем
|
|
result_url = await poll_task_result(task_id)
|
|
if result_url:
|
|
await download_result(result_url)
|
|
else:
|
|
print("\n❌ Не удалось получить результат")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) > 2 and sys.argv[1] == "--full":
|
|
task_id = sys.argv[2]
|
|
asyncio.run(full_test(task_id))
|
|
elif len(sys.argv) > 2 and sys.argv[1] == "--status":
|
|
task_id = sys.argv[2]
|
|
asyncio.run(check_status(task_id))
|
|
else:
|
|
print("Использование:")
|
|
print(" python test_api.py --status TASK_ID - проверить статус")
|
|
print(" python test_api.py --full TASK_ID - дождаться результата и скачать") |