Data aggregation

Why use reduceByKey?

Using reduceByKey is more efficient than groupByKey followed by a map, because the aggregation starts on the individual nodes first and the data is only sent over the network (shuffled) after this initial reduce phase, which usually means much less data is transferred.

Image: shuffling with reduceByKey, from https://www.coursera.org/learn/scala-spark-big-data/lecture/bT1YR/shuffling-what-it-is-and-why-its-important
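A minimal sketch of the difference (assuming a SparkContext sc, e.g. in spark-shell; the data is made up):

// Made-up (word, count) pairs, just for illustration.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// groupByKey shuffles every single pair across the network, then we sum per key:
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums per key on each node first, so only partial sums get shuffled:
val viaReduce = pairs.reduceByKey(_ + _)

viaReduce.collect()   // e.g. Array((a,4), (b,6)) - order may vary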

Partitioning

Why use mapValues?

Using mapValues preserves the partitioner, unlike map, which drops it (because map can change the keys of the data!).
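A minimal sketch (assuming a SparkContext sc; the pair RDD is made up):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(4))

// mapValues cannot touch the keys, so the partitioner survives:
pairs.mapValues(_ * 10).partitioner   // Some(org.apache.spark.HashPartitioner@...)

// map may change the keys, so Spark drops the partitioner - even if we keep the keys:
pairs.map { case (k, v) => (k, v * 10) }.partitioner   // None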

Narrow vs. wide dependencies

Narrow dependencies are the ones where each partition of the parent RDD is used by at most one partition of the child RDD (no shuffle is needed, e.g. map, filter). Wide dependencies are the ones where a single partition of the child RDD may depend on multiple partitions of the parent RDD (a shuffle is required, e.g. groupByKey).

Debugging dependencies (see the sketch after this list):

  • use .dependencies on RDD
  • use .toDebugString on RDD
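
A minimal sketch of both (assuming a SparkContext sc; the data is made up):

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped  = pairs.mapValues(_ + 1)   // narrow dependency: no shuffle needed
val grouped = mapped.groupByKey()      // wide dependency: requires a shuffle

mapped.dependencies    // List(org.apache.spark.OneToOneDependency@...)
grouped.dependencies   // List(org.apache.spark.ShuffleDependency@...)

// toDebugString prints the whole lineage; the indentation marks shuffle boundaries:
println(grouped.toDebugString)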

DataFrames

Useful functions

  • .show() - shows the first 20 rows
  • .printSchema() - prints the schema as a tree
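
A minimal sketch (assuming a SparkSession spark; people.json is a made-up file name):

val df = spark.read.json("people.json")

df.show()         // prints the first 20 rows as a table (long values are truncated)
df.show(5)        // prints only the first 5 rows
df.printSchema()  // prints the column names, types and nullability as a tree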

Data cleansing

  • na.drop(...) - drops rows containing null / NaN values
  • na.fill(Map("col" -> value)) - fills "col" with value wherever it is null / NaN
  • na.replace(Array("col"), Map(1 -> 2, 3 -> 4)) - in "col", replaces each Map key with the corresponding value
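
A minimal sketch of all three (assuming a SparkSession spark; the DataFrame is made up):

import spark.implicits._

val df = Seq(
  (Some(1), Some("x")),
  (None,    Some("y")),
  (Some(3), None)
).toDF("col", "name")

df.na.drop().show()                               // keeps only rows without any null / NaN
df.na.fill(Map("col" -> 0)).show()                // nulls in "col" become 0
df.na.replace("col", Map(1 -> 2, 3 -> 4)).show()  // in "col": 1 becomes 2, 3 becomes 4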

Joins

// join_type can be: "inner" (default), "outer", "left_outer", "right_outer", "leftsemi"
df1.join(df2, df1("id") === df2("other_id"), join_type)

Spark DevOps

Caching Spark dependencies in Docker

Some libraries, like SynapseML, don't use spark.jars; instead they use spark.jars.packages:

import pyspark
spark = (pyspark.sql.SparkSession.builder.appName("MyApp")
        .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.11.2") # Please use 0.11.2 version for Spark3.2 and 0.11.2-spark3.3 version for Spark3.3
        .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
        .getOrCreate())
import synapse.ml

This is problematic when used in Docker (e.g. with Spark on Kubernetes), because the dependencies would then be downloaded at runtime. To cache them during docker build, use:

RUN touch /tmp/empty.py && \
    spark-submit --master local --conf spark.jars.packages="com.microsoft.azure:synapseml_2.12:0.11.2" --conf spark.jars.repositories="https://mmlspark.azureedge.net/maven" \
    --conf spark.jars.excludes="org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind" /tmp/empty.py \
    && mv ~/.ivy2/jars/* $SPARK_HOME/jars && rm -rf ~/.ivy2

Azure Databricks

Read from Azure Blob Storage / Data Lake Storage Gen2 with SAS tokens

Generate SAS token in the Azure UI, then set config:

import getpass
storage_account = "yolo"
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", getpass.getpass())

Reading binary files from Azure Blob Storage / Data Lake Storage Gen2

With SAS tokens, the catch is that the Azure Storage Account must have Hierarchical Namespace enabled in order to use the abfss:// protocol.

Moreover, there is something wrong with spark.sparkContext.binaryFiles - it does not work properly even though the credentials are set. You will get:

Failure to initialize configuration for storage account <storage account>.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key

which is a total pain. For binary files, use instead:

spark.read.format("binaryFile").load(f"abfss://datasets@{storage_account}.dfs.core.windows.net/data/*.zip")

Spark Configuration

Mounting volumes in Spark on Kubernetes

# Dynamically provision a PersistentVolumeClaim per executor (claimName: OnDemand) and mount it at /pd-volume
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.options.claimName: OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.options.storageClass: standard-rwo
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.options.sizeLimit: 64Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.mount.path: /pd-volume
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.mount.readOnly: false