Flink in Practice: State Management & Feature Extraction

May 16, 2018

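Project description:

Detect in real time whether a room is occupied, based on indoor light, temperature, humidity, and CO2 readings. This is a typical binary classification problem.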

 

Data source

http://archive.ics.uci.edu/ml/machine-learning-databases/00357/

"date": date
"Temperature": temperature
"Humidity": humidity
"Light": light level
"CO2": carbon dioxide
"HumidityRatio": humidity ratio
"Occupancy": whether the room is occupied

 

Data processing:

Data distribution (numeric columns):

       Temperature     Humidity     ...       HumidityRatio    Occupancy
count  9752.000000  9752.000000     ...         9752.000000  9752.000000
mean     21.001768    29.891910     ...            0.004589     0.210111
std       1.020693     3.952844     ...            0.000531     0.407408
min      19.500000    21.865000     ...            0.003275     0.000000
25%      20.290000    26.642083     ...            0.004196     0.000000
50%      20.790000    30.200000     ...            0.004593     0.000000
75%      21.533333    32.700000     ...            0.004998     0.000000
max      24.390000    39.500000     ...            0.005769     1.000000

Data distribution (non-numeric columns):

                       date
count                  9752
unique                 9752
top     2015-02-15 15:07:00
freq                      1

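Column types and non-null counts:

Data quality is good overall, with a small number of missing values.

Because the data arrives in real time, null checks are still applied.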

Data columns (total 7 columns):
date             9752 non-null object
Temperature      9741 non-null float64
Humidity         9738 non-null float64
Light            9752 non-null float64
CO2              9752 non-null float64
HumidityRatio    9745 non-null float64
Occupancy        9752 non-null int64

 

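Feature correlation (numeric columns)

Occupancy correlates most strongly with Light, moderately with Temperature, and least with Humidity;

Light and Temperature are moderately correlated

HumidityRatio and Humidity are moderately correlated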
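
The post does not show how these correlations were obtained. As a rough sketch, assuming the usual Pearson coefficient is meant, it could be computed in plain Java as follows. The class and method names are illustrative, not part of the original project.

```java
// Illustrative helper (assumption): Pearson correlation of two numeric columns.
final class Correlation {
    /** Returns the Pearson correlation coefficient of two equally long samples. */
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("need two samples of equal length >= 2");
        }
        double meanX = 0.0;
        double meanY = 0.0;
        for (int i = 0; i < x.length; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= x.length;
        meanY /= y.length;

        double sxy = 0.0; // sum of products of deviations
        double sxx = 0.0; // sum of squared deviations of x
        double syy = 0.0; // sum of squared deviations of y
        for (int i = 0; i < x.length; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            sxy += dx * dy;
            sxx += dx * dx;
            syy += dy * dy;
        }
        // The result is NaN when either column is constant (zero variance).
        return sxy / Math.sqrt(sxx * syy);
    }
}
```

Calling Correlation.pearson on the Light and Occupancy columns, for example, would give the kind of number summarized above; rows with null values would have to be skipped first.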

Feature correlation (non-numeric columns)

The only non-numeric column is date, so there is nothing to analyze

 

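Feature data processing:

Main processing steps:

  1. Null handling: sensors may fail intermittently and transmit empty values. Some strongly correlated features are predicted from previously seen data; other nulls are filled with the mean. For example:
  •                     fill a null Temperature with the mean
  •                    predict a null Humidity from HumidityRatio
  •                    predict a null HumidityRatio from Humidity

This is implemented mainly with Flink's state feature.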

 

Data structure class definition:

public class DataOccupancy implements  Serializable {
   public String date;
   public Double Temp;
   public Double Humidity;
   public Double Light;
   public Double CO2;
   public Double HumidityRatio    ;
   public int Occupancy;
   public DateTime eventTime;
   public DataOccupancy() {
      this.eventTime = new DateTime();
   }

   public DataOccupancy(String date,Double Temp, Double Humidity,Double Light, Double CO2, Double HumidityRatio   ,
                   int Occupancy) {
      this.date = date;
      this.Temp = Temp;
      this.Humidity = Humidity;
      this.Light = Light;
      this.CO2 = CO2;
      this.HumidityRatio = HumidityRatio;
      this.Occupancy = Occupancy;
   }

   public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append(date).append(",");
      sb.append(Temp).append(",");
      sb.append(Humidity).append(",");
      sb.append(Light).append(",");
      sb.append(CO2).append(",");
      sb.append(HumidityRatio).append(",");
      sb.append(Occupancy);

      return sb.toString();
   }

   public static DataOccupancy instanceFromString(String line) {

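      // limit -1 keeps trailing empty fields, so a record with missing last values still yields 7 tokens
      String[] tokens = line.split(",", -1);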
      if (tokens.length != 7) {
         System.out.println("#############Invalid record: " + line+"\n");
         //return null;
         //throw new RuntimeException("Invalid record: " + line);
      }

      DataOccupancy diag = new DataOccupancy();

      try {
         diag.date = tokens[0].length() > 0 ? tokens[0]:null;
         diag.Temp = tokens[1].length() > 0 ? Double.parseDouble(tokens[1]):null;
         diag.Humidity = tokens[2].length() > 0 ? Double.parseDouble(tokens[2]) : null;
         diag.Light = tokens[3].length() > 0 ? Double.parseDouble(tokens[3]) : null;
         diag.CO2 = tokens[4].length() > 0 ? Double.parseDouble(tokens[4]) : null;
         diag.HumidityRatio = tokens[5].length() > 0 ? Double.parseDouble(tokens[5]) : null;
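         // Occupancy is a primitive int: a null default would throw a NullPointerException on unboxing.
         // Falling back to 0 is an assumption; the sample data has no missing Occupancy values.
         diag.Occupancy = tokens[6].length() > 0 ? Integer.parseInt(tokens[6]) : 0;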

      } catch (NumberFormatException nfe) {
         throw new RuntimeException("Invalid record: " + line, nfe);
      }
      return diag;
   }

   public long getEventTime() {
      return this.eventTime.getMillis();
   }
}
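
One detail matters when a sensor drops the last fields of a record: Java's String.split(",") discards trailing empty strings, so such a record yields fewer than seven tokens. A minimal sketch of the difference, using hypothetical helper names and a made-up record:

```java
// Hypothetical helpers for illustration; not part of the original DataOccupancy class.
final class CsvTokens {
    /** Splits a record and keeps trailing empty fields (limit -1). */
    static String[] tokens(String line) {
        return line.split(",", -1);
    }

    /** Returns null for an empty field, otherwise the parsed number. */
    static Double parseOrNull(String token) {
        return token.isEmpty() ? null : Double.valueOf(token);
    }
}
```

For the made-up record "2015-02-04 17:51:00,23.18,,426.0,721.25,," the default split returns five tokens, while CsvTokens.tokens returns all seven, with the empty ones mapped to null by parseOrNull.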

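To fill null values by prediction, we can define a small SimplePredictionModel class that wraps SimpleRegression (presumably the Apache Commons Math class, judging by the API used):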

public class SimplePredictionModel {
	SimpleRegression model;

	public SimplePredictionModel() {
		model = new SimpleRegression(false);
	}

	public double predictY(double x) {
		double prediction = model.predict(x);

		if (Double.isNaN(prediction)) {
			return -1;
		}
		else {
			return prediction;
		}
	}

	public void refineModel(double x, double y) {
		model.addData(x, y);
	}
}
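
For intuition: passing false to the SimpleRegression constructor requests a model without an intercept, that is, y ≈ b·x. The least-squares slope for such a model is b = Σxy / Σx². The class below is a hand-written illustration of that formula in plain Java, not the library implementation. Its name is hypothetical, and returning NaN before any data has been added is an assumption of this sketch.

```java
// Illustrative regression through the origin (assumption: not the Commons Math code).
final class OriginRegression {
    private double sumXY; // running sum of x * y
    private double sumXX; // running sum of x * x

    /** Adds one observation to the running sums. */
    void addData(double x, double y) {
        sumXY += x * y;
        sumXX += x * x;
    }

    /** Predicts y for x with slope sumXY / sumXX; NaN while the slope is undefined. */
    double predict(double x) {
        if (sumXX == 0.0) {
            return Double.NaN;
        }
        return (sumXY / sumXX) * x;
    }
}
```

Because only two running sums are needed, a model of this kind is cheap to keep in state and to update one event at a time.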

Define the operator chain:

DataStream<DataOccupancy> DsOccupancy = env.addSource(
				new occupancySource(input, servingSpeedFactor));

		DataStream<String> modDataStrForLR = DsOccupancy
				.map(new mapTime()).keyBy(0)
				.flatMap(new NullTempFillMean()).keyBy(0)
				.flatMap(new HumidityAndRedioPridict())
				.flatMap(new OccupyFlatMapForLR());
		modDataStrForLR.print();
		modDataStrForLR.writeAsText("./OccupyDataForLR");
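
The functions that follow keyBy use keyed state, and Flink scopes keyed state to the current key: each distinct key sees its own copy. Conceptually this behaves like a map from key to value, which the plain-Java sketch below imitates. It does not use Flink, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual stand-in for keyed ValueState (assumption: for illustration only).
final class KeyedValueState<K, V> {
    private final Map<K, V> values = new HashMap<>();

    /** Returns the value stored for this key, or null if none was stored yet. */
    V value(K key) {
        return values.get(key);
    }

    /** Stores a value for this key only; other keys are unaffected. */
    void update(K key, V value) {
        values.put(key, value);
    }
}
```

One consequence for the chain above: with the record timestamp as the key, two readings with different timestamps do not share a running mean or a regression model. A constant key, or a room or sensor id, would let all readings of one stream share state.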

Define the map operation:

	public static class mapTime implements MapFunction<DataOccupancy, Tuple2<Long, DataOccupancy>> {
		@Override
		public Tuple2<Long, DataOccupancy> map(DataOccupancy occupancy) throws Exception {
			// Pair each record with its event time in milliseconds; the chain keys by this field (f0)
			long time = occupancy.eventTime.getMillis();

			return new Tuple2<>(time, occupancy);
		}
	}

Define the NullTempFillMean class, which fills a null temperature with the mean:

public static class NullTempFillMean extends RichFlatMapFunction<Tuple2<Long, DataOccupancy> , Tuple2<Long, DataOccupancy>> {

		private transient ValueState<Double> TemperatureMeanState;

		@Override
		public void flatMap(Tuple2<Long, DataOccupancy>  val, Collector<Tuple2<Long, DataOccupancy>> out) throws Exception {
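			DataOccupancy occupancy = val.f1;
			// Running temperature estimate kept in keyed state; null until a first valid reading has been stored
			Double TemperatureMean=TemperatureMeanState.value();
			if(occupancy.Temp == null){
				// Fill a missing temperature with the running estimate (0.0 if nothing has been seen yet)
				occupancy.Temp= TemperatureMean == null ? 0.0 : TemperatureMean;
			}else if(TemperatureMean == null){
				// The first valid reading seeds the state; comparing it with a 0.0 default would always flag it as an outlier
				TemperatureMeanState.update(occupancy.Temp);
			}else if(Math.abs(occupancy.Temp -TemperatureMean) > 8){
				// Events whose temperature is far from the running estimate are not written to the training file
				// log exception temp
				return;
			}
			else
			{
				TemperatureMean=(TemperatureMean+occupancy.Temp)/2;
				TemperatureMeanState.update(TemperatureMean);
			}
			out.collect(new Tuple2<>(val.f0,occupancy));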
		}

		@Override
		public void open(Configuration config) {
				ValueStateDescriptor<Double> descriptor2 =
					new ValueStateDescriptor<>(
							"regressionModel",
							TypeInformation.of(Double.class));
			TemperatureMeanState = getRuntimeContext().getState(descriptor2);
		}
	}
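
Note that (mean + x) / 2 is not the arithmetic mean of all readings. It is an exponentially weighted average in which the newest reading always counts for half. If a true running mean is wanted, a count has to be kept alongside the mean. A sketch of both update rules in plain Java, with hypothetical names:

```java
// Illustrative update rules (assumption: helper names are not from the original code).
final class RunningAverages {
    /** The update used above: the new reading gets weight 1/2. */
    static double halfWeight(double mean, double x) {
        return (mean + x) / 2;
    }

    /**
     * Incremental arithmetic mean.
     * n is the number of readings seen so far, including x.
     */
    static double incremental(double mean, long n, double x) {
        return mean + (x - mean) / n;
    }
}
```

For the readings 20, 22, 24 the half-weight rule ends at 22.5, while the arithmetic mean is 22. In a Flink function the count could be stored in a second state value next to the mean.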

 

Define the flatMap operation class HumidityAndRedioPridict, which predicts null values:

ListState is used so that several models can be stored, updated, and queried together

	public static class HumidityAndRedioPridict extends RichFlatMapFunction<Tuple2<Long, DataOccupancy> , DataOccupancy> {

		private transient ListState<SimplePredictionModel> modelStates;
		private List<SimplePredictionModel> models;

		@Override
		public void flatMap(Tuple2<Long, DataOccupancy>  val, Collector<DataOccupancy> out) throws Exception {
			Iterator<SimplePredictionModel> modStateLst = modelStates.get().iterator();
			SimplePredictionModel HumidityPridictModel=null;
			SimplePredictionModel HumidityRatioPridictModel=null;

			if(!modStateLst.hasNext()){
				HumidityPridictModel = new SimplePredictionModel();
				HumidityRatioPridictModel = new SimplePredictionModel();
			}else{
				HumidityPridictModel=modStateLst.next();
				HumidityRatioPridictModel=modStateLst.next();
			}

			models= new ArrayList<SimplePredictionModel>();
			models.add(HumidityPridictModel);
			models.add(HumidityRatioPridictModel);

			DataOccupancy occupancy = val.f1;
			if(occupancy.HumidityRatio != null && occupancy.Humidity== null){
				occupancy.Humidity = HumidityPridictModel.predictY(occupancy.HumidityRatio);
				out.collect(occupancy);
			}else if(occupancy.Humidity != null && occupancy.HumidityRatio== null){
				occupancy.HumidityRatio = HumidityRatioPridictModel.predictY(occupancy.Humidity);
				out.collect(occupancy);
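			}else if(occupancy.Humidity != null && occupancy.HumidityRatio != null) {
					// Both values present: train both directions, so that either one can later be predicted from the other
					HumidityPridictModel.refineModel(occupancy.HumidityRatio, occupancy.Humidity);
					HumidityRatioPridictModel.refineModel(occupancy.Humidity, occupancy.HumidityRatio);
					modelStates.update(models);
					out.collect(occupancy);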
			}else{
				//log Humidity,HumidityRatio are both null
				System.out.println("~~~~~!Humidity,HumidityRatio are both null");
			}

		}

		@Override
		public void open(Configuration config) {
			// obtain key-value state for prediction model
			ListStateDescriptor<SimplePredictionModel> descriptor =
					new ListStateDescriptor<>(
							"regressionModel",
							SimplePredictionModel.class);
			modelStates = getRuntimeContext().getListState(descriptor);
		}
	}

 

Define the flatMap operation that writes selected columns in a specific file format:

public static class OccupyFlatMapForLR implements FlatMapFunction<DataOccupancy, String> {

   @Override
   public void flatMap(DataOccupancy diag, Collector<String> collector) throws Exception {
      StringBuilder sb = new StringBuilder();
      sb.append(diag.date).append(",");
      sb.append(diag.Temp).append(",");
      sb.append(diag.Humidity).append(",");
      sb.append(diag.Light).append(",");
      sb.append(diag.CO2).append(",");
      sb.append(diag.HumidityRatio).append(",");
      sb.append(diag.Occupancy);

      collector.collect(sb.toString());
   }
}

Scheduled Spark job that trains the DecisionTree model:

Since there are only a few features, a random forest (RF) was not chosen; a DecisionTree is sufficient

object OccupyDecisionTree {

  case class Occupy(
                   Temp: Double, Humidity: Double, Light: Double, CO2: Double, HumidityRatio: Double, Occupancy: Double
                 )

  def parseOccupy(line: Array[Double]): Occupy = {
    Occupy(
      line(0),line(1) , line(2), line(3), line(4), line(5)
    )
  }

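  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    // The Flink job writes the date string as the first column; drop it before converting the rest to Double
    rdd.map(_.split(",")).map(_.drop(1).map(_.toDouble))
  }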

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("SparkDFebay").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = parseRDD(sc.textFile("D:/code/flink/OccupyDataForLR")).map(parseOccupy).toDF().cache()

    val featureCols = Array("Temp", "Humidity", "Light", "CO2", "HumidityRatio")


    val featureIndexer = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = featureIndexer.transform(df)
    df2.show

    val Array(trainingData, testData) = df2.randomSplit(Array(0.7, 0.3), 5000)

    val classifier = new DecisionTreeClassifier()
             .setLabelCol("Occupancy")
             .setFeaturesCol("features")
             .setImpurity("gini")
             .setMaxDepth(5)
    val model = classifier.fit(trainingData)
    try {
       val predictions = model.transform(testData)
       predictions.show()
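       // With no metric name set, BinaryClassificationEvaluator computes areaUnderROC,
       // so the value printed below is an area under the ROC curve rather than a plain accuracy
       val evaluator = new BinaryClassificationEvaluator().setLabelCol("Occupancy").setRawPredictionCol("prediction")
       val accuracy = evaluator.evaluate(predictions)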
       println("accuracy pipeline fitting: " + accuracy)
    }catch{
      case ex: Exception =>println(ex)
      case ex: Throwable =>println("found a unknown exception"+ ex)
    }

  }
}

Model evaluation output:

accuracy pipeline fitting: 0.9939532334673
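
For reference, accuracy in the strict sense is the fraction of test rows whose predicted label equals the true label. A minimal plain-Java sketch, with a hypothetical helper name:

```java
// Illustrative metric helper (assumption: not part of the original Spark job).
final class Metrics {
    /** Fraction of positions where the predicted label equals the true label. */
    static double accuracy(int[] labels, int[] predictions) {
        if (labels.length != predictions.length || labels.length == 0) {
            throw new IllegalArgumentException("need two non-empty arrays of equal length");
        }
        int correct = 0;
        for (int i = 0; i < labels.length; i++) {
            if (labels[i] == predictions[i]) {
                correct++;
            }
        }
        return (double) correct / labels.length;
    }
}
```

With the made-up labels {1, 0, 0, 1} and predictions {1, 0, 1, 1}, three of four rows match, giving 0.75.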

 

 

 

 
