Kavalar 2025-04-30 17:07:16 +03:00
commit 2190ec7ca7
37 changed files with 994 additions and 0 deletions

8
.env.example Normal file

@ -0,0 +1,8 @@
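# Example environment file: copy to .env (the code loads it with dotenv_values)
# and fill in real credentials.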
PIAPI_TOKEN="80d3...4e43f378e583"
PIAPI_URL="https://api.piapi.ai/api/v1/task"
IGF_USER=admin
IGF_PASS=ChangeMe
IGF_URL=http://ai-bot-back.loc/api
TELEGRAM_TOKEN="5645:gdfdfgh"

7
.gitignore vendored Normal file

@ -0,0 +1,7 @@
# created by virtualenv automatically
.idea
bin
.env
__pycache__
lib
pyvenv.cfg

50
AiBot.py Normal file

@ -0,0 +1,50 @@
from telegram.ext import CommandHandler, CallbackQueryHandler, ConversationHandler, MessageHandler, filters, ApplicationBuilder
from ai_bot.handlers.ImageHandler import ImageHandler
from ai_bot.handlers.MainHandler import MainHandler
from ai_bot.handlers.ModelHandler import ModelHandler
from ai_bot.handlers.VideoHandler import VideoHandler
from bot import config, FIRST, SECOND
from ai_bot import states
class AiBot:
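    # Builds the python-telegram-bot application, wires the conversation states
    # contributed by the model handlers into one ConversationHandler, and starts
    # long polling.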
def run(self):
TOKEN = config['TELEGRAM_TOKEN']
application = ApplicationBuilder().token(TOKEN).build()
        app_states = {  # conversation states returned by the callback handlers
FIRST: [
CallbackQueryHandler(MainHandler.get_menu, pattern='^menu'),
CallbackQueryHandler(VideoHandler.create_video, pattern='^create_video'),
CallbackQueryHandler(ImageHandler.create_img, pattern='^create_img'),
],
SECOND: [
                CallbackQueryHandler(ImageHandler.start_model_scenario, pattern=r'^start_model_scenario\?\S{3,}'),
],
}
app_states.update(states)
conv_handler = ConversationHandler(
entry_points=[
CommandHandler('start', MainHandler.start),
CommandHandler('menu', MainHandler.get_menu)
],
states=app_states,
fallbacks=[CommandHandler('start', MainHandler.start)],
)
application.add_handler(conv_handler)
text_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), self.text_msg)
application.add_handler(text_handler)
application.run_polling()
    async def text_msg(self, update, context):
        # Fallback for free-form text outside the conversation; a no-op for now.
        pass

15
ai_bot/__init__.py Normal file

@ -0,0 +1,15 @@
from ai_bot.handlers.FluxHandler import FluxHandler
from ai_bot.handlers.MidjourneyHandler import MidjourneyHandler
from piapi_ai.PiapiAiApi import PiapiAiApi
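# Each model handler contributes its own conversation states; merge them into a
# single dict that AiBot adds to its ConversationHandler.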
states_instance_arr = [
FluxHandler,
MidjourneyHandler,
]
states = {}
for state in states_instance_arr:
states.update(state.get_states())

166
ai_bot/handlers/FluxHandler.py Normal file

@ -0,0 +1,166 @@
from ai_bot.keyboards.flux.SelectTypeRequestKeyboard import SelectTypeRequestKeyboard
from ai_bot.msg.flux.SelectTypeMsg import SelectTypeMsg
from bot import FIRST
from bot.Handler import Handler
from telegram.constants import ParseMode
from ai_bot.states.FluxStates import FLUX_SET_PROMPT, FLUX_SET_SUBMODEL, FLUX_SET_TYPE_REQUEST, FLUX_SET_TEXT_PROMPT, FLUX_SET_IMG_TEXT_PROMPT, FluxStates
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
from igf_api.IgfClient import IgfClient
from piapi_ai.PiapiAiApi import PiapiAiApi
from piapi_ai import client as piapi_client
from telegram import Bot
from dotenv import dotenv_values
import base64
config = dotenv_values(".env")
class FluxHandler(Handler):
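    # Flux image-generation flow: the callbacks collect the request type and
    # prompt, store a pending task via the IGF backend, submit it to the PiAPI
    # Flux endpoint, and later deliver the result via send_task_result().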
@staticmethod
async def set_prompt(update, context):
igf_client = IgfClient()
task = igf_client.piapiTask.get_one({
'bot_id': context.bot.id,
'dialog_id': update.effective_chat.id,
'status': 0
})
task['prompt'] = update.effective_message.text
piapi_task = piapi_client.flux.create_task({
'model': task['model'],
'task_type': task['task_type'],
'input': {
'prompt': task['prompt'],
'width': 1024,
'height': 1024,
}
})
task['status'] = 1
task['task_id'] = piapi_task['data']['task_id']
igf_client.piapiTask.update(task['id'], task)
await context.bot.send_message(chat_id=update.effective_chat.id, text="Запрос к Flux принят, идет обработка",
parse_mode=ParseMode.HTML)
return FIRST
@staticmethod
async def set_img_prompt(update, context):
igf_client = IgfClient()
task = igf_client.piapiTask.get_one({
'bot_id': context.bot.id,
'dialog_id': update.effective_chat.id,
'status': 0
})
if update.message.caption:
task['prompt'] = update.message.caption
else:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Prompt нужно указать в описании изображения",
parse_mode=ParseMode.HTML)
return FluxStates.get_state_by_key("set_img_text_prompt")
photo_inst = update.message.photo[-1]
photo = await context.bot.get_file(photo_inst['file_id'])
file_content = await photo.download_as_bytearray()
# Convert the file content to Base64
base64_encoded = base64.b64encode(file_content).decode('utf-8')
piapi_task = piapi_client.flux.create_task({
'model': task['model'],
'task_type': task['task_type'],
'input': {
'prompt': task['prompt'],
'image': base64_encoded,
}
})
task['status'] = 1
task['task_id'] = piapi_task['data']['task_id']
igf_client.piapiTask.update(task['id'], task)
await context.bot.send_message(chat_id=update.effective_chat.id, text="Запрос к Flux принят",
parse_mode=ParseMode.HTML)
return FIRST
@staticmethod
async def set_submodel(update, context):
query = update.callback_query
command, params = Handler.load_callback_query(query.data)
@staticmethod
async def set_type_request(update, context):
query = update.callback_query
command, params = Handler.load_callback_query(query.data)
igf_client = IgfClient()
task = igf_client.piapiTask.create({
'bot_id': context.bot.id,
'dialog_id': update.effective_chat.id,
'task_type': params['type'],
'model': 'Qubico/flux1-dev',
'status': 0,
})
if params['type'] == 'txt2img':
text = "Введите запрос для генерации изображения"
state = FluxStates.get_state_by_key("set_text_prompt")
else:
text = "Загрузите изображение и запрос для создания изображения"
state = FluxStates.get_state_by_key("set_img_text_prompt")
await context.bot.send_message(chat_id=update.effective_chat.id, text=text,
parse_mode=ParseMode.HTML)
return state
@staticmethod
async def start_scenario(update, context):
msg = SelectTypeMsg()
reply_markup = SelectTypeRequestKeyboard()
state = FluxStates.get_state_by_key("set_type_request")
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg.get_msg(),
parse_mode=ParseMode.HTML, reply_markup=reply_markup.create_keyboard())
return state
@staticmethod
async def send_task_result(igf_task: dict, piapi_task: dict):
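        # Called from send_tasks.py once PiAPI reports an output: send the
        # generated image to the originating chat (as a document and as a photo)
        # and mark the IGF task as finished (status 2).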
TOKEN = config['TELEGRAM_TOKEN']
bot = Bot(TOKEN)
await bot.send_document(chat_id=igf_task['dialog_id'], document=piapi_task['data']['output']['image_url'])
await bot.send_photo(chat_id=igf_task['dialog_id'], photo=piapi_task['data']['output']['image_url'])
igf_client = IgfClient()
igf_task['status'] = 2
igf_client.piapiTask.update(igf_task['id'], igf_task)
return FIRST
@staticmethod
def get_states(data=None) -> dict:
return {
FLUX_SET_TEXT_PROMPT: [
MessageHandler(filters.TEXT, FluxHandler.set_prompt)
],
FLUX_SET_SUBMODEL: [
                CallbackQueryHandler(FluxHandler.set_submodel, pattern=r'^set_submodel\?\S{3,}'),
],
FLUX_SET_TYPE_REQUEST: [
                CallbackQueryHandler(FluxHandler.set_type_request, pattern=r'^set_type_request\?\S{3,}'),
],
FLUX_SET_IMG_TEXT_PROMPT: [
MessageHandler(filters.PHOTO, FluxHandler.set_img_prompt)
],
}

53
ai_bot/handlers/ImageHandler.py Normal file

@ -0,0 +1,53 @@
from ai_bot.keyboards.CreateImageKeyboard import CreateImageKeyboard
from ai_bot.msg.CreateImageMsg import CreateImageMsg
from bot.Handler import Handler
from bot.halpers.Instance import Instance
from bot import SECOND
from telegram.constants import ParseMode
class ImageHandler(Handler):
@staticmethod
async def create_img(update, context):
models_arr = ['Midjourney', 'Flux']
reply_markup = CreateImageKeyboard()
reply_markup.add_option("models_arr", models_arr)
msg = CreateImageMsg()
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg.get_msg(),
parse_mode=ParseMode.HTML, reply_markup=reply_markup.create_keyboard())
return SECOND
@staticmethod
async def generate_img(update, context):
query = update.callback_query
command, params = Handler.load_callback_query(query.data)
@staticmethod
async def set_prompt(update, context):
query = update.callback_query
command, params = Handler.load_callback_query(query.data)
model = Instance.get_instance("ai_bot.states.{model}States".format(model=params['model']), "{model}States".format(model=params['model']))
state = model.get_state_by_key("set_prompt")
await context.bot.send_message(chat_id=update.effective_chat.id, text="Напишите запрос.",
parse_mode=ParseMode.HTML)
return state
@staticmethod
async def start_model_scenario(update, context):
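        # The callback data carries the chosen model name; resolve the matching
        # <Model>Handler class dynamically and delegate to its start_scenario().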
query = update.callback_query
command, params = Handler.load_callback_query(query.data)
model = Instance.get_instance("ai_bot.handlers.{model}Handler".format(model=params['model']),
"{model}Handler".format(model=params['model']))
state = await model.start_scenario(update, context)
return state

24
ai_bot/handlers/KlingHandler.py Normal file

@ -0,0 +1,24 @@
from bot.Handler import Handler
from telegram.constants import ParseMode
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
from dotenv import dotenv_values
from ai_bot.states.KlingStates import KLING_SET_PROMPT, KLING_SET_TEXT_PROMPT
config = dotenv_values(".env")
class KlingHandler(Handler):
@staticmethod
    async def set_prompt(update, context):
        # Stub in this commit: the Kling prompt flow is not implemented yet.
        pass
@staticmethod
def get_states(data=None) -> dict:
return {
KLING_SET_TEXT_PROMPT: [
MessageHandler(filters.TEXT, KlingHandler.set_prompt)
],
}

44
ai_bot/handlers/MainHandler.py Normal file

@ -0,0 +1,44 @@
from ai_bot.keyboards.MainKeyboard import MainKeyboard
from ai_bot.keyboards.MenuKeyboard import MenuKeyboard
from ai_bot.msg.MainMsg import MainMsg
from bot.Handler import Handler
from telegram.constants import ParseMode
from bot import FIRST
from igf_api.IgfClient import IgfClient
from piapi_ai import client
class MainHandler(Handler):
@staticmethod
async def start(update, context):
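        # /start: register the chat with the IGF backend, archive any tasks
        # still marked as new for this dialog, then show the main keyboard.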
msg = MainMsg()
reply_markup = MainKeyboard()
client_igf = IgfClient()
client_igf.tgBot.create({
'bot_id': context.bot.id,
'dialog_id': update.effective_chat.id,
'username': update.effective_chat.username,
'first_name': update.effective_chat.first_name,
'last_name': update.effective_chat.last_name,
'status': 1,
})
client_igf.piapiTask.to_archive_all_new(context.bot.id, update.effective_chat.id)
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg.get_msg(),
parse_mode=ParseMode.HTML, reply_markup=reply_markup.create_keyboard())
return FIRST
@staticmethod
async def get_menu(update, context):
reply_markup = MenuKeyboard()
await context.bot.send_message(chat_id=update.effective_chat.id, text="Меню:",
parse_mode=ParseMode.HTML, reply_markup=reply_markup.create_keyboard())
return FIRST

23
ai_bot/handlers/MidjourneyHandler.py Normal file

@ -0,0 +1,23 @@
from bot.Handler import Handler
from bot import FIRST
from telegram.constants import ParseMode
from ai_bot.states.MidjourneyStates import MIDJOURNEY_SET_PROMPT
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
class MidjourneyHandler(Handler):
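    # Stub in this commit: the prompt is only acknowledged, no PiAPI request is
    # made for Midjourney yet.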
@staticmethod
async def set_prompt(update, context):
await context.bot.send_message(chat_id=update.effective_chat.id, text="Запрос к Midjourney принят",
parse_mode=ParseMode.HTML)
return FIRST
@staticmethod
def get_states(data=None) -> dict:
return {
MIDJOURNEY_SET_PROMPT: [
MessageHandler(filters.TEXT, MidjourneyHandler.set_prompt)
]
}

8
ai_bot/handlers/ModelHandler.py Normal file

@ -0,0 +1,8 @@
from bot.Handler import Handler
class ModelHandler(Handler):
@staticmethod
def select_model(update, context):
print("select model")

22
ai_bot/handlers/VideoHandler.py Normal file

@ -0,0 +1,22 @@
from ai_bot.keyboards.CreateVideoKeyboard import CreateVideoKeyboard
from ai_bot.msg.CreateVideoMsg import CreateVideoMsg
from bot.Handler import Handler
from bot import SECOND
from telegram.constants import ParseMode
class VideoHandler(Handler):
@staticmethod
async def create_video(update, context):
models_arr = ['Luma', 'Kling']
reply_markup = CreateVideoKeyboard()
reply_markup.add_option("models_arr", models_arr)
msg = CreateVideoMsg()
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg.get_msg(),
parse_mode=ParseMode.HTML, reply_markup=reply_markup.create_keyboard())
return SECOND

14
ai_bot/keyboards/CreateImageKeyboard.py Normal file

@ -0,0 +1,14 @@
from bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class CreateImageKeyboard(Keyboard):
def get_keyboard(self):
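        # One button per model; the callback data
        # ("start_model_scenario?model=<Model>") is parsed back into command and
        # params by Handler.load_callback_query in ImageHandler.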
keyboard = []
models = self.get_option("models_arr")
for model in models:
keyboard.append([InlineKeyboardButton(model, callback_data='start_model_scenario?model={model}'.format(model=model))])
return keyboard

14
ai_bot/keyboards/CreateVideoKeyboard.py Normal file

@ -0,0 +1,14 @@
from bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class CreateVideoKeyboard(Keyboard):
def get_keyboard(self):
keyboard = []
models = self.get_option("models_arr")
for model in models:
keyboard.append([InlineKeyboardButton(model, callback_data='start_model_scenario?model={model}'.format(model=model))])
return keyboard

12
ai_bot/keyboards/MainKeyboard.py Normal file

@ -0,0 +1,12 @@
from bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class MainKeyboard(Keyboard):
def get_keyboard(self):
return [
[
InlineKeyboardButton("Меню", callback_data='menu'),
],
]

13
ai_bot/keyboards/MenuKeyboard.py Normal file

@ -0,0 +1,13 @@
from bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class MenuKeyboard(Keyboard):
def get_keyboard(self):
return [
[
InlineKeyboardButton("Сгенерировать видео", callback_data='create_video'),
InlineKeyboardButton("Сгенерировать изображение", callback_data='create_img')
],
]

15
ai_bot/keyboards/flux/SelectTypeRequestKeyboard.py Normal file

@ -0,0 +1,15 @@
from bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class SelectTypeRequestKeyboard(Keyboard):
def get_keyboard(self):
return [
[
InlineKeyboardButton("Сгенерировать изображение из текста", callback_data='set_type_request?type=txt2img'),
],
[
InlineKeyboardButton("Сгенерировать изображение на основе изображения", callback_data='set_type_request?type=img2img')
]
]

13
ai_bot/msg/CreateImageMsg.py Normal file

@ -0,0 +1,13 @@
from bot.HtmlMsg import HtmlMsg
class CreateImageMsg(HtmlMsg):
def get_msg(self, data=None) -> str:
text = ("<strong>Flux.1 Pro</strong> — это профессиональная модель текста в изображение, оптимизированная для коммерческих приложений. "
"Эта премиальная модель обеспечивает исключительное качество изображения и производительность, "
"что делает ее идеальной для профессиональной творческой работы.\n\n"
"<strong>MidJourney</strong> это инструмент, который с помощью искусственного интеллекта создаёт уникальные изображения по вашим запросам. "
"Просто опишите, что хотите увидеть, а ИИ превратит ваши слова в произведение искусства!")
return text

12
ai_bot/msg/CreateVideoMsg.py Normal file

@ -0,0 +1,12 @@
from bot.HtmlMsg import HtmlMsg
class CreateVideoMsg(HtmlMsg):
def get_msg(self, data=None) -> str:
text = ("<strong>Luma AI</strong> — эта одна из первых нейросетей для генерации видео на такой же архитектуре, которую можно попробовать самому."
"Она создает реалистичные ролики, которые выглядят качественнее, чем у конкуренто.\n\n"
"<strong>Kling</strong> - это модель генерации видео, разработанная командой Kuaishou, "
"обладающая мощными возможностями генерации видео и позволяющая пользователям легко и эффективно создавать художественные видео.")
return text

22
ai_bot/msg/MainMsg.py Normal file

@ -0,0 +1,22 @@
from bot.HtmlMsg import HtmlMsg
class MainMsg(HtmlMsg):
def get_msg(self, data=None) -> str:
if data is None:
data = {}
text = ("<strong>🌟 Что умеет бот?</strong> \n"
" - Генерирует тексты, статьи, описания и многое другое. \n"
" - Создаёт уникальные изображения по вашему запросу. \n"
" - Помогает анализировать данные и проводить расчёты. \n"
" - Отвечает на вопросы, обучает и даёт советы в любых темах. \n"
" - Автоматизирует рутинные задачи сэкономьте своё время! \n\n"
"<strong>💡 Почему стоит попробовать?</strong> \n"
" - Легко и быстро взаимодействие с ИИ через удобный интерфейс Telegram. \n"
" - Без навыков программирования всё просто и понятно. \n"
" - Работает 24/7 ваш помощник всегда на связи. \n"
" - Подходит для бизнеса, учёбы, творчества и повседневных задач. \n")
return text

13
ai_bot/msg/flux/SelectTypeMsg.py Normal file

@ -0,0 +1,13 @@
from bot.DaMsg import DaMsg
from bot.HtmlMsg import HtmlMsg
class SelectTypeMsg(HtmlMsg):
def get_msg(self, data=None) -> str:
if data is None:
data = {}
text = ("Выберите тип запроса.")
return text

33
ai_bot/states/FluxStates.py Normal file

@ -0,0 +1,33 @@
from bot.States import States
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
FLUX_SET_PROMPT = 31
FLUX_SET_SUBMODEL = 32
FLUX_SET_TYPE_REQUEST = 33
FLUX_SET_TEXT_PROMPT = 34
FLUX_SET_IMG_TEXT_PROMPT = 35
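# The integer ids double as ConversationHandler states; set_states() maps the
# key names used by the handlers to these ids.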
def set_states():
states_arr = {
"set_prompt": FLUX_SET_PROMPT,
"set_submodel": FLUX_SET_SUBMODEL,
"set_type_request": FLUX_SET_TYPE_REQUEST,
"set_text_prompt": FLUX_SET_TEXT_PROMPT,
"set_img_text_prompt": FLUX_SET_IMG_TEXT_PROMPT,
}
return states_arr
class FluxStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

27
ai_bot/states/KlingStates.py Normal file

@ -0,0 +1,27 @@
from bot.States import States
KLING_SET_PROMPT = 51
KLING_SET_TEXT_PROMPT = 52
def set_states():
states_arr = {
"set_prompt": KLING_SET_PROMPT,
"set_text_prompt": KLING_SET_TEXT_PROMPT,
}
return states_arr
class KlingStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

24
ai_bot/states/MidjourneyStates.py Normal file

@ -0,0 +1,24 @@
from bot.States import States
MIDJOURNEY_SET_PROMPT = 41
def set_states():
states_arr = {
"set_prompt": MIDJOURNEY_SET_PROMPT
}
return states_arr
class MidjourneyStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

1
bot Submodule

@ -0,0 +1 @@
Subproject commit ce6ab8586b7ea357bfc53a2a4d583cf39526f630

18
bot.py Normal file

@ -0,0 +1,18 @@
import sys
from AiBot import AiBot
from daemon.SimpleDaemon import Daemon
class DaBotDaemon(Daemon):
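    # Wraps the bot in SimpleDaemon so it can be controlled with
    # start|stop|restart|status; the __main__ block below currently runs the bot
    # in the foreground instead (daemon usage is commented out).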
def run(self):
ai_bot = AiBot()
ai_bot.run()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
# with DaBotDaemon('/tmp/daemon-dabot.pid', error_log_file='errlog.txt') as daemon:
# daemon.process_command()
ai_bot = AiBot()
ai_bot.run()

194
daemon/SimpleDaemon.py Normal file

@ -0,0 +1,194 @@
"""Generic linux daemon base class for python 3.x."""
import sys, os, time, atexit, signal, logging
class Daemon:
"""A generic daemon class.
Usage: subclass the daemon class and override the run() method."""
def __init__(self, pidfile, error_log_file='/dev/null'):
self.logging = logging
self.logging.basicConfig(filename=error_log_file, filemode='w',
format='%(name)s - %(levelname)s - %(message)s\n')
self.logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
self.error_log_file = error_log_file
self.pidfile = pidfile
self.commands = {}
def __enter__(self):
self.base_commands()
self.reg_command()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def daemonize(self):
"""Deamonize class. UNIX double fork mechanism."""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #1 failed: {0}\n'.format(err))
self.logging.error('fork #1 failed: {0}\n'.format(err))
sys.exit(1)
# decouple from parent environment
os.chdir('/')
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #2 failed: {0}\n'.format(err))
self.logging.error('fork #2 failed: {0}\n'.format(err))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'a+')
se = open(os.devnull, 'a+')
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
with open(self.pidfile, 'w+') as f:
f.write(pid + '\n')
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""Start the daemon."""
self.logging.info("Start")
# Check for a pidfile to see if the daemon already runs
try:
with open(self.pidfile, 'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if pid:
message = "pidfile {0} already exist. " + \
"Daemon already running?\n"
sys.stderr.write(message.format(self.pidfile))
self.logging.error(message.format(self.pidfile))
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""Stop the daemon."""
self.logging.info("Stop")
# Get the pid from the pidfile
try:
with open(self.pidfile, 'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if not pid:
message = "pidfile {0} does not exist. " + \
"Daemon not running?\n"
sys.stderr.write(message.format(self.pidfile))
self.logging.error(message.format(self.pidfile))
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
except OSError as err:
e = str(err.args)
if e.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(err.args))
sys.exit(1)
def restart(self):
"""Restart the daemon."""
self.logging.info("Restart")
self.stop()
self.start()
def status(self):
print("Status")
try:
with open(self.pidfile, 'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if pid:
print("Process started, pid %d" % pid)
else:
print("Process is not running")
def console_stdout(self):
sys.stdout = sys.__stdout__
print(123)
def process_command(self):
if len(sys.argv) > 1:
command = sys.argv[1]
handler = self.get_command_handler(command)
if handler:
handler()
else:
print("Unknown command: %s" % command)
else:
print("usage: %s start|stop|restart|status" % sys.argv[0])
sys.exit(2)
def base_commands(self):
self.add_command('start', self.start)
self.add_command('stop', self.stop)
self.add_command('restart', self.restart)
self.add_command('status', self.status)
self.add_command('console_stdout', self.console_stdout)
def add_command(self, command, handler):
if command not in self.commands:
self.commands[command] = handler
def get_command_handler(self, command):
if command in self.commands:
return self.commands[command]
return None
def reg_command(self):
pass
def run(self):
"""You should override this method when you subclass Daemon.
It will be called after the process has been daemonized by
start() or restart()."""

0
daemon/__init__.py Normal file

15
get_task.py Normal file

@ -0,0 +1,15 @@
import requests
from piapi_ai import client as piapi_client
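# Ad-hoc helper: fetch a single PiAPI task by id and print the raw response.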
if __name__ == "__main__":
# endpoint = 'https://api.piapi.ai/api/v1/task'
# headers = {
# 'X-API-Key': '80d3e864cb1c0a74a728d85f627d8aacf4fc334a43c489392bbd4e43f378e583',
# }
#
# response = requests.get("https://api.piapi.ai/api/v1/task/{task_id}".format(task_id="df422ee1-2ac8-4f70-9687-74e44d6518f5"), headers=headers)
#
# print(response.status_code)
# print(response.json())
task = piapi_client.flux.get_task("62257693-3b97-4734-b898-27783abca405")
print(task)

6
igf.py Normal file

@ -0,0 +1,6 @@
from igf_api.IgfClient import IgfClient
if __name__ == '__main__':
client = IgfClient()
tg_bot = client.tgBot.get_list()
print(tg_bot)

1
igf_api Submodule

@ -0,0 +1 @@
Subproject commit 33229449f5631f41fd6176b182ef6e72dbafd0e8

15
main.py Normal file

@ -0,0 +1,15 @@
import requests
from piapi_ai import client
if __name__ == "__main__":
# data = {
# "task_type": "txt2img",
# "input": {
# "prompt": "DC Extended Universe photo of a beautiful girl. Superheroes, grim, darker tones, highly detailed yet ordinary people, cheerful, brighter tones, undetailed, mask, masked.",
# "width": 1024,
# "height": 1024
# }
# }
# flux = client.flux.create_task(data=data)
flux = client.flux.get_task("d7bfaec8-5cae-41fb-b324-1fd86d8a09dd")
print(flux)

55
piapi_ai/Entity.py Normal file

@ -0,0 +1,55 @@
from requests.exceptions import HTTPError
import jsonschema
import json
import requests
class Entity:
def __init__(self, config):
self.config = config
self.endpoint = config['PIAPI_URL']
self.token = config['PIAPI_TOKEN']
def raw_request(self, method, params=None, data=None, headers=None):
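        # Single entry point for PiAPI calls: authenticates with the X-API-Key
        # header and sends the request to the configured task endpoint.
        # Returns the decoded JSON body, or None if the request failed.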
if data is None:
data = {}
if params is None:
params = {}
if headers is None:
headers = {}
headers['X-API-Key'] = self.token
try:
response = requests.request(method=method,
url="{url}".format(url=self.endpoint),
params=params,
json=data,
headers=headers
)
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}') # Python 3.6
except Exception as err:
print(f'Other error occurred: {err}') # Python 3.6
else:
json_response = response.json()
return json_response
def validate_dict(self, dictionary, schema):
# schema = {
# 'type': 'object',
# 'properties': {
# 'key1': {'type': 'integer'},
# 'key2': {'type': 'string'}
# },
# 'required': ['key1', 'key2']
# }
try:
jsonschema.validate(dictionary, schema)
return True
except jsonschema.ValidationError as ex:
print(ex)
return False

6
piapi_ai/PiapiAiApi.py Normal file

@ -0,0 +1,6 @@
from piapi_ai.entities.Flux import Flux
class PiapiAiApi:
def __init__(self, config):
self.flux = Flux(config)

8
piapi_ai/__init__.py Normal file

@ -0,0 +1,8 @@
from dotenv import dotenv_values
from piapi_ai.PiapiAiApi import PiapiAiApi
config = dotenv_values('.env')
client = PiapiAiApi(config)

15
piapi_ai/entities/Flux.py Normal file

@ -0,0 +1,15 @@
from piapi_ai.Entity import Entity
class Flux(Entity):
def create_task(self, data):
result = self.raw_request("post", data=data)
return result
    def get_task(self, task_id):
        # Restore the base endpoint afterwards so repeated calls do not keep
        # appending task ids to self.endpoint.
        base_endpoint = self.endpoint
        self.endpoint = "{base}/{task_id}".format(base=base_endpoint, task_id=task_id)
        result = self.raw_request("get")
        self.endpoint = base_endpoint
        return result

23
send_tasks.py Normal file

@ -0,0 +1,23 @@
from igf_api.IgfClient import IgfClient
from piapi_ai import client
import asyncio
from dotenv import dotenv_values
from telegram import Bot
from ai_bot.handlers.FluxHandler import FluxHandler
config = dotenv_values(".env")
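# Poller: fetch unfinished tasks from the IGF backend, look each one up on
# PiAPI, and deliver completed Flux outputs back to the Telegram chat
# (see send_tasks.sh for how this script is launched).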
def send_msg(igf_task_dict: dict, piapi_task_dict: dict):
if piapi_task_dict['data']['output'] is not None:
if piapi_task_dict['data']['model'] == "Qubico/flux1-dev":
asyncio.run(FluxHandler.send_task_result(igf_task=igf_task_dict, piapi_task=piapi_task_dict))
if __name__ == "__main__":
igf_client = IgfClient()
tasks = igf_client.piapiTask.get_new_tasks()
for task in tasks:
piapi_task = client.flux.get_task(task['task_id'])
send_msg(igf_task_dict=task, piapi_task_dict=piapi_task)

5
send_tasks.sh Executable file

@ -0,0 +1,5 @@
#!/bin/bash
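# Activate the project virtualenv and run the PiAPI task poller
# (assumed to be scheduled externally, e.g. from cron).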
cd /home/kavlar/work/python/ai
source bin/activate
python send_tasks.py