Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes for query_many and its variants #152

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions lib/myxql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ defmodule MyXQL do
* requires two roundtrips to the DB server: one for preparing the statement and one for executing it.
This can be alleviated by holding on to prepared statement and executing it multiple times.

## Stored procedures

A successfully executed stored procedure always returns the affected row count
of the last statement. This means any stored procedure containing statements that
return result sets, such as `SELECT` statements, must use `query_many/4` and
similar functions. These functions will return one result for each statement
returning a result set and one for the affected row count of the last statement.

## Options

* `:query_type` - use `:binary` for binary protocol (prepared statements), `:binary_then_text` to attempt
Expand Down Expand Up @@ -365,6 +373,14 @@ defmodule MyXQL do
but the statement isn't. If a new statement is given to an old name, the old
statement will be the one effectively used.

## Stored procedures

A successfully executed stored procedure always returns the affected row count
of the last statement. This means any stored procedure containing statements that
return result sets, such as `SELECT` statements, must use `prepare_many/4` and similar
functions. These functions will return one result for each statement returning a
result set and one for the affected row count of the last statement.

## Options

Options are passed to `DBConnection.prepare/3`, see it's documentation for
Expand Down Expand Up @@ -417,7 +433,7 @@ defmodule MyXQL do
## Examples

iex> {:ok, query} = MyXQL.prepare_many(conn, "", "CALL multi_procedure()")
iex> {:ok, [%MyXQL.Result{rows: [row1]}, %MyXQL.Result{rows: [row2]}]} = MyXQL.execute_many(conn, query, [2, 3])
iex> {:ok, [%MyXQL.Result{rows: [row1]}, %MyXQL.Result{rows: [row2]}, %MyXQL.Result{rows: nil}]} = MyXQL.execute_many(conn, query, [2, 3])
iex> row1
[2]
iex> row2
Expand Down Expand Up @@ -449,6 +465,14 @@ defmodule MyXQL do
@doc """
Prepares and executes a query that returns a single result, in a single step.

## Stored procedures

A successfully executed stored procedure always returns the affected row count
of the last statement. This means any stored procedure containing statements that
return result sets, such as `SELECT` statements, must use `prepare_execute_many/5`
and similar functions. These functions will return one result for each statement
returning a result set and one for the affected row count of the last statement.

## Options

Options are passed to `DBConnection.prepare_execute/4`, see it's documentation for
Expand Down Expand Up @@ -499,7 +523,7 @@ defmodule MyXQL do

## Examples

iex> {:ok, _, [%MyXQL.Result{rows: [row1]}, %MyXQL.Result{rows: [row2]}]} = MyXQL.prepare_execute(conn, "", "CALL multi_procedure()")
iex> {:ok, _, [%MyXQL.Result{rows: [row1]}, %MyXQL.Result{rows: [row2]}, %MyXQL.Result{rows: nil}]} = MyXQL.prepare_execute(conn, "", "CALL multi_procedure()")
iex> row1
[2]
iex> row2
Expand Down Expand Up @@ -533,6 +557,14 @@ defmodule MyXQL do
@doc """
Executes a prepared query that returns a single result.

## Stored procedures

A successfully executed stored procedure always returns the affected row count
of the last statement. This means any stored procedure containing statements that
return result sets, such as `SELECT` statements, must use execute_many/4 and
similar functions. These functions will return one result for each statement
returning a result set and one for the affected row count of the last statement.

## Options

Options are passed to `DBConnection.execute/4`, see it's documentation for
Expand Down Expand Up @@ -575,7 +607,7 @@ defmodule MyXQL do
## Examples

iex> {:ok, query} = MyXQL.prepare_many(conn, "", "CALL multi_procedure()")
iex> {:ok, [%MyXQL.Result{rows: [row1]}, %MyXQL.Result{rows: [row2]}]} = MyXQL.execute_many(conn, query)
iex> {:ok, [%MyXQL.Result{rows: [row1]}, %MyXQL.Result{rows: [row2]}, %MyXQL.Result{rows: nil}]} = MyXQL.execute_many(conn, query)
iex> row1
[2]
iex> row2
Expand Down
31 changes: 9 additions & 22 deletions lib/myxql/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,28 +192,6 @@ defmodule MyXQL.Client do

defp recv_packets(data, decode, decoder_state, result_state, timeout, client, partial \\ <<>>)

defp recv_packets(
<<size::uint3, _seq::uint1, payload::string(size), rest::binary>> = data,
decoder,
{:more_results, resultset},
result_state,
timeout,
client,
partial
)
when size < @default_max_packet_size do
case decode_more_results(<<partial::binary, payload::binary>>, rest, resultset, result_state) do
{:cont, decoder_state, result_state} ->
recv_packets(data, decoder, decoder_state, result_state, timeout, client, partial)

{:halt, result} ->
{:ok, result}

{:error, _} = error ->
error
end
end

defp recv_packets(
<<size::uint3, _seq::uint1, payload::string(size), rest::binary>>,
decoder,
Expand All @@ -225,6 +203,15 @@ defmodule MyXQL.Client do
)
when size < @default_max_packet_size do
case decoder.(<<partial::binary, payload::binary>>, rest, decoder_state) do
{:cont, {:more_results, result}} ->
case result_state do
:single ->
{:error, :multiple_results}

{:many, results} ->
recv_packets(rest, decoder, :initial, {:many, [result | results]}, timeout, client)
end

{:cont, decoder_state} ->
recv_packets(rest, decoder, decoder_state, result_state, timeout, client)

Expand Down
137 changes: 68 additions & 69 deletions lib/myxql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ defmodule MyXQL.Connection do
end

other ->
result(other, query, state)
stream_result(other, query, state)
end
end

Expand All @@ -245,7 +245,7 @@ defmodule MyXQL.Connection do
end

other ->
result(other, query, state)
stream_result(other, query, state)
end
end

Expand All @@ -271,81 +271,41 @@ defmodule MyXQL.Connection do
end

## Internals
defp stream_result({:error, :multiple_results}, _query, _state) do
raise RuntimeError,
"streaming stored procedures is not supported. Use MyXQL.query_many/4 and similar functions."
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wojtekmach The idea here is that when streaming you can only get this type of error when using a stored procedure. The text query separated by semi-colons returns a syntax error, which I put a test for here: f04be98#diff-25c00486ed7c3364911a13625fb201869b4221fc96834062c8abe5f099d334daR775


defp result(
{:ok,
ok_packet(
last_insert_id: last_insert_id,
affected_rows: affected_rows,
status_flags: status_flags,
num_warnings: num_warnings
)},
query,
state
) do
result = %Result{
connection_id: state.client.connection_id,
last_insert_id: last_insert_id,
num_rows: affected_rows,
num_warnings: num_warnings
}

{:ok, query, result, put_status(state, status_flags)}
defp stream_result(result, query, state) do
result(result, query, state)
end

defp result(
{:ok,
resultset(
column_defs: column_defs,
num_rows: num_rows,
rows: rows,
status_flags: status_flags,
num_warnings: num_warnings
)},
query,
state
) do
columns = Enum.map(column_defs, &elem(&1, 1))
defp result({:ok, ok_packet(status_flags: status_flags) = result}, query, state) do
{:ok, query, format_result(result, state), put_status(state, status_flags)}
end

result = %Result{
connection_id: state.client.connection_id,
columns: columns,
num_rows: num_rows,
rows: rows,
num_warnings: num_warnings
}
defp result({:ok, resultset(status_flags: status_flags) = result}, query, state) do
{:ok, query, format_result(result, state), put_status(state, status_flags)}
end

{:ok, query, result, put_status(state, status_flags)}
# If a multi-result query has an error, it will be the latest query executed.
# The results are returned to this function in reverse order so it's the first
# in the result list.
defp result({:ok, [err_packet() = result | _rest]}, query, state) do
result({:ok, result}, query, state)
end

defp result({:ok, resultsets}, query, state) when is_list(resultsets) do
defp result({:ok, results}, query, state) when is_list(results) do
{results, status_flags} =
Enum.reduce(resultsets, {[], nil}, fn resultset, {results, newest_status_flags} ->
resultset(
column_defs: column_defs,
num_rows: num_rows,
rows: rows,
status_flags: status_flags,
num_warnings: num_warnings
) = resultset

columns = Enum.map(column_defs, &elem(&1, 1))

result = %Result{
connection_id: state.client.connection_id,
columns: columns,
num_rows: num_rows,
rows: rows,
num_warnings: num_warnings
}

# Keep status flags from the last query. The resultsets
# are given to this function in reverse order, so it is the first one.
if newest_status_flags do
{[result | results], newest_status_flags}
else
{[result | results], status_flags}
end
Enum.reduce(results, {[], nil}, fn
result, {results, latest_status_flags} ->
# Keep status flags from the last query. The result sets
# are given to this function in reverse order, so it is the first one.
if latest_status_flags do
{[format_result(result, state) | results], latest_status_flags}
else
{[format_result(result, state) | results], status_flags(result)}
end
end)

{:ok, query, results, put_status(state, status_flags)}
Expand Down Expand Up @@ -374,6 +334,45 @@ defmodule MyXQL.Connection do
{:error, %DBConnection.ConnectionError{message: message}}
end

defp format_result(
ok_packet(
last_insert_id: last_insert_id,
affected_rows: affected_rows,
num_warnings: num_warnings
),
state
) do
%Result{
connection_id: state.client.connection_id,
last_insert_id: last_insert_id,
num_rows: affected_rows,
num_warnings: num_warnings
}
end

defp format_result(
resultset(
column_defs: column_defs,
num_rows: num_rows,
rows: rows,
num_warnings: num_warnings
),
state
) do
columns = Enum.map(column_defs, &elem(&1, 1))

%Result{
connection_id: state.client.connection_id,
columns: columns,
num_rows: num_rows,
rows: rows,
num_warnings: num_warnings
}
end

defp status_flags(ok_packet(status_flags: status_flags)), do: status_flags
defp status_flags(resultset(status_flags: status_flags)), do: status_flags

defp error(reason, %{statement: statement}, state) do
error(reason, statement, state)
end
Expand Down
40 changes: 18 additions & 22 deletions lib/myxql/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,27 @@ defmodule MyXQL.Protocol do
end

# https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-COM_QUERY_Response
# If an ok packet (0x00) is received during the `:initial` state it can be done of two things:
# 1. The result of a single statement query which doesn't return a result set (i.e `CREATE`)
# 2. The partial result of a multi-statement query where the current statement being executed
# doesn't return a result set.
# If the first scenario, return the ok packet as the response.
# If the second scenario, store the result and keep reading from the socket.
def decode_com_query_response(<<0x00, rest::binary>>, "", :initial) do
{:halt, decode_ok_packet_body(rest)}
ok_packet(status_flags: status_flags) = ok_response = decode_ok_packet_body(rest)

if has_status_flag?(status_flags, :server_more_results_exists) do
{:cont, {:more_results, ok_response}}
else
{:halt, ok_response}
end
end

def decode_com_query_response(<<0x00, rest::binary>>, _next_data, :initial) do
{:cont, {:more_results, decode_ok_packet_body(rest)}}
end

def decode_com_query_response(<<0xFF, rest::binary>>, "", :initial) do
def decode_com_query_response(<<0xFF, rest::binary>>, _next_data, :initial) do
{:halt, decode_err_packet_body(rest)}
end

Expand Down Expand Up @@ -482,26 +498,6 @@ defmodule MyXQL.Protocol do
)
end

def decode_more_results(payload, "", resultset, result_state) do
ok_packet(status_flags: status_flags) = decode_generic_response(payload)

case result_state do
:single ->
{:halt, resultset(resultset, status_flags: status_flags)}

{:many, results} ->
{:halt, [resultset(resultset, status_flags: status_flags) | results]}
end
end

def decode_more_results(_payload, _next_data, _resultset, :single) do
{:error, :multiple_results}
end

def decode_more_results(_payload, _next_data, resultset, {:many, results}) do
{:cont, :initial, {:many, [resultset | results]}}
end

defp decode_resultset(payload, _next_data, :initial, _row_decoder) do
{:cont, {:column_defs, decode_int_lenenc(payload), []}}
end
Expand Down
14 changes: 7 additions & 7 deletions test/myxql/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@ defmodule MyXQL.ClientTest do

test "with stored procedure of single result", %{client: client} do
{:ok, com_stmt_prepare_ok(statement_id: statement_id)} =
Client.com_stmt_prepare(client, "CALL single_procedure()")
Client.com_stmt_prepare(client, "CALL single_ok_procedure()")

{:ok, resultset(num_rows: 1, status_flags: status_flags)} =
{:ok, ok_packet(status_flags: status_flags)} =
Client.com_stmt_execute(client, statement_id, [], :cursor_type_read_only)

assert list_status_flags(status_flags) == [:server_status_autocommit]
Expand All @@ -359,7 +359,7 @@ defmodule MyXQL.ClientTest do

test "with stored procedure of multiple results", %{client: client} do
{:ok, com_stmt_prepare_ok(statement_id: statement_id)} =
Client.com_stmt_prepare(client, "CALL multi_procedure()")
Client.com_stmt_prepare(client, "CALL one_resultset_one_ok_procedure()")

assert {:error, :multiple_results} =
Client.com_stmt_execute(client, statement_id, [], :cursor_type_read_only)
Expand All @@ -371,12 +371,12 @@ defmodule MyXQL.ClientTest do
{:ok, com_stmt_prepare_ok(statement_id: statement_id)} =
Client.com_stmt_prepare(client, "CALL cursor_procedure()")

{:ok, resultset(num_rows: 1, rows: [[3]])} =
Client.com_stmt_execute(client, statement_id, [], :cursor_type_read_only)
{:ok, [ok_packet(affected_rows: 0), resultset(num_rows: 1, rows: [[3]])]} =
Client.com_stmt_execute(client, statement_id, [], :cursor_type_read_only, {:many, []})

# This will be called if, for instance, someone issues the procedure statement from Ecto.Adapters.SQL.query
{:ok, resultset(num_rows: 1, rows: [[3]])} =
Client.com_stmt_execute(client, statement_id, [], :cursor_type_no_cursor)
{:ok, [ok_packet(affected_rows: 0), resultset(num_rows: 1, rows: [[3]])]} =
Client.com_stmt_execute(client, statement_id, [], :cursor_type_no_cursor, {:many, []})

Client.com_quit(client)
end
Expand Down
Loading