More than counting words: mrjob for processing log files using AWS EMR and Hadoop

When looking at online docs and examples for MapReduce programs, authors frequently turn to the MapReduce equivalent of “Hello, World!”: a word-count example. I found it hard to extrapolate from counting words to a log-processing problem. I was attempting to correlate two different kinds of events for the same user, so I couldn’t just read each line, split on word boundaries, and count. I really struggled with how to frame the problem until I found Yelp’s mrjob package for Python.

mrjob has three modes of operation: local, EMR, and Hadoop. The local runner suffers a bit in terms of performance; you can use it to test for correctness, but you can’t predict performance until you actually run against an AWS EMR cluster or your own Hadoop cluster.

The biggest value of the package is that mrjob forces you to think about how to structure your data as key-value pairs. That is the only way it operates: each mapper, combiner, or reducer function must yield (see Python generators) a key-value pair. The a-ha moment for me: the secret to success is to carefully construct your keys, your values, or both as tuples. After that I started producing code that actually *used* MapReduce. Using mrjob helped me think more clearly about my Spark scripts, too.
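To make the key-value idea concrete, here is a tiny, self-contained sketch (plain Python, no Hadoop or mrjob required) of what the framework does between map and reduce: it collects every value emitted under the same key and hands them to a reducer together. The sort-then-groupby step stands in for the shuffle; the sample pairs are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Pairs a mapper might yield: key is a user ID, value is a (symbol, week) tuple.
mapped = [
    ("user1", ("A", 24)),
    ("user2", ("A", 25)),
    ("user1", ("B", 24)),
]

# The framework sorts by key and hands each reducer a key plus all of its values.
mapped.sort(key=itemgetter(0))
for key, pairs in groupby(mapped, key=itemgetter(0)):
    values = [v for _, v in pairs]
    print(key, values)
# user1 [('A', 24), ('B', 24)]
# user2 [('A', 25)]
```

Everything that follows is a matter of deciding what goes into the key tuple and what goes into the value tuple at each step.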

The sample code below illustrates the tuple concept. The goal of the script is to produce a histogram of activation and usage events. Each log entry is its own JSON string.

from mrjob.job import MRJob
from mrjob.step import MRStep
import json
import datetime


class MRCohort(MRJob):

    # Sort each reducer's input values so that, for a given user, the "A"
    # (activation) value sorts ahead of the "B" (auth) values.
    SORT_VALUES = True

    def steps(self):
        return [
            MRStep(mapper=self.group_by_user_id),
            MRStep(reducer=self.join_events),
            MRStep(reducer=self.count_events),
        ]

    def group_by_user_id(self, _, line):
        week = 0
        # parse each line of the file as a JSON log entry
        j = json.loads(line)
        if "time" in j:
            week = self.mapTimeToWeek(j['time'])
        if "event" in j:
            if j['event'] == "user_activation":
                yield j['user_id'], ("A", week)
            elif j['event'] == "auth_granted":
                yield j['user_id'], ("B", week)

    def count_events(self, key, counts):
        # add up the 1s emitted by join_events for each distinct key
        yield key, sum(counts)

    def join_events(self, user_id, values):
        # values arrive sorted, so a user's "A" value precedes their "B" values
        events = []
        for value in values:
            if value[0] == "A":
                events.append(value)
            elif value[0] == "B":
                for event in events:
                    yield (user_id, event[1], value[1]), 1

    # Map an ISO-8601 timestamp ("YYYY-MM-DDThh:mm:ss") to its ISO week number
    def mapTimeToWeek(self, time):
        timeData = time.split('T')[0].split('-')
        year, month, day = int(timeData[0]), int(timeData[1]), int(timeData[2])
        return datetime.date(year, month, day).isocalendar()[1]

if __name__ == '__main__':
    MRCohort.run()

The map step in group_by_user_id yields a key that is the user ID and a value that is a (symbol, week) tuple, where symbol is either A or B. The symbol is nothing more than a sort hint: with mrjob’s SORT_VALUES option enabled, the values for a given key are sorted, so a user’s activation (A) events precede that user’s authorization (B) events. The output of the map step looks something like this:

(user1, (A, 24))
(user1, (B, 24))
(user1, (B, 24))
(user1, (B, 25))
(user1, (B, 26))
(user2, (A, 25))
(user2, (B, 26))

The mapped output tells me that user1 was activated in week 24, used the system twice in week 24, once in week 25, and once in week 26. user2 was activated in week 25 and used the system only once, that coming in week 26.
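A quick way to sanity-check the mapper logic outside of mrjob is to run the same parsing against a few hand-written log lines. This sketch uses hypothetical sample data and a standalone copy of the week calculation; the timestamps were chosen so the ISO week numbers line up with the example above.

```python
import json
import datetime

def week_of(time_str):
    # Same idea as mapTimeToWeek: ISO week number from "YYYY-MM-DDThh:mm:ss".
    y, m, d = (int(p) for p in time_str.split("T")[0].split("-"))
    return datetime.date(y, m, d).isocalendar()[1]

# Hypothetical log lines; only user_id, event, and time matter here.
lines = [
    '{"user_id": "user1", "event": "user_activation", "time": "2015-06-08T10:00:00"}',
    '{"user_id": "user1", "event": "auth_granted", "time": "2015-06-09T10:00:00"}',
    '{"user_id": "user2", "event": "auth_granted", "time": "2015-06-22T10:00:00"}',
]

for line in lines:
    j = json.loads(line)
    symbol = {"user_activation": "A", "auth_granted": "B"}.get(j["event"])
    if symbol:
        print((j["user_id"], (symbol, week_of(j["time"]))))
# ('user1', ('A', 24))
# ('user1', ('B', 24))
# ('user2', ('B', 26))
```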

Now I need to reduce the data using join_events. The function expects to receive the activation event for a user ID before any of that user’s authorization events. When it encounters a value tagged ‘A’, it saves that value. When it finds a ‘B’, it yields a new kind of key-value pair that joins the activation week with the authorization week:

((user1, 24, 24), 1)
((user1, 24, 24), 1)
((user1, 24, 25), 1)
((user1, 24, 26), 1)
((user2, 25, 26), 1)

After the first reduce step, our keys are tuples composed of user ID, week activated, and week authorized. The value for each is 1 so that we can add them up in the next reduce step. Notice that the output here is one line per authorization event.
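The join can also be exercised on its own. This is a plain-Python re-implementation of the same logic, fed the sorted values for one user; it assumes, as the reducer does, that the activation arrives first:

```python
def join_events(user_id, values):
    # values must be sorted so ("A", week) precedes the ("B", week) events
    activations = []
    for value in values:
        if value[0] == "A":
            activations.append(value)
        elif value[0] == "B":
            # pair every stored activation week with this usage week
            for event in activations:
                yield (user_id, event[1], value[1]), 1

values = [("A", 24), ("B", 24), ("B", 24), ("B", 25), ("B", 26)]
print(list(join_events("user1", values)))
# [(('user1', 24, 24), 1), (('user1', 24, 24), 1),
#  (('user1', 24, 25), 1), (('user1', 24, 26), 1)]
```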

The second reduce step is very simple. It yields a key and the sum of all the counts for lines having the same key. When complete, the output looks like this:

((user1, 24, 24), 2)
((user1, 24, 25), 1)
((user1, 24, 26), 1)
((user2, 25, 26), 1)

From the output we can see that user1 was activated in week 24 and used the system a total of 4 times over 3 weeks. user2 was activated in week 25 and used the system once in week 26.
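That final tally is just a grouped sum, so the second reduce step can be checked in a few lines of plain Python with collections.Counter, using the joined pairs from the step above as input:

```python
from collections import Counter

# Output of the join step: ((user, week_activated, week_used), 1) pairs.
joined = [
    (("user1", 24, 24), 1),
    (("user1", 24, 24), 1),
    (("user1", 24, 25), 1),
    (("user1", 24, 26), 1),
    (("user2", 25, 26), 1),
]

# Sum the counts for identical keys, as count_events does.
counts = Counter()
for key, n in joined:
    counts[key] += n

for key, total in sorted(counts.items()):
    print(key, total)
# ('user1', 24, 24) 2
# ('user1', 24, 25) 1
# ('user1', 24, 26) 1
# ('user2', 25, 26) 1
```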

The real value of this solution comes when you use it to process hundreds of thousands of events. When I ran this script I saw a 10x performance improvement over the equivalent procedural code it replaced.
