""" Тесты мемоизации (кэширования) для breakshaft. """ from dataclasses import dataclass import pytest from breakshaft import ConvRepo from breakshaft.graph_walker import GraphWalker @dataclass class TypeN: n: int class TestMemoization: """Тесты кэширования explode_callgraph_branches.""" def test_cache_hit(self): """Кэш должен возвращать тот же результат для одинаковых графов.""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> TypeN: return TypeN(i) @repo.mark_injector() def a_to_b(a: TypeN) -> TypeN: return TypeN(a.n + 1) walker = GraphWalker() def consumer(dep: TypeN) -> int: return dep.n # Первый вызов (кэш пуст) cg = walker.generate_callgraph(repo.convertor_set, frozenset({int}), consumer) result1 = walker.explode_callgraph_branches(cg, frozenset({int})) # Второй вызов (должен быть из кэша) result2 = walker.explode_callgraph_branches(cg, frozenset({int})) # Результаты должны быть одинаковыми assert len(result1) == len(result2) # Кэш должен содержать запись assert len(walker._explode_cache) > 0 def test_cache_invalidated_on_add_injector(self): """Кэш должен очищаться при добавлении инжектора.""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> TypeN: return TypeN(i) walker = GraphWalker() def consumer(dep: TypeN) -> int: return dep.n # Первый вызов cg = walker.generate_callgraph(repo.convertor_set, frozenset({int}), consumer) walker.explode_callgraph_branches(cg, frozenset({int})) cache_size_after_first = len(walker._explode_cache) # Добавляем инжектор @repo.mark_injector() def a_to_b(a: TypeN) -> TypeN: return TypeN(a.n + 1) # Кэш должен очиститься assert len(walker._explode_cache) == 0 def test_cache_different_from_types(self): """Кэш должен различать разные from_types.""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> TypeN: return TypeN(i) @repo.mark_injector() def float_to_a(f: float) -> TypeN: return TypeN(int(f)) walker = GraphWalker() def consumer(dep: TypeN) -> int: return dep.n # Очищаем кэш перед тестом walker.clear_cache() # Вызов с int cg1 = walker.generate_callgraph(repo.convertor_set, frozenset({int}), consumer) result1 = walker.explode_callgraph_branches(cg1, frozenset({int})) cache_after_int = len(walker._explode_cache) # Вызов с float cg2 = walker.generate_callgraph(repo.convertor_set, frozenset({float}), consumer) result2 = walker.explode_callgraph_branches(cg2, frozenset({float})) cache_after_float = len(walker._explode_cache) # Кэш должен вырасти (как минимум 2 разные записи) assert cache_after_float > cache_after_int def test_cache_clear_method(self): """Метод clear_cache() должен очищать кэш.""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> TypeN: return TypeN(i) walker = GraphWalker() def consumer(dep: TypeN) -> int: return dep.n # Заполняем кэш cg = walker.generate_callgraph(repo.convertor_set, frozenset({int}), consumer) walker.explode_callgraph_branches(cg, frozenset({int})) assert len(walker._explode_cache) > 0 # Очищаем walker.clear_cache() assert len(walker._explode_cache) == 0 class TestMemoizationPerformance: """Бенчмарки кэширования.""" def test_repeated_explode_faster(self): """Повторный explode должен быть быстрее благодаря кэшу.""" import time repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> TypeN: return TypeN(i) @repo.mark_injector() def a_to_b(a: TypeN) -> TypeN: return TypeN(a.n + 1) walker = GraphWalker() def consumer(dep: TypeN) -> int: return dep.n cg = walker.generate_callgraph(repo.convertor_set, frozenset({int}), consumer) # Первый вызов start1 = time.perf_counter() walker.explode_callgraph_branches(cg, frozenset({int})) elapsed1 = time.perf_counter() - start1 # Второй вызов (из кэша) start2 = time.perf_counter() walker.explode_callgraph_branches(cg, frozenset({int})) elapsed2 = time.perf_counter() - start2 # Второй должен быть значительно быстрее print(f"\nexplode: {elapsed1*1000:.3f}ms -> {elapsed2*1000:.3f}ms (cache)") assert elapsed2 < elapsed1 * 0.5 # Хотя бы 2x быстрее