In my
previous post I assumed that reading and splitting are unscalable operations. That is not true. Reading is scalable, but on current hardware it is not useful, because sequential reading from disk is more than twenty times faster than random reading. But what about splitting the chunk that was read, and concatenating at newline boundaries? Splitting I can do in parallel — and what about concatenating? Yes, I can, as long as I keep the ordering information and send each part to the correct process. So I wrote a scatter-gather algorithm with a splitter and a concatenator. I also changed fold-reduce to map-reduce.
The code is separated into three modules. The main module is
tbray_pichi1
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Yet another Erlang solution to Tim Bray's Wide Finder project
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 23 October 2007.
-module(tbray_pichi1).
-export([main/1, start/1, start/2, start/3]).
%% start/2: run with the default worker count — eight workers per
%% scheduler, keeping all schedulers busy while some workers are
%% blocked in message passing.
start(FileName, ChunkSize) ->
    Workers = erlang:system_info(schedulers) * 8,
    start(FileName, ChunkSize, Workers).

%% start/1: run with the default chunk size of 32 KiB.
start(FileName) ->
    DefaultChunkSize = 1024 * 32,
    start(FileName, DefaultChunkSize).
%% Command-line entry point; arguments arrive as strings from the
%% shell.  Dispatches on argument count and always halts the VM
%% when the run is over.
main([FileName, ChunkSize, N]) ->
    Size = list_to_integer(ChunkSize),
    Workers = list_to_integer(N),
    start(FileName, Size, Workers),
    halt();
main([FileName, ChunkSize]) ->
    Size = list_to_integer(ChunkSize),
    start(FileName, Size),
    halt();
main([FileName]) ->
    start(FileName),
    halt().
%% Run the map-reduce over FileName with N worker processes and
%% ChunkSize-byte reads, then print the top-ten table and the
%% elapsed time.  Each mapper scans its chunk into a dict of
%% per-key hit counts; the reducer merges two dicts by summing
%% counts per key.  Returns ok.
start(FileName, ChunkSize, N) ->
    %% os:timestamp/0 instead of the deprecated now/0; it returns
    %% the same {Mega, Sec, Micro} triple, so timer:now_diff/2
    %% still applies.
    Start = os:timestamp(),
    Result = nlt_map_reduce:map_reduce(
               FileName,
               _Mapper = fun (Chunk) ->
                             scan(binary_to_list(Chunk), dict:new())
                         end,
               _Reducer = fun (A, B) ->
                              dict:merge(fun (_, V1, V2) -> V1 + V2 end, A, B)
                          end,
               ChunkSize, N),
    Delta = timer:now_diff(os:timestamp(), Start) / 1000,
    print_result(Result),
    %% Report in seconds above one second, otherwise milliseconds.
    if Delta > 1000 ->
           io:format("Time: ~.3f s~n", [Delta / 1000]);
       true -> io:format("Time: ~.3f ms~n", [Delta])
    end,
    ok.
%% Print the ten most frequent entries of Dict (a dict of
%% Key -> Count), highest count first.
%%
%% Robustness fix: the original head-matched exactly ten list
%% elements and crashed with badmatch on dicts holding fewer
%% keys; lists:sublist/2 prints whatever is available instead.
print_result(Dict) ->
    Sorted = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    Top = lists:sublist(Sorted, 10),
    lists:foreach(fun ({Word, Count}) ->
                      io:format("~p get requests for ~s~n", [Count, Word])
                  end,
                  Top).
%% Scan a log chunk (as a character list) for occurrences of
%% "GET /ongoing/When/???x/YYYY/MM/DD/<key>" and bump a counter
%% for each distinct "YYYY/MM/DD/<key>" in Dict.
%% The first head matches the fixed prefix in place: three
%% wildcard characters, a literal 'x', then the slash-separated
%% year/month/day characters captured as Y1..Y4, M1, M2, D1, D2.
scan("GET /ongoing/When/" ++
[_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1,
D2, $/
| Rest],
Dict) ->
case scan_key(Rest) of
%% Non-empty key: count the full date/key string, continue
%% scanning after the key.
{[_ | _] = Key, NewRest} ->
scan(NewRest,
dict:update_counter(
[Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Key],
1, Dict));
%% Key rejected by scan_key ('.', newline or end of input): skip.
{[], NewRest} -> scan(NewRest, Dict)
end;
%% No match at this position: slide the window by one character.
scan([_ | Rest], Dict) -> scan(Rest, Dict);
scan([], Dict) -> Dict.
%% Collect the characters of a key up to the next space.
%% A key that runs into a '.' or a newline — or the end of the
%% input — is rejected as {[], RemainingInput}.
scan_key(L) -> scan_key(L, []).

scan_key([$\s | Tail], Acc) -> {lists:reverse(Acc), Tail};
scan_key([C | Tail], _Acc) when C =:= $\n; C =:= $. -> {[], Tail};
scan_key([C | Tail], Acc) -> scan_key(Tail, [C | Acc]);
scan_key([], _Acc) -> {[], []}.
Second is the newline-terminated-chunk map-reducer (
nlt_map_reduce
)
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% MapReduce for new line terminated blocks of file
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.
-module(nlt_map_reduce).
%-compile([native, {hipe, [o3]}]).
-import(chunk_reader).
-export([map_reduce/5]).
%% Per-worker state carried through split -> map -> reduce.
%% mapper/reducer: the user funs; result: accumulated result (the
%% atom '' is the "empty" marker the wrapped reducer skips over);
%% data: this worker's binary chunk (replaced by 'done' after
%% mapping); receiver: pid to notify / send the result to.
-record(map_reduce,
{mapper, reducer, result = '', data, receiver}).
%% Master-loop state.  mr: the #map_reduce template; cr: chunk
%% reader handle; proc: [{Level, Pid}] reduction stack of live
%% workers, newest first; n: workers currently mapping; max:
%% concurrency cap; maxR: high-water mark of concurrent workers.
-record(context,
{mr, cr, proc = [], n = 0, max = 1, maxR = 1}).
%% Map-reduce over the newline-terminated records of FileName.
%% Mapper(Chunk) -> Acc; Reducer(AccA, AccB) -> Acc.  Reads the
%% file in ChunkSize blocks with at most N concurrent workers and
%% returns the fully reduced result ('' for an empty file).
map_reduce(FileName, Mapper, Reducer, ChunkSize, N) ->
    {ok, Reader} = chunk_reader:open(FileName, ChunkSize),
    %% The atom '' marks "no result yet"; wrap the user reducer so
    %% reducing with '' is the identity.
    SkipEmpty = fun ('', Other) -> Other;
                    (Other, '') -> Other;
                    (A, B) -> Reducer(A, B)
                end,
    Template = #map_reduce{mapper = Mapper,
                           reducer = SkipEmpty,
                           receiver = self()},
    Result = start(Template, Reader, N),
    ok = chunk_reader:close(Reader),
    Result.
%% Kick off the master loop: hand the first chunk to a worker
%% tagged you_first (it has no predecessor to feed a partial line
%% to); spawn_proc/3 then falls into loop/1.
start(MR, ChunkReader, N) ->
spawn_proc(#context{mr = MR, cr = ChunkReader, max = N},
chunk_reader:read(ChunkReader), you_first).
%% Master loop.  Below the concurrency cap: first drain any
%% pending map_done notifications (after 0 makes the receive
%% non-blocking), then read the next chunk and spawn a worker for
%% it.  Last is the most recently spawned worker's pid — it will
%% receive the head of the following chunk (the tail of its own
%% final line).
loop(#context{cr = ChunkReader, proc = [{_, Last} | _],
n = N, max = Max} =
C)
when N < Max ->
receive
map_done -> loop(C#context{n = N - 1})
after 0 ->
spawn_proc(C, chunk_reader:read(ChunkReader), Last)
end;
%% At the cap: block until some worker finishes its map phase.
loop(#context{n = N} = C) ->
receive map_done -> loop(C#context{n = N - 1}) end.
%% Spawn one worker per chunk, or finalise on eof.
%% A new worker enters the reduction stack at level 0;
%% send_result_requests/1 immediately pairs off equal-level
%% neighbours (tournament-tree reduction), which keeps the stack
%% roughly logarithmic in the number of chunks.
spawn_proc(#context{mr = MR, n = N, proc = Proc,
maxR = R} =
C,
{ok, Chunk}, Last) ->
loop(C#context{
proc = send_result_requests(
[{0, spawn_opt(fun () ->
split(MR#map_reduce{data = Chunk}, Last)
end,
[link])}
| Proc]),
n = N + 1,
%% maxR tracks the high-water mark of concurrent workers.
maxR =
if R > N + 1 -> R;
true -> N + 1
end});
spawn_proc(_, eof, you_first) ->
''; % empty file
%% eof with workers outstanding: tell the newest worker there is
%% no further chunk, chain the remaining partial results together
%% and wait for the final reduced value.
spawn_proc(#context{proc = Proc}, eof,
Last) -> % finalise
Last ! you_last,
send_final_result_requests(Proc),
wait_for_result().
%% Block until the final {result, R} arrives; discard anything
%% else (stray map_done notifications etc.) along the way.
wait_for_result() ->
    receive
        {result, Final} ->
            Final;
        _Stray ->
            wait_for_result()
    end.
%% Collapse the reduction stack while its two topmost entries sit
%% at the same level — the repeated variable L1 in the head is an
%% equality match.  The older worker P2 is told: once you have
%% merged L1 sub-results yourself, send your accumulated result
%% to the newer worker P1.  The winner re-enters at level L1 + 1.
send_result_requests([{L1, P1}, {L1, P2} | T]) ->
P2 ! {send_result_to, P1, L1},
send_result_requests([{L1 + 1, P1} | T]);
send_result_requests(T) -> T.
%% At eof, chain all remaining (unequal-level) workers together:
%% the stack is newest-first, so reverse it and let every later
%% worker send its result to the oldest worker P, which finally
%% sends the grand total to the master (self()).
send_final_result_requests(T) ->
[{L1, P} | R] = lists:reverse(T),
L = send_final_result_requests_i(L1, P, R),
P ! {send_result_to, self(), L}.
%% Returns the total number of results P must await: its own
%% level plus one per chained worker.
send_final_result_requests_i(L, P, [{L1, P1} | T]) ->
P1 ! {send_result_to, P, L1},
send_final_result_requests_i(L + 1, P, T);
send_final_result_requests_i(L, _, []) -> L.
%% --- mapper/reducer worker process states ---
%% Worker entry point.  The first chunk of the file owns its data
%% from byte zero; it only waits for the head of the next chunk
%% (or you_last) before mapping.
split(#map_reduce{data = Data} = MR, you_first) ->
map_it(MR#map_reduce{data = join_next(Data)});
%% Any other chunk starts mid-line: everything before its first
%% newline belongs to the previous worker PrevPid.
split(#map_reduce{data = Data} = MR, PrevPid) ->
case split_on_nl(Data) of
{_, none} -> % no newline in the whole chunk
%% Forward our entire data (plus whatever the next worker
%% sends us) to PrevPid and take no part in mapping.
PrevPid ! {your_next_part, join_next(Data)},
map_done(MR#map_reduce{data = done});
{Line, Rest} ->
PrevPid ! {your_next_part, Line},
map_it(MR#map_reduce{data = join_next(Rest)})
end.
%% Wait for the continuation of our data: either the head of the
%% next chunk (appended to what we already hold) or the you_last
%% token meaning no further chunk exists.
join_next(Chunk) ->
    receive
        you_last ->
            Chunk;
        {your_next_part, Continuation} ->
            <<Chunk/binary, Continuation/binary>>
    end.
%% Run the user mapper over the (now line-aligned) data and hand
%% over to the reduce phase.  data is replaced by the atom done
%% so the chunk binary can be garbage collected.
map_it(#map_reduce{mapper = Mapper, data = Data} =
MR) ->
map_done(MR#map_reduce{data = done,
result = Mapper(Data)}).
%% Tell the master this worker finished mapping (freeing one
%% concurrency slot), then start collecting and merging results.
map_done(#map_reduce{receiver = Master} = MR) ->
Master ! map_done, % notify master that the map phase is done
reduce_and_wait(MR, 0).
%% Merge incoming sub-results into our accumulator until someone
%% (the master or a peer, via a send_result_to request) tells us
%% whom to send the total to and how many results to await in
%% all.  N counts results merged so far.
reduce_and_wait(#map_reduce{result = Acc,
reducer = Reducer} =
MR,
N) ->
receive
{send_result_to, Receiver, WaitForN} ->
reduce(MR#map_reduce{receiver = Receiver}, N, WaitForN);
{result, Result} ->
reduce_and_wait(MR#map_reduce{result =
Reducer(Acc, Result)},
N + 1)
end.
%% Target and quota are known: keep merging incoming results
%% until N reaches WaitForN...
reduce(#map_reduce{result = Acc, reducer = Reducer} =
MR,
N, WaitForN)
when N < WaitForN ->
receive
{result, Result} ->
reduce(MR#map_reduce{result = Reducer(Acc, Result)},
N + 1, WaitForN)
end;
%% ...then ship the accumulated result and let the process end.
reduce(#map_reduce{receiver = Receiver,
result = Result},
_, _) ->
Receiver ! {result, Result}. % We are finished
%% splitter
%% Split binary B at its first newline.
%% Returns {Line, Tail} (neither part containing the '\n') or
%% {B, none} when B holds no newline at all.
%% Uses binary:split/2 — a single BIF-backed scan — instead of
%% the original byte-by-byte recursive re-match.
split_on_nl(B) ->
    case binary:split(B, <<"\n">>) of
        [Line, Tail] -> {Line, Tail};
        [_Whole] -> {B, none}
    end.
And last is the read-ahead chunk reader:
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Chunk reader process with read ahead
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Author: Hynek (Pichi) Vychodil (http://pichis_blog.blogspot.com/), 22 October 2007.
-module(chunk_reader).
-export([close/1, open/2, read/1]).
-export([test_read/2]).
% number of 1-second retries to wait for a response before timing out
-define(TIMEOUT, 60).
% open(FileName, ChunkSize) -> {ok, ChunkReader}
%% Spawn a linked, high-priority reader process that always keeps
%% one chunk read ahead of its consumer.  trap_exit lets the loop
%% see the opener M's exit and close the file.
open(FileName, ChunkSize) ->
M = self(),
{ok,
{chunk_reader,
spawn_opt(fun () ->
{ok, File} = file:open(FileName, [read, raw, binary]),
process_flag(trap_exit, true),
%% The first read is issued before entering the
%% loop — that is the read-ahead.
loop(M, File, file:read(File, ChunkSize), ChunkSize)
end,
[link, {priority, high}])}}.
% close(ChunkReader) -> ok | {error, invalid}
%% Ask the reader process to close the file and stop.
%% NOTE(review): is_process_alive/1 followed by the send is racy —
%% the process may die in between and close becomes a silent
%% no-op; presumably acceptable since a dead reader has already
%% released its file — confirm.
close({chunk_reader, Pid}) when is_pid(Pid) ->
case is_process_alive(Pid) of
true -> Pid ! close, ok;
false -> {error, invalid}
end.
% read(ChunkReader) -> eof | {ok, Data} | {error, invalid | closed}
%% Request the next (already prefetched) chunk from the reader
%% process, then wait for its reply.
read({chunk_reader, Pid}) when is_pid(Pid) ->
case is_process_alive(Pid) of
true -> Pid ! {read, self()}, wait_response(Pid, 0);
false -> {error, invalid}
end.
%% Wait for the reader's reply, re-checking process liveness once
%% per second for up to ?TIMEOUT rounds so a crashed reader
%% yields {error, closed} instead of hanging forever.
%%
%% Robustness fix: the reader forwards file:read/2 results
%% verbatim, which may be {error, Reason}; the original matched
%% only {ok, _} and eof, so an I/O error spun here for the full
%% 60-round timeout.  Pass error tuples straight through.
wait_response(Pid, N) when N < (?TIMEOUT) ->
    receive
        {ok, _} = Msg -> Msg;
        eof -> eof;
        {error, _} = Error -> Error
    after 1000 -> % taking long?
        case is_process_alive(Pid) of
            true -> wait_response(Pid, N + 1);
            false -> {error, closed}
        end
    end;
wait_response(_, _) -> {error, timeout}.
%% Reader main loop: Chunk always holds the prefetched result of
%% the previous file:read/2.  Serve it on request, then read
%% ahead again.
%%
%% Fixes: (1) the eof branch called eof_loop/2, which is not
%% defined anywhere (eof_loop/1 is) — an undefined-function crash
%% at the first end-of-file; (2) an {error, _} read result fell
%% through the case with a case_clause crash — close the file and
%% stop instead, the error tuple having already been forwarded to
%% the client.
loop(Master, File, Chunk, ChunkSize) ->
    receive
        {read, From} ->
            From ! Chunk,
            case Chunk of
                {ok, _} ->
                    loop(Master, File, file:read(File, ChunkSize),
                         ChunkSize);
                eof ->
                    file:close(File),
                    eof_loop(Master);
                {error, _} ->
                    file:close(File)
            end;
        close -> file:close(File);
        {'EXIT', Master, _} -> file:close(File);
        _ ->
            loop(Master, File, Chunk, ChunkSize) % ignore unknown
    end.
%% After end-of-file: keep answering read requests with eof until
%% the client closes us or the opener exits; ignore anything else.
eof_loop(Owner) ->
    receive
        close ->
            ok;
        {'EXIT', Owner, _Reason} ->
            ok;
        {read, Client} ->
            Client ! eof,
            eof_loop(Owner);
        _Ignored ->
            eof_loop(Owner)
    end.
% speed testing function
% test_read(FileName, ChunkSize) -> ok | {error, invalid}
%% Read the whole file through the chunk reader, discarding the
%% data — used to benchmark raw read-ahead throughput.
test_read(FileName, ChunkSize) ->
{ok, File} = open(FileName, ChunkSize),
eof = test_read_loop(File, read(File)),
close(File).
%% Pull chunks until eof.
test_read_loop(File, {ok, _}) ->
test_read_loop(File, read(File));
test_read_loop(_, eof) -> eof.
But
nlt_map_reduce
code is too complicated, hard to read and, worst of all, 20% slower on a single core. All this indicates that there is some problem, and I think it is the dictionary passing between processes. A dictionary is copied for every chunk, and that costs too much. So I want to rewrite it as cleaner code and fall back to the fold-reduce concept, because that concept sends fewer dictionaries.
No comments:
Post a Comment