Added ability to read long-form chats broken up into multiple messages

This commit is contained in:
2025-06-04 12:53:51 -05:00
parent fff17638eb
commit d9b77d4983
2 changed files with 179 additions and 37 deletions
+3 -2
View File
@@ -56,5 +56,6 @@ You are the **Lead Developer Persona**, a strategic and demanding mentor. Your *
* You have your "List of Absolutes," with literal instruction adherence being a top priority. * You have your "List of Absolutes," with literal instruction adherence being a top priority.
* You understand that the AI Copilot is your sole interface for codebase information and modification. * You understand that the AI Copilot is your sole interface for codebase information and modification.
* You are ready to instruct the AI Copilot using clear, direct, second-person language, emphasizing exactness for any specific identifiers provided.
You are ready to instruct the AI Copilot using clear, direct, second-person language, emphasizing exactness for any specific identifiers provided. * Your response to the user is directed at a human. Imagine this as talking to your boss, not as completing a task.
* All communication through the copilot is through the call_external_copilot tool available to you. This is the majority of what you will be doing.
+167 -26
View File
@@ -3,11 +3,22 @@ import logging
import sys import sys
import asyncio import asyncio
import time import time
from typing import TypedDict, Union, TypeAlias, List # Added List for type hint from typing import TypedDict, Union, TypeAlias, List, Dict, Optional # Added Dict, Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, constants
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback from browse_command import browse_command, button_callback # Assuming these are in your project
from inference_bot import InferenceBot from inference_bot import InferenceBot # Assuming this is your bot's core logic
# Setup basic logging if not already configured
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class MessageHandlerLogicResult(TypedDict): class MessageHandlerLogicResult(TypedDict):
success: bool success: bool
@@ -20,14 +31,26 @@ class TelegramHelper:
HTML_QUOTE_BLOCK_START = '<blockquote expandable><b>Thinking...</b>' HTML_QUOTE_BLOCK_START = '<blockquote expandable><b>Thinking...</b>'
HTML_QUOTE_BLOCK_END = '</blockquote>' HTML_QUOTE_BLOCK_END = '</blockquote>'
CHUNK_MESSAGE_SLEEP_DURATION = 0.1 CHUNK_MESSAGE_SLEEP_DURATION = 0.1
MESSAGE_DEBOUNCE_SECONDS: float = 1.0 # Configurable debounce period
def __init__(self, bot : InferenceBot, def __init__(self, bot : InferenceBot,
chunk_message_sleep_duration: float | None = None): chunk_message_sleep_duration: float | None = None):
self.bot = bot self.bot = bot
self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN') self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
if not self.telegram_bot_token:
logger.critical("TELEGRAM_BOT_TOKEN environment variable not set.")
sys.exit("TELEGRAM_BOT_TOKEN is required.")
self.start_time = time.time() self.start_time = time.time()
self.chunk_message_sleep_duration = chunk_message_sleep_duration if chunk_message_sleep_duration is not None else self.CHUNK_MESSAGE_SLEEP_DURATION self.chunk_message_sleep_duration = chunk_message_sleep_duration if chunk_message_sleep_duration is not None else self.CHUNK_MESSAGE_SLEEP_DURATION
# For message debouncing/batching
self.user_message_buffers: Dict[
int, # user_id
Dict[str, Union[List[str], Optional[asyncio.Task], Update, ContextTypes.DEFAULT_TYPE]]
] = {}
async def _start_logic(self) -> str: async def _start_logic(self) -> str:
await self.bot.start() await self.bot.start()
return "Hello! I'm your AI assistant. How can I help you today?" return "Hello! I'm your AI assistant. How can I help you today?"
@@ -38,6 +61,13 @@ class TelegramHelper:
async def _clear_logic(self, user_id: int) -> str: async def _clear_logic(self, user_id: int) -> str:
self.bot.clear_conversation_history(user_id) self.bot.clear_conversation_history(user_id)
# Also clear any pending debounced messages for this user
if user_id in self.user_message_buffers:
task = self.user_message_buffers[user_id].get("task")
if task and isinstance(task, asyncio.Task):
task.cancel()
del self.user_message_buffers[user_id]
logger.info(f"Cleared message buffer for user {user_id} due to /clear command.")
return "Conversation history cleared. Let's start fresh!" return "Conversation history cleared. Let's start fresh!"
async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@@ -62,73 +92,160 @@ class TelegramHelper:
response_message = await self._switch_logic() response_message = await self._switch_logic()
await update.message.reply_text(response_message) await update.message.reply_text(response_message)
async def update_status_message(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str): async def update_status_message(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status_text: str):
keyboard = [ keyboard = [
[InlineKeyboardButton("Abort", callback_data='abort')] [InlineKeyboardButton("Abort", callback_data='abort')]
] ]
reply_markup = InlineKeyboardMarkup(keyboard) reply_markup = InlineKeyboardMarkup(keyboard)
try:
await context.bot.edit_message_text( await context.bot.edit_message_text(
chat_id=chat_id, chat_id=chat_id,
message_id=message_id, message_id=message_id,
text=f"Current status: {status}", text=f"Current status: {status_text}",
reply_markup=reply_markup reply_markup=reply_markup
) )
except Exception as e:
logger.warning(f"Failed to update status message: {e}")
async def _handle_message_logic(self, user_id: int, user_message: str) -> LogicResult: async def _handle_message_logic(self, user_id: int, user_message: str) -> LogicResult:
try: try:
response = await self.bot.handle_message(user_id, user_message) response = await self.bot.handle_message(user_id, user_message)
# Ensure response is a string before calling replace
if not isinstance(response, str):
logger.error(f"Unexpected response type from bot.handle_message for user {user_id}: {type(response)}")
return LogicResult(success=False, response_text=None, error_message="Bot returned an unexpected data type.")
processed_response = response.replace("<think>", self.HTML_QUOTE_BLOCK_START).replace("</think>", self.HTML_QUOTE_BLOCK_END) processed_response = response.replace("<think>", self.HTML_QUOTE_BLOCK_START).replace("</think>", self.HTML_QUOTE_BLOCK_END)
return LogicResult(success=True, response_text=processed_response, error_message=None) return LogicResult(success=True, response_text=processed_response, error_message=None)
except Exception as e: except Exception as e:
logging.error(f"Error in _handle_message_logic for user {user_id}: {str(e)}") logger.error(f"Error in _handle_message_logic for user {user_id}: {str(e)}", exc_info=True)
return LogicResult(success=False, response_text=None, error_message=str(e)) return LogicResult(success=False, response_text=None, error_message=str(e))
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def handle_message_batching(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handles incoming messages, batches them if they are from the same user
within MESSAGE_DEBOUNCE_SECONDS, and then processes the combined message.
"""
if not update.message or not update.message.text: # Ignore empty messages
return
user_id = update.effective_user.id user_id = update.effective_user.id
user_message = update.message.text user_message = update.message.text
if user_id not in self.user_message_buffers:
self.user_message_buffers[user_id] = {
"messages": [],
"task": None,
"last_update_obj": update,
"last_context_obj": context
}
buffer_entry = self.user_message_buffers[user_id]
buffer_entry["messages"].append(user_message)
buffer_entry["last_update_obj"] = update # Always use the latest update for processing context
buffer_entry["last_context_obj"] = context # And its context
if buffer_entry["task"] is not None and isinstance(buffer_entry["task"], asyncio.Task):
buffer_entry["task"].cancel()
logger.debug(f"Cancelled previous task for user {user_id}")
logger.debug(f"Scheduling message processing for user {user_id}")
buffer_entry["task"] = asyncio.create_task(self._delayed_process_trigger(user_id))
async def _delayed_process_trigger(self, user_id: int):
try:
await asyncio.sleep(self.MESSAGE_DEBOUNCE_SECONDS)
logger.debug(f"Debounce timer elapsed for user {user_id}. Processing messages.")
buffer_entry = self.user_message_buffers.pop(user_id, None)
if buffer_entry:
combined_message = "\n".join(buffer_entry["messages"])
last_update = buffer_entry["last_update_obj"]
last_context = buffer_entry["last_context_obj"]
if not combined_message.strip(): # Avoid processing empty combined messages
logger.info(f"Skipping processing for user {user_id} as combined message is empty.")
return
await self._process_combined_messages(last_update, last_context, user_id, combined_message)
else:
logger.warning(f"Debounce task for user {user_id} triggered, but no buffer found. Might have been cleared or processed already.")
except asyncio.CancelledError:
logger.info(f"Message processing task for user {user_id} was cancelled (debounced by new message).")
except Exception as e:
logger.error(f"Error in _delayed_process_trigger for user {user_id}: {e}", exc_info=True)
if user_id in self.user_message_buffers: # Attempt to clean up if error occurred before pop
buffer_to_clean = self.user_message_buffers.pop(user_id, None)
if buffer_to_clean and "last_update_obj" in buffer_to_clean:
try:
update_obj = buffer_to_clean["last_update_obj"]
if isinstance(update_obj, Update) and update_obj.message:
await update_obj.message.reply_text(
"Sorry, an error occurred while scheduling your message processing."
)
except Exception as e_reply:
logger.error(f"Failed to send scheduling error reply to user {user_id}: {e_reply}", exc_info=True)
async def _process_combined_messages(self, update: Update, context: ContextTypes.DEFAULT_TYPE, user_id: int, combined_message: str) -> None:
chat_id = update.effective_chat.id chat_id = update.effective_chat.id
status_message_obj = None status_message_obj = None
try: try:
logger.info(f"Processing combined message for user {user_id}: '{combined_message[:100]}...'")
status_message_obj = await update.message.reply_text( status_message_obj = await update.message.reply_text(
"Processing your request...", "Processing your request...",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]) reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]])
) )
self.bot.set_processing_status(user_id, status_message_obj.message_id)
logic_result = await self._handle_message_logic(user_id, user_message) if hasattr(self.bot, 'set_processing_status') and callable(getattr(self.bot, 'set_processing_status')):
self.bot.set_processing_status(user_id, status_message_obj.message_id)
else:
logger.warning("InferenceBot does not have a callable 'set_processing_status' method.")
logic_result = await self._handle_message_logic(user_id, combined_message)
if status_message_obj: if status_message_obj:
try: try:
await context.bot.delete_message(chat_id=chat_id, message_id=status_message_obj.message_id) await context.bot.delete_message(chat_id=chat_id, message_id=status_message_obj.message_id)
except Exception as e_del: except Exception as e_del:
logging.warning(f"Failed to delete status message: {e_del}") logger.warning(f"Failed to delete status message for user {user_id}: {e_del}")
if hasattr(self.bot, 'clear_processing_status') and callable(getattr(self.bot, 'clear_processing_status')):
self.bot.clear_processing_status(user_id) self.bot.clear_processing_status(user_id)
else:
logger.warning("InferenceBot does not have a callable 'clear_processing_status' method.")
if logic_result["success"]: if logic_result["success"]:
response_text = logic_result["response_text"] response_text = logic_result["response_text"]
if response_text: if response_text:
if len(response_text) > 4096: if len(response_text) > constants.MessageLimit.TEXT_LENGTH:
chunks = [response_text[i:i + 4096] for i in range(0, len(response_text), 4096)] chunks = [response_text[i:i + constants.MessageLimit.TEXT_LENGTH] for i in range(0, len(response_text), constants.MessageLimit.TEXT_LENGTH)]
for chunk in chunks: for chunk_idx, chunk in enumerate(chunks):
await update.message.reply_text(chunk) await update.message.reply_text(chunk, parse_mode=constants.ParseMode.HTML)
if chunk_idx < len(chunks) - 1:
await asyncio.sleep(self.chunk_message_sleep_duration) await asyncio.sleep(self.chunk_message_sleep_duration)
else: else:
await update.message.reply_text(response_text) await update.message.reply_text(response_text, parse_mode=constants.ParseMode.HTML)
else: else:
logging.warning("Successful logic result but no response text.") logger.warning(f"Successful logic result but no response text for user {user_id}.")
await update.message.reply_text("Something went unexpectedly well, but I have nothing to say.") await update.message.reply_text("Something went unexpectedly well, but I have nothing to say.")
else: else:
await update.message.reply_text("Sorry, an error occurred while processing your request.") error_msg = logic_result.get("error_message", "Sorry, an error occurred while processing your request.")
await update.message.reply_text(error_msg)
except Exception as e: except Exception as e:
logging.error(f"Outer error in handle_message for user {user_id}: {str(e)}") logger.error(f"Error in _process_combined_messages for user {user_id}: {str(e)}", exc_info=True)
if status_message_obj and self.bot.processing_status.get(user_id): if hasattr(self.bot, 'clear_processing_status') and callable(getattr(self.bot, 'clear_processing_status')):
# Check if user_id is relevant for clear_processing_status or if it's a general clear
# Assuming it is user-specific based on set_processing_status
self.bot.clear_processing_status(user_id) self.bot.clear_processing_status(user_id)
try: try:
await update.message.reply_text("Sorry, an unexpected error occurred with the bot.") await update.message.reply_text("Sorry, an unexpected error occurred with the bot while handling your batched messages.")
except Exception as e_reply: except Exception as e_reply:
logging.error(f"Failed to send error reply: {e_reply}") logger.error(f"Failed to send error reply to user {user_id} in _process_combined_messages: {e_reply}", exc_info=True)
async def _abort_processing_logic(self, user_id: int) -> str: async def _abort_processing_logic(self, user_id: int) -> str:
return await self.bot.abort_processing(user_id) return await self.bot.abort_processing(user_id)
@@ -137,13 +254,29 @@ class TelegramHelper:
query = update.callback_query query = update.callback_query
await query.answer() await query.answer()
user_id = query.from_user.id user_id = query.from_user.id
logger.info(f"Abort requested by user {user_id}")
response_text = await self._abort_processing_logic(user_id) response_text = await self._abort_processing_logic(user_id)
try:
await query.edit_message_text(text=response_text) await query.edit_message_text(text=response_text)
except Exception as e: # Message might have been deleted or changed
logger.warning(f"Failed to edit message on abort for user {user_id}: {e}. Sending new message instead.")
if query.message: # Check if message attribute exists
await context.bot.send_message(chat_id=query.message.chat_id, text=response_text)
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Assuming browse_command is defined elsewhere and compatible
await browse_command(update, context, self.bot) await browse_command(update, context, self.bot)
async def handle_button_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Assuming button_callback is defined elsewhere and compatible
await button_callback(update, context, self.bot)
def run(self): def run(self):
if not self.telegram_bot_token:
logger.error("Cannot run bot: TELEGRAM_BOT_TOKEN is not set.")
return
application = Application.builder().token(self.telegram_bot_token).build() application = Application.builder().token(self.telegram_bot_token).build()
application.add_handler(CommandHandler("start", self.start)) application.add_handler(CommandHandler("start", self.start))
@@ -151,9 +284,17 @@ class TelegramHelper:
application.add_handler(CommandHandler("switch", self.switch)) application.add_handler(CommandHandler("switch", self.switch))
application.add_handler(CommandHandler("status", self.status)) application.add_handler(CommandHandler("status", self.status))
application.add_handler(CommandHandler("browse", self.browse)) application.add_handler(CommandHandler("browse", self.browse))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
application.add_handler(CallbackQueryHandler(self.abort_processing, pattern='^abort$'))
application.add_handler(CallbackQueryHandler(button_callback, pattern='^(browse|file):'))
logging.info("Bot is running...") # Use the new handle_message_batching for text messages
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message_batching))
application.add_handler(CallbackQueryHandler(self.abort_processing, pattern='^abort$'))
# Ensure button_callback is correctly routed if it's part of this class or imported
application.add_handler(CallbackQueryHandler(self.handle_button_callback, pattern='^(browse|file):'))
logger.info("Bot is running...")
try:
application.run_polling() application.run_polling()
except Exception as e:
logger.critical(f"Bot failed to start or crashed: {e}", exc_info=True)