I was chasing a dependency issue when executing Python code. After much searching and several blind alleys, the evidence mounted that the problem was a package dependency that was not being properly satisfied.

I started using pip show and other commands to piece together the evidence I needed to pinpoint the error. Then I stumbled upon a Python tool that does the work for me: pipdeptree.

pip install pipdeptree

Then execute on a package:

[root@jen-master ~]# pipdeptree -p ansible
- cryptography [required: Any, installed: 2.3.1]
  - asn1crypto [required: >=0.21.0, installed: 0.24.0]
  - cffi [required: >=1.7,!=1.11.3, installed: 1.11.5]
    - pycparser [required: Any, installed: 2.19]
  - enum34 [required: Any, installed: 1.1.6]
  - idna [required: >=2.1, installed: 2.7]
  - ipaddress [required: Any, installed: 1.0.22]
  - six [required: >=1.4.1, installed: 1.11.0]
- jinja2 [required: Any, installed: 2.10]
  - MarkupSafe [required: >=0.23, installed: 1.1.0]
- paramiko [required: Any, installed: 2.4.1]
  - bcrypt [required: >=3.1.3, installed: 3.1.4]
    - cffi [required: >=1.1, installed: 1.11.5]
      - pycparser [required: Any, installed: 2.19]
    - six [required: >=1.4.1, installed: 1.11.0]
  - cryptography [required: >=1.5, installed: 2.3.1]
    - asn1crypto [required: >=0.21.0, installed: 0.24.0]
    - cffi [required: >=1.7,!=1.11.3, installed: 1.11.5]
      - pycparser [required: Any, installed: 2.19]
    - enum34 [required: Any, installed: 1.1.6]
    - idna [required: >=2.1, installed: 2.7]
    - ipaddress [required: Any, installed: 1.0.22]
    - six [required: >=1.4.1, installed: 1.11.0]
  - pyasn1 [required: >=0.1.7, installed: 0.4.4]
  - pynacl [required: >=1.0.1, installed: 1.3.0]
    - cffi [required: >=1.4.1, installed: 1.11.5]
      - pycparser [required: Any, installed: 2.19]
    - six [required: Any, installed: 1.11.0]
- PyYAML [required: Any, installed: 3.13]
- setuptools [required: Any, installed: 0.9.8]

As you can see, you get a recursive list of every dependency, along with its version requirement and the version actually installed. Very helpful!
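Under the hood, this kind of tool just walks the requirement metadata that pip already records for each installed distribution. Here is a minimal sketch of that idea using pkg_resources; this is an illustration of the concept, not pipdeptree's actual implementation:

```python
# Sketch: recursively walk the requirements each installed
# distribution declares, pipdeptree-style. Not pipdeptree's
# actual code -- just an illustration of the idea.
import pkg_resources

def print_deps(name, depth=0, seen=None):
    if seen is None:
        seen = set()
    if name.lower() in seen:        # guard against circular requirements
        return
    seen.add(name.lower())
    try:
        dist = pkg_resources.get_distribution(name)
    except pkg_resources.DistributionNotFound:
        print('  ' * depth + '- %s [not installed]' % name)
        return
    print('  ' * depth + '- %s [installed: %s]'
          % (dist.project_name, dist.version))
    for req in dist.requires():
        print_deps(req.project_name, depth + 1, seen)

print_deps('pip')
```

The `seen` set keeps the recursion from looping if two packages ever require each other.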


Ansible map() not available on el6

If you are running Ansible playbooks on an el6 machine and you run across an error like this:

2017-05-24 10:21:47,585 p=8525 u=root |  fatal: [localhost]: FAILED! =>
{"failed": true, "msg": "The conditional check '( myvsds is defined and 
( myvsds | map(attribute='target_server_type') | list | issuperset([\"kvm\"]) 
or myvsds | map(attribute='target_server_type') | list | issuperset([\"heat\"])
) ) or ( myvcins is defined and ( myvcins | map(attribute='target_server_type')
| list | issuperset([\"kvm\"]) or myvcins | map(attribute='target_server_type')
| list | issuperset([\"heat\"]) ) )' failed. The error was: template
error while templating string: no filter named 'map'. String: {% if (
myvsds is defined and ( myvsds | map(attribute='target_server_type') | list |
issuperset([\"kvm\"]) or myvsds | map(attribute='target_server_type') | list |
issuperset([\"heat\"]) ) ) or ( myvcins is defined and ( myvcins |
map(attribute='target_server_type') | list | issuperset([\"kvm\"]) or myvcins |
map(attribute='target_server_type') | list | issuperset([\"heat\"]) ) ) %} True
{% else %} False {% endif %}\n\nThe error appears to have been in 
'/metro-2.1.1/roles/build/tasks/get_paths.yml': line 8, column 7, but may\nbe
elsewhere in the file depending on the exact syntax problem.\n\nThe offending
line appears to be:\n\n  - block: # QCOW2\n    - name: Find name of VSD QCOW2
File\n      ^ here\n"}

Note the text "no filter named 'map'". The problem is that Ansible, as of version 2.1, depends on the map() filter implementation from the python-jinja2 package. map() was introduced in python-jinja2 version 2.7, but the base python-jinja2 version for el6 is 2.2, thus creating the error above.

This means that playbooks using map() must be run from an el7 Ansible host, or from a host with a newer python-jinja2 installed.
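One quick way to check whether the Jinja2 on a given Ansible host is new enough is to ask it directly whether the filter exists; this sketch assumes nothing beyond a Python interpreter on the host:

```python
# Check whether the installed Jinja2 provides the map() filter
# (first shipped in Jinja2 2.7, the version Ansible's map() use needs).
try:
    from jinja2 import Environment
    has_map = 'map' in Environment().filters
except ImportError:
    has_map = False  # Jinja2 is not installed at all
print(has_map)
```

If this prints False on your control host, you have found the same problem described above.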

Custom Ansible filters: Easy solution to difficult problems

I have recently been using Ansible to automate health checks for some of our software-defined network (SDN) infrastructure. One of the devices my code must query for health is a soft router running the SROS operating system. Ansible 2.2 recently introduced an sros_command module that simplifies my task somewhat, but I'm still left to do screen-scraping of the command output.

Screen scraping is nasty work. Lots of string processing with split(), strip(), and other commands. The resulting code is heavily dependent on the exact format of the command output. If it changes, the code breaks.

I initially implemented the screen-scraping using Jinja2 code in my playbooks. That put some pretty ugly, complex code right in the playbook. I found a better answer: Create a custom filter or two. Now things are *so much cleaner* in the playbooks themselves, the format-dependent code is now separated from the main code, and Python made it so much easier to code.

The best part: Ansible filters are very easy to create. The Ansible docs aren't very helpful here, perhaps because creation is so simple they thought it didn't need explanation! The best way to figure out how to create your own filter is to use an existing filter as a pattern. One of the simplest in Ansible itself is json_query. Here's a stripped and simplified version of that code for the purpose of illustration. This code implements two trivial filters, my_to_upper and my_to_lower:

from ansible.errors import AnsibleError

def my_to_upper(string):
    ''' Given a string, return an all-uppercase version of the string. '''
    if string is None:
        raise AnsibleError('String not found')
    return string.upper()

def my_to_lower(string):
    ''' Given a string, return an all-lowercase version of the string. '''
    if string is None:
        raise AnsibleError('String not found')
    return string.lower()

class FilterModule(object):
    ''' Case-conversion filters '''

    def filters(self):
        return {
            'my_to_upper': my_to_upper,
            'my_to_lower': my_to_lower
        }

Developing this code is as simple as creating the FilterModule class, returning a dictionary entry for each of the custom filters you need, and then providing a function for each filter. Drop the file into a filter_plugins directory next to your playbook and Ansible loads it automatically. The example is trivial, but you can make the filter functions as complex as your application requires.

Note that I have included AnsibleError in the example for illustration purposes because it is an extremely useful way to get errors all the way to the console. If I were *really* implementing these filters, a missing string wouldn't be an error; I'd just return an empty string.
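Because the filters are plain Python functions, you can sanity-check them outside Ansible entirely. In this sketch, AnsibleError is swapped for a stand-in Exception so the snippet runs without Ansible installed:

```python
# Stand-alone sanity check of the filter logic. AnsibleError is
# replaced with a plain Exception so no Ansible install is needed.
class AnsibleError(Exception):
    pass

def my_to_upper(string):
    ''' Given a string, return an all-uppercase version of the string. '''
    if string is None:
        raise AnsibleError('String not found')
    return string.upper()

print(my_to_upper('A Mixed String'))   # A MIXED STRING
```

This kind of quick unit test is another nice side effect of pulling the string-handling code out of the playbook.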

Here are a couple of simple examples of calling the filters, along with the resulting output:

- name: Create a mixed-case string
  shell: echo "A Mixed String"
  register: mixed_string
  delegate_to: localhost

- name: Print the UPPERCASE string
  debug: msg="{{ mixed_string.stdout|my_to_upper }}"

- name: Print the LOWERCASE string
  debug: msg="{{ mixed_string.stdout|my_to_lower }}"


TASK [my_task : Create a mixed-case string] *********************************
changed: [host.example.com -> localhost]

TASK [my_task : Print the UPPERCASE string] *********************************
ok: [host.example.com] => {
    "msg": "A MIXED STRING"
}

TASK [my_task : Print the LOWERCASE string] *********************************
ok: [host.example.com] => {
    "msg": "a mixed string"
}

In my case, instead of my_to_upper and my_to_lower, I created *command*_to_json filters that convert the SROS command output into JSON that is easily parsed in the playbook. This keeps my playbooks generic and isolates my filters as the place where the nasty code lives.

strptime ignores weeks! (under certain circumstances…)

I discovered something unexpected today: strptime from the Python time package ignores week numbers in certain situations!

For background, I've been doing some "business intelligence" work lately: assigning user IDs to cohorts, then analyzing the activities of the members of each cohort. Because we are very early in the process and don't have much data, cohort membership is currently based on week numbers, e.g. '2015-24'. Ultimately we want cohort membership to be month-based, and we'll shift to that when the time comes.

The strptime problem surfaced when I attempted to convert the week string to time. My code looked something like this:

from time import strptime

starting_time = strptime(week_string, '%Y-%U')
print(starting_time)

starting_time was *always* January 1st of the year: '2015-07' gave 1-1-2015, '2015-24' gave 1-1-2015, '2014-50' gave 1-1-2014. Convinced I was doing something wrong, I searched until I found the relevant footnote in the Python time module documentation.

It turns out the designers of strptime decided that a year-week combination is ambiguous (which day of that week does one pick?), so %U and %W only take effect when the day of the week is also specified.
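A quick demonstration of both behaviors, using the '2015-24' example from above:

```python
from time import strptime

# Week number alone is silently ignored: the result is always Jan 1st
t1 = strptime('2015-24', '%Y-%U')
print(t1.tm_mon, t1.tm_mday)   # 1 1

# Adding %w (day of week, 0 = Sunday) makes %U take effect
t2 = strptime('2015-24-0', '%Y-%U-%w')
print(t2.tm_mon, t2.tm_mday)   # 6 14 (the Sunday of week 24, 2015)
```

So the fix is simply to append a day-of-week to the cohort string before parsing.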

More than counting words: mrjob for processing log files using AWS EMR and Hadoop

When looking at online docs and examples for map-reduce programs, authors frequently turn to the map-reduce equivalent of "Hello, World!": a word-count example. I found it hard to extrapolate from counting words to a log-processing problem. I was attempting to correlate two different kinds of events for the same user, so I couldn't just read each line, split on word boundaries, and count. I really struggled with how to frame the problem until I found Yelp's mrjob package for Python.

mrjob has three modes of operation: local, EMR, and Hadoop. The local implementation suffers a bit in terms of performance. You can test for correctness, but you can't predict performance until you actually run against an AWS EMR cluster or a Hadoop cluster.

The biggest value of the package is that mrjob forces you to think about how to structure your data as key-value pairs. That is the only way it operates: each map, combine, or reduce function must yield (see Python generators) a key-value pair. The a-ha moment for me: the secret to success is to carefully construct your keys, your values, or both as tuples. After that I started producing code that actually *used* map-reduce. Using mrjob helped me think more clearly about my Spark scripts, too.

The sample code, below, illustrates the tuple concept. The goal of the script is to produce a histogram of activation and usage events. Each log entry is a JSON string on its own line.

from mrjob.job import MRJob
from mrjob.step import MRStep
import json
import datetime

class MRCohort(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.group_by_user_id,
                   reducer=self.join_events),
            MRStep(reducer=self.count_events)
        ]

    def group_by_user_id(self, _, line):
        week = 0
        # parse each line in the file
        j = json.loads(line)
        if "time" in j:
            week = self.mapTimeToWeek(j['time'])
        if "event" in j:
            if j['event'] == "user_activation":
                symbol = "A"
                yield (j['user_id'], [symbol, week])
            if j['event'] == "auth_granted":
                symbol = "B"
                yield (j['user_id'], [symbol, week])

    def join_events(self, user_id, values):
        # Sorting guarantees the "A" (activation) value for a user
        # arrives before that user's "B" (auth) values.
        events = []
        for value in values:
            if value[0] == "A":
                events.append(value)
            if value[0] == "B":
                for event in events:
                    yield [user_id] + event[1:] + value[1:], 1

    def count_events(self, key, count):
        yield key, sum(count)

    # Get the week of the year based on time
    def mapTimeToWeek(self, time):
        timeData = time.split('T')[0].split('-')
        year, month, day = int(timeData[0]), int(timeData[1]), int(timeData[2])
        return datetime.date(year, month, day).isocalendar()[1]

if __name__ == '__main__':
    MRCohort.run()

The map step, group_by_user_id, yields a key that is the user ID and a value that is a (symbol, week) tuple. symbol is set to either A or B. It is nothing more than a hint to the automatic sorting so that the activation events for a given user precede the authorization events for that user. The output of the map step looks something like this:

(user1, (A, 24))
(user1, (B, 24))
(user1, (B, 24))
(user1, (B, 25))
(user1, (B, 26))
(user2, (A, 25))
(user2, (B, 26))

The mapped output tells me that user1 was activated in week 24, used the system twice in week 24, once in week 25, and once in week 26. user2 was activated in week 25 and used the system only once, that coming in week 26.

Now I need to reduce the data using join_events. The function expects to receive the activation event for a user ID before any of that user’s authorization events. When it encounters a line of data with an ‘A’, it saves that value. When it finds a ‘B’ it yields a new kind of key-value pair that joins activation data with authentication data:

((user1, 24, 24), 1)
((user1, 24, 24), 1)
((user1, 24, 25), 1)
((user1, 24, 26), 1)
((user2, 25, 26), 1)

After the first reduce step, our keys are tuples composed of user ID, week activated, and week authenticated. The value for each is 1 so that we can add them up in the next reduce step. Notice that the output here is one line per activation event.

The second reduce step is very simple. It yields a key and the sum of all the counts for lines having the same key. When complete, the output looks like this:

((user1, 24, 24), 2)
((user1, 24, 25), 1)
((user1, 24, 26), 1)
((user2, 25, 26), 1)

From the output we can see that user1 was activated in week 24 and used the system a total of 4 times over 3 weeks. user2 was activated in week 25 and used the system once in week 26.
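As a sanity check, the same join-and-count logic can be run locally in plain Python (no mrjob required) on the sample pairs shown above:

```python
# Local simulation of the join_events + count_events steps,
# using the sample mapped pairs from the text.
from collections import Counter
from itertools import groupby

pairs = [
    ("user1", ("A", 24)),
    ("user1", ("B", 24)),
    ("user1", ("B", 24)),
    ("user1", ("B", 25)),
    ("user1", ("B", 26)),
    ("user2", ("A", 25)),
    ("user2", ("B", 26)),
]

counts = Counter()
for user, values in groupby(sorted(pairs), key=lambda p: p[0]):
    activation_week = None
    for _, (symbol, week) in values:
        if symbol == "A":                 # save the activation week
            activation_week = week
        elif activation_week is not None:  # join each usage with it
            counts[(user, activation_week, week)] += 1

for key, count in sorted(counts.items()):
    print(key, count)
```

This prints the same four (user, activation week, usage week) counts as the reduce output above, which is a handy way to check the logic before paying for cluster time.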

The real value of this solution comes when you use it to process hundreds of thousands of events. When I ran this script I saw a 10x performance improvement over the equivalent procedural code it replaced.

Getting Spark Data from AWS S3 using Boto and PySpark

We've had quite a bit of trouble getting efficient Spark operation when the data to be processed is coming from an AWS S3 bucket. Aside from pulling all the data to the Spark driver prior to the first map step (which defeats the purpose of map-reduce!), we experienced terrible performance. In one scenario, Spark spun up 2360 tasks to read the records from one 1.1k log file. In another scenario, the Spark logs showed that reading every line of every file took a handful of repetitive operations: validate the file, open the file, seek to the next line, read the line, close the file, repeat. Processing 450 small log files took 42 minutes. Arrgh.

We also experienced memory problems. When processing the full set of logs we would see out-of-memory heap errors or complaints about exceeding Spark’s data frame size. Very frustrating.

One of my co-workers stumbled upon this site: http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 It describes a solution that overcomes the problems we were having. The solution is written in Scala; we had to translate it because we are working in Python. Here is a snippet of the Scala version of the driver program, taken from https://gist.githubusercontent.com/pjrt/f1cad93b154ac8958e65/raw/7b0b764408f145f51477dc05ef1a99e8448bce6d/S3Puller.scala.

import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials

val request = new ListObjectsRequest()
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

val objs = s3.listObjects(request) // Note that this method returns 
                                   // truncated data if longer than
                                   // the "pageLength" above. You might
                                   // need to deal with that.
    .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }

The key to the solution is to follow a process like this:

  1. Go directly to S3 from the driver to get a list of the S3 keys for the files you care about.
  2. Parallelize the list of keys.
  3. Code the first map step to pull the data from the files.

This procedure minimizes the amount of data that gets pulled into the driver from S3–just the keys, not the data. Then, when map is executed in parallel on multiple Spark workers, each worker pulls over the S3 file data for only the files it has the keys for.

S3 access from Python was done using the Boto library:

pip install boto

Here's a snippet of the Python code that is similar to the Scala code, above. It processes log files composed of lines of JSON text:

import argparse
import json
from pyspark import SparkContext, SparkConf
from boto.s3.connection import S3Connection

def main():
    # Use argparse to handle some argument parsing
    parser = argparse.ArgumentParser()
    parser.add_argument("--aws_access_key_id",
                        help="AWS_ACCESS_KEY_ID, omit to use env settings")
    parser.add_argument("--aws_secret_access_key",
                        help="AWS_SECRET_ACCESS_KEY, omit to use env settings")
    parser.add_argument("bucket_name",
                        help="AWS bucket name")
    args = parser.parse_args()
    # Use Boto to connect to S3 and get a list of objects from a bucket
    conn = S3Connection(args.aws_access_key_id, args.aws_secret_access_key)
    bucket = conn.get_bucket(args.bucket_name)
    keys = bucket.list()
    # Get a Spark context and use it to parallelize the keys
    conf = SparkConf().setAppName("MyFileProcessingApp")
    sc = SparkContext(conf=conf)
    pkeys = sc.parallelize(keys)
    # Call the map step to handle reading in the file contents
    activation = pkeys.flatMap(map_func)
    # Additional map or reduce steps go here...

def map_func(key):
    # Use the key to read in the file contents, split on line endings
    for line in key.get_contents_as_string().splitlines():
        # parse one line of json
        j = json.loads(line)
        if "user_id" in j and "event" in j:
            if j['event'] == "event_we_care_about":
                yield j['user_id'], j['event']

if __name__ == '__main__':
    main()

With these changes in place, the 42-minute job now takes under 6 minutes. That is still too long, so on to the next tuning step!