2012-11-12 45 views
1

我正在使用以 Carbon 和 Ceres 作为存储后端的 Graphite 进行监控。我在修正不良数据时遇到了一些问题:由于各种原因,我最终得到了时间范围相互重叠的文件。也就是说,由于 Carbon/Ceres 将数据存储为 timestamp@interval.slice 形式的文件,因此可能出现两个或多个时间范围重叠的文件。(主题:Graphite / Carbon / Ceres 节点重叠)

有两种重叠:

File A: +------------+  orig file 
File B:  +-----+   subset 
File C:   +---------+ overlap 

这会造成问题,因为现有的工具(ceres-maintenance 的 defrag 和 rollup)无法处理这些重叠;它们会直接跳过该目录并继续处理下一个。显然这是一个问题。

回答

0

我已经创建了一个修复此问题的脚本,内容如下:

  1. 对于子集的情况,直接删除子集文件。

  2. 对于重叠的情况,使用文件系统的 truncate 在下一个文件的起始位置截断原始文件。虽然也可以截掉重叠文件的开头并重命名,但我认为那样做风险很大。

我发现有两种方法可以做到这一点:

  1. 遍历目录树并逐个处理文件,边遍历边修复,同时查找并删除子集文件;

  2. 遍历目录时,在进入下一个目录之前先修复当前目录中的所有问题。这种方法要快得多,因为目录遍历非常耗时。

代码:

#!/usr/bin/env python2.6 
################################################################################ 

import io 
import os 
import time 
import sys 
import string 
import logging 
import unittest 
import datetime 
import random 
import zmq 
import json 
import socket 
import traceback 
import signal 
import select 
import simplejson 
import cPickle as pickle 
import re 
import shutil 
import collections 
from pymongo import Connection 
from optparse import OptionParser 
from pprint import pprint, pformat 

################################################################################ 

class SliceFile(object):
    """Description of one slice file, derived from its path and its size.

    The base name of the path is expected to look like
    ``<timeStart>@<freq>.<extension>``; both fields are parsed as integers.
    The size-derived attributes (``size``, ``numPoints``, ``timeEnd``) stay
    ``None`` until :meth:`setVars` is called.
    """

    def __init__(self, fname):
        """Parse the start time and frequency out of ``fname``.

        Raises ``ValueError`` when a field is not an integer and
        ``IndexError`` when the base name contains no ``@`` separator.
        """
        # Only the part after the last '/' carries the encoded fields.
        pieces = fname.rsplit('/', 1)[-1].split('@')

        self.name = fname
        self.timeStart = int(pieces[0])
        # The frequency is whatever sits between '@' and the first '.'.
        self.freq = int(pieces[1].partition('.')[0])
        self.size = self.numPoints = self.timeEnd = None
        self.deleted = False

    def __repr__(self):
        """Return a one-line, human-readable summary of every field."""
        fields = (self.name, self.timeStart, self.timeEnd,
                  self.freq, self.size, self.numPoints)
        return "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % fields

    def setVars(self):
        """Read the file size from disk and derive point count and end time.

        The point count is the size divided by 8, and ``timeEnd`` is
        ``timeStart`` plus one ``freq`` step per point.
        """
        byteCount = os.path.getsize(self.name)
        pointCount = int(byteCount / 8)

        self.size = byteCount
        self.numPoints = pointCount
        self.timeEnd = self.timeStart + (pointCount * self.freq)

################################################################################ 

class CeresOverlapFixup(object):
    """Walk a directory tree and repair slice files whose time ranges overlap.

    Within every directory that contains a ``.ceres-node`` marker file:

    * a slice whose range ends no later than that of an earlier-starting
      slice (a "subset") is deleted;
    * a slice whose range runs into the start of the next slice is truncated
      so that it ends where the next slice begins.

    Progress and errors are written both to stdout and to
    ``ceresOverlapFixup.log`` (opened for append in the current directory).
    """

    def __del__(self):
        """Write a closing log line and close the log file, if it is open."""
        logfile = getattr(self, "LOGFILE", None)
        if logfile is None or getattr(logfile, "closed", False):
            # __init__ never got as far as opening the log (or it was closed
            # already); there is nothing to finish.
            return
        # Imported locally: module-level names are not guaranteed to still be
        # usable if this finalizer runs during interpreter shutdown.
        import datetime
        self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
        logfile.close()

    def __init__(self):
        """Open the log file and reset all counters."""
        self.verbose = False
        self.debug = False
        self.LOGFILE = open("ceresOverlapFixup.log", "a")
        self.badFilesList = set()   # paths that could not be parsed or stat'ed
        self.truncated = 0          # files successfully truncated
        self.subsets = 0            # subset files removed
        self.dirsExamined = 0       # directories visited by walkDirStructure
        self.lastStatusTime = 0     # time of the last status line

    def getOptionParser(self):
        """Return the option parser to populate; a hook for customisation."""
        return OptionParser()

    def getOptions(self):
        """Parse the command line into ``debug``, ``verbose`` and ``basedir``.

        Exits through ``parser.error`` (usage message, status 2) when no base
        directory is given; an ``assert`` would be skipped under ``python -O``.
        """
        parser = self.getOptionParser()
        parser.add_option("-d", "--debug", action="store_true", dest="debug", default=False, help="debug mode for this program, writes debug messages to logfile.")
        parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="verbose mode for this program, prints a lot to stdout.")
        parser.add_option("-b", "--basedir", action="store", type="string", dest="basedir", default=None, help="base directory location to start converting.")
        (options, args) = parser.parse_args()
        self.debug = options.debug
        self.verbose = options.verbose
        self.basedir = options.basedir
        if not self.basedir:
            parser.error("must provide base directory.")

    # Slice files are named <timeStart>@<freq>.<extension>, for example
    # (illustrative values only):
    # ./updateOperations/1352700000@60.slice

    def getFileData(self, inFilename):
        """Return a SliceFile for ``inFilename`` with its size fields filled in."""
        ret = SliceFile(inFilename)
        ret.setVars()
        return ret

    def removeFile(self, inFilename):
        """Delete ``inFilename`` and count it as a removed subset file.

        Errors from ``os.remove`` propagate to the caller.
        """
        os.remove(inFilename)
        self.subsets += 1

    def truncateFile(self, fname, newSize):
        """Truncate ``fname`` to ``newSize`` bytes.

        Failures are logged, not raised, so one bad file does not abort a
        whole run. ``self.truncated`` is incremented only on success.
        """
        if self.verbose:
            self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
        fd = None
        try:
            # No O_CREAT: a missing file must be reported, not silently
            # created and then extended to newSize.
            fd = os.open(fname, os.O_RDWR)
            os.ftruncate(fd, newSize)
            self.truncated += 1
        except Exception:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
        finally:
            # Close exactly once, and only if the open succeeded.
            if fd is not None:
                os.close(fd)

    def printStatus(self, force=False):
        """Log a progress line, at most once every 10 seconds.

        Pass ``force=True`` to log regardless of how recently the previous
        status line was written.
        """
        now = self.getNowTime()
        if force or ((now - self.lastStatusTime) > 10):
            self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
            self.lastStatusTime = now

    def fixupThisDir(self, inPath, inFiles):
        """Repair overlapping slice files among ``inFiles`` in ``inPath``.

        Directories without a ``.ceres-node`` marker are skipped. This method
        does not touch ``dirsExamined``; walkDirStructure counts directories.
        """
        if '.ceres-node' not in inFiles:
            return
        self._resolveOverlaps(self._loadSlices(inPath, inFiles))

    def _loadSlices(self, inPath, inFiles):
        """Build slice objects for every '@'-named file; log and skip bad ones."""
        slices = []
        for thisFile in sorted(inFiles):
            if thisFile == '.ceres-node' or '@' not in thisFile:
                continue
            wholeFilename = os.path.join(inPath, thisFile)
            try:
                slices.append(self.getFileData(wholeFilename))
            except Exception:
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))
        return slices

    def _resolveOverlaps(self, fileObjList):
        """Delete subset slices and truncate slices that run into the next one."""
        # Order by numeric start time; a plain string sort of the names would
        # misplace start times with differing digit counts. The name is only
        # a tie-breaker to keep the order deterministic.
        remaining = sorted(fileObjList, key=lambda thisObj: (thisObj.timeStart, thisObj.name))

        while remaining:
            self.printStatus()

            firstFile = remaining[0]
            survivors = [firstFile]
            for curFile in remaining[1:]:
                if curFile.timeEnd <= firstFile.timeEnd:
                    # Starts no earlier and ends no later than firstFile:
                    # a subset. removeFile() does the counting.
                    self.removeFile(curFile.name)
                    if self.verbose:
                        self.writeLog("Subset file situation. First=%s, overlap=%s" % (firstFile, curFile))
                else:
                    survivors.append(curFile)

            if len(survivors) < 2:
                break
            secondFile = survivors[1]

            # Strict '<' is correct because timeEnd is the first time stamp
            # *after* the slice: a slice starting at 100 with step 1 and two
            # points covers 100 and 101 and has timeEnd 102, so a following
            # slice that starts at 102 does not overlap it.
            if secondFile.timeStart < firstFile.timeEnd:
                # file_A (first):  +---------+
                # file_B (second):   +----------+
                # Cut file_A off where file_B starts.
                keepSeconds = int(secondFile.timeStart - firstFile.timeStart)
                keepBytes = (keepSeconds // firstFile.freq) * 8
                # keepBytes == 0 means file_B starts within file_A's first
                # point; file_A is left untouched rather than emptied.
                if keepBytes:
                    # truncateFile() does the counting (on success only).
                    self.truncateFile(firstFile.name, keepBytes)
                    if self.verbose:
                        self.writeLog("Truncate situation. First=%s, overlap=%s" % (firstFile, secondFile))

            # firstFile is fully dealt with; continue from its successor.
            remaining = survivors[1:]

    def getNowTime(self):
        """Return the current time in seconds since the epoch."""
        return time.time()

    def walkDirStructure(self):
        """Walk ``self.basedir`` and fix up every directory found.

        Each visited directory is counted once in ``dirsExamined``. A final
        status line and the elapsed time are always logged at the end.
        """
        startTime = self.getNowTime()
        self.lastStatusTime = startTime
        self.okayFiles = 0

        for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
            self.printStatus()
            self.fixupThisDir(thisPath, theseFiles)
            self.dirsExamined += 1

        endTime = self.getNowTime()
        # Forced, so that short runs still get a summary line.
        self.printStatus(force=True)
        self.writeLog("now = %s, started at %s, elapsed time = %s seconds." % (endTime, startTime, endTime - startTime))
        self.writeLog("Done.")

    def writeLog(self, instring):
        """Write ``instring`` plus a newline to stdout and to the log file."""
        # Imported locally because __del__ calls this method and module-level
        # names may be unavailable if that happens at interpreter shutdown.
        import sys
        line = "%s\n" % (instring,)
        sys.stdout.write(line)
        self.LOGFILE.write(line)
        self.LOGFILE.flush()

    def main(self):
        """Entry point: parse the command line, then walk and repair the tree."""
        self.getOptions()
        self.walkDirStructure()
+0

导入列表看起来有点粗放...... –

+0

没错。可以清理,同意。但是,whatchagonnado?我可以花很长时间让这个更紧。如果您有更好的版本,请发布。 –