kafka docker-compose


单实例

  • docker-compose启动
version: '2'

services:
  zoo1:
    image: wurstmeister/zookeeper
    restart: unless-stopped
    hostname: zoo1
    ports:
      - "2181:2181"
    container_name: zookeeper

  kafka1:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
    depends_on:
      - zoo1
    container_name: kafka

kafka集群


  • docker-compose启动
version: '3'

services:
  zoo1:
    image: wurstmeister/zookeeper
    restart: unless-stopped
    hostname: zoo1
    ports:
      - "2181:2181"
    container_name: zookeeper

  kafka1:
    image: wurstmeister/kafka
    ports:
      - "9091:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.16.149.242  # 此处填写为宿主机IP
      KAFKA_ADVERTISED_PORT: 9091
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
    depends_on:
      - zoo1
    container_name: kafka1
  kafka2:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.16.149.242
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zoo1
    container_name: kafka2
  kafka3:
    image: wurstmeister/kafka
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.16.149.242
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zoo1
    container_name: kafka3

  kafka-manage:
    image: sheepkiller/kafka-manager
    ports:
      - "9999:9000"
    environment:
      ZK_HOSTS: zoo1:2181
      APPLICATION_SECRET: "letmein"
    container_name: kafka-manager
    depends_on:
      - zoo1

测试脚本


  • 生产者
  • kafka-send.py
# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer


for i in range(500):
    msg_dict = {
    "sleep_time": 10,
    "db_config": {
        "database": "xx_db",
        "host": "xxxx"
        },
    "table": "msg",
    "msg": "Hello World"
    }   
    msg_dict["number"] = i
    msg=json.dumps(msg_dict)
    # print(msg)
    for host in ["172.16.149.242:9091","172.16.149.242:9092","172.16.149.242:9093"]:
        producer = KafkaProducer(bootstrap_servers=host)
        producer.send('sakura', value=msg.encode("utf-8"), partition=0)

producer.close()
  • 消费者
  • kafka-consumer.py
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer('sakura', bootstrap_servers=["172.16.149.242:9092"])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" %(msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)

文章作者: SakuraGaara
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 SakuraGaara !
  目录