2017-09-23 38 views
-1

我需要找到一种方法,使用7 CPU的出的8个可用的CPU的运行下面的代码多个处理器:使用的Python程序

import pandas as pd 
import numpy as np 
import datetime 
import math 
from itertools import chain, combinations 
import operator 
import time as t 
from multiprocessing import Pool 
#ASSUMPTION 
#EQUAL ALLOCATION OF RESOURCES 

t0 = t.time() 
start_date = '2016-06-01' 
end_date = '2017-09-19' 
allocation = 170000 
#usesymbols=['PAEL', 'ISL', 'ENGRO', 'HUBC', 'NML', 'SNGP', 'OGDC', 
      #'ATRL', 'AVN'] 
usesymbols = ['PAEL', 'ISL', 'SNGP', 'OGDC'] 

cost_matrix = [] 

def data(symbols): 
    dates=pd.date_range(start_date,end_date) 
    df=pd.DataFrame(index=dates) 
    for symbol in symbols: 
     df_temp=pd.read_csv('/home/furqan/Desktop/python_data/{}.csv'.format(str(symbol)),usecols=['Date','Close'], 
          parse_dates=True,index_col='Date',na_values=['nan']) 
     df_temp = df_temp.rename(columns={'Close': symbol}) 
     df=df.join(df_temp) 
     df=df.fillna(method='ffill') 
     df=df.fillna(method='bfill') 
    return df 

def mat_alloc_auto(symbols): 

    n = len(symbols) 
    mat_alloc = np.zeros((n,n), dtype='float') 
    for i in range(0,n): 
     mat_alloc[i,i] = allocation/n 
    return mat_alloc 

def compute_daily_returns(df): 
    """Compute and return the daily return values.""" 
    daily_returns=(df/df.shift(1))-1 
    df=df.fillna(value=0) 
    daily_returns=daily_returns[1:] 
    daily_returns = np.array(daily_returns) 
    return daily_returns 

def port_eval(matrix_alloc,daily_return_matrix): 
    risk_free = 0 
    amount_matrix = [allocation] 
    return_mat = np.dot(daily_return_matrix,matrix_alloc) 
    return_mat = np.sum(return_mat, axis=1, keepdims=True) 
    return_mat = np.divide(return_mat,amount_matrix) 
    mat_average = np.mean(return_mat) 
    mat_std = np.std(return_mat, ddof=1) 
    sharpe_ratio = ((mat_average-risk_free)/mat_std) * math.sqrt(252) 
    return return_mat, sharpe_ratio, mat_average 


def powerset(iterable): 
    s = list(iterable) 
    return chain.from_iterable(combinations(s, r) for r in range(1, len(s)+1)) 

power_set = list(powerset(usesymbols)) 
len_power = len(power_set) 
sharpe = [] 
for j in range(0, len_power): 
    p = Pool(5) 
    p.map(data, powerset[j]) 
    matrix_allocation = mat_alloc_auto(power_set[j]) 
    daily_return_mat = compute_daily_returns(df_01) 
    return_matrix, sharpe_ratio_val, matrix_average = port_eval(matrix_allocation, daily_return_mat) 
    sharpe.append(sharpe_ratio_val) 


max_index, max_value = max(enumerate(sharpe), key=operator.itemgetter(1)) 
print('Maximum sharpe ratio occurs from ',power_set[max_index], ' value = ', max_value) 

t1=t.time() 
print('exec time is ', t1-t0, 'seconds') 

我怎么能这样做?

如果我按照原样运行上面的代码,它只使用1个CPU,即使有七个可以使用的CPU。

我需要做的是让处理时间减少,我可以分析更多的数据。

目前,上述代码花费的时间约为5小时。

如果使用符号列表变大,程序将终止。

我使用多重处理编辑了代码。 错误,我得到的是:

p.map(data, powerset[j]) 
TypeError: 'function' object is not subscriptable 
+0

那么你需要使用'multiprocessing'。就目前而言,你只是要求将整个代码翻译成使用多处理的代码;你有没有努力去做这件事? – roganjosh

+0

我已经使用了多处理模块。我已经将Pool导入为p,然后当我使用p时。地图(数据(powerset [j]))它不起作用。 –

+0

“不起作用”是什么意思?你有错误吗?如果是这样,请把它们放在你的问题中。 – roganjosh

回答

0

如果您真的需要多个进程的并行性,有两种方法可以解决这个问题。首先是使用bharel提出的multiprocess.map或类似的技术。 另一种方法是使用适当编译的numpy版本。后来的anaconda环境随mkl库一起提供,这确保了大部分数组计算都是高度并行完成的。

第三种方法,我相信在你的情况下效率最高,是确保你不会在代码中重复自己。如果我正确地阅读代码,则多次解析相同的csv文件。你应该做的第一件事是确保你只读取和解析每个文件一次。现在,它看起来像是每个powerset多次解析相同的csv文件(而不仅仅是for循环)。

最重要的是,我相信你会多次重复相同的每日回报计算。 我猜你的想法是改善分配功能(考虑到你的问题),但是因为它目前正在重复多次,所以我不妨指出它。

一旦你完成了对csv文件的读取,我会想象代码会大大加快。如果你想进一步改进它,你可以选择多处理路线或者numpy(和pandas)矢量化路线。 后者可能会更快,但由于您需要构建和管理阵列以获得最佳吞吐量,您的代码看起来可读性较差。

0

是否p.map(data, power_set[j])吧?我想你也打算把它分配到某个地方。

尽量为变量和函数选择明确和不同的名称,否则会混淆包括自己在内的周围人。

TypeError: 'function' object is not subscriptable意味着你可能有一个错字。另外,关于时间优化,记住利用更多的CPU通常不是解决方案。尝试检查分析算法并使用cProfile模块运行一些配置文件。

+0

技术上他们是不同的,但你怎么拿起'_'是有点超出我:) – roganjosh

+0

@roganjosh错误说,它不必选择任何东西:-P – Bharel

+0

我还没有看到他们也代码中有'power_set'作为变量。看看这个实际是否有效将会很有趣。 – roganjosh