import logging
from rdflib import RDF, URIRef
from oldman.exception import AlreadyAllocatedModelError, OMInternalError
[docs]class ModelRegistry(object):
""" A :class:`~oldman.resource.registry.ModelRegistry` object registers
the :class:`~oldman.model.Model` objects.
Its main function is to find and order models from a set of class IRIs
(this ordering is crucial when creating new :class:`~oldman.resource.Resource` objects).
See :func:`~oldman.resource.registry.ModelRegistry.find_models_and_types` for more details.
"""
def __init__(self):
self._models_by_classes = {}
self._models_by_names = {}
self._default_model_name = None
#Only IRIs in this dict
self._model_descendants = {}
self._type_set_cache = {}
self._logger = logging.getLogger(__name__)
@property
[docs] def model_names(self):
"""Names of the registered models."""
return self._models_by_names.keys()
@property
[docs] def models(self):
return self._models_by_names.values()
@property
[docs] def non_default_models(self):
""" Non-default models."""
return [m for m in self._models_by_names.values() if m.name != self._default_model_name]
[docs] def has_specific_models(self):
""":return: `True` if contains other models than the default one."""
return len(self._models_by_names) > int(self._default_model_name is not None)
@property
[docs] def default_model(self):
return self._models_by_names.get(self._default_model_name)
[docs] def register(self, model, is_default=False):
"""Registers a :class:`~oldman.model.Model` object.
:param model: the :class:`~oldman.model.Model` object to register.
:param is_default: If `True`, sets the model as the default model. Defaults to `False`.
"""
class_iri = model.class_iri
self._logger.info("Register model %s (%s)" % (model.name, class_iri))
if class_iri in self._models_by_classes:
raise AlreadyAllocatedModelError(u"%s is already allocated to %s" %
(class_iri, self._models_by_classes[class_iri]))
if model.name in self._models_by_names:
raise AlreadyAllocatedModelError(u"%s is already allocated to %s" %
(model.name, self._models_by_names[model.name].class_iri))
sub_model_iris = set()
# The new is not yet in this list
for m in self._models_by_classes.values():
if class_iri in m.ancestry_iris:
sub_model_iris.add(m.class_iri)
self._model_descendants[class_iri] = sub_model_iris
self._models_by_classes[class_iri] = model
self._models_by_names[model.name] = model
# Clears the cache
self._type_set_cache = {}
if is_default:
if self._default_model_name is not None:
self._logger.warn(u"Default model name overwritten: %s" % model.name)
self._default_model_name = model.name
[docs] def unregister(self, model):
"""Un-registers a :class:`~oldman.model.Model` object.
:param model: the :class:`~oldman.model.Model` object to remove from the registry.
"""
self._models_by_classes.pop(model.class_iri)
self._model_descendants.pop(model.class_iri)
self._models_by_names.pop(model.name)
# Clears the cache
self._type_set_cache = {}
[docs] def get_model(self, class_name_or_iri):
"""Gets a :class:`~oldman.model.Model` object.
:param class_name_or_iri: Name or IRI of a RDFS class
:return: A :class:`~oldman.model.Model` object or `None` if not found
"""
model = self._models_by_classes.get(class_name_or_iri)
if model is None:
model = self._models_by_names.get(class_name_or_iri)
return model
[docs] def find_descendant_models(self, top_ancestor_name_or_iri):
"""TODO: explain. Includes the top ancestor. """
descendant_iris = set(self._model_descendants.get(top_ancestor_name_or_iri, []))
descendant_iris.add(top_ancestor_name_or_iri)
models = [self.get_model(class_iri) for class_iri in descendant_iris]
return filter(lambda x: x is not None, models)
[docs] def find_models_and_types(self, type_set):
"""Finds the leaf models from a set of class IRIs and orders them.
Also returns an ordered list of the RDFS class IRIs that
come from `type_set` or were deduced from it.
Leaf model ordering is important because it determines:
1. the IRI generator to use (the one of the first model);
2. method inheritance priorities between leaf models.
Resulting orderings are cached.
:param type_set: Set of RDFS class IRIs.
:return: An ordered list of leaf :class:`~oldman.model.Model` objects
and an ordered list of RDFS class IRIs.
"""
if type_set is None or len(type_set) == 0 or type_set == [None]:
if self._default_model_name is None:
raise OMInternalError(u"No default model defined!")
return [self._models_by_names[self._default_model_name]], []
if isinstance(type_set, list):
type_set = set(type_set).difference([None])
cache_entry = self._type_set_cache.get(tuple(type_set))
if cache_entry is not None:
leaf_models, types = cache_entry
# Protection against mutation
return list(leaf_models), list(types)
leaf_models = self._find_leaf_models(type_set)
leaf_model_iris = [m.class_iri for m in leaf_models if m.class_iri is not None]
ancestry_class_iris = {t for m in leaf_models for t in m.ancestry_iris}.difference(leaf_model_iris)
independent_class_iris = type_set.difference(leaf_model_iris).difference(ancestry_class_iris)
types = leaf_model_iris + list(independent_class_iris) + list(ancestry_class_iris)
pair = (leaf_models, types)
self._type_set_cache[tuple(type_set)] = pair
# If type_set was not exhaustive
self._type_set_cache[tuple(set(types))] = pair
# Protection against mutation
return list(leaf_models), list(types)
def _find_leaf_models(self, type_set):
leaf_models = []
for type_iri in type_set:
descendants = self._model_descendants.get(type_iri)
if (descendants is not None) and (len(descendants.intersection(type_set)) == 0):
model = self._models_by_classes[type_iri]
assert(model.class_iri == type_iri)
leaf_models.append(model)
if len(leaf_models) == 0:
default_model = self._models_by_names.get(self._default_model_name)
if default_model:
return [default_model]
return []
return self._sort_leaf_models(leaf_models)
def _sort_leaf_models(self, leaf_models):
"""TODO: propose some vocabulary to give priorities."""
if len(leaf_models) > 1:
self._logger.warn(u"Arbitrary order between leaf models %s" % [m.name for m in leaf_models])
return leaf_models