Riak Ruby Client における MapReduce の問題点と対応方法 #1

Riak Ruby Client で MapReduce を実行しようとしたところ、下記の問題点を見つけたので対応方法を書いておく。

  • Secondary Indexes でバケットタイプを指定することができない
  • Riak Search (Yokozuna) を使うことができない

Secondary Indexes でバケットタイプを指定することができない

Secondary Indexes の検索結果を MapReduce の入力とするには Riak::MapReduce#index を使うことになるが、このメソッドの引数としてバケットタイプを指定することができない。

そもそも、バケットタイプを指定する方法を調べてみたら、下記のように inputs.bucket[bucket_type, bucket_name] のようにすれば出来ることが分かった。

curl -XPOST http://localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "bucket": ["yokozuna", "access_log"],
    "index": "time_int",
    "start": 1421506800,
    "end": 1421593199
  },
  "query": [
    {
      "map": {
        "language": "javascript",
        "source": "function(value) { return [1]; }"
      }
    },
    {
      "reduce": {
        "language": "javascript",
        "source": "Riak.reduceSum",
        "keep": true
      }
    }
  ]
}
EOF

これを Riak Ruby Client で実行する場合は、下記のようになる。

map_reduce = Riak::MapReduce.new(client)
map_reduce.inputs = {
  bucket: ["yokozuna", "access_log"],
  index:  'time_int',
  start:  1421506800,
  end:    1421593199
}
map_reduce.map('function(value) { return [1]; }').reduce('Riak.reduceSum', keep: true).run

Riak Search (Yokozuna) を使うことができない

Riak Search (Yokozuna) の検索結果を MapReduce の入力とするには Riak::MapReduce#search を使うことになると思っていたが、この実装を確認してみると、

def search(bucket, query)
  bucket = bucket.name if bucket.respond_to?(:name)
  @inputs = {:module => "riak_search", :function => "mapred_search", :arg => [bucket, query]}
  self
end

moduleriak_search となっており Riak Search (Yokozuna) には対応していないことが分かった。Riak Search (Yokozuna) を使う場合は下記のように moduleyokozuna を指定しなければならない。

curl -XPOST http://localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "module": "yokozuna",
    "function":"mapred_search",
    "arg": ["access_log", "*:*"]
  },
  "query": [
    {
      "map": {
        "language": "javascript",
        "source": "function(value) { return [1]; }"
      }
    },
    {
      "reduce": {
        "language": "javascript",
        "source": "Riak.reduceSum",
        "keep": true
      }
    }
  ]
}
EOF

そのため、Riak Ruby Client で実行する場合は、下記のようにしてやる必要がある。

map_reduce = Riak::MapReduce.new(client)
map_reduce.inputs = { module: 'yokozuna', function: 'mapred_search', arg: ['access_log', '*:*'] }
map_reduce.map('function(value) { return [1]; }').reduce('Riak.reduceSum', keep: true).run