#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy

from numpy import array, dot, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    LinearModel, _linear_predictor_typecheck
from math import exp, log
31 """A linear binary classification model derived from logistic regression.
32
33 >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
34 >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
35 >>> lrm.predict(array([1.0])) > 0
36 True
37 """
39 _linear_predictor_typecheck(x, self._coeff)
40 margin = dot(x, self._coeff) + self._intercept
41 prob = 1/(1 + exp(-margin))
42 return 1 if prob > 0.5 else 0
43
class LogisticRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a logistic regression model on the given data."""
        sc = data.context
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
                        iterations, step, miniBatchFraction, i),
                LogisticRegressionModel, data, initialWeights)

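# A minimal usage sketch (kept as a comment so the module stays import-safe;
# it assumes a live SparkContext `sc` as in the doctests, and the points and
# parameter values are illustrative). Each row packs the label into the
# first coordinate and the features into the rest:
#
#   points = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4, 2)
#   lrm = LogisticRegressionWithSGD.train(sc.parallelize(points),
#                                         iterations=200, step=0.1)
#   lrm.predict(array([2.0]))  # thresholded sigmoid: 1 if prob > 0.5
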
56 """A support vector machine.
57
58 >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
59 >>> svm = SVMWithSGD.train(sc.parallelize(data))
60 >>> svm.predict(array([1.0])) > 0
61 True
62 """
64 _linear_predictor_typecheck(x, self._coeff)
65 margin = dot(x, self._coeff) + self._intercept
66 return 1 if margin >= 0 else 0
67
class SVMWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a support vector machine on the given data."""
        sc = data.context
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
                        iterations, step, regParam, miniBatchFraction, i),
                SVMModel, data, initialWeights)

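# A minimal usage sketch (same conventions as the doctests: a live `sc`,
# label in the first coordinate; the regParam value below is illustrative).
# Larger regParam penalizes large weights more heavily, trading training
# error for a wider margin:
#
#   points = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4, 2)
#   svm = SVMWithSGD.train(sc.parallelize(points), regParam=0.1)
#   svm.predict(array([2.0]))  # 1 if dot(x, w) + intercept >= 0, else 0
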
80 """
81 Model for Naive Bayes classifiers.
82
83 Contains two parameters:
84 - pi: vector of logs of class priors (dimension C)
85 - theta: matrix of logs of class conditional probabilities (CxD)
86
87 >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0, 1.0, 1.0]).reshape(3,3)
88 >>> model = NaiveBayes.train(sc.parallelize(data))
89 >>> model.predict(array([0.0, 1.0]))
90 0
91 >>> model.predict(array([1.0, 0.0]))
92 1
93 """
94
96 self.pi = pi
97 self.theta = theta
98
100 """Return the most likely class for a data vector x"""
101 return numpy.argmax(self.pi + dot(x, self.theta.transpose()))
102
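# Worked sketch of the prediction rule above (all values illustrative):
# with pi = log([0.5, 0.5]) and theta = log([[0.9, 0.1], [0.1, 0.9]]),
# the vector x = [0.0, 1.0] scores pi + dot(x, theta.T) =
# [log 0.5 + log 0.1, log 0.5 + log 0.9], so argmax returns class 1,
# the class whose conditional distribution puts more mass on feature 1.
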
class NaiveBayes(object):
    @classmethod
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features) vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}), which can
        handle all kinds of discrete data. For example, by converting
        documents into TF-IDF vectors, it can be used for document
        classification. By making every vector a 0-1 vector, it can also be
        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).

        @param data: RDD of NumPy vectors, one per element, where the first
               coordinate is the label and the rest is the feature vector
               (e.g. a count vector).
        @param lambda_: The smoothing parameter
        """
        sc = data.context
        dataBytes = _get_unmangled_double_vector_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
        return NaiveBayesModel(
            _deserialize_double_vector(ans[0]),
            _deserialize_double_matrix(ans[1]))

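# A minimal usage sketch for the Bernoulli-style variant mentioned in the
# docstring (assumes a live `sc`; the tiny 0-1 dataset is illustrative).
# Each row is [label, feature_0, feature_1]:
#
#   rows = array([0.0, 1.0, 0.0,
#                 0.0, 1.0, 1.0,
#                 1.0, 0.0, 1.0]).reshape(3, 3)
#   model = NaiveBayes.train(sc.parallelize(rows), lambda_=1.0)
#   model.predict(array([1.0, 0.0]))  # most likely label for this vector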

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs,
                                                  optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()