2024-08-19 10:24:17 -05:00
import json
import os
import importlib
import inspect
import logging
import asyncio
from telegram import error as TelegramErrors , Update , __version__ as telegram_version , InlineKeyboardButton , InlineKeyboardMarkup
from telegram . ext import Application , CommandHandler , MessageHandler , filters , ContextTypes , CallbackQueryHandler
from dotenv import load_dotenv
from tools . base_tool import BaseTool
from tools . metrics_tool import MetricsTool
2024-08-19 10:36:03 -05:00
from openai import OpenAI
2024-08-19 10:24:17 -05:00
# Pull settings from a local .env file into the process environment
load_dotenv()

# Shared OpenAI client; the API key is read from the environment
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
2024-08-19 10:24:17 -05:00
# Set up logging to console and file.
# FileHandler does not create missing parent directories, so make sure the
# log directory exists before the handler opens the file.
_LOG_FILE = 'logs/output.log'
os.makedirs(os.path.dirname(_LOG_FILE), exist_ok=True)
logging.basicConfig(level=logging.WARNING, handlers=[
    logging.StreamHandler(),
    logging.FileHandler(_LOG_FILE, mode='a'),
])
# Set up Telegram bot (None when the variable is not set)
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Load system prompt. The encoding is pinned so the prompt decodes the same
# way regardless of the platform's default locale encoding.
with open("prompts/developer_prompt.txt", "r", encoding="utf-8") as file:
    system_prompt = file.read().strip()

# Dictionary to store conversation history for each user (user id -> list of chat messages)
conversation_history = {}

# Dictionary to store processing status for each user
# (user id -> {"processing": bool, "message_id": int})
processing_status = {}
# Load tools: MetricsTool is always present; every other tool is discovered
# by importing the modules found in the local ``tools`` package directory.
tools = [MetricsTool()]  # Add MetricsTool instance
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
_non_tool_files = {'__init__.py', 'base_tool.py', 'metrics_tool.py'}

# Sorted so the tool (and therefore function) order is the same on every run.
for filename in sorted(os.listdir(tools_dir)):
    if not filename.endswith('.py') or filename in _non_tool_files:
        continue
    module_name = f'tools.{filename[:-3]}'
    module = importlib.import_module(module_name)
    for name, obj in inspect.getmembers(module, inspect.isclass):
        # Only instantiate classes defined in this module; classes it merely
        # imports (e.g. another tool) would otherwise be registered twice.
        if obj.__module__ != module.__name__:
            continue
        if not issubclass(obj, BaseTool) or obj is BaseTool:
            continue
        # Abstract intermediate classes cannot be instantiated.
        if inspect.isabstract(obj):
            continue
        tools.append(obj())

# Collect all function definitions exposed by the loaded tools
functions = []
for tool in tools:
    functions.extend(tool.get_functions())
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greet the user in reply to the /start command."""
    logging.info("Bot started")
    greeting = "Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them."
    await update.message.reply_text(greeting)
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Forget the calling user's conversation and reset every loaded tool.

    Used both as the /clear command handler and by ``abort_processing``,
    which passes a callback-query update. For such updates ``update.message``
    is None, so the confirmation is sent through ``update.effective_message``.

    Note that ``tool.clear()`` is called on every loaded tool without a user
    argument, so tool state is reset globally rather than per user.
    """
    user_id = update.effective_user.id
    conversation_history.pop(user_id, None)
    for tool in tools:
        tool.clear()
    logging.info(f"Cleared conversation history and image for user {user_id}")

    reply_target = update.effective_message
    if reply_target is not None:
        await reply_target.reply_text("Conversation history and image cleared. Let's start fresh!")
async def update_status_message(context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str):
    """Rewrite an existing status message to show ``status``, keeping its Abort button.

    Args:
        context: Handler context whose bot performs the edit.
        chat_id: Chat that contains the status message.
        message_id: Identifier of the status message to edit.
        status: Text shown after the "Current status: " prefix.
    """
    abort_button = InlineKeyboardButton("Abort", callback_data='abort')
    await context.bot.edit_message_text(
        chat_id=chat_id,
        message_id=message_id,
        text=f"Current status: {status}",
        reply_markup=InlineKeyboardMarkup([[abort_button]]),
    )
def _as_function_call_dict(call):
    """Normalize a model function call to a plain ``{"name", "arguments"}`` dict."""
    if isinstance(call, dict):
        return {"name": call["name"], "arguments": call["arguments"]}
    # The OpenAI SDK hands back an attribute-style object rather than a mapping.
    return {"name": call.name, "arguments": call.arguments}


def _append_assistant_turn(messages, response):
    """Record the model's reply in ``messages`` and return ``(text, function_calls)``."""
    if response is None:
        # get_chat_response returns None when the model request failed.
        raise RuntimeError("No response received from the chat model")
    function_calls = [_as_function_call_dict(call) for call in response.get('function_calls') or []]
    text = response.get('content') or ''
    entry = {"role": "assistant", "content": text}
    if function_calls:
        # Keep the function call on the assistant turn so the following
        # function-role message has something to answer. The legacy
        # function-calling format carries a single call per assistant message.
        entry["content"] = text or None
        entry["function_call"] = function_calls[0]
    messages.append(entry)
    return text, function_calls


async def _dismiss_status(context, chat_id, user_id, status_message):
    """Drop the user's processing flag and delete the status message, if there is one."""
    processing_status.pop(user_id, None)
    if status_message is None:
        return
    try:
        await context.bot.delete_message(chat_id=chat_id, message_id=status_message.message_id)
    except TelegramErrors.TelegramError as e:
        # A stale status message must not prevent the answer from being delivered.
        logging.warning(f"Could not delete status message: {str(e)}")


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a text message, running model-requested tools until a final reply exists.

    Flow: append the user's text to their history, post a status message with
    an Abort button, query the model, execute requested tool calls (at most
    50 rounds, or until the user's processing flag is cleared), then remove
    the status message and send the final assistant text. History is trimmed
    to the most recent 20 entries.

    Any failure is logged and reported to the user with a generic apology;
    the status message and processing flag are cleaned up in every case.
    """
    message = update.effective_message
    user = update.effective_user
    # Edited messages and channel posts have no ``update.message``; work from
    # the effective message and ignore updates that carry no text or sender.
    if message is None or user is None or not message.text:
        return

    user_id = user.id
    chat_id = update.effective_chat.id
    status_message = None
    try:
        user_message = message.text
        logging.info(f"Message from user {user_id}: {user_message}")

        # ``messages`` IS the stored history list, so appends below persist.
        messages = conversation_history.setdefault(user_id, [])
        messages.append({"role": "user", "content": user_message})

        # Send initial status message
        status_message = await message.reply_text(
            "Processing your request...",
            reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]),
        )
        processing_status[user_id] = {"processing": True, "message_id": status_message.message_id}

        # NOTE(review): get_chat_response and call_tool are synchronous, so the
        # event loop is blocked while they run; presumably an Abort press can
        # only be observed between rounds - confirm against the Application's
        # update-concurrency settings.
        assistant_message, tool_calls = _append_assistant_turn(messages, get_chat_response(messages))

        tool_rounds = 0
        previous_function_name = ""
        while tool_calls and tool_rounds < 50 and processing_status.get(user_id, {}).get("processing"):
            for tool_call in tool_calls:
                function_name = tool_call['name']
                # Only touch the status message when the tool changes, so the
                # same text is never re-sent as an edit.
                if previous_function_name != function_name:
                    await update_status_message(context, chat_id, status_message.message_id, f"Using tool: {function_name}")
                    previous_function_name = function_name
                tool_response = call_tool(tool_call)
                # Function-role messages must name the function they answer.
                messages.append({"role": "function", "name": function_name, "content": json.dumps(tool_response)})

            assistant_message, tool_calls = _append_assistant_turn(messages, get_chat_response(messages))
            tool_rounds += 1

        # Trim only if the stored history is still this list; an abort may
        # have cleared it in the meantime.
        if conversation_history.get(user_id) is messages and len(messages) > 20:
            conversation_history[user_id] = messages[-20:]

        # Remove the status message before delivering the answer
        await _dismiss_status(context, chat_id, user_id, status_message)
        status_message = None

        if not assistant_message:
            logging.warning(f"No assistant text to send to user {user_id}")
            return
        try:
            await message.reply_text(assistant_message)
        except TelegramErrors.BadRequest as e:
            logging.error(f"An error occurred when trying to send a message in telegram: {str(e)}")
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        await message.reply_text("Sorry, an error occurred while processing your request.")
    finally:
        # Covers the failure paths; a no-op when the status was already dismissed.
        await _dismiss_status(context, chat_id, user_id, status_message)
def call_tool(function_call):
    """Execute the tool function requested by the model and return its result.

    Args:
        function_call: Mapping with ``'name'`` (function name) and
            ``'arguments'`` (JSON-encoded object of keyword arguments).

    Returns:
        Whatever the owning tool's ``execute`` returns. When the arguments
        are not a valid JSON object, or no loaded tool exposes the function,
        a ``{"error": ...}`` dict is returned instead so the problem can be
        reported back to the model rather than aborting the whole request.
    """
    function_name = function_call['name']
    # Treat missing/empty arguments as "no arguments".
    raw_arguments = function_call['arguments'] or '{}'
    try:
        function_args = json.loads(raw_arguments)
    except json.JSONDecodeError as exc:
        return {"error": f"Invalid JSON arguments for function '{function_name}': {exc}"}
    if not isinstance(function_args, dict):
        # ``**`` expansion below needs a mapping of keyword arguments.
        return {"error": f"Arguments for function '{function_name}' must be a JSON object"}

    for tool in tools:
        if any(f["name"] == function_name for f in tool.get_functions()):
            return tool.execute(function_name, **function_args)
    return {"error": f"Unknown function '{function_name}'"}
def get_chat_response(messages):
    """Return the model's reply for ``messages`` by delegating to the OpenAI backend."""
    reply = get_openai_response(messages)
    return reply
def _to_openai_function(function):
    """Convert one tool function definition into the OpenAI ``functions`` schema."""
    parameters = function.get('parameters')
    if parameters in [None, {}]:
        # Placeholder schema for functions that declare no parameters.
        parameters = {
            "type": "object",
            "properties": {"param1": {"type": "string", "description": "Unnecessary"}},
            "required": [],
        }
    return {
        "name": function['name'],
        "description": function['description'],
        "parameters": parameters,
    }


def get_openai_response(messages):
    """Send the conversation to the OpenAI chat model and return its reply.

    Args:
        messages: Chat messages to send; the system prompt is prepended.

    Returns:
        ``{'content': ...}`` for a plain reply, with an added
        ``'function_calls'`` list holding one ``{'name', 'arguments'}`` dict
        when the model requests a function call. Returns None when the
        request fails for any reason (the error is logged).
    """
    try:
        openai_functions = [_to_openai_function(function) for function in functions]
        request = {
            "model": "gpt-4",  # Changed from "gpt-4o" to "gpt-4"
            "messages": [{"role": "system", "content": system_prompt}] + messages,
        }
        # Only advertise functions when there are some to advertise.
        if openai_functions:
            request["functions"] = openai_functions
            request["function_call"] = "auto"

        response = client.chat.completions.create(**request)
        message = response.choices[0].message
        content = message.content
        function_call = message.function_call
        if function_call:
            # Callers index the call like a dict, so hand back a plain dict
            # rather than the SDK's attribute-style object.
            return {
                'content': content,
                'function_calls': [
                    {'name': function_call.name, 'arguments': function_call.arguments}
                ],
            }
        return {'content': content}
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        return None
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the name of the model the bot reports using."""
    model_notice = "Currently using gpt-4"
    await update.message.reply_text(model_notice)
2024-08-19 10:24:17 -05:00
async def abort_processing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a press of the Abort button.

    Acknowledges the callback query. If the pressing user has an entry in
    ``processing_status``, its flag is set to False, the button's message is
    rewritten to "Processing aborted." and ``clear`` is invoked; otherwise
    the message is rewritten to say nothing is being processed.
    """
    query = update.callback_query
    await query.answer()

    requester_id = query.from_user.id
    if requester_id not in processing_status:
        await query.edit_message_text(text="No active processing to abort.")
        return

    # Clearing the flag is what ends handle_message's tool loop.
    processing_status[requester_id]["processing"] = False
    pressed_message = query.message
    await context.bot.edit_message_text(
        chat_id=pressed_message.chat_id,
        message_id=pressed_message.message_id,
        text="Processing aborted.",
    )
    await clear(update, context)
def main() -> None:
    """Build the Telegram application, register all handlers and start polling."""
    # Create the Application with the bot's token
    app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Slash commands, registered in this order
    command_callbacks = (
        ("start", start),
        ("clear", clear),
        ("status", status),
    )
    for command, callback in command_callbacks:
        app.add_handler(CommandHandler(command, callback))

    # Plain text that is not a command goes to the chat handler
    plain_text = filters.TEXT & ~filters.COMMAND
    app.add_handler(MessageHandler(plain_text, handle_message))

    # Presses of the inline "Abort" button
    app.add_handler(CallbackQueryHandler(abort_processing, pattern='^abort$'))

    # Start the Bot
    logging.info("Bot is running...")
    app.run_polling()
# Start the bot only when this file is run as a script, not when imported
if __name__ == "__main__":
    main()