diff --git a/aioriak/transport.py b/aioriak/transport.py index 6617e13..f28599f 100644 --- a/aioriak/transport.py +++ b/aioriak/transport.py @@ -8,6 +8,7 @@ from collections import ChainMap from riak.pb import messages from riak.codecs import pbuf as codec +from riak.codecs import Msg from aioriak.content import RiakContent from riak.riak_object import VClock from riak.util import decode_index_value, bytes_to_str, str_to_bytes @@ -624,7 +625,9 @@ async def _stream(self, msg_code, msg=None, expect=None): return responses async def _request(self, msg_code, msg=None, expect=None): - self._writer.write(self._encode_message(msg_code, msg)) + if not isinstance(msg, Msg): + msg = self._encode_message(msg_code, msg) + self._writer.write(msg) if self._parser: tail = self._parser.tail del self._parser @@ -1090,3 +1093,11 @@ async def update_datatype(self, datatype, **options): datatype._set_value(self._decode_dt_value(type_name, resp)) return True + + async def search(self, index, query, **kwargs): + msg = codec.encode_search(index, query, **kwargs) + resp_code, resp = await self._request( + messages.MSG_CODE_SEARCH_QUERY_REQ, + msg + ) + return codec.decode_search(resp)