Mastering MapReduce: A Step-by-Step Java Tutorial for Big Data Processing

Matthew MacFarquhar
5 min read · Jul 8, 2023


Introduction

In this comprehensive tutorial, we explore MapReduce, a powerful programming paradigm for processing big data. I will provide a step-by-step guide to implementing a toy MapReduce program in Java, covering setup, coding, and execution.

What is MapReduce?

MapReduce is a programming model and processing paradigm designed to handle massive amounts of data efficiently. It consists of three main stages: the Map stage, the (lesser-known) Shuffle stage, and the Reduce stage.

In the Map stage, data is divided into smaller chunks and processed in parallel across a distributed computing environment. For example, each worker might take a log file and map it to a key-value store of log type to count.

The Shuffle stage reorganizes the mapped data so that like keys end up together, for example gathering every mapper’s count for a given log type into one group.

The Reduce stage then aggregates and combines the results from the Shuffle stage to produce the final output, for example summing up all the counts for a given log type.
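
To make the flow concrete, here is a tiny worked example with two made-up input files:

Input:    file A: INFO, ERROR, INFO        file B: ERROR, ERROR, WARN
Map:      A -> {INFO=2, ERROR=1}           B -> {ERROR=2, WARN=1}
Shuffle:  {INFO=[2], ERROR=[1, 2], WARN=[1]}
Reduce:   {INFO=2, ERROR=3, WARN=1}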

This approach offers significant advantages for big data processing, such as scalability, fault tolerance, and parallelism. By distributing the workload across multiple machines, MapReduce processes large datasets far faster than a single machine could, making it useful for tasks like data analysis, log processing, and machine learning.

Programming Tutorial

Now let’s jump in and start coding! First, I am going to show you a script to generate some dummy log files that we will use for our example. Then, I’ll show you the Map Agents and Reduce Agents that implement the Map and Reduce stages described above. Finally, we will combine it all using a thread pool to emulate a distributed system.

Log Generation Script

import random

if __name__ == '__main__':
    log_levels = ["ERROR", "INFO", "WARN"]

    for i in range(1, 11):
        file_name = f"log_{i}.txt"

        with open(file_name, "w") as file:
            for _ in range(100):
                log_level = random.choice(log_levels)
                file.write(log_level + "\n")

        print(f"Generated log file: {file_name}")

This script generates 10 files (log_1.txt to log_10.txt); each file has 100 lines, and each line is either ERROR, INFO, or WARN. Below are the first 10 lines of one example file.

INFO
WARN
WARN
ERROR
ERROR
WARN
INFO
ERROR
INFO
ERROR

Map Agent

package org.example;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class MapCallable implements Callable<Map<String, Integer>> {
    private final String filePath;
    private final Map<String, Integer> logMap;

    public MapCallable(String filePath) {
        this.filePath = filePath;
        this.logMap = new HashMap<>();
    }

    @Override
    public Map<String, Integer> call() throws Exception {
        // try-with-resources ensures the reader is closed even if reading fails
        try (BufferedReader reader = new BufferedReader(new FileReader(this.filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String key = line.trim();
                int valueToPut = this.logMap.getOrDefault(key, 0) + 1;
                this.logMap.put(key, valueToPut);
            }
        }
        return this.logMap;
    }
}

This Map Agent takes in one of the log files during its construction. When it is called, it reads each line, keeps a count of how many times it sees ERROR, WARN, or INFO, and returns its map of log type to occurrence count, like {ERROR=25, INFO=35, WARN=40}. We will have 10 of these agents since there are 10 log files.
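
If you want to sanity-check a Map Agent on its own before wiring up the thread pool, a minimal sketch looks like this (the demo class name is an assumption for illustration; the file path matches where the generated logs live in the final program):

package org.example;

// Hypothetical standalone check of one Map Agent.
public class MapAgentDemo {
    public static void main(String[] args) throws Exception {
        MapCallable mapCallable = new MapCallable("src/main/resources/logs/log_1.txt");
        System.out.println(mapCallable.call()); // e.g. {ERROR=25, INFO=35, WARN=40}
    }
}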

Reduce Agent

package org.example;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

public class ReduceCallable implements Callable<Map.Entry<String, Integer>> {
    private final String metricName;
    private final List<Integer> metricCounts;

    public ReduceCallable(String metricName, List<Integer> metricCounts) {
        this.metricName = metricName;
        this.metricCounts = metricCounts;
    }

    @Override
    public Map.Entry<String, Integer> call() throws Exception {
        int total = 0;
        for (Integer metricCount : metricCounts) {
            total += metricCount;
        }

        return Map.entry(this.metricName, total);
    }
}

Our Reduce Agent takes in the log metric counts from the Map Agents (after we shuffle all like keys together). We will have three of these, one for each log type. For the ERROR reducer, for example, metricName is the log type ERROR and metricCounts is a list of size 10 holding the count of ERROR log lines from each of the 10 log files. The reducer aggregates this list into a single value and returns a Map.Entry, e.g. {ERROR=352}.
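
A Reduce Agent can likewise be exercised in isolation. In this sketch the demo class name and the ten counts are made up for illustration (chosen so they sum to 352, matching the example above):

package org.example;

import java.util.List;

// Hypothetical standalone check of one Reduce Agent.
public class ReduceAgentDemo {
    public static void main(String[] args) throws Exception {
        List<Integer> errorCounts = List.of(25, 30, 41, 28, 33, 36, 29, 42, 44, 44);
        ReduceCallable reducer = new ReduceCallable("ERROR", errorCounts);
        System.out.println(reducer.call()); // ERROR=352
    }
}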

All Combined

package org.example;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // MAP
        String[] filePaths = {
                "src/main/resources/logs/log_1.txt",
                "src/main/resources/logs/log_2.txt",
                "src/main/resources/logs/log_3.txt",
                "src/main/resources/logs/log_4.txt",
                "src/main/resources/logs/log_5.txt",
                "src/main/resources/logs/log_6.txt",
                "src/main/resources/logs/log_7.txt",
                "src/main/resources/logs/log_8.txt",
                "src/main/resources/logs/log_9.txt",
                "src/main/resources/logs/log_10.txt",
        };

        ExecutorService mapExecutor = Executors.newFixedThreadPool(10);
        List<Future<Map<String, Integer>>> futureList = new ArrayList<>();
        for (String filePath : filePaths) {
            MapCallable mapCallable = new MapCallable(filePath);
            Future<Map<String, Integer>> future = mapExecutor.submit(mapCallable);
            futureList.add(future);
        }
        mapExecutor.shutdown();
        mapExecutor.awaitTermination(1, TimeUnit.SECONDS);

        // SHUFFLE
        Map<String, List<Integer>> shuffledMetrics = new HashMap<>();
        for (Future<Map<String, Integer>> mapFuture : futureList) {
            Map<String, Integer> mappedMetrics = mapFuture.get();
            mappedMetrics.forEach((key, value) -> {
                List<Integer> metricCountList = shuffledMetrics.getOrDefault(key, new ArrayList<>());
                metricCountList.add(value);
                shuffledMetrics.put(key, metricCountList);
            });
        }

        // REDUCE
        ExecutorService reduceExecutor = Executors.newFixedThreadPool(3);
        List<Future<Map.Entry<String, Integer>>> futureReducerList = new ArrayList<>();
        shuffledMetrics.forEach((key, value) -> {
            ReduceCallable reduceCallable = new ReduceCallable(key, value);
            Future<Map.Entry<String, Integer>> futureReducer = reduceExecutor.submit(reduceCallable);
            futureReducerList.add(futureReducer);
        });
        reduceExecutor.shutdown();
        reduceExecutor.awaitTermination(1, TimeUnit.SECONDS);

        // GET RESULTS
        Map<String, Integer> resultMap = new HashMap<>();
        for (Future<Map.Entry<String, Integer>> futureEntry : futureReducerList) {
            Map.Entry<String, Integer> entry = futureEntry.get();
            resultMap.put(entry.getKey(), entry.getValue());
        }

        System.out.println(resultMap);
    }
}

First, we have the Map stage, where we create an executor with 10 threads, one for each log file. For each file, we create a Map Agent and submit it to the executor to generate that file’s metric count map.
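
As a side note, since all of the Callables are known up front, ExecutorService.invokeAll is a slightly tighter alternative for the Map stage; it blocks until every task has finished, so the shutdown/awaitTermination pair is no longer needed for correctness. A minimal sketch, dropping into the same main method:

// Alternative Map stage using invokeAll, which blocks until every task completes.
List<MapCallable> mapCallables = new ArrayList<>();
for (String filePath : filePaths) {
    mapCallables.add(new MapCallable(filePath));
}
List<Future<Map<String, Integer>>> futureList = mapExecutor.invokeAll(mapCallables);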

Next, we collect all of the metric count maps from the first stage and shuffle the metric stats into a single map, shuffledMetrics, with 3 keys: ERROR, INFO, and WARN. Each key’s value is a list of the number of occurrences of that log type in each of the files.
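
Incidentally, the getOrDefault-then-put dance inside the shuffle loop can be collapsed with computeIfAbsent; this is purely a stylistic alternative with the same behavior:

// Equivalent shuffle step: computeIfAbsent creates the list on first sight of a key.
mappedMetrics.forEach((key, value) ->
        shuffledMetrics.computeIfAbsent(key, k -> new ArrayList<>()).add(value));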

After that, we spin up 3 more threads, one Reduce Agent per log type. Each one takes in a log type and the list of its occurrences across the files, aggregates the list into a single total, and sends it back to us.

Finally, we combine the Reducers’ map entries into a single map and print it out: {ERROR=307, INFO=339, WARN=354}.

Conclusion

This tutorial gave an overview of how MapReduce works and some useful applications for it. MapReduce powers lots of everyday applications and is a common topic in systems design interviews, so you will now be better equipped to understand and implement MapReduce-style systems throughout your career.
