博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka详解与总结(二)
阅读量:5711 次
发布时间:2019-06-17

本文共 1839 字,大约阅读时间需要 6 分钟。

Kafka Stream

Kafka Streams是一个客户端库,用于构建任务关键型实时应用程序和微服务,其中输入和输出数据存储在Kafka集群中。Kafka Streams结合了在客户端编写和部署标准Java和Scala应用程序的简单性以及Kafka服务器端集群技术的优势,使这些应用程序具有高度可扩展性,弹性,容错性,分布式等等。

以下是示例代码的要点(为了方便阅读,使用的是java8 lambda表达式)。

步骤:

1.启动zk和kafka

> bin/zookeeper-server-start.sh config/zookeeper.properties

> bin/kafka-server-start.sh config/server.properties

 

  1. 准备输入主题并启动生产者

创建名为streams-plaintext-input的输入主题和名为streams-wordcount-output的输出主题:

> bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-plaintext-input

Created topic "streams-plaintext-input".

注意:我们创建输出主题并启用压缩,因为输出流是更改日志流

> bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-wordcount-output \

    --config cleanup.policy=compact

Created topic "streams-wordcount-output".

使用相同的kafka-topics工具描述创建的主题:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe

 

  1. 启动Wordcount应用程序

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示应用程序将从输入主题stream-plaintext-input读取,对每个读取消息执行WordCount算法的计算,并将其当前结果连续写入输出主题streams-wordcount-output

 

  1. 处理数据

开启一个生产者终端:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic

 streams-plaintext-input

all streams lead to kafka

开启一个消费者终端:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \

    --topic streams-wordcount-output \

    --from-beginning \

    --formatter kafka.tools.DefaultMessageFormatter \

    --property print.key=true \

    --property print.value=true \

    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \

--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1

streams 1

lead    1

to      1

kafka   1

这里,第一列是java.lang.String格式的Kafka消息键,表示正在计数的单词,第二列是java.lang.Long格式的消息值,表示单词的最新计数。

转载于:https://www.cnblogs.com/htkj/p/10941438.html

你可能感兴趣的文章
几种进程间的通信方式
查看>>
PS 如何使用液化工具给人物减肥
查看>>
cvc-complex-type.2.4.c: The matching wildcard...
查看>>
android 读取json数据(遍历JSONObject和JSONArray)
查看>>
pyjamas build AJAX apps in Python (like Google did for Java)
查看>>
<JavaScript语言精粹>-读书笔记(一)
查看>>
NPM教程
查看>>
Java学习笔记(40)——Java集合12之fail-fast
查看>>
Centos 配置IP的方式
查看>>
Go 的吉祥物,萌不萌
查看>>
【iOS】AFN网络请求通过获取cookies保持会话
查看>>
Java 的swing.GroupLayout布局管理器的使用方法和实例
查看>>
Android中Activity和Fragment的生命周期的对比
查看>>
C++Primer_笔记_异常处理
查看>>
分区交换 alter table exchange partition 在线表 历史表交换
查看>>
思科三层交换 HSRP 热备 配置方法
查看>>
zabbix详解:(二)添加被监控机器
查看>>
设计模式单列
查看>>
人像模式的灯光效果?iPhone 8开挂袭来
查看>>
Linux下MongoDB安装与配置
查看>>