Disco-Powered pymarc

I'd long been interested in developing code against some sort of MapReduce implementation for distributed computing. I was never able to get my head around Hadoop, so I gave up on it pretty quickly. I recently discovered Disco, a MapReduce framework with an Erlang-based core. Disco also lets you write your worker code in Python, which was a huge plus for me.

After stumbling through the tutorial, I took the word count demo and put together some basic code that uses pymarc to gather tag count statistics across a bunch of MARC files. The code is still in a very early form, and arguably should carve large files into smaller chunks to hand off to the worker processes; for the time being I've worked around this by splitting the files with yaz-marcdump. Once the files were split, I pushed them into a tag in DDFS, the Disco Distributed File System (a rough sketch of that step appears below). This turned out to be a useful way to write some demo code for both pymarc and Disco. The code follows, and is also available as a Gist on GitHub.
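The split-and-push step looks something like this; the chunk size and the data:marc tag name are placeholders rather than the exact values I used:

# split records.mrc into files of 1000 records each (chunk0000000, chunk0000001, ...)
yaz-marcdump -s chunk -C 1000 records.mrc

# load the chunks into a DDFS tag so they can be used as job input
ddfs push data:marc ./chunk*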

#!/usr/bin/env python
#
# pymarc_disco.py - M.A. Matienzo
#   sample MapReduce tasks for Disco to get tag counts from MARC files/streams
#   usage: python pymarc_disco.py input1 [input2 ... inputN]

import sys
from disco.core import Disco, result_iterator
from disco.settings import DiscoSettings
import pymarc

def read(fd, size, fname):
    # map_reader: wrap each input stream in a pymarc MARCReader,
    # so the map function receives one MARC record at a time
    return pymarc.MARCReader(fd)

def map(record, params):
    # emit a (tag, 1) pair for every field in the record
    for field in record.fields:
        yield field.tag, 1

def reduce(iter, params):
    # group the sorted (tag, 1) pairs by tag and sum the counts
    from disco.util import kvgroup
    for tag, counts in kvgroup(sorted(iter)):
        yield tag, sum(counts)

# connect to the Disco master named in the local Disco settings
disco = Disco(DiscoSettings()['DISCO_MASTER'])
print "Starting Disco job..."
print "Go to %s to see the status of the job." % disco.master
# submit the tag-count job and block until it completes
results = disco.new_job(name="tagcount",
                        input=sys.argv[1:],
                        map=map,
                        map_reader=read,
                        reduce=reduce,
                        save=True).wait()
print "Job done. Results:"
for tag, count in result_iterator(results):
    print tag, count
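With the chunks loaded into DDFS, the job can presumably take the tag itself as input, since Disco accepts tag:// URLs alongside plain paths:

python pymarc_disco.py tag://data:marc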
