from collections import defaultdict
import logging
import networkx as nx
from networkx.algorithms.dag import is_directed_acyclic_graph, topological_sort
from oldman.core.session.session import Session
from oldman.core.session.tracker import BasicResourceTracker
from oldman.storage.resource import StoreResource
class CrossStoreSession(Session):
    """Abstract session whose resource factories take the target store explicitly.

    Concrete subclasses have to supply the :attr:`tracker` property as well as
    the :meth:`new` and :meth:`load_from_graph` methods; the versions defined
    here only raise :class:`NotImplementedError`.
    """

    @property
    def tracker(self):
        """Resource tracker of the session (must be provided by subclasses)."""
        raise NotImplementedError("Should be implemented by a concrete implementation.")

    def new(self, id, store, types=None, hashless_iri=None, collection_iri=None, is_new=True, former_types=None,
            **kwargs):
        """Create a new :class:`~oldman.resource.Resource` object **without saving it** in the store.

        NOTE(review): the original author flagged this documentation as
        needing an update; check it against the concrete implementation.

        :param id: Identifier of the new resource.
                   NOTE(review): expected type is not documented -- confirm.
        :param store: Store the resource is created for (presumably the one it
                      will be saved to -- confirm).
        :param types: IRIs of RDFS classes the resource is instance of. Defaults to `None`.
                      These IRIs are used to find the models of the resource
                      (see :func:`~oldman.resource.manager.ResourceManager.find_models_and_types`
                      for more details).
        :param hashless_iri: Hash-less IRI that MAY be considered when generating an IRI
                             for the new resource. Defaults to `None`. Ignored if `id` is given.
                             Must be `None` if `collection_iri` is given.
        :param collection_iri: IRI of the controller to which this resource belongs.
                               Used to generate a new IRI if no `id` is given; the IRI generator
                               may ignore it. Defaults to `None`.
                               Must be `None` if `hashless_iri` is given.
        :param is_new: Defaults to `True`.
                       NOTE(review): presumably tells whether the resource is absent
                       from the store -- confirm.
        :param former_types: Defaults to `None`.
                             NOTE(review): meaning is not documented -- confirm.
        :param kwargs: Regular attribute key-values that will be assigned to
                       :class:`~oldman.attribute.OMAttribute` objects.
        :return: A new :class:`~oldman.resource.Resource` object.
        """
        raise NotImplementedError("Should be implemented by a concrete implementation.")

    def load_from_graph(self, id, resource_graph, store):
        """Build a resource from `resource_graph` for the given `store`.

        Abstract: concrete sessions must override this method.

        :param id: Identifier of the resource to load.
        :param resource_graph: Graph holding the description of the resource
                               (presumably an RDF graph -- confirm).
        :param store: Store the resource belongs to.
        """
        raise NotImplementedError("Should be implemented by a concrete implementation.")
class DefaultCrossStoreSession(CrossStoreSession):
    """Default :class:`CrossStoreSession`, backed by a :class:`BasicResourceTracker`.

    Resources are created for an explicitly given store and registered in the
    tracker. :meth:`flush` then sends the pending updates and deletions to
    each concerned store, while :meth:`get` queries the stores proposed by the
    store selector.
    """

    def __init__(self, store_selector):
        """
        :param store_selector: Object whose `select_stores(iri=..., types=...)` method
                               returns the candidate stores queried by :meth:`get`.
        """
        self._store_selector = store_selector
        self._tracker = BasicResourceTracker()
        self._logger = logging.getLogger(__name__)

    @property
    def tracker(self):
        """The :class:`BasicResourceTracker` created for this session."""
        return self._tracker

    def flush(self, is_end_user=True):
        """Send the pending updates and deletions to their respective stores.

        The modified resources reported by the tracker are ordered with
        :meth:`_sort_resources_to_update`, grouped per store, and each store is
        then flushed once, in the order given by :meth:`_sort_stores`.

        :param is_end_user: Forwarded unchanged to every `store.flush()` call.
                            Defaults to `True`.
        :return: A `(all_updated_resources, all_deleted_resources)` tuple of lists
                 concatenating what each `store.flush()` call returned.
        """
        all_resources_to_update = self._sort_resources_to_update(self._tracker.modified_resources)
        all_resources_to_delete = self._tracker.resources_to_delete
        store_cluster = cluster_by_store_and_status(all_resources_to_update, all_resources_to_delete)

        all_updated_resources = []
        all_deleted_resources = []
        for store in self._sort_stores(all_resources_to_update, all_resources_to_delete):
            resources_to_update, resources_to_delete = store_cluster[store]
            updated_resources, deleted_resources = store.flush(resources_to_update, resources_to_delete,
                                                               is_end_user)
            all_updated_resources.extend(updated_resources)
            all_deleted_resources.extend(deleted_resources)

        # TODO: improve this
        self._tracker.add_all(all_updated_resources)
        self._tracker.forget_resources_to_delete()
        return all_updated_resources, all_deleted_resources

    def get(self, iri, types=None, eager_with_reversed_attributes=True):
        """Return the first resource found for `iri` among the candidate stores.

        :param iri: IRI of the resource; also given to the store selector.
        :param types: Passed to the store selector and to each `store.get()` call.
                      Defaults to `None`.
        :param eager_with_reversed_attributes: Forwarded to each `store.get()` call.
                                               Defaults to `True`.
        :return: The first non-`None` result of `store.get()`, or `None` when no
                 selected store returns a resource.
        """
        for store in self._store_selector.select_stores(iri=iri, types=types):
            store_resource = store.get(self, iri, types=types,
                                       eager_with_reversed_attributes=eager_with_reversed_attributes)
            if store_resource is not None:
                return store_resource
        return None

    def new(self, id, store, types=None, is_new=True, former_types=None, **kwargs):
        """Create a :class:`StoreResource` for `store` and register it in the tracker.

        Nothing is written to the store here.

        NOTE(review): extra keyword arguments (including `hashless_iri` and
        `collection_iri` of the parent signature) are accepted but not used by
        this implementation -- confirm this is intended.

        :param id: Identifier given to the :class:`StoreResource`.
        :param store: Store of the resource; its `model_manager` is also used.
        :param types: Forwarded to :class:`StoreResource`. Defaults to `None`.
        :param is_new: Forwarded to :class:`StoreResource`. Defaults to `True`.
        :param former_types: Forwarded to :class:`StoreResource`. Defaults to `None`.
        :return: The newly created and tracked :class:`StoreResource`.
        """
        resource = StoreResource(id, store.model_manager, store, self, types=types, is_new=is_new,
                                 former_types=former_types)
        self._tracker.add(resource)
        return resource

    def load_from_graph(self, id, resource_graph, store):
        """Build a :class:`StoreResource` from `resource_graph` and track it.

        The resource is created through `StoreResource.load_from_graph()` with
        `is_new=False`.

        :param id: Identifier of the resource.
        :param resource_graph: Graph describing the resource.
        :param store: Store of the resource; its `model_manager` is also used.
        :return: The loaded and tracked :class:`StoreResource`.
        """
        resource = StoreResource.load_from_graph(store.model_manager, store, self, id, resource_graph,
                                                 is_new=False)
        self._tracker.add(resource)
        return resource

    def delete(self, store_resource):
        """Mark `store_resource` for deletion.

        The resource is only removed from its store by the next :meth:`flush`.

        :param store_resource: Resource to delete.
        """
        self._tracker.mark_to_delete(store_resource)
        store_resource.prepare_deletion()

    def receive_reference(self, reference, object_resource=None, object_iri=None):
        """Forward a reference notification to the tracker."""
        self._tracker.receive_reference(reference, object_resource=object_resource, object_iri=object_iri)

    def receive_reference_removal_notification(self, reference):
        """Forward a reference removal notification to the tracker."""
        self._tracker.receive_reference_removal_notification(reference)

    def close(self):
        """Does nothing"""

    def _sort_resources_to_update(self, resources_to_update):
        """Order the resources so that dependencies come before their dependents.

        TODO: implement it seriously. Construct a dependency graph.

        The order is important when saving resources with temporary IDs.

        NOTE(review): dependencies returned by the tracker that are not part of
        `resources_to_update` also end up in the sorted result -- confirm this
        is intended.

        :param resources_to_update: Sized collection of modified resources.
        :return: A list of resources. When the dependencies are cyclic, the
                 resources are returned in their original order and a warning
                 is logged.
        """
        if len(resources_to_update) <= 1:
            # Nothing to order.
            return list(resources_to_update)

        graph = nx.DiGraph()
        for resource in resources_to_update:
            # add_node() and add_edge() are no-ops for nodes already present.
            graph.add_node(resource)
            for dep_resource in self._tracker.get_dependencies(resource):
                # Inverse dependency edge: the dependency must be saved first.
                graph.add_edge(dep_resource, resource)

        if is_directed_acyclic_graph(graph):
            # Recent NetworkX versions return a one-shot generator here, while
            # flush() iterates over the result more than once: materialize it.
            return list(topological_sort(graph))

        self._logger.warning("Some resources are mutually dependent so cannot be sorted. \n"
                             "This may cause problems when flushing new resources (with temporary IDs) "
                             "to some stores.")
        return list(resources_to_update)

    @staticmethod
    def _sort_stores(all_resources_to_update, all_resources_to_delete):
        """List the distinct stores of the given resources, by first appearance.

        Stores of the resources to update come first, in the order of these
        resources; stores that only have resources to delete follow.

        TODO: improve the implementation to throw an exception if the order
        cannot be enforced.

        :param all_resources_to_update: Ordered resources to update.
        :param all_resources_to_delete: Resources to delete.
        :return: List of stores without duplicates.
        """
        stores = []
        # The order matters for the updates; it probably does not really
        # matter for the deletions, which are therefore handled last.
        for resources in (all_resources_to_update, all_resources_to_delete):
            for resource in resources:
                if resource.store not in stores:
                    stores.append(resource.store)
        return stores
def cluster_by_store_and_status(resources_to_update, resources_to_delete):
    """Group pending resources per store, keeping updates and deletions apart.

    :param resources_to_update: Iterable of resources to update; each one must
                                expose a `store` attribute.
    :param resources_to_delete: Iterable of resources to delete; each one must
                                expose a `store` attribute.
    :return: A dict mapping every store that has at least one pending resource
             to a `(resources_to_update, resources_to_delete)` tuple of lists.
             Either list may be empty.
    """
    update_cluster = cluster_by_store(resources_to_update)
    delete_cluster = cluster_by_store(resources_to_delete)

    # On Python 3, `dict.keys()` returns a view that cannot be concatenated
    # with `+`. Collect the stores by iteration instead, which also keeps a
    # deterministic order and avoids duplicates.
    stores = list(update_cluster)
    stores.extend(store for store in delete_cluster if store not in update_cluster)

    return {store: (update_cluster.get(store, []), delete_cluster.get(store, []))
            for store in stores}
def cluster_by_store(resources):
    """Group resources by their `store` attribute.

    :param resources: Iterable of objects exposing a `store` attribute.
    :return: A `defaultdict(list)` mapping each store to the list of its
             resources, in their order of appearance. Looking up an unknown
             store yields an empty list.
    """
    resources_per_store = defaultdict(list)
    for item in resources:
        bucket = resources_per_store[item.store]
        bucket.append(item)
    return resources_per_store