2025-04-22 18:24:00 +03:00

251 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from dotenv import dotenv_values
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
CommandHandler,
MessageHandler,
filters,
CallbackContext,
CallbackQueryHandler,
ChatMemberHandler,
ApplicationBuilder,
ConversationHandler,
)
from telegram.constants import ParseMode
import os
config = dotenv_values(".env")
# Настройка логгирования (пока не записываем)
# logging.basicConfig(
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
# )
# logger = logging.getLogger(__name__)
# Константы
QUESTIONS_FILE = "questions.txt" # Файл с вопросами (один вопрос на строку)
USER_DATA_FILE = "user_answers.json" # Файл для сохранения ответов
FIRST, SECOND = range(2)
class SurveyBot:
def __init__(self, token):
self.current_question_index = {}
self.questions = self.load_questions()
self.user_data = {}
self.application = ApplicationBuilder().token(token).build()
conv_handler = ConversationHandler(
entry_points=[
CommandHandler('start', self.start),
],
states={
FIRST: [
CallbackQueryHandler(self.button),
]
},
fallbacks=[CommandHandler('start', self.start)],
)
self.application.add_handler(conv_handler)
# Регистрируем обработчики
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
self.application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, self.track_chat_members))
self.application.add_handler(CallbackQueryHandler(self.button))
# Для администрирования
self.application.add_handler(CommandHandler("get_answers", self.get_answers))
self.application.add_handler(CommandHandler("reload_questions", self.reload_questions))
# text_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), self.text_msg)
# application.add_handler(text_handler)
# Загружаем сохраненные данные пользователей
self.user_data = self.load_user_data()
def load_questions(self):
"""Загружает вопросы из текстового файла"""
if not os.path.exists(QUESTIONS_FILE):
with open(QUESTIONS_FILE, "w", encoding="utf-8") as f:
f.write("Как вас зовут?\nСколько вам лет?\nОткуда вы узнали о нас?")
return ["Как вас зовут?", "Сколько вам лет?", "Откуда вы узнали о нас?"]
with open(QUESTIONS_FILE, "r", encoding="utf-8") as f:
questions = [line.strip() for line in f.readlines() if line.strip()]
return questions
def load_user_data(self):
"""Загружает сохраненные ответы пользователей"""
if not os.path.exists(USER_DATA_FILE):
return {}
try:
import json
with open(USER_DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except:
return {}
def save_user_data(self, context: CallbackContext):
"""Сохраняет ответы пользователей в файл"""
import json
with open(USER_DATA_FILE, "w", encoding="utf-8") as f:
json.dump(self.user_data, f, ensure_ascii=False, indent=2)
async def start(self, update: Update, context: CallbackContext):
user_id = update.effective_user.id
if str(user_id) not in self.user_data:
context.user_data['current_user'] = {
"username": update.effective_chat.username,
"first_name": update.effective_chat.first_name,
"last_name": update.effective_chat.last_name,
"chat_id": update.effective_chat.id,
"message_id": 0,
"answers": {},
"completed": False,
"current_question_index": 0
}
self.user_data[str(user_id)] = context.user_data['current_user']
await self.ask_question(user_id, update.effective_chat.id, context)
else:
context.user_data['current_user'] = self.user_data[str(user_id)]
if self.user_data[str(user_id)]['completed'] == False:
await self.ask_question(user_id, update.effective_chat.id, context)
else:
await context.bot.send_message(
chat_id=user_id,
text="Вы уже ответили на вопросы."
)
async def track_chat_members(self, update: Update, context: CallbackContext):
"""Отслеживает новых участников чата/канала"""
for member in update.message.new_chat_members:
if member.is_bot: # Игнорируем других ботов
continue
user_id = member.id
chat_id = update.effective_chat.id
if str(user_id) not in self.user_data:
context.user_data['current_user'] = {
"username": member.username,
"first_name": member.first_name,
"last_name": member.last_name,
"chat_id": chat_id,
"message_id": 0,
"answers": {},
"completed": False,
"current_question_index": 0
}
self.user_data[str(user_id)] = context.user_data['current_user']
msg = await context.bot.send_message(
chat_id=chat_id,
text="@{username}, Пройдите пожалуйста опрос нашего бота. <a href='{bot_link}?start=start'>Пройти опрос</a>".format(
username=member.username,
bot_link=config['BOT_LINK']
),
parse_mode=ParseMode.HTML
)
self.user_data[str(user_id)]['message_id'] = msg.message_id
async def ask_question(self, user_id, chat_id, context):
"""Задает пользователю следующий вопрос"""
question_index = self.user_data[str(user_id)]['current_question_index']
if question_index >= len(self.questions):
# Все вопросы заданы
self.user_data[str(user_id)]["completed"] = True
self.save_user_data(context)
await context.bot.send_message(
chat_id=chat_id,
text="Спасибо за ответы! Теперь вы полноправный участник нашего сообщества."
)
if self.user_data[str(user_id)]['message_id'] > 0:
await context.bot.delete_message(
chat_id=self.user_data[str(user_id)]['chat_id'],
message_id=self.user_data[str(user_id)]['message_id']
)
return
question = self.questions[question_index]
keyboard = [
[InlineKeyboardButton("Пропустить вопрос", callback_data=f"skip_{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await context.bot.send_message(
chat_id=user_id,
text=question,
reply_markup=reply_markup
)
async def handle_message(self, update: Update, context: CallbackContext):
"""Обрабатывает ответы пользователя на вопросы"""
user_id = update.effective_user.id
question_index = self.user_data[str(user_id)]['current_question_index']
if question_index >= len(self.questions):
return
answer = update.message.text
question = self.questions[question_index]
# Сохраняем ответ
self.user_data[str(user_id)]["answers"][question] = answer
self.user_data[str(user_id)]['current_question_index'] += 1
# Задаем следующий вопрос
await self.ask_question(user_id, update.effective_chat.id, context)
async def button(self, update: Update, context: CallbackContext):
"""Обрабатывает нажатия кнопок"""
query = update.callback_query
await query.answer()
if query.data.startswith("skip_"):
user_id = int(query.data.split("_")[1])
self.user_data[str(user_id)]['current_question_index'] += 1
await self.ask_question(user_id, query.message.chat_id, context)
async def get_answers(self, update: Update, context: CallbackContext):
"""Команда для получения всех ответов (только для админов)"""
admins = config['ADMIN_ID'].split(" ")
if str(update.effective_user.id) not in admins: # Замените на ваш ID
await update.message.reply_text("У вас нет прав для этой команды.")
return
if not self.user_data:
await update.message.reply_text("Нет данных об ответах пользователей.")
return
import json
answers_str = json.dumps(self.user_data, ensure_ascii=False, indent=2)
await update.message.reply_text(f"Ответы пользователей:\n{answers_str}")
async def reload_questions(self, update: Update, context: CallbackContext):
"""Перезагружает вопросы из файла (только для админов)"""
admins = config['ADMIN_ID'].split(" ")
if str(update.effective_user.id) not in admins:
await update.message.reply_text("У вас нет прав для этой команды.")
return
self.questions = self.load_questions()
await update.message.reply_text(f"Вопросы перезагружены. Всего вопросов: {len(self.questions)}")
def run(self):
"""Запускает бота"""
self.application.run_polling()
if __name__ == '__main__':
# Замените 'YOUR_BOT_TOKEN' на токен вашего бота
bot = SurveyBot(token=config['BOT_TOKEN'])
bot.run()