Compare commits
Comparing 5b4eba5190 ... inflator-c (14 commits)

| SHA1 |
|---|
| 0795a5f8bb |
| 36e343d3bc |
| 0786fc600a |
| b11266990b |
| c11a63c8a5 |
| 9e3d4d0a25 |
| 9fc218e556 |
| f8cacf9319 |
| 9f54115160 |
| bc6acb099f |
| 897eccd8d1 |
| aee6dcf3d3 |
| 1994eaab0d |
| ed5f975e87 |
.gitignore (vendored, 1 line changed)
@@ -1,6 +1,7 @@
 # ---> Python
 # Byte-compiled / optimized / DLL files
 __pycache__/
+**/__pycache__
 *.py[cod]
 *$py.class
README.md (69 lines changed)
@@ -1,12 +1,16 @@
# megasniff

### Automatic schema-driven data validation and object assembly, all in one

#### How to use:

```python
# 1. Declare the schemas
from __future__ import annotations
import dataclasses
import typing


@dataclasses.dataclass
class SomeSchema1:
    a: int
@@ -19,40 +23,89 @@ class SomeSchema2(typing.TypedDict):
    field2: float
    field3: typing.Optional[SomeSchema1]


# 2. Generate the validation-and-assembly function
import megasniff

infl = megasniff.SchemaInflatorGenerator()
-fn = infl.schema_to_generator(SomeSchema1)
+fn = infl.schema_to_inflator(SomeSchema1)

# 3. Check that everything works
-fn({'a': 1, 'b': 2, 'c': {'field1': {}, 'field2': '1.1'}})
+fn({'a': 1, 'b': 2, 'c': {'field1': {}, 'field2': '1.1', 'field3': None}})
# SomeSchema1(a=1, b=2.0, c={'field1': {}, 'field2': 1.1, 'field3': None})
```

Behaviour notes:

- supports circular dependencies
- validates `Union` types by retrying after a raised exception
- by default uses a bundled code-generation template and executes it on demand, so special care is needed to keep that template intact
- validates list element types and can coerce lists into sets
- does not validate generic dict or tuple types yet (implementation planned)
- custom type checks must be implemented through inheritance, with the check in the constructor (see the sketch below)
- optional `strict-mode`: disables coercion of basic types
- can generate tuples of top-level objects when a schema description is provided (useful for unpacking arguments)
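To make the custom-type-check bullet above concrete, here is a minimal sketch (illustrative names only, not code from this repository): the extra check lives in the constructor of a subclass, so it runs whenever the generated inflator constructs the field value.

```python
import dataclasses


class Port(int):
    """Hypothetical custom type: the range check lives in the constructor."""

    def __init__(self, value):
        super().__init__()
        if not 0 <= int(self) <= 65535:
            raise ValueError(f'invalid port: {value}')


@dataclasses.dataclass
class ServerSchema:
    host: str
    port: Port  # the generated inflator calls Port(...), which runs the check
```

Anything the constructor raises then surfaces from the generated inflator like any other failed conversion.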
----

### Installation:

#### [uv](https://docs.astral.sh/uv/concepts/projects/dependencies/#dependency-sources):

```bash
uv add megasniff --index sniff_index=https://git.nikto-b.ru/api/packages/nikto_b/pypi/simple
```

#### [poetry](https://python-poetry.org/docs/repositories/#private-repository-example):

1. Add the repository to `pyproject.toml`

```bash
poetry source add --priority=supplemental sniff_index https://git.nikto-b.ru/api/packages/nikto_b/pypi/simple
```

2. Install the package

```bash
poetry add --source sniff_index megasniff
```

----

### Strict-mode:

#### Strict-mode off:

```
@dataclass
class A:
    a: list[int]
```

```
>>> {"a": [1, 1.1, "321"]}
<<< A(a=[1, 1, 321])
```

#### Strict-mode on:

```
@dataclass
class A:
    a: list[int]
```

```
>>> {"a": [1, 1.1, "321"]}
<<< FieldValidationException, because 1.1 is not an int
```

### Tuple unwrap

```
fn = infl.schema_to_inflator(
    (('a', int), TupleSchemaItem(Optional[list[int]], key_name='b', has_default=True, default=None)))
```

Creates `fn: (dict[str, Any]) -> tuple[int, Optional[list[int]]]` (the declared signature remains `(dict[str, Any]) -> tuple`).
pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "megasniff"
-version = "0.1.2"
+version = "0.2.3.post2"
description = "Library for in-time codegened type validation"
authors = [
    { name = "nikto_b", email = "niktob560@yandex.ru" }
@@ -8,8 +8,10 @@ authors = [
license = "LGPL-3.0-or-later"
requires-python = ">=3.13"
dependencies = [
    "cython>=3.1.3",
    "hatchling>=1.27.0",
    "jinja2>=3.1.6",
    "setuptools>=80.9.0",
]

[build-system]
@@ -4,29 +4,78 @@ from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
from typing import TypedDict
|
||||
|
||||
import megasniff.exceptions
|
||||
from megasniff.python_to_cython import python_obj_to_cython
|
||||
from . import SchemaInflatorGenerator
|
||||
|
||||
|
||||
@dataclass
|
||||
class ASchema:
|
||||
a: int
|
||||
a: int | None
|
||||
b: float | str
|
||||
bs: Optional[BSchema]
|
||||
d: int
|
||||
c: float = 1.1
|
||||
|
||||
def __init__(self, a: int | None, b: float | str, bs: Optional[BSchema], c: float = 1.1):
|
||||
self.a = a
|
||||
self.b = b
|
||||
self.bs = bs
|
||||
self.c = c
|
||||
self.d = a or 0
|
||||
|
||||
class BSchema(TypedDict):
|
||||
@dataclass
|
||||
class BSchema:
|
||||
a: int
|
||||
b: str
|
||||
c: float
|
||||
d: ASchema
|
||||
|
||||
|
||||
@dataclass
|
||||
class CSchema:
|
||||
l: set[int | ASchema]
|
||||
|
||||
@dataclass
|
||||
class SomeData:
|
||||
a: int
|
||||
b: float
|
||||
c: str
|
||||
|
||||
def main():
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_generator(ASchema)
|
||||
d = {'a': '42', 'b': 'a0.3', 'bs': {'a': 1, 'b': 'a', 'c': 1, 'd': {'a': 1, 'b': ''}}}
|
||||
print(fn(d))
|
||||
# ccode = python_obj_to_cython(ASchema)
|
||||
# print(ccode)
|
||||
# exit(0)
|
||||
|
||||
# infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
# fn = infl.schema_to_inflator(SomeData)
|
||||
# print(fn({'a': 1, 'b': 1.1, 'c': 'asdf'}))
|
||||
infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
fn = infl.schema_to_inflator(ASchema)
|
||||
# exit(0)
|
||||
# print(t)
|
||||
# print(n)
|
||||
# exec(t, n)
|
||||
# fn = n['inflate']
|
||||
# fn = infl.schema_to_generator(ASchema)
|
||||
# # d = {'a': '42', 'b': 'a0.3', 'bs': {'a': 1, 'b': 'a', 'c': 1, 'd': {'a': 1, 'b': ''}}}
|
||||
# d = {'a': 1, 'b': 1, 'c': 0, 'bs': {'a': 1, 'b': 2, 'c': 3, 'd': {'a': 1, 'b': 2.1, 'bs': None}}}
|
||||
# d = {'a': 2, 'b': 2, 'bs': {'a': 2, 'b': 'a', 'c': 0, 'd': {'a': 2, 'b': 2}}}
|
||||
# d = {'l': ['1', {'a': 42, 'b': 1}]}
|
||||
# d = {'a': None, 'b': '2', 'bs': None}
|
||||
d = {'a': None, 'b': '2', 'bs': {'a': 1, 'b': 'a', 'c': 1.1, 'd': {'a': 1, 'b': '', 'bs': None}}}
|
||||
try:
|
||||
o = fn(d)
|
||||
print(o)
|
||||
for k, v in o.__dict__.items():
|
||||
print(f'field {k}: {v}')
|
||||
print(f'type: {type(v)}')
|
||||
if isinstance(v, list):
|
||||
for vi in v:
|
||||
print(f'\ttype: {type(vi)}')
|
||||
except megasniff.exceptions.FieldValidationException as e:
|
||||
print(e.exceptions)
|
||||
print(e)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
src/megasniff/exceptions.py (new file, 22 lines)
@@ -0,0 +1,22 @@
from typing import Any, Optional


class MissingFieldException(Exception):
    def __init__(self, required_field: str, required_types: str):
        message = f"No required field provided: {required_field} with type {required_types}"
        super().__init__(message)
        self.required_field = required_field
        self.required_types = required_types


class FieldValidationException(Exception):
    def __init__(self,
                 required_field: str,
                 required_types: str,
                 provided: Any,
                 exceptions: Optional[list[Exception]] = None):
        message = f"Required field {required_field} with type {required_types}, provided: {provided}"
        super().__init__(message)
        self.required_field = required_field
        self.required_types = required_types
        self.exceptions = exceptions or []
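A small usage sketch for these exceptions (illustrative; `fn` stands for any inflator produced by `SchemaInflatorGenerator`). The `exceptions` attribute carries the per-member failures collected while a `Union` field was retried.

```python
from megasniff.exceptions import FieldValidationException, MissingFieldException

try:
    obj = fn({'a': 'definitely-not-an-int'})
except MissingFieldException as e:
    print('missing:', e.required_field, e.required_types)
except FieldValidationException as e:
    print('invalid:', e.required_field, e.required_types)
    for nested in e.exceptions:  # populated when every Union branch failed
        print('  ->', nested)
```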
@@ -1,110 +1,512 @@
|
||||
# Copyright (C) 2025 Shevchenko A
|
||||
# SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
|
||||
from __future__ import annotations
|
||||
import collections.abc
|
||||
import hashlib
|
||||
import importlib.resources
|
||||
import importlib.util
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from types import NoneType, UnionType
|
||||
from typing import Optional, get_origin, get_args, Union, Annotated
|
||||
from pathlib import Path
|
||||
from types import NoneType, UnionType, ModuleType
|
||||
from typing import Optional, get_origin, get_args, Union, Annotated, Literal, Sequence, List, Set, TypeAliasType, \
|
||||
OrderedDict
|
||||
|
||||
import jinja2
|
||||
|
||||
from . import utils
|
||||
from .utils import *
|
||||
import random, string
|
||||
|
||||
|
||||
@dataclass
|
||||
class RenderData:
|
||||
argname: str
|
||||
constrs: list[tuple[str, bool]] # typecall / use lookup table
|
||||
typename: str
|
||||
class TypeRenderData:
|
||||
typeref: list[TypeRenderData] | TypeRenderData | str
|
||||
allow_none: bool
|
||||
is_list: bool
|
||||
is_union: bool
|
||||
is_strict: bool
|
||||
ctype: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class IterableTypeRenderData(TypeRenderData):
|
||||
iterable_type: str
|
||||
is_list = True
|
||||
is_union = False
|
||||
|
||||
|
||||
def _escape_python_name(name: str) -> str:
|
||||
name = name.replace('-', '__dash__').replace('+', '__plus__').replace('/', '__shash__')
|
||||
if name[0].isnumeric():
|
||||
name = '__num__' + name
|
||||
return name
|
||||
|
||||
|
||||
@dataclass
|
||||
class FieldRenderData:
|
||||
argname: str
|
||||
argname_escaped: str
|
||||
constrs: TypeRenderData
|
||||
typename: str
|
||||
is_optional: bool
|
||||
allow_none: bool
|
||||
default_option: Optional[str]
|
||||
ctype: str
|
||||
|
||||
def __init__(self,
|
||||
argname: str,
|
||||
constrs: TypeRenderData,
|
||||
typename: str,
|
||||
is_optional: bool,
|
||||
allow_none: bool,
|
||||
default_option: Optional[str],
|
||||
ctype: str):
|
||||
self.argname = argname
|
||||
self.constrs = constrs
|
||||
self.typename = typename
|
||||
self.is_optional = is_optional
|
||||
self.allow_none = allow_none
|
||||
self.default_option = default_option
|
||||
self.argname_escaped = _escape_python_name(argname)
|
||||
self.ctype = ctype
|
||||
|
||||
|
||||
def randomword(length):
|
||||
letters = string.ascii_lowercase
|
||||
return ''.join(random.choice(letters) for i in range(length))
|
||||
|
||||
|
||||
def exec_cython(txt: str, namespace: dict, name: str):
|
||||
"""
|
||||
Drop-in замена exec(txt, namespace), но через cython.
|
||||
Возвращает callable объект из namespace['inflator'].
|
||||
"""
|
||||
# generate a unique name for the module
|
||||
h = hashlib.sha256(txt.encode() + str(sorted(namespace.keys())).encode()).hexdigest()[:16]
|
||||
modname = f"_cyexec_{h}"
|
||||
|
||||
build_dir = tempfile.mkdtemp(prefix="cyexec_")
|
||||
pyx_file = os.path.join(build_dir, f"{modname}.pyx")
|
||||
|
||||
# assemble the source for the .pyx file
# first, re-export the namespace
|
||||
export_lines = []
|
||||
for k, v in namespace.items():
|
||||
if k not in {'int', 'float', 'str'}:
|
||||
export_lines.append(f"{k} = __ns__['{k}']")
|
||||
|
||||
pyx_code = f"""
|
||||
# cython: language_level=3
|
||||
# cython: boundscheck=False, wraparound=False, nonecheck=False
|
||||
# AUTO-GENERATED
|
||||
|
||||
# pull in the runtime namespace
|
||||
import builtins
|
||||
__ns__ = builtins.__dict__['_cyexec_ns']
|
||||
|
||||
cdef class NullableInt:
|
||||
cdef long value
|
||||
cdef bint has
|
||||
|
||||
cpdef set(self, long value):
|
||||
self.value = value
|
||||
self.has = 1
|
||||
|
||||
cpdef unset(self):
|
||||
self.has = 0
|
||||
|
||||
cdef class NullableDouble:
|
||||
cdef double value
|
||||
cdef bint has
|
||||
|
||||
cpdef set(self, double value):
|
||||
self.value = value
|
||||
self.has = 1
|
||||
|
||||
cpdef unset(self):
|
||||
self.has = 0
|
||||
{os.linesep.join(export_lines)}
|
||||
|
||||
|
||||
|
||||
|
||||
# user-supplied code
|
||||
{txt}
|
||||
"""
|
||||
|
||||
# write the .pyx file
|
||||
with open(pyx_file, "w") as f:
|
||||
f.write(pyx_code)
|
||||
|
||||
# stash the namespace in builtins so the compiled Cython module can see it
|
||||
import builtins
|
||||
builtins._cyexec_ns = namespace
|
||||
|
||||
# compile via cythonize
|
||||
setup_code = f"""
|
||||
from setuptools import setup
|
||||
from Cython.Build import cythonize
|
||||
setup(
|
||||
name="{modname}",
|
||||
ext_modules=cythonize("{pyx_file}", compiler_directives={{"language_level": "3"}}),
|
||||
script_args=["build_ext", "--inplace"],
|
||||
)
|
||||
"""
|
||||
setup_file = os.path.join(build_dir, "setup.py")
|
||||
with open(setup_file, "w") as f:
|
||||
f.write(setup_code)
|
||||
|
||||
subprocess.check_call([sys.executable, setup_file, "build_ext", "--inplace"], cwd=build_dir)
|
||||
|
||||
# locate the built .so file
|
||||
for fn in os.listdir(build_dir):
|
||||
if fn.startswith(modname) and fn.endswith((".so", ".pyd")):
|
||||
so_path = os.path.join(build_dir, fn)
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Cython build failed, no .so produced")
|
||||
|
||||
# import it as a module
|
||||
spec = importlib.util.spec_from_file_location(modname, so_path)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
sys.modules[modname] = mod
|
||||
spec.loader.exec_module(mod)
|
||||
|
||||
# clean up the temporary namespace in builtins
|
||||
del builtins._cyexec_ns
|
||||
|
||||
return getattr(mod, name)
|
||||
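For orientation, a minimal sketch of calling `exec_cython` directly (hypothetical source text; in the library it receives the Jinja-rendered inflator source plus a namespace of referenced schema types, and it needs Cython and a C toolchain available at runtime):

```python
# Hypothetical, simplified call: compile a tiny function on the fly and fetch it by name.
src = (
    "def inflate(from_data):\n"
    "    return int(from_data['a'])\n"
)
inflate = exec_cython(src, namespace={}, name='inflate')
print(inflate({'a': '42'}))  # 42
```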
|
||||
|
||||
@dataclass
|
||||
class BasicTypeVariationTest:
|
||||
index: int
|
||||
basic_type: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectTypeVariationTest:
|
||||
index: int
|
||||
fields_contains: list[tuple[str, TypeVariationTest | None]]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TypeVariationTest:
|
||||
types: list[TypeConstructionSchema]
|
||||
basic_tests: list[BasicTypeVariationTest]
|
||||
object_tests: list[ObjectTypeVariationTest]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TypeConstructionSchema:
|
||||
tp: type
|
||||
allow_none: bool
|
||||
kwargs: Optional[dict[str, tuple[str, list[TypeConstructionSchema] | type]]]
|
||||
|
||||
@property
|
||||
def typed_key_pairs(self) -> set[str]:
|
||||
ret = set()
|
||||
for k, (_, v) in self.kwargs.items():
|
||||
if isinstance(v, type):
|
||||
ret.add(f'{k}:{v}')
|
||||
else:
|
||||
if not isinstance(v, list):
|
||||
v = [v]
|
||||
for _v in v:
|
||||
ret.add(f'{k}:{_v.tp}')
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
class Z1:
|
||||
a: int
|
||||
b: int
|
||||
|
||||
|
||||
class Z2:
|
||||
c: int
|
||||
d: int
|
||||
|
||||
|
||||
class Z3:
|
||||
e: int
|
||||
f: int
|
||||
|
||||
|
||||
class A:
|
||||
a: int
|
||||
b: Z1 | Z3
|
||||
|
||||
|
||||
class B:
|
||||
a: int
|
||||
b: Z2
|
||||
|
||||
|
||||
class C:
|
||||
z1: Z1
|
||||
z2: Z2
|
||||
|
||||
|
||||
_ = A | B | C
|
||||
|
||||
|
||||
def find_types_variations(types: list[TypeConstructionSchema]) -> TypeVariationTest:
|
||||
basic_tests = []
|
||||
object_tests = []
|
||||
for i, tp in enumerate(types):
|
||||
if tp.kwargs is None:
|
||||
basic_tests.append(BasicTypeVariationTest(i, _type_to_ctype(tp.tp, tp.allow_none)))
|
||||
for i, tp in enumerate(types):
|
||||
if tp.kwargs is not None:
|
||||
tp_keys = []
|
||||
uniq_keys = set()
|
||||
if i < len(types):
|
||||
keys = set()
|
||||
for t in types[i + 1:]:
|
||||
keys |= set((t.kwargs or {}).keys())
|
||||
obj_keys = set(tp.kwargs.keys())
|
||||
uniq_keys = (obj_keys ^ keys) & obj_keys
|
||||
|
||||
if len(uniq_keys) > 0:
|
||||
k = list(uniq_keys)[0]
|
||||
object_tests.append(ObjectTypeVariationTest(i, [(k, None)]))
|
||||
else:
|
||||
pass
|
||||
|
||||
return TypeVariationTest(types, basic_tests, object_tests)
|
||||
|
||||
|
||||
def _type_to_ctype(t: type, allow_none: bool) -> str:
|
||||
if t is int:
|
||||
return 'NullableInt' if allow_none else 'long'
|
||||
if t is float:
|
||||
return 'NullableFloat' if allow_none else 'double'
|
||||
return 'object'
|
||||
|
||||
|
||||
class SchemaInflatorGenerator:
|
||||
templateLoader: jinja2.BaseLoader
|
||||
templateEnv: jinja2.Environment
|
||||
template: jinja2.Template
|
||||
|
||||
object_template: jinja2.Template
|
||||
tuple_template: jinja2.Template
|
||||
_store_sources: bool
|
||||
_strict_mode: bool
|
||||
|
||||
def __init__(self,
|
||||
loader: Optional[jinja2.BaseLoader] = None,
|
||||
convertor_template: str = 'inflator.jinja2'):
|
||||
strict_mode: bool = False,
|
||||
store_sources: bool = False,
|
||||
*,
|
||||
object_template_filename: str = 'inflator.jinja2',
|
||||
tuple_template_filename: str = 'inflator_tuple.jinja2',
|
||||
):
|
||||
|
||||
self._strict_mode = strict_mode
|
||||
self._store_sources = store_sources
|
||||
|
||||
if loader is None:
|
||||
template_path = importlib.resources.files('megasniff.templates')
|
||||
loader = jinja2.FileSystemLoader(str(template_path))
|
||||
|
||||
self.templateLoader = loader
|
||||
self.templateEnv = jinja2.Environment(loader=self.templateLoader)
|
||||
self.template = self.templateEnv.get_template(convertor_template)
|
||||
self.object_template = self.templateEnv.get_template(object_template_filename)
|
||||
self.tuple_template = self.templateEnv.get_template(tuple_template_filename)
|
||||
|
||||
def schema_to_generator(self,
|
||||
schema: type,
|
||||
def schema_to_inflator(self,
|
||||
schema: type | Sequence[TupleSchemaItem | tuple[str, type]] | OrderedDict[str, type],
|
||||
strict_mode_override: Optional[bool] = None,
|
||||
from_type_override: Optional[type | TypeAliasType] = None
|
||||
) -> Callable[[dict[str, Any]], Any]:
|
||||
name = 'inflate'
|
||||
if isinstance(schema, type):
|
||||
name = f'inflate_{schema.__name__}'
|
||||
if from_type_override is not None and '__getitem__' not in dir(from_type_override):
|
||||
raise RuntimeError('from_type_override must provide __getitem__')
|
||||
txt, namespace = self._schema_to_inflator(schema,
|
||||
_funcname=name,
|
||||
strict_mode_override=strict_mode_override,
|
||||
from_type_override=from_type_override,
|
||||
)
|
||||
imports = ('from typing import Any\n'
|
||||
'from megasniff.exceptions import MissingFieldException, FieldValidationException\n')
|
||||
txt = imports + '\n' + txt
|
||||
fn = exec_cython(txt, namespace, name)
|
||||
# fn = exec_numba(txt, namespace, func_name=name)
|
||||
# exec(txt, namespace)
|
||||
# fn = namespace[name]
|
||||
if self._store_sources:
|
||||
setattr(fn, '__megasniff_sources__', txt)
|
||||
return fn
|
||||
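Because the rendered source can be kept on the returned callable (the `store_sources` flag above), inspecting what was generated is straightforward — a short sketch, where `SomeSchema` is any dataclass of yours:

```python
infl = SchemaInflatorGenerator(store_sources=True)
fn = infl.schema_to_inflator(SomeSchema)
print(getattr(fn, '__megasniff_sources__', None))  # the generated inflator source text
```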
|
||||
def _unwrap_typeref(self, t: type, strict_mode: bool) -> TypeRenderData:
|
||||
type_origin = get_origin(t)
|
||||
allow_none = False
|
||||
argtypes = t,
|
||||
|
||||
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])):
|
||||
argtypes = get_args(t)
|
||||
|
||||
if NoneType in argtypes or None in argtypes:
|
||||
argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes))
|
||||
allow_none = True
|
||||
|
||||
is_union = len(argtypes) > 1
|
||||
|
||||
if is_union:
|
||||
typerefs = list(map(lambda x: self._unwrap_typeref(x, strict_mode), argtypes))
|
||||
return TypeRenderData(typerefs, allow_none, False, True, False, 'object')
|
||||
elif type_origin in [list, set]:
|
||||
rd = self._unwrap_typeref(argtypes[0], strict_mode)
|
||||
return IterableTypeRenderData(rd, allow_none, True, False, False, type_origin.__name__,
|
||||
'NullableList' if allow_none else 'list')
|
||||
else:
|
||||
t = argtypes[0]
|
||||
|
||||
is_list = (type_origin or t) in [list, set]
|
||||
if is_list:
|
||||
t = type_origin or t
|
||||
|
||||
is_builtin = is_builtin_type(t)
|
||||
return TypeRenderData(t.__name__ if is_builtin else f'inflate_{t.__name__}',
|
||||
allow_none,
|
||||
is_list,
|
||||
False,
|
||||
strict_mode if is_builtin else False,
|
||||
_type_to_ctype(t, allow_none))
|
||||
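To make the unwrapping above concrete, this is what the standard `typing` introspection used by `_unwrap_typeref` reports for a nested annotation such as `Optional[list[int]]` (plain stdlib behaviour, shown for reference):

```python
from types import NoneType
from typing import Optional, Union, get_args, get_origin

t = Optional[list[int]]
print(get_origin(t) is Union)     # True  -> union handling, NoneType gets filtered out
print(get_args(t))                # (list[int], <class 'NoneType'>) -> allow_none = True

inner = get_args(t)[0]
print(get_origin(inner) is list)  # True  -> iterable branch
print(get_args(inner))            # (<class 'int'>,) -> element type to convert
```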
|
||||
def _schema_to_inflator(self,
|
||||
schema: type | Sequence[TupleSchemaItem | tuple[str, type]] | OrderedDict[str, type],
|
||||
strict_mode_override: Optional[bool] = None,
|
||||
from_type_override: Optional[type | TypeAliasType] = None,
|
||||
*,
|
||||
_base_lookup_table: Optional[dict[str, Any]] = None) -> Callable[[dict[str, Any]], Any]:
|
||||
# I wrote this, it works for now, and I'm not going to simplify it, sorry
|
||||
type_hints = get_kwargs_type_hints(schema)
|
||||
render_data = []
|
||||
lookup_table = _base_lookup_table or {}
|
||||
_funcname='inflate',
|
||||
_namespace=None,
|
||||
) -> tuple[str, dict]:
|
||||
if strict_mode_override is not None:
|
||||
strict_mode = strict_mode_override
|
||||
else:
|
||||
strict_mode = self._strict_mode
|
||||
|
||||
if schema.__name__ not in lookup_table.keys():
|
||||
lookup_table[schema.__name__] = None
|
||||
template = self.object_template
|
||||
mode = 'object'
|
||||
if isinstance(schema, dict):
|
||||
new_schema = []
|
||||
for argname, argtype in schema.items():
|
||||
new_schema.append((argname, argtype))
|
||||
schema = new_schema
|
||||
|
||||
if isinstance(schema, collections.abc.Iterable):
|
||||
template = self.tuple_template
|
||||
mode = 'tuple'
|
||||
new_schema = []
|
||||
for t in schema:
|
||||
if isinstance(t, TupleSchemaItem):
|
||||
new_schema.append(t)
|
||||
else:
|
||||
new_schema.append(TupleSchemaItem(t[1], key_name=t[0]))
|
||||
schema = new_schema
|
||||
|
||||
# I wrote this, it works for now, and I'm not going to simplify it, sorry
|
||||
if mode == 'object':
|
||||
type_hints = get_kwargs_type_hints(schema)
|
||||
else:
|
||||
type_hints = {}
|
||||
for i, t in enumerate(schema):
|
||||
n = t.key_name or f'_arg_{i}'
|
||||
type_hints[n] = t.schema
|
||||
|
||||
render_data = []
|
||||
|
||||
txt_segments = []
|
||||
|
||||
if _namespace is None:
|
||||
namespace = {}
|
||||
else:
|
||||
namespace = _namespace
|
||||
|
||||
if namespace.get(f'{_funcname}_tgt_type') is not None:
|
||||
return '', namespace
|
||||
|
||||
if mode == 'object':
|
||||
namespace[f'{_funcname}_tgt_type'] = schema
|
||||
namespace[utils.typename(schema)] = schema
|
||||
|
||||
if from_type_override is not None:
|
||||
namespace['_from_type'] = from_type_override
|
||||
|
||||
for argname, argtype in type_hints.items():
|
||||
if argname in {'return', 'self'}:
|
||||
continue
|
||||
|
||||
has_default, default_option = get_field_default(schema, argname)
|
||||
|
||||
typeref = self._unwrap_typeref(argtype, strict_mode)
|
||||
|
||||
argtypes = argtype,
|
||||
type_origin = get_origin(argtype)
|
||||
allow_none = False
|
||||
|
||||
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated])):
|
||||
argtypes = get_args(argtype)
|
||||
while get_origin(argtype) is not None:
|
||||
type_origin = get_origin(argtype)
|
||||
|
||||
if any(map(lambda x: type_origin is x, [Union, UnionType, Optional, Annotated, list, List, set, Set])):
|
||||
argtypes = get_args(argtype)
|
||||
if len(argtypes) == 1:
|
||||
argtype = argtypes[0]
|
||||
else:
|
||||
break
|
||||
|
||||
if NoneType in argtypes or None in argtypes:
|
||||
argtypes = tuple(filter(lambda x: x is not None and x is not NoneType, argtypes))
|
||||
allow_none = True
|
||||
|
||||
out_argtypes: list[tuple[str, bool]] = []
|
||||
|
||||
for argt in argtypes:
|
||||
is_builtin = is_builtin_type(argt)
|
||||
if not is_builtin and argt is not schema:
|
||||
if argt.__name__ not in lookup_table.keys():
|
||||
# if a circular dependency occurs, we don't want infinite recursion
|
||||
lookup_table[argt.__name__] = self.schema_to_generator(argt, _base_lookup_table=lookup_table)
|
||||
|
||||
if argt is schema:
|
||||
out_argtypes.append(('inflate', True))
|
||||
else:
|
||||
out_argtypes.append((argt.__name__, is_builtin))
|
||||
|
||||
render_data.append(
|
||||
RenderData(
|
||||
FieldRenderData(
|
||||
argname,
|
||||
out_argtypes,
|
||||
repr(argtype),
|
||||
len(argtypes) > 1,
|
||||
typeref,
|
||||
utils.typename(argtype),
|
||||
has_default,
|
||||
allow_none,
|
||||
default_option
|
||||
default_option if not isinstance(default_option, str) else f"'{default_option}'",
|
||||
typeref.ctype
|
||||
)
|
||||
)
|
||||
|
||||
convertor_functext = self.template.render(conversions=render_data)
|
||||
for argt in argtypes:
|
||||
|
||||
is_builtin = is_builtin_type(argt)
|
||||
if not is_builtin and argt is not schema:
|
||||
# if a circular dependency occurs, we don't want infinite recursion
|
||||
if argt.__name__ not in namespace.keys():
|
||||
t, n = self._schema_to_inflator(argt,
|
||||
_funcname=f'inflate_{argt.__name__}',
|
||||
_namespace=namespace,
|
||||
strict_mode_override=strict_mode_override)
|
||||
namespace |= n
|
||||
txt_segments.append(t)
|
||||
|
||||
elif argt is schema:
|
||||
pass
|
||||
else:
|
||||
namespace[argt.__name__] = argt
|
||||
|
||||
convertor_functext = template.render(
|
||||
funcname=_funcname,
|
||||
conversions=render_data,
|
||||
tgt_type=utils.typename(schema),
|
||||
from_type='_from_type' if from_type_override is not None else None
|
||||
)
|
||||
|
||||
convertor_functext = '\n'.join(txt_segments) + '\n\n' + convertor_functext
|
||||
|
||||
convertor_functext = '\n'.join(list(filter(lambda x: len(x.strip()), convertor_functext.split('\n'))))
|
||||
convertor_functext = convertor_functext.replace(', )', ')')
|
||||
namespace = {
|
||||
'_tgt_type': schema,
|
||||
'_lookup_table': lookup_table
|
||||
}
|
||||
exec(convertor_functext, namespace)
|
||||
print(convertor_functext)
|
||||
|
||||
# put the generated function into the lookup table;
# we expect it to be picked up by all nested functions,
# which resolves circular dependencies
|
||||
lookup_table[schema.__name__] = namespace['inflate']
|
||||
|
||||
return namespace['inflate']
|
||||
return convertor_functext, namespace
|
||||
|
||||
@@ -1,65 +1,61 @@
|
||||
{% set ns = namespace(retry_indent=0) %}
|
||||
from typing import Any
|
||||
def inflate(from_data: dict[str, Any]):
|
||||
{% import "unwrap_type_data.jinja2" as unwrap_type_data %}
|
||||
|
||||
|
||||
{% macro render_setter(argname, argval) -%}
|
||||
{%- set out -%}
|
||||
{% if argname.startswith('Nullable') %}
|
||||
{% if argval == 'None' %}
|
||||
{{argname}}.unset()
|
||||
{% else %}
|
||||
{{argname}}.set({{argval}})
|
||||
{%endif%}
|
||||
{% else %}
|
||||
{{argname}} = {{argval}}
|
||||
{% endif %}
|
||||
{%- endset %}
|
||||
{{out}}
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro check_null(argname) -%}
|
||||
{%- set out -%}
|
||||
{% if argname.startswith('Nullable') %}
|
||||
if {{argname}}.has:
|
||||
{% else %}
|
||||
if {{argname}} is None:
|
||||
{% endif %}
|
||||
{%- endset %}
|
||||
{{out}}
|
||||
{%- endmacro %}
|
||||
|
||||
|
||||
cpdef object {{funcname}}(dict from_data):
|
||||
"""
|
||||
{{tgt_type}}
|
||||
"""
|
||||
from_data_keys = from_data.keys()
|
||||
cdef object conv_data
|
||||
|
||||
{% for conv in conversions %}
|
||||
{% if not conv.is_optional or conv.default_option is not none%}
|
||||
cdef {{conv.argctype}} {{conv.argname_escaped}}
|
||||
if '{{conv.argname}}' not in from_data_keys:
|
||||
{% if not conv.is_optional %}
|
||||
raise ValueError(f"No required field provided: {{conv.argname}} with type {{conv.typename | replace('"', "'")}}")
|
||||
{% endif %}
|
||||
{% if conv.default_option is not none %}
|
||||
from_data['{{conv.argname}}'] = {{conv.default_option}}
|
||||
{% endif%}
|
||||
{% endif %}
|
||||
{%endfor%}
|
||||
|
||||
{% for conv in conversions %}
|
||||
conv_data = from_data['{{conv.argname}}']
|
||||
if conv_data is None:
|
||||
{% if not conv.allow_none %}
|
||||
raise ValueError(f"Field {{conv.argname}} required type {{conv.typename | replace('"', "'")}}, null provided")
|
||||
{% if conv.is_optional %}
|
||||
{{ render_setter(conv.argname_escaped, conv.default_option) | indent(4*2) }}
|
||||
{% else %}
|
||||
{{conv.argname}} = None
|
||||
raise MissingFieldException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}")
|
||||
{% endif %}
|
||||
else:
|
||||
|
||||
{% if conv.is_union %}
|
||||
{% set ns.retry_indent = 0 %}
|
||||
{% for union_type, is_builtin in conv.constrs %}
|
||||
{{ ' ' * ns.retry_indent }}try:
|
||||
{% if is_builtin %}
|
||||
{{ ' ' * ns.retry_indent }} {{conv.argname}} = {{union_type}}(conv_data)
|
||||
conv_data = from_data['{{conv.argname}}']
|
||||
if conv_data is None:
|
||||
{% if not conv.allow_none %}
|
||||
raise FieldValidationException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}", conv_data)
|
||||
{% else %}
|
||||
{{ ' ' * ns.retry_indent }} {{conv.argname}} = _lookup_table['{{union_type}}'](conv_data)
|
||||
{{ render_setter(conv.argname_escaped, 'None') | indent(4*3) }}
|
||||
{% endif %}
|
||||
{{ ' ' * ns.retry_indent }}except Exception as e:
|
||||
{% set ns.retry_indent = ns.retry_indent + 1 %}
|
||||
{% endfor %}
|
||||
{{ ' ' * ns.retry_indent }} raise e from e
|
||||
{% else %}
|
||||
|
||||
{% if conv.constrs[0][1] %}
|
||||
{% if conv.is_optional %}
|
||||
if '{{conv.argname}}' not in from_data_keys:
|
||||
{{conv.argname}} = None
|
||||
else:
|
||||
{{conv.argname}} = {{conv.constrs[0][0]}}(conv_data)
|
||||
{% else %}
|
||||
{{conv.argname}} = {{conv.constrs[0][0]}}(conv_data)
|
||||
{% endif %}
|
||||
{% else %}
|
||||
{% if conv.is_optional %}
|
||||
if '{{conv.argname}}' not in from_data_keys:
|
||||
{{conv.argname}} = None
|
||||
else:
|
||||
{{conv.argname}} = _lookup_table['{{conv.constrs[0][0]}}'](conv_data)
|
||||
{% else %}
|
||||
{{conv.argname}} = _lookup_table['{{conv.constrs[0][0]}}'](conv_data)
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
|
||||
{% endif %}
|
||||
{{ unwrap_type_data.render_segment(conv.argname_escaped, conv.constrs, "conv_data", false) | indent(4*3) }}
|
||||
|
||||
|
||||
{% endfor %}
|
||||
return _tgt_type({% for conv in conversions %}{{conv.argname}}={{conv.argname}}, {% endfor %})
|
||||
return {{funcname}}_tgt_type({% for conv in conversions %}{{conv.argname_escaped}}={{conv.argname_escaped}}, {% endfor %})
|
||||
|
||||
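Roughly, the object template above renders into a function of the following shape — a hand-written, simplified illustration for a dataclass with one required `int` field `a`, written as plain Python for readability (the actual output is a Cython `cpdef` function with typed locals):

```python
def inflate_A(from_data):
    from_data_keys = from_data.keys()
    if 'a' not in from_data_keys:
        raise MissingFieldException('a', 'int')
    conv_data = from_data['a']
    if conv_data is None:
        raise FieldValidationException('a', 'int', conv_data)
    a = int(conv_data)
    return inflate_A_tgt_type(a=a)  # the target class, injected through the namespace
```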
src/megasniff/templates/inflator_tuple.jinja2 (new file, 32 lines)
@@ -0,0 +1,32 @@
|
||||
{% set ns = namespace(retry_indent=0) %}
|
||||
{% import "unwrap_type_data.jinja2" as unwrap_type_data %}
|
||||
|
||||
|
||||
def {{funcname}}(from_data: {% if from_type is none %}dict[str, Any]{% else %}{{from_type}}{% endif %}) {% if tgt_type is not none %} -> tuple {% endif %}:
|
||||
"""
|
||||
{% for conv in conversions %}{{conv.argname_escaped}}:{{conv.typename}}, {% endfor %}
|
||||
"""
|
||||
from_data_keys = from_data.keys()
|
||||
|
||||
{% for conv in conversions %}
|
||||
if '{{conv.argname}}' not in from_data_keys:
|
||||
{% if conv.is_optional %}
|
||||
{{conv.argname_escaped}} = {{conv.default_option}}
|
||||
{% else %}
|
||||
raise MissingFieldException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}")
|
||||
{% endif %}
|
||||
else:
|
||||
conv_data = from_data['{{conv.argname}}']
|
||||
if conv_data is None:
|
||||
{% if not conv.allow_none %}
|
||||
raise FieldValidationException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}", conv_data)
|
||||
{% else %}
|
||||
{{conv.argname_escaped}} = None
|
||||
{% endif %}
|
||||
else:
|
||||
|
||||
{{ unwrap_type_data.render_segment(conv.argname_escaped, conv.constrs, "conv_data", false) | indent(4*3) }}
|
||||
|
||||
|
||||
{% endfor %}
|
||||
return ({% for conv in conversions %}{{conv.argname_escaped}}, {% endfor %})
|
||||
src/megasniff/templates/unwrap_type_data.jinja2 (new file, 55 lines)
@@ -0,0 +1,55 @@
|
||||
{% macro render_iterable(argname, typedef, conv_data) -%}
|
||||
{%- set out -%}
|
||||
{{argname}} = []
|
||||
if not isinstance({{conv_data}}, list):
|
||||
raise FieldValidationException('{{argname}}', "list", conv_data, [])
|
||||
for item in {{conv_data}}:
|
||||
{{ render_segment("_" + argname, typedef, "item", false ) | indent(4) }}
|
||||
{{argname}}.append(_{{argname}})
|
||||
{%- endset %}
|
||||
{{out}}
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro render_union(argname, conv, conv_data) -%}
|
||||
{%- set out -%}
|
||||
# unwrapping union {{conv}}
|
||||
{% set ns = namespace(retry_indent=0) %}
|
||||
{% set ns.retry_indent = 0 %}
|
||||
all_conv_exceptions = []
|
||||
{% for union_type in conv.typeref %}
|
||||
{{ ' ' * ns.retry_indent }}try:
|
||||
{{ render_segment(argname, union_type, conv_data, false) | indent((ns.retry_indent + 1) * 4) }}
|
||||
{{ ' ' * ns.retry_indent }}except Exception as e:
|
||||
{{ ' ' * ns.retry_indent }} all_conv_exceptions.append(e)
|
||||
{% set ns.retry_indent = ns.retry_indent + 1 %}
|
||||
{% endfor %}
|
||||
{{ ' ' * ns.retry_indent }}raise FieldValidationException('{{conv.argname}}', "{{conv.typename | replace('"', "'")}}", conv_data, all_conv_exceptions)
|
||||
{%- endset %}
|
||||
{{out}}
|
||||
{%- endmacro %}
|
||||
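For readability, the nested retry chain that `render_union` emits for a field `a` annotated `int | str` expands to roughly the following (hand-expanded illustration, not literal template output):

```python
all_conv_exceptions = []
try:
    a = int(conv_data)                       # first union member
except Exception as e:
    all_conv_exceptions.append(e)
    try:
        a = str(conv_data)                   # next union member
    except Exception as e:
        all_conv_exceptions.append(e)
        # every branch failed: report the field with the collected causes
        raise FieldValidationException('a', 'int | str', conv_data, all_conv_exceptions)
```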
|
||||
{% macro render_segment(argname, typeref, conv_data, strict) -%}
|
||||
{%- set out -%}
|
||||
|
||||
{% if typeref is string %}
|
||||
{% if strict %}
|
||||
if not isinstance({{conv_data}}, {{typeref}}):
|
||||
raise FieldValidationException('{{argname}}', "{{typeref | replace('"', "'")}}", {{conv_data}}, [])
|
||||
{% endif %}
|
||||
{{argname}} = {{typeref}}({{conv_data}})
|
||||
|
||||
{% elif typeref.is_union %}
|
||||
{{render_union(argname, typeref, conv_data)}}
|
||||
|
||||
{% elif typeref.is_list %}
|
||||
{{render_iterable(argname, typeref.typeref, conv_data)}}
|
||||
{{argname}} = {{typeref.iterable_type}}({{argname}})
|
||||
|
||||
{% else %}
|
||||
{{render_segment(argname, typeref.typeref, conv_data, typeref.is_strict)}}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{%- endset %}
|
||||
{{out}}
|
||||
{%- endmacro %}
|
||||
src/megasniff/utils.py
@@ -1,6 +1,15 @@
import collections.abc
import dataclasses
import inspect
-from typing import get_type_hints, Any
+from typing import get_type_hints, Any, get_origin, Iterable, Optional


@dataclasses.dataclass
class TupleSchemaItem:
    schema: type
    key_name: str
    has_default: bool = False
    default: Any = None


def is_typed_dict_type(tp: type) -> bool:
@@ -19,7 +28,7 @@ def get_kwargs_type_hints(obj: type) -> dict[str, Any]:
    return get_type_hints(obj.__init__)


-def get_field_default(cls: type[Any], field: str) -> tuple[bool, Any]:
+def get_field_default(cls: type[Any] | Iterable[TupleSchemaItem], field: str) -> tuple[bool, Any]:
    if dataclasses.is_dataclass(cls):
        for f in dataclasses.fields(cls):
            if f.name == field:
@@ -32,6 +41,12 @@ def get_field_default(cls: type[Any], field: str) -> tuple[bool, Any]:
        # the field is not declared on the dataclass
        return False, None

    if isinstance(cls, collections.abc.Iterable):
        for i, t in enumerate(cls):
            if (t.key_name or f'_arg_{i}') == field:
                return t.has_default, t.default
        return False, None

    sig = inspect.signature(cls.__init__)

    params = list(sig.parameters.values())[1:]
@@ -50,3 +65,9 @@ def get_field_default(cls: type[Any], field: str) -> tuple[bool, Any]:

def is_builtin_type(tp: type) -> bool:
    return getattr(tp, '__module__', None) == 'builtins'


def typename(tp: type) -> str:
    if get_origin(tp) is None and hasattr(tp, '__name__'):
        return tp.__name__
    return str(tp)
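A quick illustration of the helpers above (assuming they are imported from `megasniff.utils`; expected output in the comments):

```python
from typing import Optional

from megasniff.utils import TupleSchemaItem, is_builtin_type, typename

print(typename(int))            # 'int'                   -> bare class, uses __name__
print(typename(list[int]))      # 'list[int]'             -> parametrised, falls back to str()
print(typename(Optional[int]))  # 'typing.Optional[int]'
print(is_builtin_type(int))             # True
print(is_builtin_type(TupleSchemaItem)) # False -- not defined in builtins
```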
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from src.megasniff import SchemaInflatorGenerator
|
||||
|
||||
@@ -10,12 +11,28 @@ def test_basic_constructor():
|
||||
self.a = a
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_generator(A)
|
||||
fn = infl.schema_to_inflator(A)
|
||||
a = fn({'a': 42})
|
||||
|
||||
assert a.a == 42
|
||||
|
||||
|
||||
def test_unions():
|
||||
@dataclass
|
||||
class A:
|
||||
a: int | str
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
|
||||
a = fn({'a': 42})
|
||||
assert a.a == 42
|
||||
a = fn({'a': '42'})
|
||||
assert a.a == 42
|
||||
a = fn({'a': '42a'})
|
||||
assert a.a == '42a'
|
||||
|
||||
|
||||
@dataclass
|
||||
class CircA:
|
||||
b: CircB
|
||||
@@ -28,7 +45,18 @@ class CircB:
|
||||
|
||||
def test_circular():
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_generator(CircA)
|
||||
fn = infl.schema_to_inflator(CircA)
|
||||
a = fn({'b': {'a': None}})
|
||||
|
||||
return isinstance(a.b, CircB)
|
||||
assert isinstance(a.b, CircB)
|
||||
|
||||
|
||||
def test_optional():
|
||||
@dataclass
|
||||
class C:
|
||||
a: Optional[int] = None
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(C)
|
||||
c = fn({})
|
||||
assert c.a is None
|
||||
|
||||
tests/test_exceptions.py (new file, 39 lines)
@@ -0,0 +1,39 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
import pytest
|
||||
|
||||
from megasniff import SchemaInflatorGenerator
|
||||
from megasniff.exceptions import MissingFieldException, FieldValidationException
|
||||
|
||||
|
||||
def test_missing_field():
|
||||
@dataclass
|
||||
class A:
|
||||
a: int
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
with pytest.raises(MissingFieldException):
|
||||
fn({})
|
||||
|
||||
|
||||
def test_null():
|
||||
@dataclass
|
||||
class A:
|
||||
a: int
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'a': None})
|
||||
|
||||
|
||||
def test_invalid_field():
|
||||
@dataclass
|
||||
class A:
|
||||
a: float | int | None
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'a': {}})
|
||||
tests/test_iterables.py (new file, 87 lines)
@@ -0,0 +1,87 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from megasniff import SchemaInflatorGenerator
|
||||
|
||||
|
||||
def test_list_basic():
|
||||
@dataclass
|
||||
class A:
|
||||
l: list[int]
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
|
||||
a = fn({'l': []})
|
||||
assert isinstance(a.l, list)
|
||||
assert len(a.l) == 0
|
||||
|
||||
a = fn({'l': [1, 2.1, '0']})
|
||||
print(a.l)
|
||||
assert isinstance(a.l, list)
|
||||
assert len(a.l) == 3
|
||||
assert all(map(lambda x: isinstance(x, int), a.l))
|
||||
|
||||
@dataclass
|
||||
class B:
|
||||
l: list[str]
|
||||
|
||||
fn = infl.schema_to_inflator(B)
|
||||
|
||||
a = fn({'l': [1, 2.1, '0']})
|
||||
print(a.l)
|
||||
assert isinstance(a.l, list)
|
||||
assert len(a.l) == 3
|
||||
assert all(map(lambda x: isinstance(x, str), a.l))
|
||||
assert a.l == ['1', '2.1', '0']
|
||||
|
||||
|
||||
def test_list_union():
|
||||
@dataclass
|
||||
class A:
|
||||
l: list[int | str]
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
|
||||
a = fn({'l': []})
|
||||
assert isinstance(a.l, list)
|
||||
assert len(a.l) == 0
|
||||
|
||||
a = fn({'l': [1, 2.1, '0']})
|
||||
print(a.l)
|
||||
assert isinstance(a.l, list)
|
||||
assert len(a.l) == 3
|
||||
assert all(map(lambda x: isinstance(x, int), a.l))
|
||||
|
||||
|
||||
def test_set_basic():
|
||||
|
||||
@dataclass
|
||||
class A:
|
||||
l: set[int]
|
||||
|
||||
infl = SchemaInflatorGenerator()
|
||||
fn = infl.schema_to_inflator(A)
|
||||
|
||||
a = fn({'l': []})
|
||||
assert isinstance(a.l, set)
|
||||
assert len(a.l) == 0
|
||||
|
||||
a = fn({'l': [1, 2.1, '0']})
|
||||
print(a.l)
|
||||
assert isinstance(a.l, set)
|
||||
assert len(a.l) == 3
|
||||
assert all(map(lambda x: isinstance(x, int), a.l))
|
||||
|
||||
@dataclass
|
||||
class B:
|
||||
l: set[str]
|
||||
|
||||
fn = infl.schema_to_inflator(B)
|
||||
|
||||
a = fn({'l': [1, 2.1, '0', 0]})
|
||||
print(a.l)
|
||||
assert isinstance(a.l, set)
|
||||
assert len(a.l) == 3
|
||||
assert all(map(lambda x: isinstance(x, str), a.l))
|
||||
assert a.l == {'1', '2.1', '0'}
|
||||
tests/test_signature.py (new file, 43 lines)
@@ -0,0 +1,43 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import get_type_hints, Any, Annotated
|
||||
|
||||
from megasniff import SchemaInflatorGenerator
|
||||
|
||||
|
||||
def test_return_signature():
|
||||
@dataclass
|
||||
class A:
|
||||
a: list[int]
|
||||
|
||||
infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
fn = infl.schema_to_inflator(A)
|
||||
|
||||
hints = get_type_hints(fn)
|
||||
assert hints['return'] == A
|
||||
assert len(hints) == 2
|
||||
|
||||
|
||||
def test_argument_signature():
|
||||
@dataclass
|
||||
class A:
|
||||
a: list[int]
|
||||
|
||||
infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
|
||||
type custom_from_type = dict[str, Any]
|
||||
|
||||
fn1 = infl.schema_to_inflator(A, from_type_override=custom_from_type)
|
||||
|
||||
fn2 = infl.schema_to_inflator(A)
|
||||
|
||||
hints = get_type_hints(fn1)
|
||||
assert hints['return'] == A
|
||||
assert len(hints) == 2
|
||||
assert hints['from_data'] == custom_from_type
|
||||
assert hints['from_data'] != dict[str, Any]
|
||||
|
||||
hints = get_type_hints(fn2)
|
||||
assert hints['return'] == A
|
||||
assert len(hints) == 2
|
||||
assert hints['from_data'] != custom_from_type
|
||||
assert hints['from_data'] == dict[str, Any]
|
||||
tests/test_strict_mode.py (new file, 75 lines)
@@ -0,0 +1,75 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
import pytest
|
||||
|
||||
from megasniff import SchemaInflatorGenerator
|
||||
from megasniff.exceptions import FieldValidationException
|
||||
|
||||
|
||||
def test_global_strict_mode_basic():
|
||||
class A:
|
||||
def __init__(self, a: int):
|
||||
self.a = a
|
||||
|
||||
infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
fn = infl.schema_to_inflator(A)
|
||||
a = fn({'a': 42})
|
||||
|
||||
assert a.a == 42
|
||||
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'a': 42.0})
|
||||
|
||||
|
||||
def test_global_strict_mode_basic_override():
|
||||
class A:
|
||||
def __init__(self, a: int):
|
||||
self.a = a
|
||||
|
||||
infl = SchemaInflatorGenerator(strict_mode=False)
|
||||
fn = infl.schema_to_inflator(A, strict_mode_override=True)
|
||||
a = fn({'a': 42})
|
||||
|
||||
assert a.a == 42
|
||||
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'a': 42.0})
|
||||
|
||||
|
||||
def test_global_strict_mode_list():
|
||||
@dataclass
|
||||
class A:
|
||||
a: list[int]
|
||||
|
||||
infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
fn = infl.schema_to_inflator(A)
|
||||
a = fn({'a': [42]})
|
||||
|
||||
assert a.a == [42]
|
||||
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'a': [42.0, 42]})
|
||||
|
||||
|
||||
def test_global_strict_mode_circular():
|
||||
@dataclass
|
||||
class A:
|
||||
a: list[int]
|
||||
|
||||
@dataclass
|
||||
class B:
|
||||
b: list[A | int]
|
||||
|
||||
infl = SchemaInflatorGenerator(strict_mode=True)
|
||||
fn = infl.schema_to_inflator(B)
|
||||
b = fn({'b': [{'a': []}, 42]})
|
||||
|
||||
assert len(b.b) == 2
|
||||
assert isinstance(b.b[0], A)
|
||||
assert isinstance(b.b[1], int)
|
||||
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'b': [42.0]})
|
||||
|
||||
with pytest.raises(FieldValidationException):
|
||||
fn({'b': [{'a': [1.1]}]})
|
||||
tests/test_tuple_inflate.py (new file, 27 lines)
@@ -0,0 +1,27 @@
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from src.megasniff.utils import TupleSchemaItem
|
||||
from src.megasniff import SchemaInflatorGenerator
|
||||
|
||||
|
||||
def test_basic_tuple():
|
||||
infl = SchemaInflatorGenerator()
|
||||
|
||||
fn = infl.schema_to_inflator({'a': int, 'b': float, 'c': str, 'd': list[int]})
|
||||
a = fn({'a': 42, 'b': 1.1, 'c': 123, 'd': []})
|
||||
|
||||
assert a[0] == 42
|
||||
|
||||
fn = infl.schema_to_inflator((('a', int), ('b', list[int])))
|
||||
a = fn({'a': 42, 'b': ['1']})
|
||||
|
||||
assert a[1][0] == 1
|
||||
|
||||
fn = infl.schema_to_inflator(
|
||||
(('a', int), TupleSchemaItem(Optional[list[int]], key_name='b', has_default=True, default=None)))
|
||||
a = fn({'a': 42})
|
||||
|
||||
assert a[1] is None
|
||||
assert a[0] == 42
|
||||
uv.lock (generated, 44 lines changed)
@@ -42,6 +42,35 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/3c/38/bbe2e63902847cf79036ecc75550d0698af31c91c7575352eb25190d0fb3/coverage-7.9.2-py3-none-any.whl", hash = "sha256:e425cd5b00f6fc0ed7cdbd766c70be8baab4b7839e4d4fe5fac48581dd968ea4", size = 204005, upload-time = "2025-07-03T10:54:13.491Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cython"
|
||||
version = "3.1.3"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/18/ab/915337fb39ab4f4539a313df38fc69938df3bf14141b90d61dfd5c2919de/cython-3.1.3.tar.gz", hash = "sha256:10ee785e42328924b78f75a74f66a813cb956b4a9bc91c44816d089d5934c089", size = 3186689, upload-time = "2025-08-13T06:19:13.619Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/7f/93/0e5dfcc6215a6c2cae509d7e40f8fb197237ba5998c936e9c19692f8eedf/cython-3.1.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9458d540ef0853ea4fc65b8a946587bd483ef7244b470b3d93424eb7b04edeb1", size = 2998232, upload-time = "2025-08-13T06:20:35.817Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/6b/6c/01b22de45e3a9b86fbe4a18cd470146514209448cb4d3d3ba9c72390d45b/cython-3.1.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:32d1b22c3b231326e9f16480a7f508c6841bbf7d0615c2d6f489ebc72dd05205", size = 2830052, upload-time = "2025-08-13T06:20:37.71Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/52/08/a7d4b91b144b4bd015e932303861061cd43221f737ecdc6e380a438f245f/cython-3.1.3-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:d4c7e0b8584b02a349952de7d7d47f89c97cbf3fee74962e89e3caa78139ec84", size = 3359478, upload-time = "2025-08-13T06:20:39.811Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/f5/7d/b44ee735439ee73a88c6532536cfbc5b2f146c5f315effa124e85aadb447/cython-3.1.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c9178f0c06f4bc92372dc44e3867e9285bebd556953e47857c26b389aabe2828", size = 3155157, upload-time = "2025-08-13T06:20:42.305Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a8/e0/ef1a44ba765057b04e99cf34dcc1910706a666ea66fcd2b92175ab645416/cython-3.1.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d4da2e624d381e9790152672bfc599a5fb4b823b99d82700a10f5db3311851f9", size = 3305331, upload-time = "2025-08-13T06:20:44.423Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/62/f1/8bf3ea5babdef82df3023e72522c71bfc5cc5091e9710828a0dda81bda88/cython-3.1.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:680c9168882c5e8031dd31df199b9a5ee897e95136d15f8c6454b62162ede25e", size = 3171968, upload-time = "2025-08-13T06:20:48.962Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/b5/c3/c1383f987d3add9cb8655943f6a0f164bfd06951f28e51b7887d12c8716a/cython-3.1.3-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:833cd0fdba9210d2f1f29e097579565a296d7ff567fd63e8cf5fde4c14339f4f", size = 3372840, upload-time = "2025-08-13T06:20:51.495Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/71/d5/02fb7454756cb31b0c044050ee563ac172314aa8e74e5a4dd73bf77041d3/cython-3.1.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:c04367fa0e6c35b199eb51d64b5e185584b810f6c2b96726ce450300faf99686", size = 3317912, upload-time = "2025-08-13T06:20:53.461Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/91/62/b96227adf45236952f7cf07f869ff4157b82fe25ff7bb5ba9a3037c98993/cython-3.1.3-cp313-cp313-win32.whl", hash = "sha256:f02ef2bf72a576bf541534c704971b8901616db431bc46d368eed1d6b20aaa1e", size = 2479889, upload-time = "2025-08-13T06:20:55.437Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/74/09/100c0727d0fc8e4d7134c44c12b8c623e40f309401af56b7f6faf795c4bb/cython-3.1.3-cp313-cp313-win_amd64.whl", hash = "sha256:00264cafcc451dcefc01eaf29ed5ec150fb73af21d4d21105d97e9d829a53e99", size = 2701550, upload-time = "2025-08-13T06:20:57.503Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/23/0e/6e535f2eedf0ddc3c84b087e5d0f04a7b88d8229ec8c27be41a142bcbbfa/cython-3.1.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:62b0a9514b68391aae9784405b65738bbe19cdead3dd7b90dd9e963281db1ee3", size = 2995613, upload-time = "2025-08-13T06:20:59.408Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/77/10/3c9e2abf315f608bc22f49b6f9ee66859c23e07edbf484522d5f27b61ab7/cython-3.1.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:976db373c315f342dcb24cd65b5e4c08d2c7b42f9f6ac1b3f677eb2abc9bfb0f", size = 2841282, upload-time = "2025-08-13T06:21:01.274Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/cd/77/04e39af308d5716640bc638e7d90d8be34277ebc642ea5bda5ac09628215/cython-3.1.3-cp314-cp314-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:e765c12a02dea0bd968cf1e85af77be1dc6d21909c3fbf5bd81815a7cdd4a65e", size = 3361624, upload-time = "2025-08-13T06:21:03.418Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/75/f4/bdbc989ad88401e03ffe17e0bc3a03e3fe5dccbeb9c90e8762d7da4c7a45/cython-3.1.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:097374fa1370e9967e48442a41a0acbebb94fe9d63976cad31eacd38424847bf", size = 3194014, upload-time = "2025-08-13T06:21:05.719Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a2/c8/9f282e5d31280f3912199b638c71557062443608eb3909a562283eda376d/cython-3.1.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:29d8fda4d62b693e62992c665a688e3a220be70958c48eb4c2634093c9998156", size = 3309703, upload-time = "2025-08-13T06:21:08.026Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/0a/09/83416a454a575e3ea7e84ec138f0b6dbfb34de28de4968359d7fdb428028/cython-3.1.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:da23fa5082940ae1eed487ee9b7c1da7015b53f9feffeee661f4ee57f696dcd5", size = 3210317, upload-time = "2025-08-13T06:21:10.92Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/8f/dc/901ed74302d52105588c59a41a239ef6bd01ff708391a15938aba9670b9e/cython-3.1.3-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:8880daa7a0ddf971593f24da161c976bc1bea895393fdfebb8e54269321d9d2b", size = 3378211, upload-time = "2025-08-13T06:21:13.067Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/b7/6d/1e077b99a678b69a39bfe96e1888bcf6c868830220e635f862a44c7761b4/cython-3.1.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:20d6b5a9fc210d3bc2880413011f606e1208e12ee6efc74717445a63f9795af1", size = 3321051, upload-time = "2025-08-13T06:21:17.314Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/00/cd/2c442e9e41eafa851d89af1f62720007e03a12e1c01d9a71ed75f550a6c5/cython-3.1.3-cp314-cp314-win32.whl", hash = "sha256:3b2243fed3eeb129dedf2cebbe3be0d9b02fbf3bc75b387aafd54aac3950baa6", size = 2502067, upload-time = "2025-08-13T06:21:19.404Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/ae/63/7a1f2f06331f7dcf3fd31721fdaa8b60762748b82395631c0324672a4f2b/cython-3.1.3-cp314-cp314-win_amd64.whl", hash = "sha256:d32792c80b1fa8be9de207ec8844d49c4d1d0d60e5136d20f344729270db6490", size = 2733427, upload-time = "2025-08-13T06:21:21.525Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/56/c8/46ac27096684f33e27dab749ef43c6b0119c6a0d852971eaefb73256dc4c/cython-3.1.3-py3-none-any.whl", hash = "sha256:d13025b34f72f77bf7f65c1cd628914763e6c285f4deb934314c922b91e6be5a", size = 1225725, upload-time = "2025-08-13T06:19:09.593Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hatchling"
|
||||
version = "1.27.0"
|
||||
@@ -108,11 +137,13 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "megasniff"
|
||||
version = "0.1.2"
|
||||
version = "0.2.3.post2"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "cython" },
|
||||
{ name = "hatchling" },
|
||||
{ name = "jinja2" },
|
||||
{ name = "setuptools" },
|
||||
]
|
||||
|
||||
[package.dev-dependencies]
|
||||
@@ -123,8 +154,10 @@ dev = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "cython", specifier = ">=3.1.3" },
|
||||
{ name = "hatchling", specifier = ">=1.27.0" },
|
||||
{ name = "jinja2", specifier = ">=3.1.6" },
|
||||
{ name = "setuptools", specifier = ">=80.9.0" },
|
||||
]
|
||||
|
||||
[package.metadata.requires-dev]
|
||||
@@ -199,6 +232,15 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/bc/16/4ea354101abb1287856baa4af2732be351c7bee728065aed451b678153fd/pytest_cov-6.2.1-py3-none-any.whl", hash = "sha256:f5bc4c23f42f1cdd23c70b1dab1bbaef4fc505ba950d53e0081d0730dd7e86d5", size = 24644, upload-time = "2025-06-12T10:47:45.932Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "setuptools"
|
||||
version = "80.9.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/18/5d/3bf57dcd21979b887f014ea83c24ae194cfcd12b9e0fda66b957c69d1fca/setuptools-80.9.0.tar.gz", hash = "sha256:f36b47402ecde768dbfafc46e8e4207b4360c654f1f3bb84475f0a28628fb19c", size = 1319958, upload-time = "2025-05-27T00:56:51.443Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/a3/dc/17031897dae0efacfea57dfd3a82fdd2a2aeb58e0ff71b77b87e44edc772/setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922", size = 1201486, upload-time = "2025-05-27T00:56:49.664Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "trove-classifiers"
|
||||
version = "2025.5.9.12"
|
||||
|
||||