A year-old Stack Overflow question that I’m able to answer? This is like spotting Bigfoot. I’m going to assume access to nothing more than a Spark context. Let’s start by parallelizing some familiar sentences.
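(If you don’t have a Spark context handy, spinning one up locally is a one-liner; the app name below is arbitrary.)

from pyspark import SparkContext

# Run Spark locally; the second argument is just a display name.
sc = SparkContext("local", "term_document_matrix")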
corpus = sc.parallelize([
    "We're talking about practice, man.",
    "We're talking about practice.",
    "We ain't talking about the game."
])
The first step is to tokenize our documents and cache the resulting RDD. In practice, by which I mean the game, we would use a real tokenizer, but for illustrative purposes I’m going to keep it simple and split on spaces.
tokenized_corpus = corpus \
    .map(lambda raw_text: raw_text.split()) \
    .cache()
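(If you do want something a little closer to a real tokenizer, here’s a minimal sketch using Python’s re module. The pattern is just an illustration, and the rest of this post sticks with the whitespace version.)

import re

def tokenize(raw_text):
    # Lowercase and keep runs of letters and apostrophes. A stand-in
    # for a real tokenizer: it folds "practice," and "practice." together.
    return re.findall(r"[a-z']+", raw_text.lower())

# Drop-in replacement for the split-on-spaces version above:
# tokenized_corpus = corpus.map(tokenize).cache()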
Next we need to map words to distinct indices and pull the resulting dictionary into the driver.
local_vocab_map = tokenized_corpus \
    .flatMap(lambda tokens: tokens) \
    .distinct() \
    .zipWithIndex() \
    .collectAsMap()
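As a quick sanity check, the vocabulary should have ten entries, since splitting on spaces means "practice," and "practice." count as distinct tokens:

print(len(local_vocab_map))  # 10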
Then we can broadcast the vocabulary-index map (and its size) back out to the workers:
vocab_map = sc.broadcast(local_vocab_map)
vocab_size = sc.broadcast(len(local_vocab_map))
Finally, we need to:
1) Convert each document into a dictionary of word counts
2) Replace the keys in the dictionary with their indices
3) Convert the dictionary into a vector
With the help of two imports, we can do this in three calls to map:
from pyspark.mllib.linalg import SparseVector
from collections import Counter

term_document_matrix = tokenized_corpus \
    .map(Counter) \
    .map(lambda counts: {vocab_map.value[token]: float(counts[token]) for token in counts}) \
    .map(lambda index_counts: SparseVector(vocab_size.value, index_counts))
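(As a matter of taste, the three maps also collapse into a single function that counts, indexes, and vectorizes in one pass; this sketch is equivalent to the chain above.)

def vectorize(tokens):
    counts = Counter(tokens)
    index_counts = {vocab_map.value[token]: float(count)
                    for token, count in counts.items()}
    return SparseVector(vocab_size.value, index_counts)

# Equivalent to the three-map chain:
# term_document_matrix = tokenized_corpus.map(vectorize)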
Voilà:
for vec in term_document_matrix.collect():
    print(vec)

(10,[0,1,6,7,8],[1.0,1.0,1.0,1.0,1.0])
(10,[0,3,7,8],[1.0,1.0,1.0,1.0])
(10,[0,2,4,5,7,9],[1.0,1.0,1.0,1.0,1.0,1.0])
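Each line reads (vocabulary size, [indices], [values]). If you’d rather eyeball dense rows, SparseVector’s toArray() expands them into NumPy arrays:

for vec in term_document_matrix.collect():
    print(vec.toArray())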
The practice rant turned fourteen today.