Wednesday, October 3, 2007

Keep your mailbox empty

Matthias wrote "Too much mail is bad for you". Yes, it's true. He solved the problem with synchronous communication between the producer and the consumer, but this solution blocks the producer and is not an "appropriate solution in general" (see the second comment). I think one good solution is to write your own queue manager and keep the process mailbox empty. It is essentially the solution that Vlad recommends in that discussion. So how do we do it?
-module(mqueue).

-export([timed_run/3]).

%% producer(N, ConsumerPid, WithAck) -> ok
%%
%% Sends N messages to ConsumerPid, followed by a {done, self()} marker,
%% and then blocks until a bare `done` atom arrives in reply.
%%
%%   WithAck = false: each message is {msg, self()} and is sent without
%%                    waiting for any reply.
%%   WithAck = true:  each message is {acked_msg, self()} and the producer
%%                    waits for `ack` before sending the next one.
producer(0, ConsumerPid, _WithAck) ->
    ConsumerPid ! {done, self()},
    receive done -> ok end;
producer(N, ConsumerPid, false) ->
    %% Fire and forget: nothing throttles the sender here.
    ConsumerPid ! {msg, self()},
    producer(N - 1, ConsumerPid, false);
producer(N, ConsumerPid, true) ->
    %% One message in flight at a time: wait for the receiver's ack.
    ConsumerPid ! {acked_msg, self()},
    receive ack -> ok end,
    producer(N - 1, ConsumerPid, true).

%% consumer(M, EchoPid)
%%
%% Receive loop of the consumer process. For every {msg, _} or
%% {acked_msg, From} it performs M round trips to EchoPid through
%% call_echo/2 and then loops. An {acked_msg, From} is acknowledged with
%% `ack` *before* the echo work starts. On {done, From} the loop ends
%% after sending `done` to both EchoPid and From.
consumer(M, EchoPid) ->
    receive
        {done, Requester} ->
            %% Shut the echo process down, then confirm completion.
            EchoPid ! done,
            Requester ! done;
        {acked_msg, Sender} ->
            Sender ! ack,
            call_echo(M, EchoPid),
            consumer(M, EchoPid);
        {msg, _Sender} ->
            call_echo(M, EchoPid),
            consumer(M, EchoPid)
    end.

%% queue_keeper_empty(ConsumerPid)
%%
%% Queue keeper state in which no message is outstanding at the consumer.
%%
%%   {msg, _}     -> forwarded to ConsumerPid as {acked_msg, self()}; the
%%                   keeper then moves to queue_keeper_waiting/2 with an
%%                   empty buffer.
%%   {done, From} -> {done, self()} is forwarded to ConsumerPid; once the
%%                   consumer answers `done`, `done` is relayed to From
%%                   and the keeper finishes.
queue_keeper_empty(ConsumerPid) ->
    receive
        {done, Producer} ->
            ConsumerPid ! {done, self()},
            receive
                done -> Producer ! done
            end;
        {msg, _Producer} ->
            ConsumerPid ! {acked_msg, self()},
            Buffer = queue:new(),
            queue_keeper_waiting(Buffer, ConsumerPid)
    end.

%% queue_keeper_waiting(Q, ConsumerPid)
%%
%% Queue keeper state in which one message has been handed to the
%% consumer and its `ack` is still pending. Q buffers the messages that
%% arrive in the meantime.
%%
%%   {msg, _} -> an {acked_msg, self()} term is appended to Q.
%%   ack      -> the oldest buffered term, if any, is sent to ConsumerPid
%%               and the keeper stays in this state; with an empty buffer
%%               it falls back to queue_keeper_empty/1.
%%
%% {done, _} is not matched here, so it remains in the mailbox until the
%% keeper is back in queue_keeper_empty/1.
queue_keeper_waiting(Q, ConsumerPid) ->
    receive
        ack ->
            case queue:out(Q) of
                {empty, _} ->
                    queue_keeper_empty(ConsumerPid);
                {{value, Next}, Rest} ->
                    ConsumerPid ! Next,
                    queue_keeper_waiting(Rest, ConsumerPid)
            end;
        {msg, _Producer} ->
            Pending = queue:in({acked_msg, self()}, Q),
            queue_keeper_waiting(Pending, ConsumerPid)
    end.

%% call_echo(M, EchoPid) -> ok
%%
%% Performs M sequential round trips with EchoPid: each one sends
%% {hello, self()} and blocks until the bare atom `hello` is received.
call_echo(0, _EchoPid) ->
    ok;
call_echo(M, EchoPid) ->
    EchoPid ! {hello, self()},
    %% Selective receive: only `hello` is taken out of the mailbox.
    receive hello -> ok end,
    call_echo(M - 1, EchoPid).

%% echo() -> ok
%%
%% Echo server loop: for every two-element tuple {Msg, From} it sends Msg
%% back to From and keeps looping. The atom `done` ends the loop.
echo() ->
    receive
        done ->
            ok;
        {Payload, ReplyTo} ->
            ReplyTo ! Payload,
            echo()
    end.

%% run(N, M, WithAck) -> ok
%%
%% Sets up one benchmark run and drives it from the calling process,
%% which acts as the producer of N messages. An echo process and a
%% consumer (doing M echo round trips per message) are always spawned.
%%
%%   WithAck = true:  the producer talks to the consumer directly and
%%                    waits for an ack after every message.
%%   WithAck = false: a queue keeper process is placed in front of the
%%                    consumer and the producer sends to it without
%%                    waiting.
run(N, M, WithAck) ->
    EchoPid = spawn_link(fun echo/0),
    ConsumerPid = spawn_link(fun() -> consumer(M, EchoPid) end),
    TargetPid =
        case WithAck of
            true ->
                ConsumerPid;
            false ->
                spawn_link(fun() -> queue_keeper_empty(ConsumerPid) end)
        end,
    producer(N, TargetPid, WithAck).

%% time(F) -> Microseconds
%%
%% Runs the zero-arity fun F and returns the elapsed time in
%% microseconds as an integer. The result of F is discarded.
%%
%% Uses timer:tc/1 instead of sampling erlang:now/0 by hand:
%% erlang:now/0 is deprecated and is not a suitable clock for measuring
%% durations.
time(F) ->
    {Micros, _Result} = timer:tc(F),
    Micros.

%% timed_run(N, M, WithAck) -> Microseconds
%%
%% Exported entry point: executes run(N, M, WithAck) and returns the
%% value reported by time/1 for that call.
timed_run(N, M, WithAck) ->
    Benchmark = fun() -> run(N, M, WithAck) end,
    time(Benchmark).
This solution is only about 25% slower than the synchronous one, but the producer is not blocked.

No comments: