Friday, November 2, 2007

How much cores are using WF solutions

Tim Bray published WF XI: Results and I would like to know how much of all these CPU cores each solution uses. Than I compute this table:
Name            Language    Elapsed     User        System  Parallel CPU work
-----------------------------------------------------------------------------
clv5            Gawk        46.73       40.63       6.1         1
tbray5          Erlang      01:04.32    35:33.35    00:45.84    33.88
wfinder1_1      Erlang      6.46        34.07       8.02        6.52
report-counts   Ruby        01:43.71    01:27.11    00:16.60    1
?               Groovy      02:21.83    02:22.97    00:19.95    1.15
wf_p            Ruby        50.16       37.58       12.5        1
wf-2            Python      41.04       34.8        6.24        1
wf-6(2)         Python      16.91       3.62        1.86        0.32
wf-6(4)         Python      9.08        3.66        1.89        0.61
wf-6(8)         Python      5.81        *           *           *
wf-6(16)        Python      4.38        *           *           *
wf              OCaml       49.69       41.94       7.75        1
widefinder      PHP         01:29.81    01:23.10    00:06.71    1
wf_pichi3       Erlang      8.28        51.98       9.38        7.41
tbray5          Erlang      00:20.74    03:51.33    00:08:00    34.3
Nice, that erlang implementations can use cores well, but in this task is not so much good generally. Erlang manages parallel processes well, but those processes can be better written in other languages and used as ports. Especially when this task is string operations on big amount of data.

Sunday, October 28, 2007

Faster than ruby but scalable

Update 2007-11-01: Correction of typo in wf_pichi3.erl.
@@ -14,7 +14,7 @@

 -compile([native]).

-main([File]) -> start(File), halt().
+main([File]) -> start_bmets(File), halt().

 start_bmets(FileName) ->
     {ok, F} = nlt_reader:open(FileName),
I worked on Wide Finder Project again. But what happen? I improved Anders Nygren's code with suggestion from my previous blog and also big suggestion from Caoyuan's blog. First I made some bricks: chunk_reder.erl with read ahead reading and consequential support, nlt_reader.erl with concurrent new line terminated block splitter and catenator and file_map_reduce.erl engine. And I plugged it together in wf_pichi3.erl wide finder. And what is great? It's about 40% faster on single core than ruby code and still scalable:
$ time ruby1.8 tbray.rb o1M.ap
8900: 2006/09/29/Dynamic-IDE
2000: 2006/07/28/Open-Data
1300: 2003/07/25/NotGaming
800: 2006/01/31/Data-Protection
800: 2003/09/18/NXML
800: 2003/10/16/Debbie
700: 2003/06/23/SamsPie
600: 2006/01/08/No-New-XML-Languages
600: 2005/11/03/Cars-and-Office-Suites
600: 2005/07/27/Atomic-RSS

real    0m7.469s
user    0m6.528s
sys     0m0.940s
$ time erl -noshell -run wf_pichi3 main o1M.ap
8900: 2006/09/29/Dynamic-IDE
2000: 2006/07/28/Open-Data
1300: 2003/07/25/NotGaming
800: 2003/09/18/NXML
800: 2003/10/16/Debbie
800: 2006/01/31/Data-Protection
700: 2003/06/23/SamsPie
600: 2006/01/08/No-New-XML-Languages
600: 2006/09/07/JRuby-guys
600: 2005/07/27/Atomic-RSS

real    0m5.370s
user    0m4.412s
sys     0m0.952s
It's big improvement from my last code, about 365% ;-) Good thing, that it's nice jigsaw and very powerful. I think, it is just what Tim Bray want when started Wide Finder Project.

Saturday, October 27, 2007

Scalable splitting is possible

In my previous post I thought that reading and splitting are unscalable processes. It's not true. Reading is scalable, but on current HW it is not useful, because sequential reading is more than twenty times faster than random from disks. But what about splitting and concatenating read chunk by new line. Splitting I can do in parallel and what concatenating? Yes, I can if can keep sequential information and send parts to correct process. Than I wrote scatter-getter algorithm with splitter and concatenator. I was also changed fold-reduce to map-reduce. Code is separated to three modules. Main module is tbray_pichi1
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Yet another Erlang solution to Tim Bray's Wide Finder project
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 23 October 2007.

-module(tbray_pichi1).

-export([main/1, start/1, start/2, start/3]).

start(FileName, ChunkSize) ->
    start(FileName, ChunkSize,
          erlang:system_info(schedulers) * 8).

start(FileName) -> start(FileName, 1024 * 32).

main([FileName, ChunkSize, N]) ->
    start(FileName, list_to_integer(ChunkSize),
          list_to_integer(N)),
    halt();
main([FileName, ChunkSize]) ->
    start(FileName, list_to_integer(ChunkSize)), halt();
main([FileName]) -> start(FileName), halt().

start(FileName, ChunkSize, N) ->
    Start = now(),
    Result = nlt_map_reduce:map_reduce(
        FileName,
        _Mapper = fun (Chunk) ->
                scan(binary_to_list(Chunk), dict:new())
            end,
        _Reducer = fun (A, B) ->
                dict:merge(fun (_, V1, V2) -> V1 + V2 end, A, B)
            end,
        ChunkSize, N),
    Delta = timer:now_diff(now(), Start) / 1000,
    print_result(Result),
    if Delta > 1000 ->
           io:format("Time: ~.3f s~n", [Delta / 1000]);
       true -> io:format("Time: ~.3f ms~n", [Delta])
    end,
    ok.

print_result(Dict) ->
    [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
        lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    lists:foreach(fun ({Word, Count}) ->
                          io:format("~p get requests for ~s~n", [Count, Word])
                  end,
                  [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++
       [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1,
        D2, $/
        | Rest],
     Dict) ->
    case scan_key(Rest) of
      {[_ | _] = Key, NewRest} ->
          scan(NewRest,
               dict:update_counter(
                    [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Key],
                    1, Dict));
      {[], NewRest} -> scan(NewRest, Dict)
    end;
scan([_ | Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$\s | Rest], Key) ->
    {lists:reverse(Key), Rest};
scan_key([$\n | Rest], _) -> {[], Rest};
scan_key([$. | Rest], _) -> {[], Rest};
scan_key([C | Rest], Key) -> scan_key(Rest, [C | Key]);
scan_key([], _) -> {[], []}.
Second new line terminated chunks map reducer (nlt_map_reduce)
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% MapReduce for new line terminated blocks of file
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.

-module(nlt_map_reduce).

%-compile([native, {hipe, [o3]}]).
-import(chunk_reader).

-export([map_reduce/5]).

% mapper reducer process info
-record(map_reduce,
        {mapper, reducer, result = '', data, receiver}).

% process spawner loop info
-record(context,
        {mr, cr, proc = [], n = 0, max = 1, maxR = 1}).

map_reduce(FileName, Mapper, Reducer, ChunkSize, N) ->
    {ok, ChunkReader} = chunk_reader:open(FileName,
                                          ChunkSize),
    Result = start(#map_reduce{mapper = Mapper,
                               reducer =
                                   fun ('', A) -> A;
                                       (A, '') -> A;
                                       (A, B) -> Reducer(A, B)
                                   end,
                               receiver = self()},
                   ChunkReader, N),
    ok = chunk_reader:close(ChunkReader),
    Result.

start(MR, ChunkReader, N) ->
    spawn_proc(#context{mr = MR, cr = ChunkReader, max = N},
               chunk_reader:read(ChunkReader), you_first).

loop(#context{cr = ChunkReader, proc = [{_, Last} | _],
              n = N, max = Max} =
         C)
    when N < Max ->
    receive
      map_done -> loop(C#context{n = N - 1})
      after 0 ->
                spawn_proc(C, chunk_reader:read(ChunkReader), Last)
    end;
loop(#context{n = N} = C) ->
    receive map_done -> loop(C#context{n = N - 1}) end.

spawn_proc(#context{mr = MR, n = N, proc = Proc,
                    maxR = R} =
               C,
           {ok, Chunk}, Last) ->
    loop(C#context{
           proc = send_result_requests(
                [{0, spawn_opt(fun () ->
                            split(MR#map_reduce{data = Chunk}, Last)
                        end,
                        [link])}
                | Proc]),
           n = N + 1,
           maxR =
               if R > N + 1 -> R;
                  true -> N + 1
               end});
spawn_proc(_, eof, you_first) ->
    '';                     % empty file
spawn_proc(#context{proc = Proc}, eof,
           Last) ->         % finalise
    Last ! you_last,
    send_final_result_requests(Proc),
    wait_for_result().

wait_for_result() ->
    receive
      {result, Result} -> Result;
      _ -> wait_for_result()  % clear pending messages if any
    end.

send_result_requests([{L1, P1}, {L1, P2} | T]) ->
    P2 ! {send_result_to, P1, L1},
    send_result_requests([{L1 + 1, P1} | T]);
send_result_requests(T) -> T.

send_final_result_requests(T) ->
    [{L1, P} | R] = lists:reverse(T),
    L = send_final_result_requests_i(L1, P, R),
    P ! {send_result_to, self(), L}.

send_final_result_requests_i(L, P, [{L1, P1} | T]) ->
    P1 ! {send_result_to, P, L1},
    send_final_result_requests_i(L + 1, P, T);
send_final_result_requests_i(L, _, []) -> L.

% mapper reducer process states
split(#map_reduce{data = Data} = MR, you_first) ->
    map_it(MR#map_reduce{data = join_next(Data)});
split(#map_reduce{data = Data} = MR, PrevPid) ->
    case split_on_nl(Data) of
      {_, none} ->    % do nothing yourself, send it
          PrevPid ! {your_next_part, join_next(Data)},
          map_done(MR#map_reduce{data = done});
      {Line, Rest} ->
          PrevPid ! {your_next_part, Line},
          map_it(MR#map_reduce{data = join_next(Rest)})
    end.

join_next(Data) ->
    receive
      {your_next_part, Next} -> <<Data/binary, Next/binary>>;
      you_last -> Data
    end.

map_it(#map_reduce{mapper = Mapper, data = Data} =
           MR) ->
    map_done(MR#map_reduce{data = done,
                           result = Mapper(Data)}).

map_done(#map_reduce{receiver = Master} = MR) ->
    Master ! map_done,      % notice master you done map
    reduce_and_wait(MR, 0).

reduce_and_wait(#map_reduce{result = Acc,
                            reducer = Reducer} =
                    MR,
                N) ->
    receive
      {send_result_to, Receiver, WaitForN} ->
          reduce(MR#map_reduce{receiver = Receiver}, N, WaitForN);
      {result, Result} ->
          reduce_and_wait(MR#map_reduce{result =
                                            Reducer(Acc, Result)},
                          N + 1)
    end.

reduce(#map_reduce{result = Acc, reducer = Reducer} =
           MR,
       N, WaitForN)
    when N < WaitForN ->
    receive
      {result, Result} ->
          reduce(MR#map_reduce{result = Reducer(Acc, Result)},
                 N + 1, WaitForN)
    end;
reduce(#map_reduce{receiver = Receiver,
                   result = Result},
       _, _) ->
    Receiver ! {result, Result}.    % We are finished

%splitter
split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
    case B of
      <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
      _ -> split_on_nl(B, N + 1, S)
    end;
split_on_nl(B, _, _) -> {B, none}.
And last is read ahead chunk reader
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.

-module(chunk_reader).

-export([close/1, open/2, read/1]).

-export([test_read/2]).

% how many second have to wait for response
-define(TIMEOUT, 60).

% open(FileName, ChunkSize) -> {ok, ChunkReader}
open(FileName, ChunkSize) ->
    M = self(),
    {ok,
     {chunk_reader,
      spawn_opt(fun () ->
                        {ok, File} = file:open(FileName, [read, raw, binary]),
                        process_flag(trap_exit, true),
                        loop(M, File, file:read(File, ChunkSize), ChunkSize)
                end,
                [link, {priority, high}])}}.

% close(ChunkReader) -> ok | {error, invalid}
close({chunk_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
      true -> Pid ! close, ok;
      false -> {error, invalid}
    end.

% read(ChunkReader) -> eof | {ok, Data} | {error, invalid | closed}
read({chunk_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
      true -> Pid ! {read, self()}, wait_response(Pid, 0);
      false -> {error, invalid}
    end.

wait_response(Pid, N) when N < (?TIMEOUT) ->
    receive
      {ok, _} = Msg -> Msg;
      eof -> eof
      after 1000 ->   % take it long?
                case is_process_alive(Pid) of
                  true -> wait_response(Pid, N + 1);
                  false -> {error, closed}
                end
    end;
wait_response(_, _) -> {error, timeout}.

loop(Master, File, Chunk, ChunkSize) ->
    receive
      {read, From} ->
          From ! Chunk,
          case Chunk of
            {ok, _} ->
                loop(Master, File, file:read(File, ChunkSize),
                     ChunkSize);
            eof -> file:close(File), eof_loop(Master, From)
          end;
      close -> file:close(File);
      {'EXIT', Master, _} -> file:close(File);
      _ ->
          loop(Master, File, Chunk, ChunkSize)  % ignore unknow
    end.

eof_loop(Master) ->  % wait for eof request
    receive
      {read, From} -> From ! eof, eof_loop(Master);
      close -> ok;
      {'EXIT', Master, _} -> ok;
      _ -> eof_loop(Master)
    end.

% speed testing function
% test_read(FileName, ChunkSize) -> ok | {error, invalid}
test_read(FileName, ChunkSize) ->
    {ok, File} = open(FileName, ChunkSize),
    eof = test_read_loop(File, read(File)),
    close(File).

test_read_loop(File, {ok, _}) ->
    test_read_loop(File, read(File));
test_read_loop(_, eof) -> eof.
But nlt_map_reduce code is too complicated, bad readable and what is the worst, 20% slower on single core. All this indicate, that there is some problem and I think it is dictionary sending between processes. Dictionary is copied every chunk and it cost to much. Then I want rewrite it to more fancy code and fall back to fold-reduce concept, because this concept send less dictionaries.

Sunday, October 21, 2007

Wide Finder Project - fold&reduce

I have made new version of Wide Finder Project. I was inspired by last Caoyuan's last work. I thought about i/o operation too, but I think, parallelisation of file reading is not good idea. Instead of this I tried split file reading and new line finding in two independent processes. One process which read and one which searching for new line. It's looks expensive, send big messages, but I send binaries and binaries less than 64 bytes are not copied, but only pointers passed. I also look for new line from head, because I think binary splitting is faster when first part is smaller than second. Second part can be keep on its place and only pointer is moved and smaller first part is copied to new position. But when glue second part from previous read chunk with first part of current, I must copy bigger part and this is expensive. It looks like same as Caoyuan do, but I don't do it in splitter, but in worker. Both parts I send as binary apart. It's cheap. Why all this? Make minimal work in one process. One process only reads as fast as possible. One process splitting by new line and don't gluing and all other work I can do in parallel. But when one splitter calls reader for new chunk, it must not wait for reader until it read next chunk. Better if reader have next chunk prepared. And splitter dtto. Splitter must have chunk split prepared before any worker calls for new parts. Then I made read ahead file reader and chunk splitter.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

file_open(FileName, ChunkSize, chunk) ->        % raw chunks
M = self(),
{ok, {chunk_reader, spawn_link(fun() ->
       {ok, File} = file:open(FileName, [read, raw, binary]),
       process_flag(trap_exit, true),
       process_flag(priority, high),
       file_loop(M, File, file:read(File, ChunkSize), ChunkSize)
   end)}};
file_open(FileName, ChunkSize, nlt_chunk) ->    % new line terminated chunks
M = self(),
{ok, {nlt_chunk_reader, spawn_link(fun() ->
       {ok, CR} = file_open(FileName, ChunkSize, chunk),
       process_flag(trap_exit, true),
       process_flag(priority, high),
       {ok, First_Read} = file_read(CR),
       cr_loop(
           M,
           CR,
           cr_read_n_split(CR, First_Read, file_read(CR)))
   end)}}.

file_read({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
case is_process_alive(Pid) of
   true ->
       Pid ! {read, self()},
       receive
           {ok, B} -> {ok, B};
           eof -> eof
       after 60000 -> timeout   % Possible race condition with is_process_alive
       end;
   false -> error
end.

file_close({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
case is_process_alive(Pid) of
   true -> Pid ! close, ok;
   false -> error
end.

file_loop(Master, File, Chunk, ChunkSize) ->
receive
   {read, From} ->
       From ! Chunk,
       case Chunk of
           {ok, _} ->
               file_loop(Master, File, file:read(File, ChunkSize), ChunkSize);
           eof ->
               file:close(File),
               file_eof_loop(Master)
       end;
   close -> file:close(File);
   {'EXIT', Master, _} -> file:close(File);
   _ -> file_loop(Master, File, Chunk, ChunkSize)  % ignore unknow
end.

file_eof_loop(Master) ->  % wait for eof request
receive
   {read, From} ->
       From ! eof,
       file_eof_loop(Master);
   close -> ok;
   {'EXIT', Master, _} -> ok;
   _ -> file_eof_loop(Master)
end.

cr_loop(Master, CR, {Prev, Line, Next}) ->
receive
   {read, From} ->
       From ! {ok, {Prev, Line}},
       case Next of
           _ when is_binary(Next) ->
               cr_loop(Master, CR, cr_read_n_split(CR, Next, file_read(CR)));
           eof ->
               file_close(CR),
               file_eof_loop(Master)
       end;
   close -> file_close(CR);
   {'EXIT', Master, _} -> file_close(CR);
   _ -> cr_loop(Master, CR, {Prev, Line, Next})    % ignore unknow
end.

cr_read_n_split(CR, Prev, {ok, B}) ->
case split_on_nl(B) of
   {Line, Rest} when is_binary(Rest) ->    % nonempty remaining part
       { Prev, Line, Rest };
   {Line, none} -> % new line not found, read again, should be very rare
       cr_read_n_split(CR, <<Prev/binary, Line/binary>>, file_read(CR))
end;
cr_read_n_split(_CR, Prev, eof) ->
{<<>>, Prev, eof}.  % easier joining at this order

split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
case B of
   <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
   _ -> split_on_nl(B, N+1, S)
end;
split_on_nl(B, _, _) -> {B, none}.

% speed testing functions
file_test_read(FileName, ChunkSize, Type) ->
{ok, File} = file_open(FileName, ChunkSize, Type),
eof = file_test_read_loop(File, file_read(File)),
file_close(File).

file_test_read_loop(File, {ok, _}) ->
file_test_read_loop(File, file_read(File));
file_test_read_loop(_, eof) ->
eof.
When I have this file like devices, I thought about Tim Bray's request more readable and cleaner code. So what I want to do? Some like map_reduce but not exactly map_reduce. It looks like fold_reduce. I want fold over each chunk aka scan for some pattern and than I want collect all results and I want do it in parallel. Then I made fold_reduce operator over new line terminated chunk read from file, just fold_reduce_file.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Proof of concept of fold&reduce
% on file by new line terminated chunks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-record(context, {acc,
               chunkNum,
               processedNum = 0,
               reducer}).

fold_reduce_file(FileName, Acc0, Folderer, Reducer, ChunkSize, N) ->
 {ok, CR} = file_open(FileName, ChunkSize, nlt_chunk),
 M = self(),
 do_n(fun() ->
     spawn_link(fun()-> folderer(CR, Acc0, Folderer, M) end)
     end, 0, N),
 Result = collect_loop(#context{
     acc=Acc0,
     chunkNum=N,
     reducer=Reducer}),
 file_close(CR),
 Result.

do_n(What, Start, Stop) when Start < Stop -> What(), do_n(What, Start+1, Stop);
do_n(_, _, _) -> ok.

folderer(CR, Acc0, Folderer, Collector) ->
 case file_read(CR) of
     {ok, {A, B}} ->
         folderer(CR, Folderer(
                 Acc0,
                 binary_to_list(A) ++ binary_to_list(B)
             ), Folderer, Collector);
     eof ->
         Collector ! {result, Acc0}
 end.

collect_loop(#context{acc=Acc0,
                   chunkNum=ChunkNum,
                   processedNum=ProcessedNum,
                   reducer=Reducer}=Context) ->
 case ProcessedNum of
     ChunkNum ->
         Acc0;
     _ ->
         receive
             {result, Result} ->
                 collect_loop(Context#context{
                     acc = Reducer(Acc0, Result),
                     processedNum = ProcessedNum+1})
         end
 end.
It looks complicated but, its only infrastructure. Now I have tool to make Tim Bray's exercise easy, but not only this one, but any similar task. Tim Bray's exercise implementation with this tool is here.
start(FileName) -> start(FileName, 1024*32, 1).
start(FileName, N) -> start(FileName, 1024*32, N).
start(FileName, ChunkSize, N) ->
 Start = now(),
 Result = fold_reduce_file(
     FileName,
     _Acc0 = dict:new(),
     _Folderer = fun(Acc, Chunk) -> scan(Chunk, Acc) end,
     _Reducer = fun(Acc, Result) -> dict:merge(
             fun(_,V1,V2) -> V1+V2 end,
             Acc,
             Result
         ) end,
     ChunkSize,
     N
 ),
 Delta = timer:now_diff(now(), Start) / 1000,
 print_result(Result),
 if
     Delta > 1000 -> io:format("Time: ~.3f s~n", [Delta/1000]);
     true -> io:format("Time: ~.3f ms~n", [Delta])
 end,
 ok.

print_result(Dict) ->
 [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
 lists:foreach(fun ({Word, Count}) ->
                       io:format("~p get requests for ~s~n", [Count, Word])
               end, [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++ [_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest], Dict) ->
 case scan_key(Rest) of
     {[_|_] = Key, NewRest} ->
         scan(NewRest, dict:update_counter([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Key], 1, Dict));
     {[], NewRest} -> scan(NewRest, Dict)
 end;
scan([_|Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$ |Rest], Key) -> {lists:reverse(Key), Rest};
scan_key([$\n|Rest], _) -> {[], Rest};
scan_key([$.|Rest], _) -> {[], Rest};
scan_key([C|Rest], Key) -> scan_key(Rest, [C|Key]);
scan_key([], _) -> {[],[]}.
Good new is, this is faster than my tbray2 and also Caoyuan's tbray4 on single core. But I can't test it on multi core now. All source code for testing is bellow. When N is number of processor threads, interesting will be test N-1, N , 2*N-1 or 2*N folderer processes.
-module(tbray6).

%-compile([debug_info, native, {hipe, [o3]}]).

-export([start/1, start/2, start/3]).
-export([file_open/3, file_read/1, file_close/1, file_test_read/3]).
-export([fold_reduce_file/6]).

start(FileName) -> start(FileName, 1024*32, 1).
start(FileName, N) -> start(FileName, 1024*32, N).
start(FileName, ChunkSize, N) ->
 Start = now(),
 Result = fold_reduce_file(
     FileName,
     _Acc0 = dict:new(),
     _Folderer = fun(Acc, Chunk) -> scan(Chunk, Acc) end,
     _Reducer = fun(Acc, Result) -> dict:merge(
             fun(_,V1,V2) -> V1+V2 end,
             Acc,
             Result
         ) end,
     ChunkSize,
     N
 ),
 Delta = timer:now_diff(now(), Start) / 1000,
 print_result(Result),
 if
     Delta > 1000 -> io:format("Time: ~.3f s~n", [Delta/1000]);
     true -> io:format("Time: ~.3f ms~n", [Delta])
 end,
 ok.

print_result(Dict) ->
 [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
 lists:foreach(fun ({Word, Count}) ->
                       io:format("~p get requests for ~s~n", [Count, Word])
               end, [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++ [_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest], Dict) ->
 case scan_key(Rest) of
     {[_|_] = Key, NewRest} ->
         scan(NewRest, dict:update_counter([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Key], 1, Dict));
     {[], NewRest} -> scan(NewRest, Dict)
 end;
scan([_|Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$ |Rest], Key) -> {lists:reverse(Key), Rest};
scan_key([$\n|Rest], _) -> {[], Rest};
scan_key([$.|Rest], _) -> {[], Rest};
scan_key([C|Rest], Key) -> scan_key(Rest, [C|Key]);
scan_key([], _) -> {[],[]}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Proof of concept of fold&reduce
% on file by new line terminated chunks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-record(context, {acc,
               chunkNum,
               processedNum = 0,
               reducer}).

fold_reduce_file(FileName, Acc0, Folderer, Reducer, ChunkSize, N) ->
 {ok, CR} = file_open(FileName, ChunkSize, nlt_chunk),
 M = self(),
 do_n(fun() ->
     spawn_link(fun()-> folderer(CR, Acc0, Folderer, M) end)
     end, 0, N),
 Result = collect_loop(#context{
     acc=Acc0,
     chunkNum=N,
     reducer=Reducer}),
 file_close(CR),
 Result.

do_n(What, Start, Stop) when Start < Stop -> What(), do_n(What, Start+1, Stop);
do_n(_, _, _) -> ok.

folderer(CR, Acc0, Folderer, Collector) ->
 case file_read(CR) of
     {ok, {A, B}} ->
         folderer(CR, Folderer(
                 Acc0,
                 binary_to_list(A) ++ binary_to_list(B)
             ), Folderer, Collector);
     eof ->
         Collector ! {result, Acc0}
 end.

collect_loop(#context{acc=Acc0,
                   chunkNum=ChunkNum,
                   processedNum=ProcessedNum,
                   reducer=Reducer}=Context) ->
 case ProcessedNum of
     ChunkNum ->
         Acc0;
     _ ->
         receive
             {result, Result} ->
                 collect_loop(Context#context{
                     acc = Reducer(Acc0, Result),
                     processedNum = ProcessedNum+1})
         end
 end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

file_open(FileName, ChunkSize, chunk) ->        % raw chunks
 M = self(),
 {ok, {chunk_reader, spawn_link(fun() ->
         {ok, File} = file:open(FileName, [read, raw, binary]),
         process_flag(trap_exit, true),
         process_flag(priority, high),
         file_loop(M, File, file:read(File, ChunkSize), ChunkSize)
     end)}};
file_open(FileName, ChunkSize, nlt_chunk) ->    % new line terminated chunks
 M = self(),
 {ok, {nlt_chunk_reader, spawn_link(fun() ->
         {ok, CR} = file_open(FileName, ChunkSize, chunk),
         process_flag(trap_exit, true),
         process_flag(priority, high),
         {ok, First_Read} = file_read(CR),
         cr_loop(
             M,
             CR,
             cr_read_n_split(CR, First_Read, file_read(CR)))
     end)}}.

file_read({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
 case is_process_alive(Pid) of
     true ->
         Pid ! {read, self()},
         receive
             {ok, B} -> {ok, B};
             eof -> eof
         after 60000 -> timeout   % Possible race condition with is_process_alive
         end;
     false -> error
 end.

file_close({Type, Pid}) when Type == chunk_reader; Type == nlt_chunk_reader ->
 case is_process_alive(Pid) of
     true -> Pid ! close, ok;
     false -> error
 end.

file_loop(Master, File, Chunk, ChunkSize) ->
 receive
     {read, From} ->
         From ! Chunk,
         case Chunk of
             {ok, _} ->
                 file_loop(Master, File, file:read(File, ChunkSize), ChunkSize);
             eof ->
                 file:close(File),
                 file_eof_loop(Master)
         end;
     close -> file:close(File);
     {'EXIT', Master, _} -> file:close(File);
     _ -> file_loop(Master, File, Chunk, ChunkSize)  % ignore unknow
 end.

file_eof_loop(Master) ->  % wait for eof request
 receive
     {read, From} ->
         From ! eof,
         file_eof_loop(Master);
     close -> ok;
     {'EXIT', Master, _} -> ok;
     _ -> file_eof_loop(Master)
 end.

cr_loop(Master, CR, {Prev, Line, Next}) ->
 receive
     {read, From} ->
         From ! {ok, {Prev, Line}},
         case Next of
             _ when is_binary(Next) ->
                 cr_loop(Master, CR, cr_read_n_split(CR, Next, file_read(CR)));
             eof ->
                 file_close(CR),
                 file_eof_loop(Master)
         end;
     close -> file_close(CR);
     {'EXIT', Master, _} -> file_close(CR);
     _ -> cr_loop(Master, CR, {Prev, Line, Next})    % ignore unknow
 end.

cr_read_n_split(CR, Prev, {ok, B}) ->
 case split_on_nl(B) of
     {Line, Rest} when is_binary(Rest) ->    % nonempty remaining part
         { Prev, Line, Rest };
     {Line, none} -> % new line not found, read again, should be very rare
         cr_read_n_split(CR, <<Prev/binary, Line/binary>>, file_read(CR))
 end;
cr_read_n_split(_CR, Prev, eof) ->
 {<<>>, Prev, eof}.  % easier joining at this order

split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
 case B of
     <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
     _ -> split_on_nl(B, N+1, S)
 end;
split_on_nl(B, _, _) -> {B, none}.

% speed testing functions
file_test_read(FileName, ChunkSize, Type) ->
 {ok, File} = file_open(FileName, ChunkSize, Type),
 eof = file_test_read_loop(File, file_read(File)),
 file_close(File).

file_test_read_loop(File, {ok, _}) ->
 file_test_read_loop(File, file_read(File));
file_test_read_loop(_, eof) ->
 eof.

Sunday, October 7, 2007

Is bfile faster than old erlang file?

Steve Vinoski is using klacke’s bfile module in his Wide Finder Project work, but I don't know why bfile should be faster than erlang OTP file. Well, then I measured. I tried Steve's read test and my test on my old home desktop (model name : AMD Athlon(tm) processor, stepping : 2, cpu MHz : 1199.805, cache size : 256 KB).
-module(readold).
-export([start/1, start/2]).
-compile([native]).

scan_file(F, Readsize, Total) ->
   Rd = file:read(F, Readsize),
   case Rd of
       {ok, Bin} -> scan_file(F, Readsize, size(Bin)+Total);
       eof -> Total
   end.
scan_file(F, Readsize) -> scan_file(F, Readsize, 0).

start(File, Readsize) ->
   {ok, F} = file:open(File, [raw, binary, read]),
   T = scan_file(F, Readsize),
   io:format("read ~p bytes~n", [T]),
   file:close(F).
start(File) ->
   start(File, 512*1024).
And there are results here:
2> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{1041306,ok}
3> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{836876,ok}
4> c(readold).
{ok,readold}
5> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{837501,ok}
6> timer:tc(read,start,["o1M.ap"]).  
read 200995500 bytes
{1353678,true}
7> timer:tc(read,start,["o1M.ap"]).
read 200995500 bytes
{1237174,true}
8> timer:tc(read,start,["o1M.ap"]).
read 200995500 bytes
{1318029,true}
9> timer:tc(readold,start,["o1M.ap"]).
read 200995500 bytes
{856662,ok}
In generally, I don't know why erlang's file should be slower. I don't know why bfile is 45% slower than file on my old home desktop, but why should be faster anywhere? I tested it on Linux, may be bfile using BSD file implementation is faster on Darwin aka BSD clone? The file implementation is fast enough on my Erlang/OTP R11B-5.

erlang-base-hipe.deb don't contain native compiled modules

When I measured Caoyuan and my Wide Finder Project solution I found my binary solution faster than list, but Caoyuan measured reverse result. Then I looked why. Caoyuan is using MacOS X and I'm using Debian. I don't know if MacOS port of erlang has base modules native compiled (with HiPE) but I looked on my debian's:
5> proplists:get_value(compile, lists:module_info()).
[{options,[{inline,[{merge3_12,7},
                 {merge3_21,7},
                 {rmerge3_12,7},
                 {rmerge3_21,7}]},
        {inline,[{umerge3_12,8},
                 {umerge3_21,8},
                 {rumerge3_12a,7},
                 {rumerge3_12b,8}]},
        {inline,[{keymerge3_12,12},
                 {keymerge3_21,12},
                 {rkeymerge3_12,12},
                 {rkeymerge3_21,12}]},
        {inline,[{ukeymerge3_12,13},
                 {ukeymerge3_21,13},
                 {rukeymerge3_12a,11},
                 {rukeymerge3_21a,13},
                 {rukeymerge3_12b,12},
                 {rukeymerge3_21b,12}]},
        {cwd,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src"},
        {outdir,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/../ebin"},
        {i,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/../include"},
        {i,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/../../kernel/include"},
        warn_obsolete_guard,
        debug_info,
        {inline,[{merge3_12,7},
                 {merge3_21,7},
                 {rmerge3_12,7},
                 {rmerge3_21,7}]},
        {inline,[{umerge3_12,8},
                 {umerge3_21,8},
                 {rumerge3_12a,7},
                 {rumerge3_12b,8}]},
        {inline,[{keymerge3_12,12},
                 {keymerge3_21,12},
                 {rkeymerge3_12,12},
                 {rkeymerge3_21,12}]},
        {inline,[{ukeymerge3_12,13},
                 {ukeymerge3_21,13},
                 {rukeymerge3_12a,11},
                 {rukeymerge3_21a,13},
                 {rukeymerge3_12b,12},
                 {rukeymerge3_21b,12}]}]},
{version,"4.4.5"},
{time,{2007,9,28,11,10,32}},
{source,"/tmp/buildd/erlang-11.b.5dfsg/lib/stdlib/src/lists.erl"}]
There isn't native option. If MacOS port is native compiled there can be this unexpected difference. I have tried to create my own erlang-base-hipe package from source one, but I don't know how to put the option on into the making process. I have tried: debian/rules configure-hipe but it didn't work so I searched where is native option used.
$ grep -r +native .
./lib/asn1/src/Makefile:ERL_COMPILE_FLAGS += +native
./lib/megaco/src/app/megaco.mk:ERL_COMPILE_FLAGS += +native
./lib/megaco/test/Makefile:ERL_COMPILE_FLAGS += +native -Dmegaco_hipe_special=true
./lib/megaco/examples/meas/Makefile:ERL_COMPILE_FLAGS += +native
./README:       erlc +native Module.erl
Then I tried
export ERL_COMPILE_FLAGS=+native
It didn't affect binary-erlang-base and only generated warning message, but during binary-erlang-base-hipe error occured.
erlc -W  +native +debug_info +debug_info +debug_info +warn_obsolete_guard -I/home/hynek/work/erlang-11.b.5dfsg/lib/stdlib/include -o../ebin yecc.erl
./yecc.erl:none: internal error in native_compile;
crash reason: {undef,[{hipe,compile,
                            [yecc,
                             [],
                             <<70,79,82,49,0,1,91,124,66,69,65,77,65,116,111,
...
97,252,150,236,255,7,193,199,127,8,0,0>>,
                             []]},
                      {compile,native_compile_1,1},
                      {compile,'-internal_comp/4-anonymous-1-',2},
                      {compile,fold_comp,3},
                      {compile,internal_comp,4},
                      {compile,internal,3}]}
make[4]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg/lib/parsetools/src'
make[3]: Leaving directory `/home/hynek/work/erlang- 11.b.5dfsg/lib/parsetools'
make[2]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg/lib'
make[1]: Leaving directory `/home/hynek/work/erlang-11.b.5dfsg'
Then I tested if yecc can be compiled native and it can be.
3> c("/usr/lib/erlang/lib/parsetools-1.4.1.1/src/yecc.erl", [native, {i, "/usr/lib/erlang/lib/stdlib-1.14.5/include/"}, {outdir, "."}]).
{ok,yecc}
4> proplists:get_value(compile, yecc:module_info()).                                                                                    
[{options,[{inline,[{compute_closure,3}]},
           {nowarn_unused_function,{function_name,2}},
           {inline,[{set_empty,0}]},
           {inline,[{set_member,2}]},
           {inline,[{set_delete,2}]},
           {inline,[{set_union,2}]},
           {inline,[{set_is_subset,2}]},
           {inline,[{is_terminal,2}]},
           native,
           {i,"/usr/lib/erlang/lib/stdlib-1.14.5/include/"},
           {outdir,"."},
           {inline,[{compute_closure,3}]},
           {nowarn_unused_function,{function_name,2}},
           {inline,[{set_empty,0}]},
           {inline,[{set_member,2}]},
           {inline,[{set_delete,2}]},
           {inline,[{set_union,2}]},
           {inline,[{set_is_subset,2}]},
           {inline,[{is_terminal,2}]}]},
 {version,"4.4.5"},
 {time,{2007,10,7,17,53,55}},
 {source,"/usr/lib/erlang/lib/parsetools-1.4.1.1/src/yecc.erl"}]
yecc (and almost all other modules) can be native compiled, but I don't know how to do it. I'm totally messed up by debian packaging system and don't know what dpkg-buildpackage does. It's difficult to do it with installed version because many packages need some special compiling options at least {i,"/usr/lib/erlang/lib/stdlib-1.14.5/include/"} for included .hrl files and so. I will be glad if anyone give me some advice how to make package with native compiled modules or how to recompile only modules from source package to make some workaround.

Saturday, October 6, 2007

Binaries really faster than lists

Caoyuan did his second round of Wide Finder Project using lists, but expected lists traversing is faster than binary traversing. But he made little mistake. He assumed, that timer:tc(tbray, travel_list, [binary_to_list(Bin)]) including binary_to_list(Bin) time, but it is not true. binary_to_list(Bin) is done before timer:tc call and result passed as parameter. But there are more performance problems in his solution. He often use lists:reverse and ++ which twice reverse first parameter too. Bigger memory usage of list and often reversing must cause bad performance. When I tried write same algorithm using binary I take 3 times speed up.

-module(tbray2).

-compile([native, {hipe,[o3]}]).

-export([start/1, start/2]).

-record(context, {main,
                 dict,
                 chunkNum,
                 processedNum = 0}).

start(FileName) -> start(FileName, 1024*32).
start(FileName, ChunkSize) ->
   Start = now(),
   Main = self(),
   Collector = spawn_link(fun () -> collect_loop(#context{main = Main,
                                                     dict = dict:new(),
                                                     processedNum = 0}) end),
   ChunkNum = foreach_chunk(
           fun(Chunk) ->
               spawn_link(fun() -> Collector ! scan_lines(Chunk) end)
           end,
           FileName,
           ChunkSize
       ),
   Collector ! {chunkNum, ChunkNum},
  
   %% don't terminate, wait here, until all tasks done.
   receive
       stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])
   end.

foreach_chunk(Fun, FileName, SizeOfChunk) ->
   {ok, File} = file:open(FileName, [raw, binary]),
   N = foreach_chunk(Fun, File, <<>>, SizeOfChunk, 0),
   file:close(File),
   N.

foreach_chunk(Fun, File, PrevRest, SizeOfChunk, N) ->
   {Chunk, Rest} = read_chunk(File, PrevRest, SizeOfChunk),
   Fun(Chunk),
   case Rest of
       <<>> -> N+1;
       _ -> foreach_chunk(Fun, File, Rest, SizeOfChunk, N+1)
   end.

read_chunk(File, PrevRest, N) ->
   case file:read(File, N) of
       {ok, B} ->
           {Line, Rest} = split_on_nl(B),
           Chunk = <<PrevRest/binary, Line/binary>>,
           case Rest of
               <<>> ->
                   read_chunk(File, Chunk, N);
               _ ->
                   {Chunk, Rest}
           end;
       eof ->
           {PrevRest, <<>>}
   end.

split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S -> case B of
       <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
       _ -> split_on_nl(B, N+1, S)
   end;
split_on_nl(B, _, _) -> {B, <<>>}.

collect_loop(#context{main=Main,
                     dict=Dict,
                     chunkNum=ChunkNum,
                     processedNum=ProcessedNum}=Context) ->
   case ProcessedNum of
       ChunkNum ->
           print_result(Dict),
           Main ! stop;
       _ ->
           receive
               {chunkNum, N} -> collect_loop(Context#context{chunkNum = N});
               DictX ->
                   collect_loop(Context#context{
                       dict = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
                       processedNum = ProcessedNum+1})
           end
   end.

print_result(Dict) ->
   [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
       lists:reverse(lists:keysort(2, dict:to_list(Dict))),
   lists:foreach(fun ({Word, Count}) ->
                         io:format("~p get requests for ~s~n", [Count, Word])
                 end, [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan_lines(Bin) -> scan_lines(Bin, dict:new()).
scan_lines(Bin, Dict) ->
   case naive_search(Bin) of
       {ok, Key, Rest} ->
           scan_lines(
               element(2, split_on_nl(Rest)),
               dict:update_counter(Key, 1, Dict));
       false -> Dict
   end.

%% naive_search(Binary()) -> false | {ok, Key, Rest}
naive_search(B) -> naive_search(B, 0, size(B)-18).

naive_search(B, N, S) when N < S ->
   case B of
       <<_:N/binary, "GET /ongoing/When/", Rest/binary>> ->
           case keyMatch(Rest) of
               Result = {ok, _Key, _Rest2} -> Result;
               false -> naive_search(Rest)
           end;
       _ -> naive_search(B, N+1, S)
   end;
naive_search(_, _, _) -> false.

%% keyMatch(Binary()) -> false | {ok, Key, Rest}
keyMatch(<<C, _/binary>>) when C == $ ; C == $. -> false;   % empty
keyMatch(B) -> keyMatch(B, 1, size(B)).

keyMatch(B, N, S) when N<S ->
   case B of
       % end with space
       <<Key:N/binary, $ , Rest/binary>> -> {ok, Key, Rest};
       <<_:N/binary, $., _/binary>> -> false;
       _ -> keyMatch(B, N+1, S)
   end;
keyMatch(_, _, _) -> false.
Result is less memory consuming and faster program. Problem is partitioned same way as Caoyuan did and I suppose that scale same way, but I can't test it because I don't have any multi core computer.

Wednesday, October 3, 2007

Keep your mailbox empty

Matthias wrote Too much mail is bad for you. Yes, it's true. He solved problem by synchronous communication between producer and consumer, but this solution blocks producer and is not "appropriate solution in general" (second comment). I think, one good solution is make your own queue manager and keep process mailbox empty. It's principally solution what recommends Vlad in that discussion. And how to do it?
-module(mqueue).

-export([timed_run/3]).

producer(0, ConsumerPid, _WithAck) ->
  ConsumerPid ! {done, self()},
  receive
      done -> ok
  end;
producer(N, ConsumerPid, WithAck = false) ->
  ConsumerPid ! {msg, self()},
  producer(N-1, ConsumerPid, WithAck);
producer(N, ConsumerPid, WithAck = true) ->
  ConsumerPid ! {acked_msg, self()},
  receive
      ack -> ok
  end,
  producer(N-1, ConsumerPid, WithAck).

consumer(M, EchoPid) ->
  receive
      {msg, _From} ->
          call_echo(M, EchoPid),
          consumer(M, EchoPid);
      {acked_msg, From} ->
          From ! ack,
          call_echo(M, EchoPid),
          consumer(M, EchoPid);
      {done, From} ->
          EchoPid ! done,
          From ! done
  end.

queue_keeper_empty(ConsumerPid) ->
  receive
      {msg, _From} ->
          ConsumerPid ! {acked_msg, self()},
          queue_keeper_waiting(queue:new(), ConsumerPid);
      {done, From} ->
          ConsumerPid ! {done, self()},
          receive
              done ->
                  From ! done
          end
  end.

queue_keeper_waiting(Q, ConsumerPid) ->
  receive
      {msg, _From} ->
          queue_keeper_waiting(
              queue:in({acked_msg, self()}, Q),
              ConsumerPid
          );
      ack -> case queue:out(Q) of
              {{value, Msg}, Q1} ->
                  ConsumerPid ! Msg,
                  queue_keeper_waiting(Q1, ConsumerPid);
              {empty, Q} -> queue_keeper_empty(ConsumerPid)
          end
  end.

call_echo(0, _EchoPid) ->
  ok;
call_echo(M, EchoPid) ->
  EchoPid ! {hello, self()},
  receive
      hello -> call_echo(M-1, EchoPid)
  end.

echo() ->
  receive
      {Msg, From} ->
          From ! Msg,
          echo();
      done -> ok
  end.

run(N, M, WithAck) ->
  EchoPid     = spawn_link(fun echo/0),
  ConsumerPid = spawn_link(
                  fun () ->
                          consumer(M, EchoPid) end),
  case WithAck of
      true ->
          producer(N, ConsumerPid, WithAck);
      false ->
          producer(N, spawn_link(
                  fun() ->
                      queue_keeper_empty(ConsumerPid)
                  end
              ), WithAck)
  end.

time(F) ->
  Start = erlang:now(),
  F(),
  timer:now_diff(erlang:now(), Start).

timed_run(N, M, WithAck) ->
  time(fun() -> run(N, M, WithAck) end).
This solution is only about 25% slower than synchronous, but producer is not blocked.