Create toggle for strict-mode inflate

This commit is contained in:
2025-07-14 16:54:34 +03:00
parent bc6acb099f
commit 9f54115160
7 changed files with 170 additions and 25 deletions

View File

@@ -1,12 +1,16 @@
# megasniff
### Автоматическая валидация данных по схеме и сборка объекта в одном флаконе
### Автоматическая валидация данных по схеме и сборка объекта в одном флаконе
#### Как применять:
```python
# 1. Объявляем схемы
from __future__ import annotations
import dataclasses
import typing
@dataclasses.dataclass
class SomeSchema1:
a: int
@@ -19,40 +23,81 @@ class SomeSchema2(typing.TypedDict):
field2: float
field3: typing.Optional[SomeSchema1]
# 2. Генерируем метод для валидации и сборки
import megasniff
infl = megasniff.SchemaInflatorGenerator()
fn = infl.schema_to_generator(SomeSchema1)
fn = infl.schema_to_inflator(SomeSchema1)
# 3. Проверяем что всё работает
fn({'a': 1, 'b': 2, 'c': {'field1': {}, 'field2': '1.1'}})
fn({'a': 1, 'b': 2, 'c': {'field1': {}, 'field2': '1.1', 'field3': None}})
# SomeSchema1(a=1, b=2.0, c={'field1': {}, 'field2': 1.1, 'field3': None})
```
Особенности работы:
Особенности работы:
- поддерживает циклические зависимости
- проверяем `Union`-типы через ретрай на выбросе исключения
- по умолчанию использует готовый щаблон для кодогенерации и исполняет его по запросу, требуется особое внимание к сохранности данного шаблона
- не проверяет типы списков, словарей, кортежей (реализация ожидается)
- проверяет `Union`-типы через ретрай на выбросе исключения
- по умолчанию использует готовый щаблон для кодогенерации и исполняет его по запросу, требуется особое внимание к
сохранности данного шаблона
- проверяет типы списков, может приводить списки к множествам
- не проверяет типы generic-словарей, кортежей (реализация ожидается)
- пользовательские проверки типов должны быть реализованы через наследование и проверки в конструкторе
- опциональный `strict-mode`: выключение приведения базовых типов
----
### Как установить:
#### [uv](https://docs.astral.sh/uv/concepts/projects/dependencies/#dependency-sources):
```bash
uv add megasniff --index sniff_index=https://git.nikto-b.ru/api/packages/nikto_b/pypi/simple
```
#### [poetry](https://python-poetry.org/docs/repositories/#private-repository-example):
1. Добавить репозиторий в `pyproject.toml`
```bash
poetry source add --priority=supplemental sniff_index https://git.nikto-b.ru/api/packages/nikto_b/pypi/simple
```
2. Поставить пакет
```bash
poetry add --source sniff_index megasniff
```
```
----
### Strict-mode:
#### Strict-mode off:
```
@dataclass
class A:
a: list[int]
```
```
>>> {"a": [1, 1.1, "321"]}
<<< A(a=[1, 1, 321])
```
#### Strict-mode on:
```
@dataclass
class A:
a: list[int]
```
```
>>> {"a": [1, 1.1, "321"]}
<<< FieldValidationException, т.к. 1.1 не является int
```

View File

@@ -29,7 +29,7 @@ class CSchema:
def main():
infl = SchemaInflatorGenerator()
infl = SchemaInflatorGenerator(strict_mode=True)
fn = infl.schema_to_inflator(ASchema)
# print(t)
# print(n)
@@ -40,7 +40,7 @@ def main():
# d = {'a': 1, 'b': 1, 'c': 0, 'bs': {'a': 1, 'b': 2, 'c': 3, 'd': {'a': 1, 'b': 2.1, 'bs': None}}}
# d = {'a': 2, 'b': 2, 'bs': {'a': 2, 'b': 'a', 'c': 0, 'd': {'a': 2, 'b': 2}}}
# d = {'l': ['1', {'a': 42, 'b': 1}]}
d = {'a': 2, 'b': 2, 'bs': None}
d = {'a': 2, 'b': '2', 'bs': None}
try:
o = fn(d)
print(o)

View File

@@ -21,6 +21,7 @@ class TypeRenderData:
allow_none: bool
is_list: bool
is_union: bool
is_strict: bool
@dataclass
@@ -45,11 +46,16 @@ class SchemaInflatorGenerator:
templateEnv: jinja2.Environment
template: jinja2.Template
_strict_mode: bool
def __init__(self,
loader: Optional[jinja2.BaseLoader] = None,
strict_mode: bool = False,
*,
template_filename: str = 'inflator.jinja2'):
self._strict_mode = strict_mode
if loader is None:
template_path = importlib.resources.files('megasniff.templates')
loader = jinja2.FileSystemLoader(str(template_path))
@@ -58,15 +64,19 @@ class SchemaInflatorGenerator:
self.templateEnv = jinja2.Environment(loader=self.templateLoader)
self.template = self.templateEnv.get_template(template_filename)
def schema_to_inflator(self, schema: type) -> Callable[[dict[str, Any]], Any]:
txt, namespace = self._schema_to_inflator(schema, _funcname='inflate')
def schema_to_inflator(self,
schema: type,
strict_mode_override: Optional[bool] = None) -> Callable[[dict[str, Any]], Any]:
txt, namespace = self._schema_to_inflator(schema, _funcname='inflate',
strict_mode_override=strict_mode_override)
imports = ('from typing import Any\n'
'from megasniff.exceptions import MissingFieldException, FieldValidationException\n')
txt = imports + '\n' + txt
print(txt)
exec(txt, namespace)
return namespace['inflate']
def _unwrap_typeref(self, t: type) -> TypeRenderData:
def _unwrap_typeref(self, t: type, strict_mode: bool) -> TypeRenderData:
type_origin = get_origin(t)
allow_none = False
argtypes = t,
@@ -81,11 +91,11 @@ class SchemaInflatorGenerator:
is_union = len(argtypes) > 1
if is_union:
typerefs = list(map(lambda x: self._unwrap_typeref(x), argtypes))
return TypeRenderData(typerefs, allow_none, False, True)
typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes))
return TypeRenderData(typerefs, allow_none, False, True, False)
elif type_origin in [list, set]:
rd = self._unwrap_typeref(argtypes[0])
return IterableTypeRenderData(rd, allow_none, True, False, type_origin.__name__)
rd = self._unwrap_typeref(argtypes[0], strict_mode)
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__)
else:
t = argtypes[0]
@@ -94,13 +104,23 @@ class SchemaInflatorGenerator:
t = type_origin or t
is_builtin = is_builtin_type(t)
return TypeRenderData(t.__name__ if is_builtin else f'inflate_{t.__name__}', allow_none, is_list, False)
return TypeRenderData(t.__name__ if is_builtin else f'inflate_{t.__name__}',
allow_none,
is_list,
False,
strict_mode if is_builtin else False)
def _schema_to_inflator(self,
schema: type,
strict_mode_override: Optional[bool] = None,
*,
_funcname='inflate',
_namespace=None) -> tuple[str, dict]:
if strict_mode_override is not None:
strict_mode = strict_mode_override
else:
strict_mode = self._strict_mode
# Я это написал, оно пока работает, и я не собираюсь это упрощать, сорян
type_hints = get_kwargs_type_hints(schema)
render_data = []
@@ -123,7 +143,7 @@ class SchemaInflatorGenerator:
has_default, default_option = get_field_default(schema, argname)
typeref = self._unwrap_typeref(argtype)
typeref = self._unwrap_typeref(argtype, strict_mode)
argtypes = argtype,
allow_none = False
@@ -163,7 +183,8 @@ class SchemaInflatorGenerator:
if argt.__name__ not in namespace.keys():
t, n = self._schema_to_inflator(argt,
_funcname=f'inflate_{argt.__name__}',
_namespace=namespace)
_namespace=namespace,
strict_mode_override=strict_mode_override)
namespace |= n
txt_segments.append(t)

View File

@@ -25,7 +25,7 @@ def {{funcname}}(from_data: dict[str, Any]):
{% endif %}
else:
{{ unwrap_type_data.render_segment(conv.argname, conv.constrs, "conv_data") | indent(4*3) }}
{{ unwrap_type_data.render_segment(conv.argname, conv.constrs, "conv_data", false) | indent(4*3) }}
{% endfor %}

View File

@@ -4,7 +4,7 @@
if not isinstance({{conv_data}}, list):
raise FieldValidationException('{{argname}}', "list", conv_data, [])
for item in {{conv_data}}:
{{ render_segment("_" + argname, typedef, "item") | indent(4) }}
{{ render_segment("_" + argname, typedef, "item", false ) | indent(4) }}
{{argname}}.append(_{{argname}})
{%- endset %}
{{out}}
@@ -18,7 +18,7 @@ for item in {{conv_data}}:
all_conv_exceptions = []
{% for union_type in conv.typeref %}
{{ ' ' * ns.retry_indent }}try:
{{ render_segment(argname, union_type, conv_data) | indent((ns.retry_indent + 1) * 4) }}
{{ render_segment(argname, union_type, conv_data, false) | indent((ns.retry_indent + 1) * 4) }}
{{ ' ' * ns.retry_indent }}except Exception as e:
{{ ' ' * ns.retry_indent }} all_conv_exceptions.append(e)
{% set ns.retry_indent = ns.retry_indent + 1 %}
@@ -28,10 +28,14 @@ all_conv_exceptions = []
{{out}}
{%- endmacro %}
{% macro render_segment(argname, typeref, conv_data) -%}
{% macro render_segment(argname, typeref, conv_data, strict) -%}
{%- set out -%}
{% if typeref is string %}
{% if strict %}
if not isinstance({{conv_data}}, {{typeref}}):
raise FieldValidationException('{{argname}}', "{{typeref | replace('"', "'")}}", {{conv_data}}, [])
{% endif %}
{{argname}} = {{typeref}}({{conv_data}})
{% elif typeref.is_union %}
@@ -42,7 +46,7 @@ all_conv_exceptions = []
{{argname}} = {{typeref.iterable_type}}({{argname}})
{% else %}
{{render_segment(argname, typeref.typeref, conv_data)}}
{{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}}
{% endif %}

75
tests/test_strict_mode.py Normal file
View File

@@ -0,0 +1,75 @@
from dataclasses import dataclass
import pytest
from megasniff import SchemaInflatorGenerator
from megasniff.exceptions import FieldValidationException
def test_global_strict_mode_basic():
class A:
def __init__(self, a: int):
self.a = a
infl = SchemaInflatorGenerator(strict_mode=True)
fn = infl.schema_to_inflator(A)
a = fn({'a': 42})
assert a.a == 42
with pytest.raises(FieldValidationException):
fn({'a': 42.0})
def test_global_strict_mode_basic_override():
class A:
def __init__(self, a: int):
self.a = a
infl = SchemaInflatorGenerator(strict_mode=False)
fn = infl.schema_to_inflator(A, strict_mode_override=True)
a = fn({'a': 42})
assert a.a == 42
with pytest.raises(FieldValidationException):
fn({'a': 42.0})
def test_global_strict_mode_list():
@dataclass
class A:
a: list[int]
infl = SchemaInflatorGenerator(strict_mode=True)
fn = infl.schema_to_inflator(A)
a = fn({'a': [42]})
assert a.a == [42]
with pytest.raises(FieldValidationException):
fn({'a': [42.0, 42]})
def test_global_strict_mode_circular():
@dataclass
class A:
a: list[int]
@dataclass
class B:
b: list[A | int]
infl = SchemaInflatorGenerator(strict_mode=True)
fn = infl.schema_to_inflator(B)
b = fn({'b': [{'a': []}, 42]})
assert len(b.b) == 2
assert isinstance(b.b[0], A)
assert isinstance(b.b[1], int)
with pytest.raises(FieldValidationException):
fn({'b': [42.0]})
with pytest.raises(FieldValidationException):
fn({'b': [{'a': [1.1]}]})