Add basic pipeline construction, callseq deduplication pending

This commit is contained in:
2025-07-19 22:32:40 +03:00
parent eae2cd9a4b
commit b058a701a0
5 changed files with 139 additions and 23 deletions

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
from typing import Optional, Callable, Unpack, TypeVarTuple, TypeVar, Awaitable, Any
from typing import Optional, Callable, Unpack, TypeVarTuple, TypeVar, Awaitable, Any, Sequence
from .graph_walker import GraphWalker
from .models import ConversionPoint, Callgraph
from .renderer import ConvertorRenderer, InTimeGenerationConvertorRenderer
from .util import extract_return_type
Tin = TypeVarTuple('Tin')
Tout = TypeVar('Tout')
@@ -27,6 +28,31 @@ class ConvRepo:
self.walker = graph_walker
self.renderer = renderer
def create_pipeline(self,
from_types: Sequence[type],
fns: Sequence[Callable],
force_commutative: bool = True,
allow_async: bool = True,
allow_sync: bool = True,
force_async: bool = False
):
filtered_injectors = self.filtered_injectors(allow_async, allow_sync)
pipeline_callseq = []
orig_from_types = tuple(from_types)
from_types = tuple(from_types)
for fn in fns:
injects = extract_return_type(fn)
callseq = self.get_callseq(filtered_injectors, frozenset(from_types), fn, force_commutative)
pipeline_callseq += callseq
if injects is not None:
from_types += (injects,)
return self.renderer.render(orig_from_types, pipeline_callseq, force_async=force_async)
@property
def convertor_set(self):
return self._convertor_set
@@ -46,30 +72,26 @@ class ConvRepo:
ret += [variant.injector]
return ret
def get_conversion(self,
from_types: tuple[type[Unpack[Tin]]],
fn: Callable[..., Tout],
force_commutative: bool = True,
allow_async: bool = True,
allow_sync: bool = True,
force_async: bool = False
) -> Callable[[Unpack[Tin]], Tout] | Awaitable[Callable[[Unpack[Tin]], Tout]]:
if not allow_async or force_async:
filtered_injectors: frozenset[ConversionPoint] = frozenset()
for inj in self.convertor_set:
if inj.is_async and not allow_async:
continue
if not inj.is_async and not allow_sync:
continue
filtered_injectors |= {inj}
else:
filtered_injectors = frozenset(self.convertor_set)
def filtered_injectors(self, allow_async: bool, allow_sync: bool) -> frozenset[ConversionPoint]:
filtered_injectors: frozenset[ConversionPoint] = frozenset()
for inj in self.convertor_set:
if inj.is_async and not allow_async:
continue
if not inj.is_async and not allow_sync:
continue
filtered_injectors |= {inj}
return filtered_injectors
cg = self.walker.generate_callgraph(filtered_injectors, frozenset(from_types), fn)
def get_callseq(self,
injectors: frozenset[ConversionPoint],
from_types: frozenset[type], fn: Callable,
force_commutative: bool) -> list[ConversionPoint]:
cg = self.walker.generate_callgraph(injectors, from_types, fn)
if cg is None:
raise ValueError(f'Unable to compute conversion graph on {from_types}->{fn.__qualname__}')
exploded = self.walker.explode_callgraph_branches(cg, frozenset(from_types))
exploded = self.walker.explode_callgraph_branches(cg, from_types)
selected = self.walker.filter_exploded_callgraph_branch(exploded)
if len(selected) == 0:
@@ -79,6 +101,25 @@ class ConvRepo:
raise ValueError('Conversion path is not commutative')
callseq = self._callseq_from_callgraph(Callgraph(frozenset([selected[0]])))
if len(callseq) > 0:
injects = extract_return_type(fn)
callseq[-1] = callseq[-1].copy_with(injects=injects)
return callseq
def get_conversion(self,
from_types: Sequence[type[Unpack[Tin]]],
fn: Callable[..., Tout],
force_commutative: bool = True,
allow_async: bool = True,
allow_sync: bool = True,
force_async: bool = False
) -> Callable[[Unpack[Tin]], Tout] | Awaitable[Callable[[Unpack[Tin]], Tout]]:
filtered_injectors = self.filtered_injectors(allow_async, allow_sync)
callseq = self.get_callseq(filtered_injectors, frozenset(from_types), fn, force_commutative)
return self.renderer.render(from_types, callseq, force_async=force_async)
def mark_injector(self, *, rettype: Optional[type] = None):

View File

@@ -3,7 +3,7 @@ from types import NoneType
from typing import Callable, Optional
from .models import ConversionPoint, Callgraph, CallgraphVariant, TransformationPoint, CompositionDirection
from .util import extract_func_argtypes, all_combinations, extract_func_argtypes_seq
from .util import extract_func_argtypes, all_combinations, extract_func_argtypes_seq, extract_return_type
class GraphWalker:
@@ -15,6 +15,7 @@ class GraphWalker:
consumer_fn: Callable) -> Optional[Callgraph]:
branches: frozenset[Callgraph] = frozenset()
rettype = extract_return_type(consumer_fn)
# Хак, чтобы вынудить систему поставить первым преобразованием требуемый consumer
# Новый TypeAliasType каждый раз будет иметь эксклюзивный хэш, вне зависимости от содержимого

View File

@@ -20,6 +20,13 @@ class ConversionPoint:
requires: tuple[type, ...]
opt_args: tuple[type, ...]
def copy_with(self, **kwargs):
fn = kwargs.get('fn', self.fn)
injects = kwargs.get('injects', self.injects)
requires = kwargs.get('requires', self.requires)
opt_args = kwargs.get('opt_args', self.opt_args)
return ConversionPoint(fn, injects, requires, opt_args)
def __hash__(self):
return hash((self.fn, self.injects, self.requires))

View File

@@ -1,6 +1,11 @@
import inspect
from itertools import product
from typing import Callable, get_type_hints, TypeVar, Any
from typing import Callable, get_type_hints, TypeVar, Any, Optional
def extract_return_type(func: Callable) -> Optional[type]:
hints = get_type_hints(func)
return hints.get('return')
def extract_func_args(func: Callable) -> list[tuple[str, type]]: