Flink in Practice: Household Appliance Power Consumption Analysis & Prediction

April 25, 2018

Project goals:

Analyze household appliance power-consumption data and build several kinds of predictive models:

a. Extract features in real time to predict the relationship between household electricity usage and season

b. Extract features for clustering analysis (cluster by consumption characteristics)

c. Optionally, use offline Spark machine learning for more complex predictions

 

Dataset analysis (offline):

Columns of the household_power_consumption dataset:

1. date: the date, formatted dd/mm/yyyy
2. time: the time, formatted hh:mm:ss
3. global_active_power: household global minute-averaged active power (in kilowatt)
4. global_reactive_power: household global minute-averaged reactive power (in kilowatt)
5. voltage: minute-averaged voltage (in volt)
6. global_intensity: household global minute-averaged current intensity (in ampere)
7. sub_metering_1: active energy (in watt-hour) for the kitchen, containing mainly a dishwasher, an oven and a microwave (hot plates are gas powered, not electric)
8. sub_metering_2: active energy (in watt-hour) for the laundry room, containing a washing-machine, a tumble-drier, a refrigerator and a light
9. sub_metering_3: active energy (in watt-hour) for an electric water-heater and an air-conditioner

Date;Time;Global_active_power;Global_reactive_power;Voltage;Global_intensity;Sub_metering_1;Sub_metering_2;Sub_metering_3
16/12/2006;17:24:00;4.216;0.418;234.840;18.400;0.000;1.000;17.000
16/12/2006;17:25:00;5.360;0.436;233.630;23.000;0.000;1.000;16.000
16/12/2006;17:26:00;5.374;0.498;233.290;23.000;0.000;2.000;17.000
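As a quick sanity check, the file loads with pandas; a minimal sketch, using an inline sample in place of the real file path:

```python
import io
import pandas as pd

# Inline sample in the same semicolon-separated layout as the dataset.
sample = io.StringIO(
    "Date;Time;Global_active_power;Global_reactive_power;Voltage;"
    "Global_intensity;Sub_metering_1;Sub_metering_2;Sub_metering_3\n"
    "16/12/2006;17:24:00;4.216;0.418;234.840;18.400;0.000;1.000;17.000\n"
    "16/12/2006;17:25:00;5.360;0.436;233.630;23.000;0.000;1.000;16.000\n"
)
df = pd.read_csv(sample, sep=";")
print(df.shape)  # (2, 9)
```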

Numeric feature analysis:

I originally expected Global_active_power, Global_reactive_power, Voltage, Global_intensity, Sub_metering_1, Sub_metering_2 and Sub_metering_3 all to be numeric, but the analysis shows otherwise: only Sub_metering_3 is consistently float; the remaining columns contain anomalous values of inconsistent type!

       Sub_metering_3
count    2.049280e+06
mean     6.458447e+00
std      8.437154e+00
min      0.000000e+00
25%      0.000000e+00
50%      1.000000e+00
75%      1.700000e+01
max      3.100000e+01
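Why only Sub_metering_3 comes out as float can be reproduced in miniature: a single '?' token in a column makes pandas fall back to object dtype for that whole column. A sketch with inline data:

```python
import io
import pandas as pd

raw = io.StringIO("A;B\n1.0;2.0\n?;3.0\n")
df_mixed = pd.read_csv(raw, sep=";")
# Column A contains '?', so it stays object; column B parses as float64.
print(df_mixed.dtypes["A"])  # object
print(df_mixed.dtypes["B"])  # float64
```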

Distribution of the non-numeric columns:

print(df.describe(include=['O']))

              Date      Time      ...       Sub_metering_1 Sub_metering_2
count      2075250   2075250      ...              2075250        2075250
unique        1442      1440      ...                   89             82
top     24/12/2006  20:22:00      ...                0.000          0.000
freq          1440      1442      ...              1880175        1436830

Feature dtype analysis:

This again confirms that every column except Sub_metering_3 contains mixed-type anomalous values:

RangeIndex: 2075250 entries, 0 to 2075249
Data columns (total 9 columns):
Date                     object
Time                     object
Global_active_power      object
Global_reactive_power    object
Voltage                  object
Global_intensity         object
Sub_metering_1           object
Sub_metering_2           object
Sub_metering_3           float64
dtypes: float64(1), object(8)

Anomaly analysis

How do we find the anomalous values? I used a correlation-style script, grouping each of the other columns against Sub_metering_3:

print(df[['Global_active_power', 'Sub_metering_3']].groupby(['Global_active_power'], as_index=False).mean().sort_values(by='Sub_metering_3', ascending=False))

For example, Global_active_power contains the character '?', and in those rows Sub_metering_3 is usually NaN:

4168               9.606        0.000000
3998               8.246        0.000000
4069               8.592        0.000000
4006               8.274        0.000000
3375               6.812        0.000000
4186                   ?             NaN

The same procedure on the other columns shows that '?' characters and NaN are by far the most common anomalies (worth keeping in mind for the Flink real-time stream processing later). For Voltage:

104   225.950        0.000000
2753  252.480        0.000000
2836  254.150        0.000000
2837        ?             NaN
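Rather than spotting '?' rows through the groupby output, they can also be selected directly; a sketch on inline data mirroring the pattern above ('?' in one column, NaN in Sub_metering_3):

```python
import io
import pandas as pd

raw = io.StringIO("Voltage;Sub_metering_3\n235.00;17.0\n?;\n233.63;16.0\n")
df_q = pd.read_csv(raw, sep=";")
mask = df_q["Voltage"] == "?"  # works because the column is object-typed
print(df_q[mask])              # the '?' row; its Sub_metering_3 is NaN
```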

The Sub_metering_3 column has a small number of nulls; since the proportion is tiny (the Percent column below is a fraction, i.e. about 1.25%), those rows can simply be dropped:

                       Total   Percent
Date                       0  0.000000
Time                       0  0.000000
Global_active_power        0  0.000000
Global_reactive_power      0  0.000000
Voltage                    0  0.000000
Global_intensity           0  0.000000
Sub_metering_1             0  0.000000
Sub_metering_2             0  0.000000
Sub_metering_3         25979  0.012518
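The Total/Percent table above follows the usual pandas recipe; a sketch on a tiny inline frame:

```python
import pandas as pd

df_na = pd.DataFrame({"Sub_metering_3": [1.0, None, 3.0, None],
                      "Voltage": [240.0, 241.0, 239.0, 238.0]})
total = df_na.isnull().sum().sort_values(ascending=False)
# isnull().count() is the row count, so this ratio is a fraction of rows.
percent = (df_na.isnull().sum() / df_na.isnull().count()).sort_values(ascending=False)
missing = pd.concat([total, percent], axis=1, keys=["Total", "Percent"])
print(missing)
```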

Feature correlation analysis:

Correlations among all the numeric columns:

Global_active_power and Global_intensity are almost perfectly correlated (≈1.0), so one of the two can be dropped when building a model.

Voltage is negatively correlated with most of the other features.
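Because most columns are object-typed, they must be coerced to numeric before corr() works; a sketch (the inline values are chosen exactly proportional, so the correlation is 1.0, mirroring the Global_active_power / Global_intensity finding):

```python
import pandas as pd

df_obj = pd.DataFrame({
    "Global_active_power": ["1.0", "2.0", "?", "4.0"],
    "Global_intensity":    ["4.3", "8.6", "?", "17.2"],
})
# errors="coerce" turns '?' into NaN, which corr() then ignores pairwise.
num = df_obj.apply(pd.to_numeric, errors="coerce")
corr = num.corr()
print(round(corr.loc["Global_active_power", "Global_intensity"], 6))  # 1.0
```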

 

Correlation of month vs. Global_active_power and hour vs. Global_active_power:

df["month"] = df['Date'].map(lambda x: x.split('/')[1])
print(df[['month', 'Global_active_power']].groupby(['month'], as_index=False).mean().sort_values(by='Global_active_power', ascending=False))

The result shows higher consumption in the winter months, November through March:

   month  Global_active_power
3     12             1.489729
0      1             1.462226
2     11             1.318757
4      2             1.300431
5      3             1.231343
1     10             1.128386
6      4             1.047146
7      5             1.029571
11     9             0.981280
8      6             0.909148
9      7             0.700359
10     8             0.548587
df["hour"] = df['Time'].map(lambda x: x.split(':')[0])

print(df[['hour', 'Global_active_power']].groupby(['hour'], as_index=False).mean().sort_values(by='Global_active_power', ascending=False))

The result shows the hour of day matters a great deal: usage peaks roughly from 7pm to 10pm in the evening and 7am to 10am in the morning, while midnight to 6am is the off-peak period:

   hour  Global_active_power
20   20             1.901192
21   21             1.880670
19   19             1.739385
7    07             1.483839
8    08             1.459619
22   22             1.422973
9    09             1.332947
18   18             1.321225
10   10             1.268536
11   11             1.244401
12   12             1.205533
13   13             1.145469
14   14             1.082330
17   17             1.058686
15   15             0.988936
16   16             0.949170
23   23             0.908003
6    06             0.794893
0    00             0.661046
1    01             0.540794
2    02             0.477152
5    05             0.453438
3    03             0.443682
4    04             0.443305

Likewise for month vs. Voltage and hour vs. Voltage:

Voltage is again somewhat higher in winter; by hour, however, it is highest in the small hours of the morning:

   month     Voltage
3     12  242.749065
0      1  242.177646
5      3  241.528648
4      2  241.459366
2     11  241.134285
6      4  241.044744
1     10  240.316443
11     9  240.231707
10     8  240.061417
9      7  240.003656
8      6  239.912401
7      5  239.008514
   hour     Voltage
23   23  242.946606
3    03  242.657551
2    02  242.587233
4    04  242.375368
0    00  242.348849
5    05  242.036669
15   15  242.018662
6    06  241.900241
16   16  241.757934
1    01  241.680677
14   14  241.485995
22   22  240.886302
13   13  240.678314
17   17  240.269351
12   12  240.126100
7    07  239.885990
8    08  239.462342
11   11  239.388296
9    09  239.367427
18   18  239.286458
10   10  239.278824
21   21  239.071192
20   20  239.036712
19   19  238.655403

Likewise for month vs. Sub_metering_1 and hour vs. Sub_metering_1 (kitchen appliances):

Season has little effect on Sub_metering_1; usage is fairly even across the year, which is plausible.

By hour, usage peaks around breakfast, lunch and dinner time, which also makes sense:

   month  Sub_metering_1
0      1        1.406481
8      6        1.330837
3     12        1.311991
2     11        1.306718
5      3        1.266721
7      5        1.234916
11     9        1.210070
4      2        1.132143
6      4        1.075624
1     10        1.060990
9      7        0.707637
10     8        0.487054
   hour  Sub_metering_1
21   21        2.812301
20   20        2.778697
19   19        2.653954
22   22        2.271091
12   12        1.542238
9    09        1.500025
11   11        1.489728
14   14        1.463379
15   15        1.459986
8    08        1.407922
18   18        1.259636
13   13        1.245665
10   10        1.134093
23   23        0.988710
16   16        0.971822
17   17        0.734044
0    00        0.383752
7    07        0.320170
1    01        0.266354
2    02        0.156038
3    03        0.066523
4    04        0.043836
6    06        0.035152
5    05        0.027251

Likewise for month vs. Sub_metering_2 and hour vs. Sub_metering_2 (laundry-room appliances):

Sub_metering_2 usage appears higher in winter and lower in summer.

By hour, most of the usage falls in the afternoon and evening:

  month  Sub_metering_2
5      3        1.661699
0      1        1.548839
1     10        1.530327
3     12        1.441189
2     11        1.412368
4      2        1.386569
7      5        1.307813
11     9        1.280998
8      6        1.244410
6      4        1.173244
9      7        0.992232
10     8        0.782521
   hour  Sub_metering_2
14   14        2.555711
13   13        2.485943
19   19        2.099969
15   15        2.098161
12   12        2.045240
11   11        1.992813
17   17        1.959406
20   20        1.947948
16   16        1.898202
21   21        1.848688
18   18        1.796948
10   10        1.529939
22   22        1.212847
9    09        1.053148
8    08        0.983629
23   23        0.851175
0    00        0.552765
7    07        0.497944
1    01        0.389206
3    03        0.346190
2    02        0.339761
4    04        0.328333
6    06        0.327293
5    05        0.314384

Likewise for month vs. Sub_metering_3 and hour vs. Sub_metering_3 (water heater and air conditioner):

By season, these appliances are used most in winter and early spring.

By hour, the trough is in the small hours; usage climbs from 7am through the afternoon, peaking around 9-10am:

   month  Sub_metering_3
3     12        7.807011
0      1        7.794054
4      2        7.587332
2     11        7.067275
5      3        6.928493
7      5        6.696841
6      4        6.678212
1     10        6.420007
11     9        6.228666
8      6        6.172105
9      7        4.275768
10     8        3.640474
   hour  Sub_metering_3
9    09       12.496378
8    08       12.491658
10   10       11.554009
7    07       10.514781
11   11       10.204382
12   12        9.119202
20   20        8.466858
13   13        8.330243
21   21        7.622840
14   14        7.289591
19   19        6.796036
15   15        6.135814
22   22        5.745389
18   18        5.744447
16   16        5.402105
17   17        5.324743
6    06        4.164537
23   23        4.082671
0    00        3.015655
1    01        2.372564
5    05        2.106158
2    02        1.967618
4    04        1.828841
3    03        1.661874

 

Real-time feature processing with Flink:

The offline analysis above yields the following feature-processing rules:

  1. Handle events containing the anomalous value '?'
  2. Handle events with null feature values
  3. Derive a new season feature from Date and dummy-encode it
  4. Derive a new time-of-day feature from Time and dummy-encode it
  5. Notify when a previously unseen anomalous value appears (so the rules can be updated)
  6. Define sinks to external services, feeding batch consumers (ML, etc.)
  7. Decide whether real-time ML prediction inside Flink is needed
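Rules 3 and 4 reduce to two bucketing functions; a Python sketch of the boundaries (the same buckets the Flink powerFlatMap later assigns; the function names are illustrative):

```python
def season(date_str):
    # Date is dd/mm/yyyy; bucket the month into four seasons.
    month = int(date_str.split("/")[1])
    if month >= 11 or month <= 2:
        return "winter"
    if month <= 4:
        return "spring"
    if month <= 7:
        return "summer"
    return "autumn"

def day_part(time_str):
    # Time is hh:mm:ss; bucket the hour into five parts of the day.
    hour = int(time_str.split(":")[0])
    if 7 <= hour < 12:
        return "morning"
    if 12 <= hour <= 14:
        return "noon"
    if 14 < hour < 18:
        return "afternoon"
    return "evening" if 18 <= hour <= 23 else "beforeDawn"

print(season("16/12/2006"), day_part("17:24:00"))  # winter afternoon
```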

 

Defining the data source

public static class PowerSource implements SourceFunction<myqPower> {

		private final String dataFilePath;
		private final int servingSpeed;

		private transient BufferedReader reader;
		private transient InputStream FStream;


		public PowerSource(String dataFilePath) {
			this(dataFilePath, 1);
		}

		public PowerSource(String dataFilePath, int servingSpeedFactor) {
			this.dataFilePath = dataFilePath;
			this.servingSpeed = servingSpeedFactor;
		}

		@Override
		public void run(SourceContext<myqPower> sourceContext) throws Exception {
			FStream = new FileInputStream(dataFilePath);
			reader = new BufferedReader(new InputStreamReader(FStream, "UTF-8"));

			String line;
			long time;
			while (reader.ready() && (line = reader.readLine()) != null) {
				myqPower power = myqPower.instanceFromString(line);
				if (power == null) {
					// instanceFromString returns null for malformed records (wrong
					// column count, i.e. some fields missing); skip this event and
					// continue with the next one.
					continue;
				}
				time = getEventTime(power);
				sourceContext.collectWithTimestamp(power, time);
				sourceContext.emitWatermark(new Watermark(time - 1));
			}

			this.reader.close();
			this.reader = null;
			this.FStream.close();
			this.FStream= null;
		}

		public long getEventTime(myqPower power) {
			return power.getEventTime();
		}

		@Override
		public void cancel() {
			try {
				if (this.reader != null) {
					this.reader.close();
				}
				if (this.FStream!= null) {
					this.FStream.close();
				}
			} catch (IOException ioe) {
				throw new RuntimeException("Could not cancel SourceFunction", ioe);
			} finally {
				this.reader = null;
				this.FStream= null;
			}
		}
	}

Definition of the myqPower data class:

public class myqPower implements  Serializable {
	public String Date;
	public String Time;
	public float Global_active_power;
	public float Global_reactive_power;
	public float Voltage;
	public float Global_intensity;
	public float Sub_metering_1;
	public float Sub_metering_2;
	public float Sub_metering_3;
	public DateTime eventTime;

        public myqPower() {
		this.eventTime = new DateTime();
	}

	public myqPower(String Date, String Time, float Global_active_power, float Global_reactive_power,
                     float Voltage, float Global_intensity, float Sub_metering_1, float Sub_metering_2,
					float Sub_metering_3) {
		this.eventTime = new DateTime();
		this.Date = Date;
		this.Time = Time;
		this.Global_active_power = Global_active_power;
		this.Global_reactive_power = Global_reactive_power;
		this.Voltage = Voltage;
		this.Global_intensity = Global_intensity;
		this.Sub_metering_1 = Sub_metering_1;
		this.Sub_metering_2 = Sub_metering_2;
		this.Sub_metering_3 = Sub_metering_3;
	}

	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(Date).append(",");
		sb.append(Time).append(",");
		sb.append(Global_active_power).append(",");
		sb.append(Global_reactive_power).append(",");
		sb.append(Voltage).append(",");
		sb.append(Global_intensity).append(",");
		sb.append(Sub_metering_1).append(",");
		sb.append(Sub_metering_2).append(",");
		sb.append(Sub_metering_3).append(",");

		return sb.toString();
	}

	public static myqPower instanceFromString(String line) {
		String[] tokens = line.split(";");
		if (tokens.length != 9) {
			System.out.println("#############Invalid record: " + line + "\n"); // anomaly notice; should be a proper log statement, stdout is only for debugging
			return null;
			//throw new RuntimeException("Invalid record: " + line);
		}

		myqPower power = new myqPower();
		try {
			power.Date = tokens[0];
			power.Time = tokens[1];
			power.Global_active_power = tokens[2].length() > 0 ? Float.parseFloat(tokens[2]) : 0.0f;
			power.Global_reactive_power = tokens[3].length() > 0 ? Float.parseFloat(tokens[3]) : 0.0f;
			power.Voltage = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
			power.Global_intensity = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
			power.Sub_metering_1 =Float.parseFloat(tokens[6]);
			power.Sub_metering_2 = Float.parseFloat(tokens[7]);
			power.Sub_metering_3 = Float.parseFloat(tokens[8]);

		} catch (NumberFormatException nfe) {
			// '?' or other non-numeric tokens end up here; report the record
			// and drop it rather than failing the whole job.
			System.out.println("#############Invalid record: " + line + "\n");
			return null;
		}

		return power;
	}
	public long getEventTime() {
		return this.eventTime.getMillis();
	}
}

 

  • Defining the anomaly filter:

Filter out events with null values, the anomalous '?' character, or Date/Time fields that do not match the expected format:

public static class NoneFilter implements FilterFunction<myqPower> {

		@Override
		public boolean filter(myqPower power) throws Exception {
			// By this point the numeric fields are already floats, so comparing
			// them against the character '?' would be meaningless ('?' values
			// never survive the parsing of the raw tokens). The filter itself
			// only needs to validate the Date and Time formats.
			return dateIsValid(power.Date) && timeIsValid(power.Time);
		}

		public boolean dateIsValid(String date) {
			return date.split("/").length == 3;
		}

		public boolean timeIsValid(String time) {
			return time.split(":").length == 3;
		}
	}

 

  • Defining the feature-transformation functions

Different models need different features, so a separate FlatMap function can be defined per use case.

For prediction-style models:

the Date feature is abstracted into a new season feature and dummy-encoded into the four seasons: spring, summer, autumn, winter;

the Time feature is abstracted into a new time-of-day feature and dummy-encoded into morning, noon, afternoon, evening and before-dawn buckets:

public static class powerFlatMap implements FlatMapFunction<myqPower, Tuple2<String,String>> {

		@Override
		public void flatMap(myqPower myqPower, Collector<Tuple2<String,String>> collector) throws Exception {
			myqPowerWashed powerWashed = new myqPowerWashed();

			int month = Integer.parseInt(myqPower.Date.split("/")[1]);
			if (month > 12 || month < 1) {
				throw new RuntimeException("Got invalid month: " + month);
			} else if (month >= 11 || month <= 2) {
				powerWashed.winter = 1;
			} else if (month <= 4) {
				powerWashed.Spring = 1;
			} else if (month <= 7) {
				powerWashed.summer = 1;
			} else {
				powerWashed.autumn = 1;
			}

			int hour = Integer.parseInt(myqPower.Time.split(":")[0]);
			if (hour >= 7 && hour < 12) {
				powerWashed.morning = 1;
			} else if (hour >= 12 && hour <= 14) {
				powerWashed.noon = 1;
			} else if (hour > 14 && hour < 18) {
				powerWashed.afternoon = 1;
			} else if (hour >= 18 && hour <= 23) {
				powerWashed.evening = 1;
			} else {
				powerWashed.beforeDawn = 1;
			}
			powerWashed.Global_active_power = myqPower.Global_active_power;
			powerWashed.Global_reactive_power = myqPower.Global_reactive_power;
			powerWashed.Sub_metering_1 = myqPower.Sub_metering_1;
			powerWashed.Sub_metering_2 = myqPower.Sub_metering_2;
			powerWashed.Sub_metering_3 = myqPower.Sub_metering_3;
			powerWashed.Voltage = myqPower.Voltage;
			powerWashed.Global_intensity = myqPower.Global_intensity;
			collector.collect(new Tuple2<>(myqPower.Date, powerWashed.toString()));
		}
	}

For the clustering model:

Different Spark versions want different input formats for KMeans (a headache...). The KMeans in the MLlib (RDD-based) API simply takes the features separated by commas, so the flatMap function is easy.

An extra flatMap function is added to the Flink feature processing to generate input in the format the MLlib KMeans class expects.

It uses a few of the more strongly correlated features: month, Global_active_power, Global_reactive_power, Voltage and Sub_metering_3.

public static class powerFlatMapForKmeansMLib implements FlatMapFunction<myqPower, String> {

		@Override
		public void flatMap(myqPower myqPower, Collector<String> collector) throws Exception {
			String month = myqPower.Date.split("/")[1];
			StringBuilder sb = new StringBuilder();
			sb.append(month).append(",");
			sb.append(myqPower.Global_active_power).append(",");
			sb.append(myqPower.Global_reactive_power).append(",");
			sb.append(myqPower.Voltage).append(",");
			sb.append(myqPower.Sub_metering_3);

			collector.collect(sb.toString());
		}
	}

 

Defining the Flink operator chain:

Each transformed output is written to its own streaming file, feeding a different downstream algorithm:

    public static void main(String[] args) throws Exception {

      final String input = "D:/code/dataML/household_power_consumption.txt";
      final int servingSpeedFactor = 6000; // events of 10 minutes are served in 1 second

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // operate in event time
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

      // start the data generator
      DataStream<myqPower> power = env.addSource(
            new PowerSource(input, servingSpeedFactor));

      DataStream<myqPower> modDataStr = power
            .filter(new NoneFilter());
      DataStream<Tuple2<String,String>> modDataStrForLR = modDataStr.flatMap(new powerFlatMap())
            .keyBy(0);
      modDataStrForLR.writeAsText("./powerFlatMapForLR");

      DataStream<String> powerFlatMapForKmeansMLib = modDataStr.flatMap(new powerFlatMapForKmeansMLib());
      powerFlatMapForKmeansMLib.print();
      powerFlatMapForKmeansMLib.writeAsText("./powerFlatMapForKmeansMLib");

      // run the pipeline
      env.execute("household_power Analysis");
   }

Part of the generated feature file used for linear regression:

0,0,0,1,0,0,1,0,0,5.374,0.498,0.0,0.0,2.0,17.0,
0,0,0,1,0,0,1,0,0,4.448,0.498,0.0,0.0,1.0,17.0,
0,0,0,1,0,0,1,0,0,3.266,0.0,0.0,0.0,0.0,18.0,
0,0,0,1,0,0,1,0,0,3.236,0.0,0.0,0.0,0.0,17.0,
0,0,0,1,0,0,1,0,0,4.058,0.2,0.0,0.0,0.0,17.0,

 

Spark offline model: predicting the season from power usage

Predict whether it is winter from the consumption features (a bit silly...):

object SparkRF {
  case class Power(
                    month: Double,
                    Spring: Double, summer: Double, autumn: Double, winter: Double, morning: Double, noon: Double, afternoon: Double, evening: Double, beforeDawn: Double, Global_active_power: Double,
                    Global_reactive_power: Double, Voltage: Double, Global_intensity: Double, Sub_metering_1: Double, Sub_metering_2: Double, Sub_metering_3: Double
                  )

  def parsePower(line: Array[Double]): Power = {
    Power(
      line(0),
      line(1), line(2), line(3), line(4), line(5),
      line(6) , line(7) , line(8), line(9) , line(10) ,
      line(11), line(12) , line(13), line(14) , line(15) ,
      line(16)
    )
  }

  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }


  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkDFebay").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val path="D:/code/flink/PowerModData/4"
    import sqlContext.implicits._
    val powerRdd = parseRDD(sc.textFile(path)).map(parsePower)
    val powerDF=powerRdd.toDF().cache()

    powerDF.registerTempTable("PowerKmeans")


    // Column lists kept for experimentation; only featureCols is used below.
    val Cols = Array("Spring", "summer", "autumn", "winter", "morning",
      "noon", "afternoon", "evening", "beforeDawn", "Global_active_power",
      "Global_reactive_power", "Voltage", "Global_intensity", "Sub_metering_1", "Sub_metering_2",
      "Sub_metering_3")

    val featureCols = Array("Global_active_power","Global_reactive_power", "Global_intensity","Sub_metering_2", "Sub_metering_3")
    val feature1Cols = Array("Global_active_power","Spring", "summer", "autumn", "winter", "morning", "noon", "afternoon", "evening", "beforeDawn")

    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(powerDF)
    df2.show(8)

    val labelIndexer = new StringIndexer().setInputCol("winter").setOutputCol("label")
    val df3 = labelIndexer.fit(df2).transform(df2)
    df3.show

    val splitSeed = 5043
    val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)

    val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
    val model = classifier.fit(trainingData)

    val predictions = model.transform(testData)
    model.toDebugString
    predictions.show

    // Note: BinaryClassificationEvaluator's default metric is areaUnderROC,
    // not classification accuracy, despite the label printed below.
    val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
    val accuracy = evaluator.evaluate(predictions)
    println("accuracy fitting:" + accuracy)
  }
}

 

Run result:

accuracy  fitting:0.6778437355975941

 

Scheduled offline KMeans clustering with Spark:

The KMeans-format dataset produced by Flink:

12,3.894,0.234,0.0
12,3.994,0.408,0.0
12,3.422,0.06,18.0
12,6.936,0.082,18.0
12,4.552,0.196,17.0

 

The scheduled offline Spark training job:

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDFebay").setMaster("local")
    val sc = new SparkContext(conf)

    val file="D:/code/flink-training-exercises-master/powerFlatMapForKmeansMLib/8"
    val rawTrainingData = sc.textFile(file)

    val RDDData = rawTrainingData.map(line => {
      Vectors.dense(line.split(",").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
    }).cache()

    val Array(trainingData, testData) = RDDData.randomSplit(Array(0.7, 0.3))

    // Search for the most suitable number of clusters
    val ks: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
    var  ret:Map[Int,Double] =Map()

    ks.foreach(cluster => {
      val model: KMeansModel = KMeans.train(trainingData, cluster, 30, 1)
      val ssd = model.computeCost(trainingData)
      ret += (cluster->ssd)
    })

    ret.foreach{ i =>
      println( "Key = " + i )}
  }

 

Among the tested values the cost is smallest at K = 12 (WSSSE always decreases as K grows, so the elbow, where the improvement levels off, is the better guide):

Key = (5,512.0118262388754)
Key = (10,216.71946399437297)
Key = (6,437.06659783173933)
Key = (9,244.55911004099448)
Key = (12,152.3216833039682)
Key = (7,384.5780125426356)
Key = (3,1088.713054854713)
Key = (11,175.84191616805106)
Key = (8,307.5536910695736)
Key = (4,633.9407541008707)
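Since the cost keeps falling as K grows, the relative drop per step is a handy way to read the elbow from the printed costs (values copied from the output above):

```python
# (k -> training cost) pairs from the Spark run above.
costs = {3: 1088.713054854713, 4: 633.9407541008707, 5: 512.0118262388754,
         6: 437.06659783173933, 7: 384.5780125426356, 8: 307.5536910695736,
         9: 244.55911004099448, 10: 216.71946399437297,
         11: 175.84191616805106, 12: 152.3216833039682}
ks = sorted(costs)
# Relative improvement of each k over the previous one.
drops = {k: (costs[prev] - costs[k]) / costs[prev]
         for prev, k in zip(ks, ks[1:])}
for k in ks[1:]:
    print(k, round(drops[k], 3))
```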

 

 

Near-real-time model training with Spark Streaming: linear regression

  • Using the streaming linear-regression class from Spark MLlib

val path="D:/code/flink/PowerModData" // monitored streaming-file directory, produced and updated in real time by the Flink job above

val trainingData = ssc.textFileStream(path).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(path).map(LabeledPoint.parse)

val numFeatures = 8 // number of features
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)
val ret = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
ret.print()
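Conceptually, StreamingLinearRegressionWithSGD starts from the zero weight vector and applies a least-squares SGD step to each arriving mini-batch. A minimal pure-Python sketch of one such update (not the Spark implementation; batch values and learning rate are illustrative):

```python
def sgd_step(weights, batch, lr=0.01):
    # One least-squares gradient step over a mini-batch of (features, label)
    # pairs, averaging the gradient over the batch.
    grad = [0.0] * len(weights)
    for x, y in batch:
        err = sum(w * xi for w, xi in zip(weights, x)) - y
        for i, xi in enumerate(x):
            grad[i] += err * xi
    return [w - lr * g / len(batch) for w, g in zip(weights, grad)]

w = [0.0, 0.0]                                  # zero-initialized, as in the job
batch = [([1.0, 2.0], 5.0), ([2.0, 1.0], 4.0)]  # one illustrative mini-batch
w = sgd_step(w, batch)
print(w)
```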

 

 
