Uber永久定位系统实时数据分析过程实践!
在下面的代码中,我们使用parseUber函数注册一个用户自定义函数(UDF)来反序列化消息值字符串。我们在带有df1列值的String Cast的select表达式中使用UDF,该值返回Uber对象的DataFrame。 ![]() 使用集群中心ID和位置丰富的Uber对象数据集 VectorAssembler用于转换并返回一个新的DataFrame,其中包含向量列中的纬度和经度要素列。 ![]() ![]() k-means模型用于通过模型转换方法从特征中获取聚类,该方法返回具有聚类ID(标记为预测)的DataFrame。生成的数据集与先前创建的集群中心数据集(ccdf)连接,以创建UberC对象的数据集,其中包含与集群中心ID和位置相结合的行程信息。 ![]() ![]() 最后的数据集转换是将唯一ID添加到对象以存储在MapR-DB JSON中。createUberwId函数创建一个唯一的ID,包含集群ID和反向时间戳。由于MapR-DB按id对行进行分区和排序,因此行将按簇的ID新旧时间进行排序。 此函数与map一起使用以创建UberwId对象的数据集。 ![]() 接下来,为了进行调试,我们可以开始接收数据并将数据作为内存表存储在内存中,然后进行查询。 ![]() 以下是来自 %sqlselect * from uber limit 10 的示例输出: ![]() 现在我们可以查询流数据,询问哪段时间和集群内的搭乘次数最多?(输出显示在Zeppelin notebook中)
SELECT hour(uber.dt) as hr,cid, count(cid) as ct FROM uber group By hour(uber.dt), cid ![]() Spark Streaming写入MapR-DB ![]() 用于Apache Spark的MapR-DB连接器使用户可以将MapR-DB用作Spark结构化流或Spark Streaming的接收器。 ![]() 当你处理大量流数据时,其中一个挑战是存储位置。对于此应用程序,可以选择MapR-DB JSON(一种高性能NoSQL数据库),因为它具有JSON的可伸缩性和灵活易用性。 JSON模式的灵活性 MapR-DB支持JSON文档作为本机数据存储。MapR-DB使用JSON文档轻松存储,查询和构建应用程序。Spark连接器可以轻松地在JSON数据和MapR-DB之间构建实时或批处理管道,并在管道中利用Spark。 ![]() 使用MapR-DB,表按集群的键范围自动分区,提供可扩展行和快速读写能力。在此用例中,行键_id由集群ID和反向时间戳组成,因此表将自动分区,并按最新的集群ID进行排序。 ![]() (编辑:衡水站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |