项目目的:
和《flink实践–datastream-taxi到达目的地时间预测》章节一样,目的还是利用实时获取taxi行车轨迹数据,实时数据经过处理和提取特征过程,特征数据再实时被用于模型在线训练模型,用于实时预测taxi 到达目的地的时间,
只不不过实现方式稍微不同,由于flink的ML库不是很成熟, 新方案试着使用flink 实现特征提取 dataStream, 通过Kafka/socket 传给 spark stream , 再利用Spark ML 库实现实时模型训练
- flink 服务
1.1 获取实时数据并提取模型需要的特征数据
1.2 保存部分数据到文件系统, 用于训练离线初始模型
1.3 实时数据传输到kafka
2. spark 服务
2.1 接受kafka实时流数据/直接监控flink服务保存的数据文件
2.2 使用spark MLlib支持streaming的 算法函数做模型训练
数据集:
数据集也和之前文档中一样,是包含纽约市的出租车出行的信息,每一次出行包含两个事件:START和END,可以分别理解为开始和结束该行程。每一个事件包括11个属性:
taxiId : Long // a unique id for each taxi
driverId : Long // a unique id for each driver
isStart : Boolean // TRUE for ride start events, FALSE for ride end events
startTime : DateTime // the start time of a ride
endTime : DateTime // the end time of a ride,
// "1970-01-01 00:00:00" for start events
startLon : Float // the longitude of the ride start location
startLat : Float // the latitude of the ride start location
endLon : Float // the longitude of the ride end location
endLat : Float // the latitude of the ride end location
passengerCnt : Short // number of passengers on the ride
详细说明
-
flink 服务
1.1 生成流数据:
和之前的文档内容一样,这里不赘述;
1.2 流数据处理:
和之前不一样的地方:
a. 增加一个Sink 算子,用于把数据传输到Kafka 传输到外部服务(本样例暂时不用):
b. 增加写数据集到文件的操作,用于传输到外部服务,本样例用于做实时训练模型;
DataStream<String> modDataStr = rides // filter out rides that do not start or stop in NYC .filter(new NYCFilter()) // map taxi ride events to the grid cell of the destination .map(new GridCellMatcher()) // organize stream by destination .keyBy(0) // predict and refine model per destination .map(new ModelDataStr()); modDataStr.print(); // write the filtered data to a Kafka sink Properties kafkaprops=new Properties(); kafkaprops.setProperty("zookeeper.connect",ZOOKEEPER_HOST); kafkaprops.setProperty("bootstrap.servers",KAFKA_BROKER); kafkaprops.setProperty("group.id",GROUP); modDataStr.addSink(new FlinkKafkaProducer011<String>( TOPIC, new SimpleStringSchema(), kafkaprops)); DataStream<Tuple2<String,String>> ModData=modDataStr.map(new ModelDataTuple2()); ModData.writeAsCsv("./travalModDataT2");
新的flatmap自定义函数ModelDataStr用于输出到kafka的String格式
public static class ModelDataStr implements MapFunction<Tuple2<Integer, TaxiRide>, String> {
@Override
public String map (Tuple2<Integer, TaxiRide> val) throws Exception {
TaxiRide ride = val.f1;
// compute distance and direction
double distance = GeoUtils.getEuclideanDistance(ride.startLon, ride.startLat, ride.endLon, ride.endLat);
int direction = GeoUtils.getDirectionAngle(ride.endLon, ride.endLat, ride.startLon, ride.startLat);
double travelTime = (ride.endTime.getMillis() - ride.startTime.getMillis()) / 60000.0;
if (travelTime<0)
travelTime =travelTime/10000000;
return new String( String.valueOf(direction)+' '+ String .format("%.3f",distance)+' '+ String .format("%.3f",travelTime));
}
}
ModelDataTuple2用于生成Tuple2类型存储到CSV,为了满足spark stream ML 函数StreamingLinearRegressionWithSGD 的入参要求,我们把结构输出到csv中为如下样式: label , feature1 feature2
public static class ModelDataTuple2 implements MapFunction<String,Tuple2<String, String>> { @Override public Tuple2<String, String> map(String val) throws Exception { // compute distance and direction String[] lst = val.split(" "); String direction = lst[0]; String distance = lst[1]; String travelTime = lst[2]; return new Tuple2<>(travelTime, distance+' '+direction);//travelTime是label, distance,direction分别是特征 } }
生成的csv文件部分内容,schema 为travelTime,dirction ,distance:
-2.262,5.839 131
-2.262,2.629 133
-2.262,0.062 281
2. spark stream + ML 在线训练服务
方案1: 对Dstream 调用foreachRDD ,自定义UDF函数中,每个RDD 使用数据训练成新模型,再保存新模型(数据量太少,感觉训练意义不大,放弃)
方案2: 使用spark stream 监控flink 流文件夹,检测到新增的文件后使用streaming ML函数读取相关流文件‘(StreamingLinearRegression数据集需要结构保存为y,[x1,x2] 结构数据, y表示标签);
—–优点:实现简单
—–缺点:Streaming ML的函数种类不太丰富(flink没有Streaming 类型的ML函数库,apatche math库中只有simpleRegression<只支持一元特征> 类支持数据实时积累 )
val path="D:/code/flink/travalModDataT2"//被监控的实时流文件目录,由前面的flink服务实时生成并更新文件 val trainingData = ssc.textFileStream(path).map(LabeledPoint.parse).cache() val testData = ssc.textFileStream(path).map(LabeledPoint.parse) val numFeatures = 2//特征的数量,我们样例中只有distance,direction两个特征,所以这里是2 val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.zeros(numFeatures)) model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination()
测试数据实时训练模型的预测结果打印,第一列是输入Label ,第二列是预测的值:
-------------------------------------------
Time: 1548077230000 ms
-------------------------------------------
(-2.262,-2.280029162941661E151)
(-2.262,-2.3139863113041387E151)
(-2.262,-4.887529329162964E151)
(1.0,-1.7916585601831595E151)
(-2.262,-5.239095409639574E151)
(-2.262,-5.722627451818867E151)
(-2.262,-4.366333831190446E151)
(-2.262,-6.157797050574829E151)
(-2.262,-1.617725185431678E151)
(-2.262,-2.3658044264772473E151)
<相关streaming算法类是否提供 模型评估函数? 目前没有找到相关函数可以使用>
参考:
方案3: spark定时任务读取flink生成的静态文件,就可以用非Streaming 的ML函数库了(之前有类似的样例,不赘述)。
没有评论