单实例
version: '2'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
container_name: zookeeper
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
depends_on:
- zoo1
container_name: kafka
kafka集群
version: '3'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
container_name: zookeeper
kafka1:
image: wurstmeister/kafka
ports:
- "9091:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.16.149.242
KAFKA_ADVERTISED_PORT: 9091
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
depends_on:
- zoo1
container_name: kafka1
kafka2:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.16.149.242
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zoo1
container_name: kafka2
kafka3:
image: wurstmeister/kafka
ports:
- "9093:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.16.149.242
KAFKA_ADVERTISED_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zoo1
container_name: kafka3
kafka-manage:
image: sheepkiller/kafka-manager
ports:
- "9999:9000"
environment:
ZK_HOSTS: zoo1:2181
APPLICATION_SECRET: "letmein"
container_name: kafka-manager
depends_on:
- zoo1
测试脚本
import json
from kafka import KafkaProducer
for i in range(500):
msg_dict = {
"sleep_time": 10,
"db_config": {
"database": "xx_db",
"host": "xxxx"
},
"table": "msg",
"msg": "Hello World"
}
msg_dict["number"] = i
msg=json.dumps(msg_dict)
for host in ["172.16.149.242:9091","172.16.149.242:9092","172.16.149.242:9093"]:
producer = KafkaProducer(bootstrap_servers=host)
producer.send('sakura', value=msg.encode("utf-8"), partition=0)
producer.close()
from kafka import KafkaConsumer
consumer = KafkaConsumer('sakura', bootstrap_servers=["172.16.149.242:9092"])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" %(msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(recv)