Compare commits

4 Commits

Author SHA1 Message Date
c405797262 Add .coveragerc 2026-02-19 22:51:53 +03:00
9267f744d8 Dict inflator and deflator support 2026-02-19 22:39:56 +03:00
8e38c41aa5 Support Enum types 2026-02-18 20:10:23 +03:00
bc9da11db4 Add deflator "Unable to generate unwrapper for {schema}" message 2026-02-18 19:34:19 +03:00
11 changed files with 270 additions and 25 deletions

2
.coveragerc Normal file
View File

@@ -0,0 +1,2 @@
[run]
omit = src/megasniff/__main__.py

1
.gitignore vendored
View File

@@ -176,3 +176,4 @@ cython_debug/
.pypirc .pypirc
.idea .idea
.coverage

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "megasniff" name = "megasniff"
version = "0.2.6.post1" version = "0.2.8"
description = "Library for in-time codegened type validation" description = "Library for in-time codegened type validation"
authors = [ authors = [
{ name = "nikto_b", email = "niktob560@yandex.ru" } { name = "nikto_b", email = "niktob560@yandex.ru" }

View File

@@ -6,6 +6,7 @@ import importlib.resources
import typing import typing
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from enum import EnumType
from types import NoneType, UnionType from types import NoneType, UnionType
from typing import get_args, Union, Annotated, Sequence, TypeAliasType, \ from typing import get_args, Union, Annotated, Sequence, TypeAliasType, \
OrderedDict, TypeAlias OrderedDict, TypeAlias
@@ -17,6 +18,7 @@ import uuid
from pathlib import Path from pathlib import Path
import tempfile import tempfile
import importlib.util import importlib.util
import enum
JsonObject: TypeAlias = Union[None, bool, int, float, str, list['JsonObject'], dict[str, 'JsonObject']] JsonObject: TypeAlias = Union[None, bool, int, float, str, list['JsonObject'], dict[str, 'JsonObject']]
@@ -48,6 +50,11 @@ class ObjectUnwrapping(Unwrapping):
self.fields = fields self.fields = fields
class EnumUnwrapping(Unwrapping):
def __init__(self, ):
self.kind = 'enum'
class ListUnwrapping(Unwrapping): class ListUnwrapping(Unwrapping):
item_unwrap: Unwrapping item_unwrap: Unwrapping
@@ -257,6 +264,7 @@ class SchemaDeflatorGenerator:
recurcive_types |= arg_rec recurcive_types |= arg_rec
ret_unw = UnionUnwrapping(union_unwraps) ret_unw = UnionUnwrapping(union_unwraps)
else: else:
print(f'Unable to generate unwrapper for {schema} -- origin {origin} is not supported')
raise NotImplementedError raise NotImplementedError
else: else:
if schema is int: if schema is int:
@@ -273,6 +281,10 @@ class SchemaDeflatorGenerator:
ret_unw = OtherUnwrapping() ret_unw = OtherUnwrapping()
elif schema is list: elif schema is list:
ret_unw = OtherUnwrapping() ret_unw = OtherUnwrapping()
elif isinstance(schema, EnumType):
ret_unw = EnumUnwrapping()
elif issubclass(schema, uuid.UUID):
ret_unw = FuncUnwrapping('str')
elif is_class_definition(schema): elif is_class_definition(schema):
hints = typing.get_type_hints(schema) hints = typing.get_type_hints(schema)
fields = [] fields = []

View File

@@ -3,12 +3,14 @@
from __future__ import annotations from __future__ import annotations
import collections.abc import collections.abc
import importlib.resources import importlib.resources
import uuid
from collections import defaultdict from collections import defaultdict
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from enum import EnumType
from types import NoneType, UnionType from types import NoneType, UnionType
from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \ from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \
OrderedDict OrderedDict, Dict
import jinja2 import jinja2
@@ -21,6 +23,7 @@ class TypeRenderData:
typeref: list[TypeRenderData] | TypeRenderData | str typeref: list[TypeRenderData] | TypeRenderData | str
allow_none: bool allow_none: bool
is_list: bool is_list: bool
is_dict: bool
is_union: bool is_union: bool
is_strict: bool is_strict: bool
@@ -29,6 +32,14 @@ class TypeRenderData:
class IterableTypeRenderData(TypeRenderData): class IterableTypeRenderData(TypeRenderData):
iterable_type: str iterable_type: str
is_list = True is_list = True
is_dict = False
is_union = False
@dataclass
class DictTypeRenderData(TypeRenderData):
is_list = False
is_dict = True
is_union = False is_union = False
@@ -151,21 +162,24 @@ class SchemaInflatorGenerator:
allow_none = False allow_none = False
argtypes = t, argtypes = t,
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])): if any(map(lambda x: type_origin is x,
[Union, UnionType, Optional, Annotated, list, List, set, Set, dict, Dict])):
argtypes = get_args(t) argtypes = get_args(t)
if NoneType in argtypes or None in argtypes: if NoneType in argtypes or None in argtypes:
argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes)) argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes))
allow_none = True allow_none = True
is_union = len(argtypes) > 1 if type_origin in [dict, Dict]:
k = self._unwrap_typeref(argtypes[0], strict_mode)
if is_union: v = self._unwrap_typeref(argtypes[1], strict_mode)
return DictTypeRenderData([k, v], allow_none, False, True, False, False)
elif len(argtypes) > 1:
typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes)) typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes))
return TypeRenderData(typerefs, allow_none, False, True, False) return TypeRenderData(typerefs, allow_none, False, False, True, False)
elif type_origin in [list, set]: elif type_origin in [list, set, List, Set]:
rd = self._unwrap_typeref(argtypes[0], strict_mode) rd = self._unwrap_typeref(argtypes[0], strict_mode)
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__) return IterableTypeRenderData(rd, allow_none, True, False, False, False, type_origin.__name__)
else: else:
t = argtypes[0] t = argtypes[0]
@@ -178,6 +192,7 @@ class SchemaInflatorGenerator:
allow_none, allow_none,
is_list, is_list,
False, False,
False,
strict_mode if is_builtin else False) strict_mode if is_builtin else False)
def _schema_to_inflator(self, def _schema_to_inflator(self,
@@ -193,6 +208,11 @@ class SchemaInflatorGenerator:
else: else:
strict_mode = self._strict_mode strict_mode = self._strict_mode
if _namespace is None:
namespace = {}
else:
namespace = _namespace
template = self.object_template template = self.object_template
mode = 'object' mode = 'object'
if isinstance(schema, dict): if isinstance(schema, dict):
@@ -201,6 +221,11 @@ class SchemaInflatorGenerator:
new_schema.append((argname, argtype)) new_schema.append((argname, argtype))
schema = new_schema schema = new_schema
if isinstance(schema, EnumType):
if not is_builtin_type(schema):
namespace[f'inflate_{schema.__name__}'] = schema
return '\n', namespace
if isinstance(schema, collections.abc.Iterable): if isinstance(schema, collections.abc.Iterable):
template = self.tuple_template template = self.tuple_template
mode = 'tuple' mode = 'tuple'
@@ -225,17 +250,13 @@ class SchemaInflatorGenerator:
txt_segments = [] txt_segments = []
if _namespace is None:
namespace = {}
else:
namespace = _namespace
if namespace.get(f'{_funcname}_tgt_type') is not None: if namespace.get(f'{_funcname}_tgt_type') is not None:
return '', namespace return '', namespace
if mode == 'object': if mode == 'object':
namespace[f'{_funcname}_tgt_type'] = schema if not is_builtin_type(schema):
namespace[utils.typename(schema)] = schema namespace[f'{_funcname}_tgt_type'] = schema
namespace[utils.typename(schema)] = schema
if from_type_override is not None: if from_type_override is not None:
namespace['_from_type'] = from_type_override namespace['_from_type'] = from_type_override
@@ -253,8 +274,8 @@ class SchemaInflatorGenerator:
while get_origin(argtype) is not None: while get_origin(argtype) is not None:
type_origin = get_origin(argtype) type_origin = get_origin(argtype)
if any(map(lambda x: type_origin is x,
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])): [Union, UnionType, Optional, Annotated, list, List, set, Set, dict, Dict])):
argtypes = get_args(argtype) argtypes = get_args(argtype)
if len(argtypes) == 1: if len(argtypes) == 1:
argtype = argtypes[0] argtype = argtypes[0]
@@ -292,12 +313,14 @@ class SchemaInflatorGenerator:
elif argt is schema: elif argt is schema:
pass pass
else: else:
namespace[argt.__name__] = argt if not is_builtin_type(argt):
namespace[argt.__name__] = argt
convertor_functext = template.render( convertor_functext = template.render(
funcname=_funcname, funcname=_funcname,
conversions=render_data, conversions=render_data,
tgt_type=utils.typename(schema), tgt_type=utils.typename(schema),
is_opaque=len(type_hints) == 0,
from_type='_from_type' if from_type_override is not None else None from_type='_from_type' if from_type_override is not None else None
) )

View File

@@ -8,6 +8,13 @@
{{out}} {{out}}
{%- endmacro %} {%- endmacro %}
{% macro render_unwrap_enum(unwrapping, from_container, into_container) -%}
{%- set out -%}
{{ into_container }} = {{ from_container }}.value
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap_dict(unwrapping, from_container, into_container) -%} {% macro render_unwrap_dict(unwrapping, from_container, into_container) -%}
{%- set out -%} {%- set out -%}
{{ into_container }} = {} {{ into_container }} = {}
@@ -88,6 +95,8 @@ else:
{{ render_unwrap_list(unwrapping, from_container, into_container) }} {{ render_unwrap_list(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'object' %} {% elif unwrapping.kind == 'object' %}
{{ render_unwrap_object(unwrapping, from_container, into_container) }} {{ render_unwrap_object(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'enum' %}
{{ render_unwrap_enum(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'union' %} {% elif unwrapping.kind == 'union' %}
{{ render_unwrap_union(unwrapping, from_container, into_container) }} {{ render_unwrap_union(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'fn' %} {% elif unwrapping.kind == 'fn' %}

View File

@@ -6,6 +6,7 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
""" """
{{tgt_type}} {{tgt_type}}
""" """
{% if not is_opaque %}
from_data_keys = from_data.keys() from_data_keys = from_data.keys()
{% for conv in conversions %} {% for conv in conversions %}
@@ -29,4 +30,9 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
{% endfor %} {% endfor %}
return {{funcname}}_tgt_type({% for conv in conversions %}{{conv.argname_escaped}}={{conv.argname_escaped}}, {% endfor %}) return {{funcname}}_tgt_type({% for conv in conversions %}{{conv.argname_escaped}}={{conv.argname_escaped}}, {% endfor %})
{% else %}
return {{funcname}}_tgt_type(from_data)
{% endif %}

View File

@@ -3,8 +3,8 @@
{{argname}} = [] {{argname}} = []
if not isinstance({{conv_data}}, list): if not isinstance({{conv_data}}, list):
raise FieldValidationException('{{argname}}', "list", conv_data, []) raise FieldValidationException('{{argname}}', "list", conv_data, [])
for item in {{conv_data}}: for {{"_" + argname + "_item"}} in {{conv_data}}:
{{ render_segment("_" + argname, typedef, "item", false ) | indent(4) }} {{ render_segment("_" + argname, typedef, "_" + argname + "_item", false ) | indent(4) }}
{{argname}}.append(_{{argname}}) {{argname}}.append(_{{argname}})
{%- endset %} {%- endset %}
{{out}} {{out}}
@@ -39,12 +39,21 @@ if not isinstance({{conv_data}}, {{typeref}}):
{{argname}} = {{typeref}}({{conv_data}}) {{argname}} = {{typeref}}({{conv_data}})
{% elif typeref.is_union %} {% elif typeref.is_union %}
# union typeref
{{render_union(argname, typeref, conv_data)}} {{render_union(argname, typeref, conv_data)}}
{% elif typeref.is_list %} {% elif typeref.is_list %}
# list typeref
{{render_iterable(argname, typeref.typeref, conv_data)}} {{render_iterable(argname, typeref.typeref, conv_data)}}
{{argname}} = {{typeref.iterable_type}}({{argname}}) {{argname}} = {{typeref.iterable_type}}({{argname}})
{% elif typeref.is_dict %}
# dict typeref
{{render_iterable(argname + "_k", typeref.typeref[0], "list(" + conv_data + ".keys())")}}
{{render_iterable(argname + "_v", typeref.typeref[1], "list(" + conv_data + ".values())")}}
{{argname}} = dict(zip({{argname}}_k,{{argname}}_v))
{% else %} {% else %}
{{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}} {{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}}

View File

@@ -1,7 +1,12 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import json
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional, get_type_hints
from megasniff import SchemaDeflatorGenerator
from src.megasniff import SchemaInflatorGenerator from src.megasniff import SchemaInflatorGenerator
@@ -60,3 +65,82 @@ def test_optional():
fn = infl.schema_to_inflator(C) fn = infl.schema_to_inflator(C)
c = fn({}) c = fn({})
assert c.a is None assert c.a is None
def test_uuid():
@dataclass
class K:
a: uuid.UUID
b: list[uuid.UUID]
c: Optional[uuid.UUID]
d: dict[uuid.UUID, uuid.UUID]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(K)
defl_fn = defl.schema_to_deflator(K)
okd = {
'a': str(uuid.uuid4()),
'b': [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())],
'c': None,
'd': {str(uuid.uuid4()): str(uuid.uuid4()), str(uuid.uuid4()): str(uuid.uuid4())}
}
k = infl_fn(okd)
kd = defl_fn(k)
assert isinstance(kd['a'], str)
assert isinstance(kd['b'], list)
assert len(kd['b']) == 3
assert isinstance(kd['b'][0], str)
assert isinstance(kd['b'][1], str)
assert isinstance(kd['b'][2], str)
assert kd['c'] is None
assert isinstance(kd['d'], dict)
assert len(kd['d']) == 2
assert all(map(lambda x: isinstance(x, str), kd['d'].keys()))
assert all(map(lambda x: isinstance(x, str), kd['d'].values()))
assert isinstance(k.a, uuid.UUID)
assert isinstance(k.b[0], uuid.UUID)
assert isinstance(k.b[1], uuid.UUID)
assert isinstance(k.b[2], uuid.UUID)
assert k.c is None
assert isinstance(k.d, dict)
assert len(k.d) == 2
assert all(map(lambda x: isinstance(x, uuid.UUID), k.d.keys()))
assert all(map(lambda x: isinstance(x, uuid.UUID), k.d.values()))
class AEnum(Enum):
a = 'a'
b = 'b'
c = 42
e1 = {'a': 'b'}
e2 = ['a', 'b']
@dataclass
class Z:
a: Optional[AEnum] = None
def test_enum():
infl = SchemaInflatorGenerator()
defl = SchemaDeflatorGenerator()
infl_fn = infl.schema_to_inflator(Z)
defl_fn = defl.schema_to_deflator(Z)
for it in AEnum:
ref = {'a': it.value}
ref_str = json.dumps(ref)
z = infl_fn(json.loads(ref_str))
assert z.a is not None
assert z.a.value == it.value
assert z.a.name == it.name
zdict = defl_fn(z)
assert len(zdict) == 1
assert zdict['a'] == it.value
assert json.dumps(zdict) == ref_str
assert infl_fn(zdict) == z

99
tests/test_dicts.py Normal file
View File

@@ -0,0 +1,99 @@
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional, get_type_hints, Dict
from megasniff import SchemaDeflatorGenerator
from src.megasniff import SchemaInflatorGenerator
def test_dicts():
@dataclass()
class A:
a: dict[str, int]
b: Dict[int, int]
c: dict[str, list[int]]
d: dict[str, dict[str, int | str]]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(A)
defl_fn = defl.schema_to_deflator(A)
a = infl_fn({
'a': {
1: '42',
2: '123',
'asdf': 42
},
'b': {
1: 1,
'2': '2',
'3': 3,
4: '4'
},
'c': {
'a': [1, 2, 3, '4']
},
'd': {
'a': {
'a': 1,
'b': '1',
'c': 'asdf'
}
}
})
assert a.a['1'] == 42
assert a.a['2'] == 123
assert a.a['asdf'] == 42
assert a.b[1] == 1
assert a.b[2] == 2
assert a.b[3] == 3
assert a.b[4] == 4
assert a.c['a'][0] == 1
assert a.c['a'][1] == 2
assert a.c['a'][2] == 3
assert a.c['a'][3] == 4
assert a.d['a']['a'] == 1
assert a.d['a']['b'] == 1
assert a.d['a']['c'] == 'asdf'
def test_uuid_dicts():
@dataclass()
class A:
a: dict[uuid.UUID, uuid.UUID]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(A)
defl_fn = defl.schema_to_deflator(A)
uuids = [uuid.uuid4() for _ in range(32)]
a = infl_fn({
'a': {
str(uuids[0]): str(uuids[0]),
str(uuids[1]): str(uuids[2]),
str(uuids[3]): str(uuids[4]),
}
})
assert a.a[uuids[0]] == uuids[0]
assert a.a[uuids[1]] == uuids[2]
assert a.a[uuids[3]] == uuids[4]
ad = json.loads(json.dumps(defl_fn(a), default=str))
assert ad['a'][str(uuids[0])] == str(uuids[0])
assert ad['a'][str(uuids[1])] == str(uuids[2])
assert ad['a'][str(uuids[3])] == str(uuids[4])

4
uv.lock generated
View File

@@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.13" requires-python = ">=3.13"
[[package]] [[package]]
@@ -108,7 +108,7 @@ wheels = [
[[package]] [[package]]
name = "megasniff" name = "megasniff"
version = "0.1.2" version = "0.2.8"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "hatchling" }, { name = "hatchling" },