Instagram Geolocation Analysis with MPI

Introduction

MPI stands for Message Passing Interface. The aim of this project is to analyze a data set of 3.2 million Instagram posts and classify each post by its geolocation. Each post occupies a single line in bigInstagram.json. The basic idea is to read each line, extract the post coordinates, identify the grid box in which the post occurred, and increment that box's counter.
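
As a rough sketch of this classification step (not the project code itself), the snippet below loads the grid from melbGrid.json and tests a coordinate pair against each box. The melbGrid.json field names used here (features, properties, id, xmin, xmax, ymin, ymax) are assumptions for illustration and may not match the actual schema.

import json

def load_boxes(path="melbGrid.json"):
    # Assumed layout: a GeoJSON-like list of features whose properties
    # carry a box id and the bounding coordinates of that box.
    with open(path) as f:
        grid = json.load(f)
    return [(p["id"], p["xmin"], p["xmax"], p["ymin"], p["ymax"])
            for p in (feat["properties"] for feat in grid["features"])]

def classify(x, y, boxes):
    # Return the id of the first box that contains the point, else None.
    for box_id, xmin, xmax, ymin, ymax in boxes:
        if xmin <= x <= xmax and ymin <= y <= ymax:
            return box_id
    return None

if __name__ == "__main__":
    boxes = load_boxes()
    print(classify(144.96, -37.81, boxes))  # a point near the Melbourne CBD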

Parallelism

Given two processes working together, Process 0 deals with even-numbered lines (i.e. Line 0, Line 2, …) while Process 1 works on odd-numbered lines (i.e. Line 1, Line 3, …). If $P$ processes are working together, process $p$ handles every line $n$ such that $n \bmod P = p$.
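
The following is a minimal mpi4py sketch of this scheme, with the per-post processing left as a placeholder comment. Note that every rank still iterates over the whole file, which is the I/O cost discussed in the next section.

from mpi4py import MPI
import json

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

handled = 0
with open("bigInstagram.json") as f:
    for n, line in enumerate(f):
        # Every rank scans every line, but only parses the lines it owns.
        if n % size != rank:
            continue
        try:
            post = json.loads(line)
        except ValueError:
            continue  # skip lines that are not complete JSON records
        handled += 1
        # ... extract coordinates, classify into a grid box, count ...

print("rank", rank, "handled", handled, "lines")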

Disk I/O Overhead

Unlike database systems, a flat file does not support ad-hoc queries: line boundaries can only be found by scanning for end-of-line characters. For example, Process 0 cannot jump directly to Line 2; every process has to iterate over the characters to locate each newline. As a result, with n processes working together, bigInstagram.json is read in full n times.

Execution time - Number of tasks

For implementations with high execution efficiency, such as Java and C++, the time spent on disk I/O is significant compared with the time spent on data processing. The figure above shows the execution time on 1 node with 8 cores. For Java, the execution time increases dramatically as the number of tasks increases. The Python implementation, however, benefits from multiple tasks.

Data Reduction

Each process keeps a list of counters for the number of posts that occurred in each box. Before the program finishes, all counters are combined at the root process by summation: a set of integer lists is reduced to a single list. The communication overhead is negligible because MPI.COMM_WORLD.Reduce() is invoked only once by each process.
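
A minimal sketch of this reduction with mpi4py and NumPy is shown below; the array length of 16 (one counter per box in the classification result) and the dummy local values are illustrative assumptions.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# One counter per grid box, in an order agreed on by every rank
# (16 boxes here; dummy values stand in for the real counts).
local_counts = np.zeros(16, dtype=np.int64)
local_counts[rank % 16] += 1  # pretend this rank counted one post

total_counts = np.zeros_like(local_counts)
# Element-wise summation across all ranks; only the root receives the result.
comm.Reduce(local_counts, total_counts, op=MPI.SUM, root=0)

if rank == 0:
    print(total_counts)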

Job Submission

one node one core

#!/bin/bash
#SBATCH -p physical
#SBATCH --time=00:05:00
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
printf "$SLURM_JOB_NODELIST\n"

# Python
module load Python/3.5.2-goolf-2015a
mpiexec -np 1 python3 Instagram.py melbGrid.json bigInstagram.json

# Java
module load MPJ-Express/0.44-goolf-2015a-Java-9.0.4
javac -cp .:gson-2.8.2.jar:$MPJ_HOME/lib/mpj.jar *.java
mpjrun.sh -np 1 -cp gson-2.8.2.jar Instagram melbGrid.json bigInstagram.json

In line 4, --nodes=1 requests a single node, and in line 5, --ntasks-per-node=1 requests sufficient resources to launch one task. Line 6 prints the list of nodes allocated to the current job, e.g. spartan-bm013 (one node) or spartan-bm[007,013] (two nodes).

SBATCH requests resources but does not launch tasks. -np specifies the number of processes to start. In this case, only 1 process will start.

one node eight cores

...
#SBATCH --ntasks-per-node=8
...
# Python
mpiexec -np 8 python3 Instagram.py melbGrid.json bigInstagram.json
# Java
mpjrun.sh -np 2 -cp gson-2.8.2.jar Instagram melbGrid.json bigInstagram.json

--ntasks-per-node=8 requests sufficient resources to launch up to 8 tasks, i.e. eight cores. Python will start 8 concurrent processes. Java will start only 2 concurrent processes due to the disk I/O overhead.

two nodes eight cores

...
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=4
...
# Python
mpiexec --map-by node -np 8 python3 Instagram.py melbGrid.json bigInstagram.json
# Java
mpjrun.sh -np 2 -cp gson-2.8.2.jar Instagram melbGrid.json bigInstagram.json

--nodes=2 and --ntasks-per-node=4 request sufficient resources to launch 8 tasks on 2 nodes, i.e. four cores per node. --map-by node launches processes one per node, cycling by node in a round-robin fashion.
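
As a quick sanity check of this placement (a sketch, not part of Instagram.py), each rank can report the node it landed on via MPI.Get_processor_name(), the same call discussed in the comparison below. Launched with the mpiexec --map-by node line above, the output should cover both nodes in $SLURM_JOB_NODELIST.

from mpi4py import MPI

comm = MPI.COMM_WORLD
# Report which node each rank was mapped to.
print("rank %d of %d runs on %s"
      % (comm.Get_rank(), comm.Get_size(), MPI.Get_processor_name()))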

Execution Time

1 node 1 core vs 1 node 8 cores

For both Java and Python, the execution time decreases: 8 cores provide more computational resources than 1 core, and multiple cores allow several tasks to run simultaneously.

Execution Time (Mean with Standard Deviation)

1 node 8 cores vs 2 nodes 8 cores

For Java, whether the eight cores are concentrated on a single node or distributed across two nodes changes the execution time only slightly. In fact, MPJ Express is not properly configured on Spartan: although $SLURM_JOB_NODELIST (a Slurm environment variable) lists two nodes, e.g. spartan-bm[007,013], MPI.Get_processor_name() returns only one unique node. This problem is what pushed me to rewrite the program in Python.

For Python, mpiexec distributes tasks to different nodes in a round-robin fashion. Computing performance varies from node to node, and network communication is subject to more random factors than intra-node inter-process communication. As a result, the execution time for 2 nodes has a much larger variance than that for 1 node.

Classification Result

C2: 174,128 posts
B2: 22,212 posts
C3: 18,184 posts
B3: 6,187 posts
C4: 4,141 posts
B1: 3,311 posts
C5: 2,605 posts
D3: 2,313 posts
D4: 1,857 posts
C1: 1,595 posts
B4: 1,023 posts
D5: 783 posts
A2: 477 posts
A3: 467 posts
A1: 262 posts
A4: 131 posts

C-Row: 200,653 posts
B-Row: 32,733 posts
D-Row: 4,953 posts
A-Row: 1,337 posts

Column 2: 196,817 posts
Column 3: 27,151 posts
Column 4: 7,152 posts
Column 1: 5,168 posts
Column 5: 3,388 posts