
I am trying to write a 'controller' program for a RethinkDB database that uses RethinkDB's changefeed feature to continuously dump data to JSON and delete data older than 3 days. In short: how do I query RethinkDB relative to the current time?
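For context, this is roughly what a bare changefeed looks like in the Python driver (a minimal sketch; the database, table, and connection details are placeholders):

import rethinkdb as r

conn = r.connect("localhost", 28015)
# changes() returns an open cursor; each item is a dict with 'old_val' and
# 'new_val' describing a single modification to the table.
for change in r.db("sensor_db").table("sensor_data").changes().run(conn):
    print(change)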

The problem is that the query "hangs off" the current time, which is evaluated with datetime.utcnow() (or, alternatively, rethinkdb.now()) when the query is defined and stays fixed thereafter. As the changefeed progresses, the query becomes increasingly "stale".
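A minimal sketch of the issue (table and field names are just for illustration): the cutoff is an ordinary Python value computed once when the query object is built, and nothing in the changefeed recomputes it:

from datetime import datetime, timedelta
import pytz
import rethinkdb as r

# Evaluated exactly once, client-side, at query-definition time.
expiry_time = datetime.utcnow().replace(tzinfo=pytz.utc) - timedelta(days=3)
stale_query = r.db("sensor_db").table("sensor_data").filter(r.row['timestamp'] < expiry_time)
# Hours later, the changefeed built from stale_query still filters against
# the original expiry_time; the three-day window never moves forward.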

How can I make a query that continually "updates" to reflect the current time?

To illustrate the problem, here is the script so far:

import json 
import rethinkdb as r 
import pytz 
from datetime import datetime, timedelta 

# The database and table are assumed to have been previously created 
database_name = "sensor_db" 
table_name = "sensor_data" 
table = r.db(database_name).table(table_name) 

port_offset = 1   # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line. 
conn = r.connect("localhost", 28015 + port_offset) 

current_time = datetime.utcnow().replace(tzinfo=pytz.utc) # Current time including timezone (assumed to be UTC) 
retention_period = timedelta(days=3)      # Period of time during which data is retained on the main server 
expiry_time = current_time - retention_period    # Age at which data is removed from the main database 

if "timestamp" in table.index_list().run(conn):   # Assuming the table has "timestamp" as a secondary index, use "between" (for improved speed) 
    beginning_of_time = r.time(1400, 1, 1, 'Z') # The minimum time of a ReQL time object (the year 1400) 
    data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp") 
else:  # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index) 
    data_to_archive = table.filter(r.row['timestamp'] < expiry_time) 

output_file = "archived_sensor_data.json" 
with open(output_file, 'a') as f:
    for change in data_to_archive.changes().run(conn, time_format="raw"):   # The time_format="raw" option is passed to prevent a "RqlTzinfo object is not JSON serializable" error when dumping
        if change['new_val'] is not None:           # If the change is not a deletion
            print(change)
            json.dump(change['new_val'], f)         # Since the main database we are reading from is append-only, the 'old_val' of the change is always None and we are interested in the 'new_val' only
            f.write("\n")                           # Separate entries by a new line
            ID_to_delete = change['new_val']['id']  # Get the ID of the data to be deleted from the database
            r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)

The query is stored in the variable data_to_archive. However, the time interval in the between statement is based on utcnow() at the moment the current_time variable is defined, and it is not continually updated over the changefeed. How can I achieve this?

Answer


I finally solved the problem by performing the dump in 'batch' mode rather than continuously with changes(). (That is, I am using the schedule module.)
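For reference, the scheduling pattern itself is independent of RethinkDB; a minimal sketch of the schedule module's API as used below:

import schedule
import time

def job():
    print("running archiving batch...")

schedule.every(10).minutes.do(job)   # register the job to run every 10 minutes

while True:
    schedule.run_pending()           # execute any jobs that are due
    time.sleep(1)                    # avoid spinning between checks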

Here is the script:

import json 
import rethinkdb as r 
import pytz 
from datetime import datetime, timedelta 
import schedule 
import time 
import functools 

def generate_archiving_query(retention_period=timedelta(days=3), database_name="ipercron", table_name="sensor_data", conn=None):
    if conn is None:
        conn = r.connect("localhost", 28015)

    table = r.db(database_name).table(table_name)   # RethinkDB query object for the table of interest
    current_time = r.now()
    expiry_time = current_time - retention_period.total_seconds()

    if "timestamp" in table.index_list().run(conn):     # If the table has "timestamp" as a secondary index, use "between" (for improved speed)
        beginning_of_time = r.time(1400, 1, 1, 'Z')     # The minimum time of a ReQL time object (the year 1400)
        data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
    else:                                               # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
        data_to_archive = table.filter(r.row['timestamp'] < expiry_time)

    # try:
    #     beginning_of_time = r.time(1400, 1, 1, 'Z')   # The minimum time of a ReQL time object (the year 1400)
    #     data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
    # except:
    #     data_to_archive = table.filter(r.row['timestamp'] < expiry_time)

    return data_to_archive

def archiving_job(data_to_archive=None, output_file="archived_sensor_data.json", database_name="ipercron", table_name="sensor_data", conn=None):
    if data_to_archive is None:
        data_to_archive = generate_archiving_query()
    if conn is None:
        conn = r.connect("localhost", 28015)

    table = r.db(database_name).table(table_name)
    old_data = data_to_archive.run(conn, time_format="raw")    # Without time_format="raw" the output does not dump to JSON
    with open(output_file, 'a') as f:
        ids_to_delete = []
        for item in old_data:
            print(item)
            json.dump(item, f)
            f.write('\n')                    # Separate each document by a new line
            ids_to_delete.append(item['id'])
            # table.get(item['id']).delete().run(conn)

    table.get_all(r.args(ids_to_delete)).delete().run(conn)

if __name__ == "__main__":
    # The database and table are assumed to have been previously created
    database_name = "ipercron"
    table_name = "sensor_data"
    # table = r.db(database_name).table(table_name)

    port_offset = 1     # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
    conn = r.connect("localhost", 28015 + port_offset)

    clean_slate = True
    if clean_slate:
        r.db(database_name).table(table_name).delete().run(conn)   # For testing, start with an empty table and add a fixed amount of data
        import rethinkdb_add_data

    data_to_archive = generate_archiving_query(conn=conn, database_name=database_name, table_name=table_name)   # Because r.now() is evaluated upon run(), the query needs only to be generated once
    archiving_job_fixed_query = functools.partial(archiving_job, data_to_archive=data_to_archive, conn=conn)

    schedule.every(0.1).minutes.do(archiving_job_fixed_query)

    while True:
        schedule.run_pending()
        time.sleep(1)           # Sleep briefly between checks so the loop does not spin at 100% CPU
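As a side note, generate_archiving_query() only takes the faster between branch when the table actually has a "timestamp" secondary index. A sketch of creating that index once up front (assuming the "timestamp" field holds ReQL time objects):

# One-time setup: create the secondary index used by the "between" branch.
# index_wait() blocks until the index has finished building.
if "timestamp" not in r.db(database_name).table(table_name).index_list().run(conn):
    r.db(database_name).table(table_name).index_create("timestamp").run(conn)
    r.db(database_name).table(table_name).index_wait("timestamp").run(conn)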