A Simple Spark Practice: Log Mining & the SVM Algorithm

December 7, 2018

I. A simple Spark exercise: log mining

1. The logs are collected by Flume and stored in Elasticsearch (ES), so they have to be read back out of ES. This requires the elasticsearch-spark JAR (its version must match the ES version); see the dependency sketch after this list.

2. A quick analysis of the index holding the logs: for log lines containing the string "RED", build a map from the date each line was produced to the number of distinct IP addresses seen that day.
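
For reference, a minimal build.sbt sketch for pulling in that connector; the artifact coordinates and the 6.5.1 version below are assumptions and must be matched to your Scala, Spark, and ES versions:

//build.sbt (sketch): the ES-Hadoop Spark connector for Spark 2.x / Scala 2.11
//the version here is an assumption; it has to match the ES cluster version
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "6.5.1"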

 

Scala code:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object SparkFromElasticsearch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkFromES").setMaster("local[2]")
    conf.set("es.nodes", "10.66.X.X")
    conf.set("es.port", "9200")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)

    //read every log document from the ES indices matching es_log* into esLogs
    val esLogs = EsSpark.esJsonRDD(sc, "es_log*").values
    val line_num = esLogs.count()
    System.out.println("line_num:" + line_num)

    //filter the data, keeping only the log lines that contain "RED"
    val waf1 = esLogs.filter(_.contains("RED"))
    System.out.println("waf1:" + waf1.first())

    //extract the key fields with regular expressions
    val datePattern = """\d{4}-\d{2}-\d{2}""".r          //date in yyyy-MM-dd form
    val ipPattern = """\d{1,3}(?:\.\d{1,3}){3}""".r      //dotted-quad IPv4 address
    def getDateAndIp(line: String) =
      (datePattern.findFirstIn(line).getOrElse(""), ipPattern.findFirstIn(line).getOrElse(""))

    //group the IPs by date and deduplicate them within each day
    val waf2 = waf1.map(getDateAndIp).groupByKey().map { case (date, ips) => (date, ips.toList.distinct) }
    waf2.foreach(println)

    //top 10 days by distinct IP count
    waf2.sortBy(_._2.size, ascending = false).take(10)
      .foreach(x => println("date: " + x._1 + " distinct IPs: " + x._2.size))

    sc.stop()
  }

}
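
As an aside, when only the distinct count per day is needed rather than the IP lists themselves, groupByKey can be avoided. A minimal sketch, assuming the same waf1 RDD and getDateAndIp helper as above:

    //count unique (date, ip) pairs, then sum them per date
    val distinctIpPerDay = waf1
      .map(getDateAndIp)  //(date, ip)
      .distinct()         //drop duplicate (date, ip) pairs
      .mapValues(_ => 1)
      .reduceByKey(_ + _) //date -> distinct IP count
    distinctIpPerDay.foreach(println)

This keeps the per-day deduplication distributed instead of materializing a full list of IPs for each key.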

 

Results (the log volume is still fairly small…):

line_num:20 
... 


date: 2018-12-06 distinct IPs: 2
date: 2018-12-07 distinct IPs: 2
date: 2018-12-05 distinct IPs: 1
date: 2018-12-08 distinct IPs: 1

 

 

II. Using Spark's machine-learning library

Train a model with Spark MLlib's support vector machine (SVM) implementation, then validate it by predicting on held-out test data.

  • Labeling rule: values > 0 get label 1, otherwise label 0.
  • Input is in the LIBSVM data format, one labeled local vector per line: label index1:feature1 index2:feature2 index3:feature3. This format represents labeled data for supervised learning algorithms (regression, classification); see the example after this list.
  • The corresponding Spark type, LabeledPoint, is likewise a labeled local vector; the input lines are parsed into it.
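
For illustration, here is one made-up line in that format and the LabeledPoint it would load as. Note that LIBSVM feature indices are 1-based while MLlib vector indices are 0-based; MLUtils.loadLibSVMFile performs the conversion:

1 1:0.5 3:1.2 10:0.7

//loads (sketch) as a LabeledPoint backed by a sparse vector, indices shifted down by one:
//LabeledPoint(1.0, Vectors.sparse(numFeatures, Array(0, 2, 9), Array(0.5, 1.2, 0.7)))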

 

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SVMwithSGD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SVMTest").setMaster("local[3]")
    val sc = new SparkContext(conf)

    //loadLibSVMFile returns the input file as an RDD of LabeledPoint
    val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "D:\\XX\\sample_libsvm_data.txt")
    //data.foreach( x => println(x.features))

    //split the dataset, holding out 30% as the test set
    val splits = data.randomSplit(Array(0.7, 0.3), seed = 11L)
    val trainData = splits(0).cache()
    val testData = splits(1)

    //train the model with 100 iterations of SGD
    val numIterations = 100
    val model: SVMModel = SVMWithSGD.train(trainData, numIterations)
    model.clearThreshold() //so that predict returns raw scores rather than thresholded 0/1 class labels

    //predict on the test data
    val scoreAndLabels: RDD[(Double, Double)] = testData.map { point =>
      val score = model.predict(point.features)
      (score, point.label) //(raw score, true label): a positive score predicts class 1, a negative one class 0
    }
    scoreAndLabels.foreach(println)

    //compute binary-classification evaluation metrics
    val metrics1 = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC = metrics1.areaUnderROC()
    println("Area under ROC = " + auROC)
  }
}
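
The MulticlassMetrics import in the listing goes unused; as an optional extension (a sketch, placed just before sc.stop(), assuming Spark 2.x where MulticlassMetrics.accuracy is available), the threshold can be restored to get hard 0/1 predictions and measure overall accuracy:

    //restore a decision threshold so predict returns 0/1 class labels again
    model.setThreshold(0.0)
    val predictionAndLabels = testData.map(p => (model.predict(p.features), p.label))
    val metrics2 = new MulticlassMetrics(predictionAndLabels)
    println("Accuracy = " + metrics2.accuracy)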

Prediction results. The features here are fairly simple, so nearly every point is classified correctly (score > 0 predicts label 1, otherwise label 0):

(-545715.6921596449,0.0)
(522129.0637908484,1.0)
(442613.842465142,1.0)
(-638748.9091949299,0.0)
(-556545.7093283199,0.0)
(-81264.04716772017,1.0)
(682227.1537578608,1.0)
(778954.4512687015,1.0)
(-1220341.2460111887,0.0)
(-268032.8891943345,0.0)
(-1128873.7759435796,0.0)
(686795.7536343219,1.0)
(786239.513058736,1.0)
(-747193.7737528477,0.0)
(-764324.4061261263,0.0)
(855102.4188552641,1.0)
(-299071.1538433387,0.0)
(-1103117.3933866462,0.0)
(-1285350.151835601,0.0)
(71108.3938119981,1.0)
(-853913.9301656107,0.0)
(-764127.8575447138,0.0)
(-632199.8896986677,0.0)
(627200.0063060208,1.0)
(501968.57254469773,1.0)
(-780521.2231053949,0.0)
(611531.0900402353,1.0)
(713932.7146165988,1.0)
(-1020398.9106744471,0.0)
(317117.92594758834,1.0)
(618944.1539774527,1.0)
(-984920.9001353479,0.0)
(399287.89077608666,1.0)
(621758.2446473527,1.0)
(-1124415.409381351,0.0)
(-975351.9510166872,0.0)
(592498.7266916536,1.0)

 

 
