357 lines
18 KiB
Python
357 lines
18 KiB
Python
import unittest
|
|
from unittest.mock import MagicMock, patch, mock_open, AsyncMock
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
# Assuming telegram_helper.py is in the parent directory or PYTHONPATH is set
|
|
from telegram_helper import TelegramHelper, MessageHandlerLogicResult
|
|
|
|
# Mock for the bot passed to TelegramHelper
|
|
class MockBot:
    """Stand-in for the bot object handed to ``TelegramHelper`` in these tests.

    Every bot method is a mock stored as an *instance* attribute, so a test
    can reconfigure it (``return_value`` / ``side_effect``) or delete it
    outright to simulate a bot that lacks the capability.
    """

    def __init__(self):
        # Awaitable methods that resolve to a fixed reply string.
        canned_replies = {
            "get_bot_status": "Bot Status OK",
            "switch_model": "Model Switched OK",
            "abort_processing": "Abort OK",
        }
        for method_name, reply in canned_replies.items():
            setattr(self, method_name, AsyncMock(return_value=reply))

        # Awaitable methods without a preset reply; tests that exercise
        # handle_message are expected to set a string return value themselves.
        for method_name in ("start", "handle_message"):
            setattr(self, method_name, AsyncMock())

        # Plain synchronous methods.
        for method_name in (
            "clear_conversation_history",
            "set_processing_status",
            "clear_processing_status",
        ):
            setattr(self, method_name, MagicMock())

        # Plain attribute (not a mock); starts out empty.
        self.processing_status = {}
|
|
|
|
# Mock for telegram.Update and related objects
|
|
def create_mock_update(message_text=None, user_id=123, chat_id=456, message_id=789, callback_query_data=None):
    """Build a ``MagicMock`` standing in for a ``telegram.Update``.

    Args:
        message_text: Text stored on ``update.message.text``. When given
            (including the empty string), ``update.message.reply_text`` is
            also replaced by an ``AsyncMock`` whose result carries
            ``message_id``.
        user_id: Stored on ``update.effective_user.id`` and, for callback
            updates, on ``update.callback_query.from_user.id``.
        chat_id: Stored on ``update.effective_chat.id``.
        message_id: ``message_id`` of the object returned by ``reply_text``.
        callback_query_data: Stored on ``update.callback_query.data``. When
            given (including the empty string), ``answer`` and
            ``edit_message_text`` on the callback query become ``AsyncMock``s.

    Returns:
        The configured ``MagicMock``. Attributes that are not configured here
        remain auto-generated ``MagicMock`` children.
    """
    update = MagicMock()
    update.effective_user.id = user_id
    update.effective_chat.id = chat_id

    # Compare against None rather than relying on truthiness, so that an
    # explicitly requested empty string is still applied to the mock.
    if message_text is not None:
        update.message.text = message_text
        # reply_text resolves to a Message-like object exposing message_id.
        update.message.reply_text = AsyncMock(return_value=MagicMock(message_id=message_id))

    if callback_query_data is not None:
        update.callback_query.data = callback_query_data
        update.callback_query.from_user.id = user_id
        update.callback_query.answer = AsyncMock()
        update.callback_query.edit_message_text = AsyncMock()

    return update
|
|
|
|
# Mock for telegram.ext.ContextTypes.DEFAULT_TYPE
|
|
def create_mock_context():
    """Build a ``MagicMock`` standing in for a handler ``context`` argument.

    ``context.bot.delete_message`` and ``context.bot.edit_message_text`` are
    ``AsyncMock``s so the code under test can await them; every other
    attribute is an auto-generated ``MagicMock``.
    """
    ctx = MagicMock()
    # edit_message_text is the one used for status-message updates.
    for awaitable_name in ("delete_message", "edit_message_text"):
        setattr(ctx.bot, awaitable_name, AsyncMock())
    return ctx
|
|
|
|
class TestTelegramHelper(unittest.IsolatedAsyncioTestCase):  # Use IsolatedAsyncioTestCase for async methods
    """Unit tests for ``TelegramHelper`` command handlers and their ``_*_logic`` methods.

    Each test builds a fresh ``TelegramHelper`` around a ``MockBot`` and
    drives it with mocked update/context objects. Handler tests patch the
    corresponding ``_*_logic`` method so that only the Telegram-facing glue
    is exercised; logic tests call the ``_*_logic`` methods directly.
    """

    def setUp(self):
        """Create the helper under test and remove stale reboot marker files."""
        self.mock_bot = MockBot()
        # Default paths for reboot files, can be overridden in tests
        self.reboot_claude_file = ".test_reboot_claude"
        self.reboot_file = ".test_doreboot"
        self.helper = TelegramHelper(
            self.mock_bot,
            reboot_claude_file_path=self.reboot_claude_file,
            reboot_file_path=self.reboot_file,
            chunk_message_sleep_duration=0.001,  # Faster sleep for tests
        )
        # Clean up any potential leftover reboot files from previous runs
        self._remove_reboot_files()

    def tearDown(self):
        """Remove reboot marker files created during the test."""
        self._remove_reboot_files()

    def _remove_reboot_files(self):
        """Delete both reboot marker files if they exist on disk."""
        for path in (self.reboot_claude_file, self.reboot_file):
            if os.path.exists(path):
                os.remove(path)

    def _assert_logged_containing(self, mock_log, fragment):
        """Assert that some call to *mock_log* logged a message containing *fragment*.

        Only the first positional argument of each recorded call is inspected
        (the message / format string); further arguments are ignored.
        """
        logged = [str(recorded.args[0]) for recorded in mock_log.call_args_list if recorded.args]
        self.assertTrue(
            any(fragment in message for message in logged),
            f"no logged message contains {fragment!r}; logged: {logged!r}",
        )

    async def test_start_logic(self):
        response = await self.helper._start_logic()
        self.mock_bot.start.assert_called_once()
        self.assertEqual(response, "Hello! I'm your AI assistant. How can I help you today?")

    async def test_start_command(self):
        mock_update = create_mock_update(message_text="/start")
        mock_context = create_mock_context()

        with patch.object(self.helper, "_start_logic", new_callable=AsyncMock) as mock_logic:
            mock_logic.return_value = "Start Logic Response"
            await self.helper.start(mock_update, mock_context)
            mock_logic.assert_called_once()
            mock_update.message.reply_text.assert_called_once_with("Start Logic Response")

    async def test_clear_logic(self):
        user_id = 123
        response = await self.helper._clear_logic(user_id)  # _clear_logic is async after refactor
        self.mock_bot.clear_conversation_history.assert_called_once_with(user_id)
        self.assertEqual(response, "Conversation history cleared. Let's start fresh!")

    async def test_clear_command(self):
        mock_update = create_mock_update(message_text="/clear", user_id=123)
        mock_context = create_mock_context()
        with patch.object(self.helper, "_clear_logic", new_callable=AsyncMock) as mock_logic:
            mock_logic.return_value = "Clear Logic Response"
            await self.helper.clear(mock_update, mock_context)
            mock_logic.assert_called_once_with(123)
            mock_update.message.reply_text.assert_called_once_with("Clear Logic Response")

    async def test_status_logic(self):
        self.mock_bot.get_bot_status.return_value = "Test Status"
        response = await self.helper._status_logic()
        self.mock_bot.get_bot_status.assert_called_once()
        self.assertEqual(response, "Test Status")

    async def test_switch_logic_supported(self):
        self.mock_bot.switch_model.return_value = "Switched to Large Model"
        response = await self.helper._switch_logic()
        self.mock_bot.switch_model.assert_called_once()
        self.assertEqual(response, "Switched to Large Model")

    async def test_switch_logic_not_supported(self):
        del self.mock_bot.switch_model  # Simulate bot not having the attribute
        response = await self.helper._switch_logic()
        self.assertEqual(response, "Model switching is not supported for this bot.")

    async def test_handle_message_logic_success(self):
        user_id = 100
        user_message = "Hello bot"
        bot_response = "Hello user <think>Thinking hard</think> Done."
        expected_processed_response = f"Hello user {self.helper.HTML_QUOTE_BLOCK_START}Thinking hard{self.helper.HTML_QUOTE_BLOCK_END} Done."
        self.mock_bot.handle_message.return_value = bot_response

        result = await self.helper._handle_message_logic(user_id, user_message)

        self.mock_bot.handle_message.assert_called_once_with(user_id, user_message)
        self.assertTrue(result["success"])
        self.assertEqual(result["response_text"], expected_processed_response)
        self.assertIsNone(result["error_message"])

    async def test_handle_message_logic_bot_exception(self):
        user_id = 101
        user_message = "Trigger error"
        self.mock_bot.handle_message.side_effect = Exception("Bot Error")

        result = await self.helper._handle_message_logic(user_id, user_message)

        self.assertFalse(result["success"])
        self.assertIsNone(result["response_text"])
        self.assertEqual(result["error_message"], "Bot Error")

    @patch("logging.error")
    async def test_handle_message_command_success_short_message(self, mock_logging_error):
        mock_update = create_mock_update(message_text="Hi", user_id=200, chat_id=201, message_id=202)
        mock_context = create_mock_context()

        logic_result = MessageHandlerLogicResult(success=True, response_text="Short response", error_message=None)

        with patch.object(self.helper, "_handle_message_logic", new_callable=AsyncMock) as mock_message_logic:
            mock_message_logic.return_value = logic_result

            await self.helper.handle_message(mock_update, mock_context)

            mock_update.message.reply_text.assert_any_call("Processing your request...", reply_markup=unittest.mock.ANY)
            self.mock_bot.set_processing_status.assert_called_once_with(200, 202)  # user_id, status_message_id
            mock_message_logic.assert_called_once_with(200, "Hi")
            mock_context.bot.delete_message.assert_called_once_with(chat_id=201, message_id=202)
            self.mock_bot.clear_processing_status.assert_called_once_with(200)
            mock_update.message.reply_text.assert_any_call("Short response")  # Final response
            self.assertEqual(mock_update.message.reply_text.call_count, 2)  # Processing + final

    @patch("logging.error")
    async def test_handle_message_command_success_long_message_chunks(self, mock_logging_error):
        mock_update = create_mock_update(message_text="Long text", user_id=200, chat_id=201, message_id=202)
        mock_context = create_mock_context()

        long_response_text = "a" * 5000  # Longer than 4096
        chunk1 = long_response_text[:4096]
        chunk2 = long_response_text[4096:]

        logic_result = MessageHandlerLogicResult(success=True, response_text=long_response_text, error_message=None)

        with patch.object(self.helper, "_handle_message_logic", new_callable=AsyncMock) as mock_message_logic, \
                patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep:  # Mock sleep
            mock_message_logic.return_value = logic_result

            await self.helper.handle_message(mock_update, mock_context)

            mock_update.message.reply_text.assert_any_call(chunk1)
            mock_update.message.reply_text.assert_any_call(chunk2)
            mock_sleep.assert_called_once_with(self.helper.chunk_message_sleep_duration)
            self.assertEqual(mock_update.message.reply_text.call_count, 3)  # Processing + 2 chunks

    @patch("logging.error")
    async def test_handle_message_command_logic_fails(self, mock_logging_error):
        mock_update = create_mock_update(message_text="Cause error in logic", user_id=200)
        mock_context = create_mock_context()
        logic_result = MessageHandlerLogicResult(success=False, response_text=None, error_message="Logic Failed")

        with patch.object(self.helper, "_handle_message_logic", new_callable=AsyncMock) as mock_message_logic:
            mock_message_logic.return_value = logic_result
            await self.helper.handle_message(mock_update, mock_context)
            mock_update.message.reply_text.assert_any_call("Sorry, an error occurred while processing your request.")
            self.assertEqual(mock_update.message.reply_text.call_count, 2)  # Processing + error message

    @patch("logging.error")
    async def test_handle_message_command_telegram_exception_after_logic(self, mock_logging_error):
        mock_update = create_mock_update(message_text="Test", user_id=200)
        mock_context = create_mock_context()
        logic_result = MessageHandlerLogicResult(success=True, response_text="OK", error_message=None)

        # Make sending the final reply fail
        mock_update.message.reply_text.side_effect = [
            MagicMock(message_id=202),  # For "Processing..."
            Exception("Telegram API Error"),  # For the actual response
        ]

        with patch.object(self.helper, "_handle_message_logic", new_callable=AsyncMock) as mock_message_logic:
            mock_message_logic.return_value = logic_result
            await self.helper.handle_message(mock_update, mock_context)

            # reply_text is already consumed by the side_effect list above, so a
            # follow-up error reply cannot be asserted directly. Instead verify
            # that the processing status was cleared and the failure was logged.
            self.mock_bot.clear_processing_status.assert_called_once_with(200)
            self._assert_logged_containing(mock_logging_error, "Outer error in handle_message")

    async def test_abort_processing_logic(self):
        user_id = 300
        self.mock_bot.abort_processing.return_value = "Aborted by bot"
        response = await self.helper._abort_processing_logic(user_id)
        self.mock_bot.abort_processing.assert_called_once_with(user_id)
        self.assertEqual(response, "Aborted by bot")

    async def test_abort_processing_command(self):
        mock_update = create_mock_update(callback_query_data="abort", user_id=301)
        mock_context = create_mock_context()
        with patch.object(self.helper, "_abort_processing_logic", new_callable=AsyncMock) as mock_logic:
            mock_logic.return_value = "Abort Logic Done"
            await self.helper.abort_processing(mock_update, mock_context)

            mock_update.callback_query.answer.assert_called_once()
            mock_logic.assert_called_once_with(301)
            mock_update.callback_query.edit_message_text.assert_called_once_with(text="Abort Logic Done")

    def test_reboot_logic_claude_and_main(self):
        user_message_parts = ["/reboot", "claude"]
        chat_id_to_write = "12345"

        with patch("builtins.open", mock_open()) as mock_file:
            self.helper._reboot_logic(user_message_parts, chat_id_to_write)

            # Both marker files must have been opened for writing.
            mock_file.assert_any_call(self.reboot_claude_file, "w")
            mock_file.assert_any_call(self.reboot_file, "w")

            # NOTE: mock_open hands out the same handle for every open() call,
            # so what was written to each file cannot be told apart here.
            # Verifying that the main reboot file received chat_id_to_write
            # would need a per-path open() mock; this test only checks that
            # both files were opened.

    def test_reboot_logic_main_only(self):
        user_message_parts = ["/reboot"]
        chat_id_to_write = "67890"
        with patch("builtins.open", mock_open()) as mock_file:
            self.helper._reboot_logic(user_message_parts, chat_id_to_write)
            # Ensure the claude file was NOT opened for writing.
            claude_call = unittest.mock.call(self.reboot_claude_file, "w")
            self.assertNotIn(claude_call, mock_file.call_args_list)

            mock_file.assert_any_call(self.reboot_file, "w")

    @patch("sys.exit")  # Mock sys.exit to prevent test runner from exiting
    async def test_reboot_command(self, mock_sys_exit):
        mock_update = create_mock_update(message_text="/reboot claude", chat_id="chat1")
        mock_context = create_mock_context()

        with patch.object(self.helper, "_reboot_logic") as mock_reboot_file_logic:
            await self.helper.reboot(mock_update, mock_context)

            mock_reboot_file_logic.assert_called_once_with(["/reboot", "claude"], "chat1")
            mock_update.message.reply_text.assert_called_once_with("Rebooting the bot...")
            mock_sys_exit.assert_called_once_with(0)

    # read_data is forwarded by patch() to mock_open; this is the supported way
    # to make the mocked file yield content. mock_open presets
    # read.return_value to None, so it cannot be configured by chaining
    # attributes off it.
    @patch("os.path.exists")
    @patch("builtins.open", new_callable=mock_open, read_data="chat123")
    @patch("os.remove")
    async def test_check_doreboot_file_logic_file_exists(self, mock_os_remove, mock_file_open, mock_os_path_exists):
        mock_os_path_exists.return_value = True

        chat_id = await self.helper._check_doreboot_file_logic()

        mock_os_path_exists.assert_called_once_with(self.reboot_file)
        mock_file_open.assert_called_once_with(self.reboot_file, "r")
        mock_os_remove.assert_called_once_with(self.reboot_file)
        self.assertEqual(chat_id, "chat123")

    @patch("os.path.exists", return_value=False)
    async def test_check_doreboot_file_logic_file_not_exists(self, mock_os_path_exists):
        chat_id = await self.helper._check_doreboot_file_logic()
        mock_os_path_exists.assert_called_once_with(self.reboot_file)
        self.assertIsNone(chat_id)

    @patch("logging.error")
    @patch("os.path.exists", return_value=True)
    @patch("builtins.open", side_effect=IOError("Read error"))
    @patch("os.remove")  # To check if remove is called even on read error
    async def test_check_doreboot_file_logic_read_error(self, mock_os_remove, mock_file_open, mock_os_path_exists, mock_log_error):
        chat_id = await self.helper._check_doreboot_file_logic()

        self.assertIsNone(chat_id)
        self._assert_logged_containing(mock_log_error, "Error reading reboot file")
        # Check if os.remove was attempted even after read error
        mock_os_remove.assert_called_once_with(self.reboot_file)

    async def test_check_doreboot_file_command_sends_message(self):
        mock_application = MagicMock()
        mock_application.bot.send_message = AsyncMock()

        with patch.object(self.helper, "_check_doreboot_file_logic", new_callable=AsyncMock) as mock_logic:
            mock_logic.return_value = "chat789"  # Simulate chat_id found
            await self.helper.check_doreboot_file(mock_application)

            mock_logic.assert_called_once()
            mock_application.bot.send_message.assert_called_once_with(
                chat_id="chat789", text="The application has finished initializing."
            )

    async def test_check_doreboot_file_command_no_chat_id(self):
        mock_application = MagicMock()
        mock_application.bot.send_message = AsyncMock()

        with patch.object(self.helper, "_check_doreboot_file_logic", new_callable=AsyncMock) as mock_logic:
            mock_logic.return_value = None  # Simulate no chat_id found
            await self.helper.check_doreboot_file(mock_application)

            mock_logic.assert_called_once()
            mock_application.bot.send_message.assert_not_called()
|
|
|
|
# Note: Testing the run() method itself is more of an integration test,
# as it involves setting up the full Application and polling loop.
# Unit tests here focus on the helper's own logic methods.
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|