218 lines
10 KiB
Python
218 lines
10 KiB
Python
import unittest
|
|
from unittest.mock import patch, MagicMock, ANY
|
|
import time
|
|
import logging
|
|
|
|
# Ensure tools.metrics is accessible
|
|
from tools.metrics import Metrics # Import the class itself for direct testing
|
|
from tools.metrics import metrics as global_metrics_instance # Import the global instance
|
|
|
|
# A simple function to decorate for testing
|
|
def sample_function_for_metrics(duration=0.01):
|
|
# Simulate some work
|
|
# Note: time.sleep is not always precisely profiled by cProfile in the same way as CPU-bound work.
|
|
# For testing, we will mock the cProfile/pstats interaction rather than relying on actual sleep duration.
|
|
if duration > 0: # Make it conditional so we can test zero-time case too
|
|
pass # The actual work is not important when mocking cProfile results
|
|
return "sample_output"
|
|
|
|
def another_sample_function(x, y):
|
|
return x + y
|
|
|
|
class TestMetrics(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# Create a fresh Metrics instance for most tests to avoid interference
|
|
self.logger = logging.getLogger('tools.metrics.test')
|
|
if not self.logger.handlers: # Avoid adding handler multiple times
|
|
self.logger.addHandler(logging.NullHandler())
|
|
self.metrics_instance = Metrics(logger=self.logger)
|
|
|
|
# Clear the global instance before each test that might use it
|
|
global_metrics_instance.clear_metrics()
|
|
|
|
def test_measure_decorator_counts_calls(self):
|
|
decorated_func = self.metrics_instance.measure(sample_function_for_metrics)
|
|
|
|
self.assertEqual(self.metrics_instance.call_count["sample_function_for_metrics"], 0)
|
|
decorated_func()
|
|
self.assertEqual(self.metrics_instance.call_count["sample_function_for_metrics"], 1)
|
|
decorated_func()
|
|
self.assertEqual(self.metrics_instance.call_count["sample_function_for_metrics"], 2)
|
|
|
|
@patch('cProfile.Profile')
|
|
@patch('pstats.Stats')
|
|
def test_measure_decorator_records_time(self, MockPStats, MockCProfile):
|
|
# Mock cProfile and pstats to control the time value
|
|
mock_profiler_instance = MockCProfile.return_value
|
|
mock_pstats_instance = MockPStats.return_value
|
|
|
|
# Simulate that pstats.Stats.stats dictionary contains the function's stats
|
|
# Key: (filename, lineno, funcname)
|
|
# Value: (cc, nc, tt, ct, callers) where ct is cumulative_time (index 3)
|
|
|
|
# Get code object of the function *before* decoration for correct key
|
|
original_func_code = sample_function_for_metrics.__code__
|
|
func_key = (original_func_code.co_filename, original_func_code.co_firstlineno, original_func_code.co_name)
|
|
|
|
# Configure mock_pstats_instance.stats to return our desired time
|
|
mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.123, {})} # cc, nc, tt, ct=0.123
|
|
|
|
decorated_func = self.metrics_instance.measure(sample_function_for_metrics)
|
|
|
|
self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0)
|
|
|
|
# Call the decorated function
|
|
decorated_func(duration=0) # Duration arg doesn't matter due to mocking
|
|
|
|
# Assertions
|
|
mock_profiler_instance.enable.assert_called_once()
|
|
mock_profiler_instance.disable.assert_called_once()
|
|
MockPStats.assert_called_once_with(mock_profiler_instance)
|
|
|
|
self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.123)
|
|
|
|
# Call again to see accumulation
|
|
# Reset mock stats for a new time value if needed, or assume same time per call
|
|
mock_pstats_instance.stats = {func_key: (1, 1, 0.05, 0.100, {})} # New ct=0.100
|
|
decorated_func(duration=0)
|
|
self.assertAlmostEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.123 + 0.100)
|
|
|
|
|
|
@patch('cProfile.Profile')
|
|
@patch('pstats.Stats')
|
|
def test_measure_decorator_fallback_time_recording_by_name(self, MockPStats, MockCProfile):
|
|
mock_profiler_instance = MockCProfile.return_value
|
|
mock_pstats_instance = MockPStats.return_value
|
|
|
|
original_func_code = sample_function_for_metrics.__code__ # func to be decorated
|
|
# Simulate the primary key lookup fails by creating a slightly different key for what we expect
|
|
# This is what the code will try to look up first.
|
|
expected_primary_key = (original_func_code.co_filename, original_func_code.co_firstlineno, original_func_code.co_name)
|
|
|
|
# This is the key that will *actually* be in pstats.stats, simulating a mismatch for primary lookup
|
|
# but a match for the by-name fallback.
|
|
actual_stats_key_in_pstats = (original_func_code.co_filename,
|
|
original_func_code.co_firstlineno + 5, # simulate a lineno difference for primary key mismatch
|
|
original_func_code.co_name) # Name is the same for fallback
|
|
|
|
mock_pstats_instance.stats = {
|
|
# expected_primary_key is NOT present
|
|
actual_stats_key_in_pstats: (1, 1, 0.03, 0.077, {}) # ct = 0.077
|
|
}
|
|
|
|
decorated_func = self.metrics_instance.measure(sample_function_for_metrics)
|
|
|
|
# Expecting a debug log for fallback, but assertLogs needs the logger to have a handler that captures.
|
|
# self.logger is already set up with NullHandler. For this test, let's use a specific logger.
|
|
metrics_internal_logger = logging.getLogger('tools.metrics') # Logger used inside Metrics class
|
|
original_level = metrics_internal_logger.level
|
|
metrics_internal_logger.setLevel(logging.DEBUG)
|
|
|
|
with self.assertLogs(metrics_internal_logger, level='DEBUG') as log_capture:
|
|
decorated_func(duration=0)
|
|
|
|
metrics_internal_logger.setLevel(original_level) # Reset logger level
|
|
|
|
self.assertTrue(any("Found stats for sample_function_for_metrics by name" in msg for msg in log_capture.output))
|
|
self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0.077)
|
|
|
|
|
|
@patch('cProfile.Profile')
|
|
@patch('pstats.Stats')
|
|
def test_measure_decorator_handles_func_stats_not_found(self, MockPStats, MockCProfile):
|
|
mock_profiler_instance = MockCProfile.return_value
|
|
mock_pstats_instance = MockPStats.return_value
|
|
mock_pstats_instance.stats = {} # Empty stats, function will not be found
|
|
|
|
decorated_func = self.metrics_instance.measure(sample_function_for_metrics)
|
|
|
|
metrics_internal_logger = logging.getLogger('tools.metrics')
|
|
original_level = metrics_internal_logger.level
|
|
metrics_internal_logger.setLevel(logging.WARNING)
|
|
with self.assertLogs(metrics_internal_logger, level='WARNING') as log_capture:
|
|
decorated_func(duration=0)
|
|
metrics_internal_logger.setLevel(original_level)
|
|
|
|
self.assertTrue(any("Could not find exact cProfile stats" in msg for msg in log_capture.output))
|
|
self.assertEqual(self.metrics_instance.total_time["sample_function_for_metrics"], 0)
|
|
|
|
|
|
def test_get_metrics_empty(self):
|
|
self.assertEqual(self.metrics_instance.get_metrics(), {})
|
|
|
|
@patch('cProfile.Profile')
|
|
@patch('pstats.Stats')
|
|
def test_get_metrics_with_data(self, MockPStats, MockCProfile):
|
|
mock_pstats_instance = MockPStats.return_value
|
|
|
|
# Decorate two different functions
|
|
decorated_func1 = self.metrics_instance.measure(sample_function_for_metrics)
|
|
decorated_func2 = self.metrics_instance.measure(another_sample_function)
|
|
|
|
# Data for func1
|
|
func1_code = sample_function_for_metrics.__code__
|
|
func1_key = (func1_code.co_filename, func1_code.co_firstlineno, func1_code.co_name)
|
|
mock_pstats_instance.stats = {func1_key: (1,1,0.1,0.1,{})}
|
|
decorated_func1()
|
|
|
|
# Data for func2
|
|
func2_code = another_sample_function.__code__
|
|
func2_key = (func2_code.co_filename, func2_code.co_firstlineno, func2_code.co_name)
|
|
mock_pstats_instance.stats = {func2_key: (1,1,0.2,0.2,{})} # Cumulative time 0.2
|
|
decorated_func2(1,2)
|
|
mock_pstats_instance.stats = {func2_key: (1,1,0.3,0.3,{})} # Cumulative time 0.3 for second call
|
|
decorated_func2(3,4)
|
|
|
|
metrics_data = self.metrics_instance.get_metrics()
|
|
|
|
self.assertIn("sample_function_for_metrics", metrics_data)
|
|
self.assertEqual(metrics_data["sample_function_for_metrics"]["call_count"], 1)
|
|
self.assertEqual(metrics_data["sample_function_for_metrics"]["total_time"], 0.1)
|
|
self.assertEqual(metrics_data["sample_function_for_metrics"]["average_time"], 0.1)
|
|
|
|
self.assertIn("another_sample_function", metrics_data)
|
|
self.assertEqual(metrics_data["another_sample_function"]["call_count"], 2)
|
|
self.assertAlmostEqual(metrics_data["another_sample_function"]["total_time"], 0.5)
|
|
self.assertAlmostEqual(metrics_data["another_sample_function"]["average_time"], 0.25)
|
|
|
|
|
|
def test_clear_metrics(self):
|
|
# Add some data
|
|
self.metrics_instance.call_count["test_func"] = 5
|
|
self.metrics_instance.total_time["test_func"] = 1.234
|
|
|
|
self.metrics_instance.clear_metrics()
|
|
|
|
self.assertEqual(self.metrics_instance.call_count, {})
|
|
self.assertEqual(self.metrics_instance.total_time, {})
|
|
self.assertEqual(self.metrics_instance.get_metrics(), {})
|
|
|
|
# Test the global instance
|
|
@patch('cProfile.Profile')
|
|
@patch('pstats.Stats')
|
|
def test_global_metrics_instance_usage(self, MockPStats, MockCProfile):
|
|
mock_pstats_instance = MockPStats.return_value
|
|
|
|
# Decorate a function with the global instance
|
|
@global_metrics_instance.measure
|
|
def globally_decorated_func():
|
|
return "global_output"
|
|
|
|
# Setup mock stats for the globally decorated function
|
|
# Access __wrapped__ to get the original function if other decorators might be present or for consistency.
|
|
original_g_func = globally_decorated_func.__wrapped__
|
|
func_code = original_g_func.__code__
|
|
func_key = (func_code.co_filename, func_code.co_firstlineno, func_code.co_name)
|
|
mock_pstats_instance.stats = {func_key: (1,1,0.05,0.05,{})}
|
|
|
|
globally_decorated_func()
|
|
|
|
metrics_data = global_metrics_instance.get_metrics()
|
|
self.assertIn("globally_decorated_func", metrics_data)
|
|
self.assertEqual(metrics_data["globally_decorated_func"]["call_count"], 1)
|
|
self.assertEqual(metrics_data["globally_decorated_func"]["total_time"], 0.05)
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|