Skip to main content

Riak Ruby Client における MapReduce の問題点と対応方法 #0

Riak Ruby Client で MapReduce を実行しようとしたところ、下記の問題点を見つけたので対応方法を書いておく。

  • Secondary Indexes でバケットタイプを指定できない
  • Riak Search (Yokozuna) を使うことができない

Secondary Indexes でバケットタイプを指定することができない #

Secondary Indexes の検索結果を MapReduce の入力とするには Riak::MapReduce#index を使うことになる。しかし、このメソッドの引数としてバケットタイプを指定できない。

そもそも、バケットタイプを指定する方法を調べてみたら、下記のように inputs.bucket[bucket_type, bucket_name] のようにすれば出来ることが分かった。

curl -XPOST http://localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "bucket": ["yokozuna", "access_log"],
    "index": "time_int",
    "start": 1421506800,
    "end": 1421593199
  },
  "query": [
    {
      "map": {
        "language": "javascript",
        "source": "function(value) { return [1]; }"
      }
    },
    {
      "reduce": {
        "language": "javascript",
        "source": "Riak.reduceSum",
        "keep": true
      }
    }
  ]
}
EOF

これを Riak Ruby Client で実行する場合は、下記のようになる。

map_reduce = Riak::MapReduce.new(client)
map_reduce.inputs = {
  bucket: ["yokozuna", "access_log"],
  index:  'time_int',
  start:  1421506800,
  end:    1421593199
}
map_reduce.map('function(value) { return [1]; }').reduce('Riak.reduceSum', keep: true).run

Riak Search (Yokozuna) を使うことができない #

Riak Search (Yokozuna) の検索結果を MapReduce の入力とするには Riak::MapReduce#search を使うことになると思っていた。しかし、この実装を確認してみると、moduleriak_search となっており Riak Search (Yokozuna) には対応していないことが分かった。

def search(bucket, query)
  bucket = bucket.name if bucket.respond_to?(:name)
  @inputs = {:module => "riak_search", :function => "mapred_search", :arg => [bucket, query]}
  self
end

Riak Search (Yokozuna) を使う場合は下記のように moduleyokozuna を指定しなければならない。

curl -XPOST http://localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "module": "yokozuna",
    "function":"mapred_search",
    "arg": ["access_log", "*:*"]
  },
  "query": [
    {
      "map": {
        "language": "javascript",
        "source": "function(value) { return [1]; }"
      }
    },
    {
      "reduce": {
        "language": "javascript",
        "source": "Riak.reduceSum",
        "keep": true
      }
    }
  ]
}
EOF

そのため、Riak Ruby Client で実行する場合は、下記のようにしてやる必要がある。

map_reduce = Riak::MapReduce.new(client)
map_reduce.inputs = { module: 'yokozuna', function: 'mapred_search', arg: ['access_log', '*:*'] }
map_reduce.map('function(value) { return [1]; }').reduce('Riak.reduceSum', keep: true).run