0
我正在尝试为RethinkDB数据库编写一个'控制器'程序,该数据库使用RethinkDB的更改供稿功能不断转储到JSON并删除超过3天的数据。如何根据当前时间查询RethinkDB
问题在于查询从当前时间“挂起”,在定义查询时使用datetime.utcnow()
(或者替代地,rethinkdb.now()
)评估,此后保持固定。随着更换进度的增加,查询变得“过时”。
我该如何进行持续“更新”以反映当前时间的查询?
为了说明问题,这里是脚本至今:
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta
# The database and table are assumed to have been previously created
database_name = "sensor_db"
table_name = "sensor_data"
table = r.db(database_name).table(table_name)
port_offset = 1 # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
conn = r.connect("localhost", 28015 + port_offset)
current_time = datetime.utcnow().replace(tzinfo=pytz.utc) # Current time including timezone (assumed to be UTC)
retention_period = timedelta(days=3) # Period of time during which data is retained on the main server
expiry_time = current_time - retention_period # Age at which data is removed from the main database
if "timestamp" in table.index_list().run(conn): # Assuming the table has "timestamp" as a secondary index, use "between" (for improved speed)
beginning_of_time = r.time(1400, 1, 1, 'Z') # The minimum time of a ReQL time object (the year 1400)
data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
else: # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
output_file = "archived_sensor_data.json"
with open(output_file, 'a') as f:
for change in data_to_archive.changes().run(conn, time_format="raw"): # The time_format="raw" option is passed to prevent a "RqlTzinfo object is not JSON serializable" error when dumping
if change['new_val'] is not None: # If the change is not a deletion
print change
json.dump(change['new_val'], f) # Since the main database we are reading from is append-only, the 'old_val' of the change is always None and we are interested in the 'new_val' only
f.write("\n") # Separate entries by a new line
ID_to_delete = change['new_val']['id'] # Get the ID of the data to be deleted from the database
r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)
查询存储在变量data_to_archive
。但是,between
声明中的时间间隔基于utcnow()
,当current_time
变量已定义,并且在换货供应中未持续更新。我怎么能这样做?