# 2024-08-18 21:11:42 -05:00
import json
import os
import importlib
import inspect
import logging
import asyncio
from telegram import error as TelegramErrors , Update , __version__ as telegram_version , InlineKeyboardButton , InlineKeyboardMarkup
from telegram . ext import Application , CommandHandler , MessageHandler , filters , ContextTypes , CallbackQueryHandler
from dotenv import load_dotenv
from tools . base_tool import BaseTool
from tools . metrics_tool import MetricsTool
# 2024-08-19 10:27:20 -05:00
from openai import OpenAI
# 2024-08-18 21:11:42 -05:00
# Load environment variables
load_dotenv()

# Chat-completions client. Key and endpoint come from the environment; the
# endpoint falls back to the public OpenAI URL when GLHF_CHAT_API_BASE is unset.
openai_client = OpenAI(
    api_key=os.environ.get("GLHF_CHAT_API_KEY"),
    base_url=os.environ.get("GLHF_CHAT_API_BASE", "https://api.openai.com/v1"),
)

# Set up logging to console and file.
# logging.FileHandler does not create missing parent directories, so make
# sure the log directory exists before the handler opens the file.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(level=logging.WARNING, handlers=[
    logging.StreamHandler(),
    logging.FileHandler('logs/output.log', mode='a'),
])
# Set up Telegram bot
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Load system prompt (sent as the first message of every chat request).
# The encoding is pinned so the prompt reads the same on every platform.
with open("prompts/developer_prompt.txt", "r", encoding="utf-8") as file:
    system_prompt = file.read().strip()

# Dictionary to store conversation history for each user:
# user id -> list of chat message dicts ({"role": ..., "content": ...}).
conversation_history = {}

# Dictionary to store processing status for each user:
# user id -> {"processing": bool, "message_id": id of the status message}.
processing_status = {}
# Load tools: start with the explicitly imported MetricsTool, then discover
# every other BaseTool subclass defined in the modules of the tools directory.
tools = [MetricsTool()]
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
_SKIPPED_TOOL_FILES = {'__init__.py', 'base_tool.py', 'metrics_tool.py'}

# sorted() makes the registration order independent of the file system.
for filename in sorted(os.listdir(tools_dir)):
    if not filename.endswith('.py') or filename in _SKIPPED_TOOL_FILES:
        continue
    module_name = f'tools.{filename[:-3]}'
    module = importlib.import_module(module_name)
    for name, obj in inspect.getmembers(module, inspect.isclass):
        # Only instantiate classes defined in this very module; a tool class
        # that is merely imported by another tool module would otherwise be
        # registered a second time.
        if (
            issubclass(obj, BaseTool)
            and obj is not BaseTool
            and obj.__module__ == module.__name__
        ):
            tools.append(obj())

# Collect all function definitions
functions = []
for tool in tools:
    functions.extend(tool.get_functions())
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greet the user in response to the /start command.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    logging.info("Bot started")
    # effective_message instead of message: command handlers also receive
    # edited messages by default, and update.message is None for those.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "Hello! I'm your AI assistant. How can I help you today?"
    )
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Forget the calling user's conversation and reset all tools.

    Works for plain message updates as well as callback-query updates, because
    the confirmation is sent via ``update.effective_message``.

    Args:
        update: Incoming Telegram update identifying the user.
        context: Handler context (unused).
    """
    user_id = update.effective_user.id
    conversation_history.pop(user_id, None)

    # NOTE(review): the tool instances are module-level, so this presumably
    # resets tool state for every user, not only the caller — confirm.
    for tool in tools:
        tool.clear()

    logging.info("Cleared conversation history for user %s", user_id)

    # update.message is None for callback queries and edited messages;
    # effective_message covers those cases too.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Conversation history cleared. Let's start fresh!")
# 2024-08-18 21:11:42 -05:00
async def update_status_message(context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str) -> None:
    """Rewrite the status message to show *status*, keeping the Abort button.

    The status line is purely informational, so a rejected edit is logged
    instead of being allowed to abort the request that is being processed.

    Args:
        context: Handler context providing the bot instance.
        chat_id: Chat that contains the status message.
        message_id: Id of the status message to edit.
        status: Text shown after "Current status: ".
    """
    reply_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("Abort", callback_data='abort')]]
    )
    try:
        await context.bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=f"Current status: {status}",
            reply_markup=reply_markup,
        )
    except TelegramErrors.BadRequest as e:
        # Raised e.g. when the text is unchanged or the message is gone.
        logging.warning("Could not update status message: %s", e)
_MAX_TOOL_ROUNDS = 50  # upper bound on model/tool round-trips per user message
_MAX_HISTORY_MESSAGES = 20  # number of most recent messages kept per user


def _trim_history(history, limit=_MAX_HISTORY_MESSAGES):
    """Return at most the last *limit* messages, starting on a user turn if one exists."""
    if len(history) <= limit:
        return history
    window = history[-limit:]
    # A window that starts with a tool result or an assistant tool request
    # has lost the turn it belongs to, so skip ahead to the first user turn.
    for offset, entry in enumerate(window):
        if entry.get("role") == "user":
            return window[offset:]
    return window


def _assistant_tool_request(reply):
    """Convert an assistant reply that requests tools into a plain message dict."""
    return {
        "role": "assistant",
        "content": reply.content,
        "tool_calls": [
            {
                "id": call.id,
                "type": "function",
                "function": {
                    "name": call.function.name,
                    "arguments": call.function.arguments,
                },
            }
            for call in reply.tool_calls
        ],
    }


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a text message, running any tools the model asks for.

    Flow: append the user turn to the per-user history, post a status message
    with an Abort button, then alternate between model calls and tool
    executions until the model answers without tool calls, the round limit is
    reached, or the user's processing flag has been switched off. The status
    message and the processing entry are always cleaned up, also on errors.

    NOTE(review): the processing flag can only change while this coroutine is
    suspended and only if the application dispatches updates concurrently —
    confirm that the Abort button can actually take effect mid-request.

    Args:
        update: Incoming Telegram update carrying the user's text.
        context: Handler context providing the bot instance.
    """
    # update.message is None for edited messages, which the text filter also
    # lets through; effective_message covers both cases.
    incoming = update.effective_message
    user = update.effective_user
    if incoming is None or user is None or not incoming.text:
        return

    user_id = user.id
    chat_id = update.effective_chat.id
    status_message = None
    outgoing = ""
    try:
        user_message = incoming.text
        logging.info("Message from user %s: %s", user_id, user_message)

        # `messages` is the stored history list itself, so every append below
        # is recorded in the conversation history right away.
        messages = conversation_history.setdefault(user_id, [])
        messages.append({"role": "user", "content": user_message})

        # Send initial status message
        status_message = await incoming.reply_text(
            "Processing your request...",
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Abort", callback_data='abort')]]
            ),
        )
        processing_status[user_id] = {"processing": True, "message_id": status_message.message_id}

        loop = asyncio.get_running_loop()
        assistant_message = ""
        previous_function_name = ""
        tool_rounds = 0
        while True:
            # The chat client is synchronous; run it in a worker thread so the
            # event loop is not blocked for the duration of the request.
            response = await loop.run_in_executor(None, get_chat_response, messages)
            if response is None or not response.choices:
                raise RuntimeError("The chat model returned no response")

            reply = response.choices[0].message
            assistant_message = reply.content or ""
            tool_calls = reply.tool_calls or []
            still_processing = processing_status.get(user_id, {}).get("processing", False)

            if not tool_calls or tool_rounds >= _MAX_TOOL_ROUNDS or not still_processing:
                # Final turn. Pending tool calls are deliberately not stored:
                # an unanswered tool request would invalidate the history.
                messages.append({"role": "assistant", "content": assistant_message})
                break

            # The assistant turn that requested the tools has to precede the
            # tool results in the message list.
            messages.append(_assistant_tool_request(reply))
            for tool_call in tool_calls:
                function_name = tool_call.function.name
                if previous_function_name != function_name:
                    await update_status_message(context, chat_id, status_message.message_id, f"Using tool: {function_name}")
                    previous_function_name = function_name
                tool_response = call_tool(
                    {"name": function_name, "arguments": tool_call.function.arguments}
                )
                messages.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": json.dumps(tool_response),
                })
            tool_rounds += 1

        # The entry is missing when the history was cleared in the meantime.
        history = conversation_history.get(user_id)
        if history is not None:
            conversation_history[user_id] = _trim_history(history)
        outgoing = assistant_message
    except Exception as e:
        logging.exception("An error occurred: %s", e)
        outgoing = "Sorry, an error occurred while processing your request."
    finally:
        processing_status.pop(user_id, None)
        # Remove the status message
        if status_message is not None:
            try:
                await context.bot.delete_message(chat_id=chat_id, message_id=status_message.message_id)
            except TelegramErrors.TelegramError as e:
                logging.warning("Could not remove status message: %s", e)

    if not outgoing:
        # Telegram rejects empty messages, so there is nothing to send.
        logging.warning("No reply text produced for user %s", user_id)
        return
    try:
        await incoming.reply_text(outgoing)
    except TelegramErrors.BadRequest as e:
        logging.error(f"An error occurred when trying to send a message in telegram: {str(e)}")
def call_tool(function_call):
    """Run the tool function described by *function_call*.

    Args:
        function_call: Mapping with the keys ``name`` (function name) and
            ``arguments`` (JSON-encoded object of keyword arguments; an empty
            value means "no arguments").

    Returns:
        Whatever the matching tool's ``execute`` returns. If the arguments
        cannot be decoded into an object, or no loaded tool offers the
        function, a ``{"error": ...}`` dict is returned so that the problem
        is reported back to the model instead of being lost.
    """
    function_name = function_call['name']
    raw_arguments = function_call['arguments']

    try:
        function_args = json.loads(raw_arguments) if raw_arguments else {}
    except (TypeError, ValueError) as e:
        logging.error("Invalid arguments for tool %s: %s", function_name, e)
        return {"error": f"Invalid arguments for tool '{function_name}': {e}"}
    if not isinstance(function_args, dict):
        logging.error("Arguments for tool %s are not a JSON object", function_name)
        return {"error": f"Arguments for tool '{function_name}' must be a JSON object"}

    for tool in tools:
        if any(f["name"] == function_name for f in tool.get_functions()):
            return tool.execute(function_name, **function_args)

    logging.warning("No tool provides function %s", function_name)
    return {"error": f"Unknown tool function '{function_name}'"}
def get_chat_response(messages, model="hf:meta-llama/Meta-Llama-3.1-405B-Instruct"):
    """Request a chat completion for *messages*, prefixed with the system prompt.

    Args:
        messages: List of chat message dicts (without the system message).
        model: Model identifier sent to the API; defaults to the model the
            bot has been using so far.

    Returns:
        The completion object returned by the client, or ``None`` when the
        request failed (the error is logged with its traceback).
    """
    request = {
        "model": model,
        "messages": [{"role": "system", "content": system_prompt}] + list(messages),
        "max_tokens": 4096,
        "temperature": 0.6,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    # An empty tools array is rejected by the API, so only send the
    # parameter when at least one function is available.
    if functions:
        request["tools"] = functions

    try:
        return openai_client.chat.completions.create(**request)
    except Exception as e:
        logging.exception("An error occurred: %s", e)
        return None
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Tell the user which model the bot is using.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # effective_message instead of message: update.message is None when the
    # command arrives as an edited message.
    message = update.effective_message
    if message is None:
        return
    # Keep this name in sync with the model requested in get_chat_response.
    await message.reply_text("Currently using hf:meta-llama/Meta-Llama-3.1-405B-Instruct")
async def abort_processing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a press on the Abort button of a status message.

    If the pressing user has a request in progress, its processing flag is
    switched off, the status message is rewritten and the user's state is
    reset via ``clear``; otherwise the message just says there is nothing to
    abort.

    Args:
        update: Callback-query update produced by the button press.
        context: Handler context, passed on to ``clear``.
    """
    query = update.callback_query
    # Acknowledge the press so the client stops showing its progress indicator.
    await query.answer()

    user_id = query.from_user.id
    if user_id not in processing_status:
        await query.edit_message_text(text="No active processing to abort.")
        return

    processing_status[user_id]["processing"] = False
    # Edit through the query in both branches; no reply_markup is passed, so
    # the Abort button disappears from the message.
    await query.edit_message_text(text="Processing aborted.")
    # update.message is None for callback queries, so clear() has to send its
    # confirmation through some other attribute of the update.
    await clear(update, context)
def main() -> None:
    """Build the Telegram application, register all handlers and start polling.

    Raises:
        RuntimeError: If TELEGRAM_BOT_TOKEN is not configured.
    """
    if not TELEGRAM_BOT_TOKEN:
        raise RuntimeError(
            "TELEGRAM_BOT_TOKEN is not set; define it in the environment or in the .env file"
        )

    # Create the Application and pass it your bot's token
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("clear", clear))
    application.add_handler(CommandHandler("status", status))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    # The pattern must equal the callback_data of the Abort button exactly.
    # NOTE(review): updates are presumably dispatched one at a time by default,
    # so an Abort press would only be handled once handle_message has returned
    # — confirm, and consider block=False for the message handler.
    application.add_handler(CallbackQueryHandler(abort_processing, pattern='^abort$'))

    # Start the Bot
    logging.info("Bot is running...")
    application.run_polling()
# Run the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()