building a term-document matrix in spark

A year-old stack overflow question that I’m able to answer? This is like spotting Bigfoot. I’m going to assume access to nothing more than a spark context. Let’s start by parallelizing some familiar sentences.

corpus = sc.parallelize([
    "We're talking about practice, man.",
    "We're talking about practice.",
    "We ain't talking about the game."])

The first step is to tokenize our documents and cache the resulting RDD. In practice, by which I mean the game, we would use a real tokenizer, but for illustrative purposes I’m going to keep it simple and split on spaces.

tokenized_corpus = corpus \
    .map(lambda raw_text: raw_text.split()) \
    .cache()

Next we need to map words to distinct indices and pull the resulting dictionary into the driver.

local_vocab_map = tokenized_corpus \
    .flatMap(lambda token: token) \
    .distinct() \
    .zipWithIndex() \
    .collectAsMap()

Then we can broadcast the vocabulary-index map (and its size) back out to the workers:

vocab_map = sc.broadcast(local_vocab_map)
vocab_size = sc.broadcast(len(local_vocab_map))

Finally, we need to:
1) Convert each document into a dictionary of word counts
2) Replace the keys in the dictionary with their indices
3) Convert the dictionary into a vector

With the help of two imports we can do this in three calls to map:

from pyspark.mllib.linalg import SparseVector
from collections import Counter

term_document_matix = tokenized_corpus \
    .map(Counter) \
    .map(lambda counts: {vocab_map.value[token]: float(counts[token]) for token in counts}) \
    .map(lambda index_counts: SparseVector(vocab_size.value, index_counts))

Voila:

for vec in term_document_matix.collect():
    print vec

(10,[0,1,6,7,8],[1.0,1.0,1.0,1.0,1.0])
(10,[0,3,7,8],[1.0,1.0,1.0,1.0])
(10,[0,2,4,5,7,9],[1.0,1.0,1.0,1.0,1.0,1.0])

The practice rant turned fourteen today.

Published by Dave Fernig

data@shopify