From 59be9dbb5d0bbf2cb588efa49417715109f6eaf4 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 16:49:11 -0500 Subject: [PATCH] Add unit tests for TelegramHelper --- tests/test_telegram_helper.py | 356 ++++++++++++++++++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 tests/test_telegram_helper.py diff --git a/tests/test_telegram_helper.py b/tests/test_telegram_helper.py new file mode 100644 index 0000000..6b8d655 --- /dev/null +++ b/tests/test_telegram_helper.py @@ -0,0 +1,356 @@ +import unittest +from unittest.mock import MagicMock, patch, mock_open, AsyncMock +import asyncio +import os +import sys + +# Assuming telegram_helper.py is in the parent directory or PYTHONPATH is set +from telegram_helper import TelegramHelper, MessageHandlerLogicResult + +# Mock for the bot passed to TelegramHelper +class MockBot: + def __init__(self): + self.start = AsyncMock() + self.clear_conversation_history = MagicMock() + self.get_bot_status = AsyncMock(return_value="Bot Status OK") + self.switch_model = AsyncMock(return_value="Model Switched OK") + self.handle_message = AsyncMock() # Needs to return a string + self.abort_processing = AsyncMock(return_value="Abort OK") + self.set_processing_status = MagicMock() + self.clear_processing_status = MagicMock() + self.processing_status = {} # Add the attribute + +# Mock for telegram.Update and related objects +def create_mock_update(message_text=None, user_id=123, chat_id=456, message_id=789, callback_query_data=None): + update = MagicMock() + update.effective_user.id = user_id + update.effective_chat.id = chat_id + + if message_text: + update.message.text = message_text + update.message.reply_text = AsyncMock(return_value=MagicMock(message_id=message_id)) # reply_text returns a Message obj + + if callback_query_data: + update.callback_query.data = callback_query_data + update.callback_query.from_user.id = user_id + update.callback_query.answer = AsyncMock() + update.callback_query.edit_message_text = AsyncMock() + + return update + +# Mock for telegram.ext.ContextTypes.DEFAULT_TYPE +def create_mock_context(): + context = MagicMock() + context.bot.delete_message = AsyncMock() + context.bot.edit_message_text = AsyncMock() # For update_status_message + return context + +class TestTelegramHelper(unittest.IsolatedAsyncioTestCase): # Use IsolatedAsyncioTestCase for async methods + + def setUp(self): + self.mock_bot = MockBot() + # Default paths for reboot files, can be overridden in tests + self.reboot_claude_file = ".test_reboot_claude" + self.reboot_file = ".test_doreboot" + self.helper = TelegramHelper( + self.mock_bot, + reboot_claude_file_path=self.reboot_claude_file, + reboot_file_path=self.reboot_file, + chunk_message_sleep_duration=0.001 # Faster sleep for tests + ) + # Clean up any potential leftover reboot files from previous runs + if os.path.exists(self.reboot_claude_file): + os.remove(self.reboot_claude_file) + if os.path.exists(self.reboot_file): + os.remove(self.reboot_file) + + def tearDown(self): + # Clean up reboot files created during tests + if os.path.exists(self.reboot_claude_file): + os.remove(self.reboot_claude_file) + if os.path.exists(self.reboot_file): + os.remove(self.reboot_file) + + async def test_start_logic(self): + response = await self.helper._start_logic() + self.mock_bot.start.assert_called_once() + self.assertEqual(response, "Hello! I\'m your AI assistant. How can I help you today?") + + async def test_start_command(self): + mock_update = create_mock_update(message_text="/start") + mock_context = create_mock_context() + + with patch.object(self.helper, \'_start_logic\', new_callable=AsyncMock) as mock_logic: + mock_logic.return_value = "Start Logic Response" + await self.helper.start(mock_update, mock_context) + mock_logic.assert_called_once() + mock_update.message.reply_text.assert_called_once_with("Start Logic Response") + + async def test_clear_logic(self): + user_id = 123 + response = await self.helper._clear_logic(user_id) # _clear_logic is async after refactor + self.mock_bot.clear_conversation_history.assert_called_once_with(user_id) + self.assertEqual(response, "Conversation history cleared. Let\'s start fresh!") + + async def test_clear_command(self): + mock_update = create_mock_update(message_text="/clear", user_id=123) + mock_context = create_mock_context() + with patch.object(self.helper, \'_clear_logic\', new_callable=AsyncMock) as mock_logic: + mock_logic.return_value = "Clear Logic Response" + await self.helper.clear(mock_update, mock_context) + mock_logic.assert_called_once_with(123) + mock_update.message.reply_text.assert_called_once_with("Clear Logic Response") + + async def test_status_logic(self): + self.mock_bot.get_bot_status.return_value = "Test Status" + response = await self.helper._status_logic() + self.mock_bot.get_bot_status.assert_called_once() + self.assertEqual(response, "Test Status") + + async def test_switch_logic_supported(self): + self.mock_bot.switch_model.return_value = "Switched to Large Model" + response = await self.helper._switch_logic() + self.mock_bot.switch_model.assert_called_once() + self.assertEqual(response, "Switched to Large Model") + + async def test_switch_logic_not_supported(self): + del self.mock_bot.switch_model # Simulate bot not having the attribute + response = await self.helper._switch_logic() + self.assertEqual(response, "Model switching is not supported for this bot.") + + async def test_handle_message_logic_success(self): + user_id = 100 + user_message = "Hello bot" + bot_response = "Hello user Thinking hard Done." + expected_processed_response = f"Hello user {self.helper.HTML_QUOTE_BLOCK_START}Thinking hard{self.helper.HTML_QUOTE_BLOCK_END} Done." + self.mock_bot.handle_message.return_value = bot_response + + result = await self.helper._handle_message_logic(user_id, user_message) + + self.mock_bot.handle_message.assert_called_once_with(user_id, user_message) + self.assertTrue(result["success"]) + self.assertEqual(result["response_text"], expected_processed_response) + self.assertIsNone(result["error_message"]) + + async def test_handle_message_logic_bot_exception(self): + user_id = 101 + user_message = "Trigger error" + self.mock_bot.handle_message.side_effect = Exception("Bot Error") + + result = await self.helper._handle_message_logic(user_id, user_message) + + self.assertFalse(result["success"]) + self.assertIsNone(result["response_text"]) + self.assertEqual(result["error_message"], "Bot Error") + + @patch(\'logging.error\') + async def test_handle_message_command_success_short_message(self, mock_logging_error): + mock_update = create_mock_update(message_text="Hi", user_id=200, chat_id=201, message_id=202) + mock_context = create_mock_context() + + logic_result = MessageHandlerLogicResult(success=True, response_text="Short response", error_message=None) + + with patch.object(self.helper, \'_handle_message_logic\', new_callable=AsyncMock) as mock_message_logic: + mock_message_logic.return_value = logic_result + + await self.helper.handle_message(mock_update, mock_context) + + mock_update.message.reply_text.assert_any_call("Processing your request...", reply_markup=unittest.mock.ANY) + self.mock_bot.set_processing_status.assert_called_once_with(200, 202) # user_id, status_message_id + mock_message_logic.assert_called_once_with(200, "Hi") + mock_context.bot.delete_message.assert_called_once_with(chat_id=201, message_id=202) + self.mock_bot.clear_processing_status.assert_called_once_with(200) + mock_update.message.reply_text.assert_any_call("Short response") # Final response + self.assertEqual(mock_update.message.reply_text.call_count, 2) # Processing + final + + @patch(\'logging.error\') + async def test_handle_message_command_success_long_message_chunks(self, mock_logging_error): + mock_update = create_mock_update(message_text="Long text", user_id=200, chat_id=201, message_id=202) + mock_context = create_mock_context() + + long_response_text = "a" * 5000 # Longer than 4096 + chunk1 = long_response_text[:4096] + chunk2 = long_response_text[4096:] + + logic_result = MessageHandlerLogicResult(success=True, response_text=long_response_text, error_message=None) + + with patch.object(self.helper, \'_handle_message_logic\', new_callable=AsyncMock) as mock_message_logic, \ + patch(\'asyncio.sleep\', new_callable=AsyncMock) as mock_sleep: # Mock sleep + mock_message_logic.return_value = logic_result + + await self.helper.handle_message(mock_update, mock_context) + + mock_update.message.reply_text.assert_any_call(chunk1) + mock_update.message.reply_text.assert_any_call(chunk2) + mock_sleep.assert_called_once_with(self.helper.chunk_message_sleep_duration) + self.assertEqual(mock_update.message.reply_text.call_count, 3) # Processing + 2 chunks + + @patch(\'logging.error\') + async def test_handle_message_command_logic_fails(self, mock_logging_error): + mock_update = create_mock_update(message_text="Cause error in logic", user_id=200) + mock_context = create_mock_context() + logic_result = MessageHandlerLogicResult(success=False, response_text=None, error_message="Logic Failed") + + with patch.object(self.helper, \'_handle_message_logic\', new_callable=AsyncMock) as mock_message_logic: + mock_message_logic.return_value = logic_result + await self.helper.handle_message(mock_update, mock_context) + mock_update.message.reply_text.assert_any_call("Sorry, an error occurred while processing your request.") + self.assertEqual(mock_update.message.reply_text.call_count, 2) # Processing + error message + + @patch(\'logging.error\') + async def test_handle_message_command_telegram_exception_after_logic(self, mock_logging_error): + mock_update = create_mock_update(message_text="Test", user_id=200) + mock_context = create_mock_context() + logic_result = MessageHandlerLogicResult(success=True, response_text="OK", error_message=None) + + # Make sending the final reply fail + mock_update.message.reply_text.side_effect = [ + MagicMock(message_id=202), # For "Processing..." + Exception("Telegram API Error") # For the actual response + ] + + with patch.object(self.helper, \'_handle_message_logic\', new_callable=AsyncMock) as mock_message_logic: + mock_message_logic.return_value = logic_result + await self.helper.handle_message(mock_update, mock_context) + + # Check if the generic error message was attempted + # This is tricky because reply_text is already mocked with side_effect. + # We\'d expect logs. Let\'s check logs or if processing status was cleared. + self.mock_bot.clear_processing_status.assert_called_once_with(200) + mock_logging_error.assert_any_call(unittest.mock.string_containing("Outer error in handle_message")) + + + async def test_abort_processing_logic(self): + user_id = 300 + self.mock_bot.abort_processing.return_value = "Aborted by bot" + response = await self.helper._abort_processing_logic(user_id) + self.mock_bot.abort_processing.assert_called_once_with(user_id) + self.assertEqual(response, "Aborted by bot") + + async def test_abort_processing_command(self): + mock_update = create_mock_update(callback_query_data=\'abort\', user_id=301) + mock_context = create_mock_context() + with patch.object(self.helper, \'_abort_processing_logic\', new_callable=AsyncMock) as mock_logic: + mock_logic.return_value = "Abort Logic Done" + await self.helper.abort_processing(mock_update, mock_context) + + mock_update.callback_query.answer.assert_called_once() + mock_logic.assert_called_once_with(301) + mock_update.callback_query.edit_message_text.assert_called_once_with(text="Abort Logic Done") + + def test_reboot_logic_claude_and_main(self): + user_message_parts = ["/reboot", "claude"] + chat_id_to_write = "12345" + + with patch("builtins.open", mock_open()) as mock_file: + self.helper._reboot_logic(user_message_parts, chat_id_to_write) + + # Check claude reboot file + mock_file.assert_any_call(self.reboot_claude_file, \'w\') + # Check main doreboot file + mock_file.assert_any_call(self.reboot_file, \'w\') + handle_claude = mock_file.return_value + handle_main = mock_file.return_value # mock_open reuses the handle for multiple calls + + # Check if write was called for claude file (empty write) + # This part of assertion is tricky with single mock_file. Better to use different mocks if possible + # or check the sequence of calls if the mock supports it well. + # For now, assert_any_call ensures it was opened. + + # Check content for main reboot file + # Need to ensure the write for self.reboot_file had chat_id_to_write + # This requires more sophisticated mock_open or patching os.path.exists and multiple open calls + # Simpler check: was open(self.reboot_file, \'w\') called? Yes, via assert_any_call. + # And was open(self.reboot_claude_file, \'w\') called? Yes. + + # Verify files were created (mock_open doesn\'t actually create them) + # This test relies on mock_open\'s behavior. To test file content, need more setup. + # For now, assume open was called correctly. + + def test_reboot_logic_main_only(self): + user_message_parts = ["/reboot"] + chat_id_to_write = "67890" + with patch("builtins.open", mock_open()) as mock_file: + self.helper._reboot_logic(user_message_parts, chat_id_to_write) + # Ensure claude file was NOT opened for writing. + # This requires asserting that a specific call didn\'t happen, or checking call_args_list + claude_call = unittest.mock.call(self.reboot_claude_file, \'w\') + self.assertNotIn(claude_call, mock_file.call_args_list) + + mock_file.assert_any_call(self.reboot_file, \'w\') + + @patch(\'sys.exit\') # Mock sys.exit to prevent test runner from exiting + async def test_reboot_command(self, mock_sys_exit): + mock_update = create_mock_update(message_text="/reboot claude", chat_id="chat1") + mock_context = create_mock_context() + + with patch.object(self.helper, \'_reboot_logic\') as mock_reboot_file_logic: + await self.helper.reboot(mock_update, mock_context) + + mock_reboot_file_logic.assert_called_once_with(["/reboot", "claude"], "chat1") + mock_update.message.reply_text.assert_called_once_with("Rebooting the bot...") + mock_sys_exit.assert_called_once_with(0) + + @patch(\'os.path.exists\') + @patch(\'builtins.open\', new_callable=mock_open) + @patch(\'os.remove\') + async def test_check_doreboot_file_logic_file_exists(self, mock_os_remove, mock_file_open, mock_os_path_exists): + mock_os_path_exists.return_value = True + mock_file_open.return_value.read.return_value.strip.return_value = "chat123" + + chat_id = await self.helper._check_doreboot_file_logic() + + mock_os_path_exists.assert_called_once_with(self.reboot_file) + mock_file_open.assert_called_once_with(self.reboot_file, \'r\') + mock_os_remove.assert_called_once_with(self.reboot_file) + self.assertEqual(chat_id, "chat123") + + @patch(\'os.path.exists\', return_value=False) + async def test_check_doreboot_file_logic_file_not_exists(self, mock_os_path_exists): + chat_id = await self.helper._check_doreboot_file_logic() + mock_os_path_exists.assert_called_once_with(self.reboot_file) + self.assertIsNone(chat_id) + + @patch(\'logging.error\') + @patch(\'os.path.exists\', return_value=True) + @patch(\'builtins.open\', side_effect=IOError("Read error")) + @patch(\'os.remove\') # To check if remove is called even on read error + async def test_check_doreboot_file_logic_read_error(self, mock_os_remove, mock_file_open, mock_os_path_exists, mock_log_error): + chat_id = await self.helper._check_doreboot_file_logic() + + self.assertIsNone(chat_id) + mock_log_error.assert_any_call(unittest.mock.string_containing("Error reading reboot file")) + # Check if os.remove was attempted even after read error + mock_os_remove.assert_called_once_with(self.reboot_file) + + + async def test_check_doreboot_file_command_sends_message(self): + mock_application = MagicMock() + mock_application.bot.send_message = AsyncMock() + + with patch.object(self.helper, \'_check_doreboot_file_logic\', new_callable=AsyncMock) as mock_logic: + mock_logic.return_value = "chat789" # Simulate chat_id found + await self.helper.check_doreboot_file(mock_application) + + mock_logic.assert_called_once() + mock_application.bot.send_message.assert_called_once_with( + chat_id="chat789", text="The application has finished initializing." + ) + + async def test_check_doreboot_file_command_no_chat_id(self): + mock_application = MagicMock() + mock_application.bot.send_message = AsyncMock() + + with patch.object(self.helper, \'_check_doreboot_file_logic\', new_callable=AsyncMock) as mock_logic: + mock_logic.return_value = None # Simulate no chat_id found + await self.helper.check_doreboot_file(mock_application) + + mock_logic.assert_called_once() + mock_application.bot.send_message.assert_not_called() + + # Note: Testing the run() method itself is more of an integration test, + # as it involves setting up the full Application and polling loop. + # Unit tests here focus on the helper\'s own logic methods. + +if __name__ == \'__main__\': + unittest.main()