Friday, November 2, 2007
How many cores are the WF solutions using

Tim Bray published WF XI: Results and I would like to know how many of all those CPU cores each solution uses. So I computed this table:

Name           Language  Elapsed   User      System    Parallel CPU work
-------------------------------------------------------------------------
clv5           Gawk         46.73     40.63     6.1     1
tbray5         Erlang    01:04.32  35:33.35  00:45.84  33.88
wfinder1_1     Erlang        6.46     34.07     8.02     6.52
report-counts  Ruby      01:43.71  01:27.11  00:16.60   1
?              Groovy    02:21.83  02:22.97  00:19.95   1.15
wf_p           Ruby         50.16     37.58    12.5     1
wf-2           Python       41.04     34.8      6.24    1
wf-6(2)        Python       16.91      3.62     1.86    0.32
wf-6(4)        Python        9.08      3.66     1.89    0.61
wf-6(8)        Python        5.81      *        *       *
wf-6(16)       Python        4.38      *        *       *
wf             OCaml        49.69     41.94     7.75    1
widefinder     PHP       01:29.81  01:23.10  00:06.71   1
wf_pichi3      Erlang        8.28     51.98     9.38    7.41
tbray5         Erlang    00:20.74  03:51.33  00:08:00  34.3

It is nice that the Erlang implementations can use the cores well, but Erlang is generally not very good at this particular task. It manages parallel processes well, but those processes can be better written in other languages and used as ports, especially when the task is string operations on a big amount of data.
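To illustrate the ports idea: a minimal sketch, assuming a made-up external program ./scanner that writes one result per line. The command and its output format are my assumptions, not part of any WF entry; Erlang keeps the process management while the heavy string work happens outside:

-module(port_example).
-export([run/1]).

%% Drive a hypothetical external scanner as an Erlang port and collect
%% its line-oriented output. The BEAM only shuffles messages here.
run(FileName) ->
    Port = open_port({spawn, "./scanner " ++ FileName},
                     [binary, exit_status, {line, 1024}]),
    collect(Port, []).

collect(Port, Acc) ->
    receive
        {Port, {data, {eol, Line}}} -> collect(Port, [Line | Acc]);
        {Port, {exit_status, 0}} -> {ok, lists:reverse(Acc)};
        {Port, {exit_status, Status}} -> {error, Status}
    end.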
Sunday, October 28, 2007
Faster than Ruby but scalable
Update 2007-11-01: Corrected a typo in wf_pichi3.erl:
@@ -14,7 +14,7 @@
 -compile([native]).
 
-main([File]) -> start(File), halt().
+main([File]) -> start_bmets(File), halt().
 
 start_bmets(FileName) ->
     {ok, F} = nlt_reader:open(FileName),

I worked on the Wide Finder Project again. And what happened? I improved Anders Nygren's code with a suggestion from my previous post and also a big suggestion from Caoyuan's blog. First I made some bricks: chunk_reader.erl with read-ahead reading and sequential-order support, nlt_reader.erl with a concurrent newline-terminated block splitter and concatenator, and the file_map_reduce.erl engine. Then I plugged them together in the wf_pichi3.erl wide finder. And what is great about it? It's about 40% faster than the Ruby code on a single core and still scalable:
$ time ruby1.8 tbray.rb o1M.ap
8900: 2006/09/29/Dynamic-IDE
2000: 2006/07/28/Open-Data
1300: 2003/07/25/NotGaming
800: 2006/01/31/Data-Protection
800: 2003/09/18/NXML
800: 2003/10/16/Debbie
700: 2003/06/23/SamsPie
600: 2006/01/08/No-New-XML-Languages
600: 2005/11/03/Cars-and-Office-Suites
600: 2005/07/27/Atomic-RSS

real    0m7.469s
user    0m6.528s
sys     0m0.940s

$ time erl -noshell -run wf_pichi3 main o1M.ap
8900: 2006/09/29/Dynamic-IDE
2000: 2006/07/28/Open-Data
1300: 2003/07/25/NotGaming
800: 2003/09/18/NXML
800: 2003/10/16/Debbie
800: 2006/01/31/Data-Protection
700: 2003/06/23/SamsPie
600: 2006/01/08/No-New-XML-Languages
600: 2006/09/07/JRuby-guys
600: 2005/07/27/Atomic-RSS

real    0m5.370s
user    0m4.412s
sys     0m0.952s

It's a big improvement over my last code, about 365% ;-) The good thing is that it's a nice jigsaw and very powerful. I think it is just what Tim Bray wanted when he started the Wide Finder Project.
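The three helper modules are only linked from this post, so here is a rough sketch of how the bricks might plug together. The file_map_reduce:map_reduce/5 signature is my assumption, modelled on the nlt_map_reduce module from the post below, and scan/2 is the URL scanner from that post:

%% Sketch only: the real file_map_reduce interface is not shown here,
%% so this wiring is an assumption, not the actual wf_pichi3 code.
wf(FileName) ->
    file_map_reduce:map_reduce(
        FileName,
        _Mapper = fun (Chunk) -> scan(binary_to_list(Chunk), dict:new()) end,
        _Reducer = fun (A, B) ->
                       dict:merge(fun (_, V1, V2) -> V1 + V2 end, A, B)
                   end,
        _ChunkSize = 1024 * 32,
        _Workers = erlang:system_info(schedulers) * 8).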
Saturday, October 27, 2007
Scalable splitting is possible
In my previous post I thought that reading and splitting are unscalable operations. That's not true. Reading is scalable, but on current HW it is not useful, because sequential reading is more than twenty times faster than random reading from disk. But what about splitting the read chunks on new lines and concatenating the remainders? The splitting I can do in parallel, and the concatenating? Yes, I can, if I keep the sequence information and send the parts to the correct process. So I wrote a scatter-gather algorithm with a splitter and a concatenator. I also changed fold-reduce to map-reduce.
The code is separated into three modules. The main module is tbray_pichi1:
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Yet another Erlang solution to Tim Bray's Wide Finder project
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 23 October 2007.

-module(tbray_pichi1).

-export([main/1, start/1, start/2, start/3]).

start(FileName, ChunkSize) ->
    start(FileName, ChunkSize, erlang:system_info(schedulers) * 8).

start(FileName) -> start(FileName, 1024 * 32).

main([FileName, ChunkSize, N]) ->
    start(FileName, list_to_integer(ChunkSize), list_to_integer(N)), halt();
main([FileName, ChunkSize]) ->
    start(FileName, list_to_integer(ChunkSize)), halt();
main([FileName]) -> start(FileName), halt().

start(FileName, ChunkSize, N) ->
    Start = now(),
    Result = nlt_map_reduce:map_reduce(
        FileName,
        _Mapper = fun (Chunk) ->
                      scan(binary_to_list(Chunk), dict:new())
                  end,
        _Reducer = fun (A, B) ->
                       dict:merge(fun (_, V1, V2) -> V1 + V2 end, A, B)
                   end,
        ChunkSize, N),
    Delta = timer:now_diff(now(), Start) / 1000,
    print_result(Result),
    if Delta > 1000 -> io:format("Time: ~.3f s~n", [Delta / 1000]);
       true -> io:format("Time: ~.3f ms~n", [Delta])
    end,
    ok.

print_result(Dict) ->
    [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
        lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    lists:foreach(fun ({Word, Count}) ->
                      io:format("~p get requests for ~s~n", [Count, Word])
                  end,
                  [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++
         [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Rest],
     Dict) ->
    case scan_key(Rest) of
        {[_ | _] = Key, NewRest} ->
            scan(NewRest,
                 dict:update_counter(
                     [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Key],
                     1, Dict));
        {[], NewRest} -> scan(NewRest, Dict)
    end;
scan([_ | Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$\s | Rest], Key) -> {lists:reverse(Key), Rest};
scan_key([$\n | Rest], _) -> {[], Rest};
scan_key([$. | Rest], _) -> {[], Rest};
scan_key([C | Rest], Key) -> scan_key(Rest, [C | Key]);
scan_key([], _) -> {[], []}.

The second module is the newline-terminated chunk map-reducer (nlt_map_reduce):

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% MapReduce for new line terminated blocks of file
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.

-module(nlt_map_reduce).

%-compile([native, {hipe, [o3]}]).

-import(chunk_reader).

-export([map_reduce/5]).

% mapper reducer process info
-record(map_reduce, {mapper, reducer, result = '', data, receiver}).

% process spawner loop info
-record(context, {mr, cr, proc = [], n = 0, max = 1, maxR = 1}).

map_reduce(FileName, Mapper, Reducer, ChunkSize, N) ->
    {ok, ChunkReader} = chunk_reader:open(FileName, ChunkSize),
    Result = start(#map_reduce{mapper = Mapper,
                               reducer = fun ('', A) -> A;
                                             (A, '') -> A;
                                             (A, B) -> Reducer(A, B)
                                         end,
                               receiver = self()},
                   ChunkReader, N),
    ok = chunk_reader:close(ChunkReader),
    Result.

start(MR, ChunkReader, N) ->
    spawn_proc(#context{mr = MR, cr = ChunkReader, max = N},
               chunk_reader:read(ChunkReader), you_first).

loop(#context{cr = ChunkReader, proc = [{_, Last} | _],
              n = N, max = Max} = C) when N < Max ->
    receive
        map_done -> loop(C#context{n = N - 1})
    after 0 ->
        spawn_proc(C, chunk_reader:read(ChunkReader), Last)
    end;
loop(#context{n = N} = C) ->
    receive
        map_done -> loop(C#context{n = N - 1})
    end.

spawn_proc(#context{mr = MR, n = N, proc = Proc, maxR = R} = C,
           {ok, Chunk}, Last) ->
    loop(C#context{
        proc = send_result_requests(
                   [{0, spawn_opt(fun () ->
                                      split(MR#map_reduce{data = Chunk}, Last)
                                  end,
                                  [link])}
                    | Proc]),
        n = N + 1,
        maxR = if R > N + 1 -> R; true -> N + 1 end});
spawn_proc(_, eof, you_first) -> ''; % empty file
spawn_proc(#context{proc = Proc}, eof, Last) -> % finalise
    Last ! you_last,
    send_final_result_requests(Proc),
    wait_for_result().

wait_for_result() ->
    receive
        {result, Result} -> Result;
        _ -> wait_for_result() % clear pending messages if any
    end.

send_result_requests([{L1, P1}, {L1, P2} | T]) ->
    P2 ! {send_result_to, P1, L1},
    send_result_requests([{L1 + 1, P1} | T]);
send_result_requests(T) -> T.

send_final_result_requests(T) ->
    [{L1, P} | R] = lists:reverse(T),
    L = send_final_result_requests_i(L1, P, R),
    P ! {send_result_to, self(), L}.

send_final_result_requests_i(L, P, [{L1, P1} | T]) ->
    P1 ! {send_result_to, P, L1},
    send_final_result_requests_i(L + 1, P, T);
send_final_result_requests_i(L, _, []) -> L.

% mapper reducer process states
split(#map_reduce{data = Data} = MR, you_first) ->
    map_it(MR#map_reduce{data = join_next(Data)});
split(#map_reduce{data = Data} = MR, PrevPid) ->
    case split_on_nl(Data) of
        {_, none} -> % do nothing yourself, send it
            PrevPid ! {your_next_part, join_next(Data)},
            map_done(MR#map_reduce{data = done});
        {Line, Rest} ->
            PrevPid ! {your_next_part, Line},
            map_it(MR#map_reduce{data = join_next(Rest)})
    end.

join_next(Data) ->
    receive
        {your_next_part, Next} -> <<Data/binary, Next/binary>>;
        you_last -> Data
    end.

map_it(#map_reduce{mapper = Mapper, data = Data} = MR) ->
    map_done(MR#map_reduce{data = done, result = Mapper(Data)}).

map_done(#map_reduce{receiver = Master} = MR) ->
    Master ! map_done, % notify master you are done mapping
    reduce_and_wait(MR, 0).

reduce_and_wait(#map_reduce{result = Acc, reducer = Reducer} = MR, N) ->
    receive
        {send_result_to, Receiver, WaitForN} ->
            reduce(MR#map_reduce{receiver = Receiver}, N, WaitForN);
        {result, Result} ->
            reduce_and_wait(MR#map_reduce{result = Reducer(Acc, Result)}, N + 1)
    end.

reduce(#map_reduce{result = Acc, reducer = Reducer} = MR, N, WaitForN)
        when N < WaitForN ->
    receive
        {result, Result} ->
            reduce(MR#map_reduce{result = Reducer(Acc, Result)}, N + 1, WaitForN)
    end;
reduce(#map_reduce{receiver = Receiver, result = Result}, _, _) ->
    Receiver ! {result, Result}. % We are finished

% splitter
split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
    case B of
        <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
        _ -> split_on_nl(B, N + 1, S)
    end;
split_on_nl(B, _, _) -> {B, none}.

And the last one is the read-ahead chunk reader:
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.

-module(chunk_reader).

-export([close/1, open/2, read/1]).
-export([test_read/2]).

% how many seconds to wait for a response
-define(TIMEOUT, 60).

% open(FileName, ChunkSize) -> {ok, ChunkReader}
open(FileName, ChunkSize) ->
    M = self(),
    {ok, {chunk_reader,
          spawn_opt(fun () ->
                        {ok, File} = file:open(FileName, [read, raw, binary]),
                        process_flag(trap_exit, true),
                        loop(M, File, file:read(File, ChunkSize), ChunkSize)
                    end,
                    [link, {priority, high}])}}.

% close(ChunkReader) -> ok | {error, invalid}
close({chunk_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true -> Pid ! close, ok;
        false -> {error, invalid}
    end.

% read(ChunkReader) -> eof | {ok, Data} | {error, invalid | closed}
read({chunk_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            Pid ! {read, self()},
            wait_response(Pid, 0);
        false -> {error, invalid}
    end.

wait_response(Pid, N) when N < (?TIMEOUT) ->
    receive
        {ok, _} = Msg -> Msg;
        eof -> eof
    after 1000 -> % taking long?
        case is_process_alive(Pid) of
            true -> wait_response(Pid, N + 1);
            false -> {error, closed}
        end
    end;
wait_response(_, _) -> {error, timeout}.

loop(Master, File, Chunk, ChunkSize) ->
    receive
        {read, From} ->
            From ! Chunk,
            case Chunk of
                {ok, _} ->
                    loop(Master, File, file:read(File, ChunkSize), ChunkSize);
                eof ->
                    file:close(File),
                    eof_loop(Master)
            end;
        close -> file:close(File);
        {'EXIT', Master, _} -> file:close(File);
        _ -> loop(Master, File, Chunk, ChunkSize) % ignore unknown
    end.

eof_loop(Master) -> % wait for eof request
    receive
        {read, From} ->
            From ! eof,
            eof_loop(Master);
        close -> ok;
        {'EXIT', Master, _} -> ok;
        _ -> eof_loop(Master)
    end.

% speed testing function
% test_read(FileName, ChunkSize) -> ok | {error, invalid}
test_read(FileName, ChunkSize) ->
    {ok, File} = open(FileName, ChunkSize),
    eof = test_read_loop(File, read(File)),
    close(File).

test_read_loop(File, {ok, _}) -> test_read_loop(File, read(File));
test_read_loop(_, eof) -> eof.

But the nlt_map_reduce code is too complicated, poorly readable, and, worst of all, 20% slower on a single core. All this indicates that there is some problem, and I think it is the dictionary sending between processes. The dictionary is copied for every chunk, and that costs too much. So I want to rewrite it as nicer code and fall back to the fold-reduce concept, because that concept sends fewer dictionaries.
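To get a feeling for that copying cost, here is a tiny benchmark sketch of my own (not part of the WF code): it round-trips a dict of N entries to another process and returns the elapsed microseconds. Messages are deep-copied between process heaps, so the time grows with the size of the dict.

-module(dict_copy_cost).
-export([measure/1]).

%% Round-trip a dict with N entries to another process and back.
measure(N) ->
    Dict = lists:foldl(fun (I, D) -> dict:store(I, I, D) end,
                       dict:new(), lists:seq(1, N)),
    Self = self(),
    Pid = spawn_link(fun () ->
                         receive D -> Self ! D end % copy it back
                     end),
    Start = now(),
    Pid ! Dict,
    receive Dict -> ok end, % matches the structurally equal copy
    timer:now_diff(now(), Start).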
Sunday, October 21, 2007
Wide Finder Project - fold&reduce
I have made a new version for the Wide Finder Project. I was inspired by Caoyuan's last work. I thought about I/O operations too, but I think parallelisation of file reading is not a good idea. Instead, I tried to split file reading and newline finding into two independent processes: one process that reads and one that searches for new lines. It looks expensive to send such big messages, but I send binaries, and binaries bigger than 64 bytes are not copied; only pointers are passed.
I also look for the new line from the head, because I think binary splitting is faster when the first part is smaller than the second. The second part can be kept in place, only a pointer is moved, and the smaller first part is copied to a new position. But when I glue the second part of the previous read chunk to the first part of the current one, I must copy the bigger part, and that is expensive. It looks like the same thing Caoyuan does, but I don't do it in the splitter, I do it in the worker. I send both parts as separate binaries. That's cheap.
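A quick illustration of that sharing behaviour (my own example, not from the original code): split_binary/2 returns two sub-binaries that normally just point into the original data instead of copying it.

%% Sketch: splitting a large binary yields two sub-binaries;
%% nothing near the 200 bytes of payload is copied.
demo() ->
    Big = list_to_binary(lists:duplicate(200, $a)),
    {Head, Tail} = split_binary(Big, 5),
    {size(Head), size(Tail)}. % -> {5,195}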
Why all this? To do minimal work in each process. One process only reads as fast as possible, one process only splits on new lines without any gluing, and all the other work I can do in parallel.
But when the splitter asks the reader for a new chunk, it should not have to wait while the reader reads the next chunk; it is better if the reader already has the next chunk prepared. The same holds for the splitter: it should have a chunk split prepared before any worker asks for new parts. So I made a read-ahead file reader and chunk splitter.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
file_open(FileName, ChunkSize, chunk) -> % raw chunks
    M = self(),
    {ok, {chunk_reader,
          spawn_link(fun() ->
                         {ok, File} = file:open(FileName, [read, raw, binary]),
                         process_flag(trap_exit, true),
                         process_flag(priority, high),
                         file_loop(M, File, file:read(File, ChunkSize), ChunkSize)
                     end)}};
file_open(FileName, ChunkSize, nlt_chunk) -> % new line terminated chunks
    M = self(),
    {ok, {nlt_chunk_reader,
          spawn_link(fun() ->
                         {ok, CR} = file_open(FileName, ChunkSize, chunk),
                         process_flag(trap_exit, true),
                         process_flag(priority, high),
                         {ok, First_Read} = file_read(CR),
                         cr_loop(M, CR,
                                 cr_read_n_split(CR, First_Read, file_read(CR)))
                     end)}}.

file_read({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
    case is_process_alive(Pid) of
        true ->
            Pid ! {read, self()},
            receive
                {ok, B} -> {ok, B};
                eof -> eof
            after 60000 ->
                timeout % Possible race condition with is_process_alive
            end;
        false -> error
    end.

file_close({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
    case is_process_alive(Pid) of
        true -> Pid ! close, ok;
        false -> error
    end.

file_loop(Master, File, Chunk, ChunkSize) ->
    receive
        {read, From} ->
            From ! Chunk,
            case Chunk of
                {ok, _} ->
                    file_loop(Master, File, file:read(File, ChunkSize), ChunkSize);
                eof ->
                    file:close(File),
                    file_eof_loop(Master)
            end;
        close -> file:close(File);
        {'EXIT', Master, _} -> file:close(File);
        _ -> file_loop(Master, File, Chunk, ChunkSize) % ignore unknown
    end.

file_eof_loop(Master) -> % wait for eof request
    receive
        {read, From} ->
            From ! eof,
            file_eof_loop(Master);
        close -> ok;
        {'EXIT', Master, _} -> ok;
        _ -> file_eof_loop(Master)
    end.

cr_loop(Master, CR, {Prev, Line, Next}) ->
    receive
        {read, From} ->
            From ! {ok, {Prev, Line}},
            case Next of
                _ when is_binary(Next) ->
                    cr_loop(Master, CR,
                            cr_read_n_split(CR, Next, file_read(CR)));
                eof ->
                    file_close(CR),
                    file_eof_loop(Master)
            end;
        close -> file_close(CR);
        {'EXIT', Master, _} -> file_close(CR);
        _ -> cr_loop(Master, CR, {Prev, Line, Next}) % ignore unknown
    end.

cr_read_n_split(CR, Prev, {ok, B}) ->
    case split_on_nl(B) of
        {Line, Rest} when is_binary(Rest) -> % nonempty remaining part
            {Prev, Line, Rest};
        {Line, none} -> % new line not found, read again, should be very rare
            cr_read_n_split(CR, <<Prev/binary, Line/binary>>, file_read(CR))
    end;
cr_read_n_split(_CR, Prev, eof) ->
    {<<>>, Prev, eof}. % easier joining in this order

split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
    case B of
        <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
        _ -> split_on_nl(B, N + 1, S)
    end;
split_on_nl(B, _, _) -> {B, none}.

% speed testing functions
file_test_read(FileName, ChunkSize, Type) ->
    {ok, File} = file_open(FileName, ChunkSize, Type),
    eof = file_test_read_loop(File, file_read(File)),
    file_close(File).

file_test_read_loop(File, {ok, _}) -> file_test_read_loop(File, file_read(File));
file_test_read_loop(_, eof) -> eof.

With this file-like device in hand, I thought about Tim Bray's request for more readable and cleaner code. So what do I want to do? Something like map_reduce, but not exactly map_reduce; it looks more like fold_reduce. I want to fold over each chunk, i.e. scan it for some pattern, then collect all the results, and I want to do it in parallel. So I made a fold_reduce operator over newline-terminated chunks read from a file, simply fold_reduce_file.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Proof of concept of fold&reduce
% on file by new line terminated chunks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(context, {acc, chunkNum, processedNum = 0, reducer}).

fold_reduce_file(FileName, Acc0, Folderer, Reducer, ChunkSize, N) ->
    {ok, CR} = file_open(FileName, ChunkSize, nlt_chunk),
    M = self(),
    do_n(fun() -> spawn_link(fun() -> folderer(CR, Acc0, Folderer, M) end) end,
         0, N),
    Result = collect_loop(#context{acc = Acc0, chunkNum = N, reducer = Reducer}),
    file_close(CR),
    Result.

do_n(What, Start, Stop) when Start < Stop -> What(), do_n(What, Start + 1, Stop);
do_n(_, _, _) -> ok.

folderer(CR, Acc0, Folderer, Collector) ->
    case file_read(CR) of
        {ok, {A, B}} ->
            folderer(CR,
                     Folderer(Acc0, binary_to_list(A) ++ binary_to_list(B)),
                     Folderer, Collector);
        eof -> Collector ! {result, Acc0}
    end.

collect_loop(#context{acc = Acc0, chunkNum = ChunkNum,
                      processedNum = ProcessedNum, reducer = Reducer} = Context) ->
    case ProcessedNum of
        ChunkNum -> Acc0;
        _ ->
            receive
                {result, Result} ->
                    collect_loop(Context#context{
                        acc = Reducer(Acc0, Result),
                        processedNum = ProcessedNum + 1})
            end
    end.

It looks complicated, but it's only infrastructure. Now I have a tool that makes Tim Bray's exercise easy, and not only this one, but any similar task. Tim Bray's exercise implemented with this tool is here.
start(FileName) -> start(FileName, 1024*32, 1).

start(FileName, N) -> start(FileName, 1024*32, N).

start(FileName, ChunkSize, N) ->
    Start = now(),
    Result = fold_reduce_file(
        FileName,
        _Acc0 = dict:new(),
        _Folderer = fun(Acc, Chunk) -> scan(Chunk, Acc) end,
        _Reducer = fun(Acc, Result) ->
                       dict:merge(fun(_, V1, V2) -> V1 + V2 end, Acc, Result)
                   end,
        ChunkSize, N),
    Delta = timer:now_diff(now(), Start) / 1000,
    print_result(Result),
    if Delta > 1000 -> io:format("Time: ~.3f s~n", [Delta/1000]);
       true -> io:format("Time: ~.3f ms~n", [Delta])
    end,
    ok.

print_result(Dict) ->
    [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
        lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    lists:foreach(fun ({Word, Count}) ->
                      io:format("~p get requests for ~s~n", [Count, Word])
                  end,
                  [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++
         [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Rest],
     Dict) ->
    case scan_key(Rest) of
        {[_ | _] = Key, NewRest} ->
            scan(NewRest,
                 dict:update_counter(
                     [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Key],
                     1, Dict));
        {[], NewRest} -> scan(NewRest, Dict)
    end;
scan([_ | Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$\s | Rest], Key) -> {lists:reverse(Key), Rest};
scan_key([$\n | Rest], _) -> {[], Rest};
scan_key([$. | Rest], _) -> {[], Rest};
scan_key([C | Rest], Key) -> scan_key(Rest, [C | Key]);
scan_key([], _) -> {[], []}.

The good news is that this is faster than my tbray2 and also Caoyuan's tbray4 on a single core. But I can't test it on multi-core now. All the source code for testing is below. When N is the number of processor threads, it will be interesting to test N-1, N, 2*N-1 or 2*N folderer processes.
-module(tbray6).

%-compile([debug_info, native, {hipe, [o3]}]).

-export([start/1, start/2, start/3]).
-export([file_open/3, file_read/1, file_close/1, file_test_read/3]).
-export([fold_reduce_file/6]).

start(FileName) -> start(FileName, 1024*32, 1).

start(FileName, N) -> start(FileName, 1024*32, N).

start(FileName, ChunkSize, N) ->
    Start = now(),
    Result = fold_reduce_file(
        FileName,
        _Acc0 = dict:new(),
        _Folderer = fun(Acc, Chunk) -> scan(Chunk, Acc) end,
        _Reducer = fun(Acc, Result) ->
                       dict:merge(fun(_, V1, V2) -> V1 + V2 end, Acc, Result)
                   end,
        ChunkSize, N),
    Delta = timer:now_diff(now(), Start) / 1000,
    print_result(Result),
    if Delta > 1000 -> io:format("Time: ~.3f s~n", [Delta/1000]);
       true -> io:format("Time: ~.3f ms~n", [Delta])
    end,
    ok.

print_result(Dict) ->
    [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
        lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    lists:foreach(fun ({Word, Count}) ->
                      io:format("~p get requests for ~s~n", [Count, Word])
                  end,
                  [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++
         [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Rest],
     Dict) ->
    case scan_key(Rest) of
        {[_ | _] = Key, NewRest} ->
            scan(NewRest,
                 dict:update_counter(
                     [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Key],
                     1, Dict));
        {[], NewRest} -> scan(NewRest, Dict)
    end;
scan([_ | Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$\s | Rest], Key) -> {lists:reverse(Key), Rest};
scan_key([$\n | Rest], _) -> {[], Rest};
scan_key([$. | Rest], _) -> {[], Rest};
scan_key([C | Rest], Key) -> scan_key(Rest, [C | Key]);
scan_key([], _) -> {[], []}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Proof of concept of fold&reduce
% on file by new line terminated chunks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(context, {acc, chunkNum, processedNum = 0, reducer}).

fold_reduce_file(FileName, Acc0, Folderer, Reducer, ChunkSize, N) ->
    {ok, CR} = file_open(FileName, ChunkSize, nlt_chunk),
    M = self(),
    do_n(fun() -> spawn_link(fun() -> folderer(CR, Acc0, Folderer, M) end) end,
         0, N),
    Result = collect_loop(#context{acc = Acc0, chunkNum = N, reducer = Reducer}),
    file_close(CR),
    Result.

do_n(What, Start, Stop) when Start < Stop -> What(), do_n(What, Start + 1, Stop);
do_n(_, _, _) -> ok.

folderer(CR, Acc0, Folderer, Collector) ->
    case file_read(CR) of
        {ok, {A, B}} ->
            folderer(CR,
                     Folderer(Acc0, binary_to_list(A) ++ binary_to_list(B)),
                     Folderer, Collector);
        eof -> Collector ! {result, Acc0}
    end.

collect_loop(#context{acc = Acc0, chunkNum = ChunkNum,
                      processedNum = ProcessedNum, reducer = Reducer} = Context) ->
    case ProcessedNum of
        ChunkNum -> Acc0;
        _ ->
            receive
                {result, Result} ->
                    collect_loop(Context#context{
                        acc = Reducer(Acc0, Result),
                        processedNum = ProcessedNum + 1})
            end
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
file_open(FileName, ChunkSize, chunk) -> % raw chunks
    M = self(),
    {ok, {chunk_reader,
          spawn_link(fun() ->
                         {ok, File} = file:open(FileName, [read, raw, binary]),
                         process_flag(trap_exit, true),
                         process_flag(priority, high),
                         file_loop(M, File, file:read(File, ChunkSize), ChunkSize)
                     end)}};
file_open(FileName, ChunkSize, nlt_chunk) -> % new line terminated chunks
    M = self(),
    {ok, {nlt_chunk_reader,
          spawn_link(fun() ->
                         {ok, CR} = file_open(FileName, ChunkSize, chunk),
                         process_flag(trap_exit, true),
                         process_flag(priority, high),
                         {ok, First_Read} = file_read(CR),
                         cr_loop(M, CR,
                                 cr_read_n_split(CR, First_Read, file_read(CR)))
                     end)}}.

file_read({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
    case is_process_alive(Pid) of
        true ->
            Pid ! {read, self()},
            receive
                {ok, B} -> {ok, B};
                eof -> eof
            after 60000 ->
                timeout % Possible race condition with is_process_alive
            end;
        false -> error
    end.

file_close({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
    case is_process_alive(Pid) of
        true -> Pid ! close, ok;
        false -> error
    end.

file_loop(Master, File, Chunk, ChunkSize) ->
    receive
        {read, From} ->
            From ! Chunk,
            case Chunk of
                {ok, _} ->
                    file_loop(Master, File, file:read(File, ChunkSize), ChunkSize);
                eof ->
                    file:close(File),
                    file_eof_loop(Master)
            end;
        close -> file:close(File);
        {'EXIT', Master, _} -> file:close(File);
        _ -> file_loop(Master, File, Chunk, ChunkSize) % ignore unknown
    end.

file_eof_loop(Master) -> % wait for eof request
    receive
        {read, From} ->
            From ! eof,
            file_eof_loop(Master);
        close -> ok;
        {'EXIT', Master, _} -> ok;
        _ -> file_eof_loop(Master)
    end.

cr_loop(Master, CR, {Prev, Line, Next}) ->
    receive
        {read, From} ->
            From ! {ok, {Prev, Line}},
            case Next of
                _ when is_binary(Next) ->
                    cr_loop(Master, CR,
                            cr_read_n_split(CR, Next, file_read(CR)));
                eof ->
                    file_close(CR),
                    file_eof_loop(Master)
            end;
        close -> file_close(CR);
        {'EXIT', Master, _} -> file_close(CR);
        _ -> cr_loop(Master, CR, {Prev, Line, Next}) % ignore unknown
    end.

cr_read_n_split(CR, Prev, {ok, B}) ->
    case split_on_nl(B) of
        {Line, Rest} when is_binary(Rest) -> % nonempty remaining part
            {Prev, Line, Rest};
        {Line, none} -> % new line not found, read again, should be very rare
            cr_read_n_split(CR, <<Prev/binary, Line/binary>>, file_read(CR))
    end;
cr_read_n_split(_CR, Prev, eof) ->
    {<<>>, Prev, eof}. % easier joining in this order

split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
    case B of
        <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
        _ -> split_on_nl(B, N + 1, S)
    end;
split_on_nl(B, _, _) -> {B, none}.

% speed testing functions
file_test_read(FileName, ChunkSize, Type) ->
    {ok, File} = file_open(FileName, ChunkSize, Type),
    eof = file_test_read_loop(File, file_read(File)),
    file_close(File).

file_test_read_loop(File, {ok, _}) -> file_test_read_loop(File, file_read(File));
file_test_read_loop(_, eof) -> eof.
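A usage sketch from the shell (the file name and worker counts are just examples), trying the process counts suggested above:

1> c(tbray6).
{ok,tbray6}
2> tbray6:start("o1M.ap", 1024*32, 1). % one folderer process
3> tbray6:start("o1M.ap", 1024*32, 4). % e.g. N on a quad core; 2*N-1 and 2*N are worth trying too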
Sunday, October 7, 2007
Is bfile faster than the old Erlang file?
Steve Vinoski is using klacke's bfile module in his Wide Finder Project work, but I don't know why bfile should be faster than the Erlang OTP file module. Well, so I measured it. I tried Steve's read test and my own test on my old home desktop (model name: AMD Athlon(tm) processor, stepping: 2, cpu MHz: 1199.805, cache size: 256 KB).

-module(readold).
-export([start/1, start/2]).
-compile([native]).

scan_file(F, Readsize, Total) ->
    Rd = file:read(F, Readsize),
    case Rd of
        {ok, Bin} -> scan_file(F, Readsize, size(Bin) + Total);
        eof -> Total
    end.

scan_file(F, Readsize) -> scan_file(F, Readsize, 0).

start(File, Readsize) ->
    {ok, F} = file:open(File, [raw, binary, read]),
    T = scan_file(F, Readsize),
    io:format("read ~p bytes~n", [T]),
    file:close(F).

start(File) -> start(File, 512*1024).

And here are the results:
2> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{1041306,ok}
3> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{836876,ok}
4> c(readold).
{ok,readold}
5> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{837501,ok}
6> timer:tc(read,start,["o1M.ap"]).
read 200995500 bytes
{1353678,true}
7> timer:tc(read,start,["o1M.ap"]).
read 200995500 bytes
{1237174,true}
8> timer:tc(read,start,["o1M.ap"]).
read 200995500 bytes
{1318029,true}
9> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{856662,ok}

In general, I don't know why Erlang's file should be slower. I don't know why bfile is 45% slower than file on my old home desktop, but why should it be faster anywhere else? I tested it on Linux; maybe bfile, which uses the BSD file implementation, is faster on Darwin, a BSD clone? The file implementation is fast enough on my Erlang/OTP R11B-5.
erlang-base-hipe.deb doesn't contain natively compiled modules
When I measured Caoyuan's and my Wide Finder Project solutions, I found my binary solution faster than the list one, but Caoyuan measured the reverse result. So I looked into why. Caoyuan is using MacOS X and I'm using Debian. I don't know whether the MacOS port of Erlang has the base modules natively compiled (with HiPE), but I looked at my Debian's:
5> proplists:get_value(compile, lists:module_info()).
[{options,[{inline,[{merge3_12,7},{merge3_21,7},{rmerge3_12,7},{rmerge3_21,7}]},
           {inline,[{umerge3_12,8},{umerge3_21,8},{rumerge3_12a,7},{rumerge3_12b,8}]},
           {inline,[{keymerge3_12,12},{keymerge3_21,12},{rkeymerge3_12,12},{rkeymerge3_21,12}]},
           {inline,[{ukeymerge3_12,13},{ukeymerge3_21,13},{rukeymerge3_12a,11},{rukeymerge3_21a,13},{rukeymerge3_12b,12},{rukeymerge3_21b,12}]},
           {cwd,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src"},
           {outdir,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/../ebin"},
           {i,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/../include"},
           {i,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/../../kernel/include"},
           warn_obsolete_guard,debug_info,
           {inline,[{merge3_12,7},{merge3_21,7},{rmerge3_12,7},{rmerge3_21,7}]},
           {inline,[{umerge3_12,8},{umerge3_21,8},{rumerge3_12a,7},{rumerge3_12b,8}]},
           {inline,[{keymerge3_12,12},{keymerge3_21,12},{rkeymerge3_12,12},{rkeymerge3_21,12}]},
           {inline,[{ukeymerge3_12,13},{ukeymerge3_21,13},{rukeymerge3_12a,11},{rukeymerge3_21a,13},{rukeymerge3_12b,12},{rukeymerge3_21b,12}]}]},
 {version,"4.4.5"},
 {time,{2007,9,28,11,10,32}},
 {source,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/lists.erl"}]

There is no native option. If the MacOS port is natively compiled, that could explain the unexpected difference.
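As a side note, a quick helper to run the same check on any module (my own sketch, built on the same module_info data as the shell query above):

%% True if Mod's compile options include native.
is_native(Mod) ->
    Options = proplists:get_value(options, Mod:module_info(compile), []),
    lists:member(native, Options).

For example, is_native(lists) returns false here, matching the output above.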
I have tried to create my own erlang-base-hipe package from the source one, but I don't know how to switch the option on in the build process. I tried

debian/rules configure-hipe

but it didn't work, so I searched for where the native option is used:
$ grep -r +native .
./lib/asn1/src/Makefile:ERL_COMPILE_FLAGS += +native
./lib/megaco/src/app/megaco.mk:ERL_COMPILE_FLAGS += +native
./lib/megaco/test/Makefile:ERL_COMPILE_FLAGS += +native -Dmegaco_hipe_special=true
./lib/megaco/examples/meas/Makefile:ERL_COMPILE_FLAGS += +native
./README: erlc +native Module.erl

Then I tried
export ERL_COMPILE_FLAGS=+native

It didn't affect the binary-erlang-base target and only generated a warning message, but during binary-erlang-base-hipe an error occurred:
erlc -W +native +debug_info +debug_info +debug_info +warn_obsolete_guard -I/home/hynek/work/erlang-11.b.5dfsg/lib/stdlib/include -o../ebin yecc.erl
./yecc.erl:none: internal error in native_compile;
crash reason: {undef,[{hipe,compile,
                       [yecc,[],
                        <<70,79,82,49,0,1,91,124,66,69,65,77,65,116,111, ...
                          97,252,150,236,255,7,193,199,127,8,0,0>>,
                        []]},
                      {compile,native_compile_1,1},
                      {compile,'-internal_comp/4-anonymous-1-',2},
                      {compile,fold_comp,3},
                      {compile,internal_comp,4},
                      {compile,internal,3}]}
make[4]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg/lib/parsetools/src'
make[3]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg/lib/parsetools'
make[2]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg/lib'
make[1]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg'

Then I tested whether yecc can be compiled natively, and it can be:
3> c("/usr/lib/erlang/lib/parsetools-1.4.1.1/src/yecc.erl", [native, {i, "/usr/lib/erlang/lib/stdlib-1.14.5/include/"}, {outdir, "."}]). {ok,yecc} 4> proplists:get_value(compile, yecc:module_info()). [{options,[{inline,[{compute_closure,3}]}, {nowarn_unused_function,{function_name,2}}, {inline,[{set_empty,0}]}, {inline,[{set_member,2}]}, {inline,[{set_delete,2}]}, {inline,[{set_union,2}]}, {inline,[{set_is_subset,2}]}, {inline,[{is_terminal,2}]}, native, {i,"/usr/lib/erlang/lib/stdlib-1.14.5/include/"}, {outdir,"."}, {inline,[{compute_closure,3}]}, {nowarn_unused_function,{function_name,2}}, {inline,[{set_empty,0}]}, {inline,[{set_member,2}]}, {inline,[{set_delete,2}]}, {inline,[{set_union,2}]}, {inline,[{set_is_subset,2}]}, {inline,[{is_terminal,2}]}]}, {version,"4.4.5"}, {time,{2007,10,7,17,53,55}}, {source,"/usr/lib/erlang/lib/parsetools-1.4.1.1/src/yecc.erl"}]
yecc (and almost all other modules) can be natively compiled, but I don't know how to do it for the whole distribution. I'm totally confused by the Debian packaging system and don't know what dpkg-buildpackage does. It's also difficult to do with the installed version, because many modules need special compile options, at least {i,"/usr/lib/erlang/lib/stdlib-1.14.5/include/"} for included .hrl files and so on.
I will be glad if anyone gives me advice on how to build a package with natively compiled modules, or how to recompile just some modules from the source package as a workaround.
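For the second option, a rough workaround sketch of my own (untested against the Debian packaging; the paths follow the layout used above): recompile selected stdlib modules natively into a local directory and put it first on the code path, so the native versions shadow the packaged byte-code ones. The path must be in place before the modules are first loaded, so this is best done from an .erlang file or a boot script.

%% Sketch only, untested: e.g. recompile_native([yecc]) as above.
recompile_native(Modules) ->
    Src = "/usr/lib/erlang/lib/stdlib-1.14.5/src/",
    Inc = "/usr/lib/erlang/lib/stdlib-1.14.5/include/",
    Out = "./native_ebin",
    file:make_dir(Out),
    [{ok, M} = c:c(Src ++ atom_to_list(M) ++ ".erl",
                   [native, {i, Inc}, {outdir, Out}])
     || M <- Modules],
    true = code:add_patha(Out).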
Saturday, October 6, 2007
Binaries really are faster than lists
Caoyuan did his second round of the Wide Finder Project using lists, because he expected list traversal to be faster than binary traversal. But he made a little mistake. He assumed that timer:tc(tbray, travel_list, [binary_to_list(Bin)]) includes the binary_to_list(Bin) time, but that is not true: binary_to_list(Bin) is evaluated before the timer:tc call, and only the result is passed as a parameter. But there are more performance problems in his solution. He often uses lists:reverse and ++, which also traverses and copies its whole first parameter. The bigger memory usage of lists and the frequent reversing must cause bad performance. When I wrote the same algorithm using binaries, I got a 3 times speed-up.
-module(tbray2).
-compile([native, {hipe,[o3]}]).
-export([start/1, start/2]).
-record(context, {main,
dict,
chunkNum,
processedNum = 0}).
start(FileName) -> start(FileName, 1024*32).
start(FileName, ChunkSize) ->
Start = now(),
Main = self(),
Collector = spawn_link(fun () -> collect_loop(#context{main = Main,
dict = dict:new(),
processedNum = 0}) end),
ChunkNum = foreach_chunk(
fun(Chunk) ->
spawn_link(fun() -> Collector ! scan_lines(Chunk) end)
end,
FileName,
ChunkSize
),
Collector ! {chunkNum, ChunkNum},
%% don't terminate, wait here, until all tasks done.
receive
stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])
end.
foreach_chunk(Fun, FileName, SizeOfChunk) ->
{ok, File} = file:open(FileName, [raw, binary]),
N = foreach_chunk(Fun, File, <<>>, SizeOfChunk, 0),
file:close(File),
N.
foreach_chunk(Fun, File, PrevRest, SizeOfChunk, N) ->
{Chunk, Rest} = read_chunk(File, PrevRest, SizeOfChunk),
Fun(Chunk),
case Rest of
<<>> -> N+1;
_ -> foreach_chunk(Fun, File, Rest, SizeOfChunk, N+1)
end.
read_chunk(File, PrevRest, N) ->
case file:read(File, N) of
{ok, B} ->
{Line, Rest} = split_on_nl(B),
Chunk = <<PrevRest/binary, Line/binary>>,
case Rest of
<<>> ->
read_chunk(File, Chunk, N);
_ ->
{Chunk, Rest}
end;
eof ->
{PrevRest, <<>>}
end.
split_on_nl(B) -> split_on_nl(B, 0, size(B)).
split_on_nl(B, N, S) when N < S -> case B of
<<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
_ -> split_on_nl(B, N+1, S)
end;
split_on_nl(B, _, _) -> {B, <<>>}.
collect_loop(#context{main=Main,
dict=Dict,
chunkNum=ChunkNum,
processedNum=ProcessedNum}=Context) ->
case ProcessedNum of
ChunkNum ->
print_result(Dict),
Main ! stop;
_ ->
receive
{chunkNum, N} -> collect_loop(Context#context{chunkNum = N});
DictX ->
collect_loop(Context#context{
dict = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
processedNum = ProcessedNum+1})
end
end.
print_result(Dict) ->
[R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
lists:reverse(lists:keysort(2, dict:to_list(Dict))),
lists:foreach(fun ({Word, Count}) ->
io:format("~p get requests for ~s~n", [Count, Word])
end, [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).
scan_lines(Bin) -> scan_lines(Bin, dict:new()).
scan_lines(Bin, Dict) ->
case naive_search(Bin) of
{ok, Key, Rest} ->
scan_lines(
element(2, split_on_nl(Rest)),
dict:update_counter(Key, 1, Dict));
false -> Dict
end.
%% naive_search(Binary()) -> false | {ok, Key, Rest}
naive_search(B) -> naive_search(B, 0, size(B)-18).
naive_search(B, N, S) when N < S ->
case B of
<<_:N/binary, "GET /ongoing/When/", Rest/binary>> ->
case keyMatch(Rest) of
Result = {ok, _Key, _Rest2} -> Result;
false -> naive_search(Rest)
end;
_ -> naive_search(B, N+1, S)
end;
naive_search(_, _, _) -> false.
%% keyMatch(Binary()) -> false | {ok, Key, Rest}
keyMatch(<<C, _/binary>>) when C == $ ; C == $. -> false; % empty
keyMatch(B) -> keyMatch(B, 1, size(B)).
keyMatch(B, N, S) when N<S ->
case B of
% end with space
<<Key:N/binary, $ , Rest/binary>> -> {ok, Key, Rest};
<<_:N/binary, $., _/binary>> -> false;
_ -> keyMatch(B, N+1, S)
end;
keyMatch(_, _, _) -> false.
The result is a less memory-consuming and faster program. The problem is partitioned the same way as Caoyuan did it, and I suppose it scales the same way, but I can't test that, because I don't have any multi-core computer.
Wednesday, October 3, 2007
Keep your mailbox empty
Matthias wrote Too much mail is bad for you. Yes, it's true. He solved the problem with synchronous communication between producer and consumer, but this solution blocks the producer and is not an "appropriate solution in general" (second comment). I think one good solution is to make your own queue manager and keep the process mailbox empty. It's in principle the solution Vlad recommends in that discussion. And how to do it?
-module(mqueue).
-export([timed_run/3]).

producer(0, ConsumerPid, _WithAck) ->
    ConsumerPid ! {done, self()},
    receive done -> ok end;
producer(N, ConsumerPid, WithAck = false) ->
    ConsumerPid ! {msg, self()},
    producer(N - 1, ConsumerPid, WithAck);
producer(N, ConsumerPid, WithAck = true) ->
    ConsumerPid ! {acked_msg, self()},
    receive ack -> ok end,
    producer(N - 1, ConsumerPid, WithAck).

consumer(M, EchoPid) ->
    receive
        {msg, _From} ->
            call_echo(M, EchoPid),
            consumer(M, EchoPid);
        {acked_msg, From} ->
            From ! ack,
            call_echo(M, EchoPid),
            consumer(M, EchoPid);
        {done, From} ->
            EchoPid ! done,
            From ! done
    end.

queue_keeper_empty(ConsumerPid) ->
    receive
        {msg, _From} ->
            ConsumerPid ! {acked_msg, self()},
            queue_keeper_waiting(queue:new(), ConsumerPid);
        {done, From} ->
            ConsumerPid ! {done, self()},
            receive done -> From ! done end
    end.

queue_keeper_waiting(Q, ConsumerPid) ->
    receive
        {msg, _From} ->
            queue_keeper_waiting(queue:in({acked_msg, self()}, Q), ConsumerPid);
        ack ->
            case queue:out(Q) of
                {{value, Msg}, Q1} ->
                    ConsumerPid ! Msg,
                    queue_keeper_waiting(Q1, ConsumerPid);
                {empty, Q} ->
                    queue_keeper_empty(ConsumerPid)
            end
    end.

call_echo(0, _EchoPid) -> ok;
call_echo(M, EchoPid) ->
    EchoPid ! {hello, self()},
    receive hello -> call_echo(M - 1, EchoPid) end.

echo() ->
    receive
        {Msg, From} -> From ! Msg, echo();
        done -> ok
    end.

run(N, M, WithAck) ->
    EchoPid = spawn_link(fun echo/0),
    ConsumerPid = spawn_link(fun () -> consumer(M, EchoPid) end),
    case WithAck of
        true -> producer(N, ConsumerPid, WithAck);
        false ->
            producer(N,
                     spawn_link(fun () -> queue_keeper_empty(ConsumerPid) end),
                     WithAck)
    end.

time(F) ->
    Start = erlang:now(),
    F(),
    timer:now_diff(erlang:now(), Start).

timed_run(N, M, WithAck) -> time(fun () -> run(N, M, WithAck) end).

This solution is only about 25% slower than the synchronous one, but the producer is not blocked.
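A usage sketch from the shell (the message counts are just example numbers; timed_run/3 returns the elapsed time in microseconds):

1> c(mqueue).
{ok,mqueue}
2> mqueue:timed_run(100000, 10, true).  % synchronous: producer waits for each ack
3> mqueue:timed_run(100000, 10, false). % queue keeper: producer never blocks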