2014-01-21 50 views
2

我正在使用ElasticSearch的JDBC插件来更新我的MySQL数据库。它会拾取新的和已更改的记录,但不会删除已从MySQL中删除的记录。他们仍然在指数。ElasticSearch river JDBC MySQL不删除记录

这是我用来创建河流代码:

curl -XPUT 'localhost:9200/_river/account_river/_meta' -d '{ 
    "type" : "jdbc", 
    "jdbc" : { 
     "driver" : "com.mysql.jdbc.Driver", 
     "url" : "jdbc:mysql://localhost:3306/test", 
     "user" : "test_user", 
     "password" : "test_pass", 
     "sql" : "SELECT `account`.`id` as `_id`, `account`.`id`, `account`.`reference`, `account`.`company_name`, `account`.`also_known_as` from `account` WHERE NOT `account`.`deleted`", 
     "strategy" : "simple", 
     "poll" : "5s", 
     "versioning" : true, 
     "digesting" : false, 
     "autocommit" : true, 
     "index" : "headphones", 
     "type" : "Account" 
    } 
}' 

通过自制软件在OSX山狮,没有任何错误或问题,安装ElasticSearch和预期都响应。权限确定,日志中没有错误。

我已经删除,并且包括(并设置为true和false)autocommit,versioningdigesting的每个组合,我都能想到。这是一个开发数据库,​​所以我确信记录将被完全删除,不会被缓存,也不会被软删除。如果我删除所有记录(即保持河流完整,只删除ES上索引的内容),河流下次更新时不会重新添加记录,这导致我相信我错过了有关版本化和删除的内容。

注意我也尝试过各种方式来指定_id列,并且我通过调用JSON来检查它是否有值。

干杯。

+2

更新 - 我们永远无法使这个工作达到我们的满意度,并最终采取不同的方法。我们的系统现在使用事件来找出单个记录的更改并直接更新ElasticSearch。这使我们能够更加细致地控制正在发生的事情,并允许从系统重新启动reindex。对不起,如果有人卡住寻找答案。 –

回答

0

我对弹性还比较陌生,并且一直在为我的项目使用jdbc河。 如果我理解正确的话,这未必可能的情况下,这是它如何工作的:

  1. 获取从 数据库中的所有行(由SQL语句在河中指定)。
  2. 根据所有提取的行的(id,类型和索引)计算摘要(如果添加了新行或删除了行,则应更改为 )。
  3. 为所有行重新索引文件。这将自动地增加每个文档的版本 。
  4. 河流的增量版本存储在_river指数(自定义)
  5. 如果计算出的摘要在#3比所述一个是 存储在_river索引然后不同:
    • 商店它
    • 运行管家功能(删除版本号较低的所有文档)。

这样考虑,你会希望有运行,你需要有版本设置为true,随后这意味着digesting应设置为true以及看家。

所以话说回来,你的河水应该是这样的:

curl -XPUT 'localhost:9200/_river/account_river/_meta' -d '{ 
    "type" : "jdbc", 
    "jdbc" : { 
     "driver" : "com.mysql.jdbc.Driver", 
     "url" : "jdbc:mysql://localhost:3306/test", 
     "user" : "test_user", 
     "password" : "test_pass", 
     "sql" : "SELECT `account`.`id` as `_id`, `account`.`id`, `account`.`reference`, `account`.`company_name`, `account`.`also_known_as` from `account` WHERE NOT `account`.`deleted`", 
     "strategy" : "simple", 
     "poll" : "5s", 
     "autocommit" : true, 
     "index": { 
      "index" : "headphones", 
      "type" : "Account", 
      "versioning" : true, 
      "digesting" : true 
     } 
    } 
}' 

注意versioningdigestingindex定义的一部分,而不是jdbc定义

+0

上述不适用于我正在运行的内容。将索引作为子对象意味着它忽略索引/类型。 我注意到在版本控制和消化“take”时发生了一些事情,河流上有第三个文件,它显示了一个改变每次调查的版本。但是,我还没有看到实际文件本身的版本,这使我相信管家没有任何可触发的东西。 –

3

由于这个问题已经被问,参数已经被大大地改变了,版本和消化已被弃用,并且民意调查已经被时间表所取代,其将以多久来重新运行该河的计时表达式(下面计划每5分钟运行一次)

curl -XPUT 'localhost:9200/_river/account_river/_meta' -d '{ 
     "type" : "jdbc", 
     "jdbc" : { 
      "driver" : "com.mysql.jdbc.Driver", 
      "url" : "jdbc:mysql://localhost:3306/test", 
      "user" : "test_user", 
      "password" : "test_pass", 
      "sql" : "SELECT `account`.`id` as `_id`, `account`.`id`, `account`.`reference`, `account`.`company_name`, `account`.`also_known_as` from `account` WHERE NOT `account`.`deleted`", 
      "strategy" : "simple", 
      "schedule": "0 0/5 * * * ?" , 
      "autocommit" : true, 
      "index" : "headphones", 
      "type" : "Account" 
     } 
    }' 

但主要的问题,我从开发商得到的答案是这样的 https://github.com/jprante/elasticsearch-river-jdbc/issues/213

行删除不再检测。

我试着用版本管理,但是这并没有很好地工作 连同增量更新和添加行。

一个好方法是窗口索引。每个时间范围(可能为每天或每周一次 )为河流创建一个新索引,并将 添加到别名中。过了一段时间之后,旧的指数将被放弃。此 维护与logstash索引类似,但不在 范围内。

我目前使用的I研究别名方法是每晚重新创建索引和河流,并安排河流每隔几个小时运行一次。它确保当天的新数据被编入索引,并且删除将每24小时反映出来

+0

在附注中,河流是否仅使用“_id”字段替换现有记录?换句话说,如果sql语句返回一个非常大的结果集,那么即使很少的行受到插入/更新的影响,河流运行也需要很长时间吗?或者它以某种方式监视自上次运行以来发生了什么变化,并只插入/更新这些新行。 – Yamcha

+0

我相信答案是否定的,它会创建与doc完全相同的doc的重复项,您需要删除原始的 –

+0

我相信你错了。从我的实验中,它取代了它。我也在某处看过这种情况。 – Yamcha