Dict inflator and deflator support

This commit is contained in:
2026-02-19 22:39:56 +03:00
parent 8e38c41aa5
commit 9267f744d8
7 changed files with 198 additions and 17 deletions

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "megasniff" name = "megasniff"
version = "0.2.7" version = "0.2.8"
description = "Library for in-time codegened type validation" description = "Library for in-time codegened type validation"
authors = [ authors = [
{ name = "nikto_b", email = "niktob560@yandex.ru" } { name = "nikto_b", email = "niktob560@yandex.ru" }

View File

@@ -283,6 +283,8 @@ class SchemaDeflatorGenerator:
ret_unw = OtherUnwrapping() ret_unw = OtherUnwrapping()
elif isinstance(schema, EnumType): elif isinstance(schema, EnumType):
ret_unw = EnumUnwrapping() ret_unw = EnumUnwrapping()
elif issubclass(schema, uuid.UUID):
ret_unw = FuncUnwrapping('str')
elif is_class_definition(schema): elif is_class_definition(schema):
hints = typing.get_type_hints(schema) hints = typing.get_type_hints(schema)
fields = [] fields = []

View File

@@ -3,13 +3,14 @@
from __future__ import annotations from __future__ import annotations
import collections.abc import collections.abc
import importlib.resources import importlib.resources
import uuid
from collections import defaultdict from collections import defaultdict
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from enum import EnumType from enum import EnumType
from types import NoneType, UnionType from types import NoneType, UnionType
from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \ from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \
OrderedDict OrderedDict, Dict
import jinja2 import jinja2
@@ -22,6 +23,7 @@ class TypeRenderData:
typeref: list[TypeRenderData] | TypeRenderData | str typeref: list[TypeRenderData] | TypeRenderData | str
allow_none: bool allow_none: bool
is_list: bool is_list: bool
is_dict: bool
is_union: bool is_union: bool
is_strict: bool is_strict: bool
@@ -30,6 +32,14 @@ class TypeRenderData:
class IterableTypeRenderData(TypeRenderData): class IterableTypeRenderData(TypeRenderData):
iterable_type: str iterable_type: str
is_list = True is_list = True
is_dict = False
is_union = False
@dataclass
class DictTypeRenderData(TypeRenderData):
is_list = False
is_dict = True
is_union = False is_union = False
@@ -152,21 +162,24 @@ class SchemaInflatorGenerator:
allow_none = False allow_none = False
argtypes = t, argtypes = t,
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])): if any(map(lambda x: type_origin is x,
[Union, UnionType, Optional, Annotated, list, List, set, Set, dict, Dict])):
argtypes = get_args(t) argtypes = get_args(t)
if NoneType in argtypes or None in argtypes: if NoneType in argtypes or None in argtypes:
argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes)) argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes))
allow_none = True allow_none = True
is_union = len(argtypes) > 1 if type_origin in [dict, Dict]:
k = self._unwrap_typeref(argtypes[0], strict_mode)
if is_union: v = self._unwrap_typeref(argtypes[1], strict_mode)
return DictTypeRenderData([k, v], allow_none, False, True, False, False)
elif len(argtypes) > 1:
typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes)) typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes))
return TypeRenderData(typerefs, allow_none, False, True, False) return TypeRenderData(typerefs, allow_none, False, False, True, False)
elif type_origin in [list, set]: elif type_origin in [list, set, List, Set]:
rd = self._unwrap_typeref(argtypes[0], strict_mode) rd = self._unwrap_typeref(argtypes[0], strict_mode)
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__) return IterableTypeRenderData(rd, allow_none, True, False, False, False, type_origin.__name__)
else: else:
t = argtypes[0] t = argtypes[0]
@@ -179,6 +192,7 @@ class SchemaInflatorGenerator:
allow_none, allow_none,
is_list, is_list,
False, False,
False,
strict_mode if is_builtin else False) strict_mode if is_builtin else False)
def _schema_to_inflator(self, def _schema_to_inflator(self,
@@ -208,7 +222,8 @@ class SchemaInflatorGenerator:
schema = new_schema schema = new_schema
if isinstance(schema, EnumType): if isinstance(schema, EnumType):
namespace[f'inflate_{schema.__name__}'] = schema if not is_builtin_type(schema):
namespace[f'inflate_{schema.__name__}'] = schema
return '\n', namespace return '\n', namespace
if isinstance(schema, collections.abc.Iterable): if isinstance(schema, collections.abc.Iterable):
@@ -239,8 +254,9 @@ class SchemaInflatorGenerator:
return '', namespace return '', namespace
if mode == 'object': if mode == 'object':
namespace[f'{_funcname}_tgt_type'] = schema if not is_builtin_type(schema):
namespace[utils.typename(schema)] = schema namespace[f'{_funcname}_tgt_type'] = schema
namespace[utils.typename(schema)] = schema
if from_type_override is not None: if from_type_override is not None:
namespace['_from_type'] = from_type_override namespace['_from_type'] = from_type_override
@@ -258,8 +274,8 @@ class SchemaInflatorGenerator:
while get_origin(argtype) is not None: while get_origin(argtype) is not None:
type_origin = get_origin(argtype) type_origin = get_origin(argtype)
if any(map(lambda x: type_origin is x,
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])): [Union, UnionType, Optional, Annotated, list, List, set, Set, dict, Dict])):
argtypes = get_args(argtype) argtypes = get_args(argtype)
if len(argtypes) == 1: if len(argtypes) == 1:
argtype = argtypes[0] argtype = argtypes[0]
@@ -297,12 +313,14 @@ class SchemaInflatorGenerator:
elif argt is schema: elif argt is schema:
pass pass
else: else:
namespace[argt.__name__] = argt if not is_builtin_type(argt):
namespace[argt.__name__] = argt
convertor_functext = template.render( convertor_functext = template.render(
funcname=_funcname, funcname=_funcname,
conversions=render_data, conversions=render_data,
tgt_type=utils.typename(schema), tgt_type=utils.typename(schema),
is_opaque=len(type_hints) == 0,
from_type='_from_type' if from_type_override is not None else None from_type='_from_type' if from_type_override is not None else None
) )

View File

@@ -6,6 +6,7 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
""" """
{{tgt_type}} {{tgt_type}}
""" """
{% if not is_opaque %}
from_data_keys = from_data.keys() from_data_keys = from_data.keys()
{% for conv in conversions %} {% for conv in conversions %}
@@ -29,4 +30,9 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
{% endfor %} {% endfor %}
return {{funcname}}_tgt_type({% for conv in conversions %}{{conv.argname_escaped}}={{conv.argname_escaped}}, {% endfor %}) return {{funcname}}_tgt_type({% for conv in conversions %}{{conv.argname_escaped}}={{conv.argname_escaped}}, {% endfor %})
{% else %}
return {{funcname}}_tgt_type(from_data)
{% endif %}

View File

@@ -3,8 +3,8 @@
{{argname}} = [] {{argname}} = []
if not isinstance({{conv_data}}, list): if not isinstance({{conv_data}}, list):
raise FieldValidationException('{{argname}}', "list", conv_data, []) raise FieldValidationException('{{argname}}', "list", conv_data, [])
for item in {{conv_data}}: for {{"_" + argname + "_item"}} in {{conv_data}}:
{{ render_segment("_" + argname, typedef, "item", false ) | indent(4) }} {{ render_segment("_" + argname, typedef, "_" + argname + "_item", false ) | indent(4) }}
{{argname}}.append(_{{argname}}) {{argname}}.append(_{{argname}})
{%- endset %} {%- endset %}
{{out}} {{out}}
@@ -39,12 +39,21 @@ if not isinstance({{conv_data}}, {{typeref}}):
{{argname}} = {{typeref}}({{conv_data}}) {{argname}} = {{typeref}}({{conv_data}})
{% elif typeref.is_union %} {% elif typeref.is_union %}
# union typeref
{{render_union(argname, typeref, conv_data)}} {{render_union(argname, typeref, conv_data)}}
{% elif typeref.is_list %} {% elif typeref.is_list %}
# list typeref
{{render_iterable(argname, typeref.typeref, conv_data)}} {{render_iterable(argname, typeref.typeref, conv_data)}}
{{argname}} = {{typeref.iterable_type}}({{argname}}) {{argname}} = {{typeref.iterable_type}}({{argname}})
{% elif typeref.is_dict %}
# dict typeref
{{render_iterable(argname + "_k", typeref.typeref[0], "list(" + conv_data + ".keys())")}}
{{render_iterable(argname + "_v", typeref.typeref[1], "list(" + conv_data + ".values())")}}
{{argname}} = dict(zip({{argname}}_k,{{argname}}_v))
{% else %} {% else %}
{{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}} {{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}}

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
import uuid
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Optional, get_type_hints from typing import Optional, get_type_hints
@@ -66,6 +67,52 @@ def test_optional():
assert c.a is None assert c.a is None
def test_uuid():
@dataclass
class K:
a: uuid.UUID
b: list[uuid.UUID]
c: Optional[uuid.UUID]
d: dict[uuid.UUID, uuid.UUID]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(K)
defl_fn = defl.schema_to_deflator(K)
okd = {
'a': str(uuid.uuid4()),
'b': [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())],
'c': None,
'd': {str(uuid.uuid4()): str(uuid.uuid4()), str(uuid.uuid4()): str(uuid.uuid4())}
}
k = infl_fn(okd)
kd = defl_fn(k)
assert isinstance(kd['a'], str)
assert isinstance(kd['b'], list)
assert len(kd['b']) == 3
assert isinstance(kd['b'][0], str)
assert isinstance(kd['b'][1], str)
assert isinstance(kd['b'][2], str)
assert kd['c'] is None
assert isinstance(kd['d'], dict)
assert len(kd['d']) == 2
assert all(map(lambda x: isinstance(x, str), kd['d'].keys()))
assert all(map(lambda x: isinstance(x, str), kd['d'].values()))
assert isinstance(k.a, uuid.UUID)
assert isinstance(k.b[0], uuid.UUID)
assert isinstance(k.b[1], uuid.UUID)
assert isinstance(k.b[2], uuid.UUID)
assert k.c is None
assert isinstance(k.d, dict)
assert len(k.d) == 2
assert all(map(lambda x: isinstance(x, uuid.UUID), k.d.keys()))
assert all(map(lambda x: isinstance(x, uuid.UUID), k.d.values()))
class AEnum(Enum): class AEnum(Enum):
a = 'a' a = 'a'
b = 'b' b = 'b'

99
tests/test_dicts.py Normal file
View File

@@ -0,0 +1,99 @@
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional, get_type_hints, Dict
from megasniff import SchemaDeflatorGenerator
from src.megasniff import SchemaInflatorGenerator
def test_dicts():
@dataclass()
class A:
a: dict[str, int]
b: Dict[int, int]
c: dict[str, list[int]]
d: dict[str, dict[str, int | str]]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(A)
defl_fn = defl.schema_to_deflator(A)
a = infl_fn({
'a': {
1: '42',
2: '123',
'asdf': 42
},
'b': {
1: 1,
'2': '2',
'3': 3,
4: '4'
},
'c': {
'a': [1, 2, 3, '4']
},
'd': {
'a': {
'a': 1,
'b': '1',
'c': 'asdf'
}
}
})
assert a.a['1'] == 42
assert a.a['2'] == 123
assert a.a['asdf'] == 42
assert a.b[1] == 1
assert a.b[2] == 2
assert a.b[3] == 3
assert a.b[4] == 4
assert a.c['a'][0] == 1
assert a.c['a'][1] == 2
assert a.c['a'][2] == 3
assert a.c['a'][3] == 4
assert a.d['a']['a'] == 1
assert a.d['a']['b'] == 1
assert a.d['a']['c'] == 'asdf'
def test_uuid_dicts():
@dataclass()
class A:
a: dict[uuid.UUID, uuid.UUID]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(A)
defl_fn = defl.schema_to_deflator(A)
uuids = [uuid.uuid4() for _ in range(32)]
a = infl_fn({
'a': {
str(uuids[0]): str(uuids[0]),
str(uuids[1]): str(uuids[2]),
str(uuids[3]): str(uuids[4]),
}
})
assert a.a[uuids[0]] == uuids[0]
assert a.a[uuids[1]] == uuids[2]
assert a.a[uuids[3]] == uuids[4]
ad = json.loads(json.dumps(defl_fn(a), default=str))
assert ad['a'][str(uuids[0])] == str(uuids[0])
assert ad['a'][str(uuids[1])] == str(uuids[2])
assert ad['a'][str(uuids[3])] == str(uuids[4])