python

超轻量级php框架startmvc

python3连接kafka模块pykafka生产者简单封装代码

更新时间:2020-08-15 19:18:01 作者:startmvc
1.1安装模块pipinstallpykafka1.2基本使用#-*coding:utf8*-frompykafkaimportKafkaClienthost='IP:9092,IP:9092,IP:9092

1.1安装模块


pip install pykafka

1.2基本使用


# -* coding:utf8 *- 
from pykafka import KafkaClient 
host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host) 
# 生产者 
topicdocu = client.topics['my-topic'] 
producer = topicdocu.get_producer() 
for i in range(100): 
 print i 
 producer.produce('test message ' + str(i ** 2)) 
producer.stop()

1.3简单封装


class KafkaProduct():

 def __init__(self,hosts,topic):
 """
 初始化实例
 :param hosts: 连接地址
 :param topic:
 """
 self.__client = KafkaClient(hosts=hosts)
 self.__topic = self.__client.topics[topic.encode()]

 def __set_topic(self, topic):
 self.__topic = self.__client.topics[topic.encode()]

 def set_topic(self, topic):
 """
 设置topic
 :param topic:
 :return:
 """
 self.__set_topic(topic)

 def get_topics(self):
 """
 获取当前所有topic
 :return:
 """
 return self.__client.topics

 def get_topic(self):
 """
 获取当前topic
 :return:
 """
 return self.__topic

 def Producer(self):
 """
 生产者对象
 :return:
 """
 with self.__topic.get_producer(delivery_reports=True) as producer:
 next_data = ''
 while True:
 if next_data:
 producer.produce(str(next_data).encode())
 next_data = yield True

 def send_data(self,datas):
 """
 发送数据
 :param datas:需要传入的可迭代对象
 :return:
 """
 c = self.Producer()
 next(c)
 for i in datas:
 c.send(i)

if __name__ == '__main__':

hosts = "1.2.3.4:9999,2.3.4.5:9090" #连接hosts
topic = "test_523"
K = KafkaProduct(hosts=hosts, topic=topic) #
#K.set_topic("test") #切换设置新的topic
K.get_topic() #获取当前设置的topic
#K.get_topics() #获取所有topic
data = range(10000) #要发送的可迭代对象
K.send_data(data)

以上这篇python3连接kafka模块pykafka生产者简单封装代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

python3 kafka pykafka