2015-04-04 77 views
1

NetworkX是否支持定制存储节点,边缘和属性的位置?例如,我想尝试2个选项:用于定制存储节点/边缘的NetworkX可扩展性

  1. 使用类似于LevelDB/Kyoto Cabinet的东西作为后备存储。

  2. 使用一些分布式数据库(Neo4j甚至HBase--我只需要节点/边缘的分布式存储)作为后备存储。

NetworkX支持这些事情的可扩展性点是什么?

回答

1

我会发布设置NetworkX的外部存储的细微差别。 Kikohs正确地指出,每个字典都有工厂。这些可以被覆盖。

对于持久存储,真正需要特别关注的唯一字典是节点字典。

必须特别注意类似字典的实现的行为。 NetworkX类中有代码可以更改字典在内存中返回的值,而不会将其设置回。

对于像例如事情:

self.succ[u][v]=datadict 
self.pred[v][u]=datadict 

这些值不会坚持回存储后端。为了适应这种情况,我使用了内存缓存来保存内存中的对象,当它们被驱逐时,它将它们写入底层存储。

对于我使用的内存中高速缓存cachetools。对于驱逐见:Python functools.lru_cache eviction callback or equivalent

对于我使用的底层存储plyvelhttps://plyvel.readthedocs.org/en/latest/)这是一个用于LevelDB的Python接口。

我也给了字典下面的实现。请注意,代码中仍然存在错误和错误,并且尚未进行正确测试,但您已经了解了一般想法。

class PlyvelBatchWrite(object): 
    def __init__(self, plv_dict): 
     self.__batch = plv_dict._db.write_batch() 
     self.__plvd = plv_dict 

    def set(self, key, val): 
     self.__batch.put(self.__plvd.serializer.pack(key), self.__plvd.serializer.pack(val)) 

    def delete(self, key): 
     self.__batch.delete(self.__plvd.serializer.pack(key)) 

    def clear(self): 
     self.__batch.clear() 

    def commit(self): 
     self.__batch.write() 


class PlyvelDict(MutableMapping): 
    def __init__(self, directory='', db=None, serializer_factory=None, cache_factory=None, **kwargs): 
     self.__directory = directory 
     ensure_directory(directory) 
     if isinstance(db, str) or db is None: 
      if db is None: 
       # generate UUID 
       db = str(uuid.uuid4()) 
      self.__db = db 
      db = plyvel.DB(self.name(), **kwargs) 
     else: 
      self.__db = kwargs['db'] 
     self._db = db 
     if serializer_factory: 
      self.serializer = serializer_factory() 
     else: 
      self.serializer = None 
     if cache_factory: 
      self.__cache = cache_factory(self.__cache_miss, self.__cache_evict) 
     else: 
      self.__cache = None 

    def name(self): 
     full_path = os.path.join(self.__directory, self.__db) 
     return full_path 

    def __cache_miss(self, key): 
     b_item = self._db.get(self.serializer.pack(key)) 
     if b_item is not None: 
      return self.serializer.unpack(b_item) 
     else: 
      raise KeyError(key) 

    def __cache_evict(self, key, val): 
     self._db.put(self.serializer.pack(key), self.serializer.pack(val)) 

    def __copy__(self): 
     return type(self)(self.__directory, self._db, type(self.serializer), type(self.__cache), db=self.__db) 

    def __getitem__(self, key): 
     return self.__cache[key] 

    def __setitem__(self, key, value): 
     if key in self.__cache: 
      self.__cache[key] = value 
     self.__write_to_db(key, value) 

    def __write_to_db(self, key, value): 
     self._db.put(self.serializer.pack(key), self.serializer.pack(value)) 

    def __delitem__(self, key): 
     if key in self.__cache: 
      del self.__cache[key] 
     self._db.delete(self.serializer.pack(key)) 

    def __iter__(self): 
     return self.iterkeys() 

    def __keytransform__(self, key): 
     return key 

    def __len__(self): 
     return self.count() 

    def __del__(self): 
     self.flush() 
     if not self._db.closed: 
      self._db.close() 

    # improved methods 
    def flush(self, write_to_db=False): 
     if self.__cache: 
      if write_to_db: 
       batch = self.set_batch() 
       for key, val in self.__cache.items(): 
        batch.set(key, val) 
       batch.commit() 
      self.__cache.clear() 

    def set_batch(self): 
     return PlyvelBatchWrite(self) 

    def iteritems(self): 
     self.flush() 
     for key, value in self._db.iterator(include_key=True, include_value=True): 
      yield (self.serializer.unpack(key), self.serializer.unpack(value)) 

    def iterkeys(self): 
     self.flush() 
     for key in self._db.iterator(include_key=True, include_value=False): 
      yield self.serializer.unpack(key) 

    def itervalues(self): 
     self.flush() 
     for val in self._db.iterator(include_key=False, include_value=True): 
      yield self.serializer.unpack(val) 

    def keys(self): 
     self.flush() 
     # fixes default method which calls __len__ 
     return list(self.iterkeys()) 

    def values(self): 
     self.flush() 
     return list(self.itervalues()) 

    def has_key(self, key): 
     return key in self 

    def clear(self): 
     self.flush() 
     for k in self: 
      del self[k] 

    def count(self): 
     self.flush() 
     return sum(1 for key in self) 

和图形类:

class PersistedGraph(nx.Graph): 
    def __init__(self, data=None, node_dict_factory=None, adjlist_dict_factory=None, edge_attr_dict_factory=None, 
       **attr): 
     if node_dict_factory: 
      self.node_dict_factory = node_dict_factory 
     if adjlist_dict_factory: 
      self.adjlist_dict_factory = adjlist_dict_factory 
     if edge_attr_dict_factory: 
      self.edge_attr_dict_factory = edge_attr_dict_factory 
     nx.Graph.__init__(self, data, **attr) 
1

应该可以通过继承类来扩展networkx,并提供用户定义的工厂函数。这些函数可以查询数据库并将结果缓存在networkx使用的字典中。

我找不到从在线文档,但在code你有这些线路:

子类(高级):

的图表类使用的字典-的快译通-OF-字典数据结构。

外部字典(node_dict)保存由节点键入的邻接列表。 下一个字典(adjlist)表示邻接列表,并保存由邻居键入的边缘数据。内部字典(edge_attr)表示边缘数据并保存由属性名称键入的边缘属性值。

这三个字母中的每一个都可以是,用户定义为 字典对象。一般来说,类似字典的特征应该保持 ,但可以添加额外的功能。若要替换 字典中的一个,可以通过更改类(!)变量 来保存该字典式结构的工厂来创建新图类。变量名称 是node_dict_factory,adjlist_dict_factory和edge_attr_dict_factory。

node_dict_factory : function, (default: dict) 
    Factory function to be used to create the outer-most dict 
    in the data structure that holds adjacency lists keyed by node. 
    It should require no arguments and return a dict-like object. 

    adjlist_dict_factory : function, (default: dict) 
    Factory function to be used to create the adjacency list 
    dict which holds edge data keyed by neighbor. 
    It should require no arguments and return a dict-like object 

    edge_attr_dict_factory : function, (default: dict) 
    Factory function to be used to create the edge attribute 
    dict which holds attrbute values keyed by attribute name. 
    It should require no arguments and return a dict-like object. 

我不知道任何官方扩展networkx。

相关问题