2017-08-02 57 views
0

我在单进程和多进程中尝试了批量插入,但他们使用同一时间。我没有得到任何改善。 cassandra的keyspace是SimpleStrategy,我认为它只有一个节点。做这些影响吗?python多处理bach在Cassandra中插入,性能没有提高

这是我的多处理代码,你能帮我找到哪里出错吗?

lock = Lock() 
ID = Value('i', 0) 

def copy(x): 

    cluster = Cluster() 
    session = cluster.connect('test') 
    global lock, row_ID 
    count = 0 

    insertt = session.prepare("INSERT INTO table2(id, age, gender, name) values(?, ?, ?, ?)") 
    batch = BatchStatement() 

    for i in x: 
     with open(files[i]) as csvfile: 
      reader = csv.reader(csvfile, delimiter=',') 
      for row in tqdm(reader): 
       if count <= 59: 
        with lock: 
         ID.value += 1 
        name_ID = row[1] 
        gender_ID = row[2] 
        age_ID = int(row[3]) 
        batch.add(insertt, (ID.value, age_ID, gender_ID, name_ID)) 
        count += 1 
       else: 
        count = 0 
        with lock: 
         ID.value += 1 
        name_ID = row[1] 
        gender_ID = row[2] 
        age_ID = int(row[3]) 
        batch.add(insertt, (ID.value, age_ID, gender_ID, name_ID)) 
        session.execute(batch) 
        batch = BatchStatement() 

if __name__ == '__main__': 
    start = time.time() 
    with Pool() as p: 
     p.map(copy, [range(0,6),range(6,12),range(12,18),range(18,24)]) 
     end = time.time() 
     t = end - start 
     print(t) 

回答

1

批次不是为了提高性能,而是真的相反。特别记录的批次(您在此使用的)是正常写入成本的两倍多。未记录的批次可能会略微提高性能如果批次中的所有数据都属于同一分区。

在这个特定的例子中,你的吞吐量也会受限于你的csv阅读器可以从磁盘中获取多快。由于其阻塞可能是吞吐量的主要影响之一。您也可以使用executeAsync,以便在完成前一个批处理时不阻塞下一个批处理的构建(尽管在这里再次使用批处理)。

+0

谢谢,我是卡桑德拉的新手。我比较了在一个过程中使用批处理(记录)和不使用批处理所消耗的时间,它实际上节省了大约4/5时间。但是当我添加多处理时,它具有相同的性能。所以另一个问题是“我是否正确使用多处理”?你能否给我一些相关的教程链接? – Oak

+0

如果可能,你能直接纠正我的代码吗?提前致谢。 – Oak

+0

如果您从执行更改为executeasync,您将看到更大的差异。您看到批处理性能提高的原因是因为吞吐量在使用同步请求时被延迟阻塞。通过减少请求的数量,减少阻塞延迟(littles law fwiw)。但是这个请求实际上更加昂贵且速度更慢。 –