strptime ignores weeks! (under certain circumstances…)

I discovered something unexpected today: strptime from the Python time module ignores week numbers in certain situations!

For background, I’ve been doing some work lately assigning user IDs to cohorts, then doing some data analytics “business intelligence” processing on the activities of each cohort’s members. Because we are very early in the process and don’t have much data yet, I have been assigning user IDs to week numbers. Ultimately we want cohort membership to be month based, and we’ll shift to that when the time comes. For now, everything is keyed on weeks, e.g. ‘2015-24’.
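
For context, a week label like ‘2015-24’ can be produced with strftime’s %U (week of the year) directive; here is a minimal sketch of that idea (not the actual cohort-assignment code):

from datetime import date

# '%U' gives the zero-padded week of the year (Sunday as the first day of the
# week), so today's date maps to a cohort label like '2015-24'.
week_string = date.today().strftime('%Y-%U')
print week_string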

The strptime problem surfaced when I attempted to convert the week string to time. My code looked something like this:

from time import strptime

starting_time = strptime(week_string, '%Y-%U')
print starting_time

starting_time was *always* January 1st of the year: ‘2015-07’ gave 1-1-2015, ‘2015-24’ gave 1-1-2015, ‘2014-50’ gave 1-1-2014. Convinced I was doing something wrong, I searched until I found note 6 on this page:

https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior

It turns out the designers of strptime decided that the ambiguity of a year-week combination (which day of that week do you pick?) justified an additional requirement: the week number is only used in the calculation when the day of the week is also specified.
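
In other words, the week number only takes effect once a day-of-week directive is supplied. A minimal sketch of the workaround (the trailing ‘-0’ and the %w directive are my addition, not part of the original cohort code):

from time import strptime

# Without a day of the week, %U is ignored and strptime falls back to January 1st.
print strptime('2015-24', '%Y-%U')

# Appending the day of the week (%w, where 0 is Sunday) makes %U take effect;
# this parses to the Sunday that begins week 24 of 2015 rather than January 1st.
print strptime('2015-24-0', '%Y-%U-%w')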

More than counting words: mrjob for processing log files using AWS EMR and hadoop

When looking at online docs and examples for map-reduce programs, authors frequently turn to the map-reduce equivalent of “Hello, World!”: a word-count example. I found it hard to extrapolate from counting words to a log-processing problem. I was attempting to correlate two different kinds of events for the same user, so I couldn’t just read each line, split on word boundaries, and so on. I really struggled with how to frame the problem until I found Yelp’s mrjob package for Python.

mrjob has three modes of operation: local, EMR, and hadoop. The local implementation suffers a bit in terms of performance. You can test for correctness, but you can’t predict performance until you actually run against an AWS EMR cluster or a hadoop cluster.

The biggest value of the package is that mrjob forces you to think about how to structure your data as key-value pairs. That is the only way it operates: each map, combine, or reduce function must yield (see Python generators) a key-value pair. The a-ha moment for me: the secret to success is to carefully construct your keys, values, or both as tuples. After that I started producing code that actually *used* map-reduce. Using mrjob helped me think more clearly about my Spark scripts, too.

The sample code, below, illustrates the tuple concept. The goal of the script is to produce output that shows the histogram of activation and usage events. Each log entry is its own json string.

from mrjob.job import MRJob
from mrjob.step import MRStep
import json
import datetime


class MRCohort(MRJob):

    # Sort the values for each key so that a user's activation ("A") record
    # reaches the reducer before that user's usage ("B") records.
    SORT_VALUES = True

    def steps(self):
        return [
            MRStep(mapper=self.group_by_user_id),
            MRStep(reducer=self.join_events),
            MRStep(reducer=self.count_events),
        ]

    def group_by_user_id(self, _, line):
        week = 0
        # parse each line in the file
        j = json.loads(line)
        if "time" in j:
            week = self.mapTimeToWeek(j['time'])
        if "event" in j:
            if j['event'] == "user_activation":
                symbol = "A"
                yield (j['user_id'], [symbol, week])
            if j['event'] == "auth_granted":
                symbol = "B"
                yield (j['user_id'], [symbol, week])

    def count_events(self, key, counts):
        # second reduce step: add up the 1s emitted for each joined key
        yield key, sum(counts)

    def join_events(self, user_id, values):
        # first reduce step: pair each usage ("B") value with the activation
        # ("A") value(s) already seen for this user
        events = []
        for value in values:
            if value[0] == "A":
                events.append(value)
            if value[0] == "B":
                for event in events:
                    yield [user_id] + event[1:] + value[1:], 1

    # Get the week of the year based on time
    def mapTimeToWeek(self, time):
        timeData = time.split('T')[0].split('-')
        year, month, day = int(timeData[0]), int(timeData[1]), int(timeData[2])
        return datetime.date(year, month, day).isocalendar()[1]

if __name__ == '__main__':
    MRCohort.run()

The map step in group_by_user_id yields a key that is the user ID and a value that is a tuple of (symbol, week). Symbol is set to either A or B. It is nothing more than a hint to the automatic sorting such that the activation events for a given user will precede the authorization events for that user. The output of the map step looks something like this:

(user1, (A, 24))
(user1, (B, 24))
(user1, (B, 24))
(user1, (B, 25))
(user1, (B, 26))
(user2, (A, 25))
(user2, (B, 26))

The mapped output tells me that user1 was activated in week 24, used the system twice in week 24, once in week 25, and once in week 26. user2 was activated in week 25 and used the system only once, in week 26.

Now I need to reduce the data using join_events. The function expects to receive the activation event for a user ID before any of that user’s authorization events. When it encounters a value marked with an ‘A’, it saves that value. When it finds a ‘B’ it yields a new kind of key-value pair that joins the activation data with the authorization data:

((user1, 24, 24), 1)
((user1, 24, 24), 1)
((user1, 24, 25), 1)
((user1, 24, 26), 1)
((user2, 25, 26), 1)

After the first reduce step, our keys are tuples composed of user ID, week activated, and week authorized. The value for each is 1 so that we can add them up in the next reduce step. Notice that the output here is one line per authorization event.

The second reduce step is very simple. It yields a key and the sum of all the counts for lines having the same key. When complete, the output looks like this:

((user1, 24, 24), 2)
((user1, 24, 25), 1)
((user1, 24, 26), 1)
((user2, 25, 26), 1)

From the output we can see that user1 was activated in week 24 and used the system a total of 4 times over 3 weeks. user2 was activated in week 25 and used the system once in week 26.

The real value of this solution comes when you use it to process hundreds of thousands of events. When I ran this script I saw a 10x performance improvement over the equivalent procedural code it replaced.
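
As a footnote to the Spark comment above, the same tuple-as-key pattern carries over almost directly to PySpark. Here is a minimal sketch, not the production script, assuming events is an RDD of (user_id, (symbol, week)) pairs shaped like the mapper output shown earlier:

from operator import add

def join_user_events(pair):
    # pair is (user_id, iterable of (symbol, week)) as produced by groupByKey
    user_id, values = pair
    values = list(values)
    activation_weeks = [week for symbol, week in values if symbol == "A"]
    usage_weeks = [week for symbol, week in values if symbol == "B"]
    for a_week in activation_weeks:
        for b_week in usage_weeks:
            yield (user_id, a_week, b_week), 1

histogram = (events.groupByKey()       # (user_id, [(symbol, week), ...])
                   .flatMap(join_user_events)
                   .reduceByKey(add))  # ((user_id, a_week, b_week), count)

reduceByKey does the same job as the second reduce step above: it adds up the 1s for each (user, activation week, usage week) key.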

Getting Spark Data from AWS S3 using Boto and Pyspark

We’ve had quite a bit of trouble getting efficient Spark operation when the data to be processed is coming from an AWS S3 bucket. Aside from pulling all the data to the Spark driver prior to the first map step (something that defeats the purpose of map-reduce!), we experienced terrible performance. In one scenario, Spark spun up 2360 tasks to read the records from one 1.1k log file. In another scenario, the Spark logs showed that reading every line of every file took a handful of repetitive operations: validate the file, open the file, seek to the next line, read the line, close the file, repeat. Processing 450 small log files took 42 minutes. Arrgh.

We also experienced memory problems. When processing the full set of logs we would see out-of-memory heap errors or complaints about exceeding Spark’s data frame size. Very frustrating.

One of my co-workers stumbled upon this site: http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 It describes a solution that overcomes the problems we were having. The solution is written in scala, so we had to translate it because we are working in Python. Here is a snippet of the scala version of their driver program, taken from https://gist.githubusercontent.com/pjrt/f1cad93b154ac8958e65/raw/7b0b764408f145f51477dc05ef1a99e8448bce6d/S3Puller.scala.

import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials

val request = new ListObjectsRequest()
request.setBucketName(bucket)
request.setPrefix(prefix)
request.setMaxKeys(pageLength)
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

val objs = s3.listObjects(request) // Note that this method returns 
                                   // truncated data if longer than
                                   // the "pageLength" above. You might
                                   // need to deal with that.
sc.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
    .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }

The key to the solution is to follow a process like this:

  1. Go directly to S3 from the driver to get a list of the S3 keys for the files you care about.
  2. Parallelize the list of keys.
  3. Code the first map step to pull the data from the files.

This procedure minimizes the amount of data that gets pulled into the driver from S3: just the keys, not the data. Then, when the map step is executed in parallel on multiple Spark workers, each worker pulls over the S3 file data only for the files whose keys it holds.

S3 access from Python was done using the Boto library (the snippet below uses Boto’s S3Connection interface, not the newer Boto3 API):

pip install boto

Here’s a snippet of the python code that is similar to the scala code, above. It is processing log files that are composed of lines of json text:

import argparse
import json
from pyspark import SparkContext, SparkConf
from boto.s3.connection import S3Connection

def main():
    # Use argparse to handle some argument parsing
    parser = argparse.ArgumentParser()
    parser.add_argument("-a",
                        "--aws_access_key_id",
                        help="AWS_ACCESS_KEY_ID, omit to use env settings",
                        default=None)
    parser.add_argument("-s",
                        "--aws_secret_access_key",
                        help="AWS_SECRET_ACCESS_KEY, omit to use env settings",
                        default=None)
    parser.add_argument("-b",
                        "--bucket_name",
                        help="AWS bucket name",
                        default="spirent-orion")
    args = parser.parse_args()
    # Use Boto to connect to S3 and get a list of objects from a bucket
    conn = S3Connection(args.aws_access_key_id, args.aws_secret_access_key)
    bucket = conn.get_bucket(args.bucket_name)
    keys = bucket.list()
    # Get a Spark context and use it to parallelize the keys
    conf = SparkConf().setAppName("MyFileProcessingApp")
    sc = SparkContext(conf=conf)
    pkeys = sc.parallelize(keys)
    # Call the map step to handle reading in the file contents
    activation = pkeys.flatMap(map_func)
    # Additional map or reduce steps go here...

def map_func(key):
    # Use the key to read in the file contents, split on line endings
    for line in key.get_contents_as_string().splitlines():
        # parse one line of json
        j = json.loads(line)
        if "user_id" in j && "event" in j:
            if j['event'] == "event_we_care_about":
                yield j['user_id'], j['event']

if __name__ == '__main__':
    main()

With these changes in place, the 42-minute job now takes under 6 minutes. That is still too long, so on to the next tuning step!

Hadoop Single-Node Cluster on Linux: Setup for map-reduce using mrjob

This week I set up a single-node Hadoop cluster in the lab. I used Ubuntu 12.04 LTS running in a VMware 5.0 VM. The frustrating part of the install and configuration was the lack of good documentation. I am not going to write an exhaustive recipe for the installation and configuration here, because it would soon be out of date (as is one of my sources…). Instead I will share the three online sources I used bits and pieces of to finally get it done.

https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html

The Apache instructions, above, are incomplete. The authors appear to assume a lot about what the reader already knows; the document leaves out many details that, if you don’t already know them, will prevent you from being successful. I don’t see how a novice could possibly succeed using these instructions alone.

http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/

Michael Noll’s tutorial, above, is excellent. He takes you step-by-step through the process, explaining things along the way. The problem is that Mr. Noll wrote it for an older version of hadoop, so a small but significant portion of the instructions is incorrect for the latest versions.

Using the first two sources I was able to get the hadoop cluster running, but when I submitted my first map-reduce job using mrjob, I ran into errors. The Stack Overflow answer linked below describes the error I saw *and* a fix. The fix updates some of the out-of-date configuration that Michael Noll’s otherwise excellent tutorial gets wrong for current hadoop versions.

http://stackoverflow.com/questions/25358793/error-launching-job-using-mrjob-on-hadoop

Now I am able to run map-reduce python scripts based on the mrjob package. They are massively SLOW. Perhaps that will be the subject of another post.

goamz: Using Temporary AWS STS Credentials to Access S3

Writing in go and using the goamz package for Amazon AWS, I had been following the standard directions for gaining access to objects stored on Amazon S3. I used an IAM-issued awsAccessKeyId and awsSecretAccessKey corresponding to my user ID, something along the lines of:

    auth := aws.Auth{AccessKey: "myAwsAccessKeyId",
        SecretKey: "myAwsSecretAccessKey"}
    region := aws.USWest2
    connection := s3.New(auth, region)
    myBucket := connection.Bucket("myBucketName")
    err := myBucket.Put(path,
        []byte("Hello, World!"),
        "text/plain",
        s3.BucketOwnerFull,
        s3.Options{})

The problem occurred when I tried to repeat the same operation using an AccessKey and a SecretKey that I got from AWS STS via the GetFederationToken() call. GetFederationToken() yields a set of temporary credentials that can be tailored to the client via a policy document. I wanted to be able to hand these credentials to a client so that they could go directly to their data on S3 without me being in the middle. That the credentials expire helped maintain sanity and made the job a little easier. But when I ran the code above using the temporary STS credentials, I received the following error:

    <Code>InvalidAccessKeyId</Code>
    <Message>
        The AWS Access Key Id you provided does not exist
        in our records.
    </Message>

The error code and message are not particularly helpful. In fact, I would assert that they are wrong. After much debugging and searching, I discovered that the error is caused by the fact that with the temporary STS credentials I must also provide the awsSessionToken in my Auth structure. Let’s look at goamz’s Auth definition:

    type Auth struct {
        AccessKey, SecretKey string
        token                string
        expiration           time.Time
    }

In Go, struct fields whose names begin with an uppercase letter are exported (visible outside the package); those whose names begin with a lowercase letter are not. Therefore only AccessKey and SecretKey can be set from client code, as in the struct literal in the example above. After a little digging, I found that goamz provides another function for creating an Auth struct:

    // To be used with other APIs that return auth
    // credentials such as STS
    func NewAuth(accessKey,
        secretKey,
        token string,
        expiration time.Time) *Auth {
        return &Auth{
            AccessKey:  accessKey,
            SecretKey:  secretKey,
            token:      token,
            expiration: expiration,
        }
    }

Thus the final solution for using temporary credentials from STS looks something like this:

    auth := aws.NewAuth(tempAwsAccessKeyId,
        tempAwsSecretAccessKey,
        tempAwsSessionToken,
        tempAwsExpiration)
    region := aws.USWest2
    connection := s3.New(auth, region)
    myBucket := connection.Bucket("myBucketName")
    err := myBucket.Put(path,
        []byte("Hello, World!"),
        "text/plain",
        s3.BucketOwnerFull,
        s3.Options{})

Note that all four parameters passed to aws.NewAuth() were returned to me from a call to GetFederationToken.

PEP8 and Pyflakes in Sublime

I am a fan of the Sublime text editor. When developing in Python, I have found it useful to have PEP8 and Pyflakes checks in the editor. After trying several packages, I have settled on the Flake8 linter. It provides intelligent marking of problems in my Python code.

To install, assuming you have Package Control already installed in Sublime, select:

Tools > Command Palette… > Install Package > Python Flake8 Lint

Customization is available through the package’s user settings, in the usual Sublime fashion.

Foscam cloud configuration

I recently purchased a Foscam IP camera (FI9816P Plug & Play Indoor 720P Megapixel Pan/Tilt Wireless P2P IP Camera (Black)) for my home. The setup using the Foscam phone apps (Android and iOS) worked flawlessly. The only caveat I have is that Foscam does not appear to support multiple accounts; everyone in the household logs in with the same username and password.

I am less impressed with Foscam’s process for configuring their cloud video capture service. Unlike the seamless phone app setup, the cloud service connectivity requires configuration in three places: on the camera itself (via browser), on your router, and on the Foscam cloud website. You need to enable port forwarding on your router, access the camera to find out which ports it is configured for, and tell the cloud website which ports to talk to. If the port numbers on the cloud website do not match those enabled on the camera, you will see a useless “Network connection” error message that lacks actionable information. I just had to keep trying different combinations until it worked.

The Support section of their website has a large collection of help documents. Many are old. Some of the information is contradictory. The main issues I ran into are:

  • I had to configure my wireless router to enable port forwarding.
  • Port forwarding requires that I know the IP address of the camera, which is not available from the web app. I had to check the status of my router and guess which of the devices it had handed IP addresses to was the camera. (Hint: Most routers hand out IP addresses in ascending order…)
  • To point my browser at the camera, I needed to know the port number for HTTP. Port 80 didn’t work. Port 8090, suggested by some of the Foscam support docs, didn’t work. I finally found a document stating that the default HTTP port for most cameras is 88, and 88 worked! After logging in, I saw that HTTPS is configured for the standard port 443, so I could have used https://camera_ip.
  • My camera’s ports were configured as follows:
    • HTTP: 88
    • HTTPS: 443
    • ONVIF: 888
    • RTSP: 554
  • I enabled port forwarding on a range from 88-888.
  • I configured the Foscam cloud website to use ports 554 and 88.

Now everything is working. Whew.