这是一组信用风控的场景, 需要根据 个人信用度的数据集来预测;
- 需要预测什么?
- 某个人是否会按时还款?
- 来参与预测的特征有哪些?
- 申请人的基本信息和社会身份信息:职业,年龄,存款储蓄,婚姻状态等等……
从这个场景看,这个是一个二元分类预测的问题, 可以使用Python 本地环境也可以是使用spark , 都支持随机森林的ML库; 下面是Spark ML 库+ SparkSQL + dataframe 的实现方法
-
1.了解数据
数据维度包括: “可信度”,“存款”,“期限”,“历史记录”,“目的”,“数额”,“储蓄”,“是否在职”,“婚姻”,“担保人”,“居住时间”,“资产”,“年龄”,“历史信用”,“居住公寓”,“贷款”,“职业”,“监护人”,“是否有电话”,“外籍”
网上下载的数据,下面是部分数据:
1,1,18,4,2,1049,1,2,4,2,1,4,2,21,3,1,1,3,1,1,1
1,1,9,4,0,2799,1,3,2,3,1,2,1,36,3,1,2,3,2,1,1
1,2,12,2,9,841,2,4,2,2,1,4,1,23,3,1,1,2,1,1,1
1,1,12,4,0,2122,1,3,3,3,1,2,1,39,3,1,2,2,2,1,2
1,1,12,4,0,2171,1,3,4,3,1,4,2,38,1,2,2,2,1,1,2
1,1,10,4,0,2241,1,2,1,3,1,3,1,48,3,1,2,2,2,1,2
1,1,8,4,0,3398,1,4,1,3,1,4,1,39,3,2,2,2,1,1,2
1,1,6,4,0,1361,1,2,2,3,1,4,1,40,3,2,1,2,2,1,2
-
2.数据预处理
需要引入的库:
import org.apache.spark._ import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler} import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.mllib.evaluation.RegressionMetrics import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext
定义Credit的属性维度
对应于csv文件中的一行
case class Credit(
creditability: Double,
balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
)
下面子函数解析一行数据文件,将值存入Credit类中
类别的索引值减去了1,因此起始索引值为0
def parseCredit(line: Array[Double]): Credit = {
Credit(
line(0),
line(1) - 1, line(2), line(3), line(4), line(5),
line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
)
}
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
rdd.map(_.split(",")).map(_.map(_.toDouble))
}
导入数据存为一个String类型的RDD。然后我们对RDD做map操作,将RDD中的每个字符串经过ParseRDDR函数的映射,转换为一个Double类型的数组。紧接着是另一个map操作,使用ParseCredit函数,将每个Double类型的RDD转换为Credit对象; 然后转化为Dataframe格式;
用某个表名将DataFrame注册为一张临时表, 方面使用Spark SQL查询:
val creditDF = parseRDD(sc.textFile("D:\\code\\sparkProject\\sparkInput\\germancredit.csv")).map(parseCredit).toDF().cache()
creditDF.registerTempTable("credit")
creditDF.printSchema
sqlContext.sql("SELECT creditability, avg(balance) as avgbalance, avg(amount) as avgamt, avg(duration) as avgdur FROM credit GROUP BY creditability ").show
-
3.提取数据的特征:
每个样本的特征包括以下的字段:
- 特征 -> {“存款”,“期限”,“历史记录”,“目的”,“数额”,“储蓄”,“是否在职”,“婚姻”,“担保人”,“居住时间”,“资产”,“年龄”,“历史信用”,“居住公寓”,“贷款”,“职业”,“监护人”,“是否有电话”,“外籍”}
- 标签 -> 是否可信:0或者1
用VectorAssembler方法生成特征转换器assembler, assembler将每个维度的特征都做变换,返回一个新的dataframe,并增加features 列
val featureCols = Array("balance", "duration", "history", "purpose", "amount",
"savings", "employment", "instPercent", "sexMarried", "guarantors",
"residenceDuration", "assets", "age", "concCredit", "apartment",
"credits", "occupation", "dependents", "hasPhone", "foreign")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val df2 = assembler.transform(creditDF)
df2.show
features列:
增加信用度这一列作为标签(使用StringIndexer方法返回一个Dataframe)
StringIndexer 就是把字符串 数值化的操作函数
val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label") val df3 = labelIndexer.fit(df2).transform(df2) df3.show
-
4.模型训练
数据集划分–70%的数据用来训练模型,30%的数据用来测试模型
val splitSeed = 5043 val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
定义随机森林分类器, 通过对trainingData 训练, 返回一个模型:
- maxDepth:每棵树的最大深度。增加树的深度可以提高模型的效果,但是会延长训练时间。
- maxBins:连续特征离散化时选用的最大分桶个数,并且决定每个节点如何分裂。(这个不太懂一般使用默认值)
- impurity:计算信息增益的指标
- auto:在每个节点分裂时是否自动选择参与的特征个数
- seed:随机数生成种子
val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
val model = classifier.fit(trainingData)
-
5.预测
使用模型对testData行预测, 生成预测结果,并修改label列的值:
val predictions = model.transform(testData)
predictions.show
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+—–+——————–+——————–+———-+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign| features|label| rawPrediction| probability|prediction|
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+—–+——————–+——————–+———-+
| 0.0| 0.0| 6.0| 1.0| 6.0|1198.0| 0.0| 4.0| 4.0| 1.0| 0.0| 3.0| 3.0|35.0| 2.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0|[0.0,6.0,1.0,6.0,…| 1.0|[12.9404276368705…|[0.64702138184352…| 0.0|
| 0.0| 0.0| 6.0| 4.0| 2.0|3384.0| 0.0| 2.0| 1.0| 0.0| 0.0| 3.0| 0.0|44.0| 2.0| 0.0| 0.0| 3.0| 0.0| 1.0| 0.0|(20,[1,2,3,4,6,7,…| 1.0|[14.4063871570449…|[0.72031935785224…| 0.0|
| 0.0| 0.0| 9.0| 2.0| 3.0|1366.0| 0.0| 1.0| 3.0| 1.0| 0.0| 3.0| 1.0|22.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|(20,[1,2,3,4,6,7,…| 1.0|[10.8490316065663…|[0.54245158032831…| 0.0|
| 0.0| 0.0| 12.0| 0.0| 5.0|1108.0| 0.0| 3.0| 4.0| 2.0| 0.0| 2.0| 0.0|28.0| 2.0| 1.0| 1.0| 2.0| 0.0| 0.0| 0.0|(20,[1,3,4,6,7,8,…| 1.0|[13.5136522480229…|[0.67568261240114…| 0.0|
| 0.0| 0.0| 12.0| 2.0| 3.0| 727.0| 1.0| 1.0| 4.0| 3.0| 0.0| 2.0| 3.0|33.0| 2.0| 1.0| 0.0| 1.0| 0.0| 1.0| 0.0|[0.0,12.0,2.0,3.0…| 1.0|[12.7445735864865…|[0.63722867932432…| 0.0|
| 0.0| 0.0| 18.0| 0.0| 2.0|3114.0| 0.0| 1.0| 1.0| 1.0| 0.0| 3.0| 1.0|26.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|(20,[1,3,4,6,7,8,…| 1.0|[10.4280569825588…|[0.52140284912794…| 0.0|
生成一个二元分类预测器evaluator,
它将预测结果与样本的实际标签相比较,返回一个AUC准确度指标(ROC曲线所覆盖的面积)
AUC值为ROC曲线所覆盖的区域面积,AUC越大,分类器分类效果越好
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
val accuracy = evaluator.evaluate(predictions)
println("accuracy fitting" + accuracy)
测试数据的预测准确度:
accuracy fitting 0.7178197908286402
附上POM.xml
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.tools.version>2.11</scala.tools.version>
<scala.version>2.11.0</scala.version>
<spark.version>1.6.2</spark.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>mapr-releases</id>
<url>http://repository.mapr.com/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
没有评论