preloader
  • Inicio
  • Jugando con Postgres y Kafka

blog-thumb

Apache Kafka y Postgres: Capacidades de transacción e informes.

Apache Kafka es una conocida plataforma de transmisión distribuida para el procesamiento de datos y mensajería consistente. Le permite centralizar constantemente los flujos de datos para varios propósitos al consumirlos y producirlos.

Uno de los ejemplos de una buena implementación es la implementación de la canalización de datos de Mozilla, particularmente porque muestra a Kafka como un punto de entrada del flujo de datos. Esto le permite conectar nuevos almacenes de datos debjao de su flujo, lo que facilita el uso de diferentes formatos de almacén de datos (como DRBMS o Documentos, etc.) para recuperar y escribir datos de manera eficiente.

El agua embotellada de Postgres es un enfoque diferente que merece una mención. En este caso, las intancias de Postgres son los productores, los corredores consumen las transmisiones y mantienen el almacén de mensajes disponibles para cualquier acción. La ventaka aquí son las conocidas capacidades ACID de Postgres, combinadas con funciones avanzadas de SQL, este proyecto es una extensión, lo que significa que es posible utilizar las nuevas funciones próximas de Postgres fácilmente portátiles.

También es posible consumir y producir datos para un intermediario mediante el uso de una nueva función que amplió la herramienta COPY para ejecutar comandos de shell para operaciones de entrada / salida. Un buen punto culminante de esta función se puede leer aquí.

Kafkacat y Librdkafka

Kafkacat es una herramienta basada en la biblioteca del mismo auto librdkafka que hace exactamente lo que dice su nombre: producir y consumir desde un cat comando tipo broker de Kafka.

Produciendo a Kakfa broker

Producir datos falsos para corredor de Kafka, compuesto por key y payload:

# Random text
readtext() {cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1}
while (true) ;
    do
        for i in $(seq 1 50)
            do echo "$(uuidgen);$(randtext)"
            done | kafkacat -P -b localhost:9092 -qe -K ';' -t PGSHARD
            sleep 10
        done

-K La opción define el delimitador entre la clave y la carga útil, -t define el tema para el que desea producir. Originalmente, este tema se ha creado con 3 particiones(o -2), lo que nos permitirá consumir datos en diferentes canales, abriendo la puerta a la paralelización

Las claves no son obligatorias cuando se envía a un corredor y, de hecho, para ciertas soluciones puede omitirlas.

Consumir y producir dentro de una instancia de Postgres

La sintaxis general será algo parecido a:

COPY main(group_id,payload)
    FROM PROGRAM
    'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 0 | awk ''{print "P0\t" $0}'' ';

La tubería de código a un awk no es estrictamente necesaria y es solo para mostrar la flexibilidad de la función. Al usar la opción -J, la salida se imprimirá em formato json. que contiene toda la información del mensaje, incluida la partición, la clave y el mensaje.

-c La opción limitará la cantidad de finlas en la operación. Como COPY es transaccional, tenga en cuenta que cuanto mayor sea la cantidad de filas, mayor será la transacción y los tiempos de COMMIT se verán afectados

Consumir temas de forma incremental

Consumir las particiones de temas de beginning y establecer un límite de 100 documentos es tan fácil como:

bin/psql -p7777 -Upostgres master <<EOF
COPY main(group_id,payload) FROM PROGRAM  'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 0 | awk ''{print "P0\t" $0}'' ';
COPY main(group_id,payload) FROM PROGRAM  'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 1 | awk ''{print "P1\t" $0}'' ';
COPY main(group_id,payload) FROM PROGRAM  'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o beginning -p 2 | awk ''{print "P2\t" $0}'' ';
EOF

Y luego usar stored, para consumir dede la ultima compensación consumida por el consumidor en el grupo:

bin/psql -p7777 -Upostgres master <<EOF
COPY main(group_id,payload) FROM PROGRAM  'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o stored -p 0 | awk ''{print "P0\t" $0}'' ';
COPY main(group_id,payload) FROM PROGRAM  'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o stored -p 1 | awk ''{print "P1\t" $0}'' ';
COPY main(group_id,payload) FROM PROGRAM  'kafkacat -C -b localhost:9092 -c100 -qeJ -t PGSHARD -X group.id=1 -o stored -p 2 | awk ''{print "P2\t" $0}'' ';
EOF

Cada línea COPY se puede ejecutar en paralelo en diferentes instancias de Postgres, lo que hace que este enfoque sea flexible y fácil de escalar en una placa de servidores.

Esto no es del todo coherente, ya que una vez que se consume la compensación, se marcará en el corredor, ya que si la transacción falla en el lado de Postgres puede potencialmente provocar pérdida de datos.

Producir mensajes desde la instancias de Postgres

De la misma manera es posible consumir cambios, es posible hacer lo mismo para producir datos al broker. Esto puede ser increíblemente útil para microagregaciones, realizadas sobre los datos brutos consumidos del corredor.

El siguiente ejemplo muestra como ejecutar una consulta simple con una agregración muy simplista y publicarla en formato JSON para el corredor:

master=# COPY (select row_to_json(row(row() ,group_id , count(*))) from main group by group_id)
        TO PROGRAM 'kafkacat -P -b localhost:9092 -qe  -t AGGREGATIONS';
COPY 3

Si tiene una granja de servidores y desea buscar el contenido del tema usando una clave, puede hacer el siguiente ajuste:

COPY (select inet_server_addr() || ';' , row_to_json(row(row() ,group_id , count(*))) from main group by group_id)    )
    TO PROGRAM 'kafkacat -P -K '';'' -b localhost:9092 -qe  -t AGGREGATIONS';

Así es como se ven las cargas útiles publicadas (sin llave):

-> PG10 Kafkacat -C -b localhost:9092 -qeJ -t AGGREGATIONS -X group.id=1 -o beginning
{"topic": "AGREGATIONS","partition":0,"offset":0,"Key":"","payload":"{\"f1\":\"2017-02-24T12:31:13.711732-03:00\",\"f2\":\"P0\",\"f3\":172}"}
{"topic": "AGREGATIONS","partition":0,"offset":1,"Key":"","payload":"{\"f1\":\"2017-02-24T12:31:13.711732-03:00\",\"f2\":\"P1\",\"f3\":140}"}
{"topic": "AGREGATIONS","partition":0,"offset":2,"Key":"","payload":"{\"f1\":\"2017-02-24T12:31:13.711732-03:00\",\"f2\":\"P2\",\"f3\":155}"}

.. y con llave:

{"topic": "AGREGATIONS","partition":0,"offset":3,"Key":"127.0.0.1/32","payload":"\t{\"f1\":\"2017-02-24T12:40:39.017644-03:00\",\"f2\":\"P0\",\"f3\":733}"}
{"topic": "AGREGATIONS","partition":0,"offset":4,"Key":"127.0.0.1/32","payload":"\t{\"f1\":\"2017-02-24T12:40:39.017644-03:00\",\"f2\":\"P1\",\"f3\":944}"}
{"topic": "AGREGATIONS","partition":0,"offset":5,"Key":"127.0.0.1/32","payload":"\t{\"f1\":\"2017-02-24T12:40:39.017644-03:00\",\"f2\":\"P2\",\"f3\":716}"}

Manipulación básica de temas

Si es nuevo en Kafka, le resultará útil contar con algunos ejemplos de comandos para jugar con su corredor local:

Empezando todo:

bin/zookeeper-server-start-sh config/zookeeper.properties 2> zookeeper.log &
bin/kafka-server-star-sh config/server.properties 2> kafka.log &

Creando temas y otros:

bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic PGSHARD
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic PGSHARD
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AGGREGATIONS
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic AGGREGATIONS

NOTA: Para eliminar temas, debe habilitar el delete.topic.enable=true archivo en server.properties

Espero que encuentres este artículo de utilidad!