2012-03-30 68 views
11

可能有人请相对于OHLC数据的时间内转换与Pandas点我在正确的方向?我想要做的是在更高的时间范围内为数据创建一个数据帧,并给定时间较短的数据。转换OHLC股票数据转换成不同的时间段使用Python和大熊猫

例如,假设我有以下一分钟(M1)数据:

     Open High  Low Close Volume 
Date              
1999-01-04 10:22:00 1.1801 1.1819 1.1801 1.1817  4 
1999-01-04 10:23:00 1.1817 1.1818 1.1804 1.1814  18 
1999-01-04 10:24:00 1.1817 1.1817 1.1802 1.1806  12 
1999-01-04 10:25:00 1.1807 1.1815 1.1795 1.1808  26 
1999-01-04 10:26:00 1.1803 1.1806 1.1790 1.1806  4 
1999-01-04 10:27:00 1.1801 1.1801 1.1779 1.1786  23 
1999-01-04 10:28:00 1.1795 1.1801 1.1776 1.1788  28 
1999-01-04 10:29:00 1.1793 1.1795 1.1782 1.1789  10 
1999-01-04 10:31:00 1.1780 1.1792 1.1776 1.1792  12 
1999-01-04 10:32:00 1.1788 1.1792 1.1788 1.1791  4 

具有开放式,高,低,关闭(OHLC)和容积值,每一分钟我想建一组5分钟的读数(M5),其看起来像这样:

     Open High  Low Close Volume 
Date              
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789  91 
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791  16 

所以工作流程是:

  • 打开为t的开放他在timewindow第一行
  • 高是在timewindow
  • 低最低低
  • 关闭是最后关闭
  • 量最高的高简直是卷

还有的总和虽然有几个问题:

  • 的数据差距(注意没有10:30:00行)
  • 5分钟的间隔必须在整个时间开始,例如, M5开始于10:25:00不是10:22:00
  • 第一,不完整的集可以在这个例子中可以省略等,或者包括(所以我们可以有10:20:00 5分钟进入)

Pandas documentation on up-down sampling举例说明,但它们使用平均值作为上采样行的值,这在此不起作用。我曾尝试使用groupbyagg,但无济于事。对于一个获得最高和最低的可能并不难,但我不知道如何获得第一个开放和最后一个关闭。

我想什么是沿着线的东西:

grouped = slice.groupby(dr5minute.asof).agg( 
    { 'Low': lambda x : x.min()[ 'Low' ], 'High': lambda x : x.max()[ 'High' ] } 
) 

,但它会导致下面的错误,我不明白:

In [27]: grouped = slice.groupby(dr5minute.asof).agg({ 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] }) 
--------------------------------------------------------------------------- 
IndexError        Traceback (most recent call last) 
/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <module>() 
----> 1 grouped = slice.groupby(dr5minute.asof).agg({ 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] }) 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in agg(self, func, *args, **kwargs) 
    242   See docstring for aggregate 
    243   """ 
--> 244   return self.aggregate(func, *args, **kwargs) 
    245 
    246  def _iterate_slices(self): 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, arg, *args, **kwargs) 
    1153      colg = SeriesGroupBy(obj[col], column=col, 
    1154           grouper=self.grouper) 
-> 1155      result[col] = colg.aggregate(func) 
    1156 
    1157    result = DataFrame(result) 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, func_or_funcs, *args, **kwargs) 
    906     return self._python_agg_general(func_or_funcs, *args, **kwargs) 
    907    except Exception: 
--> 908     result = self._aggregate_named(func_or_funcs, *args, **kwargs) 
    909 
    910    index = Index(sorted(result), name=self.grouper.names[0]) 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in _aggregate_named(self, func, *args, **kwargs) 
    976    grp = self.get_group(name) 
    977    grp.name = name 
--> 978    output = func(grp, *args, **kwargs) 
    979    if isinstance(output, np.ndarray): 
    980     raise Exception('Must produce aggregated value') 

/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <lambda>(x) 
----> 1 grouped = slice.groupby(dr5minute.asof).agg({ 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] }) 

IndexError: invalid index to scalar variable. 

这样做的任何帮助将是不胜感激。如果我选择的路径不起作用,请提出其他相对有效的方法(我有数百万行)。一些使用熊猫进行金融处理的资源也很好。

+2

您使用的是什么版本的熊猫?我们正在研究改进后的时间序列功能,这将大大简化这一过程,但它不可能在四月底左右发布。可能有一个错误需要修复,不过, – 2012-03-30 15:24:09

+1

Hi Wes,我正在使用0.7.2。我想等待新版本发布是一个可行的选择,因为我没有这个转换的最后期限(我需要这些数据用于私人研究)。让我使用这个ocasion来感谢你为开发熊猫做出的努力! :) – kgr 2012-03-30 15:55:16

+0

至于潜在的错误,请注意,我没有为Dataframe中的所有列指定值(仅限于2/5),如果这就是您的意思。 – kgr 2012-03-30 15:56:29

回答

8

您的方法是合理的,但是因为应用于agg()函数的函数中的每个函数都接收到一个Series对象,该对象反映了键值匹配的列。因此,再次在列标签上过滤 是没有必要的。有了这一点,假设GROUPBY保持秩序, 你可以切片的系列提取打开/关闭 列(注的第一个/最后一个元素:GROUPBY文件并没有要求保留原始数据的 系列秩序,但似乎在实践。)

In [50]: df.groupby(dr5minute.asof).agg({'Low': lambda s: s.min(), 
             'High': lambda s: s.max(), 
             'Open': lambda s: s[0], 
             'Close': lambda s: s[-1], 
             'Volume': lambda s: s.sum()}) 
Out[50]: 
         Close High  Low Open Volume 
key_0              
1999-01-04 10:20:00 1.1806 1.1819 1.1801 1.1801  34 
1999-01-04 10:25:00 1.1789 1.1815 1.1776 1.1807  91 
1999-01-04 10:30:00 1.1791 1.1792 1.1776 1.1780  16 

作为参考,这里是总结基于所述GROUPBY对象类型以及如何聚合功能(多个)被传递/到AGG()的预期 输入和输出类型的聚合功能的表。

    agg() method  agg func agg func   agg() 
        input type  accepts  returns   result 
GroupBy Object 
SeriesGroupBy  function   Series  value    Series 
        dict-of-funcs Series  value    DataFrame, columns match dict keys 
        list-of-funcs Series  value    DataFrame, columns match func names 
DataFrameGroupBy function   DataFrame Series/dict/ary DataFrame, columns match original DataFrame 
        dict-of-funcs Series  value    DataFrame, columns match dict keys, where dict keys must be columns in original DataFrame 
        list-of-funcs Series  value    DataFrame, MultiIndex columns (original cols x func names) 

从上面的表中,如果聚合需要访问多于一个 列,唯一的选择是一个单一功能传递给 DataFrameGroupBy对象。因此,为了完成原始任务的替代方法是定义一个 函数如下所示:

def ohlcsum(df): 
    df = df.sort() 
    return { 
     'Open': df['Open'][0], 
     'High': df['High'].max(), 
     'Low': df['Low'].min(), 
     'Close': df['Close'][-1], 
     'Volume': df['Volume'].sum() 
     } 

并应用AGG()与它:

In [30]: df.groupby(dr5minute.asof).agg(ohlcsum) 
Out[30]: 
         Open High  Low Close Volume 
key_0              
1999-01-04 10:20:00 1.1801 1.1819 1.1801 1.1806  34 
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789  91 
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791  16 

虽然大熊猫可能会提供一些清洁器内置在未来的魔术中,希望这解释了如何使用今天的agg()功能。

+0

首先感谢您提供非常丰富的答案:)请问您可以写出哪种版本的熊猫正在使用,也许您是如何创建'dr5minute'的? 'groupby(dr5minute.asof)'似乎有问题,它只返回一个组。 – kgr 2012-04-02 13:25:57

+0

我想这个问题可能与索引不当有关。我认为来自CSV的日期没有被正确地解析为日期......但这是另一回事,所以不需要在评论中讨论。再次感谢@crewburm! – kgr 2012-04-02 18:09:25

+0

不客气,@kgr。我使用0.7.2。为了在csv中解释日期,请查看''read_csv()''的''converters''参数。 – Garrett 2012-04-03 13:55:54

7

仅仅是有帮助的其他用户提供了较新版本的熊猫,还有一个重采样方法非常快捷实用来完成相同的任务:

ohlc_dict = {                            
'Open':'first',                          
'High':'max',                          
'Low':'min',                           
'Close': 'last',                          
'Volume': 'sum' 
} 

df.resample('5T', how=ohlc_dict, closed='left', label='left') 
+0

我得到这个警告'FutureWarning:如何在.resample()不推荐 新的语法是.resample(...)..应用()'这应该看起来像贬低后 – RaduS 2017-02-17 17:12:46

+1

@RaduS,'''df.resample('5T',closed ='left',label ='left')。apply(ohlc_dict)'''' – wombatonfire 2017-03-19 17:24:44

0

在我的的main()功能我正在收到流式出价/询问数据。然后我做到以下几点:

df = pd.DataFrame([]) 

for msg_type, msg in response.parts(): 
    if msg_type == "pricing.Price": 
     sd = StreamingData(datetime.now(),instrument_string(msg), 
          mid_string(msg),account_api,account_id, 
          's','5min',balance) 
     df = df.append(sd.df()) 
     sd.resample(df) 

我创建了一个类StreamingData()这需要所提供的输入(也创造了一些功能,打破了买入/卖出的数据到各个组件(买价,卖价,中,仪等)。

这样做的好处是所有你需要做的就是改变“S”“5分钟”到你想要的任何期限。将其设置为“m”和“d”按分钟获得每日价格

这是我的StreamingData()样子:

class StreamingData(object): 
def __init__(self, time, instrument, mid, api, _id, xsec, xmin, balance): 
    self.time = time 
    self.instrument = instrument 
    self.mid = mid 
    self.api = api 
    self._id = _id 
    self.xsec = xsec 
    self.xmin = xmin 
    self.balance = balance 
    self.data = self.resample(self.df()) 

def df(self): 
    df1 = pd.DataFrame({'Time':[self.time]}) 
    df2 = pd.DataFrame({'Mid':[float(self.mid)]}) 
    df3 = pd.concat([df1,df2],axis=1,join='inner') 
    df = df3.set_index(['Time']) 
    df.index = pd.to_datetime(df.index,unit='s') 
    return df 

def resample(self, df): 
    xx = df.to_period(freq=self.xsec) 
    openCol = xx.resample(self.xmin).first() 
    highCol = xx.resample(self.xmin).max() 
    lowCol = xx.resample(self.xmin).min() 
    closeCol = xx.resample(self.xmin).last() 
    self.data = pd.concat([openCol,highCol,lowCol,closeCol], 
          axis=1,join='inner') 
    self.data['Open'] = openCol.round(5) 
    self.data['High'] = highCol.round(5) 
    self.data['Low'] = lowCol.round(5) 
    self.data['Close'] = closeCol.round(5) 
    return self.data 

所以它的数据可以从StreamingData(),创建索引的数据帧时间DF()内,追加,然后发送通过resample()。我计算的价格基于:mid =(bid + ask)/ 2