Compare commits

..

1 Commits

Author SHA1 Message Date
0795a5f8bb Add basic inflator cython support 2025-08-26 17:34:14 +03:00
12 changed files with 345 additions and 739 deletions

View File

@@ -8,8 +8,10 @@ authors = [
license = "LGPL-3.0-or-later"
requires-python = ">=3.13"
dependencies = [
"cython>=3.1.3",
"hatchling>=1.27.0",
"jinja2>=3.1.6",
"setuptools>=80.9.0",
]
[build-system]

View File

@@ -1,2 +1 @@
from .inflator import SchemaInflatorGenerator
from .deflator import SchemaDeflatorGenerator

View File

@@ -1,23 +1,28 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from types import NoneType
from typing import Optional
from typing import TypedDict
import megasniff.exceptions
from megasniff.deflator import SchemaDeflatorGenerator, JsonObject
from megasniff.python_to_cython import python_obj_to_cython
from . import SchemaInflatorGenerator
@dataclass(frozen=True)
@dataclass
class ASchema:
a: int
a: int | None
b: float | str
bs: Optional[BSchema]
d: int
c: float = 1.1
def __init__(self, a: int | None, b: float | str, bs: Optional[BSchema], c: float = 1.1):
self.a = a
self.b = b
self.bs = bs
self.c = c
self.d = a or 0
@dataclass
class BSchema:
@@ -31,10 +36,23 @@ class BSchema:
class CSchema:
l: set[int | ASchema]
@dataclass
class SomeData:
a: int
b: float
c: str
def main():
# ccode = python_obj_to_cython(ASchema)
# print(ccode)
# exit(0)
# infl = SchemaInflatorGenerator(strict_mode=True)
# fn = infl.schema_to_inflator(SomeData)
# print(fn({'a': 1, 'b': 1.1, 'c': 'asdf'}))
infl = SchemaInflatorGenerator(strict_mode=True)
fn = infl.schema_to_inflator(ASchema)
# exit(0)
# print(t)
# print(n)
# exec(t, n)
@@ -44,7 +62,8 @@ def main():
# d = {'a': 1, 'b': 1, 'c': 0, 'bs': {'a': 1, 'b': 2, 'c': 3, 'd': {'a': 1, 'b': 2.1, 'bs': None}}}
# d = {'a': 2, 'b': 2, 'bs': {'a': 2, 'b': 'a', 'c': 0, 'd': {'a': 2, 'b': 2}}}
# d = {'l': ['1', {'a': 42, 'b': 1}]}
d = {'a': 2, 'b': '2', 'bs': None}
# d = {'a': None, 'b': '2', 'bs': None}
d = {'a': None, 'b': '2', 'bs': {'a': 1, 'b': 'a', 'c': 1.1, 'd': {'a': 1, 'b': '', 'bs': None}}}
try:
o = fn(d)
print(o)
@@ -59,40 +78,5 @@ def main():
print(e)
@dataclass
class DSchema:
a: dict
b: dict[str, int | float | dict]
c: str | float | ASchema
d: ESchema
@dataclass
class ESchema:
a: list[list[list[str]]]
b: str | int
@dataclass
class ZSchema:
z: ZSchema | None
d: ZSchema | int
def main_deflator():
deflator = SchemaDeflatorGenerator(store_sources=True, explicit_casts=True, strict_mode=True)
fn = deflator.schema_to_deflator(DSchema)
print(getattr(fn, '__megasniff_sources__', '## No data'))
# ret = fn(ZSchema(ZSchema(ZSchema(None, 42), 42), ZSchema(None, 42)))
ret = fn(DSchema({'a': 34}, {}, ASchema(1, 'a', None), ESchema([[['a'], ['b']]], ['b'])))
# assert ret['a'] == 1
# assert ret['b'] == 1.1
# assert ret['c'] == 'a'
# assert ret['d']['a'][0][0][0] == 'a'
# assert ret['d']['b'] == 'b'
print(json.dumps(ret, indent=4))
pass
if __name__ == '__main__':
main_deflator()
main()

View File

@@ -1,307 +0,0 @@
# Copyright (C) 2025 Shevchenko A
# SPDX-License-Identifier: LGPL-3.0-or-later
from __future__ import annotations
import importlib.resources
import typing
from collections.abc import Callable
from dataclasses import dataclass
from types import NoneType, UnionType
from typing import get_args, Union, Annotated, Sequence, TypeAliasType, \
OrderedDict, TypeAlias
import jinja2
from .utils import *
JsonObject: TypeAlias = Union[None, bool, int, float, str, list['JsonObject'], dict[str, 'JsonObject']]
class Unwrapping:
kind: str
class OtherUnwrapping(Unwrapping):
tp: str
def __init__(self, tp: str = ''):
self.kind = 'other'
self.tp = tp
@dataclass()
class ObjectFieldUnwrapping:
key: str
object_key: str
unwrapping: Unwrapping
class ObjectUnwrapping(Unwrapping):
fields: list[ObjectFieldUnwrapping]
def __init__(self, fields: list[ObjectFieldUnwrapping]):
self.kind = 'object'
self.fields = fields
class ListUnwrapping(Unwrapping):
item_unwrap: Unwrapping
def __init__(self, item_unwrap: Unwrapping):
self.kind = 'list'
self.item_unwrap = item_unwrap
class DictUnwrapping(Unwrapping):
key_unwrap: Unwrapping
value_unwrap: Unwrapping
def __init__(self, key_unwrap: Unwrapping, value_unwrap: Unwrapping):
self.kind = 'dict'
self.key_unwrap = key_unwrap
self.value_unwrap = value_unwrap
class FuncUnwrapping(Unwrapping):
fn: str
def __init__(self, fn: str):
self.kind = 'fn'
self.fn = fn
@dataclass
class UnionKindUnwrapping:
kind: str
unwrapping: Unwrapping
class UnionUnwrapping(Unwrapping):
union_kinds: list[UnionKindUnwrapping]
def __init__(self, union_kinds: list[UnionKindUnwrapping]):
self.kind = 'union'
self.union_kinds = union_kinds
def _flatten_type(t: type | TypeAliasType) -> tuple[type, Optional[str]]:
if isinstance(t, TypeAliasType):
return _flatten_type(t.__value__)
origin = get_origin(t)
if origin is Annotated:
args = get_args(t)
return _flatten_type(args[0])[0], args[1]
return t, None
def _schema_to_deflator_func(t: type | TypeAliasType) -> str:
t, _ = _flatten_type(t)
return 'deflate_' + typename(t).replace('.', '_')
def _fallback_unwrapper(obj: Any) -> JsonObject:
if isinstance(obj, (int, float, str, bool)):
return obj
elif isinstance(obj, list):
return list(map(_fallback_unwrapper, obj))
elif isinstance(obj, dict):
return dict(map(lambda x: (_fallback_unwrapper(x[0]), _fallback_unwrapper(x[1])), obj.items()))
elif hasattr(obj, '__dict__'):
ret = {}
for k, v in obj.__dict__:
if isinstance(k, str) and k.startswith('_'):
continue
k = _fallback_unwrapper(k)
v = _fallback_unwrapper(v)
ret[k] = v
return ret
return None
class SchemaDeflatorGenerator:
templateLoader: jinja2.BaseLoader
templateEnv: jinja2.Environment
object_template: jinja2.Template
_store_sources: bool
_strict_mode: bool
_explicit_casts: bool
def __init__(self,
loader: Optional[jinja2.BaseLoader] = None,
strict_mode: bool = False,
explicit_casts: bool = False,
store_sources: bool = False,
*,
object_template_filename: str = 'deflator.jinja2',
):
self._strict_mode = strict_mode
self._store_sources = store_sources
self._explicit_casts = explicit_casts
if loader is None:
template_path = importlib.resources.files('megasniff.templates')
loader = jinja2.FileSystemLoader(str(template_path))
self.templateLoader = loader
self.templateEnv = jinja2.Environment(loader=self.templateLoader)
self.object_template = self.templateEnv.get_template(object_template_filename)
def schema_to_deflator(self,
schema: type,
strict_mode_override: Optional[bool] = None,
explicit_casts_override: Optional[bool] = None,
) -> Callable[[Any], dict[str, Any]]:
txt, namespace = self._schema_to_deflator(schema,
strict_mode_override=strict_mode_override,
explicit_casts_override=explicit_casts_override,
)
imports = ('from typing import Any\n'
'from megasniff.exceptions import MissingFieldException, FieldValidationException\n')
txt = imports + '\n' + txt
exec(txt, namespace)
fn = namespace[_schema_to_deflator_func(schema)]
if self._store_sources:
setattr(fn, '__megasniff_sources__', txt)
return fn
def schema_to_unwrapper(self, schema: type | TypeAliasType, *, _visited_types: Optional[list[type]] = None):
if _visited_types is None:
_visited_types = []
else:
_visited_types = _visited_types.copy()
schema, field_rename = _flatten_type(schema)
if schema in _visited_types:
return FuncUnwrapping(_schema_to_deflator_func(schema)), field_rename, set(), {schema}
_visited_types.append(schema)
ongoing_types = set()
recurcive_types = set()
origin = get_origin(schema)
ret_unw = None
if origin is not None:
if origin is list:
args = get_args(schema)
item_unw, arg_rename, ongoings, item_rec = self.schema_to_unwrapper(args[0],
_visited_types=_visited_types)
ret_unw = ListUnwrapping(item_unw)
recurcive_types |= item_rec
ongoing_types |= ongoings
elif origin is dict:
args = get_args(schema)
if len(args) != 2:
ret_unw = OtherUnwrapping()
else:
k, v = args
k_unw, _, k_ongoings, k_rec = self.schema_to_unwrapper(k, _visited_types=_visited_types)
v_unw, _, v_ongoings, v_rec = self.schema_to_unwrapper(k, _visited_types=_visited_types)
ongoing_types |= k_ongoings | v_ongoings
recurcive_types |= k_rec | v_rec
ret_unw = DictUnwrapping(k_unw, v_unw)
elif origin is UnionType or origin is Union:
args = get_args(schema)
union_unwraps = []
for targ in args:
arg_unw, arg_rename, ongoings, arg_rec = self.schema_to_unwrapper(targ,
_visited_types=_visited_types)
union_unwraps.append(UnionKindUnwrapping(typename(targ), arg_unw))
ongoing_types |= ongoings
recurcive_types |= arg_rec
ret_unw = UnionUnwrapping(union_unwraps)
else:
raise NotImplementedError
else:
if schema is int:
ret_unw = OtherUnwrapping('int')
elif schema is float:
ret_unw = OtherUnwrapping('float')
elif schema is bool:
ret_unw = OtherUnwrapping('bool')
elif schema is str:
ret_unw = OtherUnwrapping('str')
elif schema is None or schema is NoneType:
ret_unw = OtherUnwrapping()
elif schema is dict:
ret_unw = OtherUnwrapping()
elif schema is list:
ret_unw = OtherUnwrapping()
elif is_class_definition(schema):
hints = typing.get_type_hints(schema)
fields = []
for k, f in hints.items():
f_unw, f_rename, ongoings, f_rec = self.schema_to_unwrapper(f, _visited_types=_visited_types)
fields.append(ObjectFieldUnwrapping(f_rename or k, k, f_unw))
ongoing_types |= ongoings
recurcive_types |= f_rec
ret_unw = ObjectUnwrapping(fields)
else:
raise NotImplementedError()
return ret_unw, field_rename, set(_visited_types) | ongoing_types, recurcive_types
def _schema_to_deflator(self,
schema: type | Sequence[TupleSchemaItem | tuple[str, type]] | OrderedDict[str, type],
strict_mode_override: Optional[bool] = None,
explicit_casts_override: Optional[bool] = None,
into_type_override: Optional[type | TypeAliasType] = None,
*,
_funcname='deflate',
_namespace=None,
) -> tuple[str, dict]:
if strict_mode_override is not None:
strict_mode = strict_mode_override
else:
strict_mode = self._strict_mode
if explicit_casts_override is not None:
explicit_casts = explicit_casts_override
else:
explicit_casts = self._explicit_casts
template = self.object_template
types_for_namespace = set()
recursive_types = {schema}
namespace = {
'JsonObject': JsonObject,
'fallback_unwrapper': _fallback_unwrapper,
}
convertor_functext = ''
added_types = set()
while len(recursive_types ^ (recursive_types & added_types)) > 0:
rec_t = list(recursive_types ^ (recursive_types & added_types))[0]
rec_unw, _, rec_t_namespace, rec_rec_t = self.schema_to_unwrapper(rec_t)
recursive_types |= rec_rec_t
types_for_namespace |= rec_t_namespace
rec_functext = template.render(
funcname=_schema_to_deflator_func(rec_t),
from_type=typename(rec_t),
into_type=None,
root_unwrap=rec_unw,
hashname=hashname,
strict_check=strict_mode,
explicit_cast=explicit_casts,
)
convertor_functext += '\n\n\n' + rec_functext
added_types.add(rec_t)
for t in types_for_namespace:
namespace[typename(t)] = t
convertor_functext = '\n'.join(list(filter(lambda x: len(x.strip()), convertor_functext.split('\n'))))
return convertor_functext, namespace

View File

@@ -2,11 +2,18 @@
# SPDX-License-Identifier: LGPL-3.0-or-later
from __future__ import annotations
import collections.abc
import hashlib
import importlib.resources
import importlib.util
import os
import subprocess
import sys
import tempfile
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from types import NoneType, UnionType
from pathlib import Path
from types import NoneType, UnionType, ModuleType
from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \
OrderedDict
@@ -14,6 +21,7 @@ import jinja2
from . import utils
from .utils import *
import random, string
@dataclass
@@ -23,6 +31,7 @@ class TypeRenderData:
is_list: bool
is_union: bool
is_strict: bool
ctype: str
@dataclass
@@ -48,6 +57,7 @@ class FieldRenderData:
is_optional: bool
allow_none: bool
default_option: Optional[str]
ctype: str
def __init__(self,
argname: str,
@@ -55,7 +65,8 @@ class FieldRenderData:
typename: str,
is_optional: bool,
allow_none: bool,
default_option: Optional[str]):
default_option: Optional[str],
ctype: str):
self.argname = argname
self.constrs = constrs
self.typename = typename
@@ -63,6 +74,221 @@ class FieldRenderData:
self.allow_none = allow_none
self.default_option = default_option
self.argname_escaped = _escape_python_name(argname)
self.ctype = ctype
def randomword(length):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))
def exec_cython(txt: str, namespace: dict, name: str):
"""
Drop-in замена exec(txt, namespace), но через cython.
Возвращает callable объект из namespace['inflator'].
"""
# генерируем уникальное имя для модуля
h = hashlib.sha256(txt.encode() + str(sorted(namespace.keys())).encode()).hexdigest()[:16]
modname = f"_cyexec_{h}"
build_dir = tempfile.mkdtemp(prefix="cyexec_")
pyx_file = os.path.join(build_dir, f"{modname}.pyx")
# соберём код для .pyx
# сначала экспортируем namespace
export_lines = []
for k, v in namespace.items():
if k not in {'int', 'float', 'str'}:
export_lines.append(f"{k} = __ns__['{k}']")
pyx_code = f"""
# cython: language_level=3
# cython: boundscheck=False, wraparound=False, nonecheck=False
# AUTO-GENERATED
# Вставляем runtime namespace
import builtins
__ns__ = builtins.__dict__['_cyexec_ns']
cdef class NullableInt:
cdef long value
cdef bint has
cpdef set(self, long value):
self.value = value
self.has = 1
cpdef unset(self):
self.has = 0
cdef class NullableDouble:
cdef double value
cdef bint has
cpdef set(self, double value):
self.value = value
self.has = 1
cpdef unset(self):
self.has = 0
{os.linesep.join(export_lines)}
# пользовательский код
{txt}
"""
# пишем файл
with open(pyx_file, "w") as f:
f.write(pyx_code)
# нужно сохранить namespace в builtins, чтобы cython его видел
import builtins
builtins._cyexec_ns = namespace
# компилируем через cythonize
setup_code = f"""
from setuptools import setup
from Cython.Build import cythonize
setup(
name="{modname}",
ext_modules=cythonize("{pyx_file}", compiler_directives={{"language_level": "3"}}),
script_args=["build_ext", "--inplace"],
)
"""
setup_file = os.path.join(build_dir, "setup.py")
with open(setup_file, "w") as f:
f.write(setup_code)
subprocess.check_call([sys.executable, setup_file, "build_ext", "--inplace"], cwd=build_dir)
# находим .so файл
for fn in os.listdir(build_dir):
if fn.startswith(modname) and fn.endswith((".so", ".pyd")):
so_path = os.path.join(build_dir, fn)
break
else:
raise RuntimeError("Cython build failed, no .so produced")
# импортим как модуль
spec = importlib.util.spec_from_file_location(modname, so_path)
mod = importlib.util.module_from_spec(spec)
sys.modules[modname] = mod
spec.loader.exec_module(mod)
# чистим временный namespace в builtins
del builtins._cyexec_ns
return getattr(mod, name)
@dataclass
class BasicTypeVariationTest:
index: int
basic_type: str
@dataclass
class ObjectTypeVariationTest:
index: int
fields_contains: list[tuple[str, TypeVariationTest | None]]
@dataclass
class TypeVariationTest:
types: list[TypeConstructionSchema]
basic_tests: list[BasicTypeVariationTest]
object_tests: list[ObjectTypeVariationTest]
@dataclass
class TypeConstructionSchema:
tp: type
allow_none: bool
kwargs: Optional[dict[str, tuple[str, list[TypeConstructionSchema] | type]]]
@property
def typed_key_pairs(self) -> set[str]:
ret = set()
for k, (_, v) in self.kwargs.items():
if isinstance(v, type):
ret.add(f'{k}:{v}')
else:
if not isinstance(v, list):
v = [v]
for _v in v:
ret.add(f'{k}:{_v.tp}')
return ret
class Z1:
a: int
b: int
class Z2:
c: int
d: int
class Z3:
e: int
f: int
class A:
a: int
b: Z1 | Z3
class B:
a: int
b: Z2
class C:
z1: Z1
z2: Z2
_ = A | B | C
def find_types_variations(types: list[TypeConstructionSchema]) -> TypeVariationTest:
basic_tests = []
object_tests = []
for i, tp in enumerate(types):
if tp.kwargs is None:
basic_tests.append(BasicTypeVariationTest(i, _type_to_ctype(tp.tp, tp.allow_none)))
for i, tp in enumerate(types):
if tp.kwargs is not None:
tp_keys = []
uniq_keys = set()
if i < len(types):
keys = set()
for t in types[i + 1:]:
keys |= set((t.kwargs or {}).keys())
obj_keys = set(tp.kwargs.keys())
uniq_keys = (obj_keys ^ keys) & obj_keys
if len(uniq_keys) > 0:
k = list(uniq_keys)[0]
object_tests.append(ObjectTypeVariationTest(i, [(k, None)]))
else:
pass
return TypeVariationTest(types, basic_tests, object_tests)
def _type_to_ctype(t: type, allow_none: bool) -> str:
if t is int:
return 'NullableInt' if allow_none else 'long'
if t is float:
return 'NullableFloat' if allow_none else 'double'
return 'object'
class SchemaInflatorGenerator:
@@ -100,18 +326,23 @@ class SchemaInflatorGenerator:
strict_mode_override: Optional[bool] = None,
from_type_override: Optional[type | TypeAliasType] = None
) -> Callable[[dict[str, Any]], Any]:
name = 'inflate'
if isinstance(schema, type):
name = f'inflate_{schema.__name__}'
if from_type_override is not None and '__getitem__' not in dir(from_type_override):
raise RuntimeError('from_type_override must provide __getitem__')
txt, namespace = self._schema_to_inflator(schema,
_funcname='inflate',
_funcname=name,
strict_mode_override=strict_mode_override,
from_type_override=from_type_override,
)
imports = ('from typing import Any\n'
'from megasniff.exceptions import MissingFieldException, FieldValidationException\n')
txt = imports + '\n' + txt
exec(txt, namespace)
fn = namespace['inflate']
fn = exec_cython(txt, namespace, name)
# fn = exec_numba(txt, namespace, func_name=name)
# exec(txt, namespace)
# fn = namespace[name]
if self._store_sources:
setattr(fn, '__megasniff_sources__', txt)
return fn
@@ -132,10 +363,11 @@ class SchemaInflatorGenerator:
if is_union:
typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes))
return TypeRenderData(typerefs, allow_none, False, True, False)
return TypeRenderData(typerefs, allow_none, False, True, False, 'object')
elif type_origin in [list, set]:
rd = self._unwrap_typeref(argtypes[0], strict_mode)
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__)
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__,
'NullableList' if allow_none else 'list')
else:
t = argtypes[0]
@@ -148,7 +380,8 @@ class SchemaInflatorGenerator:
allow_none,
is_list,
False,
strict_mode if is_builtin else False)
strict_mode if is_builtin else False,
_type_to_ctype(t, allow_none))
def _schema_to_inflator(self,
schema: type | Sequence[TupleSchemaItem | tuple[str, type]] | OrderedDict[str, type],
@@ -243,6 +476,7 @@ class SchemaInflatorGenerator:
has_default,
allow_none,
default_option if not isinstance(default_option, str) else f"'{default_option}'",
typeref.ctype
)
)

View File

@@ -1,110 +0,0 @@
{% macro render_unwrap_object(unwrapping, from_container, into_container) -%}
{%- set out -%}
{{ into_container }} = {}
{% for kv in unwrapping.fields %}
{{ render_unwrap(kv.unwrapping, from_container + '.' + kv.object_key, into_container + "['" + kv.key + "']") }}
{% endfor %}
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap_dict(unwrapping, from_container, into_container) -%}
{%- set out -%}
{{ into_container }} = {}
{% if strict_check %}
if not isinstance({{from_container}}, dict):
raise FieldValidationException('{{from_container.replace("'", "\\'")}}', 'dict', str(type({{from_container}})))
{% endif %}
{% if explicit_cast %}
{% set from_container = 'dict(' + from_container + ')' %}
{% endif %}
for k_{{hashname(unwrapping)}}, v_{{hashname(unwrapping)}} in {{from_container}}.items():
{{ render_unwrap(unwrapping.key_unwrap, 'k_' + hashname(unwrapping), 'k_' + hashname(unwrapping)) | indent(4) }}
{{ render_unwrap(unwrapping.value_unwrap, into_container + '[v_' + hashname(unwrapping) + ']', 'v_' + hashname(unwrapping)) | indent(4) }}
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap_list(unwrapping, from_container, into_container) -%}
{%- set out -%}
{{into_container}} = []
{% if strict_check %}
if not isinstance({{from_container}}, list):
raise FieldValidationException('{{from_container.replace("'", "\\'")}}', 'list', str(type({{from_container}})))
{% endif %}
{% if explicit_cast %}
{% set from_container = 'list(' + from_container + ')' %}
{% endif %}
for {{hashname(unwrapping)}} in {{from_container}}:
{{ render_unwrap(unwrapping.item_unwrap, hashname(unwrapping), hashname(unwrapping)+'_tmp_container') | indent(4) }}
{{into_container}}.append({{hashname(unwrapping)}}_tmp_container)
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap_other(unwrapping, from_container, into_container) -%}
{%- set out -%}
{% if unwrapping.tp != '' and strict_check %}
if not isinstance({{from_container}}, {{unwrapping.tp}}):
raise FieldValidationException('{{from_container.replace("'", "\\'")}}', '{{unwrapping.tp}}', str(type({{from_container}})))
{% endif %}
{% if unwrapping.tp != '' and explicit_cast %}
{{into_container}} = {{unwrapping.tp}}({{from_container}})
{% else %}
{{into_container}} = {{from_container}}
{% endif %}
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap_union(unwrapping, from_container, into_container) -%}
{%- set out -%}
{% for union_kind in unwrapping.union_kinds %}
{% if loop.index > 1 %}el{% endif %}if isinstance({{from_container}}, {{union_kind.kind}}):
{{render_unwrap(union_kind.unwrapping, from_container, into_container) | indent(4)}}
{% endfor %}
{% if strict_check %}
else:
raise FieldValidationException('{{from_container.replace("'", "\\'")}}', 'dict', str(type({{from_container}})))
{% elif explicit_cast %}
else:
{{render_unwrap(unwrapping.union_kinds[-1], from_container, into_container) | indent(4)}}
{% else %}
else:
{{into_container}} = fallback_unwrap({{from_container}})
{% endif %}
{%- endset %}
{{out}}
{%- endmacro %}
{% macro render_unwrap(unwrapping, from_container, into_container) -%}
{%- set out -%}
{% if unwrapping.kind == 'dict' %}
{{ render_unwrap_dict(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'list' %}
{{ render_unwrap_list(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'object' %}
{{ render_unwrap_object(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'union' %}
{{ render_unwrap_union(unwrapping, from_container, into_container) }}
{% elif unwrapping.kind == 'fn' %}
{{into_container}} = {{ unwrapping.fn }}({{from_container}})
{% else %}
{{ render_unwrap_other(unwrapping, from_container, into_container) }}
{% endif %}
{%- endset %}
{{out}}
{%- endmacro %}
def {{funcname}}(from_data{% if from_type is not none%}: {{from_type}}{%endif%}) -> {% if into_type is none %}JsonObject{%else%}{{into_type}}{%endif%}:
"""
{{from_type}} -> {{into_type}}
"""
{{ render_unwrap(root_unwrap, 'from_data', 'ret') | indent(4) }}
return ret

View File

@@ -2,16 +2,45 @@
{% import "unwrap_type_data.jinja2" as unwrap_type_data %}
def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{from_type}}{% endif %}) {% if tgt_type is not none %} -> {{tgt_type}} {% endif %}:
{% macro render_setter(argname, argval) -%}
{%- set out -%}
{% if argname.startswith('Nullable') %}
{% if argval == 'None' %}
{{argname}}.unset()
{% else %}
{{argname}}.set({{argval}})
{%endif%}
{% else %}
{{argname}} = {{argval}}
{% endif %}
{%- endset %}
{{out}}
{%- endmacro %}
{% macro check_null(argname) -%}
{%- set out -%}
{% if argname.startswith('Nullable') %}
if {{argname}}.has:
{% else %}
if {{argname}} is None:
{% endif %}
{%- endset %}
{{out}}
{%- endmacro %}
cpdef object {{funcname}}(dict from_data):
"""
{{tgt_type}}
"""
from_data_keys = from_data.keys()
cdef object conv_data
{% for conv in conversions %}
cdef {{conv.argctype}} {{conv.argname_escaped}}
if '{{conv.argname}}' not in from_data_keys:
{% if conv.is_optional %}
{{conv.argname_escaped}} = {{conv.default_option}}
{{ render_setter(conv.argname_escaped, conv.default_option) | indent(4*2) }}
{% else %}
raise MissingFieldException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}")
{% endif %}
@@ -21,7 +50,7 @@ def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{
{% if not conv.allow_none %}
raise FieldValidationException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}", conv_data)
{% else %}
{{conv.argname_escaped}} = None
{{ render_setter(conv.argname_escaped, 'None') | indent(4*3) }}
{% endif %}
else:

View File

@@ -71,11 +71,3 @@ def typename(tp: type) -> str:
if get_origin(tp) is None and hasattr(tp, '__name__'):
return tp.__name__
return str(tp)
def is_class_definition(obj):
return isinstance(obj, type) or inspect.isclass(obj)
def hashname(obj) -> str:
return '_' + str(hash(obj)).replace('-', '_')

View File

@@ -1,77 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from megasniff.deflator import SchemaDeflatorGenerator
from src.megasniff import SchemaInflatorGenerator
def test_basic_deflator():
class A:
a: int
def __init__(self, a: int):
self.a = a
class B:
def __init__(self, b: int):
self.b = b
defl = SchemaDeflatorGenerator()
fn = defl.schema_to_deflator(A)
a = fn(A(42))
assert a['a'] == 42
fnb = defl.schema_to_deflator(B)
b = fnb(B(11))
assert len(b) == 0
def test_unions():
@dataclass
class A:
a: int | str
defl = SchemaDeflatorGenerator()
fn = defl.schema_to_deflator(A)
a = fn(A(42))
assert a['a'] == 42
a = fn(A('42'))
assert a['a'] == '42'
a = fn(A('42a'))
assert a['a'] == '42a'
@dataclass
class CircA:
b: CircB
@dataclass
class CircB:
a: CircA | None
def test_circular():
defl = SchemaDeflatorGenerator()
fn = defl.schema_to_deflator(CircA)
a = fn(CircA(CircB(CircA(CircB(None)))))
assert isinstance(a['b'], dict)
assert isinstance(a['b']['a'], dict)
assert a['b']['a']['b']['a'] is None
def test_optional():
@dataclass
class C:
a: Optional[int] = None
defl = SchemaDeflatorGenerator()
fn = defl.schema_to_deflator(C)
c = fn(C())
assert c['a'] is None
c = fn(C(123))
assert c['a'] == 123

View File

@@ -1,94 +0,0 @@
from dataclasses import dataclass
import pytest
from megasniff import SchemaDeflatorGenerator
from megasniff.exceptions import FieldValidationException
def test_global_explicit_casts_basic():
class A:
a: int
def __init__(self, a):
self.a = a
defl = SchemaDeflatorGenerator(explicit_casts=True)
fn = defl.schema_to_deflator(A)
a = fn(A(42))
assert a['a'] == 42
a = fn(A(42.0))
assert a['a'] == 42
a = fn(A('42'))
assert a['a'] == 42
with pytest.raises(TypeError):
fn(A(['42']))
def test_global_explicit_casts_basic_override():
class A:
a: int
def __init__(self, a):
self.a = a
defl = SchemaDeflatorGenerator(explicit_casts=False)
fn = defl.schema_to_deflator(A, explicit_casts_override=True)
a = fn(A(42))
assert a['a'] == 42
a = fn(A(42.0))
assert a['a'] == 42
a = fn(A('42'))
assert a['a'] == 42
with pytest.raises(TypeError):
fn(A(['42']))
def test_global_explicit_casts_list():
@dataclass
class A:
a: list[int]
defl = SchemaDeflatorGenerator(explicit_casts=True)
fn = defl.schema_to_deflator(A)
a = fn(A([42]))
assert a['a'] == [42]
a = fn(A([42.0, 42]))
assert len(a['a']) == 2
assert a['a'][0] == 42
assert a['a'][1] == 42
def test_global_explicit_casts_circular():
@dataclass
class A:
a: list[int]
@dataclass
class B:
b: list[A | int]
defl = SchemaDeflatorGenerator(explicit_casts=True)
fn = defl.schema_to_deflator(B)
b = fn(B([A([]), 42]))
assert len(b['b']) == 2
assert isinstance(b['b'][0], dict)
assert len(b['b'][0]['a']) == 0
assert isinstance(b['b'][1], int)
b = fn(B([42.0]))
assert b['b'][0] == 42
b = fn(B([A([1.1])]))
assert b['b'][0]['a'][0] == 1

View File

@@ -1,88 +0,0 @@
from dataclasses import dataclass
import pytest
from megasniff import SchemaDeflatorGenerator
from megasniff.exceptions import FieldValidationException
def test_global_strict_mode_basic():
class A:
a: int
def __init__(self, a):
self.a = a
defl = SchemaDeflatorGenerator(strict_mode=True)
fn = defl.schema_to_deflator(A)
a = fn(A(42))
assert a['a'] == 42
with pytest.raises(FieldValidationException):
fn(A(42.0))
with pytest.raises(FieldValidationException):
fn(A('42'))
with pytest.raises(FieldValidationException):
fn(A(['42']))
def test_global_strict_mode_basic_override():
class A:
a: int
def __init__(self, a):
self.a = a
defl = SchemaDeflatorGenerator(strict_mode=False)
fn = defl.schema_to_deflator(A, strict_mode_override=True)
a = fn(A(42))
assert a['a'] == 42
with pytest.raises(FieldValidationException):
fn(A(42.0))
with pytest.raises(FieldValidationException):
fn(A('42'))
with pytest.raises(FieldValidationException):
fn(A(['42']))
def test_global_strict_mode_list():
@dataclass
class A:
a: list[int]
defl = SchemaDeflatorGenerator(strict_mode=True)
fn = defl.schema_to_deflator(A)
a = fn(A([42]))
assert a['a'] == [42]
with pytest.raises(FieldValidationException):
fn(A([42.0, 42]))
def test_global_strict_mode_circular():
@dataclass
class A:
a: list[int]
@dataclass
class B:
b: list[A | int]
defl = SchemaDeflatorGenerator(strict_mode=True)
fn = defl.schema_to_deflator(B)
b = fn(B([A([]), 42]))
assert len(b['b']) == 2
assert isinstance(b['b'][0], dict)
assert len(b['b'][0]['a']) == 0
assert isinstance(b['b'][1], int)
with pytest.raises(FieldValidationException):
fn(B([42.0]))
with pytest.raises(FieldValidationException):
fn(B([A([1.1])]))

44
uv.lock generated
View File

@@ -42,6 +42,35 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/38/bbe2e63902847cf79036ecc75550d0698af31c91c7575352eb25190d0fb3/coverage-7.9.2-py3-none-any.whl", hash = "sha256:e425cd5b00f6fc0ed7cdbd766c70be8baab4b7839e4d4fe5fac48581dd968ea4", size = 204005, upload-time = "2025-07-03T10:54:13.491Z" },
]
[[package]]
name = "cython"
version = "3.1.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/18/ab/915337fb39ab4f4539a313df38fc69938df3bf14141b90d61dfd5c2919de/cython-3.1.3.tar.gz", hash = "sha256:10ee785e42328924b78f75a74f66a813cb956b4a9bc91c44816d089d5934c089", size = 3186689, upload-time = "2025-08-13T06:19:13.619Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7f/93/0e5dfcc6215a6c2cae509d7e40f8fb197237ba5998c936e9c19692f8eedf/cython-3.1.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9458d540ef0853ea4fc65b8a946587bd483ef7244b470b3d93424eb7b04edeb1", size = 2998232, upload-time = "2025-08-13T06:20:35.817Z" },
{ url = "https://files.pythonhosted.org/packages/6b/6c/01b22de45e3a9b86fbe4a18cd470146514209448cb4d3d3ba9c72390d45b/cython-3.1.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:32d1b22c3b231326e9f16480a7f508c6841bbf7d0615c2d6f489ebc72dd05205", size = 2830052, upload-time = "2025-08-13T06:20:37.71Z" },
{ url = "https://files.pythonhosted.org/packages/52/08/a7d4b91b144b4bd015e932303861061cd43221f737ecdc6e380a438f245f/cython-3.1.3-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:d4c7e0b8584b02a349952de7d7d47f89c97cbf3fee74962e89e3caa78139ec84", size = 3359478, upload-time = "2025-08-13T06:20:39.811Z" },
{ url = "https://files.pythonhosted.org/packages/f5/7d/b44ee735439ee73a88c6532536cfbc5b2f146c5f315effa124e85aadb447/cython-3.1.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c9178f0c06f4bc92372dc44e3867e9285bebd556953e47857c26b389aabe2828", size = 3155157, upload-time = "2025-08-13T06:20:42.305Z" },
{ url = "https://files.pythonhosted.org/packages/a8/e0/ef1a44ba765057b04e99cf34dcc1910706a666ea66fcd2b92175ab645416/cython-3.1.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d4da2e624d381e9790152672bfc599a5fb4b823b99d82700a10f5db3311851f9", size = 3305331, upload-time = "2025-08-13T06:20:44.423Z" },
{ url = "https://files.pythonhosted.org/packages/62/f1/8bf3ea5babdef82df3023e72522c71bfc5cc5091e9710828a0dda81bda88/cython-3.1.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:680c9168882c5e8031dd31df199b9a5ee897e95136d15f8c6454b62162ede25e", size = 3171968, upload-time = "2025-08-13T06:20:48.962Z" },
{ url = "https://files.pythonhosted.org/packages/b5/c3/c1383f987d3add9cb8655943f6a0f164bfd06951f28e51b7887d12c8716a/cython-3.1.3-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:833cd0fdba9210d2f1f29e097579565a296d7ff567fd63e8cf5fde4c14339f4f", size = 3372840, upload-time = "2025-08-13T06:20:51.495Z" },
{ url = "https://files.pythonhosted.org/packages/71/d5/02fb7454756cb31b0c044050ee563ac172314aa8e74e5a4dd73bf77041d3/cython-3.1.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:c04367fa0e6c35b199eb51d64b5e185584b810f6c2b96726ce450300faf99686", size = 3317912, upload-time = "2025-08-13T06:20:53.461Z" },
{ url = "https://files.pythonhosted.org/packages/91/62/b96227adf45236952f7cf07f869ff4157b82fe25ff7bb5ba9a3037c98993/cython-3.1.3-cp313-cp313-win32.whl", hash = "sha256:f02ef2bf72a576bf541534c704971b8901616db431bc46d368eed1d6b20aaa1e", size = 2479889, upload-time = "2025-08-13T06:20:55.437Z" },
{ url = "https://files.pythonhosted.org/packages/74/09/100c0727d0fc8e4d7134c44c12b8c623e40f309401af56b7f6faf795c4bb/cython-3.1.3-cp313-cp313-win_amd64.whl", hash = "sha256:00264cafcc451dcefc01eaf29ed5ec150fb73af21d4d21105d97e9d829a53e99", size = 2701550, upload-time = "2025-08-13T06:20:57.503Z" },
{ url = "https://files.pythonhosted.org/packages/23/0e/6e535f2eedf0ddc3c84b087e5d0f04a7b88d8229ec8c27be41a142bcbbfa/cython-3.1.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:62b0a9514b68391aae9784405b65738bbe19cdead3dd7b90dd9e963281db1ee3", size = 2995613, upload-time = "2025-08-13T06:20:59.408Z" },
{ url = "https://files.pythonhosted.org/packages/77/10/3c9e2abf315f608bc22f49b6f9ee66859c23e07edbf484522d5f27b61ab7/cython-3.1.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:976db373c315f342dcb24cd65b5e4c08d2c7b42f9f6ac1b3f677eb2abc9bfb0f", size = 2841282, upload-time = "2025-08-13T06:21:01.274Z" },
{ url = "https://files.pythonhosted.org/packages/cd/77/04e39af308d5716640bc638e7d90d8be34277ebc642ea5bda5ac09628215/cython-3.1.3-cp314-cp314-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:e765c12a02dea0bd968cf1e85af77be1dc6d21909c3fbf5bd81815a7cdd4a65e", size = 3361624, upload-time = "2025-08-13T06:21:03.418Z" },
{ url = "https://files.pythonhosted.org/packages/75/f4/bdbc989ad88401e03ffe17e0bc3a03e3fe5dccbeb9c90e8762d7da4c7a45/cython-3.1.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:097374fa1370e9967e48442a41a0acbebb94fe9d63976cad31eacd38424847bf", size = 3194014, upload-time = "2025-08-13T06:21:05.719Z" },
{ url = "https://files.pythonhosted.org/packages/a2/c8/9f282e5d31280f3912199b638c71557062443608eb3909a562283eda376d/cython-3.1.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:29d8fda4d62b693e62992c665a688e3a220be70958c48eb4c2634093c9998156", size = 3309703, upload-time = "2025-08-13T06:21:08.026Z" },
{ url = "https://files.pythonhosted.org/packages/0a/09/83416a454a575e3ea7e84ec138f0b6dbfb34de28de4968359d7fdb428028/cython-3.1.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:da23fa5082940ae1eed487ee9b7c1da7015b53f9feffeee661f4ee57f696dcd5", size = 3210317, upload-time = "2025-08-13T06:21:10.92Z" },
{ url = "https://files.pythonhosted.org/packages/8f/dc/901ed74302d52105588c59a41a239ef6bd01ff708391a15938aba9670b9e/cython-3.1.3-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:8880daa7a0ddf971593f24da161c976bc1bea895393fdfebb8e54269321d9d2b", size = 3378211, upload-time = "2025-08-13T06:21:13.067Z" },
{ url = "https://files.pythonhosted.org/packages/b7/6d/1e077b99a678b69a39bfe96e1888bcf6c868830220e635f862a44c7761b4/cython-3.1.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:20d6b5a9fc210d3bc2880413011f606e1208e12ee6efc74717445a63f9795af1", size = 3321051, upload-time = "2025-08-13T06:21:17.314Z" },
{ url = "https://files.pythonhosted.org/packages/00/cd/2c442e9e41eafa851d89af1f62720007e03a12e1c01d9a71ed75f550a6c5/cython-3.1.3-cp314-cp314-win32.whl", hash = "sha256:3b2243fed3eeb129dedf2cebbe3be0d9b02fbf3bc75b387aafd54aac3950baa6", size = 2502067, upload-time = "2025-08-13T06:21:19.404Z" },
{ url = "https://files.pythonhosted.org/packages/ae/63/7a1f2f06331f7dcf3fd31721fdaa8b60762748b82395631c0324672a4f2b/cython-3.1.3-cp314-cp314-win_amd64.whl", hash = "sha256:d32792c80b1fa8be9de207ec8844d49c4d1d0d60e5136d20f344729270db6490", size = 2733427, upload-time = "2025-08-13T06:21:21.525Z" },
{ url = "https://files.pythonhosted.org/packages/56/c8/46ac27096684f33e27dab749ef43c6b0119c6a0d852971eaefb73256dc4c/cython-3.1.3-py3-none-any.whl", hash = "sha256:d13025b34f72f77bf7f65c1cd628914763e6c285f4deb934314c922b91e6be5a", size = 1225725, upload-time = "2025-08-13T06:19:09.593Z" },
]
[[package]]
name = "hatchling"
version = "1.27.0"
@@ -108,11 +137,13 @@ wheels = [
[[package]]
name = "megasniff"
version = "0.1.2"
version = "0.2.3.post2"
source = { editable = "." }
dependencies = [
{ name = "cython" },
{ name = "hatchling" },
{ name = "jinja2" },
{ name = "setuptools" },
]
[package.dev-dependencies]
@@ -123,8 +154,10 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "cython", specifier = ">=3.1.3" },
{ name = "hatchling", specifier = ">=1.27.0" },
{ name = "jinja2", specifier = ">=3.1.6" },
{ name = "setuptools", specifier = ">=80.9.0" },
]
[package.metadata.requires-dev]
@@ -199,6 +232,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/16/4ea354101abb1287856baa4af2732be351c7bee728065aed451b678153fd/pytest_cov-6.2.1-py3-none-any.whl", hash = "sha256:f5bc4c23f42f1cdd23c70b1dab1bbaef4fc505ba950d53e0081d0730dd7e86d5", size = 24644, upload-time = "2025-06-12T10:47:45.932Z" },
]
[[package]]
name = "setuptools"
version = "80.9.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/18/5d/3bf57dcd21979b887f014ea83c24ae194cfcd12b9e0fda66b957c69d1fca/setuptools-80.9.0.tar.gz", hash = "sha256:f36b47402ecde768dbfafc46e8e4207b4360c654f1f3bb84475f0a28628fb19c", size = 1319958, upload-time = "2025-05-27T00:56:51.443Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a3/dc/17031897dae0efacfea57dfd3a82fdd2a2aeb58e0ff71b77b87e44edc772/setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922", size = 1201486, upload-time = "2025-05-27T00:56:49.664Z" },
]
[[package]]
name = "trove-classifiers"
version = "2025.5.9.12"