2017-02-28

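Cassandra connections idle and time out

I'm trying to load and delete data in Cassandra using the Python driver. I first tried this with Cassandra running in a Docker container, and then again with Cassandra running locally after the Docker version gave me problems. Here is an example of what I'm doing: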

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args


class Controller(object):
    def __init__(self):
        # Cluster() connects to 127.0.0.1 by default
        self.cluster = Cluster()
        self.session = self.cluster.connect('mykeyspace')

    def insert_into_cassandra(self, fname):
        query = 'INSERT INTO mytable (mykey, indexed_key) VALUES (?, ?)'
        prepared = self.session.prepare(query)
        prepared.consistency_level = ConsistencyLevel.QUORUM
        # params_generator (not shown) yields one (mykey, indexed_key) tuple per item in fname
        params_gen = self.params_generator(fname)
        execute_concurrent_with_args(self.session, prepared, params_gen, concurrency=50)

    def delete_param_gen(self, results):
        for r in results:
            yield [r.mykey]

    def delete_by_index(self, value):
        # Bind the value instead of interpolating it into the CQL string
        query = "SELECT mykey FROM mytable WHERE indexed_key = %s"
        res = self.session.execute(query, (value,))
        delete_query = "DELETE FROM mytable WHERE mykey = ?"
        prepared = self.session.prepare(delete_query)
        prepared.consistency_level = ConsistencyLevel.QUORUM
        params_gen = self.delete_param_gen(res)
        execute_concurrent_with_args(self.session, prepared, params_gen, concurrency=50)

Nothing crazy. While loading or deleting data, I frequently see the following messages:

Sending options message heartbeat on idle connection (4422117360) 127.0.0.1 
Heartbeat failed for connection (4422117360) to 127.0.0.1 

Here are some logs from deleting data:

[2017-02-28 08:37:20,562] [DEBUG] [cassandra.connection] Defuncting connection (4422117360) to 127.0.0.1: errors=Connection heartbeat timeout after 30 seconds, last_host=127.0.0.1 
[2017-02-28 08:37:20,563] [DEBUG] [cassandra.io.libevreactor] Closing connection (4422117360) to 127.0.0.1 
[2017-02-28 08:37:20,563] [DEBUG] [cassandra.io.libevreactor] Closed socket to 127.0.0.1 
[2017-02-28 08:37:20,564] [DEBUG] [cassandra.pool] Defunct or closed connection (4422117360) returned to pool, potentially marking host 127.0.0.1 as down 
[2017-02-28 08:37:20,566] [DEBUG] [cassandra.pool] Replacing connection (4422117360) to 127.0.0.1 
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.connection] Defuncting connection (4426057600) to 127.0.0.1: errors=Connection heartbeat timeout after 30 seconds, last_host=127.0.0.1 
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.io.libevreactor] Closing connection (4426057600) to 127.0.0.1 
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.io.libevreactor] Closed socket to 127.0.0.1
[2017-02-28 08:37:20,568] [ERROR] [cassandra.cluster] Unexpected exception while handling result in ResponseFuture:
Traceback (most recent call last):
    File "cassandra/cluster.py", line 3536, in cassandra.cluster.ResponseFuture._set_result (cassandra/cluster.c:67556)
    File "cassandra/cluster.py", line 3711, in cassandra.cluster.ResponseFuture._set_final_result (cassandra/cluster.c:71769)
    File "cassandra/concurrent.py", line 154, in cassandra.concurrent._ConcurrentExecutor._on_success (cassandra/concurrent.c:3357)
    File "cassandra/concurrent.py", line 203, in cassandra.concurrent.ConcurrentExecutorListResults._put_result (cassandra/concurrent.c:5539)
    File "cassandra/concurrent.py", line 209, in cassandra.concurrent.ConcurrentExecutorListResults._put_result (cassandra/concurrent.c:5427)
    File "cassandra/concurrent.py", line 123, in cassandra.concurrent._ConcurrentExecutor._execute_next (cassandra/concurrent.c:2369)
    File "load_cassandra.py", line 148, in delete_param_gen
    for r in rows:
    File "cassandra/cluster.py", line 3991, in cassandra.cluster.ResultSet.next (cassandra/cluster.c:76025)
    File "cassandra/cluster.py", line 4006, in cassandra.cluster.ResultSet.fetch_next_page (cassandra/cluster.c:76193)
    File "cassandra/cluster.py", line 3781, in cassandra.cluster.ResponseFuture.result (cassandra/cluster.c:73073)
cassandra.cluster.NoHostAvailable: ('Unable to complete the operation against any hosts', {})
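In the delete traceback, delete_param_gen is resumed from inside the driver's concurrent executor (_ConcurrentExecutor._execute_next, reached from _on_success), and iterating the SELECT's ResultSet at that point calls fetch_next_page and then ResponseFuture.result, a blocking wait for the next page. The stdlib-only sketch below illustrates the general Python behavior involved: a generator over a paged result only fetches a page when iteration reaches it, so page fetches end up interleaved with the deletes. FakePagedResult and run_deletes are hypothetical stand-ins, not driver classes.

```python
class FakePagedResult:
    """Hypothetical stand-in for a paged result set: rows arrive in pages,
    and each page is "fetched" only when iteration reaches it."""

    def __init__(self, rows, page_size, log):
        self._rows = rows
        self._page_size = page_size
        self._log = log  # shared list recording the order of events

    def __iter__(self):
        for start in range(0, len(self._rows), self._page_size):
            # With a real driver this would be a network round trip.
            self._log.append(("fetch_page", start // self._page_size))
            yield from self._rows[start:start + self._page_size]


def delete_param_gen(results):
    # Same shape as the generator in the question: one parameter list per row.
    for r in results:
        yield [r]


def run_deletes(params, log):
    # Hypothetical stand-in for issuing one DELETE per parameter list.
    for p in params:
        log.append(("delete", p[0]))


# Lazy: the generator is consumed while deletes run, so page fetches interleave.
lazy_log = []
run_deletes(delete_param_gen(FakePagedResult(list(range(4)), 2, lazy_log)), lazy_log)

# Eager: materialize all keys first, so every page is fetched before any delete.
eager_log = []
keys = list(delete_param_gen(FakePagedResult(list(range(4)), 2, eager_log)))
run_deletes(keys, eager_log)

print(lazy_log)
print(eager_log)
```

With the real driver, the eager variant would correspond to building the key list up front (for example keys = [[r.mykey] for r in res]) before calling execute_concurrent_with_args. This only illustrates one pattern visible in the traceback; it is not a confirmed fix for the timeouts.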

And here are some from inserting data:

[2017-02-28 16:50:25,594] [DEBUG] [cassandra.connection] Sending options message heartbeat on idle connection (140301574604448) 127.0.0.1 
[2017-02-28 16:50:25,595] [DEBUG] [cassandra.cluster] [control connection] Attempting to reconnect 
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.cluster] [control connection] Opening new connection to 127.0.0.1 
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Not sending options message for new connection(140301347717016) to 127.0.0.1 because compression is disabled and a cql version was not specified 
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Sending StartupMessage on <AsyncoreConnection(140301347717016) 127.0.0.1:9042> 
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Sent StartupMessage on <AsyncoreConnection(140301347717016) 127.0.0.1:9042> 
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.io.asyncorereactor] Closing connection (140301347717016) to 127.0.0.1 
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.io.asyncorereactor] Closed socket to 127.0.0.1 
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.connection] Connection to 127.0.0.1 was closed during the startup handshake 
[2017-02-28 16:50:30,597] [WARNING] [cassandra.cluster] [control connection] Error connecting to 127.0.0.1: 
Traceback (most recent call last): 
    File "cassandra/cluster.py", line 2623, in cassandra.cluster.ControlConnection._reconnect_internal (cassandra/cluster.c:47899) 
    File "cassandra/cluster.py", line 2645, in cassandra.cluster.ControlConnection._try_connect (cassandra/cluster.c:48416) 
    File "cassandra/cluster.py", line 1119, in cassandra.cluster.Cluster.connection_factory (cassandra/cluster.c:15085) 
    File "cassandra/connection.py", line 333, in cassandra.connection.Connection.factory (cassandra/connection.c:5790) 
cassandra.OperationTimedOut: errors=Timed out creating connection (5 seconds), last_host=None 
[2017-02-28 16:50:39,309] [ERROR] [root] Exception inserting data into cassandra 
Traceback (most recent call last): 
    File "load_cassandra.py", line 54, in run 
    controller.insert_into_cassandra(filename) 
    File "extract_to_cassandra.py", line 141, in insert_into_cassandra 
    for success, result in results: 
    File "cassandra/concurrent.py", line 177, in _results (cassandra/concurrent.c:4856) 
    File "cassandra/concurrent.py", line 186, in cassandra.concurrent.ConcurrentExecutorGenResults._results (cassandra/concurrent.c:4622) 
    File "cassandra/concurrent.py", line 165, in cassandra.concurrent._ConcurrentExecutor._raise (cassandra/concurrent.c:3745) 
cassandra.WriteTimeout: Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'consistency': 'QUORUM', 'required_responses': 1, 'received_responses': 0} 
[2017-02-28 16:50:39,465] [DEBUG] [cassandra.connection] Received options response on connection (140301574604448) from 127.0.0.1 
[2017-02-28 16:50:39,466] [DEBUG] [cassandra.cluster] Shutting down Cluster Scheduler 
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.cluster] Shutting down control connection 
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.io.asyncorereactor] Closing connection (140301574604448) to 127.0.0.1 
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.io.asyncorereactor] Closed socket to 127.0.0.1 
[2017-02-28 16:50:39,468] [DEBUG] [cassandra.pool] Defunct or closed connection (140301574604448) returned to pool, potentially marking host 127.0.0.1 as down 

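I've tweaked the consistency level, even setting it to ONE, but that didn't help. Inserts do better when Cassandra runs locally instead of in Docker, but they still time out. Deletes usually work for a few seconds and then hang or time out.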
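The WriteTimeout in the insert log reports 'consistency': 'QUORUM' together with 'required_responses': 1. Cassandra computes QUORUM as floor(RF / 2) + 1 replicas, where RF is the keyspace's replication factor summed over all data centers. Only RF = 1 yields a quorum of 1, so the keyspace here is presumably using a single replica (an inference from the log, not something the question states). In that case QUORUM and ONE both wait on the same lone replica, which is consistent with the consistency change having no effect. A minimal sketch of the formula:

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must acknowledge a QUORUM request: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


for rf in (1, 2, 3, 5):
    print(f"RF={rf}: QUORUM needs {quorum(rf)} replica(s)")
```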

EDIT: Here is what the Cassandra log shows when things fail:

INFO 18:39:11 MUTATION messages were dropped in last 5000 ms: 4 for internal timeout and 0 for cross node timeout. Mean internal dropped latency: 2933809 ms and Mean cross-node dropped latency: 0 ms
INFO 18:39:11 Pool Name                     Active  Pending  Completed  Blocked  All Time Blocked
INFO 18:39:11 MutationStage                     32       15        470        0                 0
INFO 18:39:11 ViewMutationStage                  0        0          0        0                 0
INFO 18:39:11 ReadStage                          0        0         59        0                 0
INFO 18:39:11 RequestResponseStage               0        0          0        0                 0
INFO 18:39:11 ReadRepairStage                    0        0          0        0                 0
INFO 18:39:11 CounterMutationStage               0        0          0        0                 0
INFO 18:39:11 MiscStage                          0        0          0        0                 0
INFO 18:39:11 CompactionExecutor                 0        0       6399        0                 0
INFO 18:39:11 MemtableReclaimMemory              0        0         36        0                 0
INFO 18:39:11 PendingRangeCalculator             0        0          1        0                 0
INFO 18:39:11 GossipStage                        0        0          0        0                 0
INFO 18:39:11 SecondaryIndexManagement           0        0          0        0                 0
INFO 18:39:11 HintsDispatcher                    0        0          0        0                 0
INFO 18:39:11 MigrationStage                     0        0          2        0                 0
INFO 18:39:11 MemtablePostFlush                  0        0         62        0                 0
INFO 18:39:11 PerDiskMemtableFlushWriter_0       0        0         36        0                 0
INFO 18:39:11 ValidationExecutor                 0        0          0        0                 0
INFO 18:39:11 Sampler                            0        0          0        0                 0
INFO 18:39:11 MemtableFlushWriter                0        0         36        0                 0
INFO 18:39:11 InternalResponseStage              0        0          0        0                 0
INFO 18:39:11 AntiEntropyStage                   0        0          0        0                 0
INFO 18:39:11 CacheCleanupExecutor               0        0          0        0                 0
INFO 18:39:11 Native-Transport-Requests         33        0        727        0                 0
INFO 18:39:11 CompactionManager                  0        0
INFO 18:39:11 MessagingService                 n/a      0/0
INFO 18:39:11 Cache Type      Size     Capacity    KeysToSave 
INFO 18:39:11 KeyCache      1368     51380224      all 
INFO 18:39:11 RowCache       0      0      all 
INFO 18:39:11 Table      Memtable ops,data 
INFO 18:39:11 system_distributed.parent_repair_history     0,0 
INFO 18:39:11 system_distributed.repair_history     0,0 
INFO 18:39:11 system_distributed.view_build_status     0,0 
INFO 18:39:11 system.compaction_history    1,231 
INFO 18:39:11 system.hints        0,0 
INFO 18:39:11 system.schema_aggregates     0,0 
INFO 18:39:11 system.IndexInfo       0,0 
INFO 18:39:11 system.schema_columnfamilies     0,0 
INFO 18:39:11 system.schema_triggers     0,0 
INFO 18:39:11 system.size_estimates     40,1255 
INFO 18:39:11 system.schema_functions     0,0 
INFO 18:39:11 system.paxos        0,0 
INFO 18:39:11 system.views_builds_in_progress     0,0 
INFO 18:39:11 system.built_views      0,0 
INFO 18:39:11 system.peer_events      0,0 
INFO 18:39:11 system.range_xfers      0,0 
INFO 18:39:11 system.peers        0,0 
INFO 18:39:11 system.batches       0,0 
INFO 18:39:11 system.schema_keyspaces     0,0 
INFO 18:39:11 system.schema_usertypes     0,0 
INFO 18:39:11 system.local        0,0 
INFO 18:39:11 system.sstable_activity     6,117 
INFO 18:39:11 system.available_ranges     0,0 
INFO 18:39:11 system.batchlog       0,0 
INFO 18:39:11 system.schema_columns      0,0 
INFO 18:39:11 system_schema.columns      0,0 
INFO 18:39:11 system_schema.types      0,0 
INFO 18:39:11 system_schema.indexes      0,0 
INFO 18:39:11 system_schema.keyspaces     0,0 
INFO 18:39:11 system_schema.dropped_columns     0,0 
INFO 18:39:11 system_schema.aggregates     0,0 
INFO 18:39:11 system_schema.triggers     0,0 
INFO 18:39:11 system_schema.tables      0,0 
INFO 18:39:11 system_schema.views      0,0 
INFO 18:39:11 system_schema.functions     0,0 
INFO 18:39:11 system_auth.roles       0,0 
INFO 18:39:11 system_auth.role_members     0,0 
INFO 18:39:11 system_auth.resource_role_permissons_index     0,0 
INFO 18:39:11 system_auth.role_permissions     0,0 
INFO 18:39:11 mykeyspace.mytable      430,27163514 
INFO 18:39:11 system_traces.sessions     0,0 
INFO 18:39:11 system_traces.events      0,0 
INFO 18:39:13 ParNew GC in 261ms. CMS Old Gen: 46106544 -> 74868512; Par Eden Space: 208895224 -> 0; Par Survivor Space: 16012448 -> 26083328 
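In the pool statistics, MutationStage is the only stage with queued work: 32 active and 15 pending. If the node uses the default concurrent_writes: 32 from cassandra.yaml (an assumption; the question does not show that setting), every write thread is busy and further mutations are waiting, which lines up with the dropped MUTATION messages reported before the pool table. A small stdlib sketch that picks such pools out of log lines like these; the regular expression assumes the "INFO time name active pending completed blocked all-time-blocked" layout seen here and is not an official format:

```python
import re

# A few pool lines copied from the Cassandra log in the question.
LOG = """\
INFO 18:39:11 MutationStage     32  15   470   0     0
INFO 18:39:11 ReadStage       0   0    59   0     0
INFO 18:39:11 CompactionExecutor    0   0   6399   0     0
INFO 18:39:11 Native-Transport-Requests  33   0   727   0     0
INFO 18:39:11 MessagingService    n/a  0/0
"""

# Assumed layout: INFO <time> <pool> <active> <pending> <completed> <blocked> <all-time blocked>
POOL_LINE = re.compile(
    r"^INFO \S+ (?P<name>\S+)\s+(?P<active>\d+)\s+(?P<pending>\d+)"
    r"\s+\d+\s+\d+\s+\d+\s*$"
)


def backed_up_pools(log_text):
    """Return (pool, active, pending) for each pool that has queued tasks.
    Lines that do not match the assumed layout (e.g. MessagingService) are skipped."""
    found = []
    for line in log_text.splitlines():
        m = POOL_LINE.match(line)
        if m and int(m.group("pending")) > 0:
            found.append((m.group("name"), int(m.group("active")), int(m.group("pending"))))
    return found


print(backed_up_pools(LOG))
```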

I also see messages like this:

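Out of 29 commit log syncs over the past 248s with average duration of  1596.14ms, 1 have exceeded the configured commit interval by an average of 18231.00ms

Answer

One thing you could try is lowering the idle_heartbeat_interval setting on your connection. It defaults to 30 seconds, but it can be configured when you instantiate the Cluster class. In this example, I set it to 10 seconds: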

def __init__(self): 
    self.cluster = Cluster(idle_heartbeat_interval=10) 
    self.session = self.cluster.connect('mykeyspace') 
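To make the heartbeat messages in the question concrete, here is a deliberately simplified model of idle-connection heartbeats: a connection that has been idle for at least the interval is pinged (the driver sends an OPTIONS message), and one whose ping is not answered within the timeout is marked defunct, as in the "Defuncting connection ... Connection heartbeat timeout after 30 seconds" lines. SimConnection, heartbeat_pass and responds_after are hypothetical names; this is not the driver's implementation.

```python
from dataclasses import dataclass


@dataclass
class SimConnection:
    last_activity: float   # time of the last request/response on this connection
    responds_after: float  # hypothetical server delay before answering a ping
    defunct: bool = False


def heartbeat_pass(conns, now, interval, timeout):
    """One heartbeat round: ping connections idle for >= interval seconds,
    and mark any whose ping is not answered within `timeout` as defunct."""
    for c in conns:
        if c.defunct or now - c.last_activity < interval:
            continue  # already dead, or active recently: no heartbeat needed
        if c.responds_after <= timeout:
            c.last_activity = now + c.responds_after  # ping answered in time
        else:
            c.defunct = True  # the "heartbeat timeout" case from the logs


conns = [SimConnection(last_activity=0.0, responds_after=0.5),
         SimConnection(last_activity=0.0, responds_after=45.0)]
heartbeat_pass(conns, now=30.0, interval=30.0, timeout=30.0)
print([c.defunct for c in conns])
```

In this model the interval only decides how soon an idle connection is probed; a node that is too busy to answer in time still ends up with a defunct connection.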

If that doesn't help, it may be time to check your data model for anti-patterns.

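Unfortunately, that didn't work. I noticed that this only happens when my data contains a blob field of about 80 KB per item. If I change it to something smaller, everything works fine. I increased write_request_timeout_in_ms in cassandra.yaml (and restarted, of course), but that didn't help. – ukejoe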
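The observation about ~80 KB blobs can be set against numbers already in the question. The memtable line "mykeyspace.mytable 430,27163514" gives an operation count and (assuming the second figure is bytes) a data size, and concurrency=50 bounds how many writes the driver keeps outstanding. A back-of-the-envelope sketch; reading "about 80kb" as 80 KiB is an assumption:

```python
# Figures copied from the question; BLOB_BYTES reads "about 80kb" as 80 KiB (an assumption).
MEMTABLE_OPS = 430           # "mykeyspace.mytable      430,27163514"
MEMTABLE_BYTES = 27_163_514  # assumed to be bytes of live memtable data
CONCURRENCY = 50             # concurrency=50 passed to execute_concurrent_with_args
BLOB_BYTES = 80 * 1024

# Average bytes per operation currently held in mytable's memtable.
bytes_per_op = MEMTABLE_BYTES / MEMTABLE_OPS

# Approximate upper bound on blob payload the driver keeps outstanding at one time.
in_flight_bytes = CONCURRENCY * BLOB_BYTES

print(f"{bytes_per_op:,.0f} bytes per memtable operation")
print(f"{in_flight_bytes / 2**20:.2f} MiB of blob data in flight")
```

That works out to roughly 63 KB per operation in mytable's memtable and about 3.9 MiB of blob data outstanding at once against the single node. This is arithmetic on the logged values only; it does not by itself identify the cause.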