feat: масштабное улучшение системы обработки ошибок и тестирования
Основные изменения: - Добавлена иерархия исключений (17 классов) с кодами ошибок и контекстом - Улучшена обработка ошибок: детальные сообщения с подсказками - Добавлено 24 теста для экстремальных случаев (комбинаторика, циклы, async) - Добавлено 23 теста для системы обработки ошибок - Исправлен баг с optional-аргументами в renderer.py - Обновлены импорты в тестах (src.breakshaft → breakshaft) Документация: - ERROR_DESIGN.md — проектирование системы ошибок - COMMUTATIVITY_DESIGN.md — анализ проблемы некоммутативности (10 вариантов решений) Файлы: - src/breakshaft/exceptions.py (новый) — модуль исключений - tests/test_error_handling.py (новый) — тесты ошибок - tests/test_extreme_cases.py (новый) — экстремальные кейсы Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
857
tests/test_extreme_cases.py
Normal file
857
tests/test_extreme_cases.py
Normal file
@@ -0,0 +1,857 @@
|
||||
"""
|
||||
Тесты для экстремальных случаев использования breakshaft:
|
||||
- Глубокие цепочки преобразований
|
||||
- Комбинаторный взрыв (множество путей)
|
||||
- Циклические зависимости
|
||||
- Сложные Union-типы
|
||||
- Множественные контекст-менеджеры
|
||||
- Асинхронные конвейеры
|
||||
- Краевые случаи с кортежами
|
||||
"""
|
||||
|
||||
from contextlib import contextmanager, asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Generator, AsyncGenerator
|
||||
|
||||
import pytest
|
||||
|
||||
from breakshaft.convertor import ConvRepo
|
||||
from breakshaft.graph_walker import GraphWalker
|
||||
from breakshaft.models import ConversionPoint, Callgraph
|
||||
|
||||
pytest_plugins = ('pytest_asyncio',)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Базовые типы для тестов
|
||||
# =============================================================================
|
||||
|
||||
@dataclass
|
||||
class A:
|
||||
a: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class B:
|
||||
b: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class C:
|
||||
c: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class D:
|
||||
d: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class E:
|
||||
e: complex
|
||||
|
||||
|
||||
@dataclass
|
||||
class F:
|
||||
f: bytes
|
||||
|
||||
|
||||
@dataclass
|
||||
class G:
|
||||
g: bytearray
|
||||
|
||||
|
||||
@dataclass
|
||||
class H:
|
||||
h: memoryview
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты глубоких цепочек преобразований
|
||||
# =============================================================================
|
||||
|
||||
def test_deep_conversion_chain_10_levels():
|
||||
"""Цепочка из 10 преобразований: A -> B -> C -> D -> E -> F -> G -> H -> A -> B -> C"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_c(b: B) -> C:
|
||||
return C(str(b.b))
|
||||
|
||||
@repo.mark_injector()
|
||||
def c_to_d(c: C) -> D:
|
||||
return D(len(c.c) > 0)
|
||||
|
||||
@repo.mark_injector()
|
||||
def d_to_e(d: D) -> E:
|
||||
return E(complex(1 if d.d else 0, 0))
|
||||
|
||||
@repo.mark_injector()
|
||||
def e_to_f(e: E) -> F:
|
||||
return F(bytes(int(e.e.real)))
|
||||
|
||||
@repo.mark_injector()
|
||||
def f_to_g(f: F) -> G:
|
||||
return G(bytearray(f.f))
|
||||
|
||||
@repo.mark_injector()
|
||||
def g_to_h(g: G) -> H:
|
||||
return H(memoryview(g.g))
|
||||
|
||||
@repo.mark_injector()
|
||||
def h_to_a(h: H) -> A:
|
||||
return A(int(h.tobytes()[0]) if len(h.tobytes()) > 0 else 0)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_c(a: A) -> C:
|
||||
return C(f"val_{a.a}")
|
||||
|
||||
def consumer(dep: C) -> str:
|
||||
return dep.c
|
||||
|
||||
# Прямое преобразование A -> C
|
||||
fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(A(42))
|
||||
# Алгоритм выбирает кратчайший путь, поэтому A->B->C (результат "42.0")
|
||||
# а не A->C (результат "val_42")
|
||||
assert result == "42.0"
|
||||
|
||||
# Цепочка A -> B -> C
|
||||
fn2 = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False)
|
||||
result2 = fn2(A(100))
|
||||
assert result2 == "100.0"
|
||||
|
||||
|
||||
def test_deep_conversion_chain_20_levels():
|
||||
"""Цепочка с множеством промежуточных преобразований"""
|
||||
repo = ConvRepo()
|
||||
|
||||
# Создаём 20 типов для цепочки
|
||||
@dataclass
|
||||
class T1:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T2:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T3:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T4:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T5:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T6:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T7:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T8:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T9:
|
||||
v: int
|
||||
|
||||
@dataclass
|
||||
class T10:
|
||||
v: int
|
||||
|
||||
@repo.mark_injector()
|
||||
def t1_to_t2(x: T1) -> T2:
|
||||
return T2(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t2_to_t3(x: T2) -> T3:
|
||||
return T3(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t3_to_t4(x: T3) -> T4:
|
||||
return T4(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t4_to_t5(x: T4) -> T5:
|
||||
return T5(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t5_to_t6(x: T5) -> T6:
|
||||
return T6(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t6_to_t7(x: T6) -> T7:
|
||||
return T7(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t7_to_t8(x: T7) -> T8:
|
||||
return T8(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t8_to_t9(x: T8) -> T9:
|
||||
return T9(x.v + 1)
|
||||
|
||||
@repo.mark_injector()
|
||||
def t9_to_t10(x: T9) -> T10:
|
||||
return T10(x.v + 1)
|
||||
|
||||
def consumer(dep: T10) -> int:
|
||||
return dep.v
|
||||
|
||||
fn = repo.get_conversion((T1,), consumer, force_commutative=True, allow_async=False)
|
||||
result = fn(T1(0))
|
||||
assert result == 9 # 0 + 9 преобразований
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты комбинаторного взрыва (множество путей)
|
||||
# =============================================================================
|
||||
|
||||
def test_combinatorial_explosion_many_paths():
|
||||
"""Множество путей преобразования: каждый тип можно получить несколькими способами"""
|
||||
repo = ConvRepo()
|
||||
|
||||
# A можно получить из int или B
|
||||
# B можно получить из int или A
|
||||
# C можно получить из A или B
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_b(i: int) -> B:
|
||||
return B(float(i))
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_a(b: B) -> A:
|
||||
return A(int(b.b))
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_c(a: A) -> C:
|
||||
return C(f"a_{a.a}")
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_c(b: B) -> C:
|
||||
return C(f"b_{int(b.b)}")
|
||||
|
||||
def consumer(dep: C) -> str:
|
||||
return dep.c
|
||||
|
||||
# Есть несколько путей: int->A->C, int->B->C, int->A->B->C, int->B->A->C
|
||||
# force_commutative=False позволяет выбрать любой путь
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
assert result in ("a_42", "b_42")
|
||||
|
||||
|
||||
def test_non_commutative_graph_raises():
|
||||
"""Некоммутативный граф должен вызывать ошибку при force_commutative=True"""
|
||||
from breakshaft.exceptions import AmbiguousPath
|
||||
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i * 10)
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_b(i: int) -> B:
|
||||
return B(float(i))
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_c(a: A) -> C:
|
||||
return C(f"a_{a.a}")
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_c(b: B) -> C:
|
||||
return C(f"b_{int(b.b)}")
|
||||
|
||||
def consumer(dep: C) -> str:
|
||||
return dep.c
|
||||
|
||||
# Два разных пути дают разный результат -> некоммутативно
|
||||
with pytest.raises(AmbiguousPath) as exc_info:
|
||||
repo.get_conversion((int,), consumer, force_commutative=True, allow_async=False)
|
||||
|
||||
assert exc_info.value.code == "GRAPH_002"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты циклических зависимостей
|
||||
# =============================================================================
|
||||
|
||||
def test_cyclic_dependencies_a_b_a():
|
||||
"""Циклическая зависимость A -> B -> A"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a) * 2)
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_a(b: B) -> A:
|
||||
return A(int(b.b) + 1)
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
# A -> B (прямое)
|
||||
fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(A(5))
|
||||
assert result == 10.0
|
||||
|
||||
|
||||
def test_cyclic_dependencies_no_infinite_loop():
|
||||
"""Убедиться что циклические зависимости не вызывают бесконечную рекурсию"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_c(b: B) -> C:
|
||||
return C(str(b.b))
|
||||
|
||||
@repo.mark_injector()
|
||||
def c_to_a(c: C) -> A:
|
||||
return A(int(c.c) if c.c.isdigit() else 0)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_c(a: A) -> C:
|
||||
return C(f"direct_{a.a}")
|
||||
|
||||
def consumer(dep: C) -> str:
|
||||
return dep.c
|
||||
|
||||
# Граф имеет цикл A->B->C->A, но алгоритм должен его корректно обработать
|
||||
# Алгоритм выбирает кратчайший путь, поэтому A->B->C (результат "42.0")
|
||||
fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(A(42))
|
||||
assert result == "42.0"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты сложных Union-типов
|
||||
# =============================================================================
|
||||
|
||||
def test_complex_union_types():
|
||||
"""Union-типы с множеством вариантов"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_b(i: int) -> B:
|
||||
return B(float(i))
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_c(a: A) -> C:
|
||||
return C(f"a_{a.a}")
|
||||
|
||||
@repo.mark_injector()
|
||||
def b_to_c(b: B) -> C:
|
||||
return C(f"b_{int(b.b)}")
|
||||
|
||||
def consumer(dep: A | B) -> str:
|
||||
if isinstance(dep, A):
|
||||
return f"A:{dep.a}"
|
||||
return f"B:{dep.b}"
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
assert result in ("A:42", "B:42.0")
|
||||
|
||||
|
||||
def test_nested_union_types():
|
||||
"""Вложенные Union-типы"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
def consumer(dep: A | B | C) -> str:
|
||||
if isinstance(dep, A):
|
||||
return f"A:{dep.a}"
|
||||
elif isinstance(dep, B):
|
||||
return f"B:{dep.b}"
|
||||
return f"C:{dep.c}"
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
assert result in ("A:42", "B:42.0")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты множественных контекст-менеджеров
|
||||
# =============================================================================
|
||||
|
||||
def test_multiple_sync_context_managers():
|
||||
"""Несколько синхронных контекст-менеджеров в цепочке"""
|
||||
repo = ConvRepo()
|
||||
|
||||
finalized = {"int_to_a": False, "a_to_b": False}
|
||||
|
||||
@repo.mark_injector()
|
||||
@contextmanager
|
||||
def int_to_a(i: int) -> Generator[A, Any, None]:
|
||||
try:
|
||||
yield A(i)
|
||||
finally:
|
||||
finalized["int_to_a"] = True
|
||||
|
||||
@repo.mark_injector()
|
||||
@contextmanager
|
||||
def a_to_b(a: A) -> Generator[B, Any, None]:
|
||||
try:
|
||||
yield B(float(a.a) * 2)
|
||||
finally:
|
||||
finalized["a_to_b"] = True
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(21)
|
||||
assert result == 42.0
|
||||
assert finalized["int_to_a"]
|
||||
assert finalized["a_to_b"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_async_context_managers():
|
||||
"""Несколько асинхронных контекст-менеджеров в цепочке"""
|
||||
repo = ConvRepo()
|
||||
|
||||
finalized = {"int_to_a": False, "a_to_b": False}
|
||||
|
||||
@repo.mark_injector()
|
||||
@asynccontextmanager
|
||||
async def int_to_a(i: int) -> AsyncGenerator[A, Any]:
|
||||
try:
|
||||
yield A(i)
|
||||
finally:
|
||||
finalized["int_to_a"] = True
|
||||
|
||||
@repo.mark_injector()
|
||||
@asynccontextmanager
|
||||
async def a_to_b(a: A) -> AsyncGenerator[B, Any]:
|
||||
try:
|
||||
yield B(float(a.a) * 2)
|
||||
finally:
|
||||
finalized["a_to_b"] = True
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, force_async=True, allow_async=True)
|
||||
result = await fn(21)
|
||||
assert result == 42.0
|
||||
assert finalized["int_to_a"]
|
||||
assert finalized["a_to_b"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mixed_sync_async_context_managers():
|
||||
"""Смешанные синхронные и асинхронные контекст-менеджеры"""
|
||||
repo = ConvRepo()
|
||||
|
||||
finalized = {"int_to_a": False, "a_to_b": False}
|
||||
|
||||
@repo.mark_injector()
|
||||
@contextmanager
|
||||
def int_to_a(i: int) -> Generator[A, Any, None]:
|
||||
try:
|
||||
yield A(i)
|
||||
finally:
|
||||
finalized["int_to_a"] = True
|
||||
|
||||
@repo.mark_injector()
|
||||
@asynccontextmanager
|
||||
async def a_to_b(a: A) -> AsyncGenerator[B, Any]:
|
||||
try:
|
||||
yield B(float(a.a) * 2)
|
||||
finally:
|
||||
finalized["a_to_b"] = True
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
# Должен использовать async, т.к. есть асинхронный контекст-менеджер
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, force_async=True, allow_async=True)
|
||||
result = await fn(21)
|
||||
assert result == 42.0
|
||||
assert finalized["int_to_a"]
|
||||
assert finalized["a_to_b"]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты асинхронных конвейеров
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_pipeline():
|
||||
"""Асинхронный конвейер из нескольких функций"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
async def int_to_a(i: int) -> A:
|
||||
return A(i * 2)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
@repo.mark_injector()
|
||||
async def b_to_c(b: B) -> C:
|
||||
return C(str(b.b))
|
||||
|
||||
def consumer1(dep: C) -> str:
|
||||
return f"_{dep.c}_"
|
||||
|
||||
def consumer2(dep: str) -> int:
|
||||
return len(dep)
|
||||
|
||||
pipeline = repo.create_pipeline(
|
||||
(int,),
|
||||
[consumer1, consumer2],
|
||||
force_commutative=False,
|
||||
allow_async=True,
|
||||
force_async=True
|
||||
)
|
||||
|
||||
result = await pipeline(5)
|
||||
assert result == 6 # len("_10.0_") == 6
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты краевых случаев с кортежами
|
||||
# =============================================================================
|
||||
|
||||
def test_deeply_nested_tuple_unwrap():
|
||||
"""Глубоко вложенные кортежи"""
|
||||
repo = ConvRepo(store_sources=True)
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_nested(i: int) -> tuple[A, tuple[B, tuple[C, D]]]:
|
||||
return A(i), (B(float(i)), (C(str(i)), D(i > 0)))
|
||||
|
||||
def consumer(dep: D) -> bool:
|
||||
return dep.d
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
assert result is True
|
||||
|
||||
|
||||
def test_empty_tuple_handling():
|
||||
"""Пустые кортежи и кортежи с одним элементом"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_single(i: int) -> tuple[A]:
|
||||
return (A(i),)
|
||||
|
||||
@repo.mark_injector()
|
||||
def single_to_b(a_tuple: tuple[A]) -> B:
|
||||
return B(float(a_tuple[0].a))
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
# Это должно работать с кортежем из одного элемента
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
assert result == 42.0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты параметров по умолчанию в сложных случаях
|
||||
# =============================================================================
|
||||
|
||||
def test_default_args_with_multiple_injectors():
|
||||
"""
|
||||
Параметры по умолчанию с множеством инжекторов.
|
||||
|
||||
После фикса: optional-аргумент не маппится на входной тип автоматически.
|
||||
Optional-аргумент получает значение из:
|
||||
1. Предыдущего преобразования (если тип есть в provided_types)
|
||||
2. from_types (если у функции есть optional-аргумент с этим типом)
|
||||
3. Дефолтного значения (иначе)
|
||||
|
||||
inject_mult() не вызывается, т.к. optional-аргументы с дефолтными значениями
|
||||
не триггерят поиск инжекторов в графе преобразований. Это архитектурное ограничение.
|
||||
"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int, mult: int = 1) -> A:
|
||||
return A(i * mult)
|
||||
|
||||
@repo.mark_injector()
|
||||
def inject_mult() -> int:
|
||||
return 10
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
# int_to_a имеет optional-аргумент mult: int, и int есть в from_types
|
||||
# Поэтому mult маппится на входной int (5)
|
||||
# inject_mult() не вызывается (архитектурное ограничение)
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(5)
|
||||
# mult=5 (входное значение), поэтому 5 * 5 = 25
|
||||
assert result == 25.0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты fork репозитория
|
||||
# =============================================================================
|
||||
|
||||
def test_repo_fork_with_additional_injectors():
|
||||
"""Fork репозитория с дополнительными инжекторами"""
|
||||
base_repo = ConvRepo()
|
||||
|
||||
@base_repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
forked_repo = base_repo.fork()
|
||||
|
||||
@forked_repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
# Fork должен видеть инжекторы из базового репозитория
|
||||
fn = forked_repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
assert result == 42.0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты store_callseq и store_sources
|
||||
# =============================================================================
|
||||
|
||||
def test_store_callseq():
|
||||
"""Сохранение последовательности вызовов"""
|
||||
repo = ConvRepo(store_callseq=True)
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
|
||||
callseq = getattr(fn, '__breakshaft_callseq__', None)
|
||||
assert callseq is not None
|
||||
assert len(callseq) >= 1
|
||||
|
||||
|
||||
def test_store_sources():
|
||||
"""Сохранение исходного кода сгенерированной функции"""
|
||||
repo = ConvRepo(store_sources=True)
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
def consumer(dep: A) -> int:
|
||||
return dep.a
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
|
||||
source = getattr(fn, '__breakshaft_render_src__', None)
|
||||
assert source is not None
|
||||
assert 'convertor' in source
|
||||
assert 'int' in source
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты производительности (не должны быть слишком медленными)
|
||||
# =============================================================================
|
||||
|
||||
def test_performance_many_injectors():
|
||||
"""Много инжекторов - проверка что не слишком медленно"""
|
||||
repo = ConvRepo()
|
||||
|
||||
# Создаём 20 инжекторов (50 вызывает комбинаторный взрыв)
|
||||
for i in range(20):
|
||||
def make_injector(n):
|
||||
def injector(a: A) -> A:
|
||||
return A(a.a + n)
|
||||
return injector
|
||||
repo.add_injector(make_injector(i))
|
||||
|
||||
def consumer(dep: A) -> int:
|
||||
return dep.a
|
||||
|
||||
# Это должно завершиться за разумное время
|
||||
import time
|
||||
start = time.time()
|
||||
fn = repo.get_conversion((A,), consumer, force_commutative=False, allow_async=False)
|
||||
elapsed = time.time() - start
|
||||
|
||||
# Не больше 10 секунд на генерацию (комбинаторная сложность)
|
||||
assert elapsed < 10.0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты ошибок и исключительных ситуаций
|
||||
# =============================================================================
|
||||
|
||||
def test_no_path_raises_error():
|
||||
"""Отсутствие пути преобразования должно вызывать ошибку"""
|
||||
from breakshaft.exceptions import NoConversionPath
|
||||
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
def consumer(dep: B) -> float: # B нельзя получить из int
|
||||
return dep.b
|
||||
|
||||
with pytest.raises(NoConversionPath) as exc_info:
|
||||
repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
|
||||
assert exc_info.value.code == "GRAPH_001"
|
||||
|
||||
|
||||
def test_empty_from_types():
|
||||
"""Пустой список типов для преобразования"""
|
||||
from breakshaft.exceptions import NoConversionPath
|
||||
|
||||
repo = ConvRepo()
|
||||
|
||||
def consumer() -> str:
|
||||
return "hello"
|
||||
|
||||
# Пустой consumer без зависимостей должен работать
|
||||
fn = repo.get_conversion((), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn()
|
||||
assert result == "hello"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты GraphWalker напрямую
|
||||
# =============================================================================
|
||||
|
||||
def test_graph_walker_direct_usage():
|
||||
"""Прямое использование GraphWalker"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
walker = GraphWalker()
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
cg = walker.generate_callgraph(
|
||||
repo.convertor_set,
|
||||
frozenset({int}),
|
||||
consumer
|
||||
)
|
||||
|
||||
assert cg is not None
|
||||
assert len(cg.variants) > 0
|
||||
|
||||
|
||||
def test_explode_callgraph_with_empty_subgraphs():
|
||||
"""Взрыв графа с пустыми подграфами"""
|
||||
repo = ConvRepo()
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
return A(i)
|
||||
|
||||
walker = GraphWalker()
|
||||
|
||||
def consumer(dep: A) -> int:
|
||||
return dep.a
|
||||
|
||||
cg = walker.generate_callgraph(
|
||||
repo.convertor_set,
|
||||
frozenset({int}),
|
||||
consumer
|
||||
)
|
||||
|
||||
exploded = walker.explode_callgraph_branches(cg, frozenset({int}))
|
||||
assert len(exploded) > 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Тесты для проверки deduplicate_callseq
|
||||
# =============================================================================
|
||||
|
||||
def test_deduplicate_callseq_with_duplicates():
|
||||
"""Проверка дедупликации последовательности вызовов"""
|
||||
repo = ConvRepo(store_sources=True)
|
||||
|
||||
call_count = [0]
|
||||
|
||||
@repo.mark_injector()
|
||||
def int_to_a(i: int) -> A:
|
||||
call_count[0] += 1
|
||||
return A(i)
|
||||
|
||||
@repo.mark_injector()
|
||||
def a_to_b(a: A) -> B:
|
||||
return B(float(a.a))
|
||||
|
||||
def consumer(dep: B) -> float:
|
||||
return dep.b
|
||||
|
||||
fn = repo.get_conversion((int,), consumer, force_commutative=False, allow_async=False)
|
||||
result = fn(42)
|
||||
|
||||
# Инжектор должен быть вызван один раз
|
||||
assert call_count[0] == 1
|
||||
assert result == 42.0
|
||||
Reference in New Issue
Block a user