1. Descripción

El siguiente documento se detalla la integración de las herramientas Attunity Replicate y Apache kafka utilizando Amazon Managed Streaming for Apache Kafka (Amazon MSK), Con el objetivo de realizar ingesta de datos en tiempo real y ser consumidos desde múltiples plataformas. Se presentará la instalación de las herramientas, configuración y posterior integración, mediante un caso de uso de ejemplo.


2. Conceptos

2.1. Apache Kafka

2.1.1. ¿Qué es Apache Kafka?

2.1.2. Terminología

Broker Servidor Kafka. Varios broker forman un cluster.
Cluster Grupo de dos o más brokers que brindan redundancia y escalabilidad
Producer Cliente Kafka. Es quien envía los mensajes al broker.
Consumer Cliente Kafka. Es quien recibe los mensajes desde el broker.
Topic Un topic es una categoría en la que se publican los registros. Cada Topic de Kafka puede tener desde cero a muchos consumidores que se suscriben a los datos escritos en él.
Partition Subconjunto de topics creado para permitir la redundancia y lectura / escritura paralela para un mayor rendimiento
Message Cualquier búfer (texto / binario). El formato debe coordinarse entre producers y consumers
Message Key Opcional. Utilizado para particionamiento y compactación.
Offset Posición de un mensaje en una partición. Utilizado por el consumidor.

2.2. Amazon MSK

2.2.1. ¿Qué es Amazon MSK?

2.3. Attunity Replicate

2.3.1. ¿Qué es Attunity Replicate?


3. Cluster Amazon MSK

Antes de crear un clúster de Amazon MSK se deben realizar configuraciones preliminares para desplegar su entorno.

3.1. Crear una VPC para el clúster de MSK

En el primer paso de la Introducción al uso de Amazon MSK, se utiliza la Consola de Amazon VPC para crear una Amazon Virtual Private Cloud (Amazon VPC). Creará un Clúster de MSK en esta VPC más adelante.

Para realizar esta acción debe seguir los siguientes pasos:

Paso 3: Seleccione Select (Seleccionar) para aceptar la configuración predeterminada de Amazon VPC, denominada VPC with a Single Public Subnet (VPC con una única subred pública).

3.2. Añadir subredes a la VPC

Para asegurar la alta disponibilidad y la tolerancia a errores Amazon MSK requiere que sea distribuido en más de una subred y en más de una zona de disponibilidad.

Amazon MSK requiere que se creen mínimo 2 y máximo 3 subredes cada una en una zona de disponibilidad distinta.

Para realizar esta acción debe seguir los siguientes pasos:

3.3. Creación clúster de MSK

Una vez configurada la VPC en la cual estará alojada el clúster de Amazon MSK, podemos proceder a la creación de este mismo.

Para realizar esta acción debe seguir los siguientes pasos:



4. Cliente Amazon MSK

4.1. Creación cliente Amazon MSK

Para poder acceder al cluster se creará una instancia EC2 como cliente de Amazon MSK. Este cliente se utilizará para crear los tópicos y almacenar los servicios que permitirán consumir los datos. Por motivos de simplicidad, pondremos este equipo cliente en la misma VPC que el clúster de Amazon MSK.

Para realizar esta acción se deben realizar los siguientes pasos:

4.2. Configuración de cliente Amazon MSK

Una vez creado el cliente para Amazon MSK se debe configurar las reglas del grupo de seguridad para permitir la conexión entre el clúster y el cliente que acabamos de crear.

Para realizar esta acción se deben realizar los siguientes pasos:



Paso 5:
En el apartado Security Groups de la consola Amazon VPC pegue la VPC ID copiada en el paso anterior y péguela en el buscador.Paso 6: Seleccionar el grupo de seguridad por defecto de la VPC

Paso 7: En la pestaña Inbound Rules (Reglas de entrada), elija Edit rules (Editar reglas).

Paso 8: Seleccione Add Rule (Añadir regla).

Paso 9: En la nueva regla, elija All traffic (Todo el tráfico) en la columna Type (Tipo). En el segundo campo de la columna Source (Origen) escriba el ID del grupo de seguridad del equipo cliente obtenida en el paso 2.

Paso 10: Seleccione Save rules (Guardar reglas).

Repita estos pasos para añadir una regla de entrada en el grupo de seguridad que se corresponda con su equipo cliente para permitir que reciba tráfico del grupo de seguridad del VPC. Ahora su equipo cliente puede comunicarse de manera bidireccional con el Clúster de MSK.

Una vez realizado esto se puede acceder al cliente recién creado y configurado vía ssh.

4.3. Creación de tópicos

Una vez creado el cliente que tiene acceso al clúster de Amazon MSK se procede a crear tópicos a los cuales enviar y consumir mensajes

4.3.1. Pre requisitos.

Posteriormente se procede a crear los tópicos siguiendo los siguientes pasos:

El valor ClusterArn se obtiene seleccionando el cluster creado anteriormente desde la consola de Amazon MSK. https://console.aws.amazon.com/msk/

Una vez ingresado el comando anterior obtenemos el siguiente resultado:
{
«ClusterInfo»: {
«EncryptionInfo»: {
«EncryptionInTransit»: {
«ClientBroker»: «TLS_PLAINTEXT»,
«InCluster»: true
},
«EncryptionAtRest»: {
«DataVolumeKMSKeyId»: «arn:aws:kms:us-east-1:581027662077:key/1db13367-6660-4bc8-a575-ffd215041ef1»
}
},
«BrokerNodeGroupInfo»: {
«BrokerAZDistribution»: «DEFAULT»,
«ClientSubnets»: [
«subnet-0538eb3e4ad6c12a5»,
«subnet-0fb732dbc7c9f752d»
],
«StorageInfo»: {
«EbsStorageInfo»: {
«VolumeSize»: 100
}
},
«SecurityGroups»: [
«sg-08b9d453e3f4d22eb»
],
«InstanceType»: «kafka.t3.small»
},
«ClusterName»: «Kafka-test»,
«CurrentBrokerSoftwareInfo»: {
«KafkaVersion»: «2.2.1»
},
«Tags»: {
«owner»: «cvalenzuela»,
«Project»: «poc-kafka»
},
«CreationTime»: «2020-05-15T15:42:33.683Z»,
«NumberOfBrokerNodes»: 2,

        «ZookeeperConnectString»: «z-3.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:2181,z-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:2181,z-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:2181»,

        «State»: «ACTIVE»,

        «CurrentVersion»: «K3AEGXETSR30VB»,

        «ClusterArn»: «arn:aws:kafka:us-east-1:581027662077:cluster/Kafka-test/6fed5882-f025-4369-a775-71be9b525521-5»,

        «EnhancedMonitoring»: «DEFAULT»,

        «OpenMonitoring»: {
«Prometheus»: {
«NodeExporter»: {
«EnabledInBroker»: false
},
«JmxExporter»: {
«EnabledInBroker»: false
}
}
}
}
}

bin/kafka-topics.sh –create –zookeeper –replication-factor 2 –partitions 1 –topic

Si el comando se ejecuta correctamente, verá el siguiente mensaje: Created topic .


4.4. Producir y consumir datos localmente

Para probar que la conexión hacia clúster y el tópico recién creado sea satisfactoria se realizará una prueba produciendo y consumiendo datos localmente desde el cliente de Amazon MSK.

Para hacer esto se deben seguir los siguientes pasos:

Debe configurar sus credenciales de AWS en el cliente utilizando el comando aws config

El valor ClusterArn se obtiene seleccionando el cluster creado anteriormente desde la consola de Amazon MSK. https://console.aws.amazon.com/msk/

{
«BootstrapBrokerStringTls»: «b-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9094,b-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9094»,

«BootstrapBrokerString»: «b-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092,b-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092»
}

Si todo se ha ejecutado de manera correcta podremos observar que al escribir un mensaje desde la consola del productor (izquierda) se replica hacia la consola del consumidor (derecha).


5. Productor de datos

Una vez creado el clúster y el tópico de Apache Kafka utilizando Amazon MSK, se procede a configurar el productor de datos que es el que se encargará de enviar los mensajes hacia el clúster. Para hacer esto se utilizara Attunity Replicate ya que nos permite la ingesta de datos desde múltiples fuentes hacia un tópico en específico de Apache Kafka.

5.1. Instalación Attunity Replicate

Para la instalación de esta herramienta se puede utilizar una imagen AMI con Attunity previamente instalado, por lo que sólo se debe crear la instancia EC2 correspondiente a esta herramienta. En este caso se llama Attunity Replicate Capacitacion y es una instancia windows por lo que una vez creada se puede acceder a esta mediante windows remote.

Se creará esta instancia dentro del VPC donde se encuentra el cluster, por lo que se debe configurar eso al momento de crear la imagen de la herramienta. Para hacer esto se deben realizar los siguientes pasos:


5.2. Configuración Attunity Replicate

Una vez creada la instancia que contiene Attunity Replicate se debe configurar las reglas del grupo de seguridad para permitir la conexión entre el clúster y Attunity Replicate.

Para realizar esta acción se deben realizar los siguientes pasos:

Repita estos pasos para añadir una regla de entrada en el grupo de seguridad que se corresponda con la instancia de Attunity Replicate para permitir que reciba tráfico del grupo de seguridad del VPC. Ahora su instancia de Attunity Replicate puede comunicarse de manera bidireccional con el Clúster de MSK.

Una vez realizado esto se puede acceder a la instancia mediante windows remote.

5.3. Acceso a Attunity Replicate

Una vez ingresado a la instancia EC2 de Attunity Replicate para acceder a la herramienta se debe buscar Attunity Replicate Console desde el buscador de aplicaciones de windows.

5.4. Configuración de conectores

Attunity Replicate permite acceder o enviar datos mediante conectores que pueden ser de tipo source o tipo target.

Para manejar estos conectores seleccionar la siguiente opción:

5.4.1. Configurar source connector

Este tipo de conectores permiten obtener la información desde diferentes fuentes. Los tipo soportados por los conectores source de Attunity son:

Para crear un conector de tipo source se deben seguir los siguientes pasos:

Una vez dentro de la pestaña de Manage Endpoint Connections que es de donde se manejan los conectores seleccionar New Endpoint Connection

Seleccionar tipo source en el checkbox de role y completar datos del conector según el tipo de este.

A modo de ejemplo se crea conector source de tipo Amazon RDS for MySQL

Los datos de origen corresponden a una base de datos que contiene información de ciudades del mundo, recopilados en el siguiente enlace https://dev.mysql.com/doc/index-other.html

Finalmente se prueba que la conexión al conector sea realizada con éxito y se guarda el conector presionando save

5.4.2. Configurar target connector

Este tipo de conectores permiten enviar la información de un source hacia otro tipo de aplicación. Los tipo soportados por los conectores target de Attunity son:

Para crear un conector de tipo target se deben seguir los siguientes pasos:

Una vez dentro de la pestaña de Manage Endpoint Connections que es de donde se manejan los conectores seleccionar New Endpoint Connection

Seleccionar tipo target en el checkbox de role y completar datos del conector según el tipo de este.

IMPORTANTE: recordar que el broker servers es donde está alojado el cluster, en este caso es el valor BootstrapBrokerString obtenido en el paso 1 del apartado “Producir y consumir datos localmente” de este mismo documento.

5.5. Tasks

Attunity Replicate permite replicar datos desde un punto de origen hacia un punto objetivo. Para realizar esto se utilizan las tareas o task. Cada tarea tiene un punto de origen (Conector Source) y punto objetivo (Conector target).

5.5.1.  Creation of Tasks

Para crear una nueva tarea se debe presionar la opción New Task desde la pantalla principal, como se muestra a continuación.

Posterior a esto se deben especificar el nombre y la descripción del task que debe considerar la acción que realizará ese caso de uso de replicate.

También se debe configurar el Replication Profile que describa a la tarea a realizar. hay 3 tipos de replication profile, los cuales son :

Finalmente se configuran las opciones del task donde se eligen al menos una de las opciones que se presentan, las cuales son:

En este caso en particular se selecciona el replication pr
ofile Unidirectional ya que solo se requiere enviar datos hacia un punto final y en task options se seleccionan Full Load para realizar una carga inicial de datos y Apply Changes para enviar datos a medida que los registros de la base de datos cambian, quedando de la siguiente manera.

5.5.2. Configuración de Tarea

Una vez creada la tarea se debe configurar para su funcionamiento.

Como primer paso se deben asignar los conectores que se utilizarán en la tarea, esto se hace arrastrándolos hacia su posición dentro de la tarea de la siguiente forma.

Posterior a esto se seleccionan las tablas a replicar hacia el destino seleccionado.

En este caso se utilizará la tabla city del esquema world.

Una vez creada la tarea que replicara los datos hacia el clúster de Kafka, sólo queda iniciar la tarea recién creada presionando Run en la parte superior de la interfaz.

5.6. Producir datos

Una vez presionado Run para iniciar la tarea comienza el Full Load o la carga inicial de datos para observar esto nos debemos dirigir al monitor de Attunity Replicate.

Luego de realizarse la carga inicial de datos la tarea no se detiene ya que al agregar la opción de Apply Changes queda a la espera de la realización de algún cambio en la base de dato para poder replicar un nuevo mensaje hacia el tópico suscrito.

Para visualizar esto se realiza un nuevo registro en la tabla city en la base de datos contemplando los siguientes datos:

«Name»: «Cibinong»,
«CountryCode»: «IDN»,
«District»: «West Java»,
«Population»: 101300

Una vez realizado esto se visualiza en el monitor de Attunity el cambio realizado en la base de datos

Para observar los mensajes replicados por Attunity Replicate se debe seguir el paso 6 del apartado “Producir y consumir datos localmente” de este mismo documento.

El formato del mensaje obtenido por Amazon MSK es el siguiente:

{
«magic»: «atMSG»,
«type»: «DT»,
«headers»: null,
«messageSchemaId»: null,
«messageSchema»: null,
«message»: {
«data»: {
«ID»: 4097,
«Name»: «Jombang»,
«CountryCode»: «IDN»,
«District»: «East Java»,
«Population»: 92600
},
«beforeData»: null,
«headers»: {
«operation»: «INSERT»,
«changeSequence»: «20200520193227000000000000000000005»,
«timestamp»: «2020-05-20T19:32:27.000»,
«streamPosition»: «mysql-bin-changelog.004646:350:0:416:19954418057435:mysql-bin-changelog.004646:292»,
«transactionId»: «000000000000000000001226000000DB»,
«changeMask»: «1F»,
«columnMask»: «1F»,
«transactionEventCounter»: 1,
«transactionLastEvent»: true
}
}
}


6. Consumidor de datos

Una vez publicados los mensajes en el tópico del clúster de Amazon MSK, se debe crear un consumidor de estos datos, para realizar esto configuraremos un Proxy-Rest en el cliente de Amazon MSK.

Se implementará el Proxy-Rest de Confluent el cual proporciona una interfaz RESTful a un clúster de Kafka, lo que facilita la producción y el consumo de mensajes, ver el estado del clúster y realizar acciones administrativas sin utilizar el protocolo o los clientes nativos de Kafka.

6.1. Configuración Proxy-Rest

Para realizar la configuración del Proxy-Rest se deben seguir los siguientes pasos:

Puede utilizar su editor de texto de su preferencia.

6.2. Api Proxy-Rest

El Proxy-Rest que provee confluent cuenta con diferentes endpoint a los cuales recurrir para administrar Apache Kafka mediante peticiones HTTP. Permite consumir y producir datos, crear y eliminar tópicos, obtener datos de los consumidores, etc.

La documentación de dicha API se encuentra disponible en: https://docs.confluent.io/current/kafka-rest/api.html#crest-api-v2

Se utilizó la versión 2 de esta API.

A continuación se presenta un archivo adjunto que contiene un conjunto de peticiones que permiten probar la API de antes referenciada.

Amazon MSK.postman_collection.json

Archivo compatible con Postman.

Cambiar
el host del clúster de Amazon MSK en cada petición.

6.3. Creación de consumidor

Para la creación del consumidor de los datos, se utilizó NodeJs y la dependencia Kafka-Rest (https://www.npmjs.com/package/kafka-rest).

El código que permite realizar la creación del consumidor es el siguiente:

‘use strict’
var KafkaRest = require(‘kafka-rest’);
var kafka = new KafkaRest({ ‘url’: ‘http://:8082’ });

//Permite crear el consumidor

//NOMBRE_CONSUMIDOR = nombre del consumidor

kafka.consumer(«NOMBRE_CONSUMIDOR»).join({

    «format»: «binary»,

    «auto.offset.reset»: «smallest»,

    «auto.commit.enable»: «false»

}, function (err, consumer_instance) {

//El crear el consumidor retorna una instancia que es la que permite suscribirse al tópico
    //TOPICO = nombre del tópico    var stream = consumer_instance.subscribe(‘TOPICO’)
//Establece conexión con el tópico y espera la llegada de nuevos mensajes

    stream.on(‘data’, function (msgs) {
for (var i = 0; i < msgs.length; i++) {
//Json que contiene metadata del tópico
            var metadata = {
«topic»: stream.topic,
«key»: msgs[i].key,
«partition»: msgs[i].partition,
«offset»: msgs[i].offset
}
//Json que contiene el mensaje
            //El mensaje viene cifrado en base64
            var data = JSON.parse(msgs[i].value.toString(‘utf8’))
console.log({ metadata, data });
}
});
//Actúa cuando existe una interrupción en la conexión con el clúster
    stream.on(‘error’, function (err) {
console.log(«Something broke: » + err);
});
});

6.4. Consumo de datos

El código anteriormente expuesto imprime en la consola los mensajes del tópico una vez que son producidos por Attunity Replicate.

El formato del mensaje obtenido es el siguiente:

{
metadata: {
topic: ‘test’,
key: null,
partition: 0,
offset: 16
},
data: {
magic: ‘atMSG’,
type: ‘DT’,
headers: null,
messageSchemaId: null,
messageSchema: null,
message: {
data: {
ID: 1017,
Name: ‘Jombang’,
CountryCode: ‘IDN’,
District: ‘East Java’,
Population: 92600
},
beforeData: null,
headers: {
operation: ‘DELETE’,
changeSequence: ‘20200520194057000000000000000000029’,
timestamp: ‘2020-05-20T19: 40: 57.000′,
streamPosition:’mysql-bin-changelog.004648: 729: 0: 795: 19963007992027:mysql-bin-changelog.004648: 671’,
transactionId: ‘000000000000000000001228000000DB’,
changeMask: ’01’,
columnMask: ‘1F’,
transactionEventCounter: 4,
transactionLastEvent: true

            }
}
}
}


7. Conectores

Apache Kafka no sólo permite producir o consumir datos, sino que también permite añadirle conectores que ayudan a sincronizar los mensajes que llegan a un tópico y enviarlos a sistemas externos como bases de datos, almacenes de valores clave, índices de búsqueda y sistemas de archivos. Para realizar esta tarea se utilizará el servicio kafka-connect provisto por Confluent en el cliente de Amazon MSK

Para efectos prácticos se creará un conector que envié los mensajes en formato JSON a un bucket de s3.

7.1. Descargar Plugins

Para realizar esto se deben seguir los siguientes pasos:

Si ya realizó los pasos para configurar el proxy-rest saltarse el paso 2

unzip archive

7.2. Configuración Kafka-Connect

Para realizar esto se deben seguir los siguientes pasos:

7.3. Configuración Conector

Para realizar esto se deben seguir los siguientes pasos:

Dentro del archivo realizar la siguiente configuración:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=
s3.region=
s3.bucket.name=
s3.part.size=5242880
flush.size=100000

key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.conver
ter.schemas.enable: false

storage.class=io.confluent.connect.s3.storage.S3Storage
#format.class=io.confluent.connect.s3.format.avro.AvroFormat
format.class=io.confluent.connect.s3.format.json.JsonFormat
#partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner

schema.compatibility=NONE

locale: es_CL
timezone: UTC
path.format: YYYY-MM-dd/HH
partition.duration.ms: 3600000
rotate.interval.ms: 60000
timestamp.extractor: Record
Reemplazar los datos que están entre <>

7.4. Despliegue conexión

Una vez descargado el plugin y configurados los archivos tanto de kafka-connect como del propio conector se procede a desplegar la conexión y ver cómo se sincronizan los mensajes contenidos en un tópico dentro del bucket de S3.

Para desplegar la conexión utilizar el siguiente comando dentro de la carpeta de Confluent:

bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-s3/quickstart-s3.properties

si no hay problemas se replican los datos del tópico dentro del bucket:


8. Fail Recovery

Si bien se espera que el sistema no presente fallas al momento de su despliegue hay que tener en consideración como mitigar o solucionar en caso de que ocurran es por esto que se detallarán como se manejan los errores en cada una de las partes.

8.1. Productor

Como se utilizó como productor de datos la herramienta Attunity Replicate esta permite en casos de errores en la conexión de alguno de los conectores de la tarea obtener un log detallado de todos los errores presentes en la operación.

8.2. Clúster Amazon MSK

Como principal barrera que tiene Amazon MSK para hacer frente a una posible caída de los servicios es que permite crear y replicar los brokers en distintas zonas de disponibilidad lo que no solo permite obtener una alta disponibilidad de los servicios sino que ayuda a que si ocurre un fallo de uno de los brokers existe otro en una zona de disponibilidad distinta que puede tomar su lugar.

Sin embargo cuando ocurre una falla masiva la única medida que existe en caso de que falle el clúster de Amazon MSK es creando uno nuevo. Por lo que se deben tomar medidas para mitigar o disminuir las consecuencias en caso de requerir realizar esta acción.

Antes de la catástrofe se recomienda utilizar kafka-connect para hacer un backup del stream de mensajes  enviándolos a un bucket de s3.

Revisar el apartado Conectores de este mismo documento.

Con el objetivo de que cuando suceda la catástrofe y se requiera crear un clúster nuevo se utilice ese backup de datos.

8.2.1. Procedimiento de recuperación

8.2.2. Métricas

Amazon MSK tiene métricas para medir ciertos aspectos del clúster. Estas metricas estan disponibles en https://docs.aws.amazon.com/es_es/msk/latest/developerguide/monitoring.html.

la única métrica que permite medir el estado de uno de sus elementos es:

ZooKeeperSessionState.

La cual se mide en los siguientes valores.

NOT_CONNECTED ‘0.0’
ASSOCIATING ‘0.1’
CONNECTING ‘0.5’
CONNECTEDREADONLY ‘0.8’
CONNECTED ‘1.0’
CLOSED ‘5.0’
AUTH_FAILED ‘10.0’

Esta métrica mide el estado de la sesión de ZooKeeper del cluster.

El Zookeeper se utiliza para almacenar el estado de los brokers.

8.3. Consumidor

En caso de que el consumidor presente algún tipo de fallas debe ser capaz de retomar desde el último mensaje consumido.

Para lograr esto se debe configurar el consumidor utilizando las siguientes opciones:

«auto.offset.reset»: «smallest»,
«auto.commit.enable»: «false»

Por defecto en la instancia del consumidor se guarda el offset del último mensaje consumido, por lo que este sabrá dónde retomar la tarea una vez de que ocurra algún tipo de falla en el consumidor.

También se debe configurar cuál será el offset inicial del consumidor. Use auto.offset.reset para definir el comportamiento del consumidor cuando no hay una posición comprometida (que sería el caso cuando el grupo se inicializa por primera vez) o cuando un offset está fuera de rango. Puede elegir restablecer la posición al offset más antiguo smallest  o al offset más reciente largest (el valor predeterminado).

Al inicializar auto.commit.enable en false nos aseguramos que solo se realice la transición al siguiente offset cuando el mensaje anterior es recibido de manera satisfactoria, si se inicializa en true realizará la transición o desplazamiento después de un tiempo determinado.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *