Lately I encountered an unusual architecture which called for some unusual software design. We needed a service that would aggregate the output of an arbitrary number of clients, with each client doing its part of a larger job. The tricky requirement was to make the server capable of working on many such aggregations in parallel. To make things even more complicated, each client would also be working on (and sending results of) multiple jobs at the same time. Finally, the amount of data a client sends is nearly arbitrary – from bytes to megabytes. This requires a solution that you will not find in textbooks.
Before diving into this task, let's briefly remind ourselves that the usual designs for a network service gravitate around one of the following:
- One process serves requests sequentially. This is rarely used, although it has certain use cases.
- One process handles the network (socket) and dispatches each request to a new process (or thread).
- Same as the previous, but with a predefined number of worker threads which serve requests on a first-free basis. This is often referred to as a non-blocking I/O (NIO) server.
First, we had to decide whether or not to use a single connection between each client and our server. Saying yes here would call for a more complex messaging protocol and data marshaling inside the server (since we’d assign each client a worker process, the latter would have to distinguish between jobs). Also, we knew that the client is not one single monolithic process, but rather a bunch of separate processes scoped by the job they work on. So, we decided to bite the bullet and try building an architecture with per-job worker processes, each receiving data from multiple clients (a client-multiplexing server).
Easier said than done. We had to write in C, and working with sockets (more precisely, file descriptors; we’ll see later why this is important) does not really give you too many options. The classic NIO-style approach in, say, a Java application would be to have one process deal with all socket I/O, putting input into and reading output from queues associated with the worker threads that do the actual work (for a good example, see the Apilator project). We saw several deficiencies here: first, in our case we did not expect any responses to be sent (and even if such were to be introduced later, their size would be significantly smaller than the input); second, placing all I/O on a single process may result in overloading it – or, at least, underloading the rest; finally, doing such a thing in pure C would not be the easiest thing in the world. We felt that only simple things could be solid, so we looked for something else.
Having one process only accept new connections and then letting a separate one do all the I/O and data processing seemed much better. In fact, it is close to the proven accept-then-fork scenario the Unix world knows well (a minimal sketch follows below). This is perfect when a worker process needs to serve one client… but remember, we had to accept data from multiple clients (read: multiple TCP streams) in each worker process. Forking just won’t work here, so we looked at threads and their coolest feature, the ability to share memory within the same process.
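For contrast, here is a minimal sketch of that classic accept-then-fork pattern, one child process per client. This is an illustration, not our actual code: handle_client() is a hypothetical placeholder, and the bind()/listen() setup of listen_sock is assumed to have happened already.

// Minimal accept-then-fork sketch: one child process per client.
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

void handle_client(int conn);   // hypothetical per-client logic

void serve_forked(int listen_sock) {
    while (1) {
        int conn = accept(listen_sock, NULL, NULL);
        if (conn < 0)
            continue;
        pid_t pid = fork();
        if (pid == 0) {
            // Child: serve exactly one client, then exit
            close(listen_sock);
            handle_client(conn);
            close(conn);
            _exit(0);
        }
        // Parent: close its copy of the connection and go back to
        // accepting (reaping the children via SIGCHLD is omitted here)
        close(conn);
    }
}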
Simply replacing fork() with pthread_create(), however, does not yield much of a benefit by itself. We needed to solve the main riddle: how to pass new connections to the worker thread while it keeps working on existing ones. As for the sockets, we already knew we did not want expensive stuff like O_NONBLOCK; again, we wanted things as simple, light (and portable) as possible. Our scenario allowed us to freely use blocking sockets with a select() on them, so that we only read() when they are ready. Still, we needed to separate sockets by worker thread, so we laid our eyes on a nice feature of the POSIX socket API called file descriptor sets (FD sets). An FD set is merely a bitmask which can be used to filter out only certain sockets from all of them (yes, you could easily implement this yourself, but POSIX provides it for free). Marrying select() with FD sets seemed a step in the proper direction: in each worker thread we could block on select(), then filter out only the sockets that are of interest to this particular worker thread and read() from them. Here is some skeleton code (with certain checks omitted for brevity):
// Create the listening TCP socket (bind() and listen() omitted for brevity)
int tcp_socket = socket(...);

// Declare 2 FD sets - one will be populated by us,
// the second by select() - and zero the first
fd_set sock_set_read, sock_set_active;
FD_ZERO (&sock_set_active);

// Add our listening socket to our FD set
FD_SET (tcp_socket, &sock_set_active);

int i;
while (1) {
    // Copy our list, because select() will overwrite it
    sock_set_read = sock_set_active;

    // Block until a socket becomes ready to yield data
    select (FD_SETSIZE, &sock_set_read, NULL, NULL, NULL);

    // Loop over all possible descriptors, filtering only ours
    for (i = 0; i < FD_SETSIZE; ++i) {
        if (FD_ISSET (i, &sock_set_read)) {
            if (i == tcp_socket) {
                // The listening socket is ready: accept() the new connection
                int new_conn = accept (tcp_socket, ...);
                // Add the new socket to our set so that we may
                // read actual data from it
                FD_SET (new_conn, &sock_set_active);
            }
            else {
                // A previously accepted socket has data: read() from it
                ...
            }
        }
    }
}
We felt we were on the right track, but there was more work to do. The main process had to figure out what to do with each new connection – whether there was already a worker thread for it, or whether a new one should be created. Sadly, TCP sockets have no way of sending metadata, so we had to make this part of the application layer – the protocol that we put on top of TCP. The good thing about TCP is that it guarantees delivery order – so we just had to make sure that the first packet we send over a new TCP link contains the routing information for it. All the main process had to do was read this first packet and determine whether there is a suitable worker thread or a new one should be created.
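As an illustration only (the actual wire format is not shown here), the routing information in that first packet could be as simple as a fixed-size job identifier that the main process reads in full before deciding where the socket goes. The struct name and the field size below are our assumptions:

// Hypothetical wire format for the first packet on each new connection:
// a fixed-size job identifier the main process uses for routing.
#include <stdint.h>
#include <unistd.h>

#define JOB_ID_LEN 16

struct routing_header {
    uint8_t job_id[JOB_ID_LEN];   // identifies the job this stream belongs to
};

// Read the routing header from a freshly accepted socket.
// Returns 0 on success, -1 on a short read or error.
int read_routing_header(int conn, struct routing_header *hdr) {
    size_t got = 0;
    while (got < sizeof(*hdr)) {
        ssize_t n = read(conn, (char *)hdr + got, sizeof(*hdr) - got);
        if (n <= 0)
            return -1;
        got += (size_t)n;
    }
    return 0;
}

The main process would then look the job ID up in its table of worker threads and either hand the socket to an existing worker or spawn a new one.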
Knowing the routing path, we still needed a way for the main process to actually pass each new socket to the worker thread. Were it an object-oriented language like Java or C++, we would have an easy way to go (e.g., by calling a public method on the worker thread). Being in C, we had to resort to its most powerful tool: *pointers. Instead of keeping the FD set inside each worker thread, we decided to store the FD sets of all worker threads in the main process – and only give each worker thread a pointer to its own set. Bingo! We could now let the main process add sockets to the proper thread once a new connection arrives. Some code to illustrate:
// Allocate a new FD set, zero it, add the new connection to it
fd_set *thread_fd_set = malloc(sizeof(fd_set));
FD_ZERO (thread_fd_set);
FD_SET (new_conn, thread_fd_set);

// Create new thread with the pointer to the FD set as argument
pthread_t thread_id;
pthread_create(&thread_id, NULL, ..., (void *)thread_fd_set);

// In the thread entry function, simply cast the argument
// to fd_set* and loop over it the same way as in the above code
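To make that last comment concrete, here is one possible shape of the worker thread's entry function – a sketch under the assumption that the argument is exactly the fd_set pointer allocated above. handle_data() is a placeholder of ours, and synchronization between the main process and the worker over the shared set is left out:

// Possible worker thread entry function: the argument is the fd_set *
// allocated by the main process for this worker.
#include <sys/types.h>
#include <sys/select.h>
#include <unistd.h>

void handle_data(int sock, const char *buf, ssize_t len);  // hypothetical

void *worker_thread(void *arg) {
    fd_set *active_set = (fd_set *)arg;
    char buf[4096];

    while (1) {
        // select() overwrites the set, so work on a copy each iteration
        fd_set read_set = *active_set;
        if (select(FD_SETSIZE, &read_set, NULL, NULL, NULL) < 0)
            continue;

        for (int i = 0; i < FD_SETSIZE; ++i) {
            if (!FD_ISSET(i, &read_set))
                continue;
            ssize_t len = read(i, buf, sizeof(buf));
            if (len <= 0) {
                // The client went away: forget this socket
                close(i);
                FD_CLR(i, active_set);
            } else {
                handle_data(i, buf, len);
            }
        }
    }
    return NULL;
}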
Feeling happier than we should have at this point, we were suddenly met with an unexpected obstacle: the worker thread is actually blocked in select(), so even if we modify its FD set and add a new socket with data ready to be harvested, the worker thread will not know of this – until it gets out of select() and re-reads its list of assigned sockets. We reviewed several ways of resolving this, including signals, but they all seemed too bulky; programming is an art and we needed a state-of-the-art solution… so we invented one. To our rescue came the fact that in C sockets are just a subset of the entities known as file descriptors (remember, in Unix everything is a file after all). We wondered whether it would be possible to pre-supply each worker thread with one more descriptor (a control channel) that the main thread could write to; if so, whenever a new connection is added to the FD set of a worker thread, the main process will write some (unimportant) data to this control channel. As a result, the select() in the worker thread will see it and unblock, allowing the thread to pick up the newly added socket on its next run. Sounded fantastic, and even more fantastic was the fact that C had exactly the tool we needed: (in-memory) pipes. Pipes are not sockets, but their ends are “seen” as file descriptors, and – just like sockets – we could put a pipe’s end in an FD set. So, whenever a new worker thread is created, we supply it not only with the initial client socket to work on, but also create a pipe and add its reading end to the FD set of the worker thread, while keeping the writing end in the main process. Voila!
The previous code with this addition:
// Allocate a new FD set, zero it, add the new connection to it
...

// Create a pipe and push its reading end, as if it were a socket,
// into the FD set of the worker thread
int pipefd[2];
pipe(pipefd);
FD_SET (pipefd[0], thread_fd_set);

// Create the thread
...

// Save the writing end of the pipe; whenever a new socket is later
// added to the FD set of the worker thread, write something to the
// pipe to make the select() exit and pick up the newly added socket
// on its next iteration
FD_SET (new_conn, thread_fd_set);
write (pipefd[1], ...);
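One detail the snippet above leaves to the worker: when select() wakes up because of the pipe, the worker should recognize the pipe's reading end and simply drain it – the dummy bytes carry no payload, the point is only that the next iteration re-copies the (now updated) FD set. A small helper along these lines could do it; the function name, the way the worker learns which descriptor is the pipe end (e.g. passing it alongside the fd_set pointer), and the buffer size are all our assumptions:

// Sketch: in the worker's FD_ISSET loop, call this first for every ready
// descriptor; if it returns 1, the descriptor was only the wake-up pipe.
#include <unistd.h>

int drain_if_wakeup(int fd, int wake_pipe_read) {
    if (fd != wake_pipe_read)
        return 0;
    char tmp[16];
    // The payload is irrelevant; emptying the pipe is enough to let the
    // next select() iteration pick up the newly added client socket.
    read(wake_pipe_read, tmp, sizeof(tmp));
    return 1;
}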