Flink in practice: processing and predicting household temperature and humidity sensor data

May 7, 2018

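Project description:

In the IoT domain, analyze the data reported by household temperature and humidity sensors and use it for linear regression prediction.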

 

Data source

http://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction

date time year-month-day hour:minute:second 
Appliances, energy use in Wh 
lights, energy use of light fixtures in the house in Wh 
T1, Temperature in kitchen area, in Celsius 
RH_1, Humidity in kitchen area, in % 
T2, Temperature in living room area, in Celsius 
RH_2, Humidity in living room area, in % 
T3, Temperature in laundry room area, in Celsius 
RH_3, Humidity in laundry room area, in % 
T4, Temperature in office room, in Celsius 
RH_4, Humidity in office room, in % 
T5, Temperature in bathroom, in Celsius 
RH_5, Humidity in bathroom, in % 
T6, Temperature outside the building (north side), in Celsius 
RH_6, Humidity outside the building (north side), in % 
T7, Temperature in ironing room, in Celsius 
T_out, Temperature outside (from Chievres weather station), in Celsius 
Press_mm_hg (from Chievres weather station), in mm Hg 
RH_out, Humidity outside (from Chievres weather station), in % 
Wind speed (from Chievres weather station), in m/s 
Visibility (from Chievres weather station), in km 
Tdewpoint (from Chievres weather station), °C 
rv1, Random variable 1, nondimensional 
rv2, Random variable 2, nondimensional 

 

Data processing:

Data distribution (numeric columns):

         Appliances        lights      ...                rv1           rv2
count  19735.000000  19735.000000      ...       19735.000000  19735.000000
mean      97.694958      3.801875      ...          24.988033     24.988033
std      102.524891      7.935988      ...          14.496634     14.496634
min       10.000000      0.000000      ...           0.005322      0.005322
25%       50.000000      0.000000      ...          12.497889     12.497889
50%       60.000000      0.000000      ...          24.897653     24.897653
75%      100.000000      0.000000      ...          37.583769     37.583769
max     1080.000000     70.000000      ...          49.996530     49.996530

Data distribution (non-numeric columns):

                  date
count            19735
unique           19735
top     2016/3/14 3:10
freq                 1

Column types and non-null counts:

The data quality is good: no column currently contains null values.

Data columns (total 23 columns):
date           19735 non-null object
Appliances     19735 non-null int64
lights         19735 non-null int64
T1             19735 non-null float64
RH_1           19735 non-null float64
T2             19735 non-null float64
RH_2           19735 non-null float64
T3             19735 non-null float64
RH_3           19735 non-null float64
T4             19735 non-null float64
RH_4           19735 non-null float64
T5             19735 non-null float64
RH_5           19735 non-null float64
T6             19735 non-null float64
RH_6           19735 non-null float64
T_out          19735 non-null float64
Press_mm_hg    19735 non-null float64
RH_out         19735 non-null float64
Windspeed      19735 non-null float64
Visibility     19735 non-null float64
Tdewpoint      19735 non-null float64
rv1            19735 non-null float64
rv2            19735 non-null float64

 

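Feature correlation (numeric columns)

T_out and T6 are strongly consistent, and their definitions are also similar (both are outdoor temperatures), so one of them can be dropped;

The temperatures of the indoor rooms are highly correlated with one another, so they can be used for regression prediction and for imputing missing values;

The humidity values of the indoor rooms, except the bathroom, are also highly correlated, so they can be used for regression prediction and for imputing missing values;

Outdoor humidity is negatively correlated with indoor temperature, so it can be used for regression prediction and for imputing missing values;

The purpose of rv1 and rv2 is unclear and they correlate only weakly with the other features, so they are not used for now;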
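The correlation statements above can be checked with the Pearson correlation coefficient. The following is a minimal sketch in plain Java; the class name `PearsonSketch` and the sample readings are made up for illustration and are not taken from the dataset.

```java
import java.util.Locale;

/** Minimal sketch: Pearson correlation between two sensor columns. */
class PearsonSketch {

    /** Returns the Pearson correlation coefficient of x and y (equal length, at least 2 values). */
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("need two arrays of equal length >= 2");
        }
        int n = x.length;
        double meanX = 0, meanY = 0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;

        // Accumulate the covariance and both variances (unnormalized; the factors cancel out)
        double cov = 0, varX = 0, varY = 0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            cov += dx * dy;
            varX += dx * dx;
            varY += dy * dy;
        }
        return cov / Math.sqrt(varX * varY);
    }

    public static void main(String[] args) {
        // Made-up readings for illustration only
        double[] tOut = {2.0, 3.5, 5.0, 6.5, 8.0};
        double[] t6 = {1.8, 3.6, 4.9, 6.7, 8.1};
        double[] rhOut = {90.0, 85.0, 80.0, 72.0, 65.0};
        System.out.printf(Locale.ROOT, "corr(T_out, T6)     = %.3f%n", pearson(tOut, t6));
        System.out.printf(Locale.ROOT, "corr(T_out, RH_out) = %.3f%n", pearson(tOut, rhOut));
    }
}
```

A value close to 1 suggests that one of the two columns is redundant, while a value close to -1 indicates a strong negative relationship.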

 

Feature correlation (non-numeric columns)

There is only one such column, date, so no analysis is needed.

 

 

Feature data processing:

Definition of the data structure class:

import java.io.Serializable;
import org.joda.time.DateTime;

public class DataEnergy implements  Serializable {
   public String date;
   public Integer Appliances;
   public Integer lights;
   public Double T1;
   public Double RH_1;
   public Double T2;
   public Double RH_2;
   public Double T3   ;
   public Double RH_3;
   public Double T4   ;
   public Double RH_4;
   public Double T5   ;
   public Double RH_5;
   public Double T6;
   public Double RH_6;
   public Double T_out;
   public Double Press_mm_hg;
   public Double RH_out;
   public Double Windspeed;
   public Double Visibility;
   public Double Tdewpoint;
   public Double rv1;
   public Double rv2;
   public DateTime eventTime;

   public DataEnergy() {
      this.eventTime = new DateTime();
   }

   public DataEnergy(String date,int Appliances, int lights, Double T1, Double RH_1,
                 Double T2, Double RH_2, Double T3, Double RH_3,
                 Double T4, Double RH_4, Double T5, Double RH_5, Double T6, Double RH_6,
                 Double T_out,Double Press_mm_hg,Double RH_out,Double Windspeed, Double Visibility,
                 Double Tdewpoint,Double rv1,Double rv2) {
      this.date = dateConvetor(date);
      this.eventTime = new DateTime(this.date);
      this.Appliances = Appliances;
      this.lights = lights;
      this.T1 = T1;
      this.RH_1 = RH_1;
      this.T2 = T2;
      this.RH_2 = RH_2;
      this.T3 = T3;
      this.RH_3 = RH_3;
      this.T4 = T4;
      this.RH_4 = RH_4;
      this.T5 = T5;
      this.RH_5 = RH_5;
      this.T6 = T6;
      this.RH_6 = RH_6;
      this.T_out = T_out;
      this.Press_mm_hg = Press_mm_hg;
      this.RH_out = RH_out;
      this.Windspeed = Windspeed;
      this.Visibility = Visibility;
      this.Tdewpoint = Tdewpoint;
      this.rv1 = rv1;
      this.rv2 = rv2;
   }

   private static String dateConvetor(String date){
      String ret=date.replace("/","-");
      return ret;
   }

   @Override
   public String toString() {
      // Join all 23 fields with commas, in the same order as the input CSV
      StringBuilder sb = new StringBuilder();
      sb.append(date).append(",");
      sb.append(Appliances).append(",");
      sb.append(lights).append(",");
      sb.append(T1).append(",");
      sb.append(RH_1).append(",");
      sb.append(T2).append(",");
      sb.append(RH_2).append(",");
      sb.append(T3).append(",");
      sb.append(RH_3).append(",");
      sb.append(T4).append(",");
      sb.append(RH_4).append(",");
      sb.append(T5).append(",");
      sb.append(RH_5).append(",");
      sb.append(T6).append(",");
      sb.append(RH_6).append(",");
      sb.append(T_out).append(",");
      sb.append(Press_mm_hg).append(",");
      sb.append(RH_out).append(",");
      sb.append(Windspeed).append(",");
      sb.append(Visibility).append(",");
      sb.append(Tdewpoint).append(",");
      sb.append(rv1).append(",");
      sb.append(rv2);

      return sb.toString();
   }

   public static DataEnergy instanceFromString(String line) {

      String[] tokens = line.split(",");
      if (tokens.length != 23) {
         System.out.println("#############Invalid record: " + line+"\n");
         return null;
         //throw new RuntimeException("Invalid record: " + line);
      }

      // Note: the default constructor sets eventTime to the current time, not to the parsed date
      DataEnergy energy = new DataEnergy();

      try {
         // An empty token means the value is missing and is stored as null
         energy.date = tokens[0].length() > 0 ? dateConvetor(tokens[0]) : null;
         energy.Appliances = tokens[1].length() > 0 ? Integer.parseInt(tokens[1]) : null;
         energy.lights = tokens[2].length() > 0 ? Integer.parseInt(tokens[2]) : null;
         energy.T1 = tokens[3].length() > 0 ? Double.parseDouble(tokens[3]) : null;
         energy.RH_1 = tokens[4].length() > 0 ? Double.parseDouble(tokens[4]) : null;
         energy.T2 = tokens[5].length() > 0 ? Double.parseDouble(tokens[5]) : null;
         energy.RH_2 = tokens[6].length() > 0 ? Double.parseDouble(tokens[6]) : null;
         energy.T3 = tokens[7].length() > 0 ? Double.parseDouble(tokens[7]) : null;
         energy.RH_3 = tokens[8].length() > 0 ? Double.parseDouble(tokens[8]) : null;
         energy.T4 = tokens[9].length() > 0 ? Double.parseDouble(tokens[9]) : null;
         energy.RH_4 = tokens[10].length() > 0 ? Double.parseDouble(tokens[10]) : null;
         energy.T5 = tokens[11].length() > 0 ? Double.parseDouble(tokens[11]) : null;
         energy.RH_5 = tokens[12].length() > 0 ? Double.parseDouble(tokens[12]) : null;
         energy.T6 = tokens[13].length() > 0 ? Double.parseDouble(tokens[13]) : null;
         energy.RH_6 = tokens[14].length() > 0 ? Double.parseDouble(tokens[14]) : null;
         energy.T_out = tokens[15].length() > 0 ? Double.parseDouble(tokens[15]) : null;
         energy.Press_mm_hg = tokens[16].length() > 0 ? Double.parseDouble(tokens[16]) : null;
         energy.RH_out = tokens[17].length() > 0 ? Double.parseDouble(tokens[17]) : null;
         // Windspeed is column 18; the following columns are shifted accordingly
         energy.Windspeed = tokens[18].length() > 0 ? Double.parseDouble(tokens[18]) : null;
         energy.Visibility = tokens[19].length() > 0 ? Double.parseDouble(tokens[19]) : null;
         energy.Tdewpoint = tokens[20].length() > 0 ? Double.parseDouble(tokens[20]) : null;
         energy.rv1 = tokens[21].length() > 0 ? Double.parseDouble(tokens[21]) : null;
         energy.rv2 = tokens[22].length() > 0 ? Double.parseDouble(tokens[22]) : null;

      } catch (NumberFormatException nfe) {
         throw new RuntimeException("Invalid record: " + line, nfe);
      }
      return energy;
   }

   public long getEventTime() {
      return this.eventTime.getMillis();
   }
}
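One detail of `instanceFromString` is worth checking: it treats empty tokens as missing values, but `String.split(",")` drops trailing empty strings, so a record whose last column is empty yields fewer tokens and would be rejected by the length check instead of being parsed with a null. The following small sketch (the class name `SplitSketch` and the shortened sample lines are made up for illustration) shows the difference between `split(",")` and `split(",", -1)`.

```java
import java.util.Arrays;

/** Minimal sketch: how String.split treats trailing empty CSV fields. */
class SplitSketch {

    /** Counts the comma-separated fields, optionally keeping trailing empty ones. */
    static int fieldCount(String line, boolean keepTrailingEmpty) {
        // A negative limit tells split to keep trailing empty strings
        return keepTrailingEmpty ? line.split(",", -1).length : line.split(",").length;
    }

    public static void main(String[] args) {
        // Shortened, made-up records: the second one has an empty last field
        String complete = "2016-1-11 17:00,60,30,19.89";
        String missingLast = "2016-1-11 17:00,60,30,";

        System.out.println(Arrays.toString(missingLast.split(",")));
        System.out.println(Arrays.toString(missingLast.split(",", -1)));
        System.out.println(fieldCount(complete, false) + " vs " + fieldCount(missingLast, false));
    }
}
```

If records with an empty last column should be kept, calling `line.split(",", -1)` in `instanceFromString` would be one way to do it.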

Suppose we need to fill in some missing T3 values from T1. We can define a small class that wraps SimpleRegression:

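import org.apache.commons.math3.stat.regression.SimpleRegression;

public class TempPredictionModelT1ToT3 {
	SimpleRegression model;

	public TempPredictionModelT1ToT3() {
		// false = fit without an intercept term (T3 = slope * T1)
		model = new SimpleRegression(false);
	}

	public double predictMissTmep(double T1) {
		double prediction = model.predict(T1);

		if (Double.isNaN(prediction)) {
			// not enough training data yet: return -1 as a sentinel value
			return -1;
		}
		else {
			// keep the fractional part; temperatures are not integers
			return prediction;
		}
	}

	public void refineModel(double T1, double T3) {
		// add one (T1, T3) observation to the regression
		model.addData(T1, T3);
	}
}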
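To make clear what the wrapped model computes, here is a minimal plain-Java sketch of an incrementally updated least-squares fit without an intercept, where the slope is the sum of x*y divided by the sum of x*x. It is not the Commons Math implementation; the class name `NoInterceptRegressionSketch` is made up, and returning NaN with fewer than two observations is an assumption chosen to match the NaN check in `predictMissTmep`.

```java
/** Minimal sketch of online least squares through the origin: y = slope * x. */
class NoInterceptRegressionSketch {
    private long n;          // number of observations seen so far
    private double sumXX;    // running sum of x * x
    private double sumXY;    // running sum of x * y

    /** Adds one (x, y) observation, for example (T1, T3). */
    void addData(double x, double y) {
        n++;
        sumXX += x * x;
        sumXY += x * y;
    }

    /** Returns the fitted slope, or NaN while the model is undertrained (an assumption of this sketch). */
    double slope() {
        if (n < 2 || sumXX == 0.0) {
            return Double.NaN;
        }
        return sumXY / sumXX;
    }

    /** Predicts y for the given x; NaN propagates if no slope is available yet. */
    double predict(double x) {
        return slope() * x;
    }

    public static void main(String[] args) {
        NoInterceptRegressionSketch model = new NoInterceptRegressionSketch();
        System.out.println("before training: " + model.predict(20.0));
        // Made-up (T1, T3) pairs for illustration only
        model.addData(19.0, 19.8);
        model.addData(20.0, 20.9);
        model.addData(21.0, 22.1);
        System.out.println("after training:  " + model.predict(20.5));
    }
}
```

Because only two running sums are kept, the model can be refined one record at a time, which is what makes it suitable for use inside a streaming operator.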

Define the operator chain:

public static void main(String[] args) throws Exception {

		ParameterTool params = ParameterTool.fromArgs(args);
		final String input = "D:/code/dataML/energydata_complete1.csv";
		final int servingSpeedFactor = 6000; // events of 100 minutes (6000 seconds) are served in 1 second

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// operate in Event-time
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		// start the data generator
		DataStream<DataEnergy> DsDiag = env.addSource(
				new EnergySource(input, servingSpeedFactor));

		DataStream<String> modDataStrForLR = DsDiag
				.filter(new NoneFilter())
				.map(new mapTime()).keyBy(0)
				.flatMap(new T3PredictionModel())// predict T3 from T1 because the two are highly correlated
				.flatMap(new energyFlatMapForLR());
		//modDataStr2.print();
		modDataStrForLR.writeAsText("./energyDataForLR");

		// run the prediction pipeline
		env.execute("EnergyData Prediction");
	}
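The EnergySource class is not shown in this post. Assuming it replays records in event-time order and divides the gap between consecutive timestamps by `servingSpeedFactor`, the effect of the factor can be worked out as in the sketch below. The class name `ServingSpeedSketch`, the helper method, and the replay semantics are all assumptions made for illustration.

```java
/** Minimal sketch: how a serving speed factor maps event-time gaps to wall-clock delays. */
class ServingSpeedSketch {

    /** Wall-clock delay in milliseconds for a given event-time gap (assumed replay semantics). */
    static long wallClockDelayMillis(long eventGapMillis, int servingSpeedFactor) {
        return eventGapMillis / servingSpeedFactor;
    }

    public static void main(String[] args) {
        // Assuming one reading every 10 minutes, as in the UCI dataset description
        long tenMinutes = 10L * 60 * 1000;
        System.out.println("factor 6000: " + wallClockDelayMillis(tenMinutes, 6000) + " ms between records");
        System.out.println("factor 600:  " + wallClockDelayMillis(tenMinutes, 600) + " ms between records");
    }
}
```

Under these assumptions a factor of 6000 emits roughly ten records per second, while a factor of 600 emits about one record per second.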

Define the null-value filter:

public static class NoneFilter implements FilterFunction<DataEnergy> {

		@Override
		public boolean filter(DataEnergy energy) throws Exception {
			// fields that must not be null
			return IsNotNone(energy.Appliances) && IsNotNone(energy.date) && IsNotNone(energy.Press_mm_hg)
					 && IsNotNone(energy.RH_1)  && IsNotNone(energy.RH_2);
		}

		public boolean IsNotNone(Object Data){
			return Data != null;
		}
	}

Define the map operation:

	public static class mapTime implements MapFunction<DataEnergy, Tuple2<Long, DataEnergy>> {
		@Override
		public Tuple2<Long, DataEnergy> map(DataEnergy energy) throws Exception {
			long time = energy.eventTime.getMillis();

			return new Tuple2<>(time, energy);
		}
	}

Define the flatMap operation for predicting missing values:

The FlatMap function that predicts missing T3 values (assuming T3 may contain nulls):

public static class T3PredictionModel extends RichFlatMapFunction<Tuple2<Long, DataEnergy> , DataEnergy> {

		private transient ValueState<TempPredictionModelT1ToT3> modelState;

		@Override
		public void flatMap(Tuple2<Long, DataEnergy>  val, Collector<DataEnergy> out) throws Exception {

			// fetch operator state
			TempPredictionModelT1ToT3 model = modelState.value();
			if (model == null) {
				model = new TempPredictionModelT1ToT3();
			}

			DataEnergy energy = val.f1;

			if (energy.T3 == null) {
				// T3 is missing: predict it from T1 when T1 is available
				if (energy.T1 != null) {
					energy.T3 = model.predictMissTmep(energy.T1);
				}
			} else if (energy.T1 != null) {
				// both values are present: refine the model
				model.refineModel(energy.T1, energy.T3);
				// update operator state
				modelState.update(model);
			}
			// emit the record, with T3 filled in where possible
			out.collect(energy);
		}

		@Override
		public void open(Configuration config) {
			// obtain key-value state for prediction model
			ValueStateDescriptor<TempPredictionModelT1ToT3> descriptor =
					new ValueStateDescriptor<>(
							// state name
							"regressionModel",
							// type information of state
							TypeInformation.of(TempPredictionModelT1ToT3.class));
			modelState = getRuntimeContext().getState(descriptor);
		}
	}
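One thing that may be worth checking in this design: ValueState in Flink is scoped to the current key, and the pipeline keys the stream by the timestamp field (`keyBy(0)`). Each distinct timestamp therefore gets its own model instance. The plain-Java simulation below has no Flink dependency; a HashMap stands in for keyed state, which is an illustrative simplification, and the class name `KeyedStateSketch` is made up. It counts how many training samples the best-trained model receives under different keying choices.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal sketch: per-key state means each key trains its own model. */
class KeyedStateSketch {

    /** Returns the largest number of training samples that any single key's model receives. */
    static int maxSamplesPerKey(long[] keys) {
        // One counter per key, standing in for one model per key
        Map<Long, Integer> samplesByKey = new HashMap<>();
        int max = 0;
        for (long key : keys) {
            int updated = samplesByKey.merge(key, 1, Integer::sum);
            max = Math.max(max, updated);
        }
        return max;
    }

    public static void main(String[] args) {
        // Keyed by timestamp: every record has a different key
        long[] byTimestamp = {1457925000000L, 1457925600000L, 1457926200000L, 1457926800000L};
        // Keyed by a constant: all records share one model
        long[] byConstant = {0L, 0L, 0L, 0L};

        System.out.println("keyed by timestamp: " + maxSamplesPerKey(byTimestamp) + " sample(s) per model");
        System.out.println("keyed by constant:  " + maxSamplesPerKey(byConstant) + " sample(s) per model");
    }
}
```

If the job behaves the same way, a model keyed by timestamp rarely accumulates enough data to predict, so keying by a constant or by a coarser field (for example a sensor or house ID, which this dataset does not contain) might be worth considering.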

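Define the flatMap operation that writes selected columns in a specific file format:

// For Spark ML LR: save the data in plain CSV format
	// Emits 18 feature columns; date, Appliances, T_out, rv1 and rv2 are omitted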
	public static class energyFlatMapForLR implements FlatMapFunction<DataEnergy, String> {

		@Override
		public void flatMap(DataEnergy InputDiag, Collector<String> collector) throws Exception {
			DataEnergy diag = InputDiag;
			StringBuilder sb = new StringBuilder();
			//sb.append(diag.date).append(",");
			sb.append(diag.lights).append(",");
			sb.append(diag.T1).append(",");
			sb.append(diag.RH_1).append(",");
			sb.append(diag.T2).append(",");
			sb.append(diag.RH_2).append(",");
			sb.append(diag.T3).append(",");
			sb.append(diag.RH_3).append(",");
			sb.append(diag.T4).append(",");
			sb.append(diag.RH_4).append(",");
			sb.append(diag.T5).append(",");
			sb.append(diag.RH_5).append(",");
			sb.append(diag.T6).append(",");
			sb.append(diag.RH_6).append(",");
			sb.append(diag.Press_mm_hg).append(",");
			sb.append(diag.RH_out).append(",");
			sb.append(diag.Windspeed).append(",");
			sb.append(diag.Visibility).append(",");
			sb.append(diag.Tdewpoint);

			collector.collect(sb.toString());
		}
	}

 

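Spark scheduled job to train the LR model:

Based on the feature correlation analysis above, use several indoor temperatures (T2, T3, T4, T5) to predict another indoor temperature, T1: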

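import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

object EnergyModLR {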

  case class Energy(
                     lights: Double, T1: Double, RH_1: Double, T2: Double, RH_2: Double, T3: Double, RH_3: Double, T4: Double, RH_4: Double, T5: Double, RH_5: Double,
                     T6: Double, RH_6: Double,Press_mm_hg: Double, RH_out: Double,Windspeed: Double, Visibility: Double,Tdewpoint:Double
                   )

  def parseEnergy(line: Array[Double]): Energy = {
    Energy(
      line(0), line(1) , line(2) , line(3) , line(4) , line(5) , line(6) , line(7) , line(8) , line(9) , line(10) , line(11) , line(12), line(13), line(14)
      , line(15), line(16), line(17)
    )
  }

  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    // Drop rows that do not have 18 columns before converting the values to Double
    rdd.map(_.split(",")).filter(_.length == 18).map(_.map(_.toDouble))
  }

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("SparkDFEnergy").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val data_path = "D:/code/flink/energyDataForLR/5"
    val EnergyDF = parseRDD(sc.textFile(data_path)).map(parseEnergy).toDF().cache()

    val featureCols = Array( "T2", "T3", "T4","T5")
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(EnergyDF)
    df2.show

    val splitSeed = 5043
    val Array(trainingData, testData) = df2.randomSplit(Array(0.7, 0.3), splitSeed)

    val classifier = new LinearRegression().setFeaturesCol("features").setLabelCol("T1").setFitIntercept(true).setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
    val model = classifier.fit(trainingData)

    // Print all model parameters
    println(model.extractParamMap())
    // Print the coefficients and intercept for linear regression
    println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")
    val predictions = model.transform(trainingData)
    predictions.selectExpr("T1", "round(prediction,1) as prediction").show

    // Evaluate the model
    val trainingSummary = model.summary
    val rmse =trainingSummary.rootMeanSquaredError
    println(s"RMSE: ${rmse}")
    println(s"r2: ${trainingSummary.r2}")

    //val predictions = model.transform(testData)
    if (rmse <0.3) {
      try {
        model.write.overwrite().save("./model/spark-LR-model-energy")

        val sameModel = LinearRegressionModel.load("./model/spark-LR-model-energy")
        val predictions= sameModel.transform(testData)

        predictions.show(3)
      } catch {
        case ex: Exception => println(ex)
        case ex: Throwable => println("found an unknown exception: " + ex)
      }
    }
   }
}

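Model evaluation output:

RMSE: 0.608214469574435 //RMSE is the square root of the mean squared error between predicted and actual values; smaller is better
r2: 0.8654551151239023  //R2 compares the model against always predicting the mean; the closer to 1, the better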
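
The two metrics can be computed by hand as in the following minimal plain-Java sketch. The class name `RegressionMetricsSketch` and the sample values are made up for illustration, and the code is not how Spark computes its training summary internally.

```java
import java.util.Locale;

/** Minimal sketch: RMSE and R2 for a set of predictions. */
class RegressionMetricsSketch {

    /** Root mean squared error: sqrt(mean((predicted - actual)^2)). */
    static double rmse(double[] actual, double[] predicted) {
        double sumSq = 0;
        for (int i = 0; i < actual.length; i++) {
            double err = predicted[i] - actual[i];
            sumSq += err * err;
        }
        return Math.sqrt(sumSq / actual.length);
    }

    /** Coefficient of determination: 1 - SS_residual / SS_total. */
    static double r2(double[] actual, double[] predicted) {
        double mean = 0;
        for (double a : actual) {
            mean += a;
        }
        mean /= actual.length;

        double ssRes = 0, ssTot = 0;
        for (int i = 0; i < actual.length; i++) {
            ssRes += (actual[i] - predicted[i]) * (actual[i] - predicted[i]);
            ssTot += (actual[i] - mean) * (actual[i] - mean);
        }
        return 1.0 - ssRes / ssTot;
    }

    public static void main(String[] args) {
        // Made-up T1 values and predictions for illustration only
        double[] actual = {19.9, 20.4, 21.0, 21.7, 22.3};
        double[] predicted = {20.1, 20.3, 21.2, 21.5, 22.6};
        System.out.printf(Locale.ROOT, "RMSE: %.4f%n", rmse(actual, predicted));
        System.out.printf(Locale.ROOT, "r2:   %.4f%n", r2(actual, predicted));
    }
}
```

A model that always predicts the mean of the actual values gets an R2 of 0, which is the baseline that the reported 0.865 should be read against.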
