"""Unit tests for ``tools.metrics``: the ``Metrics`` class and its global instance.

The profiler (``cProfile.Profile``) and the stats reader (``pstats.Stats``) are
patched in the timing tests so that the recorded times are fully controlled by
the test rather than by real execution time.
"""

import logging
import time
import unittest
from unittest.mock import ANY, MagicMock, patch

# Ensure tools.metrics is accessible
from tools.metrics import Metrics  # Import the class itself for direct testing
from tools.metrics import metrics as global_metrics_instance  # Import the global instance

# Name of the logger the log-capturing tests attach to.
METRICS_LOGGER_NAME = 'tools.metrics'


def _profile_key(func):
    """Return the ``(filename, first line number, name)`` key for *func*.

    This is the key shape the tests use to populate the mocked
    ``pstats.Stats.stats`` dictionary. It is built from the code object of the
    *undecorated* function.
    """
    code = func.__code__
    return (code.co_filename, code.co_firstlineno, code.co_name)


# A simple function to decorate for testing
def sample_function_for_metrics(duration=0.01):
    """Return ``"sample_output"``; *duration* only exists to give the call an argument.

    No real work (and no sleeping) is done here: the timing tests mock the
    cProfile/pstats interaction instead of relying on actual elapsed time.
    """
    if duration > 0:  # Conditional so the zero-duration case can be exercised too
        pass  # The actual work is not important when mocking cProfile results
    return "sample_output"


def another_sample_function(x, y):
    """Return ``x + y``; a second, distinct function to decorate."""
    return x + y


class TestMetrics(unittest.TestCase):
    """Tests for the ``measure`` decorator, ``get_metrics`` and ``clear_metrics``."""

    def setUp(self):
        """Create a fresh ``Metrics`` instance and reset the global one."""
        # Create a fresh Metrics instance for most tests to avoid interference
        self.logger = logging.getLogger('tools.metrics.test')
        if not self.logger.handlers:  # Avoid adding handler multiple times
            self.logger.addHandler(logging.NullHandler())
        self.metrics_instance = Metrics(logger=self.logger)

        # Clear the global instance before each test that might use it, and
        # again afterwards so nothing recorded here leaks into other test cases.
        global_metrics_instance.clear_metrics()
        self.addCleanup(global_metrics_instance.clear_metrics)

    def test_measure_decorator_counts_calls(self):
        """Each call of the decorated function increments its call count by one."""
        decorated_func = self.metrics_instance.measure(sample_function_for_metrics)
        call_count = self.metrics_instance.call_count

        self.assertEqual(call_count["sample_function_for_metrics"], 0)
        for expected_calls in (1, 2):
            decorated_func()
            self.assertEqual(
                self.metrics_instance.call_count["sample_function_for_metrics"],
                expected_calls,
            )

    @patch('cProfile.Profile')
    @patch('pstats.Stats')
    def test_measure_decorator_records_time(self, MockPStats, MockCProfile):
        """The time taken from the mocked stats is added to ``total_time`` on every call."""
        # Mock cProfile and pstats to control the time value
        mock_profiler_instance = MockCProfile.return_value
        mock_pstats_instance = MockPStats.return_value

        # Simulate that the pstats.Stats.stats dictionary contains the function's stats.
        # Key: (filename, lineno, funcname)
        # Value: (cc, nc, tt, ct, callers) where ct is cumulative_time (index 3)
        # The key is built from the function *before* decoration.
        func_key = _profile_key(sample_function_for_metrics)
        mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.123, {})}  # ct=0.123

        decorated_func = self.metrics_instance.measure(sample_function_for_metrics)
        self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0)

        # Call the decorated function; the duration argument doesn't matter due to mocking
        decorated_func(duration=0)

        mock_profiler_instance.enable.assert_called_once()
        mock_profiler_instance.disable.assert_called_once()
        MockPStats.assert_called_once_with(mock_profiler_instance)
        self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.123)

        # Call again with a new time value to check accumulation
        mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.100, {})}  # New ct=0.100
        decorated_func(duration=0)
        self.assertAlmostEqual(
            self.metrics_instance.total_time["sample_function_for_metrics"],
            0.123 + 0.100,
        )

    @patch('cProfile.Profile')
    @patch('pstats.Stats')
    def test_measure_decorator_fallback_time_recording_by_name(self, MockPStats, MockCProfile):
        """When the exact stats key is missing, the entry with the same name is used."""
        mock_pstats_instance = MockPStats.return_value

        # This is the key the test expects the code to look up first.
        expected_primary_key = _profile_key(sample_function_for_metrics)
        filename, first_lineno, func_name = expected_primary_key

        # This is the key that will *actually* be in pstats.stats: a different line
        # number (primary lookup mismatch) but the same name (by-name fallback match).
        actual_stats_key_in_pstats = (filename, first_lineno + 5, func_name)
        mock_pstats_instance.stats = {
            actual_stats_key_in_pstats: (1, 1, 0.03, 0.077, {}),  # ct = 0.077
        }
        # Guard the precondition of this test: the primary key must be absent.
        self.assertNotIn(expected_primary_key, mock_pstats_instance.stats)

        decorated_func = self.metrics_instance.measure(sample_function_for_metrics)

        # assertLogs sets the logger to the requested level for the duration of the
        # block and restores the previous level/handlers on exit, even on failure,
        # so no manual level handling is needed here.
        metrics_internal_logger = logging.getLogger(METRICS_LOGGER_NAME)
        with self.assertLogs(metrics_internal_logger, level='DEBUG') as log_capture:
            decorated_func(duration=0)

        self.assertTrue(
            any(
                "Found stats for sample_function_for_metrics by name" in msg
                for msg in log_capture.output
            )
        )
        self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.077)

    @patch('cProfile.Profile')
    @patch('pstats.Stats')
    def test_measure_decorator_handles_func_stats_not_found(self, MockPStats, MockCProfile):
        """With no matching stats, a warning is logged and no time is recorded."""
        mock_pstats_instance = MockPStats.return_value
        mock_pstats_instance.stats = {}  # Empty stats, function will not be found

        decorated_func = self.metrics_instance.measure(sample_function_for_metrics)

        # assertLogs manages (and restores) the logger level itself.
        metrics_internal_logger = logging.getLogger(METRICS_LOGGER_NAME)
        with self.assertLogs(metrics_internal_logger, level='WARNING') as log_capture:
            decorated_func(duration=0)

        self.assertTrue(
            any("Could not find exact cProfile stats" in msg for msg in log_capture.output)
        )
        self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0)

    def test_get_metrics_empty(self):
        """A fresh instance reports no metrics."""
        self.assertEqual(self.metrics_instance.get_metrics(), {})

    @patch('cProfile.Profile')
    @patch('pstats.Stats')
    def test_get_metrics_with_data(self, MockPStats, MockCProfile):
        """``get_metrics`` reports call count, total and average time per function."""
        mock_pstats_instance = MockPStats.return_value

        # Decorate two different functions
        decorated_func1 = self.metrics_instance.measure(sample_function_for_metrics)
        decorated_func2 = self.metrics_instance.measure(another_sample_function)

        # Data for func1: a single call with cumulative time 0.1
        func1_key = _profile_key(sample_function_for_metrics)
        mock_pstats_instance.stats = {func1_key: (1, 1, 0.1, 0.1, {})}
        decorated_func1()

        # Data for func2: two calls with cumulative times 0.2 and 0.3
        func2_key = _profile_key(another_sample_function)
        mock_pstats_instance.stats = {func2_key: (1, 1, 0.2, 0.2, {})}
        decorated_func2(1, 2)
        mock_pstats_instance.stats = {func2_key: (1, 1, 0.3, 0.3, {})}
        decorated_func2(3, 4)

        metrics_data = self.metrics_instance.get_metrics()

        self.assertIn("sample_function_for_metrics", metrics_data)
        sample_metrics = metrics_data["sample_function_for_metrics"]
        self.assertEqual(sample_metrics["call_count"], 1)
        self.assertEqual(sample_metrics["total_time"], 0.1)
        self.assertEqual(sample_metrics["average_time"], 0.1)

        self.assertIn("another_sample_function", metrics_data)
        another_metrics = metrics_data["another_sample_function"]
        self.assertEqual(another_metrics["call_count"], 2)
        self.assertAlmostEqual(another_metrics["total_time"], 0.5)
        self.assertAlmostEqual(another_metrics["average_time"], 0.25)

    def test_clear_metrics(self):
        """``clear_metrics`` empties both counters and the reported metrics."""
        # Add some data
        self.metrics_instance.call_count["test_func"] = 5
        self.metrics_instance.total_time["test_func"] = 1.234

        self.metrics_instance.clear_metrics()

        self.assertEqual(self.metrics_instance.call_count, {})
        self.assertEqual(self.metrics_instance.total_time, {})
        self.assertEqual(self.metrics_instance.get_metrics(), {})

    # Test the global instance
    @patch('cProfile.Profile')
    @patch('pstats.Stats')
    def test_global_metrics_instance_usage(self, MockPStats, MockCProfile):
        """The module-level metrics instance records decorated calls as well."""
        mock_pstats_instance = MockPStats.return_value

        # Decorate a function with the global instance
        @global_metrics_instance.measure
        def globally_decorated_func():
            return "global_output"

        # Set up mock stats for the globally decorated function. __wrapped__ gives
        # the original function, whose code object supplies the stats key.
        func_key = _profile_key(globally_decorated_func.__wrapped__)
        mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.05, {})}

        globally_decorated_func()

        metrics_data = global_metrics_instance.get_metrics()
        self.assertIn("globally_decorated_func", metrics_data)
        self.assertEqual(metrics_data["globally_decorated_func"]["call_count"], 1)
        self.assertEqual(metrics_data["globally_decorated_func"]["total_time"], 0.05)


if __name__ == '__main__':
    unittest.main()