Saturday, October 27, 2007

Scalable splitting is possible

In my previous post I thought that reading and splitting were unscalable operations. That's not true. Reading is scalable, but on current HW parallelizing it is not useful, because sequential reading from disk is more than twenty times faster than random reading. But what about splitting the chunks that were read on newlines and concatenating the leftover pieces? The splitting I can do in parallel, and what about the concatenating? Yes, I can do that too, as long as I keep the ordering information and send each partial line to the correct process. So I wrote a scatter-gather algorithm with a splitter and a concatenator. I also changed fold-reduce to map-reduce. The code is separated into three modules. The main module is tbray_pichi1:
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Yet another Erlang solution to Tim Bray's Wide Finder project
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 23 October 2007.

-module(tbray_pichi1).

-export([main/1, start/1, start/2, start/3]).

start(FileName, ChunkSize) ->
    start(FileName, ChunkSize,
          erlang:system_info(schedulers) * 8).

start(FileName) -> start(FileName, 1024 * 32).

main([FileName, ChunkSize, N]) ->
    start(FileName, list_to_integer(ChunkSize),
          list_to_integer(N)),
    halt();
main([FileName, ChunkSize]) ->
    start(FileName, list_to_integer(ChunkSize)), halt();
main([FileName]) -> start(FileName), halt().

start(FileName, ChunkSize, N) ->
    Start = now(),
    Result = nlt_map_reduce:map_reduce(
        FileName,
        _Mapper = fun (Chunk) ->
                scan(binary_to_list(Chunk), dict:new())
            end,
        _Reducer = fun (A, B) ->
                dict:merge(fun (_, V1, V2) -> V1 + V2 end, A, B)
            end,
        ChunkSize, N),
    Delta = timer:now_diff(now(), Start) / 1000,
    print_result(Result),
    if Delta > 1000 ->
           io:format("Time: ~.3f s~n", [Delta / 1000]);
       true -> io:format("Time: ~.3f ms~n", [Delta])
    end,
    ok.

print_result(Dict) ->
    [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10 | _] =
        lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    lists:foreach(fun ({Word, Count}) ->
                          io:format("~p get requests for ~s~n", [Count, Word])
                  end,
                  [R1, R2, R3, R4, R5, R6, R7, R8, R9, R10]).

scan("GET /ongoing/When/" ++
       [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1,
        D2, $/
        | Rest],
     Dict) ->
    case scan_key(Rest) of
      {[_ | _] = Key, NewRest} ->
          scan(NewRest,
               dict:update_counter(
                    [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Key],
                    1, Dict));
      {[], NewRest} -> scan(NewRest, Dict)
    end;
scan([_ | Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.

scan_key(L) -> scan_key(L, []).

scan_key([$\s | Rest], Key) ->
    {lists:reverse(Key), Rest};
scan_key([$\n | Rest], _) -> {[], Rest};
scan_key([$. | Rest], _) -> {[], Rest};
scan_key([C | Rest], Key) -> scan_key(Rest, [C | Key]);
scan_key([], _) -> {[], []}.
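For reference, a usage sketch; the file name o1000k.ap is just an example (it is the sample data set used in the Wide Finder benchmark):

% From the Erlang shell, with all three modules compiled:
%   1> tbray_pichi1:start("o1000k.ap").             % defaults
%   2> tbray_pichi1:start("o1000k.ap", 32768, 16).  % chunk size, worker count
%
% Or from the command line through main/1:
%   erl -noshell -run tbray_pichi1 main o1000k.ap 32768 16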
The second module is the map-reducer for newline-terminated chunks (nlt_map_reduce):
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% MapReduce for new line terminated blocks of file
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.

-module(nlt_map_reduce).

%-compile([native, {hipe, [o3]}]).

-export([map_reduce/5]).

% mapper reducer process info
-record(map_reduce,
        {mapper, reducer, result = '', data, receiver}).

% process spawner loop info
-record(context,
        {mr, cr, proc = [], n = 0, max = 1, maxR = 1}).

map_reduce(FileName, Mapper, Reducer, ChunkSize, N) ->
    {ok, ChunkReader} = chunk_reader:open(FileName,
                                          ChunkSize),
    Result = start(#map_reduce{mapper = Mapper,
                               reducer =
                                   fun ('', A) -> A;
                                       (A, '') -> A;
                                       (A, B) -> Reducer(A, B)
                                   end,
                               receiver = self()},
                   ChunkReader, N),
    ok = chunk_reader:close(ChunkReader),
    Result.

start(MR, ChunkReader, N) ->
    spawn_proc(#context{mr = MR, cr = ChunkReader, max = N},
               chunk_reader:read(ChunkReader), you_first).

loop(#context{cr = ChunkReader, proc = [{_, Last} | _],
              n = N, max = Max} =
         C)
    when N < Max ->
    receive
      map_done -> loop(C#context{n = N - 1})
      after 0 ->
                spawn_proc(C, chunk_reader:read(ChunkReader), Last)
    end;
loop(#context{n = N} = C) ->
    receive map_done -> loop(C#context{n = N - 1}) end.

spawn_proc(#context{mr = MR, n = N, proc = Proc,
                    maxR = R} =
               C,
           {ok, Chunk}, Last) ->
    loop(C#context{
           proc = send_result_requests(
                [{0, spawn_opt(fun () ->
                            split(MR#map_reduce{data = Chunk}, Last)
                        end,
                        [link])}
                | Proc]),
           n = N + 1,
           maxR =
               if R > N + 1 -> R;
                  true -> N + 1
               end});
spawn_proc(_, eof, you_first) ->
    '';                     % empty file
spawn_proc(#context{proc = Proc}, eof,
           Last) ->         % finalise
    Last ! you_last,
    send_final_result_requests(Proc),
    wait_for_result().

wait_for_result() ->
    receive
      {result, Result} -> Result;
      _ -> wait_for_result()  % clear pending messages if any
    end.

send_result_requests([{L1, P1}, {L1, P2} | T]) ->
    P2 ! {send_result_to, P1, L1},
    send_result_requests([{L1 + 1, P1} | T]);
send_result_requests(T) -> T.

send_final_result_requests(T) ->
    [{L1, P} | R] = lists:reverse(T),
    L = send_final_result_requests_i(L1, P, R),
    P ! {send_result_to, self(), L}.

send_final_result_requests_i(L, P, [{L1, P1} | T]) ->
    P1 ! {send_result_to, P, L1},
    send_final_result_requests_i(L + 1, P, T);
send_final_result_requests_i(L, _, []) -> L.

% mapper reducer process states
split(#map_reduce{data = Data} = MR, you_first) ->
    map_it(MR#map_reduce{data = join_next(Data)});
split(#map_reduce{data = Data} = MR, PrevPid) ->
    case split_on_nl(Data) of
      {_, none} ->    % do nothing yourself, send it
          PrevPid ! {your_next_part, join_next(Data)},
          map_done(MR#map_reduce{data = done});
      {Line, Rest} ->
          PrevPid ! {your_next_part, Line},
          map_it(MR#map_reduce{data = join_next(Rest)})
    end.

join_next(Data) ->
    receive
      {your_next_part, Next} -> <<Data/binary, Next/binary>>;
      you_last -> Data
    end.

map_it(#map_reduce{mapper = Mapper, data = Data} =
           MR) ->
    map_done(MR#map_reduce{data = done,
                           result = Mapper(Data)}).

map_done(#map_reduce{receiver = Master} = MR) ->
    Master ! map_done,      % notify master that the map is done
    reduce_and_wait(MR, 0).

reduce_and_wait(#map_reduce{result = Acc,
                            reducer = Reducer} =
                    MR,
                N) ->
    receive
      {send_result_to, Receiver, WaitForN} ->
          reduce(MR#map_reduce{receiver = Receiver}, N, WaitForN);
      {result, Result} ->
          reduce_and_wait(MR#map_reduce{result =
                                            Reducer(Acc, Result)},
                          N + 1)
    end.

reduce(#map_reduce{result = Acc, reducer = Reducer} =
           MR,
       N, WaitForN)
    when N < WaitForN ->
    receive
      {result, Result} ->
          reduce(MR#map_reduce{result = Reducer(Acc, Result)},
                 N + 1, WaitForN)
    end;
reduce(#map_reduce{receiver = Receiver,
                   result = Result},
       _, _) ->
    Receiver ! {result, Result}.    % We are finished

% splitter
split_on_nl(B) -> split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
    case B of
      <<Line:N/binary, $\n, Tail/binary>> -> {Line, Tail};
      _ -> split_on_nl(B, N + 1, S)
    end;
split_on_nl(B, _, _) -> {B, none}.
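Two illustrative cases for the splitter's contract:

% split_on_nl(<<"foo\nbar\n">>) -> {<<"foo">>, <<"bar\n">>}
% split_on_nl(<<"no newline">>) -> {<<"no newline">>, none}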
And the last module is the read-ahead chunk reader (chunk_reader):
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.

-module(chunk_reader).

-export([close/1, open/2, read/1]).

-export([test_read/2]).

% how many seconds to wait for a response
-define(TIMEOUT, 60).

% open(FileName, ChunkSize) -> {ok, ChunkReader}
open(FileName, ChunkSize) ->
    M = self(),
    {ok,
     {chunk_reader,
      spawn_opt(fun () ->
                        {ok, File} = file:open(FileName, [read, raw, binary]),
                        process_flag(trap_exit, true),
                        loop(M, File, file:read(File, ChunkSize), ChunkSize)
                end,
                [link, {priority, high}])}}.

% close(ChunkReader) -> ok | {error, invalid}
close({chunk_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
      true -> Pid ! close, ok;
      false -> {error, invalid}
    end.

% read(ChunkReader) -> eof | {ok, Data} | {error, invalid | closed}
read({chunk_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
      true -> Pid ! {read, self()}, wait_response(Pid, 0);
      false -> {error, invalid}
    end.

wait_response(Pid, N) when N < (?TIMEOUT) ->
    receive
      {ok, _} = Msg -> Msg;
      eof -> eof
      after 1000 ->   % taking too long?
                case is_process_alive(Pid) of
                  true -> wait_response(Pid, N + 1);
                  false -> {error, closed}
                end
    end;
wait_response(_, _) -> {error, timeout}.

loop(Master, File, Chunk, ChunkSize) ->
    receive
      {read, From} ->
          From ! Chunk,
          case Chunk of
            {ok, _} ->
                loop(Master, File, file:read(File, ChunkSize),
                     ChunkSize);
            eof -> file:close(File), eof_loop(Master)
          end;
      close -> file:close(File);
      {'EXIT', Master, _} -> file:close(File);
      _ ->
          loop(Master, File, Chunk, ChunkSize)  % ignore unknown messages
    end.

eof_loop(Master) ->  % answer eof to any further read requests
    receive
      {read, From} -> From ! eof, eof_loop(Master);
      close -> ok;
      {'EXIT', Master, _} -> ok;
      _ -> eof_loop(Master)
    end.

% speed testing function
% test_read(FileName, ChunkSize) -> ok | {error, invalid}
test_read(FileName, ChunkSize) ->
    {ok, File} = open(FileName, ChunkSize),
    eof = test_read_loop(File, read(File)),
    close(File).

test_read_loop(File, {ok, _}) ->
    test_read_loop(File, read(File));
test_read_loop(_, eof) -> eof.
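The test_read/2 function doubles as a usage example; from the shell (again, the file name is just an example):

% 1> chunk_reader:test_read("o1000k.ap", 32768).
% ok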
But the nlt_map_reduce code is too complicated, hard to read, and worst of all, 20% slower on a single core. All of this indicates that there is some problem, and I think it is the sending of dictionaries between processes. A dictionary is copied for every chunk, and that costs too much. So I want to rewrite it as cleaner code and fall back to the fold-reduce concept, because that concept sends fewer dictionaries.
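To show what I mean, here is a minimal fold-reduce sketch, assuming the chunk_reader module above. The module name fold_reduce_sketch and the run/4 entry point are hypothetical, and for brevity it ignores lines that straddle chunk boundaries, which the real code has to stitch together. The point is that each worker folds chunks into its own local dictionary, so a dictionary crosses a process boundary only once per worker:

-module(fold_reduce_sketch).

-export([run/4]).

% Hypothetical fold-reduce: N workers pull chunks from one shared
% chunk reader, fold each mapped chunk into a worker-local dict,
% and send the dict to the collector only once, at eof.
run(FileName, ChunkSize, N, Mapper) ->
    {ok, CR} = chunk_reader:open(FileName, ChunkSize),
    Collector = self(),
    [spawn_link(fun () -> worker(CR, Mapper, dict:new(), Collector) end)
     || _ <- lists:seq(1, N)],
    Result = collect(N, dict:new()),
    ok = chunk_reader:close(CR),
    Result.

worker(CR, Mapper, Dict, Collector) ->
    case chunk_reader:read(CR) of
      {ok, Chunk} ->
          % fold locally; no message, no dictionary copy
          worker(CR, Mapper, merge(Mapper(Chunk), Dict), Collector);
      eof ->
          Collector ! {result, Dict}    % the only dict copy per worker
    end.

collect(0, Acc) -> Acc;
collect(N, Acc) ->
    receive {result, Dict} -> collect(N - 1, merge(Dict, Acc)) end.

merge(A, B) ->
    dict:merge(fun (_, V1, V2) -> V1 + V2 end, A, B).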
