Saltar al contenido

Mejorando el rendimiento en spark usando particiones – Hadoop en el mundo real

22 de julio de 2020

En esta entrada del blog vamos a mostrar cómo optimizar tu trabajo en spark partiendo los datos correctamente. Para demostrarlo vamos a utilizar el conjunto de datos públicos del College Score Card, que tiene varios puntos de datos clave de universidades de todo Estados Unidos. Vamos a calcular el promedio de las tasas de los estudiantes por estado con este conjunto de datos.

Partición y barajadura

La barajadura es una operación costosa, ya sea que la hagas con los viejos programas de MapReduce o con Spark. La barajadura es el proceso de llevar los pares de valores clave de diferentes mapeadores (o tareas en Spark) por medio de la llave a un solo reductor (tarea en Spark). Así que todos los pares de valores clave de la misma clave terminarán en una tarea (nodo). Así que podemos hacer un bucle a través de los pares de valores clave y hacer la agregación necesaria.

Dado que los trabajos de producción suelen implicar muchas tareas en spark, el movimiento de los pares de valores clave entre los nodos durante la barajadura (de una tarea a otra) causará un cuello de botella significativo. En algunos casos la barajadura no se puede evitar, pero en muchos casos se podría evitar la barajadura estructurando los datos de forma poco diferente. Evitar la barajadura tendrá un impacto positivo en el rendimiento.

Repartir el conjunto de datos en Parquet

Nuestro conjunto de datos está actualmente en formato de parquet. El conjunto de datos ya está dividido por estado (nombre de columna – STABBR). Hemos establecido el número de particiones en 10. Tenemos 50 estados en los EE.UU., así que al dividir el conjunto de datos por estado y establecer el número de particiones en 10, terminaremos con 10 archivos. La partición de los datos por estado nos ayuda a almacenar todos los registros de un estado dado en un archivo para que los datos de un estado no se dispersen en muchos archivos. Una partición podría tener registros para más de un estado.

Por ejemplo, en tiempo de ejecución si Spark decide que todos los registros del estado de NY deben ir a la partición # 2, entonces todos los registros de NY del conjunto de datos se escribirán en el archivo # 2.

Recomendado:  ¿Cómo funciona el Cartesian Product Join en Spark? - Hadoop en el mundo real

Veamos cómo podemos particionar los datos como se explica arriba en Spark. Inicialmente el conjunto de datos estaba en formato CSV. Vamos a convertir el formato de archivo a Parquet y junto con eso usaremos la función de repartición para dividir los datos en 10 particiones.

import org.apache.spark.sql.SaveMode
val colleges = spark
.read.format("csv")
.option("header", "true")
.load("/user/hirw/input/college")

val colleges_rp = colleges.repartition(partitionExprs = col("STABBR"), numPartitions = 10)

colleges_rp.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("/user/hirw/input/college_parquet")

El parquet es un formato mucho más eficiente en comparación con el CSV. Puedes comparar el tamaño del conjunto de datos CSV y el del Parquet para ver la eficiencia. El conjunto de datos CSV tiene un tamaño de 147 MB y el mismo conjunto de datos en el formato Parquet tiene un tamaño de 33 MB. Parquet no sólo ofrece eficiencia en el almacenamiento, sino también en la ejecución.

hirw@play2:~$ hadoop fs -du -s -h /user/hirw/input/college
147.3 M 441.8 M /user/hirw/input/college
hirw@play2:~$ hadoop fs -du -s -h /user/hirw/input/college_parquet
32.9 M 98.7 M /user/hirw/input/college_parquet

¿Cómo resulta la partición en la mejora del rendimiento?

Cuando los datos ya están particionados en una columna y cuando realizamos operaciones de agregación en la columna particionada, la tarea de spark puede simplemente leer el archivo (partición), hacer un bucle a través de todos los registros de la partición y realizar la agregación y no tiene que ejecutar una barajadura porque todos los registros necesarios para realizar la agregación están dentro de la única partición. Ninguna barajadura equivaldría a un mejor rendimiento.

Ahora que ya hemos dividido nuestro conjunto de datos en la columna de estados y nos gustaría calcular el promedio de las tasas estudiantiles por estado, estamos agregando los datos en la misma columna que se utiliza para la división. Así que esto no debería resultar en una barajadura. ¿Qué opina usted?

Vamos a intentarlo. Aquí está el código para calcular los honorarios promedio por estado.

import org.apache.spark.sql.functions._

val colleges_parquet = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("/user/hirw/input/college_parquet")

val avg_student_fees_by_state = colleges_parquet.groupBy($"STABBR").agg(avg($"TUITIONFEE_OUT")).alias("avg_out_student_fees")
avg_student_fees_by_state.show

~~ Output ~~

+——+——————-+
|…y la gente de la ciudad…
+——+——————-+
| AZ| 13763.746268656716|
| SC| 16507.64|
| LA 13080. 415384615384.
| MN| 15321.97247706422|
| NJ| 17568.434210526317|
| DC| 23772.0|
| O 18013. 036363636365.
| VA| 18098.120689655174|
| RI| 29762.5|
| KY| 16749.76388888889|
| 8425.555555555555|
| NH| 21777.76|
| MI| 15696.561403508771|
| NV| 14703.55|
| WI| 16624.0641025641|
| ID| 14148.157894736842|
| CA| 16642.457386363636|
| CT| 22527.555555555555|
| NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO.
| MT| 11756.625|
+——+——————-+
sólo mostrando las 20 primeras filas

Recomendado:  Construyendo una tubería de datos con Apache NiFi - Hadoop en el mundo real

Aquí está la visualización del trabajo que hizo la agregación y calculó el promedio y allí se puede ver, dos etapas estuvieron involucradas y la representación de abajo indica una barajadura.

Barajar en los datos divididos

¿Por qué Spark ejecutó una barajadura, a pesar de que los datos ya están divididos? La razón es simple, Spark no sabe que los datos fueron particionados. No había ningún metadato asociado a nuestro conjunto de datos que Spark pudiera leer para deducir que los datos están particionados.

Así que para evitar la barajadura tenemos que hacer saber a Spark que los datos están divididos.

Cubra los datos con la Colmena

Cuando Spark carga los datos que están detrás de una tabla de la Colmena, puede inferir cómo está estructurada la tabla mirando los metadatos de la tabla y al hacerlo entenderá cómo se almacenan los datos.

Ya que estamos tratando de agregar los datos por la columna de estado, podemos agrupar los datos por la columna de estado. Haciendo esto resultará en un resultado muy similar al de la operación de reparto que hicimos anteriormente.

Nota: Partición de soporte de colmena Por el funcionamiento también y la partición Por la columna de estado resultará en 50 archivos separados uno por estado.

En el siguiente código, organizamos nuestros datos en 10 cubos y los guardamos en una mesa de colmena.

import org.apache.spark.sql.SaveMode

val colleges = spark
.read.format("csv")
.option("header", "true")
.load("/user/hirw/input/college")

colleges.write
.bucketBy(10, "STABBR")
.saveAsTable("colleges_hive")

Cuando repartimos el conjunto de datos fijando el número de pariciones en 10, terminamos con 10 particiones o archivos. Ahora veamos cuántos archivos obtuvimos cuando repartimos los datos estableciendo el número de cubos a 10.

hirw@play2:~$ hadoop fs -ls /user/hive/warehouse/colleges_hive
Found 31 items

Vemos un total de 31 artículos. ¿Por qué? ¿No deberíamos ver sólo 10? La tarea que cargó la tabla de la Colmena resultó en 3 tareas y cada tarea creó 10 cubos resultando en 3 *10= 30 archivos. El archivo 31 es el archivo _SUCCESS que es un archivo vacío creado por Spark indicando una carga exitosa de datos.

Recomendado:  Construyendo Grandes Tuberías de Flujo de Datos - arquitectura, conceptos y elección de herramientas - Hadoop en el Mundo Real

Esto significa que los registros de un solo estado pueden repartirse en 3 archivos porque cada tarea que procese una porción del conjunto de datos podría haber procesado los registros de un estado determinado. ¿Qué sucedería si tratamos de calcular las tasas promedio por estado? ¿Será Spark capaz de aprovechar los metadatos de la tabla de la Colmena y entender que los datos están en un cubo y evitar una barajadura?

Vamos a intentarlo. Aquí está el código para calcular el promedio de las cuotas de los estudiantes por estado en la tabla de la colmena

val colleges = spark.sql("SELECT * FROM colleges_hive")
val avg_student_fees_by_state = colleges.groupBy($"STABBR").agg(avg($"TUITIONFEE_OUT")).alias("avg_out_student_fees")
avg_student_fees_by_state.show

~~ Output ~~

+——+——————-+
|…y la gente de la ciudad…
+——+——————-+
| AZ| 13763.746268656716|
| SC| 16507.64|
| LA 13080. 415384615384.
| MN| 15321.97247706422|
| NJ| 17568.434210526317|
| DC| 23772.0|
| O 18013. 036363636365.
| VA| 18098.120689655174|
| RI| 29762.5|
| KY| 16749.76388888889|
| 8425.555555555555|
| NH| 21777.76|
| MI| 15696.561403508771|
| NV| 14703.55|
| WI| 16624.0641025641|
| ID| 14148.157894736842|
| CA| 16642.457386363636|
| CT| 22527.555555555555|
| NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO, NO.
| MT| 11756.625|
+——+——————-+
sólo mostrando las 20 primeras filas

Tenemos la misma salida, lo cual es genial. Revisemos la ejecución y veamos si spark hizo una barajadura o no.

La tabla de cubos de la colmena indica que no se puede barajar

Arriba está la etapa correspondiente que calculó el promedio de las cuotas de los estudiantes por estado y podemos ver que Spark no ejecutó la barajadura. Así que esto significa que incluso cuando los datos de un estado dado se dividieron posiblemente entre los 3 archivos, Spark fue capaz de inferir que la tabla es una tabla con cubos mirando los metadatos de la tabla y pasará a través del conjunto de datos para tener una idea sobre el contenido dentro de los cubos y fue capaz de realizar la ejecución evitando una barajadura.