
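- Provides message persistence with O(1) time complexity, keeping access time constant even for very large data volumes;
- High throughput: a single server can handle hundreds of thousands of messages per second;
- Supports message partitioning across servers and distributed consumption, while guaranteeing message ordering within each partition;
- Lightweight; supports both real-time and offline data processing.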
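The O(1) persistence claim rests on each partition being an append-only log: writes only ever go to the end, and a reader locates a message directly by its offset. The sketch below is a minimal in-memory model of one partition under that assumption. `PartitionLog` and its methods are hypothetical; real Kafka keeps the log in segment files on disk with an offset index rather than in a Python list.

```python
class PartitionLog:
    """Toy append-only log for one partition (in memory, illustrative only)."""

    def __init__(self):
        self._records = []  # list index == message offset

    def append(self, value):
        """Append a record at the end and return its offset (amortized O(1))."""
        self._records.append(value)
        return len(self._records) - 1

    def read(self, offset, max_records=10):
        """Return up to max_records records starting at offset (direct seek)."""
        return self._records[offset:offset + max_records]

    def log_end_offset(self):
        """Offset that the next appended record will receive."""
        return len(self._records)


log = PartitionLog()
for i in range(5):
    log.append(f"msg-{i}".encode())
print(log.read(3))  # [b'msg-3', b'msg-4']
print(log.log_end_offset())  # 5
```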
1. Framework
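Producers publish messages to brokers using a push model, while consumers subscribe to brokers and consume messages using a pull model. Multiple brokers work together, and producers and consumers are embedded in the individual business applications. All three rely on ZooKeeper to manage and coordinate requests and forwarding. Together they form a high-performance distributed publish/subscribe messaging system.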
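The push/pull split can be illustrated with a toy broker that only stores messages and a consumer that pulls batches and tracks its own offset, which is what lets each consumer read at its own pace. `ToyBroker` and `ToyConsumer` are hypothetical names; partitions, replication, and ZooKeeper are deliberately left out.

```python
from collections import defaultdict


class ToyBroker:
    """Stores messages per topic; it never pushes anything to consumers."""

    def __init__(self):
        self.topics = defaultdict(list)

    def produce(self, topic, value):
        # Push side: the producer hands the message to the broker.
        self.topics[topic].append(value)

    def fetch(self, topic, offset, max_records):
        # Pull side: the consumer asks for records starting at its own offset.
        return self.topics[topic][offset:offset + max_records]


class ToyConsumer:
    def __init__(self, broker, topic):
        self.broker, self.topic, self.offset = broker, topic, 0

    def poll(self, max_records=2):
        batch = self.broker.fetch(self.topic, self.offset, max_records)
        self.offset += len(batch)  # the consumer tracks its own position
        return batch


broker = ToyBroker()
for v in (b"a", b"b", b"c"):
    broker.produce("my_topic", v)
consumer = ToyConsumer(broker, "my_topic")
print(consumer.poll())  # [b'a', b'b']
print(consumer.poll())  # [b'c']
```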
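- broker: a message-middleware processing node. Each Kafka node is a broker, and one or more brokers form a Kafka cluster.
- topic: Kafka categorizes messages by topic; every message published to a Kafka cluster must specify a topic.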
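- consumer group: each consumer belongs to a specific consumer group. A message can be delivered to multiple consumer groups, but within one consumer group only one consumer consumes it.
- partition: a topic can be split into multiple partitions; messages within each partition are ordered.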
- cleaner-offset-checkpoint: stores
- meta.properties: the broker.id information.
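- recovery-point-offset-checkpoint: records the recovery point; all data below the recoveryPoint has already been flushed to disk.
- replication-offset-checkpoint: stores the high watermark (HW) of each replica. The HW marks the messages that have been committed; all data below the HW is synchronized and consistent across replicas.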
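Two of the ideas above can be sketched in plain Python, assuming a simple round-robin strategy: each partition of a topic goes to exactly one consumer within a group, while every group independently receives all partitions; and the high watermark is the smallest log end offset among the in-sync replicas. `assign_round_robin` and `high_watermark` are hypothetical helpers. Kafka clients offer several assignment strategies (such as range, round-robin, and sticky), and the real replication protocol has more detail than a single `min`.

```python
def assign_round_robin(partitions, consumers):
    """Give each partition to exactly one consumer in the group (round-robin)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


def high_watermark(log_end_offsets):
    """Simplified HW: the smallest log end offset among in-sync replicas."""
    return min(log_end_offsets.values())


partitions = [0, 1, 2, 3]
# Two groups read the same topic: each group covers every partition once.
print(assign_round_robin(partitions, ["a1", "a2"]))  # {'a1': [0, 2], 'a2': [1, 3]}
print(assign_round_robin(partitions, ["b1"]))        # {'b1': [0, 1, 2, 3]}

# The leader holds 10 records, followers have copied 8 and 9: only 8 are committed.
print(high_watermark({"leader": 10, "follower1": 8, "follower2": 9}))  # 8
```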
1. KafkaConsumer
The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers. It also interacts with the assigned kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
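Delivery semantics depend on when the consumer commits its offset:
- At most once: the offset is committed automatically after the client receives a message but before it processes it, so Kafka already treats the message as consumed and advances the offset. Set enable.auto.commit to true (`enable_auto_commit=True` in kafka-python); the client does not call commitSync(), and offsets are committed automatically at a fixed interval.
- At least once: the client processes the message first and commits afterwards. If the network drops or the program crashes after processing but before the commit, Kafka considers the message unconsumed and delivers it again, producing duplicates. Set enable.auto.commit to false (`enable_auto_commit=False`) and have the client call commitSync() (`consumer.commit()` in kafka-python) to advance the offset.
- Exactly once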
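Why commit timing matters can be shown with a small simulation: a consumer processes three messages, crashes once in the gap between committing and processing, and restarts from its last committed offset. This is pure Python with no Kafka involved; `run` and `simulate` are hypothetical helpers, and the crash point is injected by hand.

```python
def run(messages, committed, at_most_once, crash_at=None):
    """Consume from offset `committed`; crash in the commit/process gap at `crash_at`.

    Returns (processed_messages, committed_offset).
    """
    processed = []
    for offset in range(committed, len(messages)):
        if at_most_once:
            committed = offset + 1                # commit first ...
            if offset == crash_at:
                return processed, committed       # ... crash: message is never processed
            processed.append(messages[offset])
        else:
            processed.append(messages[offset])    # process first ...
            if offset == crash_at:
                return processed, committed       # ... crash: offset is never committed
            committed = offset + 1
    return processed, committed


def simulate(at_most_once):
    messages = ["m0", "m1", "m2"]
    first, committed = run(messages, 0, at_most_once, crash_at=1)
    second, _ = run(messages, committed, at_most_once)  # restart from the committed offset
    return first + second


print(simulate(at_most_once=True))   # ['m0', 'm2']  (m1 lost)
print(simulate(at_most_once=False))  # ['m0', 'm1', 'm1', 'm2']  (m1 duplicated)
```

Committing first loses `m1`; processing first delivers `m1` twice, which is why at-least-once processing has to tolerate duplicates.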
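```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)
```
- The first argument is the topic name.
- group_id: the name of the group this consumer instance belongs to; optional.
- bootstrap_servers: the Kafka server(s) to connect to.
- consumer_timeout_ms: a timeout in milliseconds. If it is not set, the loop keeps waiting for messages indefinitely; if it is set, iteration ends once no message arrives within the timeout instead of waiting further.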
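The effect of `consumer_timeout_ms` can be mimicked with a standard-library queue: the loop keeps going while items arrive and ends once nothing arrives within the timeout. This is only an analogy under that assumption, not kafka-python's implementation; `iterate` is a hypothetical helper.

```python
import queue
import time


def iterate(q, timeout_ms=None):
    """Yield items from q; stop once none arrives within timeout_ms (None = wait forever)."""
    timeout = None if timeout_ms is None else timeout_ms / 1000
    while True:
        try:
            yield q.get(timeout=timeout)
        except queue.Empty:
            return  # ends the for loop, like the consumer raising StopIteration


q = queue.Queue()
for i in range(3):
    q.put(f"msg-{i}")

start = time.monotonic()
for item in iterate(q, timeout_ms=200):
    print(item)
print(f"loop ended after about {time.monotonic() - start:.1f}s")
```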
- Manually assign a partition
```python
from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])  # manually assign partition 0 of my_topic
for msg in consumer:
    print(msg)
```
- Subscribe to multiple topics
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
```
- Subscribe to a group of topics by regular expression
```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern='^my.*')  # every topic whose name starts with "my"
for msg in consumer:
    print(msg)
```
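Both steps of the regex example can be checked offline: which topic names a pattern such as `'^my.*'` selects, and what the JSON `value_deserializer` returns for a raw payload. The topic list is made up, and the sketch assumes the pattern is matched from the start of the name, as with Python's `re.match`.

```python
import json
import re

# Topic names that might exist on a cluster (made up for illustration).
cluster_topics = ["my_topic", "my_orders", "topic_1", "logs_my"]
pattern = re.compile('^my.*')
subscribed = [t for t in cluster_topics if pattern.match(t)]
print(subscribed)  # ['my_topic', 'my_orders']


def value_deserializer(m):
    """Same logic as the lambda passed to KafkaConsumer above."""
    return json.loads(m.decode('ascii'))


raw = json.dumps({'key': 'value'}).encode('ascii')  # bytes as a JSON producer would send them
print(value_deserializer(raw))  # {'key': 'value'}
```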
2. KafkaProducer
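After choosing a partition, the producer appends the record to the record batch for that topic and partition. A dedicated thread then sends the record batches to the appropriate Kafka broker, and the callback is invoked when the producer receives the broker's response.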
The producer consists of a pool of buffer space that holds records that haven’t yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.
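The buffer-plus-background-sender design described above can be sketched as a per-partition accumulator. `ToyAccumulator` is hypothetical and single-threaded: it ships a batch once it holds `batch_size` records and fakes broker-assigned offsets with a counter, whereas kafka-python's real `batch_size` is measured in bytes, works together with `linger_ms`, and the sending happens on a background I/O thread.

```python
from collections import defaultdict


class ToyAccumulator:
    """Buffers records per (topic, partition) and 'sends' full batches."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.batches = defaultdict(list)     # (topic, partition) -> [(value, callback)]
        self.next_offset = defaultdict(int)  # stands in for broker-assigned offsets
        self.sent = []                       # batches that were "transmitted"

    def append(self, topic, partition, value, callback=None):
        tp = (topic, partition)
        self.batches[tp].append((value, callback))
        if len(self.batches[tp]) >= self.batch_size:
            self._send(tp)

    def flush(self):
        for tp in list(self.batches):
            self._send(tp)

    def _send(self, tp):
        batch = self.batches.pop(tp, [])
        if not batch:
            return
        self.sent.append((tp, [value for value, _ in batch]))
        for value, callback in batch:
            offset = self.next_offset[tp]
            self.next_offset[tp] += 1
            if callback:
                callback(tp, offset)  # the "broker response" triggers the callback


acks = []
acc = ToyAccumulator(batch_size=2)
acc.append("my_topic", 0, b"a", lambda tp, off: acks.append((tp, off)))
acc.append("my_topic", 1, b"b")
acc.append("my_topic", 0, b"c", lambda tp, off: acks.append((tp, off)))  # fills partition 0's batch
print(acc.sent)  # [(('my_topic', 0), [b'a', b'c'])]
acc.flush()      # sends the partially filled batch for partition 1
print(acks)      # [(('my_topic', 0), 0), (('my_topic', 0), 1)]
```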
- bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.

`send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)`
- key (optional) – a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer.
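The key-to-partition rule above can be sketched as a hash of the key modulo the partition count, with a random choice when the key is `None`. The sketch uses `zlib.crc32` purely as a stand-in; kafka-python's default partitioner hashes keys with murmur2, so the actual partition numbers will differ. `choose_partition` is a hypothetical helper.

```python
import random
import zlib


def choose_partition(key, num_partitions):
    """Same key -> same partition; key None -> random partition.

    zlib.crc32 is a stand-in hash; kafka-python's default partitioner uses
    murmur2, so real partition numbers will differ.
    """
    if key is None:
        return random.randrange(num_partitions)
    return zlib.crc32(key) % num_partitions


for key in (b"user-1", b"user-2", b"user-1", None):
    print(key, choose_partition(key, 3))
```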
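- Send a string key and value (as bytes)
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
# send() is asynchronous: block until this message is sent or the timeout expires.
# In the author's tests the message was not sent without this call (time.sleep() also worked);
# producer.flush() likewise blocks until all buffered records have been sent.
result = future.get(timeout=10)
```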
- To receive str instead of bytes, the consumer must decode the key and value, e.g. key_deserializer=bytes.decode
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'],
                         key_deserializer=bytes.decode, value_deserializer=bytes.decode)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg.key, msg.value)  # both are now str
```
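- Send msgpack messages: msgpack, short for MessagePack, is an efficient binary serialization library that is more efficient than JSON
```python
import msgpack
from kafka import KafkaProducer

producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
producer.flush()  # block until the buffered message has actually been sent
```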
- Send compressed messages: if messages are large, they can be compressed before sending; compression_type accepts 'gzip', 'snappy', 'lz4', or None
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)
future.get(timeout=10)
```
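The kafka-python documentation for `compression_type` notes that compression is applied to whole batches, so many similar records compress well together. The sketch below only runs Python's `gzip` module over made-up JSON records to show the size difference; it does not involve Kafka.

```python
import gzip
import json

# 100 similar JSON records standing in for one producer batch (made-up data).
records = [json.dumps({"level": "INFO", "msg": "user login", "id": i}).encode() for i in range(100)]
raw = b"".join(records)
compressed = gzip.compress(raw)

print(len(raw), len(compressed))            # the compressed batch is much smaller
print(gzip.decompress(compressed) == raw)   # compression is lossless
```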
3. KafkaAdminClient
The KafkaAdminClient class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the Kafka broker.
4. KafkaClient
A network client for asynchronous request/response network I/O. This is an internal class used to implement the user-facing producer and consumer clients.
5. BrokerConnection
Initialize a Kafka broker connection
6. ClusterMetadata
A class to manage kafka cluster metadata. This class does not perform any IO. It simply updates internal state given API responses (MetadataResponse, GroupCoordinatorResponse).
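A consumed message is returned as a `ConsumerRecord`, for example:
```python
ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
```
- topic: the topic name
- partition: the partition number
- offset: the offset of this message
- timestamp: the timestamp (milliseconds since the epoch)
- timestamp_type: the timestamp type (0 = CreateTime, 1 = LogAppendTime)
- key: the key, as bytes
- value: the value, as bytes
- checksum: the message checksum
- serialized_key_size: the size of the serialized key
- serialized_value_size: the size of the serialized value; as shown above, it is -1 when value=None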
7. Example
- producer
```python
import json
import logging

import msgpack
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('produce request failed')
    raise

# Successful result returns assigned partition and offset
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})

# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)
```
- consumer.py
```python
import json
import msgpack
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='group2',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^my.*')

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='group2',
                          bootstrap_servers=['localhost:9092'])
consumer2 = KafkaConsumer('my-topic',
                          group_id='group2',
                          bootstrap_servers=['localhost:9092'])
```
2. Application
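2.1. Log analysis platform
Kafka is efficient, log collection is transparent to the business applications, and it integrates with offline warehouses such as Hadoop/ODPS as well as real-time analytics engines such as Storm/Spark. These properties make it well suited as a "log collection hub".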
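2.2. Website activity tracking
Message Queue for Apache Kafka can collect website activity data in real time. The publish/subscribe model can publish messages to different topics according to the type of business data; through real-time delivery to subscribers, the message stream can also be used for real-time monitoring and business analysis, or loaded into offline data warehouses such as Hadoop and ODPS for offline processing.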
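2.3. Stream processing
When data is produced quickly, must be handled in real time, and arrives in large volumes, it is hard to collect and store it all before processing it, so traditional data-processing architectures cannot meet the need. Big-data message middleware such as Kafka, together with stream-processing engines such as Storm/Samza/Spark, makes it possible to compute and analyze the data according to business needs and then store the results or distribute them to the components that need them.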
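2.4. Data hub
Specialized systems such as KV stores (HBase), search (ElasticSearch), stream processing (Storm/Spark Streaming/Samza), and time-series databases (OpenTSDB) have emerged, creating the need to load the same dataset into multiple specialized systems. With Kafka as a data hub, the same data can be imported into each of these systems.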