title: 推荐系统项目学习
date: 2019-1-18
update: 2019-1-18
tags:

- Linux系统环境
    - HDFS
    - Base

categories: flume
grammar_cjkRuby: true
mathjax: true
overdue: true #这一行文章提醒
no_word_count: false
description: “线性回归”

[TOC]

机器学习

是人工智能的分支,主要学习常用算法

深度学习是机器学习的分支

一、人工智能与Spark MLLib

分类:强人工智能 ,弱人工智能(目前使用最多)

训练模型:使用概率论,需要大量的数据样本,不断的迭代计算

HDFS:海量数据的存储

迭代计算:
使用MR受限(MR只能把中间结果存储在磁盘上,不利于下次计算的重新读取,对迭代算法是性能瓶颈,使用时,耗时,耗磁盘IO)

使用Spark(Spark基于内存计算,适合迭代计算,同时提供基于海量数据的ML库)

Spark MLLib : spark + machine + learning + lib(库)

基本数据类型:

向量(有大小和方向,相当于元组,数字化描述特征)

二、==线性回归==

利用历史数据找出规律用于预测。

回归(regression):关注自变量和因变量之间的对应关系,

用于预测 (相当于M=ax+by+cx ,M就是因变量(就是需要预测的值) ,x,y,z就是自变量(参与预测的变量),a,b,c(就是训练模型提供的))

使用最小二乘法算出:

(损失函数)总误差:

梯度下降法:不断调试,迭代计算,求出局部最小的误差(局部收敛)

模拟出来的图线,成为拟合函数

LabeledPoint标注点

训练模型:需要多维度的历史数据,以LabeledPoint数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils

object DataTypeTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("DataTypeTest")
val sc = new SparkContext(conf)

//稠密向量
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
//稀疏向量 个数, 索引 , 值
val sv1: Vector = Vectors.sparse(10, Array(0, 2,5), Array(1.0, 3.0,8.0))
// val sv2: Vector = Vectors.sparse(10, Seq((0, 1.0), (2, 3.0)))

println(dv)
println(sv1)
// println(sv2)

println(sv1.toDense)
// 标注点LabeledPoint :带有标签的本地向量
val pos: LabeledPoint = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// y 向量
val neg: LabeledPoint =
LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
println(pos.label) // 标注点对应的y
println(neg.features.toDense) //标注点对应的维度向量,转成dense输出

val labeledPointRDD = sc.parallelize(Array(pos,neg));
//存储标注点数据
MLUtils.saveAsLibSVMFile(labeledPointRDD,"labeledPointRDD.txt")
/* 1.0 1:1.0 2:0.0 3:3.0
0.0 1:1.0 3:3.0
*/
//加载标注点数据----------------------------
val examples: RDD[LabeledPoint] =
MLUtils.loadLibSVMFile(sc, "sample_libsvm_data.txt")

examples.foreach { x =>
{
val label = x.label
val features = x.features
println("label:" + label + "\tfeatures:" + features.toDense)
}
}
sc.stop()
}
}

梯度下降算法

训练模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LinearRegression {

def main(args: Array[String]) {
// 构建Spark对象
val conf =
new SparkConf().setAppName("LinearRegressionWithSGD").setMaster("local")
val sc = new SparkContext(conf)
Logger.getRootLogger.setLevel(Level.WARN)
// sc.setLogLevel("WARN")

//读取样本数据
//-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
val data_path1 = "lpsa.data"
val data: RDD[String] = sc.textFile(data_path1)
val examples: RDD[LabeledPoint] = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble,
Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()
//1 为随机种子 , 将数据集按比例随机切分为0.8训练集(0) , 0.2测试集(1)
val train2TestData: Array[RDD[LabeledPoint]] =
examples.randomSplit(Array(0.8, 0.2), 1)

// val numExamples = examples.count(
// 迭代次数
val numIterations = 100

//在每次迭代的过程中 梯度下降算法的下降步长大小
val stepSize = 0.1

//每一次下山后,是否计算所有样本的误差值 fraction:n. 分数;部分;小部分;稍微
val miniBatchFraction = 1.0
val lrs = new LinearRegressionWithSGD()//结合梯度的线性回归算法

//设置需不需要有截距
lrs.setIntercept(true)
lrs.optimizer.setStepSize(stepSize)
lrs.optimizer.setNumIterations(numIterations)
lrs.optimizer.setMiniBatchFraction(miniBatchFraction)
//开始不停的训练:针对训练集
val model: LinearRegressionModel = lrs.run(train2TestData(0))
//训练模型的权重:a , b
println(model.weights)
//训练模型的截距: intercept截;截断;窃听 n. 拦截;[数] 截距;截获的情报
println(model.intercept)

// 对样本进行测试 :使用测试集数据 使用测试集的特征值测试
val prediction: RDD[Double] =
model.predict(train2TestData(1).map(_.features))
//K:训练集的预测 V:测试集的预测
val predictionAndLabel: RDD[(Double, Double)] =
prediction.zip(train2TestData(1).map(_.label))

val print_predict: Array[(Double, Double)] = predictionAndLabel.take(100)
println("prediction" + "\t" + "label")
for (i <- 0 to print_predict.length - 1) {
// val tuple: (Double, Double) = print_predict(i)
println(print_predict(i)._1 + "\t" + print_predict(i)._2)
//用以对比预测值和实际值 ,可能存在误差, 因为会有一些噪声数据
}
// 计算测试误差
val loss = predictionAndLabel.map {
case (p, v) =>
val err = p - v
Math.abs(err)
}.reduce(_ + _)
val error = loss / train2TestData(1).count
println(s"Test RMSE = " + error)
// 模型保存
val ModelPath = "model"
// model.save(sc, ModelPath)
// 使用模型预测
// val sameModel: LinearRegressionModel = LinearRegressionModel.load(sc, ModelPath)
// RDD[Double] rdd = sameModel.pridict(testData:RDD[Vector])
sc.stop()
}
}

三、==贝叶斯分类算法==

依据概率原则进行垃圾邮件分类

朴素贝叶斯算法:依据概率原则进行分类,应用先前事件的有关数据来估计未来事件发生的概率。

基于贝叶斯定理的条件概率:事件A和事件B为相互独立事件


$$
P(A|B) = P(A)*P(B|A)/P(B)
$$

案例应用:

垃圾邮件案例:

某邮件出现几个单词,判断它是垃圾邮件的概率

但是因为其中某一单词的出现,抵消了或否决了所有其他证据

拉普拉斯估计

拉普拉斯估计本质上是给频率表中的每个计数加上一个较小的数,这样就保证了每一类中每个特征发生概率非零

通常情况下,拉普拉斯估计中加上的数值设定为1,这样就保证每一个特征至少在数据中出现一次

二分类→正负例

二分类:只有两种情况

​ 正:期望结果(如垃圾邮件) 负: 与之相对的结果(正常邮件)

方法一:基于ML (针对DataFrame)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Naive_bayes1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("word2vector").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//加载数据
// ham,It took Mr owl 3 licks
//spam,"complimentary 4 STAR Ibiza Holiday or £10,000 cash needs your URGENT
val idData = sc.textFile("sms_spam.txt").map(_.split(",")).cache()
//1.0为正常邮件 0.0为垃圾邮件
val idDataRows: RDD[Row] = idData.map(x =>
Row((if (x(0) == "ham") 1.0 else 0.0), x(1).split(" ").map(_.trim)))

val schema = StructType(List(
StructField("label", DoubleType, nullable = false),
StructField("words", ArrayType(StringType, true), nullable = false)
))

val df = sqlContext.createDataFrame(idDataRows, schema)

//构建词汇表/词袋 【i hate you love dont 】
val countVectorizer: CountVectorizerModel =
new CountVectorizer().setInputCol("words").setOutputCol("features").fit(df)

//查看词汇表
countVectorizer.vocabulary.take(100).foreach(println)

//文本向量化 CountVector (1,2,1) IdfVector WordVertor
val cvDF: DataFrame = countVectorizer.transform(df)
//是否最多只显示20个字符,默认为true。
cvDF.show(false)

//正负例样本,显示前10个
val example: DataFrame = cvDF.drop("words")
example.show(10)

// 切分数据集与训练集
val Array(trainingData, testData) =
example.randomSplit(Array(0.8, 0.2), seed = 1234L)

// 训练朴素贝叶斯模型
val model: NaiveBayesModel =
new NaiveBayes() .fit(trainingData)

// 预测 predict
val predictions: DataFrame = model.transform(testData)

predictions.show()
//okmail: Dear Dave this is your final notice to collect your!
//模型评估
//将结果注册为临时表
predictions.registerTempTable("result")
//计算正确率
val accuracy: DataFrame = sqlContext.sql(
"select (1- (sum(abs(label-prediction)))/count(label)) as accuracy from result")
accuracy.show()

//保存模型
// model.save("sms_spam")
}
}

第二种:基于Ml (RDD)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}
/**
* 贝叶斯算法
2,0 0 3
2,0 0 4
*/
object Naive_bayes2 {
def main(args: Array[String]) {
//1 构建Spark对象
val conf = new SparkConf().setAppName("Naive_bayes").setMaster("local")
val sc = new SparkContext(conf)
//读取样本数据1
val data = sc.textFile("sample_naive_bayes_data.txt")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble,
Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}

//样本数据划分训练样本与测试样本
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)

//新建贝叶斯分类模型模型,并训练
val model = NaiveBayes.train(training, lambda = 1.0)
//对测试样本进行测试
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val print_predict = predictionAndLabel.take(20)
println("prediction" + "\t" + "label")
for (i <- 0 to print_predict.length - 1) {
println(print_predict(i)._1 + "\t" + print_predict(i)._2)
}

val accuracy =
1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
println(accuracy)
//保存模型
val ModelPath = "naive_bayes_model"
// model.save(sc, ModelPath)
// val sameModel = NaiveBayesModel.load(sc, ModelPath)
}
}

四、==Kmeans聚类算法==

http://stanford.edu/class/ee103/visualizations/kmeans/kmeans.html
http://shabal.in/visuals/kmeans/4.html

根据聚类中心点将数据自动划分成类

K均值聚类

聚类:给事物打标签,寻找同一组内的个体之间的一些潜在的相似模式。力图找到数据的自然分组kmeans

Kmeans算法的基本原理:

  • 聚类是一种无监督的机器学习任务,它会自动将数据划分成类cluster。因此聚类分组不需要提前被告知所划分的组应该是什么样的。因为我们甚至可能都不知道我们再寻找什么,所以聚类是用于知识发现而不是预测。

  • 聚类原则是一个组内的记录彼此必须非常相似,而与该组之外的记录截然不同。所有聚类做的就是遍历所有数据然后找到这些相似性

  • 使用距离来分配和更新类

选择适当的聚类数:

聚类原则:

类内部成员越相似越好;类与类之间的成员差异越大越好

肘部法:求拐点的值

算法思想:

–以空间中K个点为中心进行聚类,对最靠近他们的对象归类,通过迭代的方法
,逐次更新各聚类中心的值,直到得到最好的聚类结果

算法流程总结:

–1、适当选择c个类的初始中心
–2、在第K次迭代中,对任意一个样本,求其到c各中心的距离,将该样本归到距离最短的中心所在的类
–3、利用均值等方法更新该类的中心值
–4、对于多有的c个聚类中心,如果利用2,3的迭代法更新后,值保持不变,则迭代结束,否则继续迭代

Kmeans算法的缺陷

 聚类中心的个数K 需要事先给定,但在实际中这个 K 值的选定是非常难以估计的,很多时候,事先并不知道给定的数据集应该分成多少个类别才最合适
 Kmeans需要人为地确定初始聚类中心,不同的初始聚类中心可能导致
完全不同的聚类结果。(可以使用Kmeans++算法来解决)

Kmeans++算法

介绍

 从输入的数据点集合中随机选择一个点作为第一个聚类中心
 对于数据集中的每一个点x,计算它与最近聚类中心(指已选择的聚类中心)的距离D(x)
 选择一个新的数据点作为新的聚类中心,选择的原则是:D(x)较大的点,被选取作为聚类中心的概率较大
 重复2和3直到k个聚类中心被选出来
 利用这k个初始的聚类中心来运行标准的k-means算法

Kmeans算法的应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KMeans {
def main(args: Array[String]) {
//1 构建Spark对象
val conf = new SparkConf().setAppName("KMeans").setMaster("local")
val sc = new SparkContext(conf)

// 读取样本数据1,格式为LIBSVM format
/* 0.0 0.0 0.0
0.1 0.1 0.1 */
val data = sc.textFile("kmeans_data.txt")
val parsedData: RDD[linalg.Vector] =
data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

val numClusters = 4
val numIterations = 100
val model: KMeansModel = new KMeans().
setK(numClusters).
setMaxIterations(numIterations).
run(parsedData)
val centers: Array[linalg.Vector] = model.clusterCenters
println("centers")
for (i <- 0 to centers.length - 1) {
println(centers(i)(0) + "\t" + centers(i)(1) + "\t" + centers(i)(2))
}
// 误差计算
val WSSSE = model.computeCost(parsedData)
println("Errors = " + WSSSE)
//打印出属于哪个聚类中心的类 ,的索引
println(model.predict(Vectors.dense(10, 10, 10)))
//保存模型
// val ModelPath = "KMeans_Model"
// model.save(sc, ModelPath)
// val sameModel = KMeansModel.load(sc, ModelPath)
}
}

五、==关联规则==

指定商品之间关系模式(如啤酒尿布)

一个典型的规则可以表述为: {花生酱,果酱} –> {面包}

支持度和置信度:

一个项集或者规则度量法的支持度是指其在数据中出现的频率

置信度是指该规则的预测能力或者准确度的度量

理解:

​ 项集 的支持度:指该相集在数据集中的概率

​ 关联规则:

​ 就是 由项集 A , 推出项集 B 发生的概率 , 而这个概率就是置信度

​ 置信度 : 指由 {AB} 的支持度 / {A }的支持度

Apriori算法是通过设置支持度阈值 , 找出频繁项集 ,结合关联规则 , 得出置信度。

Spark中没有Apriori算法,它的关联规则算法是FPGrowth算法。

Apriori算法/FPGrowth算法

 Apriori原则指的是一个频繁项集的所有子集也必须是频繁的,如果{A,B}是频繁的,那么{A}和{B}都必须是频繁的
 根据定义,支持度表示一个项集出现在数据中的频率。因此,如果知道{A}不满足所期望的支持度阈值,那么就没有考虑{A,B}或者任何包含{A}的项集,这些项集绝对不可能是频繁的

 Apriori算法利用这个逻辑在实际评估他们之前潜在的关联规则

  • 分为两个阶段:

    • 识别所有满足最小支持度阈值的项集

    • 根据满足最小支持度阈值的这些项集来创建规则

  • 例如,迭代1需要评估一组1项集,迭代2评估2项集,以此类推。在迭代中没有产生新的项集,算法将停止。之后,算法会根据产生的频繁项集,根据所有可能的子集产生关联规则。例如,{A,B}将产生候选规则{A}->{B}和{B}->{A}。这些规则将根据最小置信度阈值评估,任何不满足所期望的置信度的规则将被排除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object AssociationRule {
/**
* Spark购物篮关联规则算法
a,b,c
a,b,d
a,c
a,d
**/
def main(args: Array[String]): Unit = {
val inputPath = "shopping_cart"
val outputPath = "rs/shopping_cart"

val sparkConf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("AssociationRule")
val sc: SparkContext = SparkContext.getOrCreate(sparkConf)
val transactions: RDD[String] = sc.textFile(inputPath)
//求出商品组合:(List(a,b),1) (List(b,c),1) (List(a,b,c),1)...
val patterns: RDD[(List[String], Int)] =
transactions.flatMap(line => {
val items = line.split(",").toList
//combinations 对项集执行两两自由组合
(0 to items.size).flatMap(items.combinations).filter(xs =>
!xs.isEmpty)
}).map((_, 1))

//商品组合出现的频度计算
/**
* (List(b, c, e),1)
* (List(b, d, e),1)
* (List(c, d),2)
* (List(c, e),2)
* (List(b),9)
* (List(a, b, d),1)
* (List(b, d),5)
* (List(a, b),3)
* (List(d, e),2)
*/
val combined: RDD[(List[String], Int)] = patterns.reduceByKey(_ + _)
combined.collect().foreach(println)
/**
* 算出所有的关联规则
* (List(b, c, e),(List(),1))
* (List(c, e),(List(b, c, e),1))
* (List(b, e),(List(b, c, e),1))
* (List(b, c),(List(b, c, e),1))
* (List(b, d, e),(List(),1))
* (List(e),(List(c, e),2))
* (List(c),(List(c, e),2))
*/
val subpatterns: RDD[(List[String], (List[String], Int))] =
combined.flatMap(pattern => {
val result =
ListBuffer.empty[Tuple2[List[String], Tuple2[List[String], Int]]]
result += ((pattern._1, (Nil, pattern._2)))
print(result)
val sublist= for {
i <- 0 until pattern._1.size
xs = pattern._1.take(i) ++ pattern._1.drop(i + 1)
if xs.size > 0
} yield (xs, (pattern._1, pattern._2))
result ++= sublist
println(" : " + result.toList)
result.toList
})

subpatterns.collect().foreach(x => {println(x + "-----------")})

val rules: RDD[(List[String], Iterable[(List[String], Int)])] =
subpatterns.groupByKey()

//计算每个规则的概率
val assocRules: RDD[List[(List[String], List[String], Double)]] =
rules.map(in => {
// val a: Iterable[(List[String], Int)] = in._2
val fromCount = in._2.find(p => p._1 == Nil).get
val lstData = in._2.filter(p => p._1 != Nil).toList
if (lstData.isEmpty) Nil
else {
val result = {
for {
t2 <- lstData
confidence = t2._2.toDouble / fromCount._2.toDouble
difference = t2._1 diff in._1
} yield (((in._1, difference, confidence)))
}
result
}
})

val formatResult: RDD[(String, String, Double)] =
assocRules.flatMap(f => {
f.map(s =>
(s._1.mkString("[", ",", "]"), s._2.mkString("[", ",", "]"), s._3))
}).sortBy(tuple => tuple._3, false, 1)
//保存结果
//formatResult.saveAsTextFile(outputPath)
//打印商品组合频度
combined.foreach(println)
//打印商品关联规则和置信度
formatResult.foreach(println)
sc.stop()
}
}

六、==逻辑回归==

官网:http://spark.apache.org/docs/latest/mllib-linear-methods.html#classification

逻辑回归是一种线性有监督分类模型

主要用于做分类 , 常见的是二分类(分成两个类), 被命名为 正负例类。还有多分类(多于两个类型)

有监督:有Y值用来测试结果

无监督:无Y值用来参考

预测是否生病

逻辑回归是一种用于分类的模型,就相当于y=f(x),表明输入与输出(类别)的关系。

最常见问题有如医生治病时的望、闻、问、切,之后判定病人是否生病或生了什么病,其中的望闻问切就是输入,即特征数据,判断是否生病就相当于获取因变量y,即分类结果。

二分类:结合训练医疗模型,根据输入的特征数据,判断因变量Y的值(要么健康,要么生病)。

逻辑回归和线性回归的差别:

线性回归 逻辑回归
目的 预测 分类
y^{(i)}y^{(i)} 未知 {0,1}
函数 拟合函数 预测函数
参数计算方式 最小二乘 最大似然估计

代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD}
import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LogisticRegression4 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("spark").setMaster("local[3]")
val sc = new SparkContext(conf)
/*数据类型:
1 1:56 2:1 3:0 4:3 5:4 6:3
0 1:18 2:0 3:0 4:4 5:3 6:3
*/
val inputData: RDD[LabeledPoint] =
MLUtils.loadLibSVMFile(sc, "健康状况训练集.txt")
val splits = inputData.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
val lr = new LogisticRegressionWithLBFGS()
lr.setIntercept(true//设置截距
// val model = lr.run(trainingData)
// val result = testData.map{point=>
// Math.abs(point.label-model.predict(point.features)) }
// println("正确率="+(1.0-result.mean()))
// println(model.weights.toArray.mkString(" "))
// println(model.intercept)
//将模型设置为不返回 0 , 1 结果,而返回结果为 计算的概率
val model = lr.run(trainingData).clearThreshold()
val errorRate = testData.map { p =>
val score = model.predict(p.features)
// 癌症病人宁愿错判断出得癌症也别错过一个得癌症的病人
val result = score > 0.5 match {
case true => 1;
case false => 0
}
//为了规避风险,可以调整固定阈值0.5
Math.abs(result - p.label)
}.mean()//求均值
println(1 - errorRate)
}
}

出现线性不可分的情况时,可以使用调维方法, 如将二维数据变成三维数据

鲁棒性调优

W在数值上越小越好,这样越能抵抗数据的扰动

数值优化

最大值最小值法

  • 归一化的一种方法:最大值最小值法
  • 缺点
    • 抗干扰能力 弱
    • 受离群值得影响比较大
    • 中间容易没有数据

方差归一化

  • 归一化的一种方法:方差归一化
  • 优点
    • 抗干扰能力强,和所有数据都有关, 求标准差需要所有值的介入,重要有离群值的话,会被抑
      制下来
  • 缺点
    • 最终未必会落到0到1之间
  • 牺牲归一化结果为代价提高稳定

均值归一化:

 每个数量减去平均值

七、==随机森林==

天气与车祸的关系

决策树

  • 决策树

    • 决策树是一个预测模型;他代表的是对象属性与对象值之间的一种映射关系
    • 决策树是一种树形结构,其中每个内部节点表示一个属性上的测试,每个分支代表一个测试
      输出,每个叶节点代表一种类别。
  • 决策树 思想,实际上就是寻找最纯净的划分 方法。

  • 决策树是一种 非线性 有监督 分类 模型
    线性分类模型比如说逻辑回归,可能会存在不可分问题,但是非线性分类就不存在

  • 决策树是通过固定的条件来对类别进行判断:

  • 决策树方法

    • 决策树的生成:数据不断分裂的递归过程,每一次分裂,尽可能让类别一样的数据在树的一边,当树的叶子节点的数据都是一类的时候,则停止分类。(if else 语句)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ClassificationDecisionTree {
val conf = new SparkConf()
conf.setAppName("analysItem")
conf.setMaster("local[3]")
val sc = new SparkContext(conf)

def main(args: Array[String]): Unit = {
/*
1 1:3 2:1 3:1 4:1 5:66
0 1:1 2:3 3:2 4:2 5:47
*/
val data = MLUtils.loadLibSVMFile(sc, "汽车数据样本.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
//指明类别,二分类
val numClasses = 2
//指定离散变量,未指明的都当作连续变量处理
//1,2,3,4维度进来就变成了0,1,2,3
//这里天气维度有3类,但是要指明4,这里是个坑,后面以此类推
val categoricalFeaturesInfo = Map[Int, Int](0 -> 4, 1 -> 4, 2 -> 3, 3 -> 3)
//设定评判标准
val impurity = "entropy"
//树的最大深度,太深运算量大也没有必要 剪枝
val maxDepth = 3
//设置离散化程度,连续数据需要离散化,分成32个区间,默认其实就是32,分割的区间保证数量差不多 这个参数也可以进行剪枝
val maxBins =10
//生成模型
val model: DecisionTreeModel =
DecisionTree.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, impurity,
maxDepth, maxBins)
//测试
val labelAndPreds: RDD[(Double, Double)] = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification tree model:\n" + model.toDebugString)
}
}

随机森林

 随机森林是一种非线性有监督分类模型
 森林:由树组成
 随机:生成树的数据都是从数据集中随机选取的

生成方式

当数据集很大的时候,我们随机选取数据集的一部分,生成一棵树,重复上述过程,我们可以生成一堆形态各异的树,这些树放在一起就叫森林。

逻辑回归 随机森林
软分类 硬分类
线性模型 非线性模型
输出有概率意义 输出无概率意义
抗干扰能力强 抗干扰能力弱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.tree.RandomForest

object ClassificationRandomForest {
val conf = new SparkConf()
conf.setAppName("analysItem")
conf.setMaster("local[3]")
val sc = new SparkContext(conf)
def main(args: Array[String]): Unit = {
//读取数据
val data = MLUtils.loadLibSVMFile(sc, "汽车数据样本.txt")
//将样本按7:3的比例分成
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
//分类数
val numClasses = 2
// categoricalFeaturesInfo 为空,意味着所有的特征为连续型变量
val categoricalFeaturesInfo = Map[Int, Int](0 -> 4, 1 -> 4, 2 -> 3, 3 -> 3)
//树的个数
val numTrees = 3
//特征子集采样策略,auto 表示算法自主选取
//"auto"根据特征数量在4个中进行选择
// 1,all 全部特征 2,sqrt 把特征数量开根号后随机选择的 3,log2 取对数个 4,onethird 三分之一
val featureSubsetStrategy = "auto"
//纯度计算
val impurity = "entropy"
//树的最大层次
val maxDepth = 3
//特征最大装箱数,即连续数据离散化的区间
val maxBins = 32
//训练随机森林分类器,trainClassifier 返回的是 RandomForestModel 对象
val model =
RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
//打印模型
println(model.toDebugString)
//保存模型
//model.save(sc,"汽车保险")
//在测试集上进行测试
val count = testData.map { point =>
val prediction = model.predict(point.features)
// Math.abs(prediction-point.label)
(prediction, point.label)
}.filter(r => r._1 != r._2).count()
println("Test Error = " + count.toDouble / testData.count().toDouble)
}
}
|
载入天数...载入时分秒...