diff --git a/tests/tools/test_metrics.py b/tests/tools/test_metrics.py new file mode 100644 index 0000000..902abb7 --- /dev/null +++ b/tests/tools/test_metrics.py @@ -0,0 +1,217 @@ +import unittest +from unittest.mock import patch, MagicMock, ANY +import time +import logging + +# Ensure tools.metrics is accessible +from tools.metrics import Metrics # Import the class itself for direct testing +from tools.metrics import metrics as global_metrics_instance # Import the global instance + +# A simple function to decorate for testing +def sample_function_for_metrics(duration=0.01): + # Simulate some work + # Note: time.sleep is not always precisely profiled by cProfile in the same way as CPU-bound work. + # For testing, we will mock the cProfile/pstats interaction rather than relying on actual sleep duration. + if duration > 0: # Make it conditional so we can test zero-time case too + pass # The actual work is not important when mocking cProfile results + return "sample_output" + +def another_sample_function(x, y): + return x + y + +class TestMetrics(unittest.TestCase): + + def setUp(self): + # Create a fresh Metrics instance for most tests to avoid interference + self.logger = logging.getLogger('tools.metrics.test') + if not self.logger.handlers: # Avoid adding handler multiple times + self.logger.addHandler(logging.NullHandler()) + self.metrics_instance = Metrics(logger=self.logger) + + # Clear the global instance before each test that might use it + global_metrics_instance.clear_metrics() + + def test_measure_decorator_counts_calls(self): + decorated_func = self.metrics_instance.measure(sample_function_for_metrics) + + self.assertEqual(self.metrics_instance.call_count["sample_function_for_metrics"], 0) + decorated_func() + self.assertEqual(self.metrics_instance.call_count["sample_function_for_metrics"], 1) + decorated_func() + self.assertEqual(self.metrics_instance.call_count["sample_function_for_metrics"], 2) + + @patch('cProfile.Profile') + @patch('pstats.Stats') + def test_measure_decorator_records_time(self, MockPStats, MockCProfile): + # Mock cProfile and pstats to control the time value + mock_profiler_instance = MockCProfile.return_value + mock_pstats_instance = MockPStats.return_value + + # Simulate that pstats.Stats.stats dictionary contains the function's stats + # Key: (filename, lineno, funcname) + # Value: (cc, nc, tt, ct, callers) where ct is cumulative_time (index 3) + + # Get code object of the function *before* decoration for correct key + original_func_code = sample_function_for_metrics.__code__ + func_key = (original_func_code.co_filename, original_func_code.co_firstlineno, original_func_code.co_name) + + # Configure mock_pstats_instance.stats to return our desired time + mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.123, {})} # cc, nc, tt, ct=0.123 + + decorated_func = self.metrics_instance.measure(sample_function_for_metrics) + + self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0) + + # Call the decorated function + decorated_func(duration=0) # Duration arg doesn't matter due to mocking + + # Assertions + mock_profiler_instance.enable.assert_called_once() + mock_profiler_instance.disable.assert_called_once() + MockPStats.assert_called_once_with(mock_profiler_instance) + + self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.123) + + # Call again to see accumulation + # Reset mock stats for a new time value if needed, or assume same time per call + mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.100, {})} # New ct=0.100 + decorated_func(duration=0) + self.assertAlmostEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.123 + 0.100) + + + @patch('cProfile.Profile') + @patch('pstats.Stats') + def test_measure_decorator_fallback_time_recording_by_name(self, MockPStats, MockCProfile): + mock_profiler_instance = MockCProfile.return_value + mock_pstats_instance = MockPStats.return_value + + original_func_code = sample_function_for_metrics.__code__ # func to be decorated + # Simulate the primary key lookup fails by creating a slightly different key for what we expect + # This is what the code will try to look up first. + expected_primary_key = (original_func_code.co_filename, original_func_code.co_firstlineno, original_func_code.co_name) + + # This is the key that will *actually* be in pstats.stats, simulating a mismatch for primary lookup + # but a match for the by-name fallback. + actual_stats_key_in_pstats = (original_func_code.co_filename, + original_func_code.co_firstlineno + 5, # simulate a lineno difference for primary key mismatch + original_func_code.co_name) # Name is the same for fallback + + mock_pstats_instance.stats = { + # expected_primary_key is NOT present + actual_stats_key_in_pstats: (1, 1, 0.03, 0.077, {}) # ct = 0.077 + } + + decorated_func = self.metrics_instance.measure(sample_function_for_metrics) + + # Expecting a debug log for fallback, but assertLogs needs the logger to have a handler that captures. + # self.logger is already set up with NullHandler. For this test, let's use a specific logger. + metrics_internal_logger = logging.getLogger('tools.metrics') # Logger used inside Metrics class + original_level = metrics_internal_logger.level + metrics_internal_logger.setLevel(logging.DEBUG) + + with self.assertLogs(metrics_internal_logger, level='DEBUG') as log_capture: + decorated_func(duration=0) + + metrics_internal_logger.setLevel(original_level) # Reset logger level + + self.assertTrue(any("Found stats for sample_function_for_metrics by name" in msg for msg in log_capture.output)) + self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.077) + + + @patch('cProfile.Profile') + @patch('pstats.Stats') + def test_measure_decorator_handles_func_stats_not_found(self, MockPStats, MockCProfile): + mock_profiler_instance = MockCProfile.return_value + mock_pstats_instance = MockPStats.return_value + mock_pstats_instance.stats = {} # Empty stats, function will not be found + + decorated_func = self.metrics_instance.measure(sample_function_for_metrics) + + metrics_internal_logger = logging.getLogger('tools.metrics') + original_level = metrics_internal_logger.level + metrics_internal_logger.setLevel(logging.WARNING) + with self.assertLogs(metrics_internal_logger, level='WARNING') as log_capture: + decorated_func(duration=0) + metrics_internal_logger.setLevel(original_level) + + self.assertTrue(any("Could not find exact cProfile stats" in msg for msg in log_capture.output)) + self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0) + + + def test_get_metrics_empty(self): + self.assertEqual(self.metrics_instance.get_metrics(), {}) + + @patch('cProfile.Profile') + @patch('pstats.Stats') + def test_get_metrics_with_data(self, MockPStats, MockCProfile): + mock_pstats_instance = MockPStats.return_value + + # Decorate two different functions + decorated_func1 = self.metrics_instance.measure(sample_function_for_metrics) + decorated_func2 = self.metrics_instance.measure(another_sample_function) + + # Data for func1 + func1_code = sample_function_for_metrics.__code__ + func1_key = (func1_code.co_filename, func1_code.co_firstlineno, func1_code.co_name) + mock_pstats_instance.stats = {func1_key: (1,1,0.1,0.1,{})} + decorated_func1() + + # Data for func2 + func2_code = another_sample_function.__code__ + func2_key = (func2_code.co_filename, func2_code.co_firstlineno, func2_code.co_name) + mock_pstats_instance.stats = {func2_key: (1,1,0.2,0.2,{})} # Cumulative time 0.2 + decorated_func2(1,2) + mock_pstats_instance.stats = {func2_key: (1,1,0.3,0.3,{})} # Cumulative time 0.3 for second call + decorated_func2(3,4) + + metrics_data = self.metrics_instance.get_metrics() + + self.assertIn("sample_function_for_metrics", metrics_data) + self.assertEqual(metrics_data["sample_function_for_metrics"]["call_count"], 1) + self.assertEqual(metrics_data["sample_function_for_metrics"]["total_time"], 0.1) + self.assertEqual(metrics_data["sample_function_for_metrics"]["average_time"], 0.1) + + self.assertIn("another_sample_function", metrics_data) + self.assertEqual(metrics_data["another_sample_function"]["call_count"], 2) + self.assertAlmostEqual(metrics_data["another_sample_function"]["total_time"], 0.5) + self.assertAlmostEqual(metrics_data["another_sample_function"]["average_time"], 0.25) + + + def test_clear_metrics(self): + # Add some data + self.metrics_instance.call_count["test_func"] = 5 + self.metrics_instance.total_time["test_func"] = 1.234 + + self.metrics_instance.clear_metrics() + + self.assertEqual(self.metrics_instance.call_count, {}) + self.assertEqual(self.metrics_instance.total_time, {}) + self.assertEqual(self.metrics_instance.get_metrics(), {}) + + # Test the global instance + @patch('cProfile.Profile') + @patch('pstats.Stats') + def test_global_metrics_instance_usage(self, MockPStats, MockCProfile): + mock_pstats_instance = MockPStats.return_value + + # Decorate a function with the global instance + @global_metrics_instance.measure + def globally_decorated_func(): + return "global_output" + + # Setup mock stats for the globally decorated function + # Access __wrapped__ to get the original function if other decorators might be present or for consistency. + original_g_func = globally_decorated_func.__wrapped__ + func_code = original_g_func.__code__ + func_key = (func_code.co_filename, func_code.co_firstlineno, func_code.co_name) + mock_pstats_instance.stats = {func_key: (1,1,0.05,0.05,{})} + + globally_decorated_func() + + metrics_data = global_metrics_instance.get_metrics() + self.assertIn("globally_decorated_func", metrics_data) + self.assertEqual(metrics_data["globally_decorated_func"]["call_count"], 1) + self.assertEqual(metrics_data["globally_decorated_func"]["total_time"], 0.05) + +if __name__ == '__main__': + unittest.main()