Parallelism in Spark Notebook Execution in Microsoft Fabric
This blog post discusses the concept of parallelism in Spark notebook execution within the Microsoft Fabric Data Engineering Experience. It explores how parallelism can improve the performance and scalability of data processing tasks in a distributed computing environment.
The following two utilities are commonly used to initiate parallel notebook executions:
mssparkutils.notebook.run()
mssparkutils.notebook.runMultiple()
This blog post is source controlled here.
Introduction
Parallelism is a fundamental concept in distributed computing that allows multiple tasks to be executed simultaneously, thereby improving the efficiency and performance of data processing workflows. In the context of Spark notebooks running on the Microsoft Fabric Spark engine, parallelism plays a crucial role in optimizing the execution of Spark jobs and enhancing the overall data processing capabilities.
Use case
We have TPCH data in Azure Data Lake Storage Gen2, with each year comprising 12 files. To expedite the data loading process, we aim to process multiple years of data concurrently without processing all files together, which would consume extensive resources given the dataframe’s size of several hundred GBs. Our goal is to execute this operation in batches.
Solution: Parallelism in Fabric Spark Notebooks
Within the Microsoft Fabric Data Engineering Experience, Spark notebooks offer a robust platform for creating and running data processing workflows. These notebooks utilize the Spark engine’s capabilities to distribute and parallelize tasks across several nodes in a cluster.
For this use case, a master notebook lists the input files and organizes them into subsets based on the number of jobs we want to trigger; each subset is then processed by a child notebook. Depending on the desired number of jobs or parallel executions, an equivalent number of child notebook instances run concurrently.
Compute configurations
To efficiently process multiple jobs within a Spark session, it is essential to optimize the compute configuration for parallel execution. This involves configuring the number of executors, the memory allocated to each executor, and the number of cores per executor. By fine-tuning these parameters, we can enhance the parallelism of Spark jobs and boost the overall performance of data processing tasks.
Here are a few considerations for compute configuration:
- We are going to use an F64 capacity in Fabric. 1 Capacity Unit (CU) = 2 Spark vCores, so an F64 gives the job a maximum of 128 Spark vCores.
- Fabric capacities are enabled with bursting, which allows you to consume extra compute cores beyond what has been purchased to speed up the execution of a workload. For Spark workloads, bursting allows users to submit jobs with a total of 3X the Spark vCores purchased. Here is more detail about bursting.
- Example calculation: the F64 SKU offers 128 Spark vCores. The burst factor applied for an F64 SKU is 3, which gives a total of 384 Spark vCores. The burst factor only helps with concurrency and does not increase the max cores available for a single Spark job. That means a single notebook, Spark job definition, or lakehouse job can use a pool configuration of at most 128 vCores, and 3 jobs with the same configuration can run concurrently. If notebooks use a smaller compute configuration, they can run concurrently until total utilization reaches the 384 Spark vCore limit.
- We can set the number of executors, the amount of memory allocated to each executor, and the number of cores per executor in the Spark configuration. We can do this by creating an environment in Fabric and attaching it to the master notebook, so that the master notebook uses those Spark session configurations and runs the parallel notebook executions with the maximum available resources (see the sketch after this list).
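As an alternative to defining these settings in a Fabric environment, the Spark session can also be sized at the top of the master notebook with the %%configure magic. The values below are purely illustrative assumptions, not recommendations; tune them to your capacity SKU and data volume.
%%configure
{
    "driverMemory": "28g",
    "driverCores": 4,
    "executorMemory": "28g",
    "executorCores": 4,
    "numExecutors": 8
}
Run this before the Spark session starts (or restart the session) so the settings take effect.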
Method 1 — Using the mssparkutils.notebook.run() utility
Master notebook
We create a master notebook to orchestrate the parallel execution of multiple notebooks. This master notebook triggers individual notebooks to run in parallel, monitors their progress, and aggregates the results. By utilizing the parallelism capabilities of Spark notebooks, we can process large volumes of data more efficiently and rapidly. We have set the number of parallel jobs to 6; in practice, this number should be tuned to find the maximum number of parallel jobs that can run before they start queuing.
The master notebook performs the following steps:
1. List the files in the input directory.
2. Filter the files that need to be processed.
# declaring the variables
input_file_path = f"Files/nycyellotaxi-backup"
output_path = "Files/parquet-to-delta-table-fabric"
keywords_to_be_considered =['2022','2021','2020','2019']
no_of_parallel_jobs = 6
# Listing the files in the shortcut path
from notebookutils import mssparkutils
_input_files_path = mssparkutils.fs.ls(f"{input_file_path}")
input_files_path = []
for fileinfo in _input_files_path:
    input_files_path.append(fileinfo.path)
# Filtering the files based on the keywords.
files_path = []
filtered_list_of_path = []
for keyword in keywords_to_be_considered:
    filtered_list_of_path = [i for i in input_files_path if keyword in i]
    for f in filtered_list_of_path:
        files_path.append(f)
3. Group the files based on the number of jobs that can be run in parallel.
# Group the files based on the number of jobs that can be run in parallel.
def chunkIt(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0
    while last < len(seq):
        out.append(seq[int(last):int(last + avg)])
        last += avg
    return out
files_list_part = chunkIt(files_path, no_of_parallel_jobs)
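As a quick illustration of what chunkIt produces (with made-up file names, not part of the actual dataset), 8 paths split across 3 jobs come out as near-equal batches:
# illustrative only: 8 made-up paths split into 3 near-equal batches
sample = [f"file_{n}.parquet" for n in range(1, 9)]
print(chunkIt(sample, 3))
# [['file_1.parquet', 'file_2.parquet'],
#  ['file_3.parquet', 'file_4.parquet', 'file_5.parquet'],
#  ['file_6.parquet', 'file_7.parquet', 'file_8.parquet']]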
4. Call the child notebooks to process the data in parallel. We use ThreadPoolExecutor to run the child notebooks concurrently.
# creating the list of notebooks to be executed in parallel with the parameters that are required for the child notebook.
notebooks = []
for i in range(0, no_of_parallel_jobs):
    notebook = {"path": "/child_notebook_parallelism", "params": {"files_list_part": f"{files_list_part[i]}", "output_path": f"{output_path}/temp/batch{i}"}}
    notebooks.append(notebook)
# execute the child notebooks in parallel
from concurrent.futures import ThreadPoolExecutor
timeout = 1800 # 1800 seconds = 30 minutes per child notebook run
# notebooks = [
# {"path": "/childnotebook", "params": {"files_list_part": f"{files_list_part[0]}", "output_path" : f"{output_path}/temp/batch0"}},
# {"path": "/childnotebook", "params": {"files_list_part": f"{files_list_part[1]}", "output_path" : f"{output_path}/temp/batch1"}},
# {"path": "/childnotebook", "params": {"files_list_part": f"{files_list_part[2]}", "output_path" : f"{output_path}/temp/batch2"}},
# ]
with ThreadPoolExecutor() as ec:
    for notebook in notebooks:
        f = ec.submit(mssparkutils.notebook.run, notebook["path"], timeout, notebook["params"])
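A note on error handling: the with block above waits for all submissions to complete, but an exception raised inside a child notebook only surfaces when .result() is called on its future. A small variant (a sketch built on the same notebooks list and timeout as above) that collects the exit values and re-raises failures could look like this:
# variant that keeps the futures so failures in child notebooks are surfaced
from concurrent.futures import ThreadPoolExecutor

results = {}
with ThreadPoolExecutor() as ec:
    futures = {
        ec.submit(mssparkutils.notebook.run, nb["path"], timeout, nb["params"]): nb
        for nb in notebooks
    }
for future, nb in futures.items():
    # .result() re-raises any exception thrown by the child notebook run
    results[nb["params"]["output_path"]] = future.result()
print(results)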
5. Union the parquet outputs written by the child notebooks and remove duplicates.
6. Create the final delta table in the Lakehouse if it does not exist; otherwise, merge the new data into it.
table_delta_file_location = f"Tables/test2_merge"
table_full_name = "test2_merge"
merge_join_condition = "source.hash_key = target.hash_key"
def create_delta_table(
    df,
    table_full_name,
    table_delta_file_location
):
    isDeltaTableAlreadyPresent = 0
    try:
        mssparkutils.fs.ls(table_delta_file_location)
        isDeltaTableAlreadyPresent = 1
    except:
        # writing the delta table into the curated location
        df.write.format("delta").mode("overwrite").save(table_delta_file_location)
    # register the delta location as a table if it is not already registered
    sqltext = f"CREATE TABLE IF NOT EXISTS {table_full_name} USING DELTA LOCATION '{table_delta_file_location}'"
    # print(sqltext)
    spark.sql(sqltext)
    return isDeltaTableAlreadyPresent
def mergeDeltaTable(
    table_full_name,
    df,
    merge_join_condition
):
    # expose the new data as a temp view so it can be referenced in the MERGE statement
    df.createOrReplaceTempView("temp_vw_new_data")
    sqltext = (f'''
        MERGE INTO {table_full_name} AS target
        USING temp_vw_new_data AS source
        ON {merge_join_condition}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    ''')
    # print(sqltext)
    spark.sql(sqltext)
import time
import timeit
import functools
from pyspark.sql import DataFrame
from pyspark.sql.functions import expr, col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
start_time = time.time()
output_dfs = []
# reading all the output parquet files from the parallel jobs output
for i in range(0, no_of_parallel_jobs):
    input_path = f"{output_path}/temp/batch{i}"
    df = spark.read.parquet(input_path)
    output_dfs.append(df)
# union all the dataframes into one dataframe
df_output = functools.reduce(DataFrame.unionAll, output_dfs)
# removing the duplicates from the dataframe
windowSpec = Window.partitionBy("hash_key").orderBy("hash_key")
df_output = df_output.withColumn("row_num", row_number().over(windowSpec)).filter("row_num=1")
df_output = df_output.drop("row_num")
# creating the delta table if it is not present
isDeltaTableAlreadyPresent = create_delta_table(
    df=df_output,
    table_full_name=table_full_name,
    table_delta_file_location=table_delta_file_location
)
print(f"isDeltaTableAlreadyPresent = {isDeltaTableAlreadyPresent} (0 = not present, so the delta table was created; 1 = already present, so creation was skipped)")
# merging the new dataframe with the delta table
if (isDeltaTableAlreadyPresent == 1):
    print("Merging the new dataframe into the existing delta table")
    mergeDeltaTable(
        merge_join_condition=merge_join_condition,
        df=df_output,
        table_full_name=table_full_name
    )
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution time: {elapsed_time}")
Child notebook
Child notebooks are responsible for processing a subset of the data in parallel. These notebooks can be triggered by the master notebook and run concurrently to process different parts of the data.
Child notebook performs the following steps:
1. Read the parameters passed by the master notebook: the list of files to be processed and the output path where it should write the data.
# the files_list_part parameter arrives as a string, so convert it back into a Python list
# print(type(files_list_part))
files_list_part = eval(files_list_part)
print(files_list_part)
print(type(files_list_part))
print(output_path)
# delete the output data if it already exists (always-overwrite scenario)
if mssparkutils.fs.exists(output_path):
    mssparkutils.fs.rm(output_path, recurse=True)
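Because notebook parameters arrive as strings, the cell above uses eval to turn the stringified list back into a Python list. If evaluating arbitrary strings is a concern, ast.literal_eval is a drop-in replacement for this case; the snippet below is a sketch assuming the parameter is always a valid Python list literal.
# safer alternative to eval: literal_eval only parses Python literals and never executes code
import ast
if isinstance(files_list_part, str):
    files_list_part = ast.literal_eval(files_list_part)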
2. Read the input data from the specified files.
3. Perform data processing tasks. Here we add two columns: a hash_key column and a file_name column containing the input file name.
4. Union all the dataframes into one dataframe.
5. Write the output data to the specified output path.
import time
import timeit
import functools
from pyspark.sql import DataFrame
from pyspark.sql.functions import expr
start_time = time.time()
output_dfs = []
for file in files_list_part:
    df = spark.read.parquet(file)
    # if the file_name column is not present, add it with the name of the file being processed
    if 'file_name' not in df.columns:
        df = df.withColumn('file_name', expr(f"'{file}'"))
    # build the hash_key expression from every column except file_name
    hash_expression_list = []
    for i in df.schema:
        if i.name == 'file_name':
            pass
        else:
            name_adjusted = "coalesce(cast(" + i.name + " AS STRING),'')"
            hash_expression_list.append(name_adjusted)
    hash_expression_txt = ",'|',".join(hash_expression_list)
    hash_expression_txt = 'md5(concat(' + hash_expression_txt + '))'
    df = df.withColumn('hash_key', expr(hash_expression_txt))
    output_dfs.append(df)
df_output = functools.reduce(DataFrame.unionAll, output_dfs)
df_output.write.format("parquet").mode("overwrite").save(output_path)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution time: {elapsed_time}")
Master Notebook Execution
Here we can observe the different instances of the child notebook being triggered.
Method 2 — Using the mssparkutils.notebook.runMultiple() utility
The runMultiple() utility enables the parallel execution of notebooks, with the execution order, parameters, and dependencies specified in a JSON definition (a DAG). More details here.
In our use case, we do not have any dependencies between the notebooks, so we will run all the notebooks in parallel. The first step is to create the JSON.
DAG = {}
activities = []
for i in range(0, no_of_parallel_jobs):
    activity = {"name": f"childNotebookcall-{i}", "timeoutPerCellInSeconds": 90000, "path": "/child_notebook_parallelism", "args": {"files_list_part": f"{files_list_part[i]}", "output_path": f"{output_path}/temp/batch{i}"}}
    activities.append(activity)
DAG["activities"] = activities
Here is the JSON that got created for the job:
{'activities': [{'name': 'childNotebookcall-0',
'timeoutPerCellInSeconds': 90000,
'path': '/child_notebook_parallelism',
'args': {'files_list_part': "['abfss://redacted@msit-onelake.dfs.fabric.microsoft.com/redacted/Files/nycyellotaxi-backup/yellow_tripdata_2022-01.parquet', 'abfss://redacted@msit-onelake.dfs.fabric.microsoft.com/redacted/Files/nycyellotaxi-backup/yellow_tripdata_2022-02.parquet']",
'output_path': 'Files/parquet-to-delta-table-fabric/temp/batch0'}},
{'name': 'childNotebookcall-1',
'timeoutPerCellInSeconds': 90000,
'path': '/child_notebook_parallelism',
'args': {'files_list_part': "['abfss://redacted@msit-onelake.dfs.fabric.microsoft.com/redacted/Files/nycyellotaxi-backup/yellow_tripdata_2022-03.parquet', 'abfss://redacted@msit-onelake.dfs.fabric.microsoft.com/redacted/Files/nycyellotaxi-backup/yellow_tripdata_2022-04.parquet']",
'output_path': 'Files/parquet-to-delta-table-fabric/temp/batch1'}},
{'name': 'childNotebookcall-2',
'timeoutPerCellInSeconds': 90000,
'path': '/child_notebook_parallelism',
'args': {'files_list_part': "['abfss://redacted@msit-onelake.dfs.fabric.microsoft.com/redacted/Files/nycyellotaxi-backup/yellow_tripdata_2022-05.parquet', 'abfss://redacted@msit-onelake.dfs.fabric.microsoft.com/redacted/Files/nycyellotaxi-backup/yellow_tripdata_2022-06.parquet']",
'output_path': 'Files/parquet-to-delta-table-fabric/temp/batch2'}},
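For completeness, runMultiple() activities can also declare dependencies on each other by name, so that a downstream step starts only after its upstream activities finish. The merge_notebook activity below is a hypothetical illustration only; it is not part of this solution, since our batches are independent.
# hypothetical illustration: an activity that runs only after all child activities finish
# "/merge_notebook" is an assumed path, not part of this solution
merge_activity = {
    "name": "mergeNotebookCall",
    "path": "/merge_notebook",
    "timeoutPerCellInSeconds": 90000,
    "args": {"output_path": output_path},
    "dependencies": [a["name"] for a in activities]
}
# DAG["activities"].append(merge_activity)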
timeoutPerCellInSeconds: the timeout value for each cell execution. If you are loading large data, set this to a large value to avoid timeouts.
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
Here is the result after execution:
Hope this helps!