2011-12-04 52 views
0

我正在编写一个使用MySQLe作为嵌入式后端的程序。数据库库由一个名为“Domain”的对象拥有。这个Domain对象在主线程中运行。如何在多线程环境中使用嵌入式MySQL?

程序启动另一个运行XML-RPC服务器的线程(boost :: thread和xmlrpc_c :: serverAbyss)。它链接到域对象。

当XML-RPC服务器,使Domain对象执行SQL查询的程序崩溃:

Program received signal: “EXC_BAD_ACCESS”. 
[Switching to process 73191] 
[Switching to process 73191] 
Xcode could not locate source file: regex.cpp (line: 74) 

当主线程调用执行SQL查询的程序仍然运行域对象的方法。

/* 
* Ports listening 
* 
* - create a Rpc_Server object 
* - create a dedicated thread 
*/ 
Rpc_Server  server(&domain, &conf_params, &router); 
boost::thread server_thread(boost::bind(&Rpc_Server::run, &server)); // This thread makes the server crash 

/* 
* Domain routine 
* 
* - Check for ready jobs every minute 
*/ 
while (1) { 
    v_jobs jobs = domain.get_ready_jobs(conf_params.get_param("node_name")); // This method does NOT make the server crash 
    sleep(60); 
} 

Domain对象的方法和Database对象的方法都会锁定互斥锁以避免多重访问。

bool Mysql::execute(const std::string* query) { 
    MYSQL_RES* res; 
    MYSQL_ROW row; 

    if (query == NULL) 
     return false; 

    this->updates_mutex.lock(); 

    std::cout << query->c_str() << std::endl; 

    if (mysql_query(this->mysql, query->c_str()) != 0) { 
     std::cerr << query << std::endl << mysql_error(this->mysql); 
     UNLOCK_MUTEX; 
     return false; 
    } 

    res = mysql_store_result(this->mysql); 
    if (res) 
     while ((row = mysql_fetch_row(res))) 
      for (uint i=0 ; i < mysql_num_fields(res) ; i++) 
       std::cout << row[i] << std::endl; 
    else 
     if (mysql_field_count(this->mysql) != 0) { 
      std::cerr << "Erreur : " << mysql_error(this->mysql) << std::endl; 
      mysql_free_result(res); 
      this->updates_mutex.unlock(); 
      return false; 
     } 

    mysql_free_result(res); 
    this->updates_mutex.unlock(); 

    return true; 
} 


bool Domain::add_node(const std::string* running_node, const std::string* n, const int* w) { 
    std::string query; 

    this->updates_mutex.lock(); 
    query = "START TRANSACTION;"; 
    if (this->database.execute(&query) == false) { 
     this->updates_mutex.unlock(); 
     return false; 
    } 

    query = "REPLACE INTO node (node_name,node_weight) VALUES ('"; 
    query += n->c_str(); 
    query += "','"; 
    query += boost::lexical_cast<std::string>(*w); 
    query += "');"; 

    if (this->database.execute(&query) == false) { 
     query = "ROLLBACK;"; 
     this->database.execute(&query); 
     this->updates_mutex.unlock(); 
     return false; 
    } 

    query = "COMMIT;" 
    if (this->database.execute(&query) == false) { 
     this->updates_mutex.unlock(); 
     return false; 
    } else 
     this->updates_mutex.unlock(); 

    return true; 
} 

的MySQLe创建有:

bool Mysql::prepare(const std::string* node_name, const std::string* db_skeleton) { 
    static char* server_args[] = {"this_program","--datadir=."}; 
    static char* server_groups[] = {"embedded","server","this_program_SERVER",(char *)NULL}; 
    std::string query("CREATE DATABASE IF NOT EXISTS "); 

    // DB init 
    if (mysql_library_init(sizeof(server_args)/sizeof(char *), server_args, server_groups)) 
     std::cerr << "could not initialize MySQL library" << std::endl; 

    std::cout << "mysql init..." << std::endl; 
    if ((this->mysql = mysql_init(NULL)) == NULL) 
     std::cerr << mysql_error(this->mysql) << std::endl; 

    if (! mysql_thread_safe()) { 
     std::cerr << "MySQL is NOT theadsafe !" << std::endl; 
     return false; 
    } 

    mysql_options(this->mysql, MYSQL_READ_DEFAULT_GROUP, "embedded"); 
    mysql_options(this->mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL); 

    mysql_real_connect(this->mysql, NULL, NULL, NULL, NULL, 0, NULL, 0); 

    // Creates the schema 
    query += this->translate_into_db(node_name); 
    query += ";"; 

    if (this->execute(&query) == false) 
     return false; 

    // Creates the schema 
    query = "CREATE SCHEMA IF NOT EXISTS "; 
    query += this->translate_into_db(node_name); 
    query += " DEFAULT CHARACTER SET latin1;"; 

    this->execute(&query); 

    // Uses it 
    query = "USE " + this->translate_into_db(node_name) + ";"; 

    this->execute(&query); 

    // Loads the skeleton from file 
    return this->load_file(db_skeleton->c_str()); 
} 

我错了地方? 你有一个例子来展示我吗?

回答

0

我找到了解决我的问题的方法。每个线程都需要初始化MySQL环境。这就是说执行一些mysql_ *函数。

这里被修改的/新的方法:

bool Mysql::atomic_execute(const std::string* query) { 
    MYSQL_RES* res; 
    MYSQL_ROW row; 
    boost::regex empty_string("^\\s+$", boost::regex::perl); 

    if (query == NULL) 
     return false; 

    if (query->empty() == true or boost::regex_match(*query, empty_string) == true) { 
     std::cerr << "Error : query is empty !" << std::endl; 
     return false; 
    } 

    this->updates_mutex.lock(); 

    if (mysql_query(this->mysql, query->c_str()) != 0) { 
     std::cerr << query << std::endl << mysql_error(this->mysql); 
     this->updates_mutex.unlock();; 
     return false; 
    } 

    res = mysql_store_result(this->mysql); 
    if (res) 
     while ((row = mysql_fetch_row(res))) 
      for (uint i=0 ; i < mysql_num_fields(res) ; i++) 
       std::cout << row[i] << std::endl; 
    else 
     if (mysql_field_count(this->mysql) != 0) { 
      std::cerr << "Erreur : " << mysql_error(this->mysql) << std::endl; 
      mysql_free_result(res); 
      this->updates_mutex.unlock(); 
      return false; 
     } 

    mysql_free_result(res); 
    this->updates_mutex.unlock(); 

    return true; 
} 

bool Mysql::standalone_execute(const v_queries* queries) { 
    MYSQL*  local_mysql = this->init(); 
    std::string query  = "START TRANSACTION;"; 

    if (this->atomic_execute(&query) == false) { 
     mysql_close(local_mysql); 
     return false; 
    } 

    BOOST_FOREACH(std::string q, *queries) { 
     std::cout << q.c_str() << std::endl; 
     if (this->atomic_execute(&q) == false) { 
      query = "ROLLBACK"; 
      this->atomic_execute(&query); 
      mysql_close(local_mysql); 
      return false; 
     } 
    } 

    query = "COMMIT"; 

    if (this->atomic_execute(&query) == false) { 
     mysql_close(local_mysql); 
     return false; 
    } 

    mysql_close(local_mysql); 
    return true; 
} 

MYSQL*  Mysql::init() { 
    MYSQL* local_mysql; 

    local_mysql = mysql_init(this->mysql); 
    mysql_options(this->mysql, MYSQL_READ_DEFAULT_GROUP, "embedded"); 
    mysql_options(this->mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL); 
    mysql_real_connect(local_mysql, NULL, NULL, NULL, NULL, 0, NULL, 0); 

    return local_mysql; 
} 

的atomic_execute方法用于单个查询发送到服务器。

standalone_execute方法初始化连接和事务,然后使用atomic_execute将整个查询发送到服务器。

我不知道,如果一个ROLLBACK是凯明的失败的情况下很有用......

的代码可能需要一些改进,但它的工作原理。