You can see a live demo of the following tutorial at chat.tech9computers.com

Getting into the code

In the previous part, I introduced the chat system and how it should work. Now we are going to write the chat_postoffice, chat_mailbox, chat_room, and chat_web modules. Before you read download the source for this tutorial here. and dedicate about 20 minutes to read through this tutorial, because it's kind of long (sorry).

UPDATE 7/2/11: The source code is also available here: https://github.com/chrismoos/erl_chat_tutorial. You can checkout the 0.1 tag.

Setting up the project

When you look at the source, take a look at the structure of the files and folders. There is a makefile to help you build and run the project. To run the server, just do the following:

$ cd erl_chat
$ make erl_chat

This will compile all the code, and start the OTP application. The web server runs on port 8000.

The Mailbox Process

Before we set up the post office, let's define how a mailbox will work. First, it will run in its own process and keep a list of messages. Essentially it is just a loop, that accepts the following messages:

  • {mail, {msg, Data}}
  • {mail, {add_listener, {MsgID, Pid}}}
  • {mail, {remove_listener, Pid}}
  • {mail, {get_msg_id, Pid}}
  • quit

To create a mailbox, the post office will spawn a process and then the chat_mailbox:start function is called. The start function sets the initial state and then puts the process into hibernation.

start

start(ID) ->
    proc_lib:hibernate(?MODULE, loop, [#state{id=ID}]).

The following are the record definitions for the mailbox:

-record(message, {
id,data
}).

-record(state, {
id,cur_id=0,listeners=[],messages=[]
}).

NOTE: What is proc_lib:hibernate

This function helps keep the memory footprint much lower for a process. It is very useful when a process only does action when it receives a message. It discards the call stack and garbage collects for the process.

Now let's write the loop function for our mailbox. It just receives a message for our process and does something with it. Here is the entire loop function.

loop

loop(#state{id=ID,cur_id=CurID,messages=Msgs,listeners=Listeners} = State) ->
    receive
        {mail, {add_listener, {_MsgID, _Pid} = Listener}} ->
            NewState = notify_listeners(State#state{listeners=[Listener | Listeners]}),
            proc_lib:hibernate(?MODULE, loop, [NewState]);
        {mail, {remove_listener, Pid}} ->
            NewListeners = lists:filter(fun({_Id, P}) -> P /= Pid end, Listeners),
            proc_lib:hibernate(?MODULE, loop, [State#state{listeners=NewListeners}]);
        {mail, {get_msg_id, Pid}} ->
            Pid ! {cur_msg_id, CurID},
            proc_lib:hibernate(?MODULE, loop, [State]);
        {mail, {msg, Data}} ->
            Msg = #message{id=CurID,data=Data},
            NewState = notify_listeners(State#state{messages=[Msg | Msgs],cur_id=CurID+1}),
            proc_lib:hibernate(?MODULE, loop, [NewState]);
        quit -> ok;
        _ -> proc_lib:hibernate(?MODULE, loop, [State])
    end.

Okay, let's break down everything in that loop.

add_listener

When a new listener is added, the first thing we do is call notify_listeners, which will try to update the newly added listener(and any others) with any messages that have a higher/or equal message ID. Well go over notify listeners in a little.

remove_listener

This just goes through the list of listeners, if one matches the given Pid, remove it.

get_msg_id

This just returns the highest message ID (so the client knows what the latest message ID is).

msg

This adds a message to the mailbox. notify_listeners is also called to try and update anyone interested.

quit

Tells the mailbox to quit (and the process will exit).

Notifying listeners

Finally, our mailbox needs a notify_listener function. This will go through each listener and then check to see if any messages are available, and if so, send them to the Pid. If a listener is notified, it is removed also.

notify_listeners

notify_listeners(#state{listeners=Listeners,messages=Msgs,cur_id=CurID} = State) ->
    NewListeners = lists:filter(fun({MsgID, Pid}) ->
        case MsgID >= CurID of
            true -> true;
            _ ->
                % Select messages that are greater than or equal to the requested ID
                case lists:filter(fun(#message{id=ID}) -> ID >= MsgID end, Msgs) of
                    [] -> true; % no messages were found for this listener, keep it in the list
                    M -> Pid ! lists:map(fun(#message{data=Data,id=MID}) -> {MID, Data} end, M), false % remove it
                end
        end
    end, Listeners),
    State#state{listeners=NewListeners}.

Okay, that does it for our mailbox, now let's work on the post office, which will communicate with our mailbox.

Creating the Post Office

Now it's time to implement the post office. Without really focusing on the chat room aspects now, the post office should work somewhat like it does in real life. Ours will do the following:

  • Create/Delete mailboxes.
  • Send message to a mailbox by a unique ID (aka address).
  • Send message to all mailboxes (junk mail, broadcast).

Our chat_postoffice module uses the gen_server behavior so a lot of the code in the module is related to that. First let's define the state of our postoffice server. It is very simple and just contains a list of mailboxes.

-record(state, {
mailboxes=[]
}).

Each item in the mailboxes list will be a tuple:

Where Id is the mailbox ID, and Pid is the process ID for the mailbox. You can see in our init function that our server starts with no mailboxes(empty list).

Our create_mailbox function is defined below. First we check to see if the mailbox already exists, if not, we add it to our list and spawn off a mailbox process (we will go into the chat_mailbox module in a little bit).

create_mailbox

handle_call({create_mailbox, ID}, _From, #state{mailboxes=MBoxes} = State) ->
    case get_mailbox(ID, State) of
        {ok, _} -> {reply, {error, already_exists}, State};
        {error, notfound} ->
            Pid = spawn_link(chat_mailbox, start, [ID]),
            NewBox = {ID, Pid},
            {reply, ok, State#state{mailboxes=[NewBox | MBoxes]}}
    end;

And here is our method for deleting a mailbox. It removes the mailbox from the list and sends a message to the mailbox process to quit.

delete_mailbox

handle_cast({delete_mailbox, ID}, #state{mailboxes=MBoxes} = State) ->
    NewBoxes = lists:filter(fun({Id, Pid}) ->
        case Id /= ID of
            false -> 
                % tell the mailbox process to quit
                Pid ! quit, false;
            _ -> true
        end
    end, MBoxes),
    {noreply, State#state{mailboxes=NewBoxes}};

Now we want to create methods for sending and broadcasting mail.

To send mail, we look up the mailbox ID, and send the process a message. Note: Delivery is not guaranteed, if the mailbox process dies then it will never receive the message. In another post we will make it more fault tolerant, but for now, I wouldn't worry about the mailbox process dying.

send_mail

handle_cast({send_mail, {ID, Msg}}, State) ->
    case get_mailbox(ID, State) of
        {ok, {_Id, Pid}} -> Pid ! {mail, Msg};
        _ -> ok
    end,
    {noreply, State};

And finally, let's make our broadcast function. It takes a message and a list of ID's not to send to.

broadcast_mail

handle_cast({broadcast_mail, {Msg, Except}}, #state{mailboxes=MBoxes} = State) when is_list(Except) ->
    [Pid ! {mail, Msg} || {Id, Pid} <- MBoxes, lists:member(Id, Except) == false],
    {noreply, State};

chat_room server

This server contains the core logic of our chat room. This includes maintaining all connected users, and functions to allow a user to join or leave the chat room, get list of online users, and send/receive messages(and events).

The state of the chat_room server is defined below. At the most basic level, it is just going to contain a list of all the users in the chat room. The client state is also defined below.

-record(state, {
clients=[]
}).

-record(client_state, {
id, nick, host,last_action
}).

The chat_room server will have the following functions:

  • join
  • leave
  • wait/wait_finish
  • chat_message
  • get_users
  • get_msg_id
  • find_idle_clients

Let's start by writing the join function. This will first check to make sure that a nickname is valid(alphanumeric, no longer than 16 characters) and available. If everything is okay, we will create a mailbox for the user, notify all other users that a user has joined, and return a unique session to the caller. If an error occurs, it is returned to the caller as well.

join

handle_call({join, {Nick, Host}}, _From, #state{clients=Clients} = State) when is_list(Nick) ->
    case validate_nick(Nick, State) of
        {error, Reason} -> {reply, {error, Reason}, State};
        {ok, ValidNick} ->
            Session = get_unique_session(State),
            case chat_postoffice:create_mailbox(Session) of
                ok -> 
                    chat_postoffice:broadcast_mail({msg, {user_joined_room, ValidNick}}, [Session]),
                    Client = #client_state{id=Session,nick=ValidNick,host=Host,last_action=now()},
                    {reply, {ok, Session}, State#state{clients=[Client | Clients]}};
                {error, _} -> {reply, {error, not_available}, State}
            end
    end;

The leave function is used to log out of the chat room. It will remove the user, and notify the others that that user has left. In the next part, we will implement a timeout so that if a user hasn't done anything in a specified time, he will be removed.

leave

handle_cast({leave, {Sess, Reason}}, #state{clients=Clients} = State) when is_list(Reason) ->
    case get_session(Sess, State) of
        {error, not_found} -> {noreply, State};
        {ok, Client} ->
            chat_postoffice:delete_mailbox(Client#client_state.id),
            CleanReason =  chat_util:unicode_clean(lists:sublist(Reason, 32)),
            chat_postoffice:broadcast_mail({msg, {user_left_room, {Client#client_state.nick, CleanReason}}}, [Client#client_state.id]),
            OtherClients = lists:filter(fun(#client_state{id=ID}) -> ID /= Client#client_state.id end, Clients),
            {noreply, State#state{clients=OtherClients}}
    end; 

Chat message will broadcast a message to the chat room.

chat_message

handle_cast({chat_message, {Sess, Msg}}, State) when is_list(Msg) ->
    case get_session(Sess, State) of
        {error, not_found} -> {noreply, State};
        {ok, #client_state{nick=Nick,id=ID} = C} ->
            CleanMsg = chat_util:unicode_clean(lists:sublist(Msg, 256)),
            chat_postoffice:broadcast_mail({msg, {chat_msg, {Nick, CleanMsg}}}, [ID]),
            chat_postoffice:send_mail(ID, {msg, {sent_chat_msg, {Nick, CleanMsg}}}),
            NewState = update_client(C, State),
            {noreply, NewState}
    end;

Finally, we have ,wait, wait_finish, and get_users.

The wait function will add a listener to the user's mailbox.

wait

handle_cast({wait, {Sess, MsgID, Pid}}, State) when is_integer(MsgID) ->
    case get_session(Sess, State) of
        {error, not_found} -> Pid ! {error, bad_session}, {noreply, State};
        {ok, C} ->
            NewState = update_client(C, State), 
            chat_postoffice:send_mail(Sess, {add_listener, {MsgID, Pid}}), {noreply, NewState}
    end;

wait_finish removes the listener from a mailbox.

wait_finish

handle_cast({wait_finish, {Sess, Pid}}, State) ->
    case get_session(Sess, State) of
        {error, not_found} -> {noreply, State};
        {ok, _} -> chat_postoffice:send_mail(Sess, {remove_listener, Pid}), {noreply, State}
    end;

The get_users function just returns a list with the nickname of every user.

get_users

handle_call({get_users, Sess}, _From, State) ->
    case get_session(Sess, State) of
        {error, not_found} -> {reply, {error, not_found}, State};
        {ok, C} -> 
            NewState = update_client(C, State),
            {reply, {ok, lists:map(fun(#client_state{nick=Nick}) -> Nick end, State#state.clients)}, NewState}
    end;

That does it for the chat_room server. If you look through the code you will also see a find_idle_clients function, which runs on a timer to see if any clients have left the browser and timed out.

Finally, the web front-end

There are two pages that the end user will see. The login page, where they can choose a nickname, and the chat page. There are two templates that we will be using, called index.html, the login page, and chat.html, the chat screen. The rest of the interaction between the web browser and the server will take place with AJAX.

First, let's make it so a user can join the chat room. Take a look at the chat_web module and you will see a section called Request Handlers. These are functions that are called when a request comes in. We can look at the path to decide what to do.

At the bottom of our request handlers, we have one that will just try to serve a file from our docroot directory. This is where we will store our javascript and any other static files.

handle_request(Req, Path) ->
	Req:serve_file(string:sub_string(Path, 2), "docroot", []).

Also, I've included some helper functions in chat_util for loading templates with ErlyDTL. You can use the Django template language in your templates, and then render them with any variables you want. We don't use this that much.

Login

Our login page, which is at the path /, is rendered like so:

handle_request(Req, "/") -> html_ok(Req, chat_util:get_template("index", []));

You can take a look at the template, it basically does a simple HTTP post to the path, /login/.

Our login handler just calls chat_room:join to attempt to join the chat room. If an error occurs, it is passed into the login template and returned to the user. If all is good, we set a cookie with the user's chat session ID.

login

handle_request(Req, "/login/") ->
    Post = Req:parse_post(),
	case chat_room:join(chat_util:get_parameter("nick", Post), Req:get(peer)) of
	    {ok, SessID} -> 
	        SessCookie = mochiweb_cookies:cookie("chat_sess", SessID, [{path, "/"}]),
	        Req:respond({302, [SessCookie, {"Location", "/chat/"}], <<>>});
		{error, not_available} -> html_ok(Req, chat_util:get_template("index", [{error, "The nickname is not available."}]));
	    _ -> html_ok(Req, chat_util:get_template("index", [{error, "The nickname must be alphanumeric and not blank."}]))
	end;

The main chat interface

Once the user is redirected to the /chat/ path and has a session cookie set, the AJAX will take over. I don't want to go too much into the javascript, but it mainly does the following:

  • Get's the current message ID and validates the session.
  • Get's the list of online users.
  • Waits for any messages to come in from the server.

Start client is called to get the message ID. If successful, it gets the online users.

startClient

function startClient() {
	addSystemMsg("Establishing connection to chat service...");
	new Ajax.Request('/chat/start/',
	  {
	    method:'get',
	    onSuccess: function(transport){
			var response = transport.responseText.evalJSON();
			
			if(response.status == "ok") {
				msgID = response.response;
				addSystemMsg("Connected to chat service.");
				getOnlineUsers();
			}
			else if(response.status == "error" && response.response == "bad_session") addSystemMsg("Unable to locate your session. Please <a href=\"/\">login</a> again.");
	    },
	    onFailure: function(){ addSystemMsg("The chat service is not responding at this time.") }
	  });
}

After the message ID and the online users are retrieved, the client enters the message loop. After a message is received, it is processed by handleServiceMsg, which basically looks at the JSON object and decides what to do(user joined, chat message, etc,.).

getServiceMsg

function getServiceMsg() {
	new Ajax.Request('/chat/wait/?msg_id=' + msgID,
	  {
	    method:'get',
	    onSuccess: function(transport){
	        if(transport.responseText == "") {
	            setTimeout('getServiceMsg();', 10000);
	        }
	        else {
	            addSystemMsg(transport.responseText);
	            var response = transport.responseText.evalJSON();
	            if(handleServiceMsg(response) == true) {
		  	        getServiceMsg();
		        }
	        }
	    },
	    onFailure: function(){ 
	        setTimeout('getServiceMsg();', 10000) }
	  });
}

getServiceMsg talks to the following handler, which just adds a listener for our session, and waits for a message.

handle_request(Req, "/chat/wait/") ->
    MsgID = chat_util:get_parameter_int("msg_id", Req:parse_qs()),
    chat_room:wait(get_session(Req), MsgID, self()),
    timer:apply_after(?COMET_TIMEOUT, ?MODULE, timeout_wait, [self()]),
    proc_lib:hibernate(?MODULE, wait, [Req]);

If there are no messages after a certain time(?COMET_TIMEOUT), a timeout message is sent and the client is told to reconnect.

When the user wants to send a message, an AJAX post is performed to the path /chat/send_msg/. The following is the handler for sending a chat message. It's pretty simple.

handle_request(Req, "/chat/send_msg/") ->
    chat_room:chat_message(get_session(Req), chat_util:get_parameter("msg", Req:parse_post())),
    json_respond(json_client_ok(<<"">>), Req);

Conclusion

Well, that's it for now. You can download the source and try it out yourself. The server runs on port 8000. (http://localhost:8000).

In the next part, I will discuss some more advanced features, such as rate limiting, limiting connections for a host, admin control, more fault tolerance(using ETS/DETS) for storing messages, and more. While you are waiting, come chat: chat.tech9computers.com