我已经创建了一个修复此问题的脚本,内容如下:
对于子集的情况,直接删除作为子集的那个文件。
对于重叠的情况,使用文件系统的 truncate,在下一个文件开始的位置截断原文件。虽然也可以切掉重叠文件的开头并将其重命名,但我认为这样做风险很大。
我发现有两种方法可以做到这一点:
遍历目录并逐个处理文件,边遍历边修复,同时查找子集文件并将其删除;
先遍历目录,在进入下一个目录之前修复当前目录中的所有问题。这种方法要快得多,因为遍历目录非常耗时。
代码:
#!/usr/bin/env python2.6
################################################################################
import io
import os
import time
import sys
import string
import logging
import unittest
import datetime
import random
import zmq
import json
import socket
import traceback
import signal
import select
import simplejson
import cPickle as pickle
import re
import shutil
import collections
from pymongo import Connection
from optparse import OptionParser
from pprint import pprint, pformat
################################################################################
class SliceFile(object):
    """Metadata for one slice file named ``<timeStart>@<freq>.<ext>``.

    The constructor only parses the file name; the size-derived fields
    (``size``, ``numPoints``, ``timeEnd``) stay ``None`` until setVars()
    is called, because they require a stat of the file on disk.
    """

    def __init__(self, fname):
        """Parse the start time and frequency out of *fname*.

        Raises IndexError when the base name contains no '@' and ValueError
        when either numeric part is not an integer.
        """
        self.name = fname
        # os.path.basename instead of a hand-rolled split on '/'.
        basename = os.path.basename(fname)
        fnArray = basename.split('@')
        self.timeStart = int(fnArray[0])
        # Everything between '@' and the first '.' is the frequency.
        self.freq = int(fnArray[1].split('.')[0])
        self.size = None
        self.numPoints = None
        self.timeEnd = None
        self.deleted = False

    def __repr__(self):
        return "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % (
            self.name, self.timeStart, self.timeEnd, self.freq, self.size,
            self.numPoints)

    def setVars(self):
        """Stat the file and derive size, point count and end time.

        Each datapoint is taken to occupy 8 bytes.  ``timeEnd`` is exclusive:
        it is the first timestamp *after* the last stored point.
        """
        self.size = os.path.getsize(self.name)
        # Floor division keeps this an exact integer regardless of whether
        # '/' means true division (Python 3 / __future__ division) or not.
        self.numPoints = self.size // 8
        self.timeEnd = self.timeStart + (self.numPoints * self.freq)
################################################################################
class CeresOverlapFixup(object):
    """Walk a directory tree and repair overlapping slice files.

    In every directory that contains a '.ceres-node' marker the slice files
    are ordered by start time and then:

    * a file whose time range lies entirely inside an earlier file's range
      (a "subset") is deleted;
    * a file whose tail overlaps the start of the next file is truncated so
      that it ends where the next file begins.

    Progress and errors are written both to stdout and to
    'ceresOverlapFixup.log' in the current working directory.
    """

    def __del__(self):
        # Best-effort cleanup only: the instance may be half-constructed, or
        # the interpreter may be shutting down.  Nothing useful can be done
        # with an error raised from a destructor.
        try:
            self.close()
        except Exception:
            pass

    def __init__(self):
        self.verbose = False
        self.debug = False
        self.LOGFILE = open("ceresOverlapFixup.log", "a")
        self.badFilesList = set()
        self.truncated = 0
        self.subsets = 0
        self.dirsExamined = 0
        self.lastStatusTime = 0

    def close(self):
        """Write the closing log line and close the log file (idempotent)."""
        logfile = getattr(self, "LOGFILE", None)
        if logfile is None or getattr(logfile, "closed", False):
            return
        # Imported locally, as the original destructor did.
        import datetime
        self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
        logfile.flush()
        logfile.close()

    def getOptionParser(self):
        """Return the option parser; separate so subclasses can extend it."""
        return OptionParser()

    def getOptions(self):
        """Parse the command line into self.debug, self.verbose, self.basedir.

        Exits through ``parser.error`` (usage message, status 2) when no base
        directory is given.
        """
        parser = self.getOptionParser()
        parser.add_option(
            "-d", "--debug", action="store_true", dest="debug", default=False,
            help="debug mode for this program, writes debug messages to logfile.")
        parser.add_option(
            "-v", "--verbose", action="store_true", dest="verbose", default=False,
            help="verbose mode for this program, prints a lot to stdout.")
        parser.add_option(
            "-b", "--basedir", action="store", type="string", dest="basedir",
            default=None, help="base directory location to start converting.")
        (options, args) = parser.parse_args()
        self.debug = options.debug
        self.verbose = options.verbose
        self.basedir = options.basedir
        # An assert would be stripped under "python -O"; report bad usage
        # through the parser instead.
        if not self.basedir:
            parser.error("must provide base directory.")

    # Slice files are named <timeStart>@<freq>.<ext>, for example:
    #   ./updateOperations/1346805360@60.slice
    def getFileData(self, inFilename):
        """Return a SliceFile for *inFilename* with its size fields filled in."""
        ret = SliceFile(inFilename)
        ret.setVars()
        return ret

    def removeFile(self, inFilename):
        """Delete *inFilename* and count it as one removed subset file."""
        os.remove(inFilename)
        self.subsets += 1

    def truncateFile(self, fname, newSize):
        """Truncate the existing file *fname* to *newSize* bytes.

        Failures are logged rather than raised so one bad file does not stop
        the walk.  self.truncated is incremented only on success.
        """
        if self.verbose:
            self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
        try:
            # No O_CREAT: if the file vanished, creating a fresh zero-filled
            # file of newSize bytes would fabricate data.
            fd = os.open(fname, os.O_RDWR)
            try:
                os.ftruncate(fd, newSize)
            finally:
                os.close(fd)
        except Exception:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
            return
        self.truncated += 1

    def printStatus(self):
        """Log a progress line, at most once every 10 seconds."""
        now = self.getNowTime()
        if (now - self.lastStatusTime) > 10:
            self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
            self.lastStatusTime = now

    def fixupThisDir(self, inPath, inFiles):
        """Remove subset files and truncate overlapping files in one directory.

        inPath  -- the directory path.
        inFiles -- names of the files in that directory (as from os.walk).

        Directories without a '.ceres-node' entry are skipped.  Files whose
        names cannot be parsed or that cannot be stat'ed are recorded in
        self.badFilesList and logged.  The counters self.subsets and
        self.truncated are maintained by removeFile() / truncateFile(), and
        self.dirsExamined by walkDirStructure(), so nothing is counted twice.
        """
        if '.ceres-node' not in inFiles:
            return

        fileObjList = []
        for thisFile in sorted(inFiles):
            if thisFile == '.ceres-node' or '@' not in thisFile:
                continue
            wholeFilename = os.path.join(inPath, thisFile)
            try:
                fileObjList.append(self.getFileData(wholeFilename))
            except Exception:
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))

        # Order by numeric start time (name only breaks ties).  Sorting by
        # the name string is wrong once start times differ in digit count.
        # NOTE(review): files of different freq in the same directory are
        # compared against each other, as in the original -- confirm that
        # is intended.
        fileObjList.sort(key=lambda thisObj: (thisObj.timeStart, thisObj.name))

        while fileObjList:
            self.printStatus()
            changes = False
            firstFile = fileObjList[0]

            # Every later file starts at or after firstFile, so ending at or
            # before firstFile's end means it is fully contained in it.
            removedFiles = set()
            for curFile in fileObjList[1:]:
                if curFile.timeEnd <= firstFile.timeEnd:
                    self.removeFile(curFile.name)
                    removedFiles.add(curFile.name)
                    changes = True
                    if self.verbose:
                        self.writeLog("Subset file situation. First=%s, overlap=%s" % (firstFile, curFile))
            fileObjList = [x for x in fileObjList if x.name not in removedFiles]
            if len(fileObjList) < 2:
                break

            secondFile = fileObjList[1]
            # Strict '<' is right: timeEnd is the first free timestamp after
            # the file's last point.  E.g. first 100@1.slice with 2 points
            # ends at 102 (positions 100, 101 used), so a second file
            # 102@1.slice does not overlap.
            if secondFile.timeStart < firstFile.timeEnd:
                # file_A (first):  +---------+
                # file_B (second):       +----------+
                # Solve by truncating file_A at the start point of file_B.
                newLenFile_A_seconds = int(secondFile.timeStart - firstFile.timeStart)
                newFile_A_datapoints = newLenFile_A_seconds // firstFile.freq
                newFile_A_bytes = newFile_A_datapoints * 8
                if not newFile_A_bytes:
                    # Truncating would leave an empty file; leave file_A
                    # untouched and move on.
                    fileObjList = fileObjList[1:]
                    continue
                self.truncateFile(firstFile.name, newFile_A_bytes)
                if self.verbose:
                    self.writeLog("Truncate situation. First=%s, overlap=%s" % (firstFile, secondFile))
                fileObjList = fileObjList[1:]
                changes = True

            # If only subsets were removed, re-examine the same first file
            # against its new neighbour; otherwise advance.
            if not changes:
                fileObjList = fileObjList[1:]

    def getNowTime(self):
        """Return the current time in seconds since the epoch."""
        return time.time()

    def walkDirStructure(self):
        """Walk self.basedir, fixing each directory before moving on."""
        startTime = self.getNowTime()
        self.lastStatusTime = startTime
        self.okayFiles = 0
        for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
            self.printStatus()
            self.fixupThisDir(thisPath, theseFiles)
            self.dirsExamined += 1
        endTime = self.getNowTime()
        self.printStatus()
        # Arguments follow the labels: "now" is the end time.
        self.writeLog("now = %s, started at %s, elapsed time = %s seconds." % (endTime, startTime, endTime - startTime))
        self.writeLog("Done.")

    def writeLog(self, instring):
        """Write *instring* plus a newline to stdout and to the log file."""
        # .write() rather than the print statement so this runs on both
        # Python 2 and Python 3.
        line = "%s\n" % (instring,)
        sys.stdout.write(line)
        self.LOGFILE.write(line)
        self.LOGFILE.flush()

    def main(self):
        """Parse options, run the walk, and always close the log."""
        self.getOptions()
        try:
            self.walkDirStructure()
        finally:
            self.close()


# Script entry point: the file defined main() but never invoked it.
if __name__ == '__main__':
    CeresOverlapFixup().main()
导入列表看起来有点冗余…… –
没错,确实可以清理,我同意。不过又能怎样呢?我可以花很长时间把它改得更精简。如果您有更好的版本,请发布出来。 –