flink&spark stream 实践–在线LR模型训练与预测

2018年4月30日

项目目的:

和《flink实践–datastream-taxi到达目的地时间预测》章节一样,目的还是利用实时获取taxi行车轨迹数据,实时数据经过处理和提取特征过程,特征数据再实时被用于模型在线训练模型,用于实时预测taxi 到达目的地的时间,

只不不过实现方式稍微不同,由于flink的ML库不是很成熟, 新方案试着使用flink  实现特征提取 dataStream, 通过Kafka/socket 传给 spark stream , 再利用Spark ML 库实现实时模型训练

  1.  flink 服务

1.1  获取实时数据并提取模型需要的特征数据

1.2 保存部分数据到文件系统, 用于训练离线初始模型

1.3 实时数据传输到kafka

2. spark 服务

2.1 接受kafka实时流数据/直接监控flink服务保存的数据文件

2.2 使用spark MLlib支持streaming的 算法函数做模型训练

 

数据集:

数据集也和之前文档中一样,是包含纽约市的出租车出行的信息,每一次出行包含两个事件:START和END,可以分别理解为开始和结束该行程。每一个事件包括11个属性:

taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of a ride
endTime        : DateTime  // the end time of a ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // number of passengers on the ride

 

详细说明

  1.  flink 服务

1.1 生成流数据:

和之前的文档内容一样,这里不赘述;

1.2 流数据处理:

和之前不一样的地方:

a. 增加一个Sink 算子,用于把数据传输到Kafka 传输到外部服务(本样例暂时不用):

b. 增加写数据集到文件的操作,用于传输到外部服务,本样例用于做实时训练模型;

        DataStream<String> modDataStr = rides
                // filter out rides that do not start or stop in NYC
                .filter(new NYCFilter())
                // map taxi ride events to the grid cell of the destination
                .map(new GridCellMatcher())
                // organize stream by destination
                .keyBy(0)
                // predict and refine model per destination
                .map(new ModelDataStr());

        modDataStr.print();
        // write the filtered data to a Kafka sink
        Properties kafkaprops=new Properties();
        kafkaprops.setProperty("zookeeper.connect",ZOOKEEPER_HOST);
        kafkaprops.setProperty("bootstrap.servers",KAFKA_BROKER);
        kafkaprops.setProperty("group.id",GROUP);
        modDataStr.addSink(new FlinkKafkaProducer011<String>(
                TOPIC,
                new SimpleStringSchema(),
                kafkaprops));

        DataStream<Tuple2<String,String>> ModData=modDataStr.map(new ModelDataTuple2());
        ModData.writeAsCsv("./travalModDataT2");

新的flatmap自定义函数ModelDataStr用于输出到kafka的String格式

    public static class ModelDataStr implements MapFunction<Tuple2<Integer, TaxiRide>,  String> {

        @Override
        public  String map (Tuple2<Integer, TaxiRide> val) throws Exception {

            TaxiRide ride = val.f1;
            // compute distance and direction
            double distance = GeoUtils.getEuclideanDistance(ride.startLon, ride.startLat, ride.endLon, ride.endLat);
            int direction = GeoUtils.getDirectionAngle(ride.endLon, ride.endLat, ride.startLon, ride.startLat);
            double travelTime = (ride.endTime.getMillis() - ride.startTime.getMillis()) / 60000.0;
            if (travelTime<0)
                travelTime =travelTime/10000000;

            return new String( String.valueOf(direction)+' '+ String .format("%.3f",distance)+' '+ String .format("%.3f",travelTime));
        }
    }

ModelDataTuple2用于生成Tuple2类型存储到CSV,为了满足spark stream ML 函数StreamingLinearRegressionWithSGD 的入参要求,我们把结构输出到csv中为如下样式: label , feature1  feature2

public static class ModelDataTuple2 implements MapFunction<String,Tuple2<String, String>> {

   @Override
   public Tuple2<String, String> map(String val) throws Exception {
      // compute distance and direction
      String[] lst = val.split(" ");
      String direction = lst[0];
      String distance = lst[1];
      String travelTime = lst[2];

      return new Tuple2<>(travelTime,  distance+' '+direction);//travelTime是label,  distance,direction分别是特征
   }
}

生成的csv文件部分内容,schema 为travelTime,dirction ,distance:

-2.262,5.839 131
-2.262,2.629 133
-2.262,0.062 281

 

2.  spark stream + ML 在线训练服务

方案1: 对Dstream 调用foreachRDD ,自定义UDF函数中,每个RDD 使用数据训练成新模型,再保存新模型(数据量太少,感觉训练意义不大,放弃)

方案2: 使用spark stream 监控flink 流文件夹,检测到新增的文件后使用streaming ML函数读取相关流文件‘(StreamingLinearRegression数据集需要结构保存为y,[x1,x2] 结构数据, y表示标签);

—–优点:实现简单

—–缺点:Streaming ML的函数种类不太丰富(flink没有Streaming 类型的ML函数库,apatche math库中只有simpleRegression<只支持一元特征> 类支持数据实时积累 )

val path="D:/code/flink/travalModDataT2"//被监控的实时流文件目录,由前面的flink服务实时生成并更新文件

val trainingData = ssc.textFileStream(path).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(path).map(LabeledPoint.parse)

val numFeatures = 2//特征的数量,我们样例中只有distance,direction两个特征,所以这里是2
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

测试数据实时训练模型的预测结果打印,第一列是输入Label ,第二列是预测的值:

-------------------------------------------
Time: 1548077230000 ms
-------------------------------------------
(-2.262,-2.280029162941661E151)
(-2.262,-2.3139863113041387E151)
(-2.262,-4.887529329162964E151)
(1.0,-1.7916585601831595E151)
(-2.262,-5.239095409639574E151)
(-2.262,-5.722627451818867E151)
(-2.262,-4.366333831190446E151)
(-2.262,-6.157797050574829E151)
(-2.262,-1.617725185431678E151)
(-2.262,-2.3658044264772473E151)

<相关streaming算法类是否提供 模型评估函数? 目前没有找到相关函数可以使用>

参考:

http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.html

http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.html#predictOnValues-org.apache.spark.streaming.dstream.DStream-scala.reflect.ClassTag-

 

方案3: spark定时任务读取flink生成的静态文件,就可以用非Streaming 的ML函数库了(之前有类似的样例,不赘述)。

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注