apache druid 实时加载kafka 中的数据(一)

–>

简介

apache druid 是分布式列存储的 OLAP 框架。还是一个时间序列数据库。本篇文章主要是druid 在kafka 加载数据的配置。由于druid 升级情况太快,本人的环境还是在0.13,主要改动方面还是UI,新的版本在UI方面更适合新手入门。

文章如有帮助,请关注微信公共号。

最终使用druid时,是0.9版本,当时在kafka加载数据推荐的方式是两种

  • Tranquility

  • kafka index service

Tranquility

是用于将流实时推送到Druid的工具包。是一个独立,需要单独下载。

** 其特点**

无缝地处理分区,复制,服务发现和架构过渡,而无需停机。集成了http server,Samza,Spark ,Storm,Flink 等工具。

可以*的控制向druid,主动发送数据。

** 劣势**

本身具有时间窗,超过时间窗的数据直接丢弃。

版本落后,由于没有官方组织维护,目前版本只是兼容值0.9.2,后面druid升级后,Tranquility未及时升级,有些新的api 无法适配。

kafka index service

这是druid 自身携带的扩展插件,使用时,需要在common.runtime.properties 文件中的属性 druid.extensions.loadList 添加druid-kafka-indexing-service。

**  其特点**

支持实时查询按时间分segment,非实追加到对应时间的segment 。

通过算法把Peon分配到 不同的【 Middle Managers】上实现分布式

加大对应kafka的topic的partition数量 加大taskCount的值,产生更多的Peon

创建 supervisor

上面是一个完整的supervisor的内容,主要包含type,dataSchema,tuningConfig,ioConfig 四个部分

  • type

标记类型,supervisor  的类型 就是kafka.

  • dataSchema

数据库的配置,主要包含dataSource,parser,metricsSpec,granularitySpec

dataSource

druid的数据库名称。

parser

配置与解析数据。简单理解就是kafka中的数据与druid存储之间的关系映射。主要包含以下配置

timestampSpec

配置处于的位置 dataSchema->parser->timestampSpec

druid 本身是时间序列数据库,故此时间就是数据的主键。由于druid 在 0.9后,已经不支持设置时区了,时间都是采用的utc格式。druid查询时,可以设置时区。包括一些roll-up操作都是按照utc时间进行。如有必须需改动源码。

dimensionsSpec

位置:dataSchema->parser->dimensionsSpec

维度。数据库需要存储的字段,需要与kafka中的对应。

dimensions

是一个数组类型,默认字段的类型都是string

可以设置字段的类型,例如{ "type": "long", "name": "userId" }

metricsSpec

位置:dataSchema->metricsSpec

度量。此值roll-up 启用是才有意义。

`{      “name”: “theta_customer_id”,

“type”: “thetaSketch”,

“fieldName”: “customer_id”
           } `

name: druid中字段的名称。

type:指标类型。thetaSketch 去重。还支持doubleSum,longSum,doubleMin,doubleMax 等聚合类型。

fieldName:kafka中 属性的名称

granularitySpec

位置:      dataSchema->granularitySpec

segmentGranularity: Segment粒度(SegmentGranularity)表示每一个实时索引任务中产生的Segment所涵盖的时间范围。

queryGranularity:查询粒度。例如 {“queryGranularity”:”DAY”} 查询的最小粒度就是DAY,经过roll-up后,维度完全一样的数据,一天范围内将聚合为一条数据。

  • tuningConfig

调优相关的配置。

配置一个segment大小。

调整压缩算法。

  • ioConfig

    消费者的配置。对于kafak index service 就是kafka 消费者一个配置。

    下面的实例,配置了kafka的topic,启动的任务数量,任务执行的时间,kafka的地址。

    completionTimeout:这个值将发布任务声明为失败并终止之前等待的时间。如果设置得太低,您的任务可能永远不会发布。任务的发布时间大约在taskDuration过去之后开始。默认是30M,为防止任务未发布,调整为与任务时间一致(PT3600S)

"ioConfig": {        

"topic": "com.test",       

 "replicas": 1,         "taskCount": 1,     

   "taskDuration": "PT3600S",     

   "consumerProperties": {        

   "bootstrap.servers": "10.0.0.1:9096,10.0.0.1:9096"    

   },     

   "completionTimeout": "PT3600S"  

 }

提交supervisor

提交至overlord节点。

新版中出现界面配置

第一种,根据界面的配置向导来加载kafka数据

访问:8888  端口

一直按照向导配置,就可以自动生成supervisor的配置 很方便。

第二种,通过页面 提供的Submit supervisor提交 相应的json文件

总结

简单介绍了下supervisor  重点配置的具体含义,由于篇幅问题,详细的配置还需要去官网文档中查看。本文的目的就是通过个人使用 kafka index service时一些新得,帮助新手能快速跑通第一个druid实例。

文章如有帮助,请关注微信公共号。

本文来源 互联网收集,文章内容系作者个人观点,不代表 本站 对观点赞同或支持。如需转载,请注明文章来源,如您发现有涉嫌抄袭侵权的内容,请联系本站核实处理。

© 版权声明

相关文章