Extending Parallelism in a Collaborative Filtering Prediction System with OpenMPI[1]
Bryan Mesich, Matthew Piehl, William Perrizo, Greg Wettstein, Prakash Ranganathan[2]
Department of Computer Science
North Dakota State University
Abstract
The Collaborative Filtering Prediction System (CFPS), developed at North Dakota State University by Dr. William Perrizo with assistance from Dr. Greg Wettstein, was designed as a response to the Netflix Prize announcement in October of 2006. The goal of the competition was to develop a collaborative filtering algorithm that could be used to predict user ratings for movies based on a users' previous ratings. A $1,000,000 prize would then be awarded to the team who could surpass Netflix's own Cinematch algorithm by a margin of 10%. As the title suggests, an implementation with OpenMPI was developed with the notion that we could increase performance by making additional processors available to the application in a distributed memory environment.
This paper is divided into the following sections:
Section 1: Overall architectural overview of the original Collaborative Filtering Prediction System (CFPS), Section 2: Area of improvement, Section 3: Architecture overview of modified CFPS, Section 4: Performance Benchmarks and section 5: Reflection
1 INTRODUCTION
In recent years, one of the many artifacts of a digital society has been the large accumulation of data it generates. Demands are ever increasing to extract or “mine” useful data from these large “raw” data sets. The current trend in comity based systems has been the mulit-core architecture. Leveraging the theoretic peak performance of these powerful multi-core processors is a key concept used when running in a high performance computing environment. Exposing and utilizing parallelism in the many different levels depends on the effort of the architectural designer and programmer. Though great effort has been made in recent years to leverage the multi-core architecture now found in most comity systems, a gap between an applications ability to use the parallelism continues to expand. In order to affectively mine these large data sets, efficient use of parallelism in a multi-core environment is needed. We focused our efforts on expanding parallelism to a recommender system. Recommender systems provide users with personalized suggestions for products or services. These systems often rely on Collaborating Filtering (CF), where past transactions are analyzed in order to establish connections between users and products. The
most common approach to CF is based on neighborhood models, which is based on similarities between products or users [1]][2][3][4].
Two major problems that most CF approaches have to
contend with are scalability and sparseness of the user profiles. To deal with the large amount of data that grows continuously in a prediction system, an effective algorithm should be designed to 1) scale its parallelism 2) perform incremental retraining and updates for attaining on-line performance, and 3) fuse information from multiple sources, thus alleviating information sparseness. The goal of our research is to focus on the first part, which allows theexecution of our CFPS application to run in parallel across a distributed memory environment. This ability decreases the application run time, thus allowing us to use the saved computing cycles on other resources.
1.1Overview Architecture
The CFPS developed at NDSU originally ran on a single SMP system, although it was later used in a distributed memory environment. Before making a prediction run, a configuration file and an input file are required to run the application. The configuration file contains settings the application will use to predict user ratings. The input file contains numeric movie and user ids that the application will make predictions on. A sample input file might look like the following shown in figure 1. Movie ids are denoted with an ending colon and all others are user ids. User ids directly following amovie id are associated to that movie and end when a new movie id is defined.
For example, in figure 1, movie 12641 has 4 users associated to it and movie 12502 has 2 users associated to it. Multiple movies and their corresponding user associations can be defined in a single input file.
Figure 1: Movies and User association
After a configuration file and input file have been created, the program can make a prediction run. The prediction application takes advantage of parallelism by forking n number for processes, each one predicting user ratings for a given movie. The variable n can be defined by the user at run time, but it is usually set to the number of processors (or cores) available to the system. In our case, we were running on a dual quad-core Penryn based system, so n would be set to 8 to run with 8 processors. The forked processes also share the user and movie PTrees that get loaded on start-up, which when loaded consume about 8 GB of memory. The concept of the prediction system using shared memory is the one that we missed initially.
When a child process has finished making predictions, it returns to the parent process and another child process is forked. The new process will then predict user ratings for a new movie. This continues until the end of the input file has been reached, with a maximum of n number of movies being predicted upon at any given time. Shown in figure 2 is pseudo code that depicts the basic flow of mpp-mpred.C. This is also the the main entry point for the prediction application. The distributed memory environment utilized to run the prediction application consisted of 32 nodes, each Penryn based with a total of 256 processors (32 nodes * 8 processors). In order to run the prediction application, a submit script called mpp-submit was used to allocate resources on the cluster using SLURM (Simple Linux Utility for Resource Management). SLURM is an open source resource manager commonly used in Linux clusters to perform cluster management and job scheduling tasks. As a cluster resource manager, SLURM has three key functions. First, it allocates exclusive and/or non-exclusive access to resources (computational nodes) to users for a period time so they can submit work. Second, it provides a framework for starting, executing, and monitoring jobs on a set of allocated nodes. Finally, it arbitrates conflicting job requests for resources by managing a queue of pending work. The submit script would allocate one node per job. Each time a job is allocated, a different input or configuration file (or both) would be provided to the prediction application.
An output file is generated for each movie, which includes the predicted user ratings for a movie. The individual output files are then written to local disk in a scratch directory. After all the predictions have been made, the output files are combined, RMSE is calculated, and a single file containing the results is written to the user specified output directory. Figure 3 shows a simplistic view of the overall architecture. Note that each job is associated with one node and each job can take any number of configuration and input file variations. A total of 30 jobs can run simultaneously, since there are 30 nodes available on a SLURM production partition.
main {
load_ptrees()
loop (input_file) {
user_list = get_users(movie);
if ( processes < n ) {
fork()
Predict(movie, user_list,..)
exit()
}
else {
wait()
}
}
}
Figure 2: Psuedo code for mpp-mpred.C
Figure: 3 CFPS architecture
2. Area of Improvement
After reviewing the existing code base, we noticed the benefits that OpenMPI could bring to the table when large input files were used. By using MPI, we could remove the 8 processor limitation by utilizing processors from other nodes in the cluster (see figure 8). We decided to approach the problem by utilizing the commonly used master-slave configuration, as shown in figure 4. When using MPI, each node (or process) is assigned a rank. In this case, rank 0 is considered the root/master process and all others are considered as subordinate/slave processes. The master in this case would control a job queue and sends jobs off to the slaves as needed. Depending on how a user allocates resources on the cluster, rank n can be considered as a physical node, or a single process on a physical node. Typically the -n or -N switch is used to distinguish this when allocating resources with SLURM. Our initial redesign had resources being allocated at the process level, which in the end did not work for us due to the shared memory requirements. To better illustrate this point, figure 5 contains pseudo code for main() in mpp-mpred.C. In our case we are allocating resources with the -n switch, which is used for allocating n number of processes. The 'mpirun' command is then called to execute mpp-mpred. An important concept to understand here is that main gets executed by all the participating processors. Since we have 8 processors per physical node, main() can be executed up to 8 times on each node. Each process that is participating is automatically assigned a unique rank when the MPI_Init() function is called. We then use the if statement shown in figure 5 to decide if the do_master() or do_slave() function is called. This decision is only made once per execution.
Figure 4: Master Slave Configuration
main {
MPI_Init()
MPI_Com_rank(..., &rank)
if ( rank == 0) {
do_master()
} else {
do_slave()
}
MPI_Finalize
}
Figure 5: pseudo code for main() in mpp-mpred.C.
while ( i < total_work ) {
MPI_Recv(JOB_TAG);
if ( ++i < total_work ) {
MPI_Send(work, WORK_TAG, slave_rank,..)
} else {
MPI_Send(DONE_TAG, slave_rank,...)
}
}
return
Figure 6: pseudo code for do_master function
Figure 6 shows pseudo code for what the typical do_master() function contains. As you can see, a slave executes main and falls into the else statement shown in figure 5. The rank 0 process receives the request for work and then decides to send work, or to finish if there is no more work available. The slave function is shown in figure 7.
while (1) {
MPI_Send(JOB_TAG, rank_0)
MPI_Probe()
if (TAG == WORK_TAG) {
MPI_Recv(work, WORK_TAG)
do_work(work)
} else if ( TAG == DONE_TAG ) {
MPI_Recv(DONE_TAG)
break
}
return
}
Figure 7: pseudo code for Slave function
As shown by the pseudo code, the slave process loops in a while loop as it processes work sent to it by the root node. The while loop is broken once the root node indicates that there is no more work to be done. We had initially planned on removing the process forking the original code base implemented in favor of MPI. However, by allocating resources on a per-process basis, we could not take advantage of shared memory. Each process we allocated would need to load local user and movie PTrees separately before any predicting could take place. This limited us to only being able to run 3 -4 processes per physical node due to wasted memory usage. The solution to our problem was to use a hybrid model where MPI and process forking were used in conjunction. MPI gave us access to more processors and process forking on each physical node allowed us implement shared memory.
3. Implementation
Even though our first implementation attempt failed, it was relatively easy to rebound from since the MPI architecture really didn't change much. Instead of allocating resources at the process level, we allocate resources at the node level as shown in Figure 8. This way each node executes main() (mpp-mpred) only once.
Figure 8: Architecture allocating resources at node level
Processes will then be forked on each slave node, just as was done in the original version (we actually reused the old code). We still use MPI for job allocation between the master and slave nodes. Figure 9 shows the modified pseudo code for the do_slave() function. The major difference here is the process forking. The slaves continue to request jobs until the maximum number of simultaneous processes have been reached. When a process finishes, the node
load_ptrees()
while ( have_work ) {
if ( n > max_processes ) {
wait_for _exit()
} else {
MPI_Probe()
if ( TAG == JOB_TAG ) {
MPI_Recv(JOB_TAG)
fork()
do_work(work)
} else {
MPI_Recv(DONE_TAG)
wait_for_everyone_to_finish()
break
}
}
}
return
Figure 9: Modified code for Master function with process forking
requests a new job from the master node. If work is available, a new process is forked. If the master node indicates it’s time to finish, the slave waits for the remaining processes to finish before exiting. This is similar to the psuedo code shown in figure 2. Also note the load_ptrees() function above the while loop. In our implementation, the master node does all the command line parsing and then passes this information to the slaves via MPI. The slaves then use this information to do some required initializing before the PTrees can be loaded. There are a few downfalls to using this architecture, the most important being the unused processes on the master node. Only one process is utilized on the master node, which is used for job management between it and the slaves. Ideally we would want to use the remaining processes, 7 in our case, to do slave work. Our initial approach to this problem was to fork processes on the master node, but the application would need to check 2 separate job queues. This is a problem because the master process does a blocking MPI_Probe() when waiting for job requests to come in from the slaves.
We could have implemented a non-blocking probe with MPI_IProbe(), but to do this eloquently while watching the local job queue was past our skill set. We decided not to implement slave processing on the master node due to the limited time we had remaining in the semester to finish the project. I was however interested in how to implement a solution for this problem. Further research pointed me to use threading in conjunction with MPI_Init_thread(). This modified version of MPI_init() allows a user to select a desired level of threading support, allowing the threads to directly interface with the MPI_Comm_world communicator. MPI_Send() and MPI_Recv() functions work just as if they were called from the parent. This is big win for us, since we can dispatch jobs from the same queue over MPI. This architecture is depicted in figure 10. Note the addition on the green rectangles representing forked processes on the slave and the master node sharing resources with a slave. It is important to use standardized message passing tags in this environment since the slave threads on the master node will be able to receive messages sent from other slaves intended for the master process.
In order to compile an OpenMPI program, special compilers called mpicc or mpiCC are used in place of a standard compiler. The mpicc/mpiCC compilers are really just pgcc/pgCC under the hood. The MPI specific compilers set library paths to the appropriate MPI header files and set a few compiler switches. To get the prediction application to compile with MPI support, the top-level makefile in the project directory was modified to use mpicc/mpiCC.
Figure 10: Job dispatch over MPI
The submission script, mpp-submit, was heavily modified to work with our hybrid model. Additional shell scripts were also created for pre and post processing tasks, which include:
•mpp-scratch – Create scratch directories
•mpp-destroy – Delete scratch directories when finished
•mpp-gather–Send output files to common NFS directory
The mpp-gather script is an important addition because mpp-glue cannot combine the output files when they are spread across the slaves nodes in scratch directories.
3.1.Benchmarking
We also calculated total run time with mpp-submit by calculating the time difference from start to end (including post-processing). Figure 11 shows run times for the different configurations we submitted with. The red and blue bars represent our modified prediction code with 8 and 12 processes forked on the slaves. The green and yellow bars show the original non-MPI code with the same parameters.
We seem to be getting diminishing returns after 3 nodes are allocated to a job. We were under the assumption that we should see better performance with each additional node since we can run 8 additional predictions. To investigate this, we ran 'top' on the slave nodes while they were running a job. We were surprised to see only 4 to 5 processes were being forked simultaneously. This is obviously non-efficient, but we are not sure where the contention is located. Our guess is that MPI has certain amount of overhead we have not accounted for or that the master node can't distribute jobs fast enough to the slaves.
Figure: 11, Run times MPI vs Hybrid
The next step in finding the contention would be to profile the program in hopes of finding a bottleneck. The additional overhead MPI has is apparent when looking at the benchmarking for a single allocated node. The non-MPI code is clearly the winner when only one node is utilized. We also noted the effect the kernel page caching can have when loading Ptrees. If the Ptrees had been loaded by a previous job, the time to load Ptrees on a subsequent job was cut from 3 – 4 minutes to 15 to 20 seconds.
- Conclusions
As we look back at our project, it is clear to us that understanding the problem “fully” before starting implementation would have helped us. We were complacent and somewhat arrogant in thinking that this would be an easy task to complete. We have noticed strange race conditions that are appearing when running with large node allocations (6 or more), but are unsure where they are coming from. The problem appears to be with the master node sending INIT_TAG messages to the slave nodes after they have finished their initialization. We could easily remedy the problem by having the slaves parse the command line arguments rather than having the master do this for them. We then remove the INIT_TAG messages that are being sent from the master to the slaves in favor of a simpler architecture. The reason we went down this path in the first place was to remove unneeded PTree loading on the master node. However, after investigating the possibility of threading slave jobs on the master node, the PTrees would need to be loaded anyway.