ボクココ

サービス開発を成功させるまでの歩み

Rails で大量のレコードを並列処理する

ども、@kimihom です。

f:id:cevid_cpp:20200119181256j:plain

今回、大量のレコードを一つずつ処理する実装をしたので、その実装方法をまとめておく。

コードの大枠

以下は全ユーザー(User)に紐づいているレコード(Record) に対して処理をするコードとなっている。

User.all.order("id").each do |u|
  r_all = u.records
  r_all.find_in_batches do |records|
    Parallel.each(records, in_threads: 50) do |r|
      begin
        # 処理
        ActiveRecord::Base.connection_pool.with_connection do
          # ActiveRecord を使った処理
        end
      rescue => e
        puts "err #{e}"
      end
    end
  end
  r_all = nil
end

find_in_batches

find_in_batches を使うことで、u.records を一気に処理するのではなく、デフォルトでは1,000件ごとに分けて処理するようになる。これによって、サーバーのメモリ負荷を軽減することができる。 ドキュメントには

To be yielded each record one by one, use #find_each instead.

と書かれている。一つずつレコードを生成するには、#find_each を使うとのこと。

Parallel

大量のレコードを一つ一つ処理していては、日が暮れてしまう。ということでマルチスレッドでコードを実行するには Parallel という Gem が便利に使える。デフォルト Ruby の提供している Threads は、実際にコードを書いてみると複雑になりがちだ。

ActiveRecord で取ってきたデータをスレッドで each させる。この時指定するパラメータ in_threads の数は、実行する環境によって左右される。例えば外部のデータにアクセスする際や、書くコードの重さなどによって低くしないといけないケースが出てくる。まずは少なめの数から実行してみて、最適な数を見つけていく形になるだろう。

ActiveRecord::Base.connection_pool.with_connection

マルチスレッドで処理をすると、ActiveRecord の DB アクセスがスレッドごとに作られてしまい、コネクションの作成に失敗してしまう。

could not obtain a connection from the pool within 5.000 seconds (waited 5.000 seconds);

DB コネクションを使い回すようにするために、この with_connection のブロック内で ActiveRecord の処理を書く必要がある。

終わりに

普段 Rails でコードを書いているだけだと、このような大量の処理というケースはあまり出くわさないんだけど、大量のデータを一括で更新したいといったような運用のケースで並列処理は必要になってくるだろう。

私自身、Web コードばかり書いていた影響で並列処理を熟知しているわけではないんだけど、最終的に今回書いたコードでうまく大量の処理を実行できたので良かった。

こうした大量の処理をする前には、DB のバックアップは取っておいた方が身のためだね。より安全なバッチ処理についても考えていかなければならないと思った。