
Synchronizing Logstash JDBC inputs

I am using multiple Logstash JDBC inputs:

jdbc { 
    jdbc_driver_library => "../vendor/oracle/ojdbc7.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    connection_retry_attempts => 10 
    connection_retry_attempts_wait_time => 5 
    jdbc_validate_connection => "true" 
    jdbc_connection_string => "connectionString/myDataBase" 
    jdbc_user => "USER_NAME" 
    jdbc_password => "PASSWORD" 
    schedule => "* * * * *" 
    statement_filepath => "myPath/queryA.sql" 
    tracking_column => "myTrackingcolumn" 
    last_run_metadata_path => "myPath/.logstash_jdbc_last_run" 
    type => "documentType" 
    add_field => { 
      "tag" => "myFirstTag" 
      } 
    } 

jdbc { 
    jdbc_driver_library => "../vendor/oracle/ojdbc7.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    connection_retry_attempts => 10 
    connection_retry_attempts_wait_time => 5 
    jdbc_validate_connection => "true" 
    jdbc_connection_string => "connectionString/myDataBase" 
    jdbc_user => "USER_NAME" 
    jdbc_password => "PASSWORD" 
    schedule => "* * * * *" 
    statement_filepath => "myPath/queryB.sql" 
    tracking_column => "myTrackingcolumn" 
    last_run_metadata_path => "myPath/.logstash_jdbc_last_run" 
    type => "documentType" 
    add_field => { 
      "tag" => "mySecondTag" 
      } 
    } 

jdbc { 
    jdbc_driver_library => "../vendor/oracle/ojdbc7.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    connection_retry_attempts => 10 
    connection_retry_attempts_wait_time => 5 
    jdbc_validate_connection => "true" 
    jdbc_connection_string => "connectionString/myDataBase" 
    jdbc_user => "USER_NAME" 
    jdbc_password => "PASSWORD" 
    schedule => "* * * * *" 
    statement_filepath => "myPath/queryC.sql" 
    tracking_column => "myTrackingcolumn" 
    last_run_metadata_path => "myPath/.logstash_jdbc_last_run" 
    type => "documentType" 
    add_field => { 
      "tag" => "myThirdTag" 
      } 
    } 

Because the database I am querying has a SESSIONS_PER_USER limit defined, this raises the following error:

Pipeline aborted due to error {:exception=>#<Sequel::DatabaseConnectionError: Java::JavaSql::SQLException: ORA-02391: 
exceeded simultaneous SESSIONS_PER_USER limit>, :backtrace=>["oracle.jdbc.driver.T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:450)", "oracle.jdbc.driver. 
T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:392)", "oracle.jdbc.driver.T4CTTIoer.processError(oracle/jdbc/driver/T4CTTIoer.java:385)", 
"oracle.jdbc.driver.T4CTTIfun.processError(oracle/jdbc/driver/T4CTTIfun.java:938)", "oracle.dbc.driver.T4CTTIoauthenticate.processError(oracle/jdbc/driver/T4CTTIoauthenticate.java:480)", 
"oracle.jdbc.driver.T4CTTIfun.receive(oracle/jdbc/driver/T4CTTIfun.java:655)", "oracle.jdbc.driver.T4CTTIfun.doRPC(oracle/jdbc/driver/T4CTTIfun.java:249)", 
"oracle.jdbc.driver.T4CTTIoauthenticate.doOAUTH(oracle/jdbc/driver/T4CTTIoauthenticate.java:416)", "oracle.jdbc.driver.T4CTTIoauthenticate.doOAUTH(oracle/jdbc/driver/T4CTTIoauthenticate.java:825)", 
"oracle.jdbc.driver.T4CConnection.logon(oracle/jdbc/driver/T4CConnection.java:596)", "oracle.jdbc.driver.PhysicalConnection.<init>(oracle/jdbc/driver/PhysicalConnection.java:715)", 
"oracle.jdbc.driver.T4CConnection.<init>(oracle/jdbc/driver/T4CConnection.java:385)", "oracle.jdbc.driver.T4CDriverExtension.getConnection(oracle/jdbc/driver/T4CDriverExtension.java:30)", 
"oracle.jdbc.driver.OracleDriver.connect(oracle/jdbc/driver/OracleDriver.java:564)", "RUBY.connect(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/adapters/jdbc.rb:222)", 
"RUBY.make_new(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool.rb:110)", "RUBY.make_new(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:226)", 
"RUBY.available(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:199)", "RUBY._acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:135)", 
"RUBY.acquire(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:149)", "RUBY.sync(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:280)", 
"org.jruby.ext.thread.Mutex.synchronize(org/jruby/ext/thread/Mutex.java:149)", "RUBY.sync(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:280)", 
"RUBY.acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:148)", "RUBY.acquire(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/extensions/connection_validator.rb:98)", 
"RUBY.hold(D:myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/connection_pool/threaded.rb:106)", "RUBY.synchronize(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/database/connecting.rb:256)", 
"RUBY.test_connection(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/sequel-4.36.0/lib/sequel/database/connecting.rb:266)", "RUBY.prepare_jdbc_connection(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.1.0/lib/logstash/plugin_mixins/jdbc.rb:173)", 
"RUBY.register(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.1.0/lib/logstash/inputs/jdbc.rb:187)", "RUBY.start_inputs(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:330)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", 
"RUBY.start_inputs(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:329)", "RUBY.start_workers(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:180)", 
"RUBY.run(myPath/Env/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/pipeline.rb:136)", 
"RUBY.start_pipeline(myPath/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.4-java/lib/logstash/agent.rb:473)"], :level=>:error}
stopping pipeline {:id=>"main"}

How can I configure these inputs so that Logstash runs the SQL queries sequentially and avoids exceeding the allowed session limit?

Answer

I don't think there is a way to make the inputs run sequentially.

However, with the jdbc input's schedule option you can reduce how often the queries run, to avoid hitting the SESSIONS_PER_USER limit.

Since it is schedule => "* * * * *", your plugins connect to the database every minute (see here). You could instead use schedule => "*/15 * * * *" to connect every 15 minutes (see here).
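
As a rough sketch (my own illustration, not part of the original answer), only the schedule line of each jdbc block needs to change; the minute offsets below additionally stagger the three inputs so they never fire at the same time. The cron expressions assume the rufus-scheduler syntax accepted by the jdbc input:

jdbc { 
    # ... same connection, tracking and add_field settings as the queryA input above ... 
    statement_filepath => "myPath/queryA.sql" 
    schedule => "0,15,30,45 * * * *"    # every 15 minutes, on the quarter hour 
    } 

jdbc { 
    # ... same connection, tracking and add_field settings as the queryB input above ... 
    statement_filepath => "myPath/queryB.sql" 
    schedule => "5,20,35,50 * * * *"    # every 15 minutes, offset by 5 minutes 
    } 

jdbc { 
    # ... same connection, tracking and add_field settings as the queryC input above ... 
    statement_filepath => "myPath/queryC.sql" 
    schedule => "10,25,40,55 * * * *"   # every 15 minutes, offset by 10 minutes 
    } 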


I think this can be a workaround, but not a permanent solution. I need to make sure all inputs get executed and avoid crashing the Logstash instance. The crontab editor is great! – M3HD1


@Mehdi Yes, I know it is only a workaround. A long-term solution would be to use one user per input, or to increase the limit (which may not be possible in your case). Another option (though I'm not sure it would work): set the number of workers and the batch size to 1 (see [here](https://www.elastic.co/guide/en/logstash/current/command-line-flags.html)), so that only one event is processed at a time, which might block the inputs (the [documentation](https://www.elastic.co/guide/en/logstash/5.1/execution-model.html) is not clear on this) – baudsp
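
For reference, a rough sketch of the invocation that comment describes, assuming the Logstash 2.x command-line flags -w (pipeline workers) and -b (pipeline batch size); the config file path is a placeholder:

    # assumed Logstash 2.x flags: one pipeline worker, batch size of one event 
    bin/logstash -f myPath/logstash.conf -w 1 -b 1 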


I tried starting the Logstash instance with only one pipeline worker, but that does not solve the problem. As the documentation says, it seems the pipeline workers only handle filters and outputs, not inputs! ("This option sets the number of workers that will, in parallel, execute the filter and output stages of the pipeline.") – M3HD1