Exploring various Lakehouse Streaming patterns within Microsoft Fabric.
What is Microsoft Fabric?
Microsoft Fabric is an all-in-one analytics solution for enterprises that covers everything from data movement to data science, Real-Time Analytics, and business intelligence. It offers a comprehensive range of services, including data lake, data engineering, and data integration, all in one place.
In this blog post, we are going to explore a few real-time analytics options that are possible with Microsoft Fabric.
Pattern 1: Ingesting Events using Eventstream.
Pattern 2: Ingesting Events using Synapse Data Engineering structured streaming.
Pattern 3: Ingesting smaller Files continuously into Lakehouse
The video below showcases a comprehensive demonstration of the use cases that we are going to discuss in this blogpost.
Please note that Fabric also includes a Real-Time Analytics feature that reduces complexity and simplifies data integration. It provides quick access to insights through automatic streaming, indexing, and partitioning of data from any source or format, along with on-demand query generation and visualizations. While this blog focuses primarily on ingesting streams directly into the Lakehouse, we will delve deeper into Real-Time Analytics in future blog posts.
In my previous blog posts, I explored batch ingestion and walked through the essential steps for creating a Data Lake Architecture, also known as Medallion Architecture.
Link to Blog — https://medium.com/@learn-it-all/data-lake-implementation-using-microsoft-fabric-ccea72a8d162
Pattern 1: Ingesting Events using Eventstream.
What is Eventstream? The Eventstream feature in Fabric is a centralized place to receive and transform real-time event data and ingest it into various destinations with a no-code experience. We can think of it as a subset of the functionality currently provided by Azure Event Hubs and Azure Stream Analytics.
Sources: Eventstream currently supports Azure Event Hubs and a custom app as sources. The custom app option provides an endpoint to which any application can push real-time event data.
Destinations: Eventstream currently supports three types of destinations: Lakehouse, KQL Database, and Custom App.
1) Event Generator: This can be any external application or device responsible for sending streaming event data to the Eventstream endpoint. In this example, we use a Python application for testing; it periodically retrieves a sample dataset from https://opensky-network.org to mock streaming events (see the sketch after this list). The full script is here.
2) Eventstream: The Eventstream receives events from the event generator, using a custom app as the source. When creating the custom app, we obtain a connection string that needs to be plugged into the Python app. The Eventstream then cleanses the data and forwards the cleansed output to both a Lakehouse table and a KQL table. Thus, a single source sends the same data to two destinations.
3) Dataset — Since the data changes frequently, we use DirectQuery mode to get live data. For the Lakehouse table, we can use Power BI Desktop's "Azure Synapse Analytics SQL" connector to connect to the Fabric SQL endpoint with DirectQuery mode selected.
4) Power BI — This is used here for the data visualization.
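For illustration, here is a minimal sketch of such an event generator. It is not the exact script linked above: it assumes the azure-eventhub Python SDK, that the Event Hub-compatible connection string copied from the Eventstream custom app is stored in an environment variable named EVENTSTREAM_CONNECTION_STRING (a name chosen for this example), and the OpenSky fields selected are purely illustrative.

import os, json, time
import requests
from azure.eventhub import EventHubProducerClient, EventData

# Connection string copied from the Eventstream "Custom App" source
# (assumed here to be stored in an environment variable).
producer = EventHubProducerClient.from_connection_string(
    os.environ["EVENTSTREAM_CONNECTION_STRING"])

while True:
    # Pull a snapshot of flight states from the OpenSky REST API (illustrative fields).
    states = requests.get("https://opensky-network.org/api/states/all").json()["states"]
    batch = producer.create_batch()
    for s in states[:50]:  # send a small sample per iteration
        batch.add(EventData(json.dumps(
            {"icao24": s[0], "callsign": s[1], "origin_country": s[2]})))
    producer.send_batch(batch)
    time.sleep(15)  # mock a periodic stream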
Pattern 2: Ingesting Events using Spark Structured Streaming in Microsoft Fabric.
Here is the Dataflow diagram for this pattern.
Unlike Pattern 1, this approach provides enhanced flexibility in transformation and the added capability to convert continuous streaming into micro-batches, thus reducing the workload on the workspace. However, it is worth noting that Pattern 1 offers a no-code experience and is better suited for use cases with minimal transformation requirements.
This pattern comprises the following key components:
1. Event Generator (Python app): To send data to the event hub, we use a Python-based app. For this purpose, we periodically obtain an example dataset from https://opensky-network.org to simulate streaming events. You can find the app's Python file here.
2. Event Hub: Azure Event Hubs is a big data streaming platform and event ingestion service; here it receives the events sent by the generator and makes them available to the Spark streaming job.
3. Spark Structured Streaming (Notebook): This component is responsible for receiving data from the Event Hub and continuously streaming it into a Lakehouse table. The corresponding notebook can be accessed here.
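The snippets below come from that notebook. They rely on an ehConf dictionary that is not shown; a minimal sketch of how it could be built, assuming the Azure Event Hubs Spark connector is attached to the Fabric Spark environment and the connection string is retrieved securely (see the configuration notes further down), might look like this:

# Hypothetical construction of the ehConf used by readStream below.
connection_string = "<event-hub-connection-string>"  # ideally fetched from Key Vault
ehConf = {
    # The Event Hubs Spark connector expects the connection string to be
    # encrypted with its helper class.
    "eventhubs.connectionString":
        spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    "eventhubs.consumerGroup": "$Default",
}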
from pyspark.sql.functions import from_json

# Read the event stream from Event Hubs (ehConf holds the connection configuration).
df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# The payload arrives in the binary "body" column: cast it to a string, parse the
# JSON with the expected schema (Schema is a StructType describing the flight-event
# fields), and flatten the parsed struct into columns.
df1 = df.withColumn("bodyAsString", df["body"].cast("string")) \
    .select(from_json("bodyAsString", Schema).alias("events")) \
    .select("events.*") \
    .repartition(24)  # partitioning the data in memory
# foreachBatch handler: appends each micro-batch to the Lakehouse Delta table;
# partitionBy partitions the data on disk by departure airport.
def write2table(df2, epoch_id):
    df2.write.format("delta").mode("append") \
        .partitionBy("estDepartureAirport").save("Tables/Struc_streaming_flight_data")

# We are micro-batching every 15 seconds.
df1.writeStream \
    .outputMode("append") \
    .trigger(processingTime='15 seconds') \
    .option("checkpointLocation", checkpointLocation) \
    .foreachBatch(write2table) \
    .start() \
    .awaitTermination()  # A timeout (in seconds) can be passed here if we want the streaming job to stop after some time.
What is Spark Structured Streaming? Structured Streaming is a fault-tolerant and scalable stream processing engine built on Spark. It handles streaming operations incrementally and continuously as new data arrives. For further details about Structured Streaming, refer to the provided document. In this example, we continuously and incrementally ingest flight details data from the event hub into a Lakehouse table.
4. Dataset and Power BI Report: To create the report based on the Lakehouse table, we use DirectQuery mode, as explained in Pattern 1.
Consider these configurations when working with Spark Structured Streaming:
1. Fault tolerance: Enable notebook retries to prevent job termination when the underlying infrastructure requires patching by the cloud provider. Use a Data Pipeline to trigger the notebook with a retry policy set to a few minutes or seconds.
2. Secret management: To fetch the required data from the event hub, the notebook needs the connection string. Use Azure Key Vault to securely store and retrieve such secrets (see the sketch after this list).
3. Micro-batching: When dealing with a large volume of incoming events, it is advisable to ingest the data in small batches. This avoids row-by-row operations for each event and improves efficiency.
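As an illustration of the secret-management point, here is a minimal sketch that uses the credential utilities built into Fabric notebooks; the Key Vault URL and secret name are hypothetical:

# mssparkutils is available by default in Fabric notebooks.
# The Key Vault URL and secret name below are hypothetical.
connection_string = mssparkutils.credentials.getSecret(
    "https://contoso-kv.vault.azure.net/",
    "eventhub-connection-string")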
Pattern 3: Ingesting smaller files continuously into Lakehouse.
Here is the data flow diagram for this pattern.
1. File Generator: This Python-based app continuously writes small files containing sample data into a storage account container (see the sketch after this list).
2. Storage Account: The container holds the small files and serves as the source for the Spark streaming job.
3. Spark Structured Streaming (Notebook): The steps are similar to those in Pattern 2, with the only difference being the source type. By using the file (CSV) source in Structured Streaming, we no longer have to track which files have already been ingested ourselves: the checkpoint keeps a high-water mark of processed files, reducing development overhead. Sample Notebook
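For reference, a minimal sketch of such a file generator is shown below; it assumes the azure-storage-blob and azure-identity SDKs, and the storage account URL and container name are hypothetical. The notebook code that follows then picks these files up as a stream.

import csv, io, time, uuid
from azure.identity import DefaultAzureCredential
from azure.storage.blob import ContainerClient

# Hypothetical storage account URL and container name.
container = ContainerClient(
    account_url="https://<storage-account>.blob.core.windows.net",
    container_name="incoming-files",
    credential=DefaultAzureCredential())

while True:
    # Write a small CSV matching the Name/Age/Country schema used by the notebook below.
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Alice", 34, "Norway"])
    writer.writerow(["Bob", 29, "India"])
    container.upload_blob(name=f"sample_{uuid.uuid4()}.csv", data=buffer.getvalue())
    time.sleep(10)  # drop a new small file every few seconds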
# Read all the CSV files written atomically into a directory.
from pyspark.sql.types import StructType

userSchema = StructType().add("Name", "string").add("Age", "integer").add("Country", "string")

# source_data_folder is the path to the monitored storage container/folder.
csvDF = spark \
    .readStream \
    .option("sep", ",") \
    .schema(userSchema) \
    .csv(source_data_folder)  # Equivalent to format("csv").load("/path/to/directory")
from pyspark.sql.functions import input_file_name, element_at, split, current_timestamp

# Capture the source file name and the ingestion timestamp for each row.
df = csvDF.withColumn("fileName", element_at(split(input_file_name(), "/"), -1)) \
    .withColumn("current_timestamp", current_timestamp())
# foreachBatch handler: appends each micro-batch to the Lakehouse Delta table.
def write2table(df2, epoch_id):
    df2.write.format("delta").mode("append").save("Tables/Struc_streaming_csv_data")

# We are micro-batching every 10 seconds.
df.writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", checkpointLocation) \
    .foreachBatch(write2table) \
    .start() \
    .awaitTermination()
File source — Reads files written in a directory as a stream of data. The files are processed based on their modification time, and if ‘latestFirst’ is set, the order will be reversed. Supported file formats include text, CSV, JSON, ORC, and Parquet.
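If many small files accumulate in the container, the file source also exposes options to control how they are picked up, such as maxFilesPerTrigger and latestFirst. A short sketch reusing the schema and folder from above (the option values are illustrative):

# Illustrative tuning of the file source: cap how many new files are read per
# micro-batch (maxFilesPerTrigger) and process the newest files first (latestFirst).
csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 100) \
    .option("latestFirst", "true") \
    .csv(source_data_folder)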
4. Dataset and Power BI: We create the report based on the Lakehouse table using DirectQuery mode, as explained in Pattern 1.
Hope this helps!