Rafael Coelho
AllowMe

Em nosso post anterior sobre o Apache Kafka falamos sobre sua utilidade, objetivos e funcionamento. Agora vamos botar a mão na massa e ver como colocamos uma aplicação básica no ar. Os códigos apresentados abaixo fazem parte de um teste completo que tem o seguinte formato:

Praticando as funcionalidades do Apache Kafka

Subindo e executando o Kafka: Para começar, recomendamos baixar o Kafka que está disponível no site em formato ZIP. Este arquivo contém todos os scripts para executar o Kafka localmente. Também existe uma versão famosa dockerizada do Kafka feita pelo Spotify.

Para subir, é necessário rodar uma instância do Zookeeper, que acompanha o Kafka. Então, deve-se criar os tópicos informando o nome, a quantidade de replicações e de partições que cada tópico terá. Para isso, basta executar os seguintes comandos:

// Subir o zookeeper com suas configs default

bin/zookeeper-server-start.sh config/zookeeper.properties

// Subir o zookeeper com suas configs default

bin/kafka-server-start.sh config/server.properties

// Criar os tópicos do kafka

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic kafka_topic

// List os topicos existentes no kafka

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

O último comando exibe todos os tópicos criados no Kafka, se parar o Kafka e subir novamente, os tópicos continuarão no ar e ele continuará a distribuir as mensagens de onde parou. Então, para enviar ou receber uma mensagem, basta subir um consumer ou producer. Para fazer isso também são disponibilizados os seguintes scripts:

// Para produzir uma mensagem

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic

// Para consumir uma mensagem

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --from-beginning

PS: O comando “–from-beginning” faz com que o consumidor inicie a leitura do tópico a partir do offset 0 ao subir.

Kafka Publish and Subscriber: Inicialmente precisamos importar o cliente que  o próprio Apache Kafka fornece, usando o Maven (arquivo pom.xml):

<dependencies>

<dependency>

  <groupId>org.apache.kafka</groupId>

   <artifactId>kafka-clients</artifactId>

   <version>2.3.0</version>

   </dependency>

</dependencies>

O programa, em Java, abaixo, é de um producer que gera uma mensagem em Json:

package myapps;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.LongSerializer;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;

import java.util.List;

import java.util.Properties;

import java.util.Random;

public class Producer {

    private static String topico = "streams-all-messages";

    private static String mensage = "{\"type\":\"adr\",\"content\":{“...”}}";

    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();

        // IP e Porta do broker bootstrap

        properties.put("bootstrap.servers", "localhost:9092");

        // Id do producer que esta se connectando no broker

        properties.put("client.id", "cliente.1");

        // A chave será serializada para Long

 properties.put("key.serializer", LongSerializer.class.getName());

        // O valor será serializado para String

        properties.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);

System.out.println("Send on topic " + topico + " the message: " + mensage);

ProducerRecord<String,String> record = new ProducerRecord<>(topico, mensage);

producer.send(record);

    }

}

Já o modelo abaixo é de um Consumer que recebe quatro mensagens diferentes (sendo duas com dados em string e duas com dados em long):

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.LongDeserializer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

import java.util.Arrays;

import java.util.List;

import java.util.Properties;

public class Consumer {

    private static List<String> topics_type_1 

Arrays.asList("streams-all-messages",

  "streams-only-addresses",

  "streams-updated-addresses");

    private static List<String> topics_type_2 

     Arrays.asList("streams-count-updated-addresses");

 

    public static void main(String[] args) {

        Properties properties = new Properties();

        // IP e Porta do broker bootstrap

        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Id do grupo do tópico que esta se conectando no broker

        properties.setProperty("group.id", "group.1");

        // Id do producer que esta se conectando no broker

        properties.setProperty("client.id", "cliente.1");

        // Define se o commit das mensagens lidas vai ser automático

        properties.setProperty("enable.auto.commit", "true");

        // Tempo maximo de espera do commit até ser considerado falha

        properties.setProperty("auto.commit.interval.ms", "5000");

        // Pega o offset mais recente ao se conectar com o kafka

        properties.setProperty("auto.offset.reset", "earliest");

        // Máximo de mensagem que irá ser pega por vez (lote)

        properties.setProperty("max.poll.records", "500");

        // Tempo do heartbeat que o componente irá enviar ao kafka

        properties.setProperty("heartbeat.interval.ms", "1000");

        // Espera do heartbeat do componente até ser considerado falha

        properties.setProperty("session.timeout.ms", "10000");

        KafkaConsumer<String, String> consumerType1 

  getConsumer(properties, topics_type_1, StringDeserializer.class.getName());

        KafkaConsumer<String, Long> consumerType2 

  getConsumer(properties, topics_type_2, LongDeserializer.class.getName());

        try {

            while (true) {

               consumerType1.poll(Duration.ofSeconds(10)).forEach(record -> {

           System.out.println(record.topic()+": "+record.key()+" - "+record.value());

               });

               consumerType2.poll(Duration.ofSeconds(10)).forEach(record -> {

            System.out.println(record.topic()+": "+record.key()+" - "+record.value());

                });

            }

        } finally {

            consumerType1.close();

            consumerType2.close();

        }

    }

    private static <T> KafkaConsumer<String, T> getConsumer(Properties properties

  List<String> topics_type,

  String valueDeserializer) {

        // A chave será serializada para Long

        properties.setProperty("key.deserializer", StringDeserializer.class.getName());

        // O valor será serializado para String

        properties.setProperty("value.deserializer", valueDeserializer);

        KafkaConsumer<String, T> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(topics_type);

        return consumer;

    }

}

Kafka Streams: Tanto o Kafka Streams como Kafka Tables trabalham com o conceito de Topologia. O stream nada mais é que a execução das topologias informadas. Ao gerar um stream ou table, ele retorna a classe Topology, que será executada (da mesma forma que os predicados no próprio Stream do Java).

Inicialmente deve-se importar a biblioteca do Kafka Streams usando o Maven:

<dependencies>

<dependency>

  <groupId>org.apache.kafka</groupId>

   <artifactId>kafka-streams</artifactId>

   <version>2.3.0</version>

   </dependency>

</dependencies>

Abaixo, mais um modelo de programa em Java, que é um Stream que “ouve” o tópico “streams-all-messages” e filtra a mensagem apenas quando ela for do tipo “Adr” (Address):

import com.google.gson.Gson;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

import static java.util.Objects.nonNull;

public class FilterAddresses {

    public static void main(String[] args) throws Exception {

        FilterAddresses filterAddresses = new FilterAddresses();

        filterAddresses.run();

    }

    private void run() {

        Properties properties = getProperties();

        Topology topology = getTopology();

        KafkaStreams streams = new KafkaStreams(topology, properties);

        CountDownLatch latch = new CountDownLatch(1);

        shutdown(streams, latch);

        try {

            streams.start();

            latch.await();

        } catch (Throwable e) {

            System.exit(1);

        }

        System.exit(0);

    }

    private Topology getTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("streams-all-messages")

                .map((key, value) -> KeyValue.pair(key, parseAsMessage(value)))

                .filter((key, value) -> nonNull(value) && value.isAddress())

                .map((key,value) -> KeyValue.pair(key,parseAsString(value.getContent())))

                .to("streams-only-addresses");

        return builder.build();

    }

    private Message parseAsMessage(String value) {

        try {

            return (new Gson()).fromJson(value, Message.class);

        } catch (Exception ex) {

            System.out.println(ex.getMessage());

            return null;

        }

    }

    private String parseAsString(UpdateAddresses.Message value) {

        try {

            return (new Gson()).toJson(value);

        } catch (Exception ex) {

            System.out.println(ex.getMessage());

            return null;

        }

    }

    private Properties getProperties() {

        Properties properties = new Properties();

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");

        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,

  Serdes.String().getClass());

   properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG

  Serdes.String().getClass());

        properties.setProperty("group.id", "group.2");

        return properties;

    }

    private void shutdown(KafkaStreams streams, CountDownLatch latch) {

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {

            @Override

            public void run() {

                streams.close();

                latch.countDown();

            }

        });

    }

    class Message {

        String type;

        UpdateAddresses.Message content;

        public boolean isAddress() {

            return MsgType.ADDRESS.getValue().equals(type);

        }

        public UpdateAddresses.Message getContent() {

            return content;

        }

    }

    enum MsgType {

        ADDRESS("adr");

        String type;

        MsgType(String type) {

            this.type = type;

        }

        String getValue() {

            return type;

        }

    }

}

Kafka Tables: O Kafka tables faz parte do Streams Framework, assim, basta importar o mesmo framework, conforme foi feito anteriormente via Maven.

O exemplo a seguir é um stream que processa a mensagem sumarizando e separando os endereços que foram filtrados no stream acima, tornando eles uma Kafka Table:

import com.google.gson.Gson;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.KGroupedStream;

import org.apache.kafka.streams.kstream.KStream;

import org.apache.kafka.streams.kstream.KTable;

import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

import java.util.UUID;

import java.util.concurrent.CountDownLatch;

import static java.util.Objects.nonNull;

public class UpdateAddresses {

    public static void main(String[] args) throws Exception {

        UpdateAddresses updateAddresses = new UpdateAddresses();

        updateAddresses.run();

    }

    private void run() {

        Properties properties = getProperties();

        Topology topology = getTopology();

        KafkaStreams streams = new KafkaStreams(topology, properties);

        CountDownLatch latch = new CountDownLatch(1);

        shutdown(streams, latch);

        try {

            streams.start();

            latch.await();

        } catch (final Throwable e) {

            System.out.println(e.getMessage());

        }

    }

    private Topology getTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> stream = builder.stream("streams-only-addresses");

        KGroupedStream<String, String> result = stream

                .map((key, value) -> KeyValue.pair(key,parseAsMessage(value)))

                .filter((key, value) -> nonNull(value))

                .selectKey((keu, value) ->  value.getPerson())

                .map((key, value) -> KeyValue.pair(key,parseAsString(value)))

                .groupByKey();

        KTable<String, String> address = result.reduce((key, value) -> value);

        KTable<String, Long> changes = result.count();

]  address.toStream().to("streams-updated-addresses"

  Produced.with(Serdes.String(), Serdes.String()));

       changes.toStream().to("streams-count-updated-addresses"

  Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();

    }

    private Message parseAsMessage(String value) {

        try {

            return (new Gson()).fromJson(value, Message.class);

        } catch (Exception ex) {

            System.out.println("parseAsMessage: " + ex.getMessage());

            System.out.println(ex.getStackTrace());

            return null;

        }

    }

    private String parseAsString(Message value) {

        try {

            return (new Gson()).toJson(value);

        } catch (Exception ex) {

            System.out.println("parseAsString: " + ex.getMessage());

            System.out.println(ex.getStackTrace());

            return null;

        }

    }

    private Properties getProperties() {

        Properties properties = new Properties();

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount2");

        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

  properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG

  Serdes.String().getClass().getName());        

  properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG

  Serdes.String().getClass().getName());

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        properties.setProperty("group.id", "group.2");

        return properties;

    }

    private static void shutdown(KafkaStreams streams, CountDownLatch latch) {

        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {

            @Override

            public void run() {

                streams.close();

                latch.countDown();

            }

        });

    }

    class Message {

        String person;

        String street;

        Integer number;

        String neighborhood;

        String city;

        String state;

        String country;

        String zipcode;

        String getPerson() { return person; }

    }

}

Conclusões: O Apache Kafka é uma ferramenta incrível, e além de extremamente performática, é segura e resiliente. São várias as empresas que utilizam o Kafka, e no geral, são empresas que necessitam de extrema performance em suas aplicações, como Netflix, Spotify e no próprio AllowMe, aqui na Tempest.