TL;DR;
How do I use MLlib to train a model on my wiki data (text and category) and then predict categories for tweets?
I have trouble figuring out how to convert my tokenized wiki data so that it can be used to train either NaiveBayes or LogisticRegression. My goal is to apply the trained model to tweets*. I've tried a pipeline with LR, and HashingTF with IDF for NaiveBayes, but I keep getting wrong predictions. Here's what I've tried:
*Note that I would like to use the many categories in the wiki data as my labels. I've only seen binary classification (it's one category or the other). Is it possible to do what I want?
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.RegexTokenizer
case class WikiData(category: String, text: String)
case class LabeledData(category: String, text: String, label: Double)
val wikiData = sc.parallelize(List(WikiData("Spark", "this is about spark"), WikiData("Hadoop","then there is hadoop")))
val categoryMap = wikiData.map(x=>x.category).distinct.zipWithIndex.mapValues(x=>x.toDouble/1000).collectAsMap
val labeledData = wikiData.map(x=>LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
  .setPattern("\\W+") // split on runs of non-word characters ("/W+" would only match a literal slash followed by W's)
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(labeledData)
model.transform(labeledData).show
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documentsAsWordSequenceAlready)
import org.apache.spark.mllib.feature.IDF
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
//to create tfidfLabeled (below) I ran a map set the labels...but again it seems to have to be 1.0 or 0.0?
NaiveBayes.train(tfidfLabeled)
  .predict(hashingTF.transform(tweet))
  .collect
ML LogisticRegression doesn't support multinomial classification yet, but both MLlib NaiveBayes and LogisticRegressionWithLBFGS do. In the first case it should work by default:
import org.apache.spark.mllib.classification.NaiveBayes
val nbModel = new NaiveBayes()
  .setModelType("multinomial") // This is default value
  .run(train)
but for logistic regression you should provide a number of classes:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(n) // Set number of classes
  .run(trainingData)
Preprocessing is quite a broad topic, and it is hard to give meaningful advice without access to your data, so everything below is just a wild guess:
HashingTF can be a good way to obtain a baseline model, but it is an extremely simplified approach, especially if you don't apply any filtering steps. If you decide to use it, you should at least increase the number of features or use the default value (2^20).
EDIT (Preparing data for Naive Bayes with IDF):
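// Using ML Pipelines
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.feature.IDF
import org.apache.spark.sql.Row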
val tokenizer = ???
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")
val idf = new IDF()
  .setInputCol(hashingTF.getOutputCol)
  .setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val model = pipeline.fit(labeledData)
model
 .transform(labeledData)
 .select($"label", $"features")
 .map{case Row(label: Double, features: Vector) => LabeledPoint(label, features)}
// Using MLlib transformers
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
val labeledData = wikiData.map(x => 
  LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))
val p = "\\W+".r
val raw = labeledData.map{
    case LabeledData(_, text, label) => (label, p.split(text))}
val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}
val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
tf.map{
  case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}
Note: Since these transformers require JVM access, the MLlib version won't work in PySpark. If you prefer Python, you have to split the data, transform the features, and zip them back with the labels.
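To illustrate the earlier point about the number of features, here is a minimal sketch of the hashing trick in plain Scala. It is not Spark's implementation: it assumes Scala's built-in `hashCode` as the hash function, and `HashingSketch`, `bucket`, `termFrequencies` and `collisions` are hypothetical names used only for this example. It shows why a small feature space forces unrelated terms into the same bucket.

```scala
object HashingSketch {
  // Map a term to a bucket index in [0, numFeatures), keeping the result non-negative.
  // Assumption: String.hashCode stands in for whatever hash Spark uses internally.
  def bucket(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Term-frequency vector as a sparse map: bucket index -> count of tokens in that bucket.
  def termFrequencies(tokens: Seq[String], numFeatures: Int): Map[Int, Double] =
    tokens.groupBy(bucket(_, numFeatures)).map { case (i, ts) => i -> ts.size.toDouble }

  // Number of distinct terms that share a bucket with at least one other distinct term.
  def collisions(vocabulary: Set[String], numFeatures: Int): Int =
    vocabulary.toSeq.groupBy(bucket(_, numFeatures)).values.filter(_.size > 1).map(_.size).sum
}
```

Running `HashingSketch.collisions` over your own vocabulary with 1000 buckets and again with a much larger bucket count should give a rough feel for how much information the smaller setting throws away.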
EDIT (Preparing data for ML algorithms):
While the following piece of code looks valid at first glance
val categoryMap = wikiData
  .map(x=>x.category)
  .distinct
  .zipWithIndex
  .mapValues(x=>x.toDouble/1000)
  .collectAsMap
val labeledData = wikiData.map(x=>LabeledData(
    x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
it won't generate valid labels for ML algorithms.
First of all, ML expects labels to be in (0.0, 1.0, ..., (n-1).0), where n is the number of classes. In your example pipeline, where one of the classes gets the label 0.001, you'll get an error like this:
ERROR LogisticRegression: Classification labels should be in {0 to 0 Found 1 invalid labels.
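The label rule can be checked outside Spark with a few lines of plain Scala. This is only a sketch: `LabelCheck` and `invalidLabels` are hypothetical helpers written for this answer, not part of any Spark API, and the two categories are the ones from the question.

```scala
object LabelCheck {
  // Return the labels that are not one of 0.0, 1.0, ..., (numClasses - 1).0
  def invalidLabels(labels: Seq[Double], numClasses: Int): Seq[Double] =
    labels.filterNot(l => l >= 0 && l < numClasses && l == math.floor(l))

  def main(args: Array[String]): Unit = {
    val categories = Seq("Spark", "Hadoop")
    // Same shape as the mapping in the question: index divided by 1000
    val scaled = categories.zipWithIndex.map { case (_, i) => i.toDouble / 1000 }
    // Index used directly as the label
    val plain = categories.zipWithIndex.map { case (_, i) => i.toDouble }
    println(invalidLabels(scaled, 2)) // List(0.001)
    println(invalidLabels(plain, 2))  // List()
  }
}
```

With two categories, the scaled mapping produces exactly one invalid label, which matches the count reported in the error message.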
The obvious solution is to avoid the division when you generate the mapping:
.mapValues(x=>x.toDouble)
While this will work for LogisticRegression, other ML algorithms will still fail. For example, with RandomForestClassifier you'll get
RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.
Interestingly, the ML version of RandomForestClassifier, unlike its MLlib counterpart, doesn't provide a method to set the number of classes. It turns out it expects special attributes to be set on a DataFrame column. The simplest approach is to use the StringIndexer mentioned in the error message:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")
val pipeline = new Pipeline()
  .setStages(Array(indexer, tokenizer, hashingTF, idf, lr))
val model = pipeline.fit(wikiData.toDF)
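For intuition, the label mapping that StringIndexer produces by default (most frequent category gets index 0) can be sketched in plain Scala. This is an illustration only, not Spark code: `StringIndexerSketch` and `fitLabels` are hypothetical names, the alphabetical tie-break is an assumption added to keep the sketch deterministic, and the real StringIndexer additionally attaches the column metadata that RandomForestClassifier needs, which this sketch does not do.

```scala
object StringIndexerSketch {
  // Assign 0.0 to the most frequent category, 1.0 to the next, and so on.
  // Ties are broken alphabetically here only to make the sketch deterministic.
  def fitLabels(categories: Seq[String]): Map[String, Double] =
    categories
      .groupBy(identity)
      .toSeq
      .map { case (c, occurrences) => (c, occurrences.size) }
      .sortBy { case (c, count) => (-count, c) }
      .zipWithIndex
      .map { case ((c, _), i) => c -> i.toDouble }
      .toMap
}
```

Whatever the input, the resulting labels are always 0.0 up to (number of categories - 1).0, which is the range the ML classifiers expect.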