From d34a0721acc69fbaf69c5b3891da79530b368bcb Mon Sep 17 00:00:00 2001 From: Bikram Chatterjee Date: Sun, 1 Mar 2020 10:21:38 +0100 Subject: [PATCH 1/3] Timing logging --- src_erlang/src/orabench.erl | 45 ++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/src_erlang/src/orabench.erl b/src_erlang/src/orabench.erl index 8ab126df..a3b22b23 100644 --- a/src_erlang/src/orabench.erl +++ b/src_erlang/src/orabench.erl @@ -1,4 +1,5 @@ -module(orabench). +-include_lib("kernel/include/file.hrl"). %% API exports -export([main/1, insert_partition/5, select_partition/4]). @@ -9,9 +10,11 @@ %%==================================================================== %% API functions %%==================================================================== +-define(TRACE, io:format("----> ~p ~ps~n", [{?MODULE, ?FUNCTION_NAME, ?LINE}, timer:now_diff(os:timestamp(), S) div 1000000])). %% escript Entry point main([ConfigFile, Driver]) -> +S = os:timestamp(), io:format( "~n[~p:~p] Start ~p (~s)~n", [?FUNCTION_NAME, ?LINE, ?MODULE, Driver] ), @@ -48,13 +51,19 @@ main([ConfigFile, Driver]) -> sql_insert := SqlInsert, sql_select := SqlSelect } = Config, + {ok, FI} = file:read_file_info(BulkFile), {ok, Fd} = file:open(BulkFile, [read, raw, binary, {read_ahead, 1024 * 1024}]), +?TRACE, + io:format("[~p:~p:~p] Loading data...", [?MODULE, ?FUNCTION_NAME, ?LINE]), Rows = load_data( - Driver, Fd, list_to_binary(Header), BulkDelimiter, Partitions, #{} + S, Driver, FI, Fd, list_to_binary(Header), BulkDelimiter, Partitions, #{}, 0, 0 ), +?TRACE, put(rows, Rows), put(conf, Config), +?TRACE, RowCount = length(lists:merge(maps:values(Rows))), +?TRACE, if RowCount /= FBulkSz -> io:format("First = ~p~nLast = ~p~n", [hd(Rows), lists:last(Rows)]), error({loaded_rows, length(Rows), 'of', FBulkSz}); @@ -71,10 +80,12 @@ main([ConfigFile, Driver]) -> end, Rows ), +?TRACE, #{ startTime := StartTs, endTime := EndTs } = Results = run_trials(Trials), +?TRACE, BMDrv = case Driver of "oranif" -> ok = application:load(oranif), @@ -85,12 +96,14 @@ main([ConfigFile, Driver]) -> {ok, JamDBVsn} = application:get_key(jamdb_oracle, vsn), lists:flatten(io_lib:format("jamdb_oracle (Version ~s)", [JamDBVsn])) end, +?TRACE, BMMod = lists:flatten( io_lib:format( "OTP ~s, erts-~s", [erlang:system_info(otp_release), erlang:system_info(version)] ) ), +?TRACE, RowFmt = string:join( [ BMRelease, BMId, BMComment, BMHost, integer_to_list(BMCores), BMOs, @@ -108,11 +121,13 @@ main([ConfigFile, Driver]) -> ], ResultDelim ), +?TRACE, case filelib:is_regular(ResultFile) of false -> ok = file:write_file(ResultFile, ResultHeader ++ "\n"); _ -> ok end, {ok, RFd} = file:open(ResultFile, [append, binary]), +?TRACE, DurationMicros = timer:now_diff(EndTs, StartTs), maps:map( fun( @@ -129,6 +144,7 @@ main([ConfigFile, Driver]) -> end, {[], [], 0}, maps:without([startTime, endTime], Insrts) ), +?TRACE, InsMaxET = lists:max(InsETs), InsMinST = lists:min(InsSTs), InsDur = timer:now_diff(InsMaxET, InsMinST), @@ -147,6 +163,7 @@ main([ConfigFile, Driver]) -> end, {[], [], 0}, maps:without([startTime, endTime], Slcts) ), +?TRACE, SelMaxET = lists:max(SelETs), SelMinST = lists:min(SelSTs), SelDur = timer:now_diff(SelMaxET, SelMinST), @@ -155,6 +172,7 @@ main([ConfigFile, Driver]) -> [Trial, SqlSelect, 'query', ts_str(SelMinST), ts_str(SelMaxET), round(SelDur / 1000000), SelDur * 1000] ), +?TRACE, DMs = timer:now_diff(ETs, STs), ok = io:format( RFd, RowFmt, @@ -529,13 +547,27 @@ select_partition( } }. -load_data(Driver, Fd, Header, BulkDelimiter, Partitions, Rows) -> +load_data( + S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows, BytesReadCount, + ReadPerCent +) -> case file:read_line(Fd) of - eof -> Rows; + eof -> + io:format("~n"), + Rows; {ok, Line0} -> + BytesReadCount1 = BytesReadCount + byte_size(Line0), + ReadPerCent1 = (BytesReadCount1 / FI#file_info.size) * 100, + if round(ReadPerCent1) /= round(ReadPerCent) -> + io:format("~p% (~ps)...", [round(ReadPerCent1), timer:now_diff(os:timestamp(), S) div 1000000]); + true -> ok + end, case string:trim(Line0, both, "\r\n") of Header -> - load_data(Driver, Fd, Header, BulkDelimiter, Partitions, Rows); + load_data( + S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows, + BytesReadCount1, ReadPerCent1 + ); Line -> [<> = Key, Data] = string:split( Line, BulkDelimiter, all @@ -543,13 +575,14 @@ load_data(Driver, Fd, Header, BulkDelimiter, Partitions, Rows) -> Partition = (KeyByte1 * 256 + KeyByte2) rem Partitions, OldData = maps:get(Partition, Rows, []), load_data( - Driver, Fd, Header, BulkDelimiter, Partitions, + S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows#{ Partition => case Driver of "oranif" -> [{Key, Data} | OldData]; "jamdb" -> [[Key, Data] | OldData] end - } + }, + BytesReadCount1, ReadPerCent1 ) end end. From 0925dcad0db8c5338ec63259dd7bdd079dd2f693 Mon Sep 17 00:00:00 2001 From: Bikram Chatterjee Date: Sun, 1 Mar 2020 11:25:10 +0100 Subject: [PATCH 2/3] transforing to bulk to ETS - WIP --- src_erlang/src/orabench.erl | 49 ++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src_erlang/src/orabench.erl b/src_erlang/src/orabench.erl index a3b22b23..9f423d38 100644 --- a/src_erlang/src/orabench.erl +++ b/src_erlang/src/orabench.erl @@ -55,30 +55,33 @@ S = os:timestamp(), {ok, Fd} = file:open(BulkFile, [read, raw, binary, {read_ahead, 1024 * 1024}]), ?TRACE, io:format("[~p:~p:~p] Loading data...", [?MODULE, ?FUNCTION_NAME, ?LINE]), - Rows = load_data( + ets:new(bulk_data, [set, protected, named_table]), + load_data( S, Driver, FI, Fd, list_to_binary(Header), BulkDelimiter, Partitions, #{}, 0, 0 ), ?TRACE, - put(rows, Rows), put(conf, Config), ?TRACE, - RowCount = length(lists:merge(maps:values(Rows))), + RowCount = ets:info(bulk_data, size), ?TRACE, if RowCount /= FBulkSz -> - io:format("First = ~p~nLast = ~p~n", [hd(Rows), lists:last(Rows)]), - error({loaded_rows, length(Rows), 'of', FBulkSz}); + io:format( + "First = ~p~nLast = ~p~n", + [ets:lookup(bulk_data, ets:first(bulk_data)), + ets:lookup(bulk_data, ets:first(bulk_data))] + ), + error({loaded_rows, RowCount, 'of', FBulkSz}); true -> ok end, ok = file:close(Fd), - _ = maps:map( - fun(Partition, PartitionRows) -> + lists:foreach( + fun(Partition) -> io:format( "[~p:~p:~p] Partition ~p contains ~p rows~n", - [?MODULE, ?FUNCTION_NAME, ?LINE, Partition, length(PartitionRows)] - ), - PartitionRows - end, - Rows + [?MODULE, ?FUNCTION_NAME, ?LINE, Partition + 1, + ets:select_count(bulk_data, [{{'_', Partition, '_'}, [], [true]}])] + ) + end, lists:seq(0, Partitions - 1) ), ?TRACE, #{ @@ -363,13 +366,12 @@ run_trials(Trial, Trials, Ctx, Stats) -> ). run_insert(Ctx) -> - Rows = get(rows), #{benchmark_number_partitions := Partitions} = Config = get(conf), Master = self(), Threads = [ spawn_link( ?MODULE, insert_partition, - [Partition, maps:get(Partition, Rows), Ctx, Master, Config] + [Partition, Ctx, Master, Config] ) || Partition <- lists:seq(0, Partitions - 1) ], thread_join(Threads). @@ -385,7 +387,7 @@ run_select(Ctx) -> thread_join(Threads). insert_partition( - Partition, Rows, Ctx, Master, + Partition, Ctx, Master, #{ connection_user := User, connection_password := Password, @@ -573,15 +575,16 @@ load_data( Line, BulkDelimiter, all ), Partition = (KeyByte1 * 256 + KeyByte2) rem Partitions, - OldData = maps:get(Partition, Rows, []), + true = ets:insert(bulk_data, {Key, Partition, Data}), + %OldData = maps:get(Partition, Rows, []), load_data( - S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, - Rows#{ - Partition => case Driver of - "oranif" -> [{Key, Data} | OldData]; - "jamdb" -> [[Key, Data] | OldData] - end - }, + S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows, + %Rows#{ + % Partition => case Driver of + % "oranif" -> [{Key, Data} | OldData]; + % "jamdb" -> [[Key, Data] | OldData] + % end + %}, BytesReadCount1, ReadPerCent1 ) end From 1378a8c1bdcb968b48333602d53631d2cd4b0d63 Mon Sep 17 00:00:00 2001 From: Bikram Chatterjee Date: Mon, 2 Mar 2020 17:54:48 +0100 Subject: [PATCH 3/3] fixes memory problem in docker --- src_erlang/src/orabench.erl | 118 ++++++++++++++---------------------- 1 file changed, 45 insertions(+), 73 deletions(-) diff --git a/src_erlang/src/orabench.erl b/src_erlang/src/orabench.erl index 9f423d38..eb9e8347 100644 --- a/src_erlang/src/orabench.erl +++ b/src_erlang/src/orabench.erl @@ -2,7 +2,7 @@ -include_lib("kernel/include/file.hrl"). %% API exports --export([main/1, insert_partition/5, select_partition/4]). +-export([main/1, insert_partition/4, select_partition/4]). -define(DPI_MAJOR_VERSION, 3). -define(DPI_MINOR_VERSION, 0). @@ -10,11 +10,10 @@ %%==================================================================== %% API functions %%==================================================================== --define(TRACE, io:format("----> ~p ~ps~n", [{?MODULE, ?FUNCTION_NAME, ?LINE}, timer:now_diff(os:timestamp(), S) div 1000000])). +%-define(TRACE, io:format("----> ~p ~ps~n", [{?MODULE, ?FUNCTION_NAME, ?LINE}, timer:now_diff(os:timestamp(), S) div 1000000])). %% escript Entry point main([ConfigFile, Driver]) -> -S = os:timestamp(), io:format( "~n[~p:~p] Start ~p (~s)~n", [?FUNCTION_NAME, ?LINE, ?MODULE, Driver] ), @@ -53,17 +52,11 @@ S = os:timestamp(), } = Config, {ok, FI} = file:read_file_info(BulkFile), {ok, Fd} = file:open(BulkFile, [read, raw, binary, {read_ahead, 1024 * 1024}]), -?TRACE, - io:format("[~p:~p:~p] Loading data...", [?MODULE, ?FUNCTION_NAME, ?LINE]), + io:format("[~p:~p:~p] Loading data...~n", [?MODULE, ?FUNCTION_NAME, ?LINE]), ets:new(bulk_data, [set, protected, named_table]), - load_data( - S, Driver, FI, Fd, list_to_binary(Header), BulkDelimiter, Partitions, #{}, 0, 0 - ), -?TRACE, + load_data(Driver, FI, Fd, list_to_binary(Header), BulkDelimiter, Partitions), put(conf, Config), -?TRACE, RowCount = ets:info(bulk_data, size), -?TRACE, if RowCount /= FBulkSz -> io:format( "First = ~p~nLast = ~p~n", @@ -83,12 +76,10 @@ S = os:timestamp(), ) end, lists:seq(0, Partitions - 1) ), -?TRACE, #{ startTime := StartTs, endTime := EndTs } = Results = run_trials(Trials), -?TRACE, BMDrv = case Driver of "oranif" -> ok = application:load(oranif), @@ -99,14 +90,12 @@ S = os:timestamp(), {ok, JamDBVsn} = application:get_key(jamdb_oracle, vsn), lists:flatten(io_lib:format("jamdb_oracle (Version ~s)", [JamDBVsn])) end, -?TRACE, BMMod = lists:flatten( io_lib:format( "OTP ~s, erts-~s", [erlang:system_info(otp_release), erlang:system_info(version)] ) ), -?TRACE, RowFmt = string:join( [ BMRelease, BMId, BMComment, BMHost, integer_to_list(BMCores), BMOs, @@ -124,13 +113,11 @@ S = os:timestamp(), ], ResultDelim ), -?TRACE, case filelib:is_regular(ResultFile) of false -> ok = file:write_file(ResultFile, ResultHeader ++ "\n"); _ -> ok end, {ok, RFd} = file:open(ResultFile, [append, binary]), -?TRACE, DurationMicros = timer:now_diff(EndTs, StartTs), maps:map( fun( @@ -147,7 +134,6 @@ S = os:timestamp(), end, {[], [], 0}, maps:without([startTime, endTime], Insrts) ), -?TRACE, InsMaxET = lists:max(InsETs), InsMinST = lists:min(InsSTs), InsDur = timer:now_diff(InsMaxET, InsMinST), @@ -166,7 +152,6 @@ S = os:timestamp(), end, {[], [], 0}, maps:without([startTime, endTime], Slcts) ), -?TRACE, SelMaxET = lists:max(SelETs), SelMinST = lists:min(SelSTs), SelDur = timer:now_diff(SelMaxET, SelMinST), @@ -175,7 +160,6 @@ S = os:timestamp(), [Trial, SqlSelect, 'query', ts_str(SelMinST), ts_str(SelMaxET), round(SelDur / 1000000), SelDur * 1000] ), -?TRACE, DMs = timer:now_diff(ETs, STs), ok = io:format( RFd, RowFmt, @@ -224,7 +208,6 @@ run_trials(Trials) -> benchmark_driver := Driver } = Config = get(conf), - Ctx = case Driver of "oranif" -> ok = dpi:load_unsafe(), @@ -441,29 +424,9 @@ insert_partition( case Driver of "oranif" -> #{insertStmt := InsStmt, keyVar := KV, dataVar := DV} = Params, - {NIE, _} = lists:foldl( - fun({Key, Data}, {NIE, RowCount}) -> - NewNIE = if - NumItersExec == 0 orelse NIE < NumItersExec -> - ok = dpi:var_setFromBytes(KV, NIE, Key), - ok = dpi:var_setFromBytes(DV, NIE, Data), - NIE + 1; - true -> - ok = dpi:stmt_executeMany( - InsStmt, [], - if NumItersExec > 0 -> NumItersExec; true -> FBulkSz end - ), - ok = dpi:var_setFromBytes(KV, 0, Key), - ok = dpi:var_setFromBytes(DV, 0, Data), - 1 - end, - if NumItersCommit > 0 andalso RowCount rem NumItersCommit == 0 -> - ok = dpi:conn_commit(Conn); - true -> ok - end, - {NewNIE, RowCount + 1} - end, - {0, 0}, Rows + {NIE, _} = insert_oranif( + ets:match(bulk_data, {'$1', Partition, '$2'}, 1), Conn, InsStmt, KV, DV, + NumItersCommit, NumItersExec, FBulkSz, {0, 0} ), if NIE > 0 -> ok = dpi:stmt_executeMany(InsStmt, [], NIE); true -> ok end, @@ -474,6 +437,7 @@ insert_partition( ok = dpi:conn_close(Conn, [], <<>>); "jamdb" -> #{insertSql := InsSql} = Params, + Rows = ets:match(bulk_data, {'$1', Partition, '$2'}), if NumItersExec == 0 -> {ok, [{affected_rows, FBulkSz}]} = jamdb_oracle:sql_query( @@ -489,7 +453,8 @@ insert_partition( Master ! { result, self(), #{ - start => Start, 'end' => os:timestamp(), rows => length(Rows), + start => Start, 'end' => os:timestamp(), + rows => ets:select_count(bulk_data, [{{'_', Partition, '_'}, [], [true]}]), partition => Partition } }. @@ -549,44 +514,20 @@ select_partition( } }. -load_data( - S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows, BytesReadCount, - ReadPerCent -) -> +load_data(Driver, FI, Fd, Header, BulkDelimiter, Partitions) -> case file:read_line(Fd) of - eof -> - io:format("~n"), - Rows; + eof -> done; {ok, Line0} -> - BytesReadCount1 = BytesReadCount + byte_size(Line0), - ReadPerCent1 = (BytesReadCount1 / FI#file_info.size) * 100, - if round(ReadPerCent1) /= round(ReadPerCent) -> - io:format("~p% (~ps)...", [round(ReadPerCent1), timer:now_diff(os:timestamp(), S) div 1000000]); - true -> ok - end, case string:trim(Line0, both, "\r\n") of Header -> - load_data( - S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows, - BytesReadCount1, ReadPerCent1 - ); + load_data(Driver, FI, Fd, Header, BulkDelimiter, Partitions); Line -> [<> = Key, Data] = string:split( Line, BulkDelimiter, all ), Partition = (KeyByte1 * 256 + KeyByte2) rem Partitions, true = ets:insert(bulk_data, {Key, Partition, Data}), - %OldData = maps:get(Partition, Rows, []), - load_data( - S, Driver, FI, Fd, Header, BulkDelimiter, Partitions, Rows, - %Rows#{ - % Partition => case Driver of - % "oranif" -> [{Key, Data} | OldData]; - % "jamdb" -> [[Key, Data] | OldData] - % end - %}, - BytesReadCount1, ReadPerCent1 - ) + load_data(Driver, FI, Fd, Header, BulkDelimiter, Partitions) end end. @@ -656,3 +597,34 @@ insert_jamdb(Conn, Rows, InsSql, _NumItersExec, _NumItersCommit, _RowCount) {batch, InsSql, Rows} ), {ok,[]} = jamdb_oracle:sql_query(Conn, "COMMIT;"). + +insert_oranif( + '$end_of_table', _Conn, _InsStmt, _KV, _DV, _NumItersCommit, _NumItersExec, + _FBulkSz, {NIE, RowCount} +) -> {NIE, RowCount}; +insert_oranif( + {[[Key, Data]], Contd}, Conn, InsStmt, KV, DV, NumItersCommit, NumItersExec, + FBulkSz, {NIE, RowCount} +) -> + NewNIE = if + NumItersExec == 0 orelse NIE < NumItersExec -> + ok = dpi:var_setFromBytes(KV, NIE, Key), + ok = dpi:var_setFromBytes(DV, NIE, Data), + NIE + 1; + true -> + ok = dpi:stmt_executeMany( + InsStmt, [], + if NumItersExec > 0 -> NumItersExec; true -> FBulkSz end + ), + ok = dpi:var_setFromBytes(KV, 0, Key), + ok = dpi:var_setFromBytes(DV, 0, Data), + 1 + end, + if NumItersCommit > 0 andalso RowCount rem NumItersCommit == 0 -> + ok = dpi:conn_commit(Conn); + true -> ok + end, + insert_oranif( + ets:match(Contd), Conn, InsStmt, KV, DV, NumItersCommit, NumItersExec, + FBulkSz, {NewNIE, RowCount + 1} + ).