Compare commits

4 Commits

Author SHA1 Message Date
c405797262 Add .coveragerc 2026-02-19 22:51:53 +03:00
9267f744d8 Dict inflator and deflator support 2026-02-19 22:39:56 +03:00
8e38c41aa5 Support Enum types 2026-02-18 20:10:23 +03:00
bc9da11db4 Add deflator "Unable to generate unwrapper for {schema}" message 2026-02-18 19:34:19 +03:00
11 changed files with 270 additions and 25 deletions

2
.coveragerc Normal file
View File

@@ -0,0 +1,2 @@
[run]
omit = src/megasniff/__main__.py

1
.gitignore vendored
View File

@@ -176,3 +176,4 @@ cython_debug/
.pypirc
.idea
.coverage

View File

@@ -1,6 +1,6 @@
[project]
name = "megasniff"
version = "0.2.6.post1"
version = "0.2.8"
description = "Library for in-time codegened type validation"
authors = [
{ name = "nikto_b", email = "niktob560@yandex.ru" }

View File

@@ -6,6 +6,7 @@ import importlib.resources
import typing
from collections.abc import Callable
from dataclasses import dataclass
from enum import EnumType
from types import NoneType, UnionType
from typing import get_args, Union, Annotated, Sequence, TypeAliasType, \
OrderedDict, TypeAlias
@@ -17,6 +18,7 @@ import uuid
from pathlib import Path
import tempfile
import importlib.util
import enum
JsonObject: TypeAlias = Union[None, bool, int, float, str, list['JsonObject'], dict[str, 'JsonObject']]
@@ -48,6 +50,11 @@ class ObjectUnwrapping(Unwrapping):
self.fields = fields
class EnumUnwrapping(Unwrapping):
def __init__(self, ):
self.kind = 'enum'
class ListUnwrapping(Unwrapping):
item_unwrap: Unwrapping
@@ -257,6 +264,7 @@ class SchemaDeflatorGenerator:
recurcive_types |= arg_rec
ret_unw = UnionUnwrapping(union_unwraps)
else:
print(f'Unable to generate unwrapper for {schema} -- origin {origin} is not supported')
raise NotImplementedError
else:
if schema is int:
@@ -273,6 +281,10 @@ class SchemaDeflatorGenerator:
ret_unw = OtherUnwrapping()
elif schema is list:
ret_unw = OtherUnwrapping()
elif isinstance(schema, EnumType):
ret_unw = EnumUnwrapping()
elif issubclass(schema, uuid.UUID):
ret_unw = FuncUnwrapping('str')
elif is_class_definition(schema):
hints = typing.get_type_hints(schema)
fields = []

View File

@@ -3,12 +3,14 @@
from __future__ import annotations
import collections.abc
import importlib.resources
import uuid
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from enum import EnumType
from types import NoneType, UnionType
from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \
OrderedDict
OrderedDict, Dict
import jinja2
@@ -21,6 +23,7 @@ class TypeRenderData:
typeref: list[TypeRenderData] | TypeRenderData | str
allow_none: bool
is_list: bool
is_dict: bool
is_union: bool
is_strict: bool
@@ -29,6 +32,14 @@ class TypeRenderData:
class IterableTypeRenderData(TypeRenderData):
iterable_type: str
is_list = True
is_dict = False
is_union = False
@dataclass
class DictTypeRenderData(TypeRenderData):
is_list = False
is_dict = True
is_union = False
@@ -151,21 +162,24 @@ class SchemaInflatorGenerator:
allow_none = False
argtypes = t,
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])):
if any(map(lambda x: type_origin is x,
[Union, UnionType, Optional, Annotated, list, List, set, Set, dict, Dict])):
argtypes = get_args(t)
if NoneType in argtypes or None in argtypes:
argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes))
allow_none = True
is_union = len(argtypes) > 1
if is_union:
if type_origin in [dict, Dict]:
k = self._unwrap_typeref(argtypes[0], strict_mode)
v = self._unwrap_typeref(argtypes[1], strict_mode)
return DictTypeRenderData([k, v], allow_none, False, True, False, False)
elif len(argtypes) > 1:
typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes))
return TypeRenderData(typerefs, allow_none, False, True, False)
elif type_origin in [list, set]:
return TypeRenderData(typerefs, allow_none, False, False, True, False)
elif type_origin in [list, set, List, Set]:
rd = self._unwrap_typeref(argtypes[0], strict_mode)
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__)
return IterableTypeRenderData(rd, allow_none, True, False, False, False, type_origin.__name__)
else:
t = argtypes[0]
@@ -178,6 +192,7 @@ class SchemaInflatorGenerator:
allow_none,
is_list,
False,
False,
strict_mode if is_builtin else False)
def _schema_to_inflator(self,
@@ -193,6 +208,11 @@ class SchemaInflatorGenerator:
else:
strict_mode = self._strict_mode
if _namespace is None:
namespace = {}
else:
namespace = _namespace
template = self.object_template
mode = 'object'
if isinstance(schema, dict):
@@ -201,6 +221,11 @@ class SchemaInflatorGenerator:
new_schema.append((argname, argtype))
schema = new_schema
if isinstance(schema, EnumType):
if not is_builtin_type(schema):
namespace[f'inflate_{schema.__name__}'] = schema
return '\n', namespace
if isinstance(schema, collections.abc.Iterable):
template = self.tuple_template
mode = 'tuple'
@@ -225,15 +250,11 @@ class SchemaInflatorGenerator:
txt_segments = []
if _namespace is None:
namespace = {}
else:
namespace = _namespace
if namespace.get(f'{_funcname}_tgt_type') is not None:
return '', namespace
if mode == 'object':
if not is_builtin_type(schema):
namespace[f'{_funcname}_tgt_type'] = schema
namespace[utils.typename(schema)] = schema
@@ -253,8 +274,8 @@ class SchemaInflatorGenerator:
while get_origin(argtype) is not None:
type_origin = get_origin(argtype)
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])):
if any(map(lambda x: type_origin is x,
[Union, UnionType, Optional, Annotated, list, List, set, Set, dict, Dict])):
argtypes = get_args(argtype)
if len(argtypes) == 1:
argtype = argtypes[0]
@@ -292,12 +313,14 @@ class SchemaInflatorGenerator:
elif argt is schema:
pass
else:
if not is_builtin_type(argt):
namespace[argt.__name__] = argt
convertor_functext = template.render(
funcname=_funcname,
conversions=render_data,
tgt_type=utils.typename(schema),
is_opaque=len(type_hints) == 0,
from_type='_from_type' if from_type_override is not None else None
)

View File

@@ -8,6 +8,13 @@
{{out}}
{%- endmacro %}
{% macro render_unwrap_enum(unwrapping, from_container, into_container) -%}
{%- set out -%}
{{ into_container }} = {{ from_container }}.value
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap_dict(unwrapping, from_container, into_container) -%}
{%- set out -%}
{{ into_container }} = {}
@@ -88,6 +95,8 @@ else:
{{ render_unwrap_list(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'object' %}
{{ render_unwrap_object(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'enum' %}
{{ render_unwrap_enum(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'union' %}
{{ render_unwrap_union(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'fn' %}

View File

@@ -6,6 +6,7 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
"""
{{tgt_type}}
"""
{% if not is_opaque %}
from_data_keys = from_data.keys()
{% for conv in conversions %}
@@ -29,4 +30,9 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
{% endfor %}
return {{funcname}}_tgt_type({% for conv in conversions %}{{conv.argname_escaped}}={{conv.argname_escaped}}, {% endfor %})
{% else %}
return {{funcname}}_tgt_type(from_data)
{% endif %}

View File

@@ -3,8 +3,8 @@
{{argname}} = []
if not isinstance({{conv_data}}, list):
raise FieldValidationException('{{argname}}', "list", conv_data, [])
for item in {{conv_data}}:
{{ render_segment("_" + argname, typedef, "item", false ) | indent(4) }}
for {{"_" + argname + "_item"}} in {{conv_data}}:
{{ render_segment("_" + argname, typedef, "_" + argname + "_item", false ) | indent(4) }}
{{argname}}.append(_{{argname}})
{%- endset %}
{{out}}
@@ -39,12 +39,21 @@ if not isinstance({{conv_data}}, {{typeref}}):
{{argname}} = {{typeref}}({{conv_data}})
{% elif typeref.is_union %}
# union typeref
{{render_union(argname, typeref, conv_data)}}
{% elif typeref.is_list %}
# list typeref
{{render_iterable(argname, typeref.typeref, conv_data)}}
{{argname}} = {{typeref.iterable_type}}({{argname}})
{% elif typeref.is_dict %}
# dict typeref
{{render_iterable(argname + "_k", typeref.typeref[0], "list(" + conv_data + ".keys())")}}
{{render_iterable(argname + "_v", typeref.typeref[1], "list(" + conv_data + ".values())")}}
{{argname}} = dict(zip({{argname}}_k,{{argname}}_v))
{% else %}
{{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}}

View File

@@ -1,7 +1,12 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import json
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional, get_type_hints
from megasniff import SchemaDeflatorGenerator
from src.megasniff import SchemaInflatorGenerator
@@ -60,3 +65,82 @@ def test_optional():
fn = infl.schema_to_inflator(C)
c = fn({})
assert c.a is None
def test_uuid():
@dataclass
class K:
a: uuid.UUID
b: list[uuid.UUID]
c: Optional[uuid.UUID]
d: dict[uuid.UUID, uuid.UUID]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(K)
defl_fn = defl.schema_to_deflator(K)
okd = {
'a': str(uuid.uuid4()),
'b': [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())],
'c': None,
'd': {str(uuid.uuid4()): str(uuid.uuid4()), str(uuid.uuid4()): str(uuid.uuid4())}
}
k = infl_fn(okd)
kd = defl_fn(k)
assert isinstance(kd['a'], str)
assert isinstance(kd['b'], list)
assert len(kd['b']) == 3
assert isinstance(kd['b'][0], str)
assert isinstance(kd['b'][1], str)
assert isinstance(kd['b'][2], str)
assert kd['c'] is None
assert isinstance(kd['d'], dict)
assert len(kd['d']) == 2
assert all(map(lambda x: isinstance(x, str), kd['d'].keys()))
assert all(map(lambda x: isinstance(x, str), kd['d'].values()))
assert isinstance(k.a, uuid.UUID)
assert isinstance(k.b[0], uuid.UUID)
assert isinstance(k.b[1], uuid.UUID)
assert isinstance(k.b[2], uuid.UUID)
assert k.c is None
assert isinstance(k.d, dict)
assert len(k.d) == 2
assert all(map(lambda x: isinstance(x, uuid.UUID), k.d.keys()))
assert all(map(lambda x: isinstance(x, uuid.UUID), k.d.values()))
class AEnum(Enum):
a = 'a'
b = 'b'
c = 42
e1 = {'a': 'b'}
e2 = ['a', 'b']
@dataclass
class Z:
a: Optional[AEnum] = None
def test_enum():
infl = SchemaInflatorGenerator()
defl = SchemaDeflatorGenerator()
infl_fn = infl.schema_to_inflator(Z)
defl_fn = defl.schema_to_deflator(Z)
for it in AEnum:
ref = {'a': it.value}
ref_str = json.dumps(ref)
z = infl_fn(json.loads(ref_str))
assert z.a is not None
assert z.a.value == it.value
assert z.a.name == it.name
zdict = defl_fn(z)
assert len(zdict) == 1
assert zdict['a'] == it.value
assert json.dumps(zdict) == ref_str
assert infl_fn(zdict) == z

99
tests/test_dicts.py Normal file
View File

@@ -0,0 +1,99 @@
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional, get_type_hints, Dict
from megasniff import SchemaDeflatorGenerator
from src.megasniff import SchemaInflatorGenerator
def test_dicts():
@dataclass()
class A:
a: dict[str, int]
b: Dict[int, int]
c: dict[str, list[int]]
d: dict[str, dict[str, int | str]]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(A)
defl_fn = defl.schema_to_deflator(A)
a = infl_fn({
'a': {
1: '42',
2: '123',
'asdf': 42
},
'b': {
1: 1,
'2': '2',
'3': 3,
4: '4'
},
'c': {
'a': [1, 2, 3, '4']
},
'd': {
'a': {
'a': 1,
'b': '1',
'c': 'asdf'
}
}
})
assert a.a['1'] == 42
assert a.a['2'] == 123
assert a.a['asdf'] == 42
assert a.b[1] == 1
assert a.b[2] == 2
assert a.b[3] == 3
assert a.b[4] == 4
assert a.c['a'][0] == 1
assert a.c['a'][1] == 2
assert a.c['a'][2] == 3
assert a.c['a'][3] == 4
assert a.d['a']['a'] == 1
assert a.d['a']['b'] == 1
assert a.d['a']['c'] == 'asdf'
def test_uuid_dicts():
@dataclass()
class A:
a: dict[uuid.UUID, uuid.UUID]
infl = SchemaInflatorGenerator(store_sources=True)
defl = SchemaDeflatorGenerator(store_sources=True)
infl_fn = infl.schema_to_inflator(A)
defl_fn = defl.schema_to_deflator(A)
uuids = [uuid.uuid4() for _ in range(32)]
a = infl_fn({
'a': {
str(uuids[0]): str(uuids[0]),
str(uuids[1]): str(uuids[2]),
str(uuids[3]): str(uuids[4]),
}
})
assert a.a[uuids[0]] == uuids[0]
assert a.a[uuids[1]] == uuids[2]
assert a.a[uuids[3]] == uuids[4]
ad = json.loads(json.dumps(defl_fn(a), default=str))
assert ad['a'][str(uuids[0])] == str(uuids[0])
assert ad['a'][str(uuids[1])] == str(uuids[2])
assert ad['a'][str(uuids[3])] == str(uuids[4])

4
uv.lock generated
View File

@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.13"
[[package]]
@@ -108,7 +108,7 @@ wheels = [
[[package]]
name = "megasniff"
version = "0.1.2"
version = "0.2.8"
source = { editable = "." }
dependencies = [
{ name = "hatchling" },