Data aggregation
Why use reduceByKey?
Using reduceByKey is more efficient than groupByKey followed by a map, because the aggregation starts on the individual nodes first and the data is sent over the network (shuffled) only after this initial reducing phase, which usually results in less data being transferred.
Image from: https://www.coursera.org/learn/scala-spark-big-data/lecture/bT1YR/shuffling-what-it-is-and-why-its-important
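A minimal sketch (Scala, made-up data) contrasting the two approaches; it assumes a local SparkSession created just for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("agg-example").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical (word, count) pairs.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// Preferred: partial sums are computed inside each partition before the shuffle.
val summedFast = pairs.reduceByKey(_ + _)

// Same result, but every value is shuffled first and only then summed.
val summedSlow = pairs.groupByKey().mapValues(_.sum)

summedFast.collect()  // (a,4), (b,6) in some order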
Partitioning
Why use mapValues?
Using mapValues preserves the partitioning, unlike map, which does not (because it can change the key of the data!).
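A small sketch (made-up data; sc as in the snippet above, or from a spark-shell) showing that the partitioner survives mapValues but is dropped by map:

import org.apache.spark.HashPartitioner

// Pair RDD explicitly partitioned by key.
val byKey = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).partitionBy(new HashPartitioner(4))

byKey.mapValues(_ * 10).partitioner                    // Some(HashPartitioner) - partitioning preserved
byKey.map { case (k, v) => (k, v * 10) }.partitioner   // None - map could have changed the keys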
Narrow vs. wide dependencies
Narrow dependencies are those where each partition of the parent RDD is used by at most one partition of the child RDD (e.g. map, filter), so no shuffle is needed. Wide dependencies are those where a parent partition may be used by multiple child partitions (e.g. groupByKey, or join on RDDs that are not co-partitioned), which requires a shuffle.
Debugging dependencies:
- use .dependencies on the RDD
- use .toDebugString on the RDD
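For example (spark-shell style, made-up RDDs):

val words = sc.parallelize(Seq("a", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

words.map(w => (w, 1)).dependencies  // OneToOneDependency - narrow
counts.dependencies                  // ShuffleDependency - wide
println(counts.toDebugString)        // prints the full lineage, with shuffle stages indented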
DataFrames
Useful functions
- .show() - shows the first 20 rows
- .printSchema() - prints the schema
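For example (hypothetical DataFrame):

import spark.implicits._

val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
df.show()         // prints the first 20 rows as a table (here: both of them)
df.printSchema()  // root
                  //  |-- id: integer (nullable = false)
                  //  |-- name: string (nullable = true)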
Data cleansing
These live on df.na (DataFrameNaFunctions):
- na.drop(params) - drops rows with NaN / null values
- na.fill(Map("col" -> value)) - fills column "col" with value where "col" is null / NaN
- na.replace(Array("col"), Map(1 -> 2, 3 -> 4)) - in column "col", replaces the Map keys with the corresponding Map values
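A sketch of the above on a made-up DataFrame:

import spark.implicits._

val raw = Seq((Some(1), Some("x")), (None, Some("y")), (Some(3), None)).toDF("col", "label")

raw.na.drop()                              // drops rows containing any null / NaN
raw.na.drop(Seq("col"))                    // drops rows where "col" is null
raw.na.fill(Map("col" -> 0))               // replaces nulls in "col" with 0
raw.na.replace("col", Map(1 -> 2, 3 -> 4)) // rewrites 1 -> 2 and 3 -> 4 in "col"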
Joins
// join_type can be: inner, outer, left_outer, right_outer, leftsemi; default is "inner"
df1.join(df2, df1("id") === df2("other_id"), join_type)
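For completeness, a runnable sketch (made-up DataFrames, spark.implicits._ in scope) with the default inner join and an explicit left_outer:

import spark.implicits._

val df1 = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val df2 = Seq((1, 100), (3, 300)).toDF("other_id", "score")

df1.join(df2, df1("id") === df2("other_id")).show()                // inner (default): only id 1 survives
df1.join(df2, df1("id") === df2("other_id"), "left_outer").show()  // keeps "bob" with nulls in df2's columns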
Spark DevOps
Caching Spark dependencies in Docker
Some libraries, like SynapseML, don't use spark.jars; instead they use spark.jars.packages:
import pyspark
spark = (pyspark.sql.SparkSession.builder.appName("MyApp")
.config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.11.2") # Please use 0.11.2 version for Spark3.2 and 0.11.2-spark3.3 version for Spark3.3
.config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
.getOrCreate())
import synapse.ml
This is problematic when used in Docker (e.g. with Spark on Kubernetes), because the dependencies will be downloaded at runtime. To cache them during docker build, use:
RUN touch /tmp/empty.py && \
spark-submit --master local --conf spark.jars.packages="com.microsoft.azure:synapseml_2.12:0.11.2" --conf spark.jars.repositories="https://mmlspark.azureedge.net/maven" \
--conf spark.jars.excludes="org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind" /tmp/empty.py \
&& mv ~/.ivy2/jars/* $SPARK_HOME/jars && rm -rf ~/.ivy2
Azure Databricks
Read from Azure Blob Storage / Data Lake Storage Gen2 with SAS tokens
Generate a SAS token in the Azure UI, then set the config:
import getpass
storage_account = "yolo"
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", getpass.getpass())
Reading binary files from Azure Blob Storage / Data Lake Storage Gen2
With SAS tokens, the problem is that the Azure Storage Account must have Hierarchical Namespace enabled in order to use the abfss:// protocol.
Moreover, there is something wrong with spark.sparkContext.binaryFiles - it does not work properly even though the credentials are set. You will get:
Failure to initialize configuration for storage account <storage account>.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.key
which is a total pain. For binary files use:
spark.read.format("binaryFile").load(f"abfss://datasets@{storage_account}.dfs.core.windows.net/data/*.zip")
Spark Configuration
Mounting volumes in Spark on Kubernetes
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.options.claimName: OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.options.storageClass: standard-rwo
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.options.sizeLimit: 64Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.mount.path: /pd-volume
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-kedro.mount.readOnly: false