This commit is contained in:
2025-08-06 11:49:14 +03:00
commit b9be2179de
29 changed files with 1371 additions and 0 deletions

13
.env.example Normal file
View File

@@ -0,0 +1,13 @@
PIAPI_TOKEN="80d3e864cb1c0a74a728d85f627d8aacf4fc334a43c489392bbd4e43f378e583"
PIAPI_URL="https://api.piapi.ai/api/v1/task"
IGF_USER=admin
IGF_PASS=ChangeMe
IGF_URL=http://fmd.loc/api
IGF_URL_TEST=http://la-bot.loc:8383
BOT_ADMINS="1078162189 1029785406 471419562"
TELEGRAM_TOKEN="8470479443:AAFJ8v2KMjCskzdWHd9_RqecuGkpJC0RX_0"
OPENAI_API_KEY="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"
GEN_API_TOKEN="sk-GquJ61SqwPacqdei0wkMnwFZXF0iJGy58ScP3ZJ1nhUPlfWSw2ZJj4pGQYw6"

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
.idea
bin
lib
lib64
.env
__pycache__/

44
FmdBot.py Normal file
View File

@@ -0,0 +1,44 @@
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, ReplyKeyboardRemove
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler, ConversationHandler, MessageHandler, \
ChatMemberHandler, filters, ApplicationBuilder
from fmd_bot import config, FIRST, SECOND, states
from fmd_bot.handlers.MainHandler import MainHandler, SET_ADMIN_MSG
from tg_bot import recursive_dict_merge
class FmdBot:
def run(self):
TOKEN = config['TELEGRAM_TOKEN']
application = ApplicationBuilder().token(TOKEN).build()
app_states = { # словарь состояний разговора, возвращаемых callback функциями
FIRST: [
CallbackQueryHandler(MainHandler.menu, pattern='^main_menu'),
],
SET_ADMIN_MSG: [
],
}
# app_states.update(states)
final_states = recursive_dict_merge(states, app_states)
conv_handler = ConversationHandler(
entry_points=[
CommandHandler('start', MainHandler.start),
],
states=final_states,
fallbacks=[CommandHandler('start', MainHandler.start)],
)
application.add_handler(conv_handler)
# text_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), self.text_msg)
# application.add_handler(text_handler)
application.run_polling()

14
bot.py Normal file
View File

@@ -0,0 +1,14 @@
from FmdBot import FmdBot
from daemon.SimpleDaemon import Daemon
class FmdBotDaemon(Daemon):
def run(self):
fmd_bot = FmdBot()
fmd_bot.run()
if __name__ == '__main__':
# with FmdBotDaemon('/tmp/daemon-fmdbot.pid', error_log_file='errlog.txt') as daemon:
# daemon.process_command()
fmd_bot = FmdBot()
fmd_bot.run()

194
daemon/SimpleDaemon.py Normal file
View File

@@ -0,0 +1,194 @@
"""Generic linux daemon base class for python 3.x."""
import sys, os, time, atexit, signal, logging
class Daemon:
"""A generic daemon class.
Usage: subclass the daemon class and override the run() method."""
def __init__(self, pidfile, error_log_file='/dev/null'):
self.logging = logging
self.logging.basicConfig(filename=error_log_file, filemode='w',
format='%(name)s - %(levelname)s - %(message)s\n')
self.logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
self.error_log_file = error_log_file
self.pidfile = pidfile
self.commands = {}
def __enter__(self):
self.base_commands()
self.reg_command()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def daemonize(self):
"""Deamonize class. UNIX double fork mechanism."""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #1 failed: {0}\n'.format(err))
self.logging.error('fork #1 failed: {0}\n'.format(err))
sys.exit(1)
# decouple from parent environment
os.chdir('/')
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #2 failed: {0}\n'.format(err))
self.logging.error('fork #2 failed: {0}\n'.format(err))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'a+')
se = open(os.devnull, 'a+')
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
with open(self.pidfile, 'w+') as f:
f.write(pid + '\n')
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""Start the daemon."""
self.logging.info("Start")
# Check for a pidfile to see if the daemon already runs
try:
with open(self.pidfile, 'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if pid:
message = "pidfile {0} already exist. " + \
"Daemon already running?\n"
sys.stderr.write(message.format(self.pidfile))
self.logging.error(message.format(self.pidfile))
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""Stop the daemon."""
self.logging.info("Stop")
# Get the pid from the pidfile
try:
with open(self.pidfile, 'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if not pid:
message = "pidfile {0} does not exist. " + \
"Daemon not running?\n"
sys.stderr.write(message.format(self.pidfile))
self.logging.error(message.format(self.pidfile))
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
except OSError as err:
e = str(err.args)
if e.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(err.args))
sys.exit(1)
def restart(self):
"""Restart the daemon."""
self.logging.info("Restart")
self.stop()
self.start()
def status(self):
print("Status")
try:
with open(self.pidfile, 'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
if pid:
print("Process started, pid %d" % pid)
else:
print("Process is not running")
def console_stdout(self):
sys.stdout = sys.__stdout__
print(123)
def process_command(self):
if len(sys.argv) > 1:
command = sys.argv[1]
handler = self.get_command_handler(command)
if handler:
handler()
else:
print("Unknown command: %s" % command)
else:
print("usage: %s start|stop|restart|status" % sys.argv[0])
sys.exit(2)
def base_commands(self):
self.add_command('start', self.start)
self.add_command('stop', self.stop)
self.add_command('restart', self.restart)
self.add_command('status', self.status)
self.add_command('console_stdout', self.console_stdout)
def add_command(self, command, handler):
if command not in self.commands:
self.commands[command] = handler
def get_command_handler(self, command):
if command in self.commands:
return self.commands[command]
return None
def reg_command(self):
pass
def run(self):
"""You should override this method when you subclass Daemon.
It will be called after the process has been daemonized by
start() or restart()."""

0
daemon/__init__.py Normal file
View File

18
fmd_bot/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
from dotenv import dotenv_values
from tg_bot import recursive_dict_merge
config = dotenv_values(".env")
FIRST, SECOND = range(2)
states_instance_arr = [
]
states = {}
for state in states_instance_arr:
states = recursive_dict_merge(states, state.get_states())

View File

@@ -0,0 +1,164 @@
import requests
from telegram.ext import ContextTypes
from tg_bot.Handler import Handler
from telegram import Update
from cia_bot.states.ByImgStyleStates import ByImgStyleStates
from cia_bot.keyboards.BisSelectSizeKeyboard import BisSelectSizeKeyboard
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
from db_bot.User import save_user, get_user, update_user, decrement_gen_count
from db_bot.TaskModel import TaskModel
from image_helper.img import image_url_to_png_bytes, get_filename_without_extension
from io import BytesIO
from gen_api.GenApi import GenApi
from dotenv import dotenv_values
from cia_bot.keyboards.MenuKetboard import MenuKeyboard
class BisHandler(Handler):
def __init__(self):
super().__init__()
@staticmethod
async def get_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(
chat_id=update.effective_chat.id,
text='''
Отправьте мне промпт для генерации изображения.
'''
)
return ByImgStyleStates.get_state_by_key("bis_set_prompt")
@staticmethod
async def set_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data['prompt'] = update.message.text
reply_markup = BisSelectSizeKeyboard()
await update.message.reply_text(
text="Выберите размер изображения:",
reply_markup=reply_markup.create_keyboard()
)
return ByImgStyleStates.get_state_by_key("bis_select_size")
@staticmethod
async def select_size(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
# query.answer()
data = query.data
command, params = Handler.load_callback_query(data)
context.user_data['size'] = params['size']
await context.bot.send_message(
text="Теперь отправь мне изображение стиль которого нужно скопировать.",
chat_id=update.effective_chat.id,
)
return ByImgStyleStates.get_state_by_key("bis_get_base_img")
@staticmethod
async def get_base_img(update: Update, context: ContextTypes.DEFAULT_TYPE):
if 'prompt' not in context.user_data:
await update.message.reply_text("Сначала отправь текстовый запрос.")
return
prompt = context.user_data['prompt']
size = context.user_data['size']
user = get_user(update)
try:
# Получаем URL изображения из Telegram
photo_file = await update.message.photo[-1].get_file()
image_url = photo_file.file_path
# Проверяем доступность изображения
test_resp = requests.head(image_url)
if test_resp.status_code != 200:
await update.message.reply_text("Не удалось получить изображение")
return
if user:
if user.gen_count <= 0:
await update.message.reply_text("На сегодня художники устали! Можете обратиться к @kirill_bouko")
return
decrement_gen_count(update)
await update.message.reply_text("Рисуем постер!")
# Генерируем новое изображение
response = BisHandler.create_poster(image_url, prompt, size)
tm = TaskModel()
tm.save({
'user_id': update.effective_chat.id,
'request_id': response['request_id'],
'status': 1,
})
# Получаем и отправляем результат
# result_url = response['response'][0]
# img_data = requests.get(result_url).content
#
# await update.message.reply_photo(photo=BytesIO(img_data))
#
# reply_markup = MenuKeyboard()
#
# await context.bot.send_message(
# chat_id=update.effective_chat.id,
# text='Вернуться в меню.',
# reply_markup=reply_markup.create_keyboard()
# )
return ByImgStyleStates.get_state_by_key("first")
except Exception as e:
print(f"Ошибка: {str(e)}")
finally:
context.user_data.pop('prompt', None)
@staticmethod
def create_poster(url, text, size):
config = dotenv_values(".env")
img = image_url_to_png_bytes(url)
img_name = get_filename_without_extension(url)
prompt = '''
{text}
Стиль скопируй из приложенного изображения
'''.format(text=text)
input = {
"prompt": prompt,
"size": size,
"quality": "medium",
"is_sync": 0,
}
files = {
# "image": [("imgs_file/2.png", image_data, 'image/png')],
"image[]": ("{name}.png".format(name=img_name), img, 'image/png'),
}
ga = GenApi(token=config['GEN_API_TOKEN'])
response = ga.gpt_image_1(data_input=input, files=files)
return response
@staticmethod
def get_states(data=None) -> dict:
return {
ByImgStyleStates.get_state_by_key("first"): [
CallbackQueryHandler(BisHandler.get_prompt, pattern='^bis_get_prompt$'),
],
ByImgStyleStates.get_state_by_key("bis_set_prompt"): [
MessageHandler(filters.TEXT & ~filters.COMMAND, BisHandler.set_prompt),
],
ByImgStyleStates.get_state_by_key("bis_select_size"): [
CallbackQueryHandler(BisHandler.select_size, pattern='^bis_set_size?\S{3,}'),
],
ByImgStyleStates.get_state_by_key("bis_get_base_img"): [
MessageHandler(filters.PHOTO, BisHandler.get_base_img),
],
}

View File

@@ -0,0 +1,141 @@
import requests
from telegram.ext import ContextTypes
from tg_bot.Handler import Handler
from telegram import Update
from cia_bot.states.DrctStates import DrctStates
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
from db_bot.User import save_user, get_user, update_user, decrement_gen_count
from db_bot.TaskModel import TaskModel
from image_helper.img import image_url_to_png_bytes, get_filename_without_extension
from io import BytesIO
from gen_api.GenApi import GenApi
from dotenv import dotenv_values
from cia_bot.keyboards.MenuKetboard import MenuKeyboard
class DrctHandler(Handler):
def __init__(self):
super().__init__()
@staticmethod
async def get_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(
chat_id=update.effective_chat.id,
text='''
Загрузите изображение которое нужно улучшить.
'''
)
return DrctStates.get_state_by_key("drct_get_base_img")
# @staticmethod
# async def set_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
# context.user_data['prompt'] = update.message.text
# await update.message.reply_text("Теперь отправь мне изображение.")
#
# return UssrStates.get_state_by_key("get_base_img")
@staticmethod
async def get_base_img(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = get_user(update)
try:
# Получаем URL изображения из Telegram
photo_file = await update.message.photo[-1].get_file()
image_url = photo_file.file_path
# Проверяем доступность изображения
test_resp = requests.head(image_url)
if test_resp.status_code != 200:
await update.message.reply_text("Не удалось получить изображение")
return
if user:
if user.gen_count <= 0:
await update.message.reply_text("На сегодня художники устали! Можете обратиться к @kirill_bouko")
return
decrement_gen_count(update)
await update.message.reply_text("Улучшаем изображение")
# Генерируем новое изображение
response = await DrctHandler.run_upscale(image_url)
tm = TaskModel()
tm.save({
'user_id': update.effective_chat.id,
'request_id': response['request_id'],
'send_url': 1,
'status': 1,
})
# Получаем и отправляем результат
# result_url = response['response'][0]
# img_data = requests.get(result_url).content
#
# await update.message.reply_photo(photo=BytesIO(img_data))
#
# reply_markup = MenuKeyboard()
#
# await context.bot.send_message(
# chat_id=update.effective_chat.id,
# text='Вернуться в меню.',
# reply_markup=reply_markup.create_keyboard()
# )
return DrctStates.get_state_by_key("first")
except Exception as e:
print(f"Ошибка: {str(e)}")
finally:
context.user_data.pop('prompt', None)
@staticmethod
async def run_upscale(url):
config = dotenv_values(".env")
img = image_url_to_png_bytes(url)
img_name = get_filename_without_extension(url)
input = {
"image_url": url,
"is_sync": 0,
}
ga = GenApi(token=config['GEN_API_TOKEN'])
response = ga.drct(data_input=input)
return response
@staticmethod
def create_poster(url):
config = dotenv_values(".env")
img = image_url_to_png_bytes(url)
img_name = get_filename_without_extension(url)
input = {
"is_sync": 0,
}
files = {
# "image": [("imgs_file/2.png", image_data, 'image/png')],
"image[]": ("{name}.png".format(name=img_name), img, 'image/png'),
}
ga = GenApi(token=config['GEN_API_TOKEN'])
response = ga.drct(data_input=input, files=files)
return response
@staticmethod
def get_states(data=None) -> dict:
return {
DrctStates.get_state_by_key("first"): [
CallbackQueryHandler(DrctHandler.get_prompt, pattern='^drct_get_prompt'),
],
DrctStates.get_state_by_key("drct_get_base_img"): [
MessageHandler(filters.PHOTO, DrctHandler.get_base_img),
],
}

View File

@@ -0,0 +1,69 @@
from tg_bot.Handler import Handler
from telegram.constants import ParseMode
from fmd_bot import FIRST
from telegram.ext import ContextTypes
from telegram import Update, InputMediaPhoto
from fmd_bot.keyboards.MainKeyboard import MainKeyboard
from igf_api.IgfClient import IgfClient
from dotenv import dotenv_values
SET_ADMIN_MSG = 110
GET_ADMIN_MSG_IMG = 111
config = dotenv_values(".env")
admins = config['BOT_ADMINS']
class MainHandler(Handler):
@staticmethod
async def start(update, context):
reply_markup = MainKeyboard()
client_igf = IgfClient()
client_igf.tgBot.create({
'bot_id': context.bot.id,
'dialog_id': update.effective_chat.id,
'username': update.effective_chat.username,
'first_name': update.effective_chat.first_name,
'last_name': update.effective_chat.last_name,
'status': 1,
})
await update.message.reply_text(
'''
Я - твой пошниксмсммммcvc.
''',
reply_markup=reply_markup.create_keyboard()
)
return FIRST
@staticmethod
async def balance(update, context):
reply_markup = MainKeyboard()
await update.message.reply_text(
'''
Количество генераций: {gen_count}.
'''
)
return FIRST
@staticmethod
async def menu(update, context):
reply_markup = MainKeyboard()
await context.bot.send_message(
chat_id=update.effective_chat.id,
text='''
Выберите нужный креатив.
''',
reply_markup=reply_markup.create_keyboard()
)
return FIRST

View File

@@ -0,0 +1,135 @@
import requests
from telegram.ext import ContextTypes
from tg_bot.Handler import Handler
from telegram import Update
from cia_bot.states.RikStates import RikStates
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
from db_bot.User import save_user, get_user, update_user, decrement_gen_count
from db_bot.TaskModel import TaskModel
from image_helper.img import image_url_to_png_bytes, get_filename_without_extension
from io import BytesIO
from gen_api.GenApi import GenApi
from dotenv import dotenv_values
from cia_bot.keyboards.MenuKetboard import MenuKeyboard
class RikHandler(Handler):
def __init__(self):
super().__init__()
@staticmethod
async def get_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(
chat_id=update.effective_chat.id,
text='''
Добро пожаловать в Rick & Morty Style Poster — тут мы не соблюдаем законы физики, но зато делаем крутые постеры!\nЗакидывай фото и погнали! \nWubba Lubba Dub Dub! 🤖✌️
'''
)
return RikStates.get_state_by_key("rik_get_base_img")
# @staticmethod
# async def set_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
# context.user_data['prompt'] = update.message.text
# await update.message.reply_text("Теперь отправь мне изображение.")
#
# return UssrStates.get_state_by_key("get_base_img")
@staticmethod
async def get_base_img(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = get_user(update)
try:
# Получаем URL изображения из Telegram
photo_file = await update.message.photo[-1].get_file()
image_url = photo_file.file_path
# Проверяем доступность изображения
test_resp = requests.head(image_url)
if test_resp.status_code != 200:
await update.message.reply_text("Не удалось получить изображение")
return
if user:
if user.gen_count <= 0:
await update.message.reply_text("На сегодня художники устали! Можете обратиться к @kirill_bouko")
return
decrement_gen_count(update)
await update.message.reply_text("Рисуем постер!")
# Генерируем новое изображение
response = RikHandler.create_poster(image_url)
tm = TaskModel()
tm.save({
'user_id': update.effective_chat.id,
'request_id': response['request_id'],
'status': 1,
})
# Получаем и отправляем результат
# result_url = response['response'][0]
# img_data = requests.get(result_url).content
#
# await update.message.reply_photo(photo=BytesIO(img_data))
#
# reply_markup = MenuKeyboard()
#
# await context.bot.send_message(
# chat_id=update.effective_chat.id,
# text='Вернуться в меню.',
# reply_markup=reply_markup.create_keyboard()
# )
return RikStates.get_state_by_key("first")
except Exception as e:
print(f"Ошибка: {str(e)}")
finally:
context.user_data.pop('prompt', None)
@staticmethod
def create_poster(url):
config = dotenv_values(".env")
img = image_url_to_png_bytes(url)
img_name = get_filename_without_extension(url)
prompt = '''
Create an illustration in the style of 'Rick and Morty' use bold outlines, vibrant colors, and exaggerated, wobbly shapes.
The characters should have large, expressive eyes, and slightly grotesque proportions.
The background should be sci-fi themed with surreal elements like floating planets, portals, or weird aliens.
Keep the shading minimal with flat colors and add a slight texture to mimic the show's hand-drawn aesthetic.
Make it look dynamic and chaotic, with a touch of dark humor. Sci-fi background with interdimensional portals, weird creatures, and glowing fluids in beakers.
The style should match the chaotic energy of Rick's garage.
'''
input = {
"prompt": prompt,
"size": "1024x1536",
"quality": "medium",
"is_sync": 0,
}
files = {
# "image": [("imgs_file/2.png", image_data, 'image/png')],
"image[]": ("{name}.png".format(name=img_name), img, 'image/png'),
}
ga = GenApi(token=config['GEN_API_TOKEN'])
response = ga.gpt_image_1(data_input=input, files=files)
return response
@staticmethod
def get_states(data=None) -> dict:
return {
RikStates.get_state_by_key("first"): [
CallbackQueryHandler(RikHandler.get_prompt, pattern='^rik_get_prompt'),
],
RikStates.get_state_by_key("rik_get_base_img"): [
MessageHandler(filters.PHOTO, RikHandler.get_base_img),
],
}

View File

@@ -0,0 +1,145 @@
import requests
from telegram.ext import ContextTypes
from tg_bot.Handler import Handler
from telegram import Update
from cia_bot.states.UssrStates import UssrStates
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
from db_bot.User import save_user, get_user, update_user, decrement_gen_count
from db_bot.TaskModel import TaskModel
from image_helper.img import image_url_to_png_bytes, get_filename_without_extension
from io import BytesIO
from gen_api.GenApi import GenApi
from dotenv import dotenv_values
from cia_bot.keyboards.MenuKetboard import MenuKeyboard
class UssrHandler(Handler):
def __init__(self):
super().__init__()
@staticmethod
async def get_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(
chat_id=update.effective_chat.id,
text='''
Приветствуем тебя, Товарищ!\nДавай вместе создадим агитационный плакат, достойный великих свершений нашего народа! \nСмелые лозунги, ясные цели и яркие образы — всё во имя прогресса и мира!\nНапишите свой лозунг!
'''
)
return UssrStates.get_state_by_key("ussr_set_prompt")
@staticmethod
async def set_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data['prompt'] = update.message.text
await update.message.reply_text("Теперь отправь мне изображение.")
return UssrStates.get_state_by_key("get_base_img")
@staticmethod
async def get_base_img(update: Update, context: ContextTypes.DEFAULT_TYPE):
if 'prompt' not in context.user_data:
await update.message.reply_text("Сначала отправь текстовый запрос.")
return
prompt = context.user_data['prompt']
user = get_user(update)
try:
# Получаем URL изображения из Telegram
photo_file = await update.message.photo[-1].get_file()
image_url = photo_file.file_path
# Проверяем доступность изображения
test_resp = requests.head(image_url)
if test_resp.status_code != 200:
await update.message.reply_text("Не удалось получить изображение")
return
if user:
if user.gen_count <= 0:
await update.message.reply_text("На сегодня художники устали! Можете обратиться к @kirill_bouko")
return
decrement_gen_count(update)
await update.message.reply_text("Рисуем постер!")
# Генерируем новое изображение
response = UssrHandler.create_poster(image_url, prompt)
tm = TaskModel()
tm.save({
'user_id': update.effective_chat.id,
'request_id': response['request_id'],
'status': 1,
})
# Получаем и отправляем результат
# result_url = response['response'][0]
# img_data = requests.get(result_url).content
#
# await update.message.reply_photo(photo=BytesIO(img_data))
#
# reply_markup = MenuKeyboard()
#
# await context.bot.send_message(
# chat_id=update.effective_chat.id,
# text='Вернуться в меню.',
# reply_markup=reply_markup.create_keyboard()
# )
return UssrStates.get_state_by_key("first")
except Exception as e:
print(f"Ошибка: {str(e)}")
finally:
context.user_data.pop('prompt', None)
@staticmethod
def create_poster(url, text):
config = dotenv_values(".env")
img = image_url_to_png_bytes(url)
img_name = get_filename_without_extension(url)
prompt = '''
Создай изображение из моей фотографии в стиле советского пропагандистского постера.
Используй яркие, контрастные цвета: красный, желтый, черный, белый.
Включи в композицию мощные символы — серп и молот, красную звезду, восходящее солнце, фигуры рабочих, крестьян или космонавтов.
Добавь динамику и героический пафос.
Текст должен быть крупным, брутальным шрифтом, например: '{text}'.
Стиль — графичный, лаконичный, с четкими линиями и минимальной детализацией, как в классических плакатах 1920-1950-х годов.
Избегай современной цифровой стилистики, сохрани дух эпохи.
'''.format(text=text)
input = {
"prompt": prompt,
"size": "1024x1536",
"quality": "medium",
"is_sync": 0,
}
files = {
# "image": [("imgs_file/2.png", image_data, 'image/png')],
"image[]": ("{name}.png".format(name=img_name), img, 'image/png'),
}
ga = GenApi(token=config['GEN_API_TOKEN'])
response = ga.gpt_image_1(data_input=input, files=files)
return response
@staticmethod
def get_states(data=None) -> dict:
return {
UssrStates.get_state_by_key("first"): [
CallbackQueryHandler(UssrHandler.get_prompt, pattern='^ussr_get_prompt$'),
],
UssrStates.get_state_by_key("ussr_set_prompt"): [
MessageHandler(filters.TEXT & ~filters.COMMAND, UssrHandler.set_prompt),
],
UssrStates.get_state_by_key("get_base_img"): [
MessageHandler(filters.PHOTO, UssrHandler.get_base_img),
],
}

Binary file not shown.

View File

@@ -0,0 +1,20 @@
from tg_bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class BisSelectSizeKeyboard(Keyboard):
def get_keyboard(self):
keyboard = [
[
InlineKeyboardButton("1024x1024", callback_data="bis_set_size?size=1024x1024")
],
[
InlineKeyboardButton("1536x1024", callback_data="bis_set_size?size=1536x1024")
],
[
InlineKeyboardButton("1024x1536", callback_data="bis_set_size?size=1024x1536")
]
]
return keyboard

View File

@@ -0,0 +1,24 @@
from tg_bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class MainKeyboard(Keyboard):
def get_keyboard(self):
keyboard = [
[
InlineKeyboardButton("Пункт 1", callback_data="ussr_get_prompt"),
InlineKeyboardButton("Пункт 2", callback_data="ussr_get_prompt")
],
[
InlineKeyboardButton("Пункт 3", callback_data="rik_get_prompt"),
InlineKeyboardButton("Пункт 4", callback_data="rik_get_prompt")
],
[
InlineKeyboardButton("Пункт 5", callback_data="bis_get_prompt"),
InlineKeyboardButton("Пункт 6", callback_data="bis_get_prompt")
],
]
return keyboard

View File

@@ -0,0 +1,14 @@
from tg_bot.Keyboard import Keyboard
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class MenuKeyboard(Keyboard):
def get_keyboard(self):
keyboard = [
[
InlineKeyboardButton("Меню", callback_data="main_menu")
]
]
return keyboard

View File

@@ -0,0 +1,33 @@
from tg_bot.States import States
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
FIRST, SECOND = range(2)
BIS_GET_PROMPT = 51
BIS_SET_PROMPT = 52
BIS_GET_IMG = 53
BIS_SELECT_SIZE = 54
def set_states():
states_arr = {
"first": FIRST,
"bis_get_prompt": BIS_GET_PROMPT,
"bis_set_prompt": BIS_SET_PROMPT,
"bis_get_base_img": BIS_GET_IMG,
"bis_select_size": BIS_SELECT_SIZE,
}
return states_arr
class ByImgStyleStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

View File

@@ -0,0 +1,31 @@
from tg_bot.States import States
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
FIRST, SECOND = range(2)
DRCT_GET_PROMPT = 61
DRCT_SET_PROMPT = 62
DRCT_GET_IMG = 63
def set_states():
states_arr = {
"first": FIRST,
"drct_get_prompt": DRCT_GET_PROMPT,
"drct_set_prompt": DRCT_SET_PROMPT,
"drct_get_base_img": DRCT_GET_IMG,
}
return states_arr
class DrctStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

View File

@@ -0,0 +1,31 @@
from tg_bot.States import States
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
FIRST, SECOND = range(2)
RIK_GET_PROMPT = 41
RIK_SET_PROMPT = 42
RIK_GET_IMG = 43
def set_states():
states_arr = {
"first": FIRST,
"rik_get_prompt": RIK_GET_PROMPT,
"rik_set_prompt": RIK_SET_PROMPT,
"rik_get_base_img": RIK_GET_IMG,
}
return states_arr
class RikStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

View File

@@ -0,0 +1,31 @@
from tg_bot.States import States
from telegram.ext import MessageHandler, filters, CallbackQueryHandler
FIRST, SECOND = range(2)
USSR_GET_PROMPT = 31
USSR_SET_PROMPT = 32
USSR_GET_IMG = 33
def set_states():
states_arr = {
"first": FIRST,
"ussr_get_prompt": USSR_GET_PROMPT,
"ussr_set_prompt": USSR_SET_PROMPT,
"get_base_img": USSR_GET_IMG,
}
return states_arr
class UssrStates(States):
def __init__(self):
pass
@staticmethod
def get_state_by_key(key: str):
states = set_states()
if key in states:
return states[key]
return None

1
igf_api/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
__pycache__

23
igf_api/BaseEntity.py Normal file
View File

@@ -0,0 +1,23 @@
import jsonschema
class BaseEntity:
def __init__(self, connection):
self.connection = connection
def validate_dict(self, dictionary, schema):
# schema = {
# 'type': 'object',
# 'properties': {
# 'key1': {'type': 'integer'},
# 'key2': {'type': 'string'}
# },
# 'required': ['key1', 'key2']
# }
try:
jsonschema.validate(dictionary, schema)
return True
except jsonschema.ValidationError as ex:
print(ex)
return False

88
igf_api/Connection.py Normal file
View File

@@ -0,0 +1,88 @@
import time
from datetime import datetime, date
from requests.exceptions import HTTPError
import requests
class Connection:
def __init__(self, config):
self._login = config['IGF_USER']
self._password = config['IGF_PASS']
self.url = config['IGF_URL']
self.access_token = ''
self.access_token_expired_at = ''
def login(self):
json_response = self.raw_request_without_auth(method="post", endpoint="/secure/auth", data={'username': self._login, 'password': self._password})
self.access_token = json_response['access_token']
self.access_token_expired_at = json_response['access_token_expires_at']
print('Login Success!', self.access_token, self.access_token_expired_at)
def raw_request(self, method, endpoint, params=None, data=None):
# TODO доработать сравнение
access_token_expired_at_date, access_token_expired_at_time = self.access_token_expired_at.split(" ")
expired_at = datetime.strptime(access_token_expired_at_date, '%Y-%m-%d')
now = datetime.today()
if (expired_at < now):
print("exp")
self.login()
if data is None:
data = {}
if params is None:
params = {}
try:
response = requests.request(method=method,
url="{url}{endpoint}".format(url=self.url, endpoint=endpoint),
auth=BearerAuth(self.access_token),
params=params,
data=data
)
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}') # Python 3.6
except Exception as err:
print(f'Other error occurred: {err}') # Python 3.6
else:
json_response = response.json()
return json_response
def raw_request_without_auth(self, method, endpoint, params=None, data=None):
if data is None:
data = {}
if params is None:
params = {}
try:
response = requests.request(method=method,
url="{url}{endpoint}".format(url=self.url, endpoint=endpoint),
params=params,
data=data
)
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}') # Python 3.6
except Exception as err:
print(f'Other error occurred: {err}') # Python 3.6
else:
json_response = response.json()
return json_response
def set_url(self, url: str):
self.url = url
class BearerAuth(requests.auth.AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers["authorization"] = "Bearer " + self.token
return r

19
igf_api/IgfClient.py Normal file
View File

@@ -0,0 +1,19 @@
from igf_api.Connection import Connection
from igf_api.entities.PiapiTask import PiapiTask
from igf_api.entities.TgBot import TgBot
from igf_api import connection
class IgfClient:
def __init__(self):
self.connection = connection
self.connection.login()
self.tgBot = TgBot(connection=self.connection)
self.piapiTask = PiapiTask(connection=self.connection)
def login(self, login: str, password: str):
self.connection.login(login=login, password=password)
def set_url(self, url: str):
self.connection.set_url(url=url)

7
igf_api/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from dotenv import dotenv_values
from igf_api.Connection import Connection
config = dotenv_values(".env")
connection = Connection(config=config)

View File

@@ -0,0 +1,74 @@
from igf_api.BaseEntity import BaseEntity
class PiapiTask(BaseEntity):
def get_list(self):
return self.connection.raw_request(method='get', endpoint='/piapi')
def get_one(self, data):
schema = {
'type': 'object',
'properties': {
'bot_id': {'type': 'integer'},
'dialog_id': {'type': 'integer'},
'task_id': {"type": ["string", "null"]},
'model': {'type': 'string'},
'task_type': {'type': 'string'},
'prompt': {'type': 'string'},
'status': {'type': 'integer'},
},
'required': []
}
if self.validate_dict(data, schema):
return self.connection.raw_request('get', '/piapi/get-one', params=data)
return False
def create(self, data):
schema = {
'type': 'object',
'properties': {
'bot_id': {'type': 'integer'},
'dialog_id': {'type': 'integer'},
'task_id': {"type": ["string", "null"]},
'model': {'type': 'string'},
'task_type': {'type': 'string'},
'prompt': {'type': 'string'},
'status': {'type': 'integer'},
},
'required': ['dialog_id', 'bot_id']
}
if self.validate_dict(data, schema):
return self.connection.raw_request('post', '/piapi', data=data)
return False
def update(self, id: int, data):
schema = {
'type': 'object',
'properties': {
'bot_id': {'type': 'integer'},
'dialog_id': {'type': 'integer'},
'task_id': {"type": ["string", "null"]},
'model': {'type': 'string'},
'task_type': {'type': 'string'},
'prompt': {'type': 'string'},
'status': {'type': 'integer'},
},
'required': []
}
if self.validate_dict(data, schema):
return self.connection.raw_request('post', '/piapi/update/{id}'.format(id=id), data=data)
return False
def to_archive_all_new(self, bot_id: int, dialog_id: int):
return self.connection.raw_request('get', '/piapi/to-archive-all-new/{bot_id}/{dialog_id}'.format(bot_id=bot_id,
dialog_id=dialog_id))
def get_new_tasks(self):
return self.connection.raw_request('get', '/piapi/find?status={status}'.format(status=1))

26
igf_api/entities/TgBot.py Normal file
View File

@@ -0,0 +1,26 @@
from igf_api.BaseEntity import BaseEntity
class TgBot(BaseEntity):
def get_list(self):
return self.connection.raw_request(method='get', endpoint='/tg-bot')
def create(self, data):
schema = {
'type': 'object',
'properties': {
'bot_id': {'type': 'integer'},
'dialog_id': {'type': 'integer'},
'username': {'type': 'string'},
'first_name': {'type': 'string'},
'last_name': {'type': 'string'},
'status': {'type': 'integer'},
},
'required': ['dialog_id', 'bot_id']
}
if self.validate_dict(data, schema):
return self.connection.raw_request('post', '/tg-bot', data=data)
return False

5
pyvenv.cfg Normal file
View File

@@ -0,0 +1,5 @@
home = /home/kavlar/miniconda3/bin
include-system-site-packages = false
version = 3.12.3
executable = /home/kavlar/miniconda3/bin/python3.12
command = /home/kavlar/miniconda3/bin/python -m venv /media/work/python/fmd

1
tg_bot Submodule

Submodule tg_bot added at b224a1f7d0