spark 实践-ML-分类-使用随机森林做风控预测

2018年12月12日

这是一组信用风控的场景, 需要根据 个人信用度的数据集来预测;

  • 需要预测什么?
    • 某个人是否会按时还款?
  • 来参与预测的特征有哪些?
    • 申请人的基本信息和社会身份信息:职业,年龄,存款储蓄,婚姻状态等等……

从这个场景看,这个是一个二元分类预测的问题, 可以使用Python  本地环境也可以是使用spark , 都支持随机森林的ML库; 下面是Spark ML 库+ SparkSQL + dataframe 的实现方法

 

  • 1.了解数据

数据维度包括: “可信度”,“存款”,“期限”,“历史记录”,“目的”,“数额”,“储蓄”,“是否在职”,“婚姻”,“担保人”,“居住时间”,“资产”,“年龄”,“历史信用”,“居住公寓”,“贷款”,“职业”,“监护人”,“是否有电话”,“外籍”

网上下载的数据,下面是部分数据:

1,1,18,4,2,1049,1,2,4,2,1,4,2,21,3,1,1,3,1,1,1
1,1,9,4,0,2799,1,3,2,3,1,2,1,36,3,1,2,3,2,1,1
1,2,12,2,9,841,2,4,2,2,1,4,1,23,3,1,1,2,1,1,1
1,1,12,4,0,2122,1,3,3,3,1,2,1,39,3,1,2,2,2,1,2
1,1,12,4,0,2171,1,3,4,3,1,4,2,38,1,2,2,2,1,1,2
1,1,10,4,0,2241,1,2,1,3,1,3,1,48,3,1,2,2,2,1,2
1,1,8,4,0,3398,1,4,1,3,1,4,1,39,3,2,2,2,1,1,2
1,1,6,4,0,1361,1,2,2,3,1,4,1,40,3,2,1,2,2,1,2


  • 2.数据预处理

需要引入的库:

import org.apache.spark._
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

定义Credit的属性维度

对应于csv文件中的一行

  case class Credit(
    creditability: Double,
    balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
    savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
    residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
    credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
  )

下面子函数解析一行数据文件,将值存入Credit类中

类别的索引值减去了1,因此起始索引值为0

def parseCredit(line: Array[Double]): Credit = {
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4), line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }

  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }

 

导入数据存为一个String类型的RDD。然后我们对RDD做map操作,将RDD中的每个字符串经过ParseRDDR函数的映射,转换为一个Double类型的数组。紧接着是另一个map操作,使用ParseCredit函数,将每个Double类型的RDD转换为Credit对象; 然后转化为Dataframe格式;

用某个表名将DataFrame注册为一张临时表, 方面使用Spark SQL查询:

  val creditDF = parseRDD(sc.textFile("D:\\code\\sparkProject\\sparkInput\\germancredit.csv")).map(parseCredit).toDF().cache()
    creditDF.registerTempTable("credit")
    creditDF.printSchema
部分打印数据
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+
| 1.0| 0.0| 18.0| 4.0| 2.0|1049.0| 0.0| 1.0| 4.0| 1.0| 0.0| 3.0| 1.0|21.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|
| 1.0| 0.0| 9.0| 4.0| 0.0|2799.0| 0.0| 2.0| 2.0| 2.0| 0.0| 1.0| 0.0|36.0| 2.0| 0.0| 1.0| 2.0| 1.0| 0.0| 0.0|
| 1.0| 1.0| 12.0| 2.0| 9.0| 841.0| 1.0| 3.0| 2.0| 1.0| 0.0| 3.0| 0.0|23.0| 2.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|
然后用SQLContext提供的sql方法执行SQL命令:
    sqlContext.sql("SELECT creditability, avg(balance) as avgbalance, avg(amount) as avgamt, avg(duration) as avgdur  FROM credit GROUP BY creditability ").show
打印查询临时表的结果如下:
+————-+——————+——————+——————+
|creditability| avgbalance| avgamt| avgdur|
+————-+——————+——————+——————+
| 1.0|1.8657142857142857| 2985.442857142857|19.207142857142856|
| 0.0|0.9033333333333333|3938.1266666666666| 24.86|
+————-+——————+——————+——————+
  • 3.提取数据的特征:

每个样本的特征包括以下的字段:

  • 特征 -> {“存款”,“期限”,“历史记录”,“目的”,“数额”,“储蓄”,“是否在职”,“婚姻”,“担保人”,“居住时间”,“资产”,“年龄”,“历史信用”,“居住公寓”,“贷款”,“职业”,“监护人”,“是否有电话”,“外籍”}
  • 标签 -> 是否可信:0或者1

 

用VectorAssembler方法生成特征转换器assembler,    assembler将每个维度的特征都做变换,返回一个新的dataframe,并增加features 列

    val featureCols = Array("balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instPercent", "sexMarried", "guarantors",
      "residenceDuration", "assets", "age", "concCredit", "apartment",
      "credits", "occupation", "dependents", "hasPhone", "foreign")
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(creditDF)
    df2.show
新增加features列:
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign| features|
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+
| 1.0| 0.0| 18.0| 4.0| 2.0|1049.0| 0.0| 1.0| 4.0| 1.0| 0.0| 3.0| 1.0|21.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|(20,[1,2,3,4,6,7,...|

增加信用度这一列作为标签(使用StringIndexer方法返回一个Dataframe)

StringIndexer 就是把字符串 数值化的操作函数

val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
df3.show
部分数据打印(label的初始值都为0):
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+—–+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign| features|label|
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+—–+
| 1.0| 0.0| 18.0| 4.0| 2.0|1049.0| 0.0| 1.0| 4.0| 1.0| 0.0| 3.0| 1.0|21.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|(20,[1,2,3,4,6,7,…| 0.0|

 

  • 4.模型训练

数据集划分–70%的数据用来训练模型,30%的数据用来测试模型

val splitSeed = 5043
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)

定义随机森林分类器, 通过对trainingData 训练, 返回一个模型:

  • maxDepth:每棵树的最大深度。增加树的深度可以提高模型的效果,但是会延长训练时间。
  • maxBins:连续特征离散化时选用的最大分桶个数,并且决定每个节点如何分裂。(这个不太懂一般使用默认值)
  • impurity:计算信息增益的指标
  • auto:在每个节点分裂时是否自动选择参与的特征个数
  • seed:随机数生成种子
    val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
    val model = classifier.fit(trainingData)
  • 5.预测

使用模型对testData行预测, 生成预测结果,并修改label列的值:

   val predictions = model.transform(testData)
    predictions.show
部分数据打印:

+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+—–+——————–+——————–+———-+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign| features|label| rawPrediction| probability|prediction|
+————-+——-+——–+——-+——-+——+——-+———-+———–+———-+———-+—————–+——+—-+———-+———+——-+———-+———-+——–+——-+——————–+—–+——————–+——————–+———-+
| 0.0| 0.0| 6.0| 1.0| 6.0|1198.0| 0.0| 4.0| 4.0| 1.0| 0.0| 3.0| 3.0|35.0| 2.0| 2.0| 0.0| 2.0| 0.0| 0.0| 0.0|[0.0,6.0,1.0,6.0,…| 1.0|[12.9404276368705…|[0.64702138184352…| 0.0|
| 0.0| 0.0| 6.0| 4.0| 2.0|3384.0| 0.0| 2.0| 1.0| 0.0| 0.0| 3.0| 0.0|44.0| 2.0| 0.0| 0.0| 3.0| 0.0| 1.0| 0.0|(20,[1,2,3,4,6,7,…| 1.0|[14.4063871570449…|[0.72031935785224…| 0.0|
| 0.0| 0.0| 9.0| 2.0| 3.0|1366.0| 0.0| 1.0| 3.0| 1.0| 0.0| 3.0| 1.0|22.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|(20,[1,2,3,4,6,7,…| 1.0|[10.8490316065663…|[0.54245158032831…| 0.0|
| 0.0| 0.0| 12.0| 0.0| 5.0|1108.0| 0.0| 3.0| 4.0| 2.0| 0.0| 2.0| 0.0|28.0| 2.0| 1.0| 1.0| 2.0| 0.0| 0.0| 0.0|(20,[1,3,4,6,7,8,…| 1.0|[13.5136522480229…|[0.67568261240114…| 0.0|
| 0.0| 0.0| 12.0| 2.0| 3.0| 727.0| 1.0| 1.0| 4.0| 3.0| 0.0| 2.0| 3.0|33.0| 2.0| 1.0| 0.0| 1.0| 0.0| 1.0| 0.0|[0.0,12.0,2.0,3.0…| 1.0|[12.7445735864865…|[0.63722867932432…| 0.0|
| 0.0| 0.0| 18.0| 0.0| 2.0|3114.0| 0.0| 1.0| 1.0| 1.0| 0.0| 3.0| 1.0|26.0| 2.0| 0.0| 0.0| 2.0| 0.0| 0.0| 0.0|(20,[1,3,4,6,7,8,…| 1.0|[10.4280569825588…|[0.52140284912794…| 0.0|

生成一个二元分类预测器evaluator,它将预测结果与样本的实际标签相比较,返回一个AUC准确度指标(ROC曲线所覆盖的面积)

AUC值为ROC曲线所覆盖的区域面积,AUC越大,分类器分类效果越好

    val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
    val accuracy = evaluator.evaluate(predictions)
    println("accuracy fitting" + accuracy)

测试数据的预测准确度:

accuracy  fitting  0.7178197908286402

 

附上POM.xml

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.11</scala.tools.version>
        <scala.version>2.11.0</scala.version>
        <spark.version>1.6.2</spark.version>
   
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>

        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>

    </repositories>

    <dependencies>
    


        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>	
        </dependency>
		
           <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注