嗨,我是超新于一般,所以我有一个问题,可能是相当幼稚的芹菜和任务队列。
我想要一个相当大的.csv文件(将其转换为pandas DataFrame)并在所有列对上对其执行皮尔逊测试(统计数学函数)。大约需要9分钟到一个核心,我们有数百个.csv文件!
所以我想在我们的3台服务器集群上的所有内核之间划分这个处理。这里有一个原型的我的代码迄今....
from celery import Celery
import numpy as np
import pandas as pd
import scipy.stats as stats
import itertools
app = Celery()
minute_CSV = pd.read_csv('./test_dataframe.csv')
cycle_length = 300
row_max = minute_CSV.shape[0]
r_vector_data = pd.DataFrame()
column_combinations = itertools.combinations(minute_CSV.filter(regex='FREQ').keys(),2)
xy_cols = list(column_combinations)
@app.task
def data_processing(minute_CSV, cycle_length, row_max, x, y):
return np.array([stats.pearsonr(minute_CSV[x][c-cycle_length:c],
minute_CSV[y][c-cycle_length:c])[0] for c in range(cycle_length,row_max)])
for i in range(0, len(xy_cols)):
x = xy_cols[i][0]
y = xy_cols[i][1]
r_vector_data[x + ' to ' + y] = data_processing.delay(minute_CSV, cycle_length, row_max, x, y)
pd.DataFrame.to_csv(r_vector_data, processed_dataframe.csv)
当我运行此我得到这个消息:
“[1200行×870列]是不是JSON序列化”
的数学
Pearson相关的工作方式如下:取300(对我来说)两个C的顺序排运行相关性并将结果存储在新的DataFrame(r_vector_data)中。这是对行进行的:(0..299),(1..300),(2..301)等等。
另外,这个脚本只考虑一个.csv文件,但稍后会被修改:)。
关于何去何从的想法?我将如何使用芹菜来完成这一任务,因为我在文档中有点迷失。
谢谢!