Skip to content

Inconsistencies of MapReduce when using http or pbc [JIRA: CLIENTS-131] #394

Open
@gglanzani

Description

@gglanzani

I have the following Erlang code (very simple, just for illustration)

-module(grid_mr).

-export([yearfun/3]).

%% @doc Map-phase function. Decodes the object's value as a JSON object,
%% reads its <<"year">> and <<"grid">> fields (each defaulting to -1.0 when
%% absent) and emits [{Grid, Year}] when Year is greater than 2006;
%% otherwise emits nothing.
yearfun(O, _KeyData, _Arg) ->
  Json = riak_object:get_value(O),
  %% Assertive match: a value that is not a JSON object crashes here.
  {struct, Fields} = mochijson2:decode(Json),
  Grid = proplists:get_value(<<"grid">>, Fields, -1.0),
  Year = proplists:get_value(<<"year">>, Fields, -1.0),
  if
    Year > 2006 -> [{Grid, Year}];
    true -> []
  end.

When I run a MapReduce job via curl, I get

# POST a MapReduce job that runs the Erlang map function grid_mr:yearfun
# over two keys of the STATS bucket. Each key must be a quoted JSON string.
$ curl -XPOST http://172.17.12.21:8089/mapred \
   -H 'Content-Type: application/json'   \
   -d '{"inputs":[["STATS", "grid_stats_C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0-2008"],
["STATS",  "grid_stats_C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0-2010"]],"query":[{"map":{"language":"erlang","module":"grid_mr","function":"yearfun"}}]}'

{"C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0":2008,"C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0":2010}

However, when submitting with Python using http

# Reproduction over HTTP: run the Erlang map function grid_mr:yearfun on two
# keys of the STATS bucket and print every result the job returns.
from riak import RiakClient, RiakMapReduce

client = RiakClient(protocol='http', host='172.17.12.22', http_port=8089)
bucket = client.bucket("STATS")

keys = [
    "grid_stats_C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0-2008",
    "grid_stats_C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0-2010",
]

job = RiakMapReduce(client)
job.add("STATS", keys)
#job.search(index="grid_stats", query="industry_id:22 AND customer_segmentation_id:6")
job.map(['grid_mr', 'yearfun'])

results = job.run()
for item in results:
    print("%s" % item)

I get

C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0

If, instead, I use pbc

from riak import RiakClient
from riak import RiakMapReduce
riak = RiakClient(protocol='pbc', host='172.17.12.22', http_port=8087)
bucket = riak.bucket("STATS")
mr = RiakMapReduce(riak)
keys = ["grid_stats_C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0-2008", 
        "grid_stats_C7FF2796D6BD1153E847E84277F6B4A1022E29DACA35989004C10FFA92E2A5F0-2010"]
mr.add("STATS", keys)
#mr.search(index="grid_stats", query="industry_id:22 AND customer_segmentation_id:6")
mr.map(['grid_mr', 'yearfun'])
for result in mr.run():
    print "%s" % result

the client raises an exception

[...]
.virtualenvs/numpy/lib/python2.7/site-packages/riak/transports/pbc/transport.pyc in mapred(self, inputs, query, timeout)
    410         for phase, content in self.stream_mapred(inputs, query, timeout):
    411             if phase in result:
--> 412                 result[phase] += content
    413             else:
    414                 result[phase] = content
TypeError: unsupported operand type(s) for +=: 'dict' and 'dict'

Am I doing something wrong, or…? Right now changing the Erlang code is the only fix:

-module(binary_grid).

-export([yearfun/3]).

%% @doc Map-phase function. Decodes the object's value as a JSON object,
%% reads its <<"year">> and <<"grid">> fields (each defaulting to -1.0 when
%% absent) and, when Year is greater than 2006, emits a single binary holding
%% the JSON encoding of [{Grid, Year}]; otherwise emits nothing.
yearfun(O, _KeyData, _Arg) ->
  Json = riak_object:get_value(O),
  %% Assertive match: a value that is not a JSON object crashes here.
  {struct, Fields} = mochijson2:decode(Json),
  Grid = proplists:get_value(<<"grid">>, Fields, -1.0),
  Year = proplists:get_value(<<"year">>, Fields, -1.0),
  if
    Year > 2006 ->
      %% Encode the pair to JSON up front so the phase returns a binary
      %% instead of a raw tuple.
      Encoded = mochijson2:encode([{Grid, Year}]),
      [list_to_binary(Encoded)];
    true ->
      []
  end.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions