Source code for pylasu.transformation.transformation
import functools
from dataclasses import dataclass, field, Field, fields
from inspect import signature
from typing import Any, Dict, Callable, TypeVar, Generic, Optional, List, Set, Iterable, Type, Union
from pylasu.model import Node, Origin
from pylasu.model.errors import GenericErrorNode
from pylasu.model.model import concept_of
from pylasu.model.reflection import PropertyDescription
from pylasu.transformation.generic_nodes import GenericNode
from pylasu.validation import Issue, IssueSeverity
Child = TypeVar('Child')
Output = TypeVar('Output', bound=Node)
Source = TypeVar('Source')
Target = TypeVar('Target')
node_factory_constructor_type = Callable[[Source, "ASTTransformer", "NodeFactory[Source, Output]"], List[Output]]
node_factory_single_constructor_type = Callable[[Source, "ASTTransformer", "NodeFactory[Source, Output]"], Output]
[docs]
def field_of(cl: type, name: str) -> Field:
class_fields = fields(cl)
for fld in class_fields:
if fld.name == name:
return fld
raise Exception(f"Field {name} not found in {cl}")
[docs]
@dataclass
class NodeFactory(Generic[Source, Output]):
constructor: node_factory_constructor_type
children: Dict[str, "ChildNodeFactory[Source, Any, Any]"] = field(default_factory=dict)
finalizer: Callable[[Source], None] = field(default=lambda _: None)
[docs]
def with_child(
self,
setter: Union[Callable[[Target, Optional[Child]], None], PropertyRef, Field],
getter: Union[Callable[[Source], Optional[Any]], PropertyRef, Field],
name: Optional[str] = None,
target_type: Optional[type] = None
) -> "NodeFactory[Source, Output]":
if not name:
name = setter.name
if target_type:
prefix = target_type.__qualname__ + "#"
else:
prefix = ""
if isinstance(getter, PropertyRef):
getter = getter.get
elif isinstance(getter, Field):
getter = PropertyRef(getter.name).get
if isinstance(setter, PropertyRef):
setter = setter.set
elif isinstance(setter, Field):
setter = PropertyRef(setter.name).set
self.children[prefix + name] = ChildNodeFactory(prefix + name, getter, setter)
return self
[docs]
@dataclass
class ChildNodeFactory(Generic[Source, Target, Child]):
name: str
get: Callable[[Source], Optional[Any]]
setter: Callable[[Target, Optional[Child]], None]
[docs]
def set(self, node: Target, child: Optional[Child]):
try:
self.setter(node, child)
except Exception as e:
raise Exception(f"{self.name} could not set child {child} of {node} using {self.setter}") from e
"""Sentinel value used to represent the information that a given property is not a child node."""
NO_CHILD_NODE = ChildNodeFactory("", lambda x: x, lambda _, __: None)
[docs]
class ASTTransformer:
issues: List[Issue] = []
"Additional issues found during the transformation process."
allow_generic_node: bool = True
factories: Dict[type, NodeFactory]
"Factories that map from source tree node to target tree node."
known_classes: Dict[str, Set[type]]
def __init__(self, issues: List[Issue] = None, allow_generic_node: bool = True):
self.issues = issues if issues is not None else []
self.allow_generic_node = allow_generic_node
self.factories = dict()
self.known_classes = dict()
[docs]
def transform(self, source: Optional[Any], parent: Optional[Node] = None) -> Optional[Node]:
result = self.transform_into_nodes(source, parent)
if len(result) == 0:
return None
elif len(result) == 1:
return result[0]
else:
raise Exception(f"Cannot transform {source} into a single Node as multiple nodes where produced")
[docs]
def transform_into_nodes(self, source: Optional[Any], parent: Optional[Node] = None) -> List[Node]:
if source is None:
return []
elif isinstance(source, Iterable):
raise Exception(f"Mapping error: received collection when value was expected: {source}")
factory = self.get_node_factory(type(source))
if factory:
nodes = self.make_nodes(factory, source)
for node in nodes:
for pd in concept_of(node).node_properties:
self.process_child(source, node, pd, factory)
factory.finalizer(node)
node.parent = parent
else:
if self.allow_generic_node:
origin = self.as_origin(source)
nodes = [GenericNode(parent).with_origin(origin)]
self.issues.append(
Issue.semantic(
f"Source node not mapped: {type(source).__qualname__}",
IssueSeverity.INFO,
origin.position if origin else None))
else:
raise Exception(f"Unable to transform node {source} (${type(source)})")
return nodes
[docs]
def process_child(self, source, node, pd, factory):
child_key = type(node).__qualname__ + "#" + pd.name
if child_key in factory.children:
child_node_factory = factory.children[child_key]
elif pd.name in factory.children:
child_node_factory = factory.children[pd.name]
else:
child_node_factory = None
if child_node_factory:
if child_node_factory != NO_CHILD_NODE:
self.set_child(child_node_factory, source, node, pd)
else:
# TODO should we support @Mapped / dot-notation?
factory.children[child_key] = NO_CHILD_NODE
[docs]
def as_origin(self, source: Any) -> Optional[Origin]:
return source if isinstance(source, Origin) else None
[docs]
def set_child(self, child_node_factory: ChildNodeFactory, source: Any, node: Node, pd: PropertyDescription):
src = child_node_factory.get(self.get_source(node, source))
if pd.multiple:
child = []
for child_src in src:
child.extend(self.transform_into_nodes(child_src, node))
else:
child = self.transform(src, node)
try:
child_node_factory.set(node, child)
except Exception as e:
raise Exception(f"Could not set child {child_node_factory}") from e
[docs]
def make_nodes(self, factory: NodeFactory[Source, Target], source: Source) -> List[Node]:
try:
nodes = factory.constructor(source, self, factory)
for node in nodes:
if node.origin is None:
node.with_origin(self.as_origin(source))
return nodes
except Exception as e:
if self.allow_generic_node:
return [GenericErrorNode(error=e).with_origin(self.as_origin(source))]
else:
raise e
[docs]
def get_node_factory(self, node_type: Type[Source]) -> Optional[NodeFactory[Source, Target]]:
if node_type in self.factories:
return self.factories[node_type]
else:
for superclass in node_type.__mro__[1:]:
factory = self.get_node_factory(superclass)
if factory:
return factory
[docs]
def register_node_factory(
self, source: Type[Source],
factory: Union[node_factory_constructor_type, node_factory_single_constructor_type, Type[Target]]
) -> NodeFactory[Source, Target]:
if isinstance(factory, type):
node_factory = NodeFactory(lambda _, __, ___: [factory()])
else:
node_factory = NodeFactory(get_node_constructor_wrapper(factory))
self.factories[source] = node_factory
return node_factory
[docs]
def register_identity_transformation(self, node_class: Type[Target]):
self.register_node_factory(node_class, lambda node: node)
[docs]
def ensure_list(obj):
if isinstance(obj, list):
return obj
elif obj is not None:
return [obj]
else:
return []
[docs]
def get_node_constructor_wrapper(decorated_function): # noqa C901
try:
sig = signature(decorated_function)
try:
sig.bind(1, 2, 3)
def wrapper(node: Node, transformer: ASTTransformer, factory):
return ensure_list(decorated_function(node, transformer, factory))
except TypeError:
try:
sig.bind(1, 2)
def wrapper(node: Node, transformer: ASTTransformer, _):
return ensure_list(decorated_function(node, transformer))
except TypeError:
sig.bind(1)
def wrapper(node: Node, _, __):
return ensure_list(decorated_function(node))
except ValueError:
def wrapper(node: Node, transformer: ASTTransformer, factory):
return ensure_list(decorated_function(node, transformer, factory))
functools.update_wrapper(wrapper, decorated_function)
return wrapper
[docs]
def ast_transformer(
node_class: Type[Node],
transformer: ASTTransformer,
method_name: str = None):
"""Decorator to register a function as an AST transformer"""
def decorator(decorated_function):
if method_name:
def transformer_method(self, parent: Optional[Node] = None, transformer: ASTTransformer = transformer):
return transformer.transform(self, parent)
setattr(node_class, method_name, transformer_method)
if transformer:
return transformer.register_node_factory(node_class, decorated_function).constructor
else:
return get_node_constructor_wrapper(decorated_function)
return decorator