# Source: cyclop/discord_helper.py (Python, 140 lines, 6.5 KiB)

import os
import logging
import asyncio
import time
from typing import TypedDict, Union, TypeAlias, List
import discord
from discord.ext import commands
from inference_bot import InferenceBot
class MessageHandlerLogicResult(TypedDict):
success: bool
response_text: Union[str, None]
error_message: Union[str, None]
LogicResult: TypeAlias = MessageHandlerLogicResult
class DiscordHelper(commands.Cog):
    """Discord cog that relays user input to an ``InferenceBot``.

    It exposes four slash commands (``/start``, ``/clear``, ``/status``,
    ``/switch``) and answers plain messages that either mention the bot or
    arrive in a direct-message channel. ``run()`` builds the Discord client,
    registers this cog on it and blocks while the client is running.
    """

    # Text substituted for the "<think>" / "</think>" markers found in the
    # model response (see ``_handle_message_logic``).
    # NOTE(review): the original START literal was malformed (a stray quote
    # terminated the string early); this is the minimal repair - confirm the
    # intended text.
    QUOTE_BLOCK_START = '>>> \n**Thinking...**'
    QUOTE_BLOCK_END = ''
    # Default pause (seconds) after each chunk of a multi-part reply.
    CHUNK_MESSAGE_SLEEP_DURATION = 0.1
    # Replies longer than this are split into several messages
    # (Discord's per-message character limit).
    MAX_MESSAGE_LENGTH = 2000

    def __init__(self, bot: InferenceBot,
                 bot_config: dict | None = None,
                 chunk_message_sleep_duration: float | None = None,
                 logger=None):
        """Store the bot logic and read runtime settings.

        Args:
            bot: The inference backend that all commands and messages are
                delegated to.
            bot_config: Accepted for interface compatibility; not read by
                this class.
            chunk_message_sleep_duration: Pause in seconds after each chunk
                of a multi-part reply. ``None`` selects
                ``CHUNK_MESSAGE_SLEEP_DURATION``.
            logger: Logger to use; defaults to this module's logger.
        """
        self.bot_logic = bot  # Avoid confusion with Discord's bot
        # The token is read from the environment once, at construction time.
        self.discord_bot_token = os.getenv('DISCORD_BOT_TOKEN')
        self.start_time = time.time()
        if chunk_message_sleep_duration is None:
            chunk_message_sleep_duration = self.CHUNK_MESSAGE_SLEEP_DURATION
        self.chunk_message_sleep_duration = chunk_message_sleep_duration
        self.logger = logger or logging.getLogger(__name__)
        self.bot = None  # This will be set in run()

    async def _start_logic(self) -> str:
        """Start the bot logic and return the greeting text."""
        await self.bot_logic.start()
        return "Hello! I'm your AI assistant for Discord. How can I help you today?"

    @discord.app_commands.command(name="start", description="Starts the bot and initializes the conversation.")
    async def start(self, interaction: discord.Interaction):
        """Slash command: start the bot logic and greet the user."""
        await interaction.response.defer()  # Defer the response as the logic might take time
        response_message = await self._start_logic()
        await interaction.followup.send(response_message)

    async def _clear_logic(self, user_id: int) -> str:
        """Clear the stored conversation for ``user_id`` and return a confirmation."""
        self.bot_logic.clear_conversation_history(user_id)
        return "Conversation history cleared. Let's start fresh!"

    @discord.app_commands.command(name="clear", description="Clears your conversation history with the bot.")
    async def clear(self, interaction: discord.Interaction):
        """Slash command: clear the invoking user's conversation history."""
        await interaction.response.defer()
        response_message = await self._clear_logic(interaction.user.id)
        await interaction.followup.send(response_message)

    async def _status_logic(self) -> str:
        """Return the status text reported by the bot logic."""
        return await self.bot_logic.get_bot_status()

    @discord.app_commands.command(name="status", description="Checks the current status of the bot.")
    async def status(self, interaction: discord.Interaction):
        """Slash command: report the bot status."""
        await interaction.response.defer()
        response_message = await self._status_logic()
        await interaction.followup.send(response_message)

    async def _switch_logic(self) -> str:
        """Switch the model when the bot logic offers ``switch_model``.

        Returns:
            The text returned by ``switch_model()``, or a fixed notice when
            the bot logic has no such method.
        """
        if not hasattr(self.bot_logic, 'switch_model'):
            return "Model switching is not supported for this bot."
        return await self.bot_logic.switch_model()

    @discord.app_commands.command(name="switch", description="Switches the underlying model (if supported).")
    async def switch(self, interaction: discord.Interaction):
        """Slash command: switch the underlying model if supported."""
        await interaction.response.defer()
        response_message = await self._switch_logic()
        await interaction.followup.send(response_message)

    async def _handle_message_logic(self, user_id: int, user_message: str) -> LogicResult:
        """Run one user message through the bot logic.

        The "<think>" and "</think>" markers in the response are replaced by
        ``QUOTE_BLOCK_START`` and ``QUOTE_BLOCK_END``.

        Args:
            user_id: Identifier passed through to the bot logic.
            user_message: Raw message text.

        Returns:
            A ``LogicResult``. Any exception is logged and reported as
            ``success=False`` with ``error_message`` set; it is never
            re-raised to the caller.
        """
        try:
            response = await self.bot_logic.handle_message(user_id, user_message)
            processed_response = (
                response
                .replace("<think>", self.QUOTE_BLOCK_START)
                .replace("</think>", self.QUOTE_BLOCK_END)
            )
            return LogicResult(success=True, response_text=processed_response, error_message=None)
        except Exception as e:
            self.logger.error(f"Error in _handle_message_logic for user {user_id}: {str(e)}")
            return LogicResult(success=False, response_text=None, error_message=str(e))

    async def _send_response(self, channel, response_text: str) -> None:
        """Send ``response_text`` to ``channel``, splitting it when too long.

        Text within ``MAX_MESSAGE_LENGTH`` is sent as one message. Longer
        text is cut into fixed-size chunks that are sent in order, pausing
        ``chunk_message_sleep_duration`` seconds after each chunk.
        """
        limit = self.MAX_MESSAGE_LENGTH
        if len(response_text) <= limit:
            await channel.send(response_text)
            return
        for offset in range(0, len(response_text), limit):
            await channel.send(response_text[offset:offset + limit])
            await asyncio.sleep(self.chunk_message_sleep_duration)

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Answer messages that mention the bot or arrive in a DM.

        Messages written by bots, messages not addressed to this bot, and
        messages that invoke a prefix command are ignored. Failures while
        replying are logged and a short apology is sent when possible.
        """
        if message.author.bot:
            return

        # Only process messages that mention the bot, or are in a DM
        addressed = (
            self.bot.user in message.mentions
            or isinstance(message.channel, discord.DMChannel)
        )
        if not addressed:
            return

        # discord.Message has no ``is_command()``; whether a message invokes
        # a prefix command is determined from its invocation context.
        ctx = await self.bot.get_context(message)
        if ctx.valid:
            return

        user_id = message.author.id
        user_message = message.content
        try:
            logic_result = await self._handle_message_logic(user_id, user_message)
            if not logic_result["success"]:
                await message.channel.send("Sorry, an error occurred while processing your request.")
                return

            response_text = logic_result["response_text"]
            if not response_text:
                self.logger.warning("Successful logic result but no response text.")
                await message.channel.send("Something went unexpectedly well, but I have nothing to say.")
                return

            await self._send_response(message.channel, response_text)
        except Exception as e:
            self.logger.error(f"Outer error in handle_message for user {user_id}: {str(e)}")
            try:
                await message.channel.send("Sorry, an unexpected error occurred with the bot.")
            except Exception as e_reply:
                self.logger.error(f"Failed to send error reply: {e_reply}")

    def run(self):
        """Create the Discord client, attach this cog and start it.

        Blocks in ``bot.run`` using the token read from ``DISCORD_BOT_TOKEN``
        at construction time.
        """
        intents = discord.Intents.default()
        intents.message_content = True  # Required for accessing message.content
        intents.messages = True
        intents.guilds = True  # Required for synchronizing slash commands globally
        bot = commands.Bot(command_prefix="!", intents=intents)
        self.bot = bot  # Save instance for on_message lookup and command tree

        @bot.event
        async def on_ready():
            self.logger.info(f"Logged in as {bot.user} (ID: {bot.user.id})")
            # on_ready may fire more than once (e.g. after a reconnect);
            # adding an already-registered cog raises, so register only once.
            if bot.get_cog(self.qualified_name) is None:
                await bot.add_cog(self)
            try:
                synced = await bot.tree.sync()
                self.logger.info(f"Synced {len(synced)} command(s).")
            except Exception as e:
                self.logger.error(f"Error syncing commands: {e}")

        self.logger.info("Discord bot is running...")
        bot.run(self.discord_bot_token)