""" Тесты для экстремальных случаев использования breakshaft: - Глубокие цепочки преобразований - Комбинаторный взрыв (множество путей) - Циклические зависимости - Сложные Union-типы - Множественные контекст-менеджеры - Асинхронные конвейеры - Краевые случаи с кортежами """ from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass from typing import Any, Generator, AsyncGenerator import pytest from breakshaft.convertor import ConvRepo from breakshaft.graph_walker import GraphWalker from breakshaft.models import ConversionPoint, Callgraph pytest_plugins = ('pytest_asyncio',) # ============================================================================= # Базовые типы для тестов # ============================================================================= @dataclass class A: a: int @dataclass class B: b: float @dataclass class C: c: str @dataclass class D: d: bool @dataclass class E: e: complex @dataclass class F: f: bytes @dataclass class G: g: bytearray @dataclass class H: h: memoryview # ============================================================================= # Тесты глубоких цепочек преобразований # ============================================================================= def test_deep_conversion_chain_10_levels(): """Цепочка из 10 преобразований: A -> B -> C -> D -> E -> F -> G -> H -> A -> B -> C""" repo = ConvRepo() @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) @repo.mark_injector() def b_to_c(b: B) -> C: return C(str(b.b)) @repo.mark_injector() def c_to_d(c: C) -> D: return D(len(c.c) > 0) @repo.mark_injector() def d_to_e(d: D) -> E: return E(complex(1 if d.d else 0, 0)) @repo.mark_injector() def e_to_f(e: E) -> F: return F(bytes(int(e.e.real))) @repo.mark_injector() def f_to_g(f: F) -> G: return G(bytearray(f.f)) @repo.mark_injector() def g_to_h(g: G) -> H: return H(memoryview(g.g)) @repo.mark_injector() def h_to_a(h: H) -> A: return A(int(h.tobytes()[0]) if len(h.tobytes()) > 0 else 0) @repo.mark_injector() def a_to_c(a: A) -> C: return C(f"val_{a.a}") def consumer(dep: C) -> str: return dep.c # Прямое преобразование A -> C fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False) result = fn(A(42)) # Алгоритм выбирает кратчайший путь, поэтому A->B->C (результат "42.0") # а не A->C (результат "val_42") assert result == "42.0" # Цепочка A -> B -> C fn2 = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False) result2 = fn2(A(100)) assert result2 == "100.0" def test_deep_conversion_chain_20_levels(): """Цепочка с множеством промежуточных преобразований""" repo = ConvRepo() # Создаём 20 типов для цепочки @dataclass class T1: v: int @dataclass class T2: v: int @dataclass class T3: v: int @dataclass class T4: v: int @dataclass class T5: v: int @dataclass class T6: v: int @dataclass class T7: v: int @dataclass class T8: v: int @dataclass class T9: v: int @dataclass class T10: v: int @repo.mark_injector() def t1_to_t2(x: T1) -> T2: return T2(x.v + 1) @repo.mark_injector() def t2_to_t3(x: T2) -> T3: return T3(x.v + 1) @repo.mark_injector() def t3_to_t4(x: T3) -> T4: return T4(x.v + 1) @repo.mark_injector() def t4_to_t5(x: T4) -> T5: return T5(x.v + 1) @repo.mark_injector() def t5_to_t6(x: T5) -> T6: return T6(x.v + 1) @repo.mark_injector() def t6_to_t7(x: T6) -> T7: return T7(x.v + 1) @repo.mark_injector() def t7_to_t8(x: T7) -> T8: return T8(x.v + 1) @repo.mark_injector() def t8_to_t9(x: T8) -> T9: return T9(x.v + 1) @repo.mark_injector() def t9_to_t10(x: T9) -> T10: return T10(x.v + 1) def consumer(dep: T10) -> int: return dep.v fn = repo.get_conversion((T1,), consumer, force_commutative=True, allow_async=False) result = fn(T1(0)) assert result == 9 # 0 + 9 преобразований # ============================================================================= # Тесты комбинаторного взрыва (множество путей) # ============================================================================= def test_combinatorial_explosion_many_paths(): """Множество путей преобразования: каждый тип можно получить несколькими способами""" repo = ConvRepo() # A можно получить из int или B # B можно получить из int или A # C можно получить из A или B @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) @repo.mark_injector() def int_to_b(i: int) -> B: return B(float(i)) @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) @repo.mark_injector() def b_to_a(b: B) -> A: return A(int(b.b)) @repo.mark_injector() def a_to_c(a: A) -> C: return C(f"a_{a.a}") @repo.mark_injector() def b_to_c(b: B) -> C: return C(f"b_{int(b.b)}") def consumer(dep: C) -> str: return dep.c # Есть несколько путей: int->A->C, int->B->C, int->A->B->C, int->B->A->C # force_commutative=False позволяет выбрать любой путь fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) assert result in ("a_42", "b_42") def test_non_commutative_graph_raises(): """Некоммутативный граф должен вызывать ошибку при force_commutative=True""" from breakshaft.exceptions import AmbiguousPath repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> A: return A(i * 10) @repo.mark_injector() def int_to_b(i: int) -> B: return B(float(i)) @repo.mark_injector() def a_to_c(a: A) -> C: return C(f"a_{a.a}") @repo.mark_injector() def b_to_c(b: B) -> C: return C(f"b_{int(b.b)}") def consumer(dep: C) -> str: return dep.c # Два разных пути дают разный результат -> некоммутативно with pytest.raises(AmbiguousPath) as exc_info: repo.get_conversion((int,), consumer, force_commutative=True, allow_async=False) assert exc_info.value.code == "GRAPH_002" # ============================================================================= # Тесты циклических зависимостей # ============================================================================= def test_cyclic_dependencies_a_b_a(): """Циклическая зависимость A -> B -> A""" repo = ConvRepo() @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a) * 2) @repo.mark_injector() def b_to_a(b: B) -> A: return A(int(b.b) + 1) def consumer(dep: B) -> float: return dep.b # A -> B (прямое) fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False) result = fn(A(5)) assert result == 10.0 def test_cyclic_dependencies_no_infinite_loop(): """Убедиться что циклические зависимости не вызывают бесконечную рекурсию""" repo = ConvRepo() @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) @repo.mark_injector() def b_to_c(b: B) -> C: return C(str(b.b)) @repo.mark_injector() def c_to_a(c: C) -> A: return A(int(c.c) if c.c.isdigit() else 0) @repo.mark_injector() def a_to_c(a: A) -> C: return C(f"direct_{a.a}") def consumer(dep: C) -> str: return dep.c # Граф имеет цикл A->B->C->A, но алгоритм должен его корректно обработать # Алгоритм выбирает кратчайший путь, поэтому A->B->C (результат "42.0") fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False) result = fn(A(42)) assert result == "42.0" # ============================================================================= # Тесты сложных Union-типов # ============================================================================= def test_complex_union_types(): """Union-типы с множеством вариантов""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) @repo.mark_injector() def int_to_b(i: int) -> B: return B(float(i)) @repo.mark_injector() def a_to_c(a: A) -> C: return C(f"a_{a.a}") @repo.mark_injector() def b_to_c(b: B) -> C: return C(f"b_{int(b.b)}") def consumer(dep: A | B) -> str: if isinstance(dep, A): return f"A:{dep.a}" return f"B:{dep.b}" fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) assert result in ("A:42", "B:42.0") def test_nested_union_types(): """Вложенные Union-типы""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) def consumer(dep: A | B | C) -> str: if isinstance(dep, A): return f"A:{dep.a}" elif isinstance(dep, B): return f"B:{dep.b}" return f"C:{dep.c}" fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) assert result in ("A:42", "B:42.0") # ============================================================================= # Тесты множественных контекст-менеджеров # ============================================================================= def test_multiple_sync_context_managers(): """Несколько синхронных контекст-менеджеров в цепочке""" repo = ConvRepo() finalized = {"int_to_a": False, "a_to_b": False} @repo.mark_injector() @contextmanager def int_to_a(i: int) -> Generator[A, Any, None]: try: yield A(i) finally: finalized["int_to_a"] = True @repo.mark_injector() @contextmanager def a_to_b(a: A) -> Generator[B, Any, None]: try: yield B(float(a.a) * 2) finally: finalized["a_to_b"] = True def consumer(dep: B) -> float: return dep.b fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(21) assert result == 42.0 assert finalized["int_to_a"] assert finalized["a_to_b"] @pytest.mark.asyncio async def test_multiple_async_context_managers(): """Несколько асинхронных контекст-менеджеров в цепочке""" repo = ConvRepo() finalized = {"int_to_a": False, "a_to_b": False} @repo.mark_injector() @asynccontextmanager async def int_to_a(i: int) -> AsyncGenerator[A, Any]: try: yield A(i) finally: finalized["int_to_a"] = True @repo.mark_injector() @asynccontextmanager async def a_to_b(a: A) -> AsyncGenerator[B, Any]: try: yield B(float(a.a) * 2) finally: finalized["a_to_b"] = True def consumer(dep: B) -> float: return dep.b fn = repo.get_conversion((int,), consumer, force_commutative=False, force_async=True, allow_async=True) result = await fn(21) assert result == 42.0 assert finalized["int_to_a"] assert finalized["a_to_b"] @pytest.mark.asyncio async def test_mixed_sync_async_context_managers(): """Смешанные синхронные и асинхронные контекст-менеджеры""" repo = ConvRepo() finalized = {"int_to_a": False, "a_to_b": False} @repo.mark_injector() @contextmanager def int_to_a(i: int) -> Generator[A, Any, None]: try: yield A(i) finally: finalized["int_to_a"] = True @repo.mark_injector() @asynccontextmanager async def a_to_b(a: A) -> AsyncGenerator[B, Any]: try: yield B(float(a.a) * 2) finally: finalized["a_to_b"] = True def consumer(dep: B) -> float: return dep.b # Должен использовать async, т.к. есть асинхронный контекст-менеджер fn = repo.get_conversion((int,), consumer, force_commutative=False, force_async=True, allow_async=True) result = await fn(21) assert result == 42.0 assert finalized["int_to_a"] assert finalized["a_to_b"] # ============================================================================= # Тесты асинхронных конвейеров # ============================================================================= @pytest.mark.asyncio async def test_async_pipeline(): """Асинхронный конвейер из нескольких функций""" repo = ConvRepo() @repo.mark_injector() async def int_to_a(i: int) -> A: return A(i * 2) @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) @repo.mark_injector() async def b_to_c(b: B) -> C: return C(str(b.b)) def consumer1(dep: C) -> str: return f"_{dep.c}_" def consumer2(dep: str) -> int: return len(dep) pipeline = repo.create_pipeline( (int,), [consumer1, consumer2], force_commutative=False, allow_async=True, force_async=True ) result = await pipeline(5) assert result == 6 # len("_10.0_") == 6 # ============================================================================= # Тесты краевых случаев с кортежами # ============================================================================= def test_deeply_nested_tuple_unwrap(): """Глубоко вложенные кортежи""" repo = ConvRepo(store_sources=True) @repo.mark_injector() def int_to_nested(i: int) -> tuple[A, tuple[B, tuple[C, D]]]: return A(i), (B(float(i)), (C(str(i)), D(i > 0))) def consumer(dep: D) -> bool: return dep.d fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) assert result is True def test_empty_tuple_handling(): """Пустые кортежи и кортежи с одним элементом""" repo = ConvRepo() @repo.mark_injector() def int_to_single(i: int) -> tuple[A]: return (A(i),) @repo.mark_injector() def single_to_b(a_tuple: tuple[A]) -> B: return B(float(a_tuple[0].a)) def consumer(dep: B) -> float: return dep.b # Это должно работать с кортежем из одного элемента fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) assert result == 42.0 # ============================================================================= # Тесты параметров по умолчанию в сложных случаях # ============================================================================= def test_default_args_with_multiple_injectors(): """ Параметры по умолчанию с множеством инжекторов. После фикса: optional-аргумент не маппится на входной тип автоматически. Optional-аргумент получает значение из: 1. Предыдущего преобразования (если тип есть в provided_types) 2. from_types (если у функции есть optional-аргумент с этим типом) 3. Дефолтного значения (иначе) inject_mult() не вызывается, т.к. optional-аргументы с дефолтными значениями не триггерят поиск инжекторов в графе преобразований. Это архитектурное ограничение. """ repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int, mult: int = 1) -> A: return A(i * mult) @repo.mark_injector() def inject_mult() -> int: return 10 @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) def consumer(dep: B) -> float: return dep.b # int_to_a имеет optional-аргумент mult: int, и int есть в from_types # Поэтому mult маппится на входной int (5) # inject_mult() не вызывается (архитектурное ограничение) fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(5) # mult=5 (входное значение), поэтому 5 * 5 = 25 assert result == 25.0 # ============================================================================= # Тесты fork репозитория # ============================================================================= def test_repo_fork_with_additional_injectors(): """Fork репозитория с дополнительными инжекторами""" base_repo = ConvRepo() @base_repo.mark_injector() def int_to_a(i: int) -> A: return A(i) forked_repo = base_repo.fork() @forked_repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) def consumer(dep: B) -> float: return dep.b # Fork должен видеть инжекторы из базового репозитория fn = forked_repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) assert result == 42.0 # ============================================================================= # Тесты store_callseq и store_sources # ============================================================================= def test_store_callseq(): """Сохранение последовательности вызовов""" repo = ConvRepo(store_callseq=True) @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) def consumer(dep: B) -> float: return dep.b fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) callseq = getattr(fn, '__breakshaft_callseq__', None) assert callseq is not None assert len(callseq) >= 1 def test_store_sources(): """Сохранение исходного кода сгенерированной функции""" repo = ConvRepo(store_sources=True) @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) def consumer(dep: A) -> int: return dep.a fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) source = getattr(fn, '__breakshaft_render_src__', None) assert source is not None assert 'convertor' in source assert 'int' in source # ============================================================================= # Тесты производительности (не должны быть слишком медленными) # ============================================================================= def test_performance_many_injectors(): """Много инжекторов - проверка что не слишком медленно""" repo = ConvRepo() # Создаём 20 инжекторов (50 вызывает комбинаторный взрыв) for i in range(20): def make_injector(n): def injector(a: A) -> A: return A(a.a + n) return injector repo.add_injector(make_injector(i)) def consumer(dep: A) -> int: return dep.a # Это должно завершиться за разумное время import time start = time.time() fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False) elapsed = time.time() - start # Не больше 10 секунд на генерацию (комбинаторная сложность) assert elapsed < 10.0 # ============================================================================= # Тесты ошибок и исключительных ситуаций # ============================================================================= def test_no_path_raises_error(): """Отсутствие пути преобразования должно вызывать ошибку""" from breakshaft.exceptions import NoConversionPath repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) def consumer(dep: B) -> float: # B нельзя получить из int return dep.b with pytest.raises(NoConversionPath) as exc_info: repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) assert exc_info.value.code == "GRAPH_001" def test_empty_from_types(): """Пустой список типов для преобразования""" from breakshaft.exceptions import NoConversionPath repo = ConvRepo() def consumer() -> str: return "hello" # Пустой consumer без зависимостей должен работать fn = repo.get_conversion((), consumer, force_commutative=False, allow_async=False) result = fn() assert result == "hello" # ============================================================================= # Тесты GraphWalker напрямую # ============================================================================= def test_graph_walker_direct_usage(): """Прямое использование GraphWalker""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) walker = GraphWalker() def consumer(dep: B) -> float: return dep.b cg = walker.generate_callgraph( repo.convertor_set, frozenset({int}), consumer ) assert cg is not None assert len(cg.variants) > 0 def test_explode_callgraph_with_empty_subgraphs(): """Взрыв графа с пустыми подграфами""" repo = ConvRepo() @repo.mark_injector() def int_to_a(i: int) -> A: return A(i) walker = GraphWalker() def consumer(dep: A) -> int: return dep.a cg = walker.generate_callgraph( repo.convertor_set, frozenset({int}), consumer ) exploded = walker.explode_callgraph_branches(cg, frozenset({int})) assert len(exploded) > 0 # ============================================================================= # Тесты для проверки deduplicate_callseq # ============================================================================= def test_deduplicate_callseq_with_duplicates(): """Проверка дедупликации последовательности вызовов""" repo = ConvRepo(store_sources=True) call_count = [0] @repo.mark_injector() def int_to_a(i: int) -> A: call_count[0] += 1 return A(i) @repo.mark_injector() def a_to_b(a: A) -> B: return B(float(a.a)) def consumer(dep: B) -> float: return dep.b fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False) result = fn(42) # Инжектор должен быть вызван один раз assert call_count[0] == 1 assert result == 42.0