项目说明:
通过实时检测室内的光线、温度、湿度和二氧化碳浓度来判断室内是否有人,这是一个典型的二元分类问题。
数据源
http://archive.ics.uci.edu/ml/machine-learning-databases/00357/
"date": 日期
"Temperature": 温度
"Humidity": 湿度
"Light": 光线
"CO2": 二氧化碳
"HumidityRatio": 湿气比率
"Occupancy": 是否有人
数据处理:
数据分布情况—数值型:
(pandas describe() 输出,中间的 Light、CO2 两列在原输出中以 ... 省略)
       Temperature     Humidity  ...  HumidityRatio    Occupancy
count  9752.000000  9752.000000  ...    9752.000000  9752.000000
mean     21.001768    29.891910  ...       0.004589     0.210111
std       1.020693     3.952844  ...       0.000531     0.407408
min      19.500000    21.865000  ...       0.003275     0.000000
25%      20.290000    26.642083  ...       0.004196     0.000000
50%      20.790000    30.200000  ...       0.004593     0.000000
75%      21.533333    32.700000  ...       0.004998     0.000000
max      24.390000    39.500000  ...       0.005769     1.000000
数据分布情况—非数值型:
        date
count   9752
unique  9752
top     2015-02-15 15:07:00
freq    1
各特征类型和非空情况统计:
数据质量较好,有少量空数据
由于是实时数据(传感器可能随时传输空值),仍需做空值判断处理
Data columns (total 7 columns):
date 9752 non-null object
Temperature 9741 non-null float64
Humidity 9738 non-null float64
Light 9752 non-null float64
CO2 9752 non-null float64
HumidityRatio 9745 non-null float64
Occupancy 9752 non-null int64
特征相关性—数值型
Occupancy和Light的相关性最大, 和Temperature有一定相关性,和湿度相关性最小;
Light和Temperature有一定相关性
HumidityRatio和Humidity有一定相关性
特征相关性—非数值型
只有一个date 不用分析
特征数据处理:
主要处理内容:
- 空值处理:传感器可能因间歇性故障而传输空值;对相关性强的特征,利用历史数据训练模型来预测空值,其他空值采用均值填充;
- 例如用均值填充空的 Temperature
- 使用HumidityRatio预测空Humidity
- 使用Humidity预测空HumidityRatio
主要利用 Flink 的 state(状态)特性实现上述功能
数据结构类定义:
public class DataOccupancy implements Serializable { public String date; public Double Temp; public Double Humidity; public Double Light; public Double CO2; public Double HumidityRatio ; public int Occupancy; public DateTime eventTime; public DataOccupancy() { this.eventTime = new DateTime(); } public DataOccupancy(String date,Double Temp, Double Humidity,Double Light, Double CO2, Double HumidityRatio , int Occupancy) { this.date = date; this.Temp = Temp; this.Humidity = Humidity; this.Light = Light; this.CO2 = CO2; this.HumidityRatio = HumidityRatio; this.Occupancy = Occupancy; } public String toString() { StringBuilder sb = new StringBuilder(); sb.append(date).append(","); sb.append(Temp).append(","); sb.append(Humidity).append(","); sb.append(Light).append(","); sb.append(CO2).append(","); sb.append(HumidityRatio).append(","); sb.append(Occupancy); return sb.toString(); } public static DataOccupancy instanceFromString(String line) { String[] tokens = line.split(","); if (tokens.length != 7) { System.out.println("#############Invalid record: " + line+"\n"); //return null; //throw new RuntimeException("Invalid record: " + line); } DataOccupancy diag = new DataOccupancy(); try { diag.date = tokens[0].length() > 0 ? tokens[0]:null; diag.Temp = tokens[1].length() > 0 ? Double.parseDouble(tokens[1]):null; diag.Humidity = tokens[2].length() > 0 ? Double.parseDouble(tokens[2]) : null; diag.Light = tokens[3].length() > 0 ? Double.parseDouble(tokens[3]) : null; diag.CO2 = tokens[4].length() > 0 ? Double.parseDouble(tokens[4]) : null; diag.HumidityRatio = tokens[5].length() > 0 ? Double.parseDouble(tokens[5]) : null; diag.Occupancy =tokens[6].length() > 0 ? Integer.parseInt(tokens[6]) : null; } catch (NumberFormatException nfe) { throw new RuntimeException("Invalid record: " + line, nfe); } return diag; } public long getEventTime() { return this.eventTime.getMillis(); } }
为了预测空值,可以定义一个封装了 SimpleRegression 的 SimplePredictionModel 类:
/**
 * Minimal wrapper around commons-math {@code SimpleRegression}, constructed with
 * {@code includeIntercept = false}.
 *
 * <p>It predicts one humidity quantity from the other.
 */
public class SimplePredictionModel {
    SimpleRegression model;

    public SimplePredictionModel() {
        // No intercept: the fitted line passes through the origin.
        model = new SimpleRegression(false);
    }

    /**
     * Predicts y for x with the current fit.
     *
     * @param x the explanatory value
     * @return the prediction, or -1 when the regression yields NaN
     *         (e.g. when it has not seen enough data yet)
     */
    public double predictY(double x) {
        final double estimate = model.predict(x);
        return Double.isNaN(estimate) ? -1 : estimate;
    }

    /** Adds one observed (x, y) pair to the regression. */
    public void refineModel(double x, double y) {
        model.addData(x, y);
    }
}
定义Operation Chain:
// Build the cleaning pipeline.
//
// Every record is keyed by the same constant (0). The running temperature average
// (NullTempFillMean) and the humidity regression models (HumidityAndRedioPridict) live in
// keyed state and must accumulate over the whole history.
//
// The previous keyBy(0) keyed by field 0 of the tuple, i.e. the event timestamp. That gave
// almost every record its own always-empty state, so nothing was ever learned.
//
// A single key routes all records to one parallel instance, which is what a single-room
// sensor stream needs. Key by a room/sensor id instead if several rooms share this stream.
DataStream<DataOccupancy> DsOccupancy = env.addSource(
        new occupancySource(input, servingSpeedFactor));
DataStream<String> modDataStrForLR = DsOccupancy
        .map(new mapTime()).keyBy(value -> 0)
        .flatMap(new NullTempFillMean()).keyBy(value -> 0)
        .flatMap(new HumidityAndRedioPridict())
        .flatMap(new OccupyFlatMapForLR());
modDataStrForLR.print();
modDataStrForLR.writeAsText("./OccupyDataForLR");
定义Map Operation:
/**
 * Pairs each reading with its event time in epoch milliseconds.
 *
 * <p>It produces the {@code Tuple2<Long, DataOccupancy>} stream that NullTempFillMean consumes.
 * It was previously declared over {@code DataEnergy}, which does not match the
 * {@code DataStream<DataOccupancy>} it is applied to.
 */
public static class mapTime implements MapFunction<DataOccupancy, Tuple2<Long, DataOccupancy>> {
    @Override
    public Tuple2<Long, DataOccupancy> map(DataOccupancy occupancy) throws Exception {
        long time = occupancy.getEventTime();
        return new Tuple2<>(time, occupancy);
    }
}
定义均值填充空temperature 类NullTempFillMean:
/**
 * Fills a missing temperature from a running average of earlier readings.
 *
 * <p>Readings whose temperature is implausibly far from that average are dropped, so they
 * never reach the training file.
 *
 * <p>The average kept in state is an exponential moving average with weight 1/2
 * ({@code (previous + current) / 2}), so recent readings dominate.
 */
public static class NullTempFillMean extends RichFlatMapFunction<Tuple2<Long, DataOccupancy>, Tuple2<Long, DataOccupancy>> {
    /** A reading farther than this many degrees from the running average is treated as a sensor error. */
    private static final double MAX_DEVIATION = 8;

    private transient ValueState<Double> TemperatureMeanState;

    @Override
    public void flatMap(Tuple2<Long, DataOccupancy> val, Collector<Tuple2<Long, DataOccupancy>> out) throws Exception {
        DataOccupancy occupancy = val.f1;
        Double temperatureMean = TemperatureMeanState.value();

        if (occupancy.Temp == null) {
            if (temperatureMean == null) {
                // No history yet: there is nothing meaningful to fill with. A placeholder
                // such as 0.0 would inject a bogus temperature into the training data.
                System.out.println("~~~~~!Temperature is null and no running mean is available yet");
                return;
            }
            occupancy.Temp = temperatureMean;
        } else if (temperatureMean == null) {
            // The first real reading seeds the average. Seeding with 0.0 (as before) made
            // every normal reading (~20 degrees) look like an outlier, so the average was
            // never updated and nulls were filled with 0.0.
            TemperatureMeanState.update(occupancy.Temp);
        } else if (Math.abs(occupancy.Temp - temperatureMean) > MAX_DEVIATION) {
            // Outliers are not forwarded, so they are not written to the training file.
            System.out.println("~~~~~!Temperature outlier dropped: " + occupancy.Temp);
            return;
        } else {
            TemperatureMeanState.update((temperatureMean + occupancy.Temp) / 2);
        }
        out.collect(new Tuple2<>(val.f0, occupancy));
    }

    @Override
    public void open(Configuration config) {
        // The state name is kept unchanged for checkpoint/savepoint compatibility. Despite
        // the name, this state holds the running temperature average.
        ValueStateDescriptor<Double> descriptor2 =
                new ValueStateDescriptor<>(
                        "regressionModel",
                        TypeInformation.of(Double.class));
        TemperatureMeanState = getRuntimeContext().getState(descriptor2);
    }
}
定义空值预测Flatmap Operation 类HumidityAndRedioPridict:
使用 ListState 同时保存两个回归模型(一个由 HumidityRatio 预测 Humidity,另一个由 Humidity 预测 HumidityRatio),从而可以同时更新多个模型并分别做预测
/**
 * Fills a missing Humidity from HumidityRatio, or a missing HumidityRatio from Humidity.
 *
 * <p>It uses two SimplePredictionModel regressions kept in keyed ListState:
 * <ul>
 *   <li>element 0 predicts Humidity from HumidityRatio;</li>
 *   <li>element 1 predicts HumidityRatio from Humidity.</li>
 * </ul>
 *
 * <p>Both models are trained on every reading where both values are present. A reading is
 * dropped (with a log line) when both values are missing, or when the needed model cannot
 * produce a prediction yet.
 */
public static class HumidityAndRedioPridict extends RichFlatMapFunction<Tuple2<Long, DataOccupancy>, DataOccupancy> {
    private transient ListState<SimplePredictionModel> modelStates;

    @Override
    public void flatMap(Tuple2<Long, DataOccupancy> val, Collector<DataOccupancy> out) throws Exception {
        SimplePredictionModel humidityModel = null;
        SimplePredictionModel ratioModel = null;

        // ListState.get() may return null for empty state on some Flink versions/backends.
        Iterable<SimplePredictionModel> stored = modelStates.get();
        if (stored != null) {
            Iterator<SimplePredictionModel> it = stored.iterator();
            if (it.hasNext()) {
                humidityModel = it.next();
            }
            if (it.hasNext()) {
                ratioModel = it.next();
            }
        }
        if (humidityModel == null) {
            humidityModel = new SimplePredictionModel();
        }
        if (ratioModel == null) {
            ratioModel = new SimplePredictionModel();
        }

        DataOccupancy occupancy = val.f1;
        if (occupancy.Humidity == null && occupancy.HumidityRatio == null) {
            //log Humidity,HumidityRatio are both null
            System.out.println("~~~~~!Humidity,HumidityRatio are both null");
            return;
        }

        if (occupancy.Humidity == null) {
            double predicted = humidityModel.predictY(occupancy.HumidityRatio);
            // predictY signals "no prediction yet" with -1. Humidity cannot be negative, so
            // writing the sentinel into the record would corrupt the training data.
            if (predicted < 0) {
                System.out.println("~~~~~!Humidity model not ready, record dropped");
                return;
            }
            occupancy.Humidity = predicted;
        } else if (occupancy.HumidityRatio == null) {
            double predicted = ratioModel.predictY(occupancy.Humidity);
            if (predicted < 0) {
                System.out.println("~~~~~!HumidityRatio model not ready, record dropped");
                return;
            }
            occupancy.HumidityRatio = predicted;
        } else {
            // Train BOTH directions. Previously only the ratio model was refined, so the
            // Humidity model stayed empty and could never predict.
            humidityModel.refineModel(occupancy.HumidityRatio, occupancy.Humidity);
            ratioModel.refineModel(occupancy.Humidity, occupancy.HumidityRatio);
            List<SimplePredictionModel> models = new ArrayList<>();
            models.add(humidityModel);
            models.add(ratioModel);
            modelStates.update(models);
        }
        out.collect(occupancy);
    }

    @Override
    public void open(Configuration config) {
        // obtain key-value state for prediction model
        ListStateDescriptor<SimplePredictionModel> descriptor =
                new ListStateDescriptor<>(
                        "regressionModel",
                        SimplePredictionModel.class);
        modelStates = getRuntimeContext().getListState(descriptor);
    }
}
定义将指定列按指定文件格式保存的 flatmap operation:
/**
 * Formats a cleaned reading as one CSV training row for the Spark job:
 * "Temp,Humidity,Light,CO2,HumidityRatio,Occupancy".
 *
 * <p>The date column is deliberately left out. The training job parses every field with
 * {@code toDouble} and maps fields 0..5 to Temp..Occupancy, so a leading date string would
 * make it fail.
 *
 * <p>Rows with a missing feature are skipped for the same reason: they would otherwise be
 * written as the text "null".
 */
public static class OccupyFlatMapForLR implements FlatMapFunction<DataOccupancy, String> {
    @Override
    public void flatMap(DataOccupancy diag, Collector<String> collector) throws Exception {
        if (diag.Temp == null || diag.Humidity == null || diag.Light == null
                || diag.CO2 == null || diag.HumidityRatio == null) {
            System.out.println("~~~~~!Skipping record with missing feature: " + diag);
            return;
        }
        StringBuilder sb = new StringBuilder();
        sb.append(diag.Temp).append(",");
        sb.append(diag.Humidity).append(",");
        sb.append(diag.Light).append(",");
        sb.append(diag.CO2).append(",");
        sb.append(diag.HumidityRatio).append(",");
        sb.append(diag.Occupancy);
        collector.collect(sb.toString());
    }
}
Spark 定时任务训练DecisionTree模型:
由于特征不多,没有选择随机森林(RF),使用决策树(DecisionTree)就够了
/**
 * Periodic Spark job that trains a decision-tree classifier for room occupancy.
 *
 * Input is the CSV rows "Temp,Humidity,Light,CO2,HumidityRatio,Occupancy" written by the
 * streaming job. The job prints the classifier's accuracy on a held-out split.
 */
object OccupyDecisionTree {

  /** One training row; `Occupancy` is the 0.0/1.0 label. */
  case class Occupy(
    Temp: Double, Humidity: Double, Light: Double, CO2: Double, HumidityRatio: Double, Occupancy: Double
  )

  /** Input location used when no path is passed as the first program argument. */
  private val DefaultInputPath = "D:/code/flink/OccupyDataForLR"

  /** Maps the first six numeric fields of a row to an [[Occupy]], in declaration order. */
  def parseOccupy(line: Array[Double]): Occupy =
    Occupy(line(0), line(1), line(2), line(3), line(4), line(5))

  /**
   * Splits each line on commas and parses every field as a Double.
   *
   * Blank lines are skipped; parsing them previously threw NumberFormatException.
   */
  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] =
    rdd.filter(_.trim.nonEmpty).map(_.split(",")).map(_.map(_.toDouble))

  /**
   * Trains the classifier and prints its accuracy on a 30% test split.
   *
   * @param args optional; `args(0)` overrides the input path (default [[DefaultInputPath]])
   */
  def main(args: Array[String]): Unit = {
    import scala.util.control.NonFatal

    val conf = new SparkConf().setAppName("SparkDFebay").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val df = parseRDD(sc.textFile(inputPath)).map(parseOccupy).toDF().cache()

      val featureCols = Array("Temp", "Humidity", "Light", "CO2", "HumidityRatio")
      val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
      val df2 = assembler.transform(df)
      df2.show()

      val Array(trainingData, testData) = df2.randomSplit(Array(0.7, 0.3), 5000)
      val classifier = new DecisionTreeClassifier()
        .setLabelCol("Occupancy")
        .setFeaturesCol("features")
        .setImpurity("gini")
        .setMaxDepth(5)
      val model = classifier.fit(trainingData)

      try {
        val predictions = model.transform(testData)
        predictions.show()
        // Accuracy = fraction of test rows whose prediction equals the label. The previous
        // BinaryClassificationEvaluator reports areaUnderROC by default (not accuracy), and
        // was fed the hard 0/1 "prediction" column, so the printed number was mislabeled.
        val total = predictions.count()
        val correct = predictions.filter($"prediction" === $"Occupancy").count()
        val accuracy = if (total == 0) Double.NaN else correct.toDouble / total
        println("accuracy pipeline fitting: " + accuracy)
      } catch {
        // NonFatal lets OutOfMemoryError, InterruptedException, etc. propagate.
        case NonFatal(ex) => println(ex)
      }
    } finally {
      sc.stop()
    }
  }
}
模型效果评估打印:
accuracy pipeline fitting: 0.9939532334673
没有评论