Comment puis-je produire des messages avec l'API Kafka 8.2 en Java?


J'essaie de travailler avec l'API kafka en java. J'utilise la dépendance maven suivante:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

J'ai du mal à me connecter à un serveur kafka distant. J'ai changé le kafka du serveur'.l'attribut de port de fichier des propriétés doit être le port 8080. Je peux démarrer le zookeeper et le serveur kafka sans problème. Je peux également utiliser les applications console producer et consumer fournies avec le téléchargement kafka. (Version Scala 2.10)

J'utilise le code client suivant pour créer une télécommande KafkaProducer

Properties propsProducer = new Properties();

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);

Une fois que j'ai créé le producteur, je peux exécuter la ligne suivante et obtenir des informations de sujet valides renvoyées, étant donné que strTopic est un nom de sujet existant.

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);

Lorsque j'essaie d'envoyer un message, je fais ce qui suit:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();

L'appel à send() bloque indéfiniment et lorsque je termine manuellement le processus, je vois que l'ERREUR Ferme le socket en raison d'une erreur sur le serveur kafka(IOException, Réinitialisation de la connexion par le pair) erreur.

Aussi, ça ne vaut rien que le host.name, advertised.host.name, et annoncé.les propriétés du port sont toujours commentées sur le serveur.les propriétés de fichier. Oh, et si je change la ligne:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");

À

propsProducer.put("bootstrap.servers", "127.0.0.1:8080");

Et exécutez-le sur le même serveur que celui où le serveur kafka est installé, cela fonctionne mais j'essaie de travailler avec lui à distance.

Appréciez toute aide et si je peux clarifier du tout, faites-le moi savoir.

Author: Patrick Hovsepian, 2015-03-30

1 answers

Après beaucoup de recherches, j'ai décidé d'implémenter l'exemple trouvé ici: Kafka Producer Example. J'ai raccourci le code et n'ai pas implémenté de classe de partitionneur. J'ai mis à jour mon pom avec la dépendance répertoriée et j'avais toujours le même problème. En fin de compte, j'ai fait quelques changements de configuration et tout a fonctionné.

La dernière pièce du puzzle était de définir le serveur Kafka dans /etc/hosts du serveur et des machines clientes. J'ai ajouté ce qui suit aux deux fichier.

172.xx.xx.xxx     serverHost1

Encore une fois, les x ne sont que des masques. Ensuite, je mets le advertised.host.name dans le serveur.fichier de propriétés vers serverHost1. REMARQUE: J'ai obtenu cette adresse IP après avoir exécuté un ifconfig sur la machine serveur.

J'ai changé la ligne

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080");

À

propsProducer.put("metadata.broker.list", "serverHost1:8080");

L'API Kafka n'a pas aimé le fait que je définissais une IP comme une chaîne. Au lieu de cela, il recherchait l'IP à partir du fichier etc/hosts bien que la documentation indique:

" Nom d'hôte auquel le courtier annoncera les producteurs et les consommateurs. S'il n'est pas défini, il utilise la valeur pour "host.name" si configuré. Sinon, il utilisera la valeur renvoyée par java.net.InetAddress. getCanonicalHostName ()."

Qui retournera simplement l'IP, sous la forme de chaîne, que j'utilisais précédemment si ce n'est pas défini dans etc/hosts de la machine cliente, sinon il renvoie le nom associé à l'IP (serverHost1 dans mon cas). De plus, je n'ai jamais défini la valeur de host.name soit.

 3
Author: Patrick Hovsepian, 2015-04-02 15:24:47