import logging from dotenv import dotenv_values from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ( CommandHandler, MessageHandler, filters, CallbackContext, CallbackQueryHandler, ChatMemberHandler, ApplicationBuilder, ConversationHandler, ) from telegram.constants import ParseMode import os config = dotenv_values(".env") # Настройка логгирования (пока не записываем) # logging.basicConfig( # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO # ) # logger = logging.getLogger(__name__) # Константы QUESTIONS_FILE = "questions.txt" # Файл с вопросами (один вопрос на строку) USER_DATA_FILE = "user_answers.json" # Файл для сохранения ответов FIRST, SECOND = range(2) class SurveyBot: def __init__(self, token): self.current_question_index = {} self.questions = self.load_questions() self.user_data = {} self.application = ApplicationBuilder().token(token).build() conv_handler = ConversationHandler( entry_points=[ CommandHandler('start', self.start), ], states={ FIRST: [ CallbackQueryHandler(self.button), ] }, fallbacks=[CommandHandler('start', self.start)], ) self.application.add_handler(conv_handler) # Регистрируем обработчики self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) self.application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, self.track_chat_members)) self.application.add_handler(CallbackQueryHandler(self.button)) # Для администрирования self.application.add_handler(CommandHandler("get_answers", self.get_answers)) self.application.add_handler(CommandHandler("reload_questions", self.reload_questions)) # text_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), self.text_msg) # application.add_handler(text_handler) # Загружаем сохраненные данные пользователей self.user_data = self.load_user_data() def load_questions(self): """Загружает вопросы из текстового файла""" if not os.path.exists(QUESTIONS_FILE): with open(QUESTIONS_FILE, "w", encoding="utf-8") as f: f.write("Как вас зовут?\nСколько вам лет?\nОткуда вы узнали о нас?") return ["Как вас зовут?", "Сколько вам лет?", "Откуда вы узнали о нас?"] with open(QUESTIONS_FILE, "r", encoding="utf-8") as f: questions = [line.strip() for line in f.readlines() if line.strip()] return questions def load_user_data(self): """Загружает сохраненные ответы пользователей""" if not os.path.exists(USER_DATA_FILE): return {} try: import json with open(USER_DATA_FILE, "r", encoding="utf-8") as f: return json.load(f) except: return {} def save_user_data(self, context: CallbackContext): """Сохраняет ответы пользователей в файл""" import json with open(USER_DATA_FILE, "w", encoding="utf-8") as f: json.dump(self.user_data, f, ensure_ascii=False, indent=2) async def start(self, update: Update, context: CallbackContext): user_id = update.effective_user.id if str(user_id) not in self.user_data: context.user_data['current_user'] = { "username": update.effective_chat.username, "first_name": update.effective_chat.first_name, "last_name": update.effective_chat.last_name, "chat_id": update.effective_chat.id, "message_id": 0, "answers": {}, "completed": False, "current_question_index": 0 } self.user_data[str(user_id)] = context.user_data['current_user'] await self.ask_question(user_id, update.effective_chat.id, context) else: context.user_data['current_user'] = self.user_data[str(user_id)] if self.user_data[str(user_id)]['completed'] == False: await self.ask_question(user_id, update.effective_chat.id, context) else: await context.bot.send_message( chat_id=user_id, text="Вы уже ответили на вопросы." ) async def track_chat_members(self, update: Update, context: CallbackContext): """Отслеживает новых участников чата/канала""" for member in update.message.new_chat_members: if member.is_bot: # Игнорируем других ботов continue user_id = member.id chat_id = update.effective_chat.id if str(user_id) not in self.user_data: context.user_data['current_user'] = { "username": member.username, "first_name": member.first_name, "last_name": member.last_name, "chat_id": chat_id, "message_id": 0, "answers": {}, "completed": False, "current_question_index": 0 } self.user_data[str(user_id)] = context.user_data['current_user'] msg = await context.bot.send_message( chat_id=chat_id, text="@{username}, Пройдите пожалуйста опрос нашего бота. Пройти опрос".format( username=member.username, bot_link=config['BOT_LINK'] ), parse_mode=ParseMode.HTML ) self.user_data[str(user_id)]['message_id'] = msg.message_id async def ask_question(self, user_id, chat_id, context): """Задает пользователю следующий вопрос""" question_index = self.user_data[str(user_id)]['current_question_index'] if question_index >= len(self.questions): # Все вопросы заданы self.user_data[str(user_id)]["completed"] = True self.save_user_data(context) await context.bot.send_message( chat_id=chat_id, text="Спасибо за ответы! Теперь вы полноправный участник нашего сообщества." ) if self.user_data[str(user_id)]['message_id'] > 0: await context.bot.delete_message( chat_id=self.user_data[str(user_id)]['chat_id'], message_id=self.user_data[str(user_id)]['message_id'] ) return question = self.questions[question_index] keyboard = [ [InlineKeyboardButton("Пропустить вопрос", callback_data=f"skip_{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await context.bot.send_message( chat_id=user_id, text=question, reply_markup=reply_markup ) async def handle_message(self, update: Update, context: CallbackContext): """Обрабатывает ответы пользователя на вопросы""" user_id = update.effective_user.id question_index = self.user_data[str(user_id)]['current_question_index'] if question_index >= len(self.questions): return answer = update.message.text question = self.questions[question_index] # Сохраняем ответ self.user_data[str(user_id)]["answers"][question] = answer self.user_data[str(user_id)]['current_question_index'] += 1 # Задаем следующий вопрос await self.ask_question(user_id, update.effective_chat.id, context) async def button(self, update: Update, context: CallbackContext): """Обрабатывает нажатия кнопок""" query = update.callback_query await query.answer() if query.data.startswith("skip_"): user_id = int(query.data.split("_")[1]) self.user_data[str(user_id)]['current_question_index'] += 1 await self.ask_question(user_id, query.message.chat_id, context) async def get_answers(self, update: Update, context: CallbackContext): """Команда для получения всех ответов (только для админов)""" admins = config['ADMIN_ID'].split(" ") if str(update.effective_user.id) not in admins: # Замените на ваш ID await update.message.reply_text("У вас нет прав для этой команды.") return if not self.user_data: await update.message.reply_text("Нет данных об ответах пользователей.") return import json answers_str = json.dumps(self.user_data, ensure_ascii=False, indent=2) await update.message.reply_text(f"Ответы пользователей:\n{answers_str}") async def reload_questions(self, update: Update, context: CallbackContext): """Перезагружает вопросы из файла (только для админов)""" admins = config['ADMIN_ID'].split(" ") if str(update.effective_user.id) not in admins: await update.message.reply_text("У вас нет прав для этой команды.") return self.questions = self.load_questions() await update.message.reply_text(f"Вопросы перезагружены. Всего вопросов: {len(self.questions)}") def run(self): """Запускает бота""" self.application.run_polling() if __name__ == '__main__': # Замените 'YOUR_BOT_TOKEN' на токен вашего бота bot = SurveyBot(token=config['BOT_TOKEN']) bot.run()