preloader
  • Inicio
  • Importe datos desde Redshift a Clickhouse en un solo comando.

blog-thumb

Scope of this post

Si está leyendo el presente artículo, es porque está interesado en probar la funcionalidad de su set de datos residente en Redshift en un Clickhouse. En el presente artículo mostraremos algunos trucos para que acelere este proceso.

Actualización (4 de Julio): Aquí hay una serie de posteos sobre comparaciones entre ClickHouse vs Redshift, el primer post lo puedes encontrar aquí.

El modo estándar de mover los datos fuera de Redshift es utilizando UNLOAD, el cual guarda la salida en archivos ubicados en AWS S3. No es sorpresa que Redshift no soporta la utilidad de nativa COPY (<query>) TO STDOUT de Postgres (considerando que está basado en 8.0, una versión EOL), lo cual hubiese hecho más fácil todo el proceso. Más información al respecto, puede ser encontrada en la documentación de COPY para RS.

Clickhouse soporta varios motores de tablas, pero para empezar es recomendable hacerlo por el MergeTree. Es más limitado en cuanto a tipos de datos, pero son suficientes para analíticas decentes. Es recomendable agreagar soporte de “sampling” al momento de crear la tabla, esto se hace desde los parámetros del motor utilizando - en este caso - cityHash64 ya que no es de cifrado, además de tener un buen rendimiento y precisión.

Tabla en ClickHouse:

CREATE TABLE thenewdb.thetable (
normdate Date,
id String,
datefield DateTime,
(... many others ...)
data String
)
ENGINE = MergeTree(normdate,cityHash64(id), (datefield, id,cityHash64(id)),8192);

NOTE: Los parámetros del motor son: columna de fecha, la expresión para el “sampleo” (cityHash64) , la llave primaria (datefield,id) y la granularidad del índice.

Tabla en Redshift:

     Column     |            Type             | Modifiers
----------------+-----------------------------+-----------
 id             | character varying(32)       | not null
 datefield      | timestamp without time zone | not null
 (... other columns...)
  data           | character varying(8192)     |
Indexes:
    "thetable_pkey1" PRIMARY KEY, btree (id)

ClickHouse require una columna de fecha, lo que hay que considerar en cuanto a el tamaño de los datos. Más información de este requerimiento, puede ser encontrado en la documentación de MergeTree.

La magia

  • Es recomendable abrir una sesión en screen o tmux.

  • El siguiente comando ejecutará una consulta sobre la tabla de RS y a través de un pipe, insertará en la tabla destino.

time psql -h rs1.garbagestring.redshift.amazonaws.com \
          -p 5439 -U redshift thedatabase \
          -Antq --variable="FETCH_COUNT=1000000" -F $'\t' <<EOF | \
          clickhouse-client --database thenewdb --query="INSERT INTO thenewdb.thetable FORMAT TabSeparated"
select trunc(datefield),
  id,
  datefield::timestamp(0) ,
  store_id ,
(... many columns more ... )
  regexp_replace(data,'\\t|\\n','') 
from theoriginaltable
EOF

Calculando la RAM

El motor MergeTree es una implementación realmente interesante. No es un LSM ya que no procesa en memtables ni en log. Procesa los datos en lotes y escribe directamente en el sistema de archivos, consumiendo una cantidad significativa de RAM a costa de ahorrar operaciones de disco (y ocasionalmente ciclos de CPU) por parte de los trabajadores en segundo plano que realizan las funciones.

UN error común cuando te quedas sin memoria durante este proceso de merge que consume RAM es el siguiente:

Code: 240. DB::Exception: Allocator: 
Cannot mremap., errno: 12, strerror: Cannot allocate memory

La razón de porque sucede esto es debido a que los procesos en segundo plano consumieron una cantidad considerable de memoria RAM. Tenemos 5 elementos, en orden, para tener en mente cuando necesitamos calcular la cantidad de memoria que necesitamos:

  • background_pool_size es 6, determinando el máximo numero de merges en segundo plano.
  • Número máximo de piezas de merge durante el proceso de fusionado (por defecto 100)
  • Tamaño del bloque para el fusionado (merge) (8192 filas)
  • Tamaño medio de la fila sin comprimir
  • Asignación de memoria de sobrecarga máxima para los búferes (2)

Podemos asumir una fila de 1024 bytes y multiplicar todo lo anterior juntos. i.e. SELECT formatReadableSize( 2* 6 * 100 * 8192 * 1024);

El problema actual es que el algoritmo de fusión (merge) procesa por fila en lugar de cada columna por separado, y se espera que tenga una ganancia de rendimiento . Podemos probar la función vertical algorithm mediante el siguiente comando enable_vertical_merge_algorithm en el archivo de configuración.

Entonces, suponiendo que se obtiene un tamaño de fila de: 13557 bytes (14k) medido con la query 1), puedes obetener una aproximación de la RAM necesaria para el bloque de operaciones 2).

time psql -h rs-clusterandhash.us-east-1.redshift.amazonaws.com\
 -p 5439 -U redshift reportdb  -Antq --variable="FETCH_COUNT=1000000" -F $'\t' <<EOF | wc -c
select
  *
from big_table
LIMIT 1
EOF
13835
SELECT formatReadableSize((((2 * 6) * 100) * 8192) * 13557)
┌─formatReadableSize(multiply(multiply(multiply(multiply(2, 6), 100), 8192), 13557))─┐
│ 124.12 GiB                                                                         │
└────────────────────────────────────────────────────────────────────────────────────┘

Más información aquí: google groups thread.

Desafortunadamente, el cleinte no puede manejar esto apropiadamente, todavía. Incluso limitando el uso de memoria con --max_memory_usage 5GB (i.e), obtendremos un error diferente como el siguiente:

Code: 241. DB::Exception: 
Received from localhost:9000, 127.0.0.1. 
DB::Exception: Memory limit (for query) exceeded: 
would use 1.00 MiB (attempt to allocate chunk of 1048576 bytes), maximum: 5.00 B.

Si la memoria RAM necesaria está muy cerca de tu actual recurso, una posible solución podría ser usar el motor ReplacingMergeTree, pero la deduplicación no está garantizada y, efectivamente, vas a “jugar” en un limite reducido (deberías estar muy cerca de los calculos de arriba). También hay varios ajustes a nivel de motor para afinar dicho motor mergetree a través de la configuración en MergeTreeSettings.h

i.e., el siguiente reducirá el consumo de RAM significativamente, a costa de reducir también la durabilidad y cambiar el algoritmo de fusión.

    <merge_tree>
        <max_suspicious_broken_parts>20</max_suspicious_broken_parts>
        <enable_vertical_merge_algorithm>1</enable_vertical_merge_algorithm>
        <max_delay_to_insert>5</max_delay_to_insert>
        <parts_to_delay_insert>100</parts_to_delay_insert>
    </merge_tree>

La explicación

  • Porqué TabSeparated?

Clickhouse ofrece severos formatos input/output, muchos. Incluso, el tabulador en este caso parecía suficiente para importar textos planos (hasta que un JSON mágico con tabulaciones y nuevas líneas rompió la importación).

  • Porqué hacer un casting sin microsegundos ::timestamp(0)?

CH no soporta microsegundos.

  • Porqué hacer el reemplazo regexp_replace(data,'\\t|\\n','')?

Estamos importando utilizando TSV, que por norma no soporta nuevas líneas y, obviamente, tabulaciones. Lamentablemente, no es posible por el momento utilizar la codificación/decodificación usando base64 para insertar sin reemplazar (por streaming en los datos codificados y decodificados sobre la marcha por clickhouse)

  • P --variable="FETCH_COUNT=1000000"?

Esto es salsa!. psql intentará colocar todo el conjunto de resultados en la memoria, haciendo que la caja explote a los pocos minutos después de empezar a correr. Dentro de esto, se crea un cursor del lado del servidor, lo que nos permite importar el conjunto de resultados más grande que el cliente de la máquina

  • Porqué -F $'\t'?

Dependiendo de su shell, puede considerar esto. Necesita usar un tabulador literal, lo que significa que tiene que ser el propio carácter. En UNIX Ctrl-V tab Debe hacer la cosa.

Puedes hacer una pequeña prueba sobre esto con echo. La opción -e permite la interpretación de los ecapes de barra invertida. También printf es una opción limpia para imprimir caracteres especiales.

ubuntu@host:~$ echo $'\n'

ubuntu@host:~$ echo '\n'
\n
ubuntu@host:~$ echo -e '\n'

Número de la vida cotidiana

El proceso en sí es considerablemente rápido: movió una tabla de 15GB a un MergeTree de Clickhouse de unos 11GB en 20 minutos. Detalles de la instancia RS: dc1.large 15GB RAM, vCPU 2, 2 nodos + 1 coordinador. Instancia CH: EC2 r4.2xlarge único, volumen 3000 iops EBS ¡Espero que este consejo les haya resultado útil!