2017-04-19 201 views
3

下午好。Python tnsnames.ora解析器

我需要一个包含来自tnsnames.ora文件的所有数据库连接的字典。

我需要把下面这个字符串:

(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com))) 

转换成这样:

{'DESCRIPTION': [{'ADDRESS_LIST': [{'ADDRESS': [{'PROTOCOL': 'TCP'}, 
               {'HOST': 'mydbserver.mydomain.com'}, 
               {'PORT': '1521'} 
               ] 
            }] 
        }, 
        {'CONNECT_DATA': [{'SID': 'CATAL'}, 
            {'SERVER': 'DEDICATED'}, 
            {'SERVICE_NAME': 'mydb.mydomain.com'} 
            ] 
        } 
       ] 
} 

到目前为止,我的代码是:

def get_param(param_string):
    """Parse one parenthesised ``(KEYWORD=value)`` parameter into a dict.

    The value is kept as a plain string when it contains no further ``=``
    (``"(PORT=1521)"`` gives ``{"PORT": "1521"}``); otherwise it is handed to
    ``get_value_list`` and becomes a list of nested one-item dicts.

    Args:
        param_string: text starting with ``(`` and ending with ``)``.

    Returns:
        A dict with exactly one key, the keyword found before the first ``=``.

    Raises:
        ValueError: if the parentheses are unbalanced or the text is not
            enclosed in parentheses.
    """
    print("get_param input:", param_string)
    opening = param_string.count("(")
    closing = param_string.count(")")
    if opening != closing:
        raise ValueError("Number of '(' is not equal to number of ')' : " + str(opening) + " and " + str(closing))
    if not (param_string.startswith("(") and param_string.endswith(")")):
        # Without this check the slice below would silently eat two
        # arbitrary characters and return a garbage keyword.
        raise ValueError("[ERROR] Format error, parameter is not enclosed in parentheses: " + repr(param_string))
    # Only the first "=" separates keyword and value; any later "=" belongs
    # to nested parameters.
    keyword, _, values = param_string[1:-1].partition("=")
    if "=" not in values:
        return {keyword: values}
    return {keyword: get_value_list(values)}


def get_value_list(value_string):
    """Parse the right-hand side of a parameter.

    Args:
        value_string: either a scalar containing none of ``=``, ``(``, ``)``,
            or one or more adjacent parenthesised groups such as
            ``"(A=1)(B=2)"``.

    Returns:
        The unchanged string for a scalar; otherwise a list holding one dict
        per top-level group, in input order. The list form is used even for
        a single group, so callers always get the same shape.

    Raises:
        ValueError: if a non-scalar value does not start with ``(``, if
            anything other than ``(`` follows a closed group, or if a group
            is never closed.
    """
    print("get_value_list input:", value_string)
    if not any(char in value_string for char in "=()"):
        print("get_value_list return:", [value_string])
        return value_string
    if value_string[0] != "(":
        raise ValueError("[ERROR] Format error '(' is not the first char: " + repr(value_string))
    to_return = []
    depth = 0
    group_start = 0
    for index, char in enumerate(value_string):
        if depth == 0 and char != "(":
            # Between two top-level groups only a new "(" is acceptable.
            raise ValueError("Format error - Next char should be a '('. value_string[i+1]:" + repr(char))
        if char == "(":
            if depth == 0:
                group_start = index
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0:
                # Slice the finished group instead of accumulating it
                # character by character.
                to_return.append(get_param(value_string[group_start:index + 1]))
    if depth != 0:
        raise ValueError("[ERROR] Format error, unclosed '(' in: " + repr(value_string))
    print("get_value_list return:", to_return)
    return to_return

# pprint is used below; import it here so the snippet runs on its own.
from pprint import pprint

# Sample connect descriptor: one ADDRESS_LIST with a single ADDRESS, plus
# a CONNECT_DATA section.
connection_infos = "(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))"
current_connection = get_param(connection_infos)
print("current_connection:", current_connection)
pprint(current_connection)

但我实际得到的是:

{'DESCRIPTION': [{'ADDRESS_LIST': {'ADDRESS': [{'PROTOCOL': 'TCP'}, 
               {'HOST': 'mydbserver.mydomain.com'}, 
               'PORT'] 
            } 
        }, 
        'CONNECT_DATA' 
       ] 
} 

所以我一定是哪里做错了,而且我觉得自己把事情弄得太复杂了。有人能指出我犯的错误,或者帮我找到一个更简单的实现方法吗?

+0

恕我直言,你可以为tnsnames.ora下载ANTLR语法,然后让ANTLR为你生成解析器源代码。 – ibre5041

+0

此外,它看起来像您的解析器不支持“包含”子句。 – ibre5041

回答

1

我现在有了一份可以工作的代码,但我对它并不满意。它太长、不够灵活,而且无法处理tnsnames.ora其他一些可能的格式:

class Tnsnames():
    """Parse a tnsnames.ora file into dictionaries of connection settings.

    The file is read and parsed by the constructor. Afterwards the instance
    exposes:

    * ``connections`` -- dict keyed by the upper-cased registered name. Each
      value is ``{"ADDRESS_LIST": ..., "CONNECT_DATA": {...},
      "LAST_TEST_TS": None}``. ``ADDRESS_LIST`` is a single dict when exactly
      one address is defined, otherwise a list of dicts.
    * ``aliases`` -- dict mapping every additional name of an entry to the
      name the entry was registered under.
    * ``duplicates`` -- list of aliases that were met more than once.

    Blanks are removed everywhere and keywords and values are upper-cased.
    Problems are reported with ``print`` and the offending entry is skipped.
    """

    # Deepest "(" nesting understood. In the expected layout the levels are
    # DESCRIPTION (1), ADDRESS_LIST / CONNECT_DATA (2), ADDRESS or a
    # CONNECT_DATA parameter (3), and an ADDRESS parameter (4).
    _MAX_DEPTH = 4
    # CONNECT_DATA parameters that are kept; all others are ignored.
    _CONNECT_DATA_KEYS = ("SID", "SERVER", "SERVICE_NAME")

    def __init__(self, file_path, file_name='tnsnames.ora'):
        """Remember where the file lives and parse it immediately.

        Args:
            file_path: directory containing the file.
            file_name: name of the file inside ``file_path``.
        """
        self.file_path = file_path
        self.file_name = file_name
        self.load_file()

    def load_file(self):
        """Parse the file and (re)build connections, aliases and duplicates.

        The content is scanned one character at a time while tracking the
        parenthesis depth. Exceptions raised while opening or reading the
        file propagate to the caller.
        """
        #Oracle doc : https://docs.oracle.com/cd/B28359_01/network.111/b28317/tnsnames.htm#NETRF007
        connections = {}
        aliases = {}
        duplicates = []
        depth = 0
        word = ""
        keyword = ""
        name = ""
        # Dict being filled for the current entry, or None while the current
        # entry is being skipped (duplicate alias, excessive nesting).
        entry = None
        in_address_list = False
        address = {}
        for char in self._read_content():
            if char == " ":
                continue
            if char == "=":
                keyword, word = word, ""
                if keyword.upper() == "ADDRESS_LIST":
                    in_address_list = True
            elif char == "(":
                if depth == 0:
                    # Text before the first "(" holds the entry name(s).
                    name, has_duplicate = self._register_names(keyword, aliases, duplicates)
                    entry = None
                    if not has_duplicate:
                        entry = {"ADDRESS_LIST": [],
                                 "CONNECT_DATA": {},
                                 "LAST_TEST_TS": None}
                        connections[name] = entry
                elif depth >= self._MAX_DEPTH:
                    print("[ERROR] Incorrect depth:", repr(depth), ". Current connection will not be registered")
                    if entry is not None:
                        connections.pop(name, None)
                        entry = None
                # Always count the "(": its ")" is always counted, and the
                # following entries rely on the depth returning to 0.
                depth += 1
            elif char == ")":
                if depth <= 0:
                    print("[ERROR] Unbalanced ')' ignored")
                    keyword = ""
                    word = ""
                    continue
                if depth == 1:
                    # End of the entry: validate it, then reset per-entry state.
                    if entry is not None and not self._finalize_entry(name, entry):
                        connections.pop(name, None)
                    entry = None
                    in_address_list = False
                    address = {}
                elif depth == 2:
                    in_address_list = False
                elif depth == 3:
                    if in_address_list:
                        if entry is not None:
                            entry["ADDRESS_LIST"].append(address)
                        address = {}
                    elif entry is not None and keyword.upper() in self._CONNECT_DATA_KEYS:
                        entry["CONNECT_DATA"][keyword.upper()] = word.upper()
                elif depth == 4:
                    if in_address_list and entry is not None:
                        address[keyword.upper()] = word.upper()
                keyword = ""
                word = ""
                depth -= 1
            else:
                word += char
        self.connections = connections
        self.aliases = aliases
        self.duplicates = duplicates

    def _read_content(self):
        """Return the file as one line, without comments or extra blanks."""
        path = os.path.join(self.file_path, self.file_name)
        kept = []
        with open(path, 'rt', encoding='utf-8') as handle:
            for raw_line in handle:
                # Drop only the comment part so that data placed before a
                # trailing "#" comment is kept.
                # NOTE(review): a "#" inside a quoted value would be cut too.
                line = " ".join(raw_line.split("#", 1)[0].split())
                if line:
                    kept.append(line)
        return " ".join(kept)

    @staticmethod
    def _register_names(raw_names, aliases, duplicates):
        """Pick the name to register from a comma-separated list of names.

        The first name containing a dot is used, or the longest one when
        none has a dot. The remaining names are stored in ``aliases``; those
        already present are appended to ``duplicates`` instead.

        Returns:
            ``(name, has_duplicate)``; ``has_duplicate`` is True when at
            least one alias was already registered.
        """
        names = raw_names.upper().replace(" ", "").split(",")
        if len(names) == 1:
            return names[0], False
        name = next((candidate for candidate in names if "." in candidate), None)
        if name is None:
            name = max(names, key=len)
        names.remove(name)
        has_duplicate = False
        for alias in names:
            if alias in aliases:
                print("[ERROR] already registered alias:", alias,
                      ". Registered to:", aliases[alias],
                      ". New:", name,
                      ". This possible duplicate will not be registered.")
                duplicates.append(alias)
                has_duplicate = True
            else:
                aliases[alias] = name
        return name, has_duplicate

    @staticmethod
    def _finalize_entry(name, entry):
        """Validate a completed entry and collapse a single address.

        An entry needs at least one address with a HOST and a SERVICE_NAME
        or SID. When valid and holding exactly one address, ``ADDRESS_LIST``
        is replaced by that address dict.

        Returns:
            True when the entry is valid, False when it must be discarded.
        """
        addresses = entry["ADDRESS_LIST"]
        if not any("HOST" in candidate for candidate in addresses):
            if len(addresses) == 1:
                print("[ERROR] Only one address defined, and no HOST defined. Current connection will not be registered:", name)
            else:
                print("[ERROR] Multiple addresses but none with HOST. Current connection will not be registered:", name)
            return False
        connect_data = entry["CONNECT_DATA"]
        if "SERVICE_NAME" not in connect_data and "SID" not in connect_data:
            print("[ERROR] Missing SERVICE_NAME/SID for connection:", name)
            return False
        # Collapsing is done here, once the entry is complete, so several
        # ADDRESS_LIST sections can still append to the same list.
        if len(addresses) == 1:
            entry["ADDRESS_LIST"] = addresses[0]
        return True

测试的tnsnames.ora:

######################################## 
# This is a sample tnsnames.ora  # 
######################################## 


################################################### 
# PRODDB 
################################################### 

proddb.mydbs.domain.com, PRODDB = 
    (DESCRIPTION = 
    (ADDRESS_LIST = 
     (ADDRESS = (PROTOCOL = TCP)(HOST = proddb1.mydbs.domain.com)(PORT = 1522)) 
     (ADDRESS = (PROTOCOL = TCP)(HOST = proddb2.mydbs.domain.com)(PORT = 1522)) 
     (ADDRESS = (PROTOCOL = TCP)(HOST = proddb3.mydbs.domain.com)(PORT = 1522)) 
     (ADDRESS = (PROTOCOL = TCP)(HOST = proddb4.mydbs.domain.com)(PORT = 1522)) 
    ) 
    (CONNECT_DATA = 
     (SID = PRODDB) 
     (SERVER = DEDICATED) 
     (SERVICE_NAME = proddb.mydbs.domain.com) 
    ) 
) 

################################################### 
# DEVDBA : Test database for DBA usage 
################################################### 

devdba.mydbs.domain.com, DEVDBA = 
    (DESCRIPTION = 
    (ADDRESS_LIST = 
     (ADDRESS = (PROTOCOL = TCP)(HOST = devdba.mydbs.domain.com)(PORT = 1521)) 
    ) 
    (CONNECT_DATA = 
     (SID = DEVDBA) 
    ) 
) 

测试代码:

from pprint  import pprint 
from lib_database import Tnsnames 

# Parse the tnsnames.ora of the local Oracle client, then dump each result
# structure under its own heading.
tnsnnames = Tnsnames('/usr/lib/oracle/12.2/client64/network/admin')
for heading, content in (
    ('Connexions:', tnsnnames.connections),
    ('Aliases:', tnsnnames.aliases),
    ('Duplicates:', tnsnnames.duplicates),
):
    print(heading)
    pprint(content)

输出:

Connexions: 
    {'DEVDBA.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': {'HOST': 'DEVDBA.MYDBS.DOMAIN.COM', 
                'PORT': '1521', 
                'PROTOCOL': 'TCP'}, 
           'CONNECT_DATA': {'SID': 'DEVDBA'}},
    'PRODDB.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': [{'HOST': 'PRODDB1.MYDBS.DOMAIN.COM', 
                'PORT': '1522', 
                'PROTOCOL': 'TCP'}, 
                {'HOST': 'PRODDB2.MYDBS.DOMAIN.COM', 
                'PORT': '1522', 
                'PROTOCOL': 'TCP'}, 
                {'HOST': 'PRODDB3.MYDBS.DOMAIN.COM', 
                'PORT': '1522', 
                'PROTOCOL': 'TCP'}, 
                {'HOST': 'PRODDB4.MYDBS.DOMAIN.COM', 
                'PORT': '1522', 
                'PROTOCOL': 'TCP'}], 
           'CONNECT_DATA': {'SERVER': 'DEDICATED', 
                'SERVICE_NAME': 'PRODDB.MYDBS.DOMAIN.COM', 
                'SID': 'PRODDB'}}}
Aliases: 
    {'DEVDBA': 'DEVDBA.MYDBS.DOMAIN.COM', 'PRODDB': 'PRODDB.MYDBS.DOMAIN.COM'} 
Duplicates: 
    [] 

我找不到其他用于解析tnsnames.ora文件的Python解析器。如果你知道有这样的解析器,请告诉我。