位置: IT常识 - 正文

Apache Druid 数据摄取---本地数据和kafka流式数据(apache druid使用)

编辑:rootadmin
Durid概述 Apache Druid是一个集时间序列数据库、数据仓库和全文检索系统特点于一体的分析性数据平台。本文将带你简单了解Druid的特性,使用场景,技术特点和架构。这将有助于你选型数据存储方案,深入了解Druid存储,深入了解时间序列存储等。 Apache Druid是一个高性能的实时分 ... Durid概述

推荐整理分享Apache Druid 数据摄取---本地数据和kafka流式数据(apache druid使用),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:druid api,apache druid使用,apache druid github,apache的druid和阿里的druid,druid数据源,druid数据库,apache druid github,apache druid github,内容如对您有帮助,希望把文章链接给更多的朋友!

Apache Druid是一个集时间序列数据库、数据仓库和全文检索系统特点于一体的分析性数据平台。本文将带你简单了解Druid的特性,使用场景,技术特点和架构。这将有助于你选型数据存储方案,深入了解Druid存储,深入了解时间序列存储等。

Apache Druid是一个高性能的实时分析型数据库。

上篇文章,我们了解了Druid的加载方式,

咱么主要说两种,一种是加载本地数据,一种是通过kafka加载流式数据。

数据摄取4.1 加载本地文件

我们导入演示案例种的演示文件

4.1.1.1 数据选择

通过UI选择local disk

并选择Connect data

4.1.1.2 演示数据查看

演示数据在quickstart/tutorial目录下的wikiticker-2015-09-12-sampled.json.gz文件

4.1.1.3 选择数据源

因为我们是通过imply安装的,在Base directory输入绝对路径/usr/local/imply/imply-2021.05-1/dist/druid/quickstart/tutorial,File filter输入wikiticker-2015-09-12-sampled.json.gz,并选择apply应用配置,我们数据已经加载进来了

Base directory 和 File filter 分开是因为可能需要同时从多个文件中摄取数据。

4.1.1.4 加载数据

数据定位后,您可以点击"Next: Parse data"来进入下一步。

数据加载器将尝试自动为数据确定正确的解析器。在这种情况下,它将成功确定json。可以随意使用不同的解析器选项来预览Druid如何解析您的数据。

4.1.2 数据源规范配置4.1.2.1 设置时间列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主时间列。

​Druid的体系结构需要一个主时间列(内部存储为名为_time的列)。如果您的数据中没有时间戳,请选择 固定值(Constant Value) 。在我们的示例中,数据加载器将确定原始数据中的时间列是唯一可用作主时间列的候选者。

这里可以选择时间列,以及时间的显示方式

4.1.2.2 设置转换器

在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们没有设置,直接跳过

4.1.2.3 设置过滤器

这里可以设置过滤器,对于某些数据可以不进行显示,这里我们也跳过

4.1.2.4 配置schema

在 Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

4.1.2.5 配置Partition

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们数据量非常小,这里我们按照DAY进行分段

4.1.3 提交任务4.1.3.1 发布数据

点击完成 Tune 步骤,进入到 Publish 步,在这里我们可以给我们的数据源命名,这里我们就命名为druid-sampled,

点击下一步就可以查看我们的数据规范

​这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

4.1.3.2 提交任务

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。

当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。

4.1.3.3 查看数据源

从标题导航到 Datasources 视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到 Query 视图以对数据源运行SQL查询。

4.1.3.4 查询数据

可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了

4.2 kafka加载流式数据4.2.1 安装Kafka

这里我们使用docker-compose的方式启动kafka

4.2.1.1 编辑资源清单vi docker-compose.ymlversion: '2'services: zookeeper: image: zookeeper container_name: zookeeper ports: - 2181:2181 kafka: image: wurstmeister/kafka ## 镜像 volumes: - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直) ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190 ## 修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ## 卡夫卡运行是基于zookeeper的 KAFKA_ADVERTISED_PORT: 9092 KAFKA_LOG_RETENTION_HOURS: 120 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 KAFKA_NUM_PARTITIONS: 3 KAFKA_DELETE_RETENTION_MS: 10004.2.2.2 启动容器docker-compose up -ddocker-compose ps

4.2.3 验证kafka

启动kafka后需要验证kafka是否可用

4.2.3.1 登录容器

登录容器并进入指定目录

#进入容器docker exec -it kafka_kafka_1 bash#进入 /opt/kafka_2.13-2.7.0/bin/ 目录下cd /opt/kafka_2.13-2.7.0/bin/

4.2.3.2 发送消息

运行客户端发送消息,注意这里的连接地址需要写我们配置的宿主机地址

#运行kafka生产者发送消息./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test

发送的数据如下

{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

4.2.3.3 消费消息

运行消费者消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning

Apache Druid 数据摄取---本地数据和kafka流式数据(apache druid使用)

有数据打印说明我们kafka安装是没有问题的

4.2.4 发送数据到kafka4.2.4.1 编写代码

编写代码发送消息到kafka中

@Componentpublic class KafkaSender { @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 发送消息到kafka * * @param topic 主题 * @param message 内容体 */ public void sendMsg(String topic, String message) { kafkaTemplate.send(topic, message); }}@RestController@RequestMapping("/taxi")public class KafkaController { @Autowired private KafkaSender kafkaSender; @RequestMapping("/batchTask/{num}") public String batchAdd(@PathVariable("num") int num) { for (int i = 0; i < num; i++) { Message message = Utils.getRandomMessage(); kafkaSender.sendMsg("message", JSON.toJSONString(message)); } return "OK"; }}4.2.4.2 发送消息

使用postman 发送消息到kafka,消息地址:http://localhost:8010/taxi/batchTask/10,消息数据如下

显示OK说明消息已经发送到了kafka中

4.2.5 数据选择4.2.51 kafka数据查看

在load页面选择kafka,进行数据摄取模式选择

4.2.5.2 选择数据源

在这里输入ZK的地址以及需要选择数据的topic

116.62.213.90:10903,116.62.213.90:10904

4.2.5.3 加载数据

点击apply应用配置,设置加载数据源

4.2.6 数据源规范配置4.2.6.1 设置时间列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主时间列。

​因为我们的时间列有两个创建时间以及打车时间,我们配置时间列为trvelDate

4.2.6.2 设置转换器

在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们增加一个状态的虚拟列,来显示状态的中文名称我们定义 0:测试数据, 1:发起打车,2:排队中,3:司机接单,4:乘客上车,5:完成打车

我们使用case_simple来实现判断功能,更多判断功能参考

case_simple(status,0,'测试数据',1,'发起打车',2,'排队中',3,'司机接单',4,'完成打车','状态错误')

在这里我们新建了一个status_text的虚拟列来展示需要中文显示的列

配置年龄默认值,如果为空我们设置为25

nvl(age,25)

配置性别设置,我们需要设置为男女,0:男,1:女,如果为null,我们设置为男

case_simple(nvl(sex,0),0,'男',1,'女','男')

4.2.6.3 设置过滤器

这里可以设置过滤器,对于某些数据不展示,这里我们使用区间过滤器选择显示status>=1的数据,具体表达式可用参考

{ "type" : "bound", "dimension" : "status", "ordering": "numeric", "lower": "1", }

因为我们把数据是0的测试数据不显示了,所以只显示了一条数据为1的数据

4.2.6.4 配置schema

在 Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

4.2.6.5 配置Partition

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们打车一般按照小时来算的,我们设置为分区为``hour

4.2.6.6 配置拉取方式

这里设置kafka的拉取方式,主要设置偏移量的一些配置

​在 Tune 步骤中,将 Use earliest offset 设置为 True 非常重要,因为我们需要从流的开始位置消费数据。 其他没有任何需要更改的地方,进入到 Publish 步

4.5.7 提交任务4.2.7.1 发布数据

点击完成 Tune 步骤,进入到 Publish 步,在这里我们可以给我们的数据源命名,这里我们就命名为taxi-message,

点击下一步就可以查看我们的数据规范

​这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

4.2.7.2 提交任务

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。

当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。

4.2.7.3 查看数据源

从标题导航到 Datasources 视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到 Query 视图以对数据源运行SQL查询。

4.2.7.4 查询数据

可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了

4.2.7.5 动态添加数据

发送一条数据到kafka

druid 查询数据,发现新的数据已经进来了

4.2.8 清理数据4.2.8.1 关闭集群# 进入impl安装目录cd /usr/local/imply/imply-2021.05-1# 关闭集群./bin/service --down

4.2.8.2 等待关闭服务

通过进程查看,查看服务是否已经关闭

ps -ef|grep druid

4.2.8.3 清理数据

通过删除druid软件包下的var目录的内容来重置集群状态

llrm -rf var

4.2.8.4 重新启动集群 nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &4.2.8.5 查看数据源

登录后查看数据源,我们发现已经被重置了

本文由传智教育博学谷 - 狂野架构师教研团队发布如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力转载请注明出处!

本文链接地址:https://www.jiuchutong.com/zhishi/304694.html 转载请保留说明!

上一篇:关于 background-image 渐变gradient()那些事!(关于我和鬼变成家人的那件事)

下一篇:[Python]解密pyc文件(python解密加密文件)

  • 冰箱冷藏室结冰怎么解决小妙招(冰箱冷藏室结冰怎么解决)(冰箱冷藏室结冰是什么原因怎样处理)

    冰箱冷藏室结冰怎么解决小妙招(冰箱冷藏室结冰怎么解决)(冰箱冷藏室结冰是什么原因怎样处理)

  • 快手上传有赞商品信息无法识别(快手有赞商家版)

    快手上传有赞商品信息无法识别(快手有赞商家版)

  • 什么是IP(什么是ip营销)

    什么是IP(什么是ip营销)

  • 拼多多直接免拼是(拼多多直接免拼怎么获得)

    拼多多直接免拼是(拼多多直接免拼怎么获得)

  • 优酷快进震动怎么关(优酷按住屏幕快进的速度怎么调)

    优酷快进震动怎么关(优酷按住屏幕快进的速度怎么调)

  • qq注销别人能看见吗(qq注销了别人还能查到个人信息吗)

    qq注销别人能看见吗(qq注销了别人还能查到个人信息吗)

  • 淘小铺怎么推广(淘小铺如何推广)

    淘小铺怎么推广(淘小铺如何推广)

  • launchpad在苹果电脑里是什么意思(launchpad 在哪)

    launchpad在苹果电脑里是什么意思(launchpad 在哪)

  • 手机联系人黑名单在哪里查看(手机联系人黑名单查询方法)

    手机联系人黑名单在哪里查看(手机联系人黑名单查询方法)

  • 刚换的路由器能连上没有网络(换的新路由器)

    刚换的路由器能连上没有网络(换的新路由器)

  • 底端对齐怎么设置(底端对齐怎么设置出来)

    底端对齐怎么设置(底端对齐怎么设置出来)

  • 苹果id无效或不受支持是什么意思(苹果id无效或不受支持怎么办解除)

    苹果id无效或不受支持是什么意思(苹果id无效或不受支持怎么办解除)

  • 抖音如何给主播点赞(抖音如何给主播开会员)

    抖音如何给主播点赞(抖音如何给主播开会员)

  • 华为nova7是什么系统(华为nova7是什么充电器)

    华为nova7是什么系统(华为nova7是什么充电器)

  • 酷安app是干嘛用的(酷安app是做什么的)

    酷安app是干嘛用的(酷安app是做什么的)

  • 魅族手机右上角显示电话HD是什么情况(魅族手机右上角出现hd怎么取消)

    魅族手机右上角显示电话HD是什么情况(魅族手机右上角出现hd怎么取消)

  • 打印机打印不清晰是什么原因(打印机打印不清晰怎么解决)

    打印机打印不清晰是什么原因(打印机打印不清晰怎么解决)

  • wms系统是什么意思

    wms系统是什么意思

  • 新ipad哪些app可以删(ipad购买新手必看)

    新ipad哪些app可以删(ipad购买新手必看)

  • 淘宝买的东西出现质量问题怎么解决呢(淘宝买的东西出现质量问题怎么解决呢,时间也过了)

    淘宝买的东西出现质量问题怎么解决呢(淘宝买的东西出现质量问题怎么解决呢,时间也过了)

  • 7p的电池可以用在8p吗(7p的电池用了快6年了)

    7p的电池可以用在8p吗(7p的电池用了快6年了)

  • ipad能安装万能钥匙吗(ipad可以下载万能wifi吗)

    ipad能安装万能钥匙吗(ipad可以下载万能wifi吗)

  • 快手和抖音怎么同步(快手和抖音怎么投屏到电视)

    快手和抖音怎么同步(快手和抖音怎么投屏到电视)

  • 三星note10无线充电多少w(三星note10无线充电已暂停)

    三星note10无线充电多少w(三星note10无线充电已暂停)

  • 微博怎么设置虚拟位置(微博怎么设置虚拟抽奖)

    微博怎么设置虚拟位置(微博怎么设置虚拟抽奖)

  • 3d打印lom的中文名称(3d打印pom)

    3d打印lom的中文名称(3d打印pom)

  • xr怎么用副卡打电话(苹果xr怎么用副卡拨号)

    xr怎么用副卡打电话(苹果xr怎么用副卡拨号)

  • pokemmo怎么进去(pokemmorom在哪里)

    pokemmo怎么进去(pokemmorom在哪里)

  • Uni-app中的生命周期(uni-app原理)

    Uni-app中的生命周期(uni-app原理)

  • 可供出售金融资产包括哪些内容
  • 会计准则折旧年限规定最新
  • 资金占用费税前扣除比例
  • 累计折旧影响所得税费用吗
  • 另收取包装物押金
  • 会计做账写错了怎么办
  • 实控人变更是利好吗
  • 农产品收购发票可以跨区域开吗
  • 已入账未抵扣退回发票
  • 交易性金融资产有哪些
  • 发票大头小尾什么意思
  • 现金支票提取现金备用
  • 餐饮加盟公司怎么经营
  • 广告业福利发放会计处理怎么做?
  • 现金流量表年报期末现金余额
  • 处置子公司的收益
  • 运输业应列入
  • 辅导期一般纳税人预缴增值税
  • 税务大厅申报社保需要资料
  • 个体工商户增值税税率2023年
  • 企业所得税税率2022年最新税率表
  • 公司绿化购买的苗木如何进行账务处理?
  • 资产处置损益借贷方向表示什么
  • 做账的是什么会计
  • 报销车费怎么贴
  • 清算结业是什么意思
  • 购买的承兑怎么下账
  • 电脑安装的软件桌面上没显示
  • 成本转入费用
  • 主板bios在线升级
  • 音量图标不显示怎么回事
  • iis6配置php网站
  • 工程结算转收入
  • 怎么查上市公司
  • 和平之城
  • 商誉减值计算步骤
  • framework启动
  • 6372056181电子退库收入
  • 股份有限公司都是私企吗
  • 气温和降水空间变化一月平均气温规律是什么原因是什么
  • 收到国税退回的发票
  • python如何删除字典中的键值对
  • python怎么求列表里的和
  • 金税盘怎么取消
  • 只有进项发票,没有销项可以吗
  • 契税计入会计利润总额吗
  • 企业合并兼并收购的区别和联系
  • pnpnpn的工作原理
  • 善意取得增值税专用发票
  • sql语句中的decode函数
  • 月末库存商品成本计算
  • 车间劳保费计入什么科目
  • 个体工商户生产经营所得税优惠政策
  • 没有计提坏账准备的情况下,应收账款的计税基础
  • 申请个税退税账号是什么
  • 小微企业免征增值税优惠政策
  • 采购人员垫付怎么入账
  • 填制记账凭证有哪些
  • freebsd怎么样
  • freebsd重置密码
  • win10周年更新版是什么意思
  • win10系统版本20h2
  • win10电脑cmd命令大全
  • 向Windows8靠拢 全新的个性化库页面
  • perl 比较符
  • unity怎么创建摄像机
  • div垂直居中怎么设置css
  • jq获取单选框的值
  • html概念及作用
  • [置顶]津鱼.我爱你
  • ajax简单实例
  • window.parent与window.openner区别介绍
  • jquery控制css样式
  • 安卓版本apk
  • dom事件模型由什么组成
  • python文件的读操作方法
  • 由简入繁是成语吗
  • netbeans12.0安装教程
  • 北京税务稽查局网站
  • 去地税开个人发票
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设