大数据之Kafka Consumer端的一些解惑
张军 2018-03-28 来源 : 阅读 676 评论 0

摘要:最近一直忙着各种设计和文档,终于有时间来更新一点儿关于kafka的东西。之前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,然后再做相关的处理。之前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定。

最近一直忙着各种设计和文档,终于有时间来更新一点儿关于kafka的东西。之前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,然后再做相关的处理。

之前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定,具体的存储规则可以查看之前的文章:


# The directory under which to   store log files 


2  

log.dir=/tmp/kafka-logs

   


Consumer端的目的就是为了获取log日志,然后做进一步的处理。在这里我们可以将数据的处理按照需求分为两个方向,线上和线下,也可以叫实时和离线。实时处理部分类似于网站里的站短,有消息了马上就推送到前端,这是一种对实时性要求极高的模式,kafka可以做到,当然针对站短这样的功能还有更好的处理方式,我主要将kafka线上消费功能用在了实时统计上,处理一些如实时流量汇总、各系统实时吞吐量汇总等。

这种应用,一般采用一个consumer中的一个group对应一个业务,配合多个producer提供数据,如下图模
式:

大数据之Kafka Consumer端的一些解惑

采用这种方式处理很简单,采用官网上给的例子即可解决,只是由于版本的问题,代码稍作更改即可:


01  package com.a2.kafka.consumer;    
02   
03  import java.util.HashMap;   
04  import java.util.List;   
05  import java.util.Map;   
06  import java.util.Properties;   
07  import java.util.concurrent.ExecutorService;    
08  import java.util.concurrent.Executors;    
09    
10  import kafka.consumer.Consumer;    
11  import kafka.consumer.ConsumerConfig;   
12  import kafka.consumer.KafkaStream;    
13  import kafka.javaapi.consumer.ConsumerConnector;   
14  import kafka.message.Message;    
15  import kafka.message.MessageAndMetadata;    
16   
17  public classCommonConsumer {   
18  publicstatic void main(String[] args) {   
19  // specify some consumer   properties    
20  Properties props =new Properties();    
21  props.put("zk.connect","192.168.181.128:2181");    
22  props.put("zk.connectiontimeout.ms","1000000");    
23  props.put("groupid","test_group");    
24    
25  // Create the connection to the   cluster    
26  ConsumerConfig consumerConfig   =new ConsumerConfig(props);    
27   ConsumerConnector   consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);    
28    
29  Map   map=newHashMap();    
30  map.put("test",2);  
31  // create 4 partitions of the   stream for topic “test”, to allow 4 threads to consume    
32  Map<String,   List<KafkaStream>> topicMessageStreams =
33  consumerConnector.createMessageStreams(map);
34  List<KafkaStream>   streams = topicMessageStreams.get("test"); 
35
36  // create list of 4 threads to   consume from each of the partitions
37   ExecutorService executor =   Executors.newFixedThreadPool(4);
38
39   // consume the messages in the   threads
40   for(finalKafkaStream   stream: streams) {
41   executor.submit(newRunnable() {
42   publicvoid run() {   
43   for(MessageAndMetadata   msgAndMetadata: stream) {
44   // process message   (msgAndMetadata.message())     
45   System.out.println(msgAndMetadata.message());     
46 }     
47 } 
48 });
49 } 
50 } 
51 }

   

这是一个user level的API,还有一个low level的API可以从官网找到,这里就不贴出来了。这个consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据打印出来,这是不是十分符合实时性的要求。

当然这里会产生一个很严重的问题,如果你重启一下上面这个程序,那你连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。我已经把结论说出来了,要消费同一组数据,你可以采用不同的group。

简单说下产生这个问题的原因,这个问题类似于transaction commit,在消息系统中都会有这样一个问题存在,数据消费状态这个信息到底存哪里。是存在consumer端,还是存在broker端。对于这样的争论,一般会出现三种情况:

  • At most once—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.

  • At least once—this is the second case where we guarantee each message will be delivered at least once, but in failure cases may be delivered twice.

  • Exactly once—this is what people actually want, each message is delivered once and only once.

第一种情况是将消费的状态存储在了broker端,一旦消费了就改变状态,但会因为网络原因少消费信息,第二种是存在两端,并且先在broker端将状态记为send,等consumer处理完之后将状态标记为consumed,但也有可能因为在处理消息时产生异常,导致状态标记错误等,并且会产生性能的问题。第三种当然是最好的结果。

Kafka解决这个问题采用high water mark这样的标记,也就是设置offset:

1   

Kafka does two unusual things   with respect to metadata. First the stream is partitioned on the brokers into   asetof distinct partitions. The semantic meaning of these partitions is left   up to the producer and the producer specifieswhich partition a message belongs to.   Within a partition messages are storedinthe order inwhich they arrive at the broker, and   will be given out to consumersin that same order. This means   that rather than store metadataforeach message (marking it as consumed, say),   we just need to store the"high water mark" foreach combination of   consumer, topic, and partition. Hence the total metadata required to   summarize the state of the consumer is actually quite small. In Kafka we   refer to this high-water mark as"the offset" for reasons that will become clear in  the implementation section.

   

所以在每次消费信息时,log4j中都会输出不同的offset:

1  

[FetchRunnable-0] INFO :   kafka.consumer.FetcherRunnable#info : FetchRunnable-0start fetching topic:   test part: 0offset: 0 from 192.168.181.128:9092

2

3  

[FetchRunnable-0] INFO :   kafka.consumer.FetcherRunnable#info : FetchRunnable-0start fetching topic:   test part: 0offset: 15 from 192.168.181.128:9092   

除了采用不同的groupid去抓取已经消费过的数据,kafka还提供了另一种思路,这种方式更适合线下的操作,镜像。


本文由 @职坐标 发布于职坐标。未经许可,禁止转载。
喜欢 | 3 不喜欢 | 0
看完这篇文章有何感觉?已经有3人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论
本文作者 联系TA

10年项目开发经验,精通pc、mobile平台软件开发主流技术和框架

  • 7
    文章
  • 1681
    人气
  • 88%
    受欢迎度

已有17人表明态度,88%喜欢该老师!

进入TA的空间
名师指导直通车
  • 资料索取
    资料索取
  • 答疑解惑
    答疑解惑
  • 技术交流
    技术交流
  • 职业测评
    职业测评
  • 面试技巧
    面试技巧
  • 高薪秘笈
    高薪秘笈
TA的其他文章 更多>>
大数据离线处理—hive实用教学
经验技巧 0% 的用户喜欢
大数据离线处理—Hive实战
经验技巧 0% 的用户喜欢
大数据离线处理—hive的性能调优
经验技巧 0% 的用户喜欢
大数据离线处理—hive动态分区
经验技巧 0% 的用户喜欢
大数据离线处理—hive分区表
经验技巧 0% 的用户喜欢
其他海同名师 更多>>
刘新华
刘新华 联系TA
实力型。激情饱满,对专业充满热情
吴翠红
吴翠红 联系TA
独创“教、学、练、测”循环教学模式
吕益平
吕益平 联系TA
熟悉企业软件开发的产品设计及开发
黄泽民
黄泽民 联系TA
擅长javase核心技术
程钢
程钢 联系TA
擅长大型企业商业网站开发和管理
经验技巧30天热搜词 更多>>

您输入的评论内容中包含违禁敏感词

我知道了

X
免费获取海同IT培训资料
验证码手机号,获得海同独家IT培训资料
获取验证码
提交

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    ICP许可  沪B2-20190160

站长统计