Source code for oldman.core.model.attribute

import logging
from collections import namedtuple
from weakref import WeakKeyDictionary

from rdflib import Literal

from oldman.core.exception import OMAttributeTypeCheckError, OMRequiredPropertyError, OMReadOnlyAttributeError, OMEditError
from oldman.core.id import generate_uuid_iri
from oldman.core.parsing.value import AttributeValueExtractor
from oldman.core.resource.reference import ResourceReference
from oldman.core.resource.resource import Resource
from oldman.core.validation.value_format import ValueFormatError

OMAttributeMetadata = namedtuple("OMAttributeMetadata", ["name", "property", "language", "jsonld_type",
                                                         "container", "reversed"])


[docs]class OMAttribute(object): """An :class:`~oldman.attribute.OMAttribute` object corresponds to a JSON-LD term that refers to a RDF property. TODO: update the documentation. No direct access to the resource_manager anymore (indirect through the resource). Technically, the name of the :class:`~oldman.attribute.OMAttribute` object is a JSON-LD term, namely *"a short-hand string that expands to an IRI or a blank node identifier"* (cf. `the JSON-LD standard <http://www.w3.org/TR/json-ld/#dfn-term>`_) which corresponds here to a RDF property (see :class:`~oldman.property.OMProperty`). In JSON-LD, the same RDF property may correspond to multiple JSON-LD terms that have different metadata. For instance, a foaf:Person resource may have two attributes for its bio in English and in French. These attributes have two different languages but use the same property `bio:olb`. Look at the quickstart example to see it in practice. An :class:`~oldman.attribute.OMAttribute` object manages the values of every :class:`~oldman.resource.Resource` object that depends on a given :class:`~oldman.model.Model` object. Each value may be : - `None`; - The Python equivalent for a RDF literal (double, string, date, etc.); - An IRI; - A collection (set, list and dict) of these types. :param metadata: :class:`~oldman.attribute.OMAttributeMetadata` object. :param value_format: :class:`~oldman.validation.value_format.ValueFormat` object that validates the format of values and converts RDF values into regular Python objects. """ _CONTAINER_REQUIREMENTS = {'@set': set, '@list': list, '@language': dict, # '@index': dict, None: object} def __init__(self, metadata, value_format): self._metadata = metadata self._value_format = value_format self._entries = WeakKeyDictionary() self._value_extractor = AttributeValueExtractor(self) # TODO: support "@index" if self.container not in [None, "@set", "@list", "@language"]: raise NotImplementedError(u"Container %s is not yet supported" % self.container) @property def is_required(self): """`True` if its property is required.""" return self._metadata.property.is_required @property def is_read_only(self): """`True` if the property cannot be modified by regular end-users.""" return self._metadata.property.is_read_only @property def is_write_only(self): """`True` if the property cannot be accessed by regular end-users.""" return self._metadata.property.is_write_only @property def om_property(self): """:class:`~oldman.property.OMProperty` to which it belongs.""" return self._metadata.property @property def name(self): """Its name as an attribute.""" return self._metadata.name @property def language(self): """Its language if localized.""" return self._metadata.language @property def jsonld_type(self): """JSON-LD type (datatype IRI or JSON-LD keyword). May be `None`.""" return self._metadata.jsonld_type @property def reversed(self): """`True` if the object and subject in RDF triples should be reversed.""" return self._metadata.reversed @property def other_attributes(self): """ Other :class:`~oldman.attribute.OMAttribute` objects of the same property.""" return self.om_property.om_attributes.difference([self]) @property def container(self): """JSON-LD container (`"@set"`, `"@list"`, `"@language"` or `"@index"`). May be `None`. """ return self._metadata.container @property def value_format(self): """:class:`~oldman.validation.value_format.ValueFormat` object that validates the format of values and converts RDF values into regular Python objects. """ return self._value_format
[docs] def is_valid(self, resource, is_end_user=True): """Tests if the attribute value assigned to a resource is valid. See :func:`~oldman.attribute.OMAttribute.check_validity` for further details. :return: `False` if the value assigned to the resource is invalid and `True` otherwise. """ try: self.check_validity(resource, is_end_user) return True except OMEditError: return False
[docs] def check_validity(self, resource, is_end_user=True): """Raises an :class:`~oldman.exception.OMEditError` exception if the attribute value assigned to a resource is invalid. :param resource: :class:`~oldman.resource.Resource` object. :param is_end_user: `False` when an authorized user (not a regular end-user) wants to force some rights. Defaults to `True`. """ self._check_local_constraints(resource, is_end_user) self._check_requirement(resource)
def _check_local_constraints(self, resource, is_end_user): # Read-only constraint if is_end_user and self.is_read_only and self.has_changed(resource): raise OMReadOnlyAttributeError(u"Attribute %s is not editable by end-users" % self.name) def _check_requirement(self, resource): """A required property has to be provided by at least one of its attributes.""" if (not self.om_property.is_required) or self.has_value(resource): return for other in self.other_attributes: if other.has_value(resource): return raise OMRequiredPropertyError(self.name) def _check_iris(self, resource, resource_mediator): """ Does nothing by default. Does not concern literals. """ pass
[docs] def has_value(self, resource): """Tests if the resource attribute has a non-None value. :param resource: :class:`~oldman.resource.Resource` object. :return: `False` if the value is `None`. """ entry = self._entries.get(resource) return (entry is not None) and (entry.current_value is not None)
[docs] def has_changed(self, resource): """ :param resource: :class:`~oldman.resource.Resource` object. """ entry = self._entries.get(resource) return (entry is not None) and entry.has_changed()
[docs] def diff(self, resource): """Gets out the former value that has been replaced. TODO: update this comment :param resource: :class:`~oldman.resource.Resource` object. :return: The former and new attribute values. """ entry = self._entries.get(resource) if entry is None: #TODO: throw a more precise exception raise Exception("No diff available for attribute %s of %s" % (self.name, resource.id)) return entry.diff()
[docs] def receive_storage_ack(self, resource): """Clears the former value that has been replaced. TODO: update this description. :param resource: :class:`~oldman.resource.Resource` object. """ entry = self._entries.get(resource) if entry is not None: entry.receive_storage_ack()
[docs] def to_nt(self, resource): """Converts its current attribute value to N-Triples (NT) triples. Relies on :func:`~oldman.attribute.OMAttribute.value_to_nt`. :param resource: :class:`~oldman.resource.Resource` object. :return: N-Triples serialization of its attribute value. """ entry = self._entries.get(resource, None) value = entry.current_value if entry is not None else None return self.value_to_nt(value)
[docs] def value_to_nt(self, value): """Converts value(s) to N-Triples (NT) triples. :param value: Value of property. :return: N-Triples serialization of this value. """ if value is None: return "" vs = value if isinstance(value, (list, set, dict)) else [value] if isinstance(vs, dict): converted_values = [self._encode_value(v, language) for language, v in vs.iteritems()] else: converted_values = [self._encode_value(v) for v in vs] property_uri = self.om_property.iri lines = "" if self.container == "@list": # list_value = u"( " + u" ".join(converted_values) + u" )" # List with skolemized nodes first_node = "<%s>" % generate_uuid_iri() node = first_node for v in converted_values: lines += u' %s rdf:first %s .\n' % (node, v) previous_node = node node = "<%s>" % generate_uuid_iri() lines += u' %s rdf:rest %s .\n' % (previous_node, node) lines += u' %s rdf:rest rdf:nil .\n' % node serialized_values = [first_node] else: serialized_values = converted_values if self.reversed: for v in serialized_values: assert(v.startswith(u"<") and v.endswith(u">")) lines += u' %s <%s> %s .\n' % (v, property_uri, u"{0}") else: for v in serialized_values: lines += u' %s <%s> %s .\n' % (u"{0}", property_uri, v) return lines
[docs] def update_from_graph(self, resource, sub_graph, initial=False): """Updates a resource attribute value by extracting the relevant information from a RDF graph. :param resource: :class:`~oldman.resource.Resource` object. :param sub_graph: :class:`rdflib.Graph` object containing the value to extract. :param initial: `True` when the value is directly from the datastore. Defaults to `False`. """ values = self._value_extractor.extract_value(resource, sub_graph) setattr(resource, self.name, values) if initial: # Clears "None" former value self.receive_storage_ack(resource)
def _encode_value(self, value, language=None): """Encodes an atomic value into a N-Triples line. :param value: Atomic value. :param language: language code. Defaults to `None`. :return: N-Triples triple. """ jsonld_type = self.jsonld_type if language is None: language = self.language if jsonld_type == "@id": iri = value.object_iri if isinstance(value, ResourceReference) else value return u"<%s>" % iri elif language: return u'"%s"@%s' % (Literal(value), language) elif jsonld_type: return u'"%s"^^<%s>' % (Literal(value), jsonld_type) # Should we really define unknown types as string? else: raise NotImplementedError(u"Untyped JSON-LD value are not (yet?) supported")
[docs] def get(self, resource): """Gets the attribute value of a resource. :param resource: :class:`~oldman.resource.Resource` object. :return: Atomic value or a generator. """ entry = self._entries.get(resource) #TODO: should we throw an exception? if entry is None: return None return entry.current_value
[docs] def get_lightly(self, resource): """Gets the attribute value of a resource in a lightweight manner. By default, behaves exactly like :func:`~oldman.attribute.OMAttribute.get`. See the latter function for further details. """ return OMAttribute.get(self, resource)
[docs] def set(self, resource, value): """Sets the attribute value of a resource. :param resource: :class:`~oldman.resource.Resource` object. :param value: Its value for this attribute. """ # Even if None self.check_value(value) # Empty container -> None if isinstance(value, (list, set, dict)) and len(value) == 0: value = None entry = self._entries.get(resource) if entry is None: entry = Entry() self._entries[resource] = entry else: self._detach_previous_value(entry.current_value) entry.current_value = value
[docs] def check_value(self, value): """Checks a new **when assigned**. Raises an :class:`oldman.exception.OMAttributeTypeCheckError` exception if the value is invalid. :param value: collection or atomic value. """ # None value are always allowed # (at assignment time) if value is None: return required_container_type = OMAttribute._CONTAINER_REQUIREMENTS[self.container] if not isinstance(value, required_container_type): raise OMAttributeTypeCheckError(u"A container (%s) was expected instead of %s" % (required_container_type, type(value))) try: if isinstance(value, (list, set, dict)): self._check_container(value) else: self._value_format.check_value(value) except ValueFormatError as e: raise OMAttributeTypeCheckError(unicode(e))
[docs] def get_entry(self, resource): """TODO: describe. Clearly not for end-users!!! """ return self._entries.get(resource)
[docs] def has_entry(self, resource): """TODO: describe. Clearly not for end-users!!! """ return resource in self._entries
[docs] def set_entry(self, resource, entry): """TODO: describe. Clearly not for end-users!!! """ # Validation if entry.has_changed(): former_value, new_value = entry.diff() self.check_value(former_value) self.check_value(new_value) else: self.check_value(entry.current_value) self._entries[resource] = entry
def _check_container(self, value): """Checks that container used is authorized and its items are formatted properly. May raise a :class:`oldman.exception.OMAttributeTypeCheckError` or a :class:`oldman.exception.ValueFormatError` exception. :param value: collection of atomic items. """ if not self.container: logger = logging.getLogger(__name__) logger.warn("No container declared for %s" % self.name) # List declaration is required (default: set) # TODO: what about dict? if isinstance(value, list): raise OMAttributeTypeCheckError(u"Undeclared list %s assigned to %s ." u"For using a list, '@container': '@list' must be declared" u"in the JSON-LD context." % (value, self.name)) vs = value.values() if isinstance(value, dict) else value for v in vs: self._value_format.check_value(v) def _detach_previous_value(self, previous_value): """TODO: explain """ # By default, does nothing pass
[docs]class ObjectOMAttribute(OMAttribute): """An :class:`~oldman.attribute.ObjectOMAttribute` object is an :class:`~oldman.attribute.OMAttribute` object that depends on an owl:ObjectProperty. """ def __init__(self, metadata, value_format): OMAttribute.__init__(self, metadata, value_format)
[docs] def get(self, resource): """See :func:`~oldman.attribute.OMAttribute.get`. :return: :class:`~oldman.resource.Resource` object or a generator of :class:`~oldman.resource.Resource` objects. """ references = OMAttribute.get(self, resource) if isinstance(references, (list, set)): # Returns a generator return (ref.get() for ref in references) elif isinstance(references, dict): raise NotImplementedError(u"Should we implement it?") elif references is not None: ref = references return ref.get() else: return None
[docs] def get_lightly(self, resource): """Gets the attribute value of a resource in a lightweight manner. By contrast with :func:`~oldman.attribute.ObjectOMAttribute.get` only IRIs are returned, not :class:`~oldman.resource.Resource` objects. :return: An IRI, a list or a set of IRIs or `None`. """ references = OMAttribute.get(self, resource) return get_iris(references)
[docs] def set(self, resource, original_value): """See :func:`~oldman.attribute.OMAttribute.set`. Accepts :class:`~oldman.resource.Resource` object(s) or IRI(s). """ if isinstance(original_value, dict): if self.container == "@index": raise NotImplementedError(u"Index maps are not yet supported") else: raise OMAttributeTypeCheckError(u"Index maps must be declared. Other dict structures " u"are not supported for objects.") elif isinstance(original_value, list): value_to_store = [ResourceReference(resource, self, v) for v in original_value] elif isinstance(original_value, set): value_to_store = {ResourceReference(resource, self, v) for v in original_value} elif isinstance(original_value, (str, unicode)): value_to_store = ResourceReference(resource, self, original_value) elif isinstance(original_value, Resource): value_to_store = ResourceReference(resource, self, original_value) elif original_value is None: value_to_store = None else: raise ValueError("Unexpected assigned object value: %s" % original_value) OMAttribute.set(self, resource, value_to_store)
def _detach_previous_value(self, previous_value): """TODO: explain """ if previous_value is None: references = [] elif isinstance(previous_value, ResourceReference): references = [previous_value] elif isinstance(previous_value, (list, set)): references = previous_value else: raise ValueError("Unsupported previous_value: %s" % previous_value) for ref in references: ref.detach()
[docs]class Entry(object): """ Mutable. TODO: describe """ def __init__(self, saved_value=None): self._former_value = saved_value self._current_value = saved_value
[docs] def clone(self): new_entry = Entry(self._clone_value(self._former_value)) new_entry.current_value = self._current_value return new_entry
@property def current_value(self): return self._clone_value(self._current_value) @current_value.setter def current_value(self, new_value): self._current_value = self._clone_value(new_value)
[docs] def has_changed(self): """ True if the value differs from the stored one """ if self._former_value is None and self._current_value is not None: return True return self._former_value != self._current_value
[docs] def diff(self): """TODO: explain """ # TODO: find a better exception if not self.has_changed(): raise Exception("No diff") return self._clone_value(self._former_value), self._clone_value(self._current_value)
[docs] def receive_storage_ack(self): """TODO: explain """ self._former_value = self._current_value
@staticmethod def _clone_value(value): if isinstance(value, set): return set(value) if isinstance(value, list): return list(value) if isinstance(value, dict): return dict(value) return value
[docs]def get_iris(references): """TODO: describe it """ if isinstance(references, list): return [ref.object_iri for ref in references] elif isinstance(references, set): return {ref.object_iri for ref in references} elif isinstance(references, dict): raise NotImplementedError(u"Should we implement it?") elif references is not None: # A Resource object or an IRI ref = references return ref.object_iri else: return None