251 lines
10 KiB
Python
251 lines
10 KiB
Python
import logging
|
||
from dotenv import dotenv_values
|
||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||
from telegram.ext import (
|
||
CommandHandler,
|
||
MessageHandler,
|
||
filters,
|
||
CallbackContext,
|
||
CallbackQueryHandler,
|
||
ChatMemberHandler,
|
||
ApplicationBuilder,
|
||
ConversationHandler,
|
||
)
|
||
from telegram.constants import ParseMode
|
||
import os
|
||
|
||
config = dotenv_values(".env")
|
||
|
||
# Настройка логгирования (пока не записываем)
|
||
# logging.basicConfig(
|
||
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
|
||
# )
|
||
# logger = logging.getLogger(__name__)
|
||
|
||
# Константы
|
||
QUESTIONS_FILE = "questions.txt" # Файл с вопросами (один вопрос на строку)
|
||
USER_DATA_FILE = "user_answers.json" # Файл для сохранения ответов
|
||
|
||
FIRST, SECOND = range(2)
|
||
|
||
|
||
class SurveyBot:
|
||
def __init__(self, token):
|
||
self.current_question_index = {}
|
||
self.questions = self.load_questions()
|
||
self.user_data = {}
|
||
self.application = ApplicationBuilder().token(token).build()
|
||
|
||
conv_handler = ConversationHandler(
|
||
entry_points=[
|
||
CommandHandler('start', self.start),
|
||
],
|
||
states={
|
||
FIRST: [
|
||
CallbackQueryHandler(self.button),
|
||
]
|
||
},
|
||
fallbacks=[CommandHandler('start', self.start)],
|
||
)
|
||
|
||
self.application.add_handler(conv_handler)
|
||
|
||
# Регистрируем обработчики
|
||
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
|
||
self.application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, self.track_chat_members))
|
||
self.application.add_handler(CallbackQueryHandler(self.button))
|
||
|
||
# Для администрирования
|
||
self.application.add_handler(CommandHandler("get_answers", self.get_answers))
|
||
self.application.add_handler(CommandHandler("reload_questions", self.reload_questions))
|
||
|
||
# text_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), self.text_msg)
|
||
# application.add_handler(text_handler)
|
||
|
||
|
||
# Загружаем сохраненные данные пользователей
|
||
self.user_data = self.load_user_data()
|
||
|
||
def load_questions(self):
|
||
"""Загружает вопросы из текстового файла"""
|
||
if not os.path.exists(QUESTIONS_FILE):
|
||
with open(QUESTIONS_FILE, "w", encoding="utf-8") as f:
|
||
f.write("Как вас зовут?\nСколько вам лет?\nОткуда вы узнали о нас?")
|
||
return ["Как вас зовут?", "Сколько вам лет?", "Откуда вы узнали о нас?"]
|
||
|
||
with open(QUESTIONS_FILE, "r", encoding="utf-8") as f:
|
||
questions = [line.strip() for line in f.readlines() if line.strip()]
|
||
|
||
return questions
|
||
|
||
def load_user_data(self):
|
||
"""Загружает сохраненные ответы пользователей"""
|
||
if not os.path.exists(USER_DATA_FILE):
|
||
return {}
|
||
|
||
try:
|
||
import json
|
||
with open(USER_DATA_FILE, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except:
|
||
return {}
|
||
|
||
def save_user_data(self, context: CallbackContext):
|
||
"""Сохраняет ответы пользователей в файл"""
|
||
import json
|
||
with open(USER_DATA_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(self.user_data, f, ensure_ascii=False, indent=2)
|
||
|
||
async def start(self, update: Update, context: CallbackContext):
|
||
user_id = update.effective_user.id
|
||
if str(user_id) not in self.user_data:
|
||
context.user_data['current_user'] = {
|
||
"username": update.effective_chat.username,
|
||
"first_name": update.effective_chat.first_name,
|
||
"last_name": update.effective_chat.last_name,
|
||
"chat_id": update.effective_chat.id,
|
||
"message_id": 0,
|
||
"answers": {},
|
||
"completed": False,
|
||
"current_question_index": 0
|
||
}
|
||
self.user_data[str(user_id)] = context.user_data['current_user']
|
||
await self.ask_question(user_id, update.effective_chat.id, context)
|
||
else:
|
||
context.user_data['current_user'] = self.user_data[str(user_id)]
|
||
if self.user_data[str(user_id)]['completed'] == False:
|
||
await self.ask_question(user_id, update.effective_chat.id, context)
|
||
else:
|
||
await context.bot.send_message(
|
||
chat_id=user_id,
|
||
text="Вы уже ответили на вопросы."
|
||
)
|
||
|
||
async def track_chat_members(self, update: Update, context: CallbackContext):
|
||
"""Отслеживает новых участников чата/канала"""
|
||
for member in update.message.new_chat_members:
|
||
if member.is_bot: # Игнорируем других ботов
|
||
continue
|
||
|
||
user_id = member.id
|
||
chat_id = update.effective_chat.id
|
||
|
||
if str(user_id) not in self.user_data:
|
||
context.user_data['current_user'] = {
|
||
"username": member.username,
|
||
"first_name": member.first_name,
|
||
"last_name": member.last_name,
|
||
"chat_id": chat_id,
|
||
"message_id": 0,
|
||
"answers": {},
|
||
"completed": False,
|
||
"current_question_index": 0
|
||
}
|
||
self.user_data[str(user_id)] = context.user_data['current_user']
|
||
|
||
msg = await context.bot.send_message(
|
||
chat_id=chat_id,
|
||
text="@{username}, Пройдите пожалуйста опрос нашего бота. <a href='{bot_link}?start=start'>Пройти опрос</a>".format(
|
||
username=member.username,
|
||
bot_link=config['BOT_LINK']
|
||
),
|
||
parse_mode=ParseMode.HTML
|
||
)
|
||
|
||
self.user_data[str(user_id)]['message_id'] = msg.message_id
|
||
|
||
async def ask_question(self, user_id, chat_id, context):
|
||
"""Задает пользователю следующий вопрос"""
|
||
question_index = self.user_data[str(user_id)]['current_question_index']
|
||
|
||
if question_index >= len(self.questions):
|
||
# Все вопросы заданы
|
||
self.user_data[str(user_id)]["completed"] = True
|
||
self.save_user_data(context)
|
||
await context.bot.send_message(
|
||
chat_id=chat_id,
|
||
text="Спасибо за ответы! Теперь вы полноправный участник нашего сообщества."
|
||
)
|
||
|
||
if self.user_data[str(user_id)]['message_id'] > 0:
|
||
await context.bot.delete_message(
|
||
chat_id=self.user_data[str(user_id)]['chat_id'],
|
||
message_id=self.user_data[str(user_id)]['message_id']
|
||
)
|
||
|
||
return
|
||
|
||
question = self.questions[question_index]
|
||
keyboard = [
|
||
[InlineKeyboardButton("Пропустить вопрос", callback_data=f"skip_{user_id}")]
|
||
]
|
||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||
|
||
await context.bot.send_message(
|
||
chat_id=user_id,
|
||
text=question,
|
||
reply_markup=reply_markup
|
||
)
|
||
|
||
async def handle_message(self, update: Update, context: CallbackContext):
|
||
"""Обрабатывает ответы пользователя на вопросы"""
|
||
user_id = update.effective_user.id
|
||
|
||
question_index = self.user_data[str(user_id)]['current_question_index']
|
||
if question_index >= len(self.questions):
|
||
return
|
||
|
||
answer = update.message.text
|
||
question = self.questions[question_index]
|
||
|
||
# Сохраняем ответ
|
||
self.user_data[str(user_id)]["answers"][question] = answer
|
||
self.user_data[str(user_id)]['current_question_index'] += 1
|
||
|
||
# Задаем следующий вопрос
|
||
await self.ask_question(user_id, update.effective_chat.id, context)
|
||
|
||
async def button(self, update: Update, context: CallbackContext):
|
||
"""Обрабатывает нажатия кнопок"""
|
||
query = update.callback_query
|
||
await query.answer()
|
||
|
||
if query.data.startswith("skip_"):
|
||
user_id = int(query.data.split("_")[1])
|
||
self.user_data[str(user_id)]['current_question_index'] += 1
|
||
await self.ask_question(user_id, query.message.chat_id, context)
|
||
|
||
async def get_answers(self, update: Update, context: CallbackContext):
|
||
"""Команда для получения всех ответов (только для админов)"""
|
||
admins = config['ADMIN_ID'].split(" ")
|
||
if str(update.effective_user.id) not in admins: # Замените на ваш ID
|
||
await update.message.reply_text("У вас нет прав для этой команды.")
|
||
return
|
||
|
||
if not self.user_data:
|
||
await update.message.reply_text("Нет данных об ответах пользователей.")
|
||
return
|
||
|
||
import json
|
||
answers_str = json.dumps(self.user_data, ensure_ascii=False, indent=2)
|
||
await update.message.reply_text(f"Ответы пользователей:\n{answers_str}")
|
||
|
||
async def reload_questions(self, update: Update, context: CallbackContext):
|
||
"""Перезагружает вопросы из файла (только для админов)"""
|
||
admins = config['ADMIN_ID'].split(" ")
|
||
if str(update.effective_user.id) not in admins:
|
||
await update.message.reply_text("У вас нет прав для этой команды.")
|
||
return
|
||
|
||
self.questions = self.load_questions()
|
||
await update.message.reply_text(f"Вопросы перезагружены. Всего вопросов: {len(self.questions)}")
|
||
|
||
def run(self):
|
||
"""Запускает бота"""
|
||
self.application.run_polling()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# Замените 'YOUR_BOT_TOKEN' на токен вашего бота
|
||
bot = SurveyBot(token=config['BOT_TOKEN'])
|
||
bot.run() |