Commit c238f64e5d4ba1b4353c3747a301997df41d7a51

Authored by comboy
Committed by Julian Langschaedel
1 parent 035a312fc1

use COPY for massive inserts with postgres + cleanup
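
The gist of the change, as a minimal sketch assuming Sequel's postgres adapter with the pg driver (copy_into is only available there); the connection string, table, and rows below are illustrative:

    require 'sequel'

    db = Sequel.connect('postgres://localhost/bitcoin_storage')  # hypothetical DSN

    # COPY streams every row to the server in a single statement
    # instead of issuing one INSERT per row.
    rows = [[1, 'foo'], [2, 'bar']]
    csv  = rows.map {|r| r.join(',') }.join("\n")
    db.copy_into(:items, columns: [:id, :name], format: :csv, data: csv)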

Showing 1 changed file with 51 additions and 45 deletions

lib/bitcoin/storage/sequel/sequel_store.rb
... ... @@ -71,44 +71,25 @@
71 71 existing ? blk_tx[idx] = existing : new_tx << [tx, idx]
72 72 end
73 73  
74   - if @db.adapter_scheme == :postgres
75   - new_tx_ids = []
76   - db.transaction do
77   - first_id = db.fetch("SELECT nextval('tx_id_seq') AS id").first[:id]
78   - csv = new_tx.map {|tx, _| tx_data(tx) }.map.with_index {|tx,i| [i+first_id, "\\x#{tx[:hash].hth}", tx[:version], tx[:lock_time], tx[:coinbase], tx[:tx_size], (tx[:nhash] ? "\\x#{tx[:nhash].hth}" : '')].join(',') }.join("\n")
79   - db.copy_into(:tx, columns: [:id, :hash, :version, :lock_time, :coinbase, :tx_size, :nhash], format: :csv, data: csv)
80   - last_id = first_id + new_tx.size - 1
81   - db.execute("SELECT setval('tx_id_seq', #{last_id}, true)")
82   - new_tx_ids = (first_id..last_id).to_a
83   - end
84   - else
85   - new_tx_ids = @db[:tx].insert_multiple(new_tx.map {|tx, _| tx_data(tx) })
86   - end
87   -
  74 + new_tx_ids = fast_insert(:tx, new_tx.map {|tx, _| tx_data(tx) }, return_ids: true)
88 75 new_tx_ids.each.with_index {|tx_id, idx| blk_tx[new_tx[idx][1]] = tx_id }
89 76  
90   - if @db.adapter_scheme == :postgres
91   - csv_data = blk_tx.map.with_index {|id, idx| [block_id, id, idx].join(',')}.join("\n")
92   - @db.copy_into(:blk_tx, format: :csv, columns: [:blk_id, :tx_id, :idx], data: csv_data)
93   - else
94   - @db[:blk_tx].insert_multiple(blk_tx.map.with_index {|id, idx|
95   - { blk_id: block_id, tx_id: id, idx: idx } })
96   - end
  77 + fast_insert(:blk_tx, blk_tx.map.with_index {|id, idx| { blk_id: block_id, tx_id: id, idx: idx } })
97 78  
98 79 # store txins
99   - txin_ids = @db[:txin].insert_multiple(new_tx.map.with_index {|tx, tx_idx|
  80 + fast_insert(:txin, new_tx.map.with_index {|tx, tx_idx|
100 81 tx, _ = *tx
101 82 tx.in.map.with_index {|txin, txin_idx|
102 83 txin_data(new_tx_ids[tx_idx], txin, txin_idx) } }.flatten)
103 84  
104 85 # store txouts
105 86 txout_i = 0
106   - txout_ids = @db[:txout].insert_multiple(new_tx.map.with_index {|tx, tx_idx|
  87 + txout_ids = fast_insert(:txout, new_tx.map.with_index {|tx, tx_idx|
107 88 tx, _ = *tx
108 89 tx.out.map.with_index {|txout, txout_idx|
109 90 script_type, a, n = *parse_script(txout, txout_i, tx.hash, txout_idx)
110 91 addrs += a; names += n; txout_i += 1
111   - txout_data(new_tx_ids[tx_idx], txout, txout_idx, script_type) } }.flatten)
  92 + txout_data(new_tx_ids[tx_idx], txout, txout_idx, script_type) } }.flatten, return_ids: true)
112 93  
113 94 # store addrs
114 95 persist_addrs addrs.map {|i, h| [txout_ids[i], h]}
115 96  
116 97  
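
COPY does not report the generated ids, so fast_insert (defined in the last hunk below) reserves them from the table's sequence before copying. A hypothetical walk-through of that trick for three rows, where nextval happens to return 100:

    db.transaction do
      # Block concurrent inserts that could claim ids mid-COPY
      db.execute("LOCK TABLE tx IN SHARE UPDATE EXCLUSIVE MODE")
      first_id = db.fetch("SELECT nextval('tx_id_seq') AS id").first[:id]  # => 100
      # ... COPY three rows with explicit ids 100, 101, 102 ...
      last_id = first_id + 3 - 1                                           # => 102
      # true marks the sequence as used, so the next nextval returns 103
      db.execute("SELECT setval('tx_id_seq', #{last_id}, true)")
      (first_id..last_id).to_a                                             # => [100, 101, 102]
    end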
... ... @@ -150,34 +131,15 @@
150 131 new_addrs << [hash160, txouts.map {|id, _| id }]
151 132 end
152 133 end
153   - new_addr_ids = []
154   - if @db.adapter_scheme == :postgres
155   - db.transaction do
156   - first_id = db.fetch("SELECT nextval('addr_id_seq') AS id").first[:id]
157   - csv = new_addrs.map(&:first).map.with_index {|hash160, i| [hash160, i+first_id].join(',')}.join("\n")
158   - db.copy_into(:addr, columns: [:hash160, :id], format: :csv, data: csv)
159   - last_id = first_id + new_addrs.size - 1
160   - db.execute("SELECT setval('addr_id_seq', #{last_id}, true)")
161   - new_addr_ids = (first_id..last_id).to_a
162   - end
163   - else
164   - new_addr_ids = @db[:addr].insert_multiple(new_addrs.map {|hash160, txout_id|
165   - { hash160: hash160 } })
166   - end
167 134  
  135 + new_addr_ids = fast_insert(:addr, new_addrs.map {|hash160, txout_id| {hash160: hash160}}, return_ids: true)
168 136 new_addr_ids.each.with_index do |addr_id, idx|
169 137 new_addrs[idx][1].each do |txout_id|
170 138 addr_txouts << [addr_id, txout_id]
171 139 end
172 140 end
173 141  
174   - if @db.adapter_scheme == :postgres
175   - csv = addr_txouts.map{|x| x.join(',')}.join("\n")
176   - @db.copy_into(:addr_txout, format: :csv, columns: [:addr_id, :txout_id], data: csv)
177   - else
178   - @db[:addr_txout].insert_multiple(addr_txouts.map {|addr_id, txout_id|
179   - { addr_id: addr_id, txout_id: txout_id }})
180   - end
  142 + fast_insert(:addr_txout, addr_txouts.map {|addr_id, txout_id| { addr_id: addr_id, txout_id: txout_id }})
181 143 end
182 144  
183 145 # prepare transaction data for storage
... ... @@ -480,6 +442,50 @@
480 442 # tx = txout.get_tx
481 443 # total += txout.value
482 444 # end
  445 + end
  446 +
  447 + protected
  448 +
  449 + # Abstraction for doing many quick inserts.
  450 + #
  451 + # * +table+ - db table name
  452 + # * +data+ - an array of hashes with the same keys
  453 + # * +opts+
  454 + # ** return_ids - if true, an array of inserted row ids will be returned
  455 + def fast_insert(table, data, opts={})
  456 + return [] if data.empty?
  457 + # For postgres we use COPY, which is much faster than separate INSERTs
  458 + if @db.adapter_scheme == :postgres
  459 +
  460 + columns = data.first.keys
  461 + if opts[:return_ids]
  462 + ids = db.transaction do
  463 + # COPY does not return ids, so we set ids manually based on the current sequence value
  464 + # We lock the table to avoid inserts that could happen in the middle of COPY
  465 + db.execute("LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE")
  466 + first_id = db.fetch("SELECT nextval('#{table}_id_seq') AS id").first[:id]
  467 +
  468 + # Blobs need to be represented in hex form (yes, we do hth on them earlier; could be improved)
  469 + # \\x is the format of bytea as hex encoding in postgres
  470 + csv = data.map.with_index{|x,i| [first_id + i, columns.map{|c| x[c].kind_of?(Sequel::SQL::Blob) ? "\\x#{x[c].hth}" : x[c]}].join(',')}.join("\n")
  471 + db.copy_into(table, columns: [:id] + columns, format: :csv, data: csv)
  472 + last_id = first_id + data.size - 1
  473 +
  474 + # Set the sequence to the max id; the last arg (true) means it will be incremented before the next value
  475 + db.execute("SELECT setval('#{table}_id_seq', #{last_id}, true)")
  476 + (first_id..last_id).to_a # returned ids
  477 + end
  478 + else
  479 + csv = data.map{|x| columns.map{|c| x[c].kind_of?(Sequel::SQL::Blob) ? "\\x#{x[c].hth}" : x[c]}.join(',')}.join("\n")
  480 + @db.copy_into(table, format: :csv, columns: columns, data: csv)
  481 + end
  482 +
  483 + else
  484 +
  485 + # Life is simple when you are not optimizing ;)
  486 + @db[table].insert_multiple(data)
  487 +
  488 + end
483 489 end
484 490  
485 491 end
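
For reference, a hypothetical pair of call sites for the new helper; the row values and block id are made up, and blob columns such as hash end up in the generated CSV in postgres bytea hex form (\x...):

    rows = [
      { hash: Sequel.blob("\x00" * 32), version: 1, lock_time: 0, coinbase: false, tx_size: 250 },
      { hash: Sequel.blob("\x01" * 32), version: 1, lock_time: 0, coinbase: false, tx_size: 180 },
    ]
    # postgres: one COPY inside a transaction; other adapters: insert_multiple
    tx_ids = fast_insert(:tx, rows, return_ids: true)  # => e.g. [100, 101]
    fast_insert(:blk_tx, tx_ids.map.with_index {|id, idx| { blk_id: 7, tx_id: id, idx: idx } })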