2017-10-19 22 views

Ruby: splitting a file into chunks and reading them based on thread count

I'm looking to use multithreading to speed up this process, which is heavily I/O bound. I want to loop over the IDs in a CSV file and call a REST service for each one. What I haven't been able to figure out is how to elegantly split the file into chunks depending on the number of threads I want to use.

The CSV file contains a single column of IDs, like this: ...

require 'benchmark' 
require 'csv' 
require 'rest-client' 

FILE_RECORD_COUNT = File.readlines("path-to-csv").size 

def setup(thread_count) 
  threads = [] 
  thread_count.times do 
    threads << Thread.new do 
      fetches_per_thread = FILE_RECORD_COUNT / thread_count 

      fetches_per_thread.times do 
        CSV.foreach("id_file.csv") do |id| 
          response = RestClient.get("https://api.examplerest/names?id=#{id}", accept: :json) 
          # do some quick validation... 
        end 
      end 
    end 
  end 

  threads.each(&:join) 
end 

def run_benchmark 
  Benchmark.bm(20) do |bm| 
    [1, 2, 3, 5, 6, 10, 15, 30, 100].each do |thread_count| 
      bm.report("with #{thread_count} threads") do 
        setup(thread_count) 
      end 
    end 
  end 
end 

Where I'm stumped, and where I need a solution, is the block `CSV.foreach("id_file.csv") do |id|...`. I want to split the data dynamically and feed it to each thread before making the calls. I know I could split the file manually, but I'd like to avoid that.

I have been trying to benchmark this, working from an example I found online, to see where the sweet spot is for the number of threads.

EDIT: Using BernardK's answer, I was able to run my code threaded and got the following results:

                        user     system    total     real 
with 1 threads   5.125000 2.594000 7.719000 (40.416162) 
with 2 threads   1.625000 2.015000 3.640000 (28.571521) 
with 3 threads   1.578000 1.625000 3.203000 (17.210526) 
with 4 threads   1.578000 1.235000 2.813000 ( 8.496068) 
with 5 threads   1.406000 1.250000 2.656000 ( 6.779216) 
with 10 threads  1.875000 1.328000 3.203000 ( 5.069487) 
with 15 threads  2.016000 1.640000 3.656000 ( 4.285426) 
with 30 threads  2.125000 1.625000 3.750000 ( 3.817084) 
with 100 threads  2.281000 1.375000 3.656000 ( 3.943304) 

These are only test runs, but they really show how threading like this can speed up Ruby code!


Please wait, there is a bug ('@lines.each_slice' must replace 'thread_count.times'). – BernardK


@BernardK, no problem. Once you've reposted I'll implement your changes and let you know how it goes. –


Done. (The previous version read the file 'thread_count' times.) – BernardK

Answers


The file is read into an array, which is split into chunks with Enumerable#each_slice.
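For reference, `each_slice(n)` cuts an enumerable into consecutive chunks of at most n elements, with any remainder ending up in a shorter final slice; a minimal illustration:

```ruby
# each_slice(n) yields consecutive chunks of at most n elements;
# the last chunk holds whatever remains.
lines = (0...10).to_a

slices = lines.each_slice(3).to_a
p slices # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```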

require 'benchmark' 
require 'csv' 

# Create a test file with 1000 ids, one per line. 
@file_name = 'xxx.txt' 
File.open(@file_name, 'w') do |file| 
  1000.times { |i| file.puts i } 
end 

@lines = [] 
CSV.foreach(@file_name) { |line| @lines << line } 
FILE_RECORD_COUNT = @lines.size 
puts FILE_RECORD_COUNT 

def setup(thread_count) 
  puts "----- thread_count=#{thread_count}" 
  threads = [] 
  fetches_per_thread = FILE_RECORD_COUNT / thread_count 
  puts "----- fetches_per_thread=#{fetches_per_thread}" 
  raise 'invalid slice size' if fetches_per_thread < 1 

  @lines.each_slice(fetches_per_thread) do |slice| 
    threads << Thread.new do 
      puts "===== slice from #{slice.first} to #{slice.last}" 
      slice.each do |id| 
        # puts id 
        # response = RestClient.get("https://api.examplerest/names/#{id.first}", accept: :json) 
        # do some quick validation... 
      end # slice.each 
    end # Thread.new 
  end # @lines.each_slice 

  threads.each(&:join) 
end # def setup 

def run_benchmark 
  Benchmark.bm(20) do |bm| 
    [1, 2, 3, 5, 6, 10, 15, 30, 100].each do |thread_count| 
      bm.report("with #{thread_count} threads") do 
        setup(thread_count) 
      end 
    end 
  end 
end 

run_benchmark 

Execution:

$ -------------------------------- 
-bash: --------------------------------: command not found 
$ ruby -w t.rb 
1000 
          user  system  total  real 
with 1 threads  ----- thread_count=1 
----- fetches_per_thread=1000 
===== slice from ["0"] to ["999"] 
    0.000000 0.000000 0.000000 ( 0.000288) 
with 2 threads  ----- thread_count=2 
----- fetches_per_thread=500 
===== slice from ["0"] to ["499"] 
===== slice from ["500"] to ["999"] 
    0.000000 0.000000 0.000000 ( 0.000318) 
with 3 threads  ----- thread_count=3 
----- fetches_per_thread=333 
===== slice from ["0"] to ["332"] 
===== slice from ["666"] to ["998"] 
===== slice from ["999"] to ["999"] 
===== slice from ["333"] to ["665"] 
    0.000000 0.000000 0.000000 ( 0.000549) 
with 5 threads  ----- thread_count=5 
----- fetches_per_thread=200 
===== slice from ["0"] to ["199"] 
===== slice from ["200"] to ["399"] 
===== slice from ["400"] to ["599"] 
===== slice from ["600"] to ["799"] 
===== slice from ["800"] to ["999"] 
    0.000000 0.000000 0.000000 ( 0.000536) 
with 6 threads  ----- thread_count=6 
----- fetches_per_thread=166 
===== slice from ["166"] to ["331"] 
===== slice from ["664"] to ["829"] 
===== slice from ["830"] to ["995"] 
===== slice from ["996"] to ["999"] 
===== slice from ["0"] to ["165"] 
===== slice from ["332"] to ["497"] 
===== slice from ["498"] to ["663"] 
    0.000000 0.000000 0.000000 ( 0.000735) 
with 10 threads  ----- thread_count=10 
----- fetches_per_thread=100 
===== slice from ["900"] to ["999"] 
... 
===== slice from ["190"] to ["199"] 
===== slice from ["200"] to ["209"] 
===== slice from ["210"] to ["219"] 
===== slice from ["220"] to ["229"] 
===== slice from ["230"] to ["239"] 
===== slice from ["240"] to ["249"] 
... 
===== slice from ["970"] to ["979"] 
===== slice from ["980"] to ["989"] 
===== slice from ["990"] to ["999"] 
===== slice from ["20"] to ["29"] 
===== slice from ["30"] to ["39"] 
    0.000000 0.000000 0.000000 ( 0.011656) 

Then I use the find command in the terminal to locate -------------------------------------- and jump to the start of the execution.
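One detail worth noting in the output above: because `fetches_per_thread` uses integer division, `each_slice` can produce one more slice (and thus one more thread) than requested, as with `thread_count=3`, where 1000/3 = 333 left a fourth one-element slice. A small sketch of the effect, and of rounding the slice size up to cap the slice count at `thread_count`:

```ruby
# Integer division leaves a remainder, so each_slice may create one more
# slice than the requested thread count. Ceiling division avoids that.
record_count = 1000
thread_count = 3

floor_size = record_count / thread_count                      # 333
ceil_size  = (record_count + thread_count - 1) / thread_count # 334

puts (1..record_count).each_slice(floor_size).to_a.size # 4 slices -> 4 threads
puts (1..record_count).each_slice(ceil_size).to_a.size  # 3 slices -> 3 threads
```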


Doesn't your benchmark show that more threads make it run slower? – Max


@Max Isn't that a question for Austin L? It's his benchmark (5 threads shows the best time). I only helped with splitting the CSV file into chunks. – BernardK