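Project description:
IoT domain: analyze the data reported by household temperature and humidity sensors and use it for linear regression prediction.
Data source: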
http://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction
date, time in year-month-day hour:minute:second format
Appliances, energy use in Wh
lights, energy use of light fixtures in the house in Wh
T1, Temperature in kitchen area, in Celsius
RH_1, Humidity in kitchen area, in %
T2, Temperature in living room area, in Celsius
RH_2, Humidity in living room area, in %
T3, Temperature in laundry room area, in Celsius
RH_3, Humidity in laundry room area, in %
T4, Temperature in office room, in Celsius
RH_4, Humidity in office room, in %
T5, Temperature in bathroom, in Celsius
RH_5, Humidity in bathroom, in %
T6, Temperature outside the building (north side), in Celsius
RH_6, Humidity outside the building (north side), in %
T7, Temperature in ironing room, in Celsius
T_out, Temperature outside (from Chievres weather station), in Celsius
Press_mm_hg (from Chievres weather station), in mm Hg
RH_out, Humidity outside (from Chievres weather station), in %
Wind speed (from Chievres weather station), in m/s
Visibility (from Chievres weather station), in km
Tdewpoint (from Chievres weather station), °C
rv1, Random variable 1, nondimensional
rv2, Random variable 2, nondimensional
Data processing:
Data distribution, numeric columns:
         Appliances        lights  ...           rv1           rv2
count  19735.000000  19735.000000  ...  19735.000000  19735.000000
mean      97.694958      3.801875  ...     24.988033     24.988033
std      102.524891      7.935988  ...     14.496634     14.496634
min       10.000000      0.000000  ...      0.005322      0.005322
25%       50.000000      0.000000  ...     12.497889     12.497889
50%       60.000000      0.000000  ...     24.897653     24.897653
75%      100.000000      0.000000  ...     37.583769     37.583769
max     1080.000000     70.000000  ...     49.996530     49.996530
Data distribution, non-numeric columns:
date
count 19735
unique 19735
top 2016/3/14 3:10
freq 1
Column types and non-null counts:
Data quality is good: no column currently has missing values.
Data columns (total 23 columns):
date 19735 non-null object
Appliances 19735 non-null int64
lights 19735 non-null int64
T1 19735 non-null float64
RH_1 19735 non-null float64
T2 19735 non-null float64
RH_2 19735 non-null float64
T3 19735 non-null float64
RH_3 19735 non-null float64
T4 19735 non-null float64
RH_4 19735 non-null float64
T5 19735 non-null float64
RH_5 19735 non-null float64
T6 19735 non-null float64
RH_6 19735 non-null float64
T_out 19735 non-null float64
Press_mm_hg 19735 non-null float64
RH_out 19735 non-null float64
Windspeed 19735 non-null float64
Visibility 19735 non-null float64
Tdewpoint 19735 non-null float64
rv1 19735 non-null float64
rv2 19735 non-null float64
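Feature correlation, numeric columns:
T_out and T6 are almost perfectly consistent, and their definitions are similar (both are outdoor temperatures), so one of them can be dropped;
The temperatures of the indoor rooms are strongly correlated with one another, so they can be used for regression prediction and for predicting missing values;
The humidities of the indoor rooms, except the bathroom, are also strongly correlated with one another, so they can be used for regression prediction and for predicting missing values;
Outdoor humidity is negatively correlated with indoor temperature, so it can be used for regression prediction and for predicting missing values;
rv1 and rv2 have no clear purpose and show little correlation with the other features, so they are left out for now.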
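The observations above rest on pairwise correlation coefficients. As a minimal sketch of how such a coefficient is computed, the following plain-Java helper implements the Pearson correlation. The class name, method name, and sample readings are made up for illustration and are not part of the original project or taken from the dataset.

```java
import java.util.Locale;

// Illustrative helper, not part of the original project.
final class PearsonSketch {

    // Pearson correlation coefficient of two equally long samples.
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("need two samples of equal length >= 2");
        }
        double meanX = 0.0, meanY = 0.0;
        for (int i = 0; i < x.length; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= x.length;
        meanY /= y.length;

        double cov = 0.0, varX = 0.0, varY = 0.0;
        for (int i = 0; i < x.length; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            cov += dx * dy;   // co-deviation of the two samples
            varX += dx * dx;  // squared deviation of x
            varY += dy * dy;  // squared deviation of y
        }
        return cov / Math.sqrt(varX * varY);
    }

    public static void main(String[] args) {
        // Made-up readings, not taken from the dataset.
        double[] t6 = {5.0, 6.5, 7.0, 9.0};
        double[] tOut = {5.2, 6.4, 7.1, 8.8};
        double[] rhOut = {92.0, 88.0, 85.0, 80.0};
        System.out.printf(Locale.ROOT, "corr(T6, T_out)  = %.3f%n", pearson(t6, tOut));
        System.out.printf(Locale.ROOT, "corr(T6, RH_out) = %.3f%n", pearson(t6, rhOut));
    }
}
```

A value close to 1 or -1 marks a pair that is a candidate for either dropping one column or predicting one from the other; a value near 0, as described for rv1 and rv2, marks a feature that adds little to a linear model.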
Feature correlation, non-numeric columns:
There is only one such column, date, so no analysis is needed.
Feature data processing:
Data structure class definition:
import java.io.Serializable;
import org.joda.time.DateTime;

public class DataEnergy implements Serializable {
    public String date;
    public Integer Appliances;
    public Integer lights;
    public Double T1, RH_1, T2, RH_2, T3, RH_3, T4, RH_4, T5, RH_5, T6, RH_6;
    public Double T_out, Press_mm_hg, RH_out, Windspeed, Visibility, Tdewpoint;
    public Double rv1, rv2;
    public DateTime eventTime;

    public DataEnergy() {
        this.eventTime = new DateTime();
    }

    public DataEnergy(String date, int Appliances, int lights, Double T1, Double RH_1,
                      Double T2, Double RH_2, Double T3, Double RH_3, Double T4, Double RH_4,
                      Double T5, Double RH_5, Double T6, Double RH_6, Double T_out,
                      Double Press_mm_hg, Double RH_out, Double Windspeed, Double Visibility,
                      Double Tdewpoint, Double rv1, Double rv2) {
        this.date = dateConvetor(date);
        this.eventTime = new DateTime(this.date);
        this.Appliances = Appliances;
        this.lights = lights;
        this.T1 = T1;
        this.RH_1 = RH_1;
        this.T2 = T2;
        this.RH_2 = RH_2;
        this.T3 = T3;
        this.RH_3 = RH_3;
        this.T4 = T4;
        this.RH_4 = RH_4;
        this.T5 = T5;
        this.RH_5 = RH_5;
        this.T6 = T6;
        this.RH_6 = RH_6;
        this.T_out = T_out;
        this.Press_mm_hg = Press_mm_hg;
        this.RH_out = RH_out;
        this.Windspeed = Windspeed;
        this.Visibility = Visibility;
        this.Tdewpoint = Tdewpoint;
        this.rv1 = rv1;
        this.rv2 = rv2;
    }

    // The source file writes dates as 2016/3/14; replace the slashes with dashes.
    private static String dateConvetor(String date) {
        return date.replace("/", "-");
    }

    // Empty tokens become null fields.
    private static Integer toInt(String token) {
        return token.length() > 0 ? Integer.valueOf(token) : null;
    }

    private static Double toDouble(String token) {
        return token.length() > 0 ? Double.valueOf(token) : null;
    }

    // Serialize all 23 fields as one comma-separated line.
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(date).append(",");
        sb.append(Appliances).append(",");
        sb.append(lights).append(",");
        sb.append(T1).append(",");
        sb.append(RH_1).append(",");
        sb.append(T2).append(",");
        sb.append(RH_2).append(",");
        sb.append(T3).append(",");
        sb.append(RH_3).append(",");
        sb.append(T4).append(",");
        sb.append(RH_4).append(",");
        sb.append(T5).append(",");
        sb.append(RH_5).append(",");
        sb.append(T6).append(",");
        sb.append(RH_6).append(",");
        sb.append(T_out).append(",");
        sb.append(Press_mm_hg).append(",");
        sb.append(RH_out).append(",");
        sb.append(Windspeed).append(",");
        sb.append(Visibility).append(",");
        sb.append(Tdewpoint).append(",");
        sb.append(rv1).append(",");
        sb.append(rv2);
        return sb.toString();
    }

    // Parse one CSV line with 23 columns; returns null for a malformed line.
    public static DataEnergy instanceFromString(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 23) {
            System.out.println("#############Invalid record: " + line + "\n");
            return null;
            //throw new RuntimeException("Invalid record: " + line);
        }
        // eventTime keeps the construction time set by the no-arg constructor
        DataEnergy energy = new DataEnergy();
        try {
            energy.date = tokens[0].length() > 0 ? dateConvetor(tokens[0]) : null;
            energy.Appliances = toInt(tokens[1]);
            energy.lights = toInt(tokens[2]);
            energy.T1 = toDouble(tokens[3]);
            energy.RH_1 = toDouble(tokens[4]);
            energy.T2 = toDouble(tokens[5]);
            energy.RH_2 = toDouble(tokens[6]);
            energy.T3 = toDouble(tokens[7]);
            energy.RH_3 = toDouble(tokens[8]);
            energy.T4 = toDouble(tokens[9]);
            energy.RH_4 = toDouble(tokens[10]);
            energy.T5 = toDouble(tokens[11]);
            energy.RH_5 = toDouble(tokens[12]);
            energy.T6 = toDouble(tokens[13]);
            energy.RH_6 = toDouble(tokens[14]);
            energy.T_out = toDouble(tokens[15]);
            energy.Press_mm_hg = toDouble(tokens[16]);
            energy.RH_out = toDouble(tokens[17]);
            energy.Windspeed = toDouble(tokens[18]);   // column 18 is Windspeed
            energy.Visibility = toDouble(tokens[19]);
            energy.Tdewpoint = toDouble(tokens[20]);
            energy.rv1 = toDouble(tokens[21]);
            energy.rv2 = toDouble(tokens[22]);
        } catch (NumberFormatException nfe) {
            throw new RuntimeException("Invalid record: " + line, nfe);
        }
        return energy;
    }

    public long getEventTime() {
        return this.eventTime.getMillis();
    }
}
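Positional indices such as tokens[18] are easy to shift by one when a column is skipped, and `String.split(",")` silently drops trailing empty fields, which changes the token count when the last column is empty. The following is a minimal sketch, assuming the 23-column layout listed earlier, that looks fields up by column name and keeps trailing empty fields. The class name, method names, and the sample line are hypothetical and not part of the original project.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper, not part of the original project.
final class CsvColumnsSketch {

    // Column order as listed in the "Data columns" summary.
    static final List<String> COLUMNS = Arrays.asList(
            "date", "Appliances", "lights", "T1", "RH_1", "T2", "RH_2", "T3", "RH_3",
            "T4", "RH_4", "T5", "RH_5", "T6", "RH_6", "T_out", "Press_mm_hg", "RH_out",
            "Windspeed", "Visibility", "Tdewpoint", "rv1", "rv2");

    // A limit of -1 keeps trailing empty fields, so an empty last column still counts.
    static String[] tokenize(String line) {
        return line.split(",", -1);
    }

    // Returns the named field as a Double, or null when the field is empty.
    static Double doubleField(String[] tokens, String column) {
        int i = COLUMNS.indexOf(column);
        if (i < 0) {
            throw new IllegalArgumentException("unknown column: " + column);
        }
        String token = tokens[i];
        return token.isEmpty() ? null : Double.valueOf(token);
    }

    public static void main(String[] args) {
        // Made-up record with an empty T3 and an empty rv2 (trailing comma).
        String line = "2016/1/11 17:00,60,30,19.89,47.6,19.2,44.79,,44.73,19.0,45.57,"
                + "17.17,55.2,7.03,84.26,6.6,733.5,92.0,7.0,63.0,5.3,13.28,";
        String[] tokens = tokenize(line);
        System.out.println("tokens: " + tokens.length);
        System.out.println("T3: " + doubleField(tokens, "T3"));
        System.out.println("Windspeed: " + doubleField(tokens, "Windspeed"));
    }
}
```

Looking the index up by name costs a list scan per field, which is negligible for 23 columns, and it makes a skipped column show up as an explicit name rather than as a silent off-by-one.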
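Suppose we need to predict some missing T3 values from T1. We can define a class that wraps SimpleRegression:
import org.apache.commons.math3.stat.regression.SimpleRegression;

public class TempPredictionModelT1ToT3 {
    SimpleRegression model;

    public TempPredictionModelT1ToT3() {
        // false: fit without an intercept term
        model = new SimpleRegression(false);
    }

    // Predict a missing T3 from T1; returns -1 while the model cannot predict yet.
    public double predictMissTmep(double T1) {
        double prediction = model.predict(T1);
        if (Double.isNaN(prediction)) {
            return -1;
        }
        else {
            // keep the fractional part; temperatures are not whole numbers
            return prediction;
        }
    }

    // Feed one observed (T1, T3) pair into the model.
    public void refineModel(double T1, double T3) {
        model.addData(T1, T3);
    }
}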
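To make the role of `SimpleRegression(false)` concrete: a regression without an intercept fits y = b·x, and the least-squares slope is b = Σ(x·y) / Σ(x·x). The following plain-Java sketch accumulates those two sums incrementally, in the same add-then-predict style as the class above. It is an illustrative stand-in under that assumption, not the Commons Math implementation, and the class name and sample values are made up.

```java
// Illustrative stand-in for a no-intercept simple regression; not the Commons Math class.
final class OriginRegressionSketch {
    private double sumXX = 0.0; // running sum of x * x
    private double sumXY = 0.0; // running sum of x * y

    // Add one observed (x, y) pair, e.g. (T1, T3).
    void addData(double x, double y) {
        sumXX += x * x;
        sumXY += x * y;
    }

    // Least-squares slope of y = b * x; NaN while no usable data has been added.
    double slope() {
        return sumXX == 0.0 ? Double.NaN : sumXY / sumXX;
    }

    // Predicted y for a given x; NaN propagates when the slope is undefined.
    double predict(double x) {
        return slope() * x;
    }

    public static void main(String[] args) {
        OriginRegressionSketch model = new OriginRegressionSketch();
        System.out.println("before any data: " + model.predict(20.0));
        // Made-up (T1, T3) pairs, not taken from the dataset.
        model.addData(19.9, 19.8);
        model.addData(20.5, 20.6);
        model.addData(21.0, 21.2);
        System.out.println("slope: " + model.slope());
        System.out.println("predicted T3 for T1 = 20.0: " + model.predict(20.0));
    }
}
```

In this sketch a prediction requested before any data has been added is NaN, which is the situation the `Double.isNaN` check in `predictMissTmep` guards against.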
Define the operator chain:
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    final String input = "D:/code/dataML/energydata_complete1.csv";
    final int servingSpeedFactor = 6000; // events of 10 minutes are served in 1 second
    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // operate in Event-time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // start the data generator
    DataStream<DataEnergy> DsDiag = env.addSource(
            new EnergySource(input, servingSpeedFactor));
    // NOTE: keyBy(0) keys the stream by timestamp, and the ValueState used in
    // T3PredictionModel is scoped per key, so records with different timestamps
    // do not share one regression model.
    DataStream<String> modDataStrForLR = DsDiag
            .filter(new NoneFilter())
            .map(new mapTime()).keyBy(0)
            .flatMap(new T3PredictionModel()) // predict T3 from T1 because they are strongly correlated
            .flatMap(new energyFlatMapForLR());
    //modDataStr2.print();
    modDataStrForLR.writeAsText("./energyDataForLR");
    // run the prediction pipeline
    env.execute("EnergyData Prediction");
}
Define the null-value filter:
public static class NoneFilter implements FilterFunction<DataEnergy> {
    @Override
    public boolean filter(DataEnergy energy) throws Exception {
        // fields that must not be null
        return IsNotNone(energy.Appliances) && IsNotNone(energy.date) && IsNotNone(energy.Press_mm_hg)
                && IsNotNone(energy.RH_1) && IsNotNone(energy.RH_2);
    }

    public boolean IsNotNone(Object Data) {
        return Data != null;
    }
}
Define the map operation:
public static class mapTime implements MapFunction<DataEnergy, Tuple2<Long, DataEnergy>> {
    @Override
    public Tuple2<Long, DataEnergy> map(DataEnergy energy) throws Exception {
        // pair each record with its event time in milliseconds
        long time = energy.eventTime.getMillis();
        return new Tuple2<>(time, energy);
    }
}
Define the FlatMap operation for null-value prediction:
The FlatMap function below predicts missing T3 values (assuming T3 may contain nulls):
public static class T3PredictionModel extends RichFlatMapFunction<Tuple2<Long, DataEnergy>, DataEnergy> {
    private transient ValueState<TempPredictionModelT1ToT3> modelState;

    @Override
    public void flatMap(Tuple2<Long, DataEnergy> val, Collector<DataEnergy> out) throws Exception {
        // fetch operator state
        TempPredictionModelT1ToT3 model = modelState.value();
        if (model == null) {
            model = new TempPredictionModelT1ToT3();
        }
        DataEnergy energy = val.f1;
        if (energy.T3 == null) {
            // T3 is missing: predict it from T1
            energy.T3 = model.predictMissTmep(energy.T1);
        } else {
            // T3 is present: refine the model with the observed (T1, T3) pair
            model.refineModel(energy.T1, energy.T3);
            // update operator state
            modelState.update(model);
        }
        // emit the record, with T3 filled in if it was missing
        out.collect(energy);
    }

    @Override
    public void open(Configuration config) {
        // obtain key-value state for prediction model
        ValueStateDescriptor<TempPredictionModelT1ToT3> descriptor =
                new ValueStateDescriptor<>(
                        // state name
                        "regressionModel",
                        // type information of state
                        TypeInformation.of(TempPredictionModelT1ToT3.class));
        modelState = getRuntimeContext().getState(descriptor);
    }
}
Define the FlatMap operation that saves selected columns in a specific file format:
// For use with Spark ML LR; the data is saved in ordinary CSV format.
// date, Appliances, T_out, rv1 and rv2 are not written, which leaves 18 columns.
public static class energyFlatMapForLR implements FlatMapFunction<DataEnergy, String> {
    @Override
    public void flatMap(DataEnergy InputDiag, Collector<String> collector) throws Exception {
        DataEnergy diag = InputDiag;
        StringBuilder sb = new StringBuilder();
        //sb.append(diag.date).append(",");
        sb.append(diag.lights).append(",");
        sb.append(diag.T1).append(",");
        sb.append(diag.RH_1).append(",");
        sb.append(diag.T2).append(",");
        sb.append(diag.RH_2).append(",");
        sb.append(diag.T3).append(",");
        sb.append(diag.RH_3).append(",");
        sb.append(diag.T4).append(",");
        sb.append(diag.RH_4).append(",");
        sb.append(diag.T5).append(",");
        sb.append(diag.RH_5).append(",");
        sb.append(diag.T6).append(",");
        sb.append(diag.RH_6).append(",");
        sb.append(diag.Press_mm_hg).append(",");
        sb.append(diag.RH_out).append(",");
        sb.append(diag.Windspeed).append(",");
        sb.append(diag.Visibility).append(",");
        sb.append(diag.Tdewpoint);
        collector.collect(sb.toString());
    }
}
Spark scheduled job that trains the LR model:
Based on the feature correlation analysis above, several indoor temperatures (T2, T3, T4, T5) are used to predict another indoor temperature, T1:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

object EnergyModLR {
  case class Energy(
    lights: Double, T1: Double, RH_1: Double, T2: Double, RH_2: Double, T3: Double, RH_3: Double,
    T4: Double, RH_4: Double, T5: Double, RH_5: Double, T6: Double, RH_6: Double,
    Press_mm_hg: Double, RH_out: Double, Windspeed: Double, Visibility: Double, Tdewpoint: Double
  )

  def parseEnergy(line: Array[Double]): Energy = {
    Energy(
      line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8), line(9),
      line(10), line(11), line(12), line(13), line(14), line(15), line(16), line(17)
    )
  }

  // Keep only lines with exactly 18 columns, then convert every field to Double.
  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).filter(_.length == 18).map(_.map(_.toDouble))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDFEnergy").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val data_path = "D:/code/flink/energyDataForLR/5"
    val EnergyDF = parseRDD(sc.textFile(data_path)).map(parseEnergy).toDF().cache()
    val featureCols = Array("T2", "T3", "T4", "T5")
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(EnergyDF)
    df2.show
    val splitSeed = 5043
    val Array(trainingData, testData) = df2.randomSplit(Array(0.7, 0.3), splitSeed)
    val classifier = new LinearRegression().setFeaturesCol("features").setLabelCol("T1")
      .setFitIntercept(true).setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
    val model = classifier.fit(trainingData)
    // Print all model parameters
    println(model.extractParamMap())
    // Print the coefficients and intercept for linear regression
    println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")
    val predictions = model.transform(trainingData)
    predictions.selectExpr("T1", "round(prediction,1) as prediction").show
    // Evaluate the model on the training data
    val trainingSummary = model.summary
    val rmse = trainingSummary.rootMeanSquaredError
    println(s"RMSE: ${rmse}")
    println(s"r2: ${trainingSummary.r2}")
    //val predictions = model.transform(testData)
    // Save the model only when the training RMSE is below the threshold
    if (rmse < 0.3) {
      try {
        model.write.overwrite().save("./model/spark-LR-model-energy")
        val sameModel = LinearRegressionModel.load("./model/spark-LR-model-energy")
        val predictions = sameModel.transform(testData)
        predictions.show(3)
      } catch {
        case ex: Exception => println(ex)
        case ex: Throwable => println("found an unknown exception" + ex)
      }
    }
  }
}
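Printed model evaluation:
RMSE: 0.608214469574435 // RMSE is the square root of the mean squared error between predicted and true values; smaller is better
r2: 0.8654551151239023 // R2 compares the predictions with a baseline that always predicts the mean and shows how much better the model does; larger (closer to 1) is better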
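Both metrics can be reproduced by hand. The following is a minimal sketch in plain Java, assuming the usual definitions RMSE = sqrt(mean((y − ŷ)²)) and R² = 1 − SS_res / SS_tot. The class name, method names, and sample values are illustrative; they are not part of Spark and are not taken from the run above.

```java
// Illustrative helper, not part of Spark or the original project.
final class RegressionMetricsSketch {

    // Root mean squared error: square root of the mean of the squared errors.
    static double rmse(double[] actual, double[] predicted) {
        checkLengths(actual, predicted);
        double sumSq = 0.0;
        for (int i = 0; i < actual.length; i++) {
            double err = actual[i] - predicted[i];
            sumSq += err * err;
        }
        return Math.sqrt(sumSq / actual.length);
    }

    // R^2 = 1 - SS_res / SS_tot, where SS_tot is the squared error of always predicting the mean.
    static double r2(double[] actual, double[] predicted) {
        checkLengths(actual, predicted);
        double mean = 0.0;
        for (double a : actual) {
            mean += a;
        }
        mean /= actual.length;

        double ssRes = 0.0, ssTot = 0.0;
        for (int i = 0; i < actual.length; i++) {
            double res = actual[i] - predicted[i];
            double dev = actual[i] - mean;
            ssRes += res * res;
            ssTot += dev * dev;
        }
        return 1.0 - ssRes / ssTot;
    }

    private static void checkLengths(double[] a, double[] b) {
        if (a.length != b.length || a.length == 0) {
            throw new IllegalArgumentException("need two non-empty arrays of equal length");
        }
    }

    public static void main(String[] args) {
        // Made-up T1 values and predictions, for illustration only.
        double[] t1 = {19.9, 20.5, 21.0, 21.8};
        double[] predicted = {20.1, 20.4, 21.3, 21.5};
        System.out.println("RMSE: " + rmse(t1, predicted));
        System.out.println("r2: " + r2(t1, predicted));
    }
}
```

A model that always predicts the mean of the true values gets R² = 0 under this definition, which is why values close to 1 indicate a clear improvement over that baseline.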