2017-08-01 40 views
2

我正尝试在Azure托管的kubernetes集群上设置多代理kafka。我有一个经纪人设置工作。对于多代理设置,目前我有一个管理kafka服务的zookeeper节点(3)的集合。我将kafka群集部署为复制因子为3的复制控制器。即3个经纪人。我如何使用Zookeeper注册三个经纪人,使他们在Zookeeper中注册不同的IP地址?如何为kubernetes上的Kafka multi broker设置指定公布的侦听器并在集群外公开集群?

在部署服务后启动我的复制控制器,并在我的replication-controller yaml文件中使用群集IP来指定两个advertised.listeners,一个用于SSL,另一个用于PLAINTEXT。但是,在这种情况下,所有代理都使用相同的IP注册并写入副本失败。我不想将每个代理作为单独的复制控制器/ pod和服务部署,因为扩展成为问题。我真的很感激任何想法/想法。

编辑1:

我还试图集群暴露到另一个VPC云。我必须为我使用advertised.listeners的客户端公开SSL和PLAINTEXT端口。如果我使用复制因子为3的statefulset,并让kubernetes将主机名的标准主机名显示为主机名,则无法从外部客户端解析这些主机名。我得到这个工作的唯一方法是使用/公开与每个代理对应的外部服务。但是,这并没有规模。

+0

你是否曾经能够解决这个问题? –

+0

嗨迈克,我还没有解决卡夫卡的“易扩展性”问题。我正在使用statefulset(对于一个perisistent卷)中的每个代理,并将广告主机作为Ingress-IP-address(到我的kubernetes集群):port,为不同的代理提供不同的端口。这很难扩展,因为需要为更多经纪商开放新的端口,并且需要相应地更改broker.yaml中的广告端口。 – Annu

回答

1

Kubernetes的概念是Statefulsets来解决这些问题。 statefulset的每个实例都有自己的DNS名称,因此您可以通过dns名称引用每个实例。

该概念更详细地描述为here。你也可以看看这个complete example

apiVersion: v1 
kind: Service 
metadata: 
    name: zk-headless 
    labels: 
    app: zk-headless 
spec: 
    ports: 
    - port: 2888 
    name: server 
    - port: 3888 
    name: leader-election 
    clusterIP: None 
    selector: 
    app: zk 
--- 
apiVersion: v1 
kind: ConfigMap 
metadata: 
    name: zk-config 
data: 
    ensemble: "zk-0;zk-1;zk-2" 
    jvm.heap: "2G" 
    tick: "2000" 
    init: "10" 
    sync: "5" 
    client.cnxns: "60" 
    snap.retain: "3" 
    purge.interval: "1" 
--- 
apiVersion: policy/v1beta1 
kind: PodDisruptionBudget 
metadata: 
    name: zk-budget 
spec: 
    selector: 
    matchLabels: 
     app: zk 
    minAvailable: 2 
--- 
apiVersion: apps/v1beta1 
kind: StatefulSet 
metadata: 
    name: zk 
spec: 
    serviceName: zk-headless 
    replicas: 3 
    template: 
    metadata: 
     labels: 
     app: zk 
     annotations: 
     pod.alpha.kubernetes.io/initialized: "true" 

    spec: 
     affinity: 
     podAntiAffinity: 
      requiredDuringSchedulingIgnoredDuringExecution: 
      - labelSelector: 
       matchExpressions: 
        - key: "app" 
        operator: In 
        values: 
        - zk-headless 
       topologyKey: "kubernetes.io/hostname" 
     containers: 
     - name: k8szk 
     imagePullPolicy: Always 
     image: gcr.io/google_samples/k8szk:v1 
     resources: 
      requests: 
      memory: "4Gi" 
      cpu: "1" 
     ports: 
     - containerPort: 2181 
      name: client 
     - containerPort: 2888 
      name: server 
     - containerPort: 3888 
      name: leader-election 
     env: 
     - name : ZK_ENSEMBLE 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: ensemble 
     - name : ZK_HEAP_SIZE 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: jvm.heap 
     - name : ZK_TICK_TIME 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: tick 
     - name : ZK_INIT_LIMIT 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: init 
     - name : ZK_SYNC_LIMIT 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: tick 
     - name : ZK_MAX_CLIENT_CNXNS 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: client.cnxns 
     - name: ZK_SNAP_RETAIN_COUNT 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: snap.retain 
     - name: ZK_PURGE_INTERVAL 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: purge.interval 
     - name: ZK_CLIENT_PORT 
      value: "2181" 
     - name: ZK_SERVER_PORT 
      value: "2888" 
     - name: ZK_ELECTION_PORT 
      value: "3888" 
     command: 
     - sh 
     - -c 
     - zkGenConfig.sh && zkServer.sh start-foreground 
     readinessProbe: 
      exec: 
      command: 
      - "zkOk.sh" 
      initialDelaySeconds: 15 
      timeoutSeconds: 5 
     livenessProbe: 
      exec: 
      command: 
      - "zkOk.sh" 
      initialDelaySeconds: 15 
      timeoutSeconds: 5 
     volumeMounts: 
     - name: datadir 
      mountPath: /var/lib/zookeeper 
     securityContext: 
     runAsUser: 1000 
     fsGroup: 1000 
    volumeClaimTemplates: 
    - metadata: 
     name: datadir 
    spec: 
     accessModes: [ "ReadWriteOnce" ] 
     resources: 
     requests: 
      storage: 20Gi 
+0

嗨卢卡斯,感谢您的回应。然而,我关心的是,所有经纪人(作为复制控制器的一部分)向zookeeper注册了相同的advertised.listener主机和端口。如果他们是有组织的一部分,他们会注册不同吗?即使他们仍然在宣传相同的主机和端口?我会尝试这种方法,让知道! – Annu

+0

Hi @Annu,这种方法是在Kubernetes上运行多动物园管理员和多经纪人Kafka集群的经过验证的方法。你可以看一下https://github.com/kubernetes/charts/tree/master/incubator/kafka,它显示了一个很好的默认Kafka部署Kubernetes与头盔。 –

+0

嗨@Luckas,我仍然在努力让它与StatefulSet一起工作。我正在使用advertised.listeners来公开SSL和PLAINTEXT端口。我的问题是我需要能够通过云中不同的VPC访问集群。我对卡夫卡的理解是,生产者/消费者需要能够直接连接(解析)经纪人以读取/写入数据。我不想使用三种不同的服务来揭露我的经纪人。但我没有看到任何其他方式去实现它。 – Annu