2014-05-19 423 views

Dynamically appending to a pandas DataFrame

I have been playing with pandas, pulling HTTP logs into it for analysis, since it is a good source of large volumes of data and a good way for me to learn pandas.

I receive the log one line at a time from a stream, so I cannot import from CSV and need to "pump" these lines into a pandas DataFrame, which I will then persist to an HDFStore file.

The code I have written at the moment reads from a GZIP file so I can get the process going, but once I have the pandas part working I will modify it to be event-driven, using pub/sub-style coroutines.

Here is my code so far:

    import os
    import gzip, io
    import pandas as pd
    import numpy as np

    def read_gzip(file):
        if os.path.isdir(os.path.dirname(file)):
            if os.path.isfile(file):
                # re-open and replay the file forever so the generator
                # keeps yielding while the rest of the pipeline is built
                while True:
                    for line in io.BufferedReader(gzip.open(file)):
                        yield line

    class ElbParser(object):

        def __init__(self, file):
            self.file = file

        def get_log_lines(self):
            return read_gzip(self.file)

        def build_series(self, log_line):

            def normalise_ip(input_string):
                try:
                    return input_string.split(':')[0]
                except AttributeError:
                    return np.NaN

            def normalise_time(input_string):
                try:
                    return float(input_string)
                except (TypeError, ValueError):
                    return np.NaN

            def normalise_ints(input_string):
                try:
                    return int(input_string)
                except (TypeError, ValueError):
                    return np.NaN

            log_list = log_line.split()
            elb_series = pd.Series({
                'timestamp': np.datetime64(log_list[0]),
                'elb_name': log_list[1],

                'client_ip': normalise_ip(log_list[2]),
                'backend_ip': normalise_ip(log_list[3]),

                'request_processing_time': normalise_time(log_list[4]),
                'backend_processing_time': normalise_time(log_list[5]),
                'response_processing_time': normalise_time(log_list[6]),

                'elb_status_code': normalise_ints(log_list[7]),
                'backend_status_code': normalise_ints(log_list[8]),

                'received_bytes': normalise_ints(log_list[9]),
                'sent_bytes': normalise_ints(log_list[10]),

                'http_method': log_list[11].strip("'").strip('"'),
                'request': log_list[13].strip("'").strip('"')
            })

            return elb_series

Here is some basic test code that exercises the above:

    import os
    import sys
    import unittest
    import numpy
    import pandas as pd

    from lbreader import ElbParser

    test_elb_log_file = 'tests/resources/lb_log.gz'

    class ElbTest(unittest.TestCase):

        #
        # Fixture Framework
        #

        def setUp(self):
            print '\nTest Fixture setUp'
            self.elbparser = ElbParser(test_elb_log_file)

        def tearDown(self):
            print '\nTest Fixture tearDown'
            del self.elbparser

        #
        # Tests
        #

        def test_01_get_aws_elb_log_line(self):
            print '\n running get aws elb log line'
            log_lines = self.elbparser.get_log_lines()
            self.failUnlessEqual('2014-05-15T23:00:04.696671Z poundlb 192.168.0.10:53345 - -1 -1 -1 408 408 55 0 "PUT https://localhost:443/private/urls HTTP/1.1"\n',
                                 log_lines.next())

        def test_02_build_series_with_aws_elb_log_line(self):
            print '\n running build series from elb log'
            log_lines = self.elbparser.get_log_lines()
            test = self.elbparser.build_series(log_lines.next())
            test_line_dict = {'backend_ip': '-',
                              'backend_processing_time': -1.0,
                              'backend_status_code': 408,
                              'client_ip': '192.168.0.10',
                              'elb_name': 'poundlb',
                              'elb_status_code': 408,
                              'http_method': 'PUT',
                              'received_bytes': 55,
                              'request': 'HTTP/1.1',
                              'request_processing_time': -1.0,
                              'response_processing_time': -1.0,
                              'sent_bytes': 0,
                              'timestamp': numpy.datetime64('2014-05-16T00:00:04.696671+0100')}
            self.failUnlessEqual(test_line_dict, test.to_dict())


    if __name__ == '__main__':
        unittest.main()

Here is where I seem to be stuck:

I am able to create the pandas.Series data, but when I try to push it into a DataFrame it treats it as 13 rows in a single column.

test.values 
    >>> array(['-', -1.0, 408, '192.168.0.10', 'poundlb', 408, 'PUT', 55, 
    'HTTP/1.1', -1.0, -1.0, 0, 
    numpy.datetime64('2014-05-16T00:00:04.696671+0100')], dtype=object) 


    df = pd.DataFrame(test.values.tolist(), column_list) 
    >>> df 
    Out[35]: 
                  0 
    backend_ip            - 
    backend_processing_time        -1 
    backend_status_code         408 
    client_ip         192.168.0.10 
    elb_name           poundlb 
    elb_status_code          408 
    http_method           PUT 
    received_bytes           55 
    request           HTTP/1.1 
    request_processing_time        -1 
    response_processing_time        -1 
    sent_bytes            0 
    timestamp     2014-05-16T00:00:04.696671+0100 

    [13 rows x 1 columns] 

This is far from what I had hoped for. What I want is one row per log line, like this:

    backend_ip backend_processing_time backend_status_code backend_status_code client_ip  elb_name elb_status_code http_method received_bytes request request_processing_time response_processing_time sent_bytes timestamp      
    0   -   -1      408     408     192.168.0.10 poundlb 408    PUT   55    HTTP/1.1 -1      -1       0   2014-05-16T00:00:04 
    1   -   -1      408     408     192.168.0.11 poundlb 408    PUT   55    HTTP/1.1 -1      -1       0   2014-05-16T00:00:05 
    2   -   -1      408     408     192.168.0.12 poundlb 408    PUT   55    HTTP/1.1 -1      -1       0   2014-05-16T00:00:06 

That way, each additional pandas.Series built from a log line would become another row, and so on. I also intend to index on the following: timestamp, client_ip, backend_ip.

I would really appreciate some help with this, as I cannot seem to get my rows/columns right.
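A minimal sketch of the row/column issue (field names taken from the log format above, values made up): a Series is one-dimensional, so passing it straight to the DataFrame constructor yields one row per Series entry. Wrapping the Series in a list makes it a single row instead, and `set_index` then handles the indexing mentioned above.

```python
import pandas as pd

# One Series = one log line; wrapping it in a list gives a one-row frame
row = pd.Series({'client_ip': '192.168.0.10',
                 'elb_status_code': 408,
                 'sent_bytes': 0})
df = pd.DataFrame([row])
print(df.shape)  # (1, 3)

# Each further log line becomes one more row
row2 = pd.Series({'client_ip': '192.168.0.11',
                  'elb_status_code': 200,
                  'sent_bytes': 64})
df = pd.concat([df, pd.DataFrame([row2])], ignore_index=True)
print(df.shape)  # (2, 3)

# Indexing on a column is then a set_index call
indexed = df.set_index('client_ip')
```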

After playing around with this for a while I came up with the following, but still cannot concatenate/append.

>>> test0 = elbparser.build_series(log_lines.next()) 
    >>> test1 = elbparser.build_series(log_lines.next()) 
    >>> test2 = elbparser.build_series(log_lines.next()) 
    >>> test3 = elbparser.build_series(log_lines.next()) 
    >>> test4 = elbparser.build_series(log_lines.next()) 
    >>> test5 = elbparser.build_series(log_lines.next()) 
    >>> test6 = elbparser.build_series(log_lines.next()) 
    >>> test7 = elbparser.build_series(log_lines.next()) 
    >>> test8 = elbparser.build_series(log_lines.next()) 
    >>> test9 = elbparser.build_series(log_lines.next()) 
    >>> test10 = elbparser.build_series(log_lines.next()) 
    >>> test_list = [test0.to_dict(), test1.to_dict(), test2.to_dict(), test3.to_dict(), test4.to_dict(), test5.to_dict(), test6.to_dict(), test7.to_dict(), test8.to_dict(), test9.to_dict(), test10.to_dict()] 
    >>> test_list 
    [{'backend_ip': '-', 
     'backend_processing_time': -1.0, 
     'backend_status_code': 408, 
     'client_ip': '192.168.0.1', 
     'elb_name': 'poundlb', 
     'elb_status_code': 408, 
     'http_method': 'PUT', 
     'received_bytes': 55, 
     'request': 'HTTP/1.1', 
     'request_processing_time': -1.0, 
     'response_processing_time': -1.0, 
     'sent_bytes': 0, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:04.696671+0100')}, 
    {'backend_ip': '10.0.0.241', 
     'backend_processing_time': 59.246736, 
     'backend_status_code': 403, 
     'client_ip': '192.168.0.2', 
     'elb_name': 'poundlb', 
     'elb_status_code': 403, 
     'http_method': 'PUT', 
     'received_bytes': 55, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 3.4e-05, 
     'response_processing_time': 2.9e-05, 
     'sent_bytes': 64, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:30.980494+0100')}, 
    {'backend_ip': '10.0.0.242', 
     'backend_processing_time': 59.42053, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.3', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'PUT', 
     'received_bytes': 173, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 8.4e-05, 
     'response_processing_time': 6.9e-05, 
     'sent_bytes': 149, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:32.687835+0100')}, 
    {'backend_ip': '10.0.0.39', 
     'backend_processing_time': 0.016443, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.4', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'GET', 
     'received_bytes': 0, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 7.9e-05, 
     'response_processing_time': 4e-05, 
     'sent_bytes': 289, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:38.247760+0100')}, 
    {'backend_ip': '10.0.0.41', 
     'backend_processing_time': 0.008624, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.5', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'GET', 
     'received_bytes': 0, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 5.4e-05, 
     'response_processing_time': 3.2e-05, 
     'sent_bytes': 200, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:38.432535+0100')}, 
    {'backend_ip': '10.0.0.43', 
     'backend_processing_time': 0.138925, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.6', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'GET', 
     'received_bytes': 0, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 4.5e-05, 
     'response_processing_time': 4.3e-05, 
     'sent_bytes': 268, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:38.509598+0100')}, 
    {'backend_ip': '10.0.0.38', 
     'backend_processing_time': 0.013578, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.7', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'POST', 
     'received_bytes': 291, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 4.2e-05, 
     'response_processing_time': 2.7e-05, 
     'sent_bytes': 36, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:38.667479+0100')}, 
    {'backend_ip': '10.0.0.42', 
     'backend_processing_time': 0.017493, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.8', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'GET', 
     'received_bytes': 0, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 3.7e-05, 
     'response_processing_time': 2.7e-05, 
     'sent_bytes': 290, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:38.708697+0100')}, 
    {'backend_ip': '10.0.0.40', 
     'backend_processing_time': 0.014167, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.9', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'POST', 
     'received_bytes': 297, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 3.5e-05, 
     'response_processing_time': 2.7e-05, 
     'sent_bytes': 36, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:38.746867+0100')}, 
    {'backend_ip': '10.0.0.40', 
     'backend_processing_time': 0.094383, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0.10', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'PUT', 
     'received_bytes': 79, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 3.4e-05, 
     'response_processing_time': 3.6e-05, 
     'sent_bytes': 148, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:39.333482+0100')}, 
    {'backend_ip': '10.0.0.42', 
     'backend_processing_time': 0.061355, 
     'backend_status_code': 200, 
     'client_ip': '192.168.0,10', 
     'elb_name': 'poundlb', 
     'elb_status_code': 200, 
     'http_method': 'PUT', 
     'received_bytes': 79, 
     'request': 'HTTP/1.1', 
     'request_processing_time': 9.6e-05, 
     'response_processing_time': 5.8e-05, 
     'sent_bytes': 148, 
     'timestamp': numpy.datetime64('2014-05-16T00:00:39.345097+0100')}] 


    >>> df = pd.DataFrame(test.to_dict(), index_list) 
    >>> df 
    Out[45]: 
          backend_ip backend_processing_time 
    backend_ip      -      -1 
    backend_processing_time   -      -1 
    backend_status_code    -      -1 
    client_ip       -      -1 
    elb_name       -      -1 
    elb_status_code     -      -1 
    http_method      -      -1 
    received_bytes     -      -1 
    request       -      -1 
    request_processing_time   -      -1 
    response_processing_time   -      -1 
    sent_bytes      -      -1 
    timestamp       -      -1 

           backend_status_code  client_ip 
    backend_ip        408 192.168.0.10 
    backend_processing_time     408 192.168.0.10 
    backend_status_code      408 192.168.0.10 
    client_ip         408 192.168.0.10 
    elb_name         408 192.168.0.10 
    elb_status_code       408 192.168.0.10 
    http_method        408 192.168.0.10 
    received_bytes       408 192.168.0.10 
    request         408 192.168.0.10 
    request_processing_time     408 192.168.0.10 
    response_processing_time     408 192.168.0.10 
    sent_bytes        408 192.168.0.10 
    timestamp         408 192.168.0.10 

            elb_name elb_status_code http_method 
    backend_ip    poundlb    408   PUT 
    backend_processing_time poundlb    408   PUT 
    backend_status_code  poundlb    408   PUT 
    client_ip     poundlb    408   PUT 
    elb_name     poundlb    408   PUT 
    elb_status_code   poundlb    408   PUT 
    http_method    poundlb    408   PUT 
    received_bytes   poundlb    408   PUT 
    request     poundlb    408   PUT 
    request_processing_time poundlb    408   PUT 
    response_processing_time poundlb    408   PUT 
    sent_bytes    poundlb    408   PUT 
    timestamp     poundlb    408   PUT 

           received_bytes request request_processing_time 
    backend_ip       55 HTTP/1.1      -1 
    backend_processing_time    55 HTTP/1.1      -1 
    backend_status_code     55 HTTP/1.1      -1 
    client_ip        55 HTTP/1.1      -1 
    elb_name        55 HTTP/1.1      -1 
    elb_status_code      55 HTTP/1.1      -1 
    http_method       55 HTTP/1.1      -1 
    received_bytes      55 HTTP/1.1      -1 
    request        55 HTTP/1.1      -1 
    request_processing_time    55 HTTP/1.1      -1 
    response_processing_time    55 HTTP/1.1      -1 
    sent_bytes       55 HTTP/1.1      -1 
    timestamp        55 HTTP/1.1      -1 

           response_processing_time sent_bytes 
    backend_ip          -1   0 
    backend_processing_time       -1   0 
    backend_status_code        -1   0 
    client_ip          -1   0 
    elb_name          -1   0 
    elb_status_code         -1   0 
    http_method          -1   0 
    received_bytes         -1   0 
    request           -1   0 
    request_processing_time       -1   0 
    response_processing_time      -1   0 
    sent_bytes          -1   0 
    timestamp          -1   0 

               timestamp 
    backend_ip    2014-05-15 23:00:04.696671 
    backend_processing_time 2014-05-15 23:00:04.696671 
    backend_status_code  2014-05-15 23:00:04.696671 
    client_ip    2014-05-15 23:00:04.696671 
    elb_name     2014-05-15 23:00:04.696671 
    elb_status_code   2014-05-15 23:00:04.696671 
    http_method    2014-05-15 23:00:04.696671 
    received_bytes   2014-05-15 23:00:04.696671 
    request     2014-05-15 23:00:04.696671 
    request_processing_time 2014-05-15 23:00:04.696671 
    response_processing_time 2014-05-15 23:00:04.696671 
    sent_bytes    2014-05-15 23:00:04.696671 
    timestamp    2014-05-15 23:00:04.696671 

    [13 rows x 13 columns] 

This is closer to what I want, but I still seem to have a problem appending/concatenating after this.
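For what it is worth, a list of dicts like `test_list` above maps directly onto rows: the DataFrame constructor treats each dict as one row and the keys as columns. A small sketch with made-up values:

```python
import pandas as pd

# Each dict becomes one row; keys become columns
test_rows = [
    {'client_ip': '192.168.0.1', 'elb_status_code': 408, 'sent_bytes': 0},
    {'client_ip': '192.168.0.2', 'elb_status_code': 403, 'sent_bytes': 64},
    {'client_ip': '192.168.0.3', 'elb_status_code': 200, 'sent_bytes': 149},
]
df = pd.DataFrame(test_rows)
print(df.shape)  # (3, 3)
```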

I will keep investigating from here.

Note that whatever solution you find for adding to your DataFrame, you will not be able to avoid a full copy of the frame on each append, unless you preallocate the memory by defining an empty frame of sufficient size and filling it. Everyone, please correct me if I am wrong about this; without preallocation I do not see how appending in place would be possible. – eickenberg
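The preallocation idea in the comment above can be sketched roughly as follows (capacity and columns are illustrative): build an empty frame of a chosen size up front, then fill rows in place with `.iloc`, so appends do not copy the whole frame each time.

```python
import numpy as np
import pandas as pd

# Preallocate an all-NaN frame of a chosen capacity
capacity = 1000  # hypothetical batch size
columns = ['client_ip', 'elb_status_code', 'sent_bytes']
df = pd.DataFrame(index=np.arange(capacity), columns=columns)

# Fill rows in place as log lines arrive
df.iloc[0] = ['192.168.0.10', 408, 0]
df.iloc[1] = ['192.168.0.11', 200, 64]

# The rows actually populated so far
filled = df.dropna(how='all')
print(len(filled))  # 2
```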

Answer

import pandas as pd 
import numpy 
# Create a data dict 
test_line_dict = {'backend_ip': '-', 
        'backend_processing_time': -1.0, 
        'backend_status_code': 408, 
        'client_ip': '192.168.0.10', 
        'lb_name': 'poundlb', 
        'elb_status_code': 408, 
        'http_method': 'PUT', 
        'received_bytes': 55, 
        'request': 'HTTP/1.1', 
        'request_processing_time': -1.0, 
        'response_processing_time': -1.0, 
        'sent_bytes': 0, 
        'timestamp': numpy.datetime64('2014-05-16T00:00:04.696671+0100')} 
# Create the corresponding dataframe
idOfTheFirstElement = '1st'
df = pd.DataFrame.from_dict({idOfTheFirstElement: test_line_dict})
# Transpose the result so the dict keys become columns
df = df.T
# One can dynamically append/add new rows with pd.concat
dfNew = pd.concat([df, df, df])
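One caveat worth adding to the answer's `pd.concat` approach: concat copies all existing data on every call, so growing a frame one row at a time is quadratic. A common alternative (a sketch, not part of the answer above) is to collect plain dicts in a list and build the frame once per batch:

```python
import pandas as pd

# Accumulate rows cheaply in a plain list...
rows = []
for status in (408, 403, 200):
    rows.append({'elb_status_code': status})

# ...then build the DataFrame once for the whole batch
df_batch = pd.DataFrame(rows)
print(len(df_batch))  # 3
```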

Hi Guillaume, thanks for the reply, I really appreciate the quick turnaround. I will give it a go and see if I can get the concatenation working as you suggest. Should I be concerned about the speed of concat, given that I will be adding around 1.2 billion requests per day? Should I look at append instead? Note that I will regularly marshal the data to an HDFStore on disk, perhaps every 50k requests; I am worried about how long that will take. Logs will be stored per day, with a new HDFStore file created when the system crosses midnight. – casibbald
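The batched HDFStore flush described in the comment above might look roughly like this (a sketch only; the file path, node key, and batch contents are made up, and writing requires PyTables to be installed):

```python
import pandas as pd

# A parsed batch of log rows (values invented for illustration)
batch = pd.DataFrame([{'client_ip': '192.168.0.10', 'sent_bytes': 0},
                      {'client_ip': '192.168.0.11', 'sent_bytes': 64}])

def flush_batch(frame, path='elb_logs.h5', key='requests'):
    # store.append writes a table-format node that supports adding rows;
    # data_columns makes the named columns queryable on disk
    with pd.HDFStore(path) as store:
        store.append(key, frame, data_columns=['client_ip'])

print(batch.shape)  # (2, 2)
```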


I think you will have to use %timeit in IPython to see how long the operation takes:

    In [39]: %timeit pd.concat([df, df, df])
    1000 loops, best of 3: 554 µs per loop

–