I'd long been interested in developing code using some sort of MapReduce implementation for distributed computing. I was never able to get my head around Hadoop, so I gave up on it pretty quickly. I recently discovered Disco, a MapReduce framework with an Erlang-based core. Disco also allows you to write your worker code in Python, which was a huge plus for me.
After stumbling through the tutorial, I took the word count demo and put together some basic code using pymarc that gathers tag count statistics for a bunch of MARC files. The code is still in a very early form, and arguably should carve up large files into smaller chunks to pass off to the worker processes; I've gotten around this for the time being by splitting the files using yaz-marcdump. Once I split the files, I pushed them into a tag in DDFS, the Disco Distributed File System. This was a useful way for me to write some demo code using both pymarc and Disco. The code follows, and is also available as a Gist on GitHub.
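For reference, the splitting and loading steps looked roughly like this; the chunk size, file names, and tag name here are placeholders for illustration, not necessarily the ones I used:

```shell
# Split a large MARC file into chunks of 1000 records each
# (yaz-marcdump's -s sets the output file prefix, -C the records per chunk;
# the input file name and chunk size are hypothetical)
yaz-marcdump -s chunk -C 1000 records.mrc > /dev/null

# Push the resulting chunks into a DDFS tag
# (the tag name data:marc is hypothetical)
ddfs push data:marc ./chunk*
```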
#!/usr/bin/env python
#
# pymarc_disco.py - Mark Matienzo
# sample MapReduce tasks for Disco to get tag counts from MARC files/streams
# usage: python pymarc_disco.py input1 [input2 ... inputN]

import sys

from disco.core import Disco, result_iterator
from disco.settings import DiscoSettings
import pymarc


def read(fd, size, fname):
    return pymarc.MARCReader(fd)


def map(record, params):
    for field in record.fields:
        yield field.tag, 1


def reduce(iter, params):
    from disco.util import kvgroup
    for tag, counts in kvgroup(sorted(iter)):
        yield tag, sum(counts)


disco = Disco(DiscoSettings()['DISCO_MASTER'])
print "Starting Disco job.."
print "Go to %s to see status of the job." % disco.master
results = disco.new_job(
    name="tagcount",
    input=sys.argv[1:],
    map=map,
    map_reader=read,
    reduce=reduce,
    save=True).wait()
print "Job done. Results:"
for word, count in result_iterator(results):
    print word, count
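If you don't have a Disco cluster handy, the map/reduce logic itself can be sketched locally with nothing but the standard library. This is only a rough stand-in: records are plain lists of tag strings rather than pymarc records, itertools.groupby plays the role of disco.util.kvgroup, and the sample records are made up for illustration.

```python
from itertools import groupby

def map_tags(record):
    # emit (tag, 1) for every field in the record
    for tag in record:
        yield tag, 1

def reduce_counts(pairs):
    # group sorted (tag, count) pairs by tag and sum the counts
    for tag, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield tag, sum(count for _, count in group)

# hypothetical records, each a list of MARC field tags
records = [
    ["001", "245", "650", "650"],
    ["001", "245", "100"],
]

pairs = [kv for record in records for kv in map_tags(record)]
counts = dict(reduce_counts(pairs))
print(counts)  # {'001': 2, '100': 1, '245': 2, '650': 2}
```

The real job works the same way, except that Disco distributes the map calls across workers and shuffles the intermediate pairs to the reducer for you.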