Pregunta ¿Cómo crear Kafka ZKStringSerializer en Java?


Al buscar cómo crear un tema de Kafka a través de la API, encontré este ejemplo en Scala:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
                            connectionTimeoutMs, ZKStringSerializer)

// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, 
                       numPartitions, replicationFactor, topicConfig)

Fuente: https://stackoverflow.com/a/23360100/871012

El último arg ZKStringSerializer es aparentemente un objeto Scala. No me queda claro cómo hacer que este ejemplo funcione en Java.

Esta publicación Cómo crear un objeto scala en clojure hace la misma pregunta en Clojure y la respuesta fue:

ZKStringSerializer$/MODULE$

que en Java (creo) se traduciría a:

ZKStringSerializer$.MODULE$

Pero cuando intento eso (o cualquier cantidad de otras variaciones) ninguno de ellos compila.
El error de compilación es:

KafkaTopicCreator.java:[16,18] cannot find symbol
symbol:   variable ZKStringSerializer$
location: class org.sample.KafkaTopicCreator

Estoy usando kafka_2.9.2-0.8.1.1 y Java 8.


5
2017-11-18 23:28


origen


Respuestas:


Para Java prueba lo siguiente,

Primera importación debajo de la declaración

import kafka.utils.ZKStringSerializer$;

Crear objeto para ZkClient de la siguiente manera,

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181"
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

El código anterior no funcionará para kafka> 0.9 ya que la API ha sido modificada,   Use el siguiente código para kafka> 0.9

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

17
2017-11-20 11:25