In this blog post, we explore Azure Databricks, a cloud-based analytics platform, and show how to use it to parse CSV files from Azure Storage and store the data in a database. We also look at how to process streaming data and how to run a Databricks notebook from an Azure Data Factory pipeline.
Azure Databricks Overview
Azure Databricks is an Apache Spark-based analytics platform that provides a collaborative workspace for data scientists, data engineers, and business analysts. It is a cloud-based service that is designed to handle big data and allows users to process data at scale. Databricks also provides tools for data analysis, machine learning, and visualization. With its integration with Azure Storage, Azure Data Factory, and other Azure services, Azure Databricks can be used to build end-to-end data processing pipelines.
Parsing CSV Files from Azure Blob Storage to a Database using Azure Databricks
Azure Databricks can be used to parse CSV files from Azure Storage and then store the data in a database. Here are the steps to accomplish this:
Configure Various Azure Components
1. Create Azure Resource Group
Image 1
2. Create Azure Databricks Resource
Image 2
3. Create SQL Server Resource
Image 3
4. Create SQL Database Resource
Image 4
5. Create Azure Storage Account
Image 5
6. Create Azure Data Factory Resource
Image 6
7. Launch Databricks Resource Workspace
Image 7
8. Create Computing Cluster
Image 8
9. Create New Notebook
Image 9
Loading the CSV Files into the Database from the Notebook
With the Azure resources in place, the remaining work happens in the Databricks notebook. Here are the steps to accomplish this:
1. Create a cluster: First, create a cluster in Azure Databricks as above. A cluster is a group of nodes that work together to process data.
2. Import all the necessary modules in the Databricks notebook
%python
from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
import pandas as pd
import pymssql
import pyspark.sql
Code 1
3. Configure the Azure Storage connection: Next, set the storage account name, access key, and container name in the notebook as follows
#Configure Blob Connection
storage_account_name = "storage"
storage_account_access_key="***********************************"
blob_container = "blob-container"
Code 2
4. Establish the database connection
#DB connection
conn = pymssql.connect(server='****************.database.windows.net', user='*****', password='*****', database='DataBricksDB')
cursor = conn.cursor()
Code 3
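Hard-coding the server name and password in a notebook makes them easy to leak. As a minimal sketch, the connection settings could instead be read from environment variables and passed to pymssql.connect. The variable names (SALES_DB_SERVER and so on) and the helper load_db_settings are hypothetical and not part of the original setup; in Databricks, a secret scope read with dbutils.secrets.get is another common option.

```python
import os

# Hypothetical environment variable names; adjust them to your own configuration.
REQUIRED_KEYS = {
    "server": "SALES_DB_SERVER",
    "user": "SALES_DB_USER",
    "password": "SALES_DB_PASSWORD",
    "database": "SALES_DB_NAME",
}

def load_db_settings(env=None):
    """Return keyword arguments for pymssql.connect, read from the environment."""
    env = os.environ if env is None else env
    # Collect every variable that is absent or empty so the error names them all.
    missing = [name for name in REQUIRED_KEYS.values() if not env.get(name)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {arg: env[name] for arg, name in REQUIRED_KEYS.items()}

# Usage in the notebook (requires pymssql and the variables to be set):
# conn = pymssql.connect(**load_db_settings())
```

Failing early with the list of missing variables tends to be easier to debug than a connection error raised later by the driver.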
5. Parse the CSV files: Once the storage connection is configured, you can list the blobs in the container and load each CSV file into a pandas DataFrame using the following code
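#Create a client for the container from the account name and key
blob_service_client = BlobServiceClient(account_url='https://' + storage_account_name + '.blob.core.windows.net', credential=storage_account_access_key)
container_client = blob_service_client.get_container_client(blob_container)
#get a list of all blobs from the container
blob_list = []
for blob_i in container_client.list_blobs():
    blob_list.append(blob_i.name)
# print(blob_list)
df_list = []
#Generate a SAS token for each file and load the file into a dataframe
for blob_i in blob_list:
    print(blob_i)
    sas_i = generate_blob_sas(account_name = storage_account_name,
                              container_name = blob_container,
                              blob_name = blob_i,
                              account_key = storage_account_access_key,
                              permission = BlobSasPermissions(read=True),
                              expiry = datetime.utcnow() + timedelta(hours=12))
    #Append the SAS token as the query string so that the read is authorized
    sas_url = 'https://' + storage_account_name +'.blob.core.windows.net/' + blob_container + '/' + blob_i + '?' + sas_i
    df=pd.read_csv(sas_url)
    df_list.append(df)
#Combine the per-file dataframes into the main dataframe used in the next step
df_combined = pd.concat(df_list, ignore_index=True)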
Code 4
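A read from a private container is only authorized when the SAS token returned by generate_blob_sas travels with the blob URL as its query string. The following is a minimal sketch of assembling such a URL; the helper name build_blob_sas_url and the sample token are made up for illustration, and the percent-encoding of the blob name is an extra precaution for names that contain spaces.

```python
from urllib.parse import quote

def build_blob_sas_url(account_name, container, blob_name, sas_token):
    """Return the HTTPS URL of a blob with the SAS token attached as the query string."""
    # Percent-encode the blob name but keep "/" so virtual folders survive.
    encoded_name = quote(blob_name, safe="/")
    # Tolerate tokens that already carry a leading "?".
    token = sas_token.lstrip("?")
    return f"https://{account_name}.blob.core.windows.net/{container}/{encoded_name}?{token}"

# Illustrative values only; a real token comes from generate_blob_sas.
url = build_blob_sas_url("storage", "blob-container", "2023/sales data.csv", "sv=2021&sig=abc")
print(url)
```

Because the resulting URL contains the token, it is best kept out of notebook output and logs.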
6. Transform and Store data in a database: Finally, you can store the data in a database using the following code
#Truncate Table Sales
Truncate_Query = "IF EXISTS (SELECT * FROM sysobjects WHERE name='sales' and xtype='U') truncate table sales"
cursor.execute(Truncate_Query)
conn.commit()
# SQL Query For Table Creation
create_table_query = "IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='sales' and xtype='U') CREATE TABLE sales (REGION varchar(max),COUNTRY varchar(max),ITEMTYPE varchar(max),SALESCHANNEL varchar(max),ORDERPRIORITY varchar(max),ORDERDATE varchar(max),ORDERID varchar(max),SHIPDATE varchar(max),UNITSSOLD varchar(max),UNITPRICE varchar(max),UNITCOST varchar(max),TOTALREVENUE varchar(max),TOTALCOST varchar(max),TOTALPROFIT varchar(max))"
cursor.execute(create_table_query)
conn.commit()
#Insert Data From Main DataFrame
#Replace missing values with empty strings and convert the values to plain Python objects
df_clean = df_combined.astype(object).where(df_combined.notna(), '')
#Use %s placeholders so that pymssql quotes each value (safe for apostrophes and for text that contains "nan")
insert_query = "insert into sales (REGION,COUNTRY,ITEMTYPE,SALESCHANNEL,ORDERPRIORITY,ORDERDATE,ORDERID,SHIPDATE,UNITSSOLD,UNITPRICE,UNITCOST,TOTALREVENUE,TOTALCOST,TOTALPROFIT) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
rows = list(df_clean.itertuples(index=False,name=None))
cursor.executemany(insert_query, rows)
#Commit once after all rows are inserted
conn.commit()
Code 5
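Passing values as query parameters, rather than splicing them into the SQL text, keeps values intact when they contain apostrophes or text such as "nan" inside a longer word. The sketch below demonstrates the idea with Python's built-in sqlite3 module as a stand-in for SQL Server, so it runs without a database server; the three-column table and the sample rows are invented for the example. Note that sqlite3 uses "?" placeholders, whereas pymssql uses "%s" in the same position.

```python
import sqlite3

# Sample rows chosen to stress naive string building.
rows = [
    ("Europe", "France", "O'Brien Snacks"),  # apostrophe inside the value
    ("Asia", "Japan", "Finance"),            # contains the substring "nan"
    ("Africa", "Kenya", ""),                 # missing value stored as an empty string
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (REGION TEXT, COUNTRY TEXT, ITEMTYPE TEXT)")

# The driver binds each value separately, so no manual quoting is needed.
conn.executemany("INSERT INTO sales (REGION, COUNTRY, ITEMTYPE) VALUES (?, ?, ?)", rows)
conn.commit()

stored = conn.execute("SELECT REGION, COUNTRY, ITEMTYPE FROM sales ORDER BY rowid").fetchall()
print(stored)
```

The rows read back match the rows that were inserted, including the apostrophe and the word "Finance".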
As shown here, the data from all the files is loaded into the SQL Server table.
Image 10
An Azure Databricks notebook can also be used to process streaming data from an Azure Data Factory pipeline. Here are the steps to accomplish this:
1. Create a Databricks notebook: First, create a Databricks notebook in Azure Databricks. A notebook is a web-based interface for working with code and data.
2. Create a pipeline: Next, create a pipeline in Azure Data Factory and add a Databricks Notebook activity to execute the notebook. A pipeline is a collection of activities that can be scheduled and run automatically.
3. Configure the activity: In the activity settings, specify the Azure Databricks linked service (which identifies the workspace and cluster) and the path of the notebook that you want to run. Also, specify any parameters that the notebook needs for its input and output locations.
4. Write the code: In the Databricks notebook, write the code to process the stream data. Here is an example:
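from pyspark.sql.functions import window
stream_data = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema("<schema>") \
    .load("/mnt/<mount-name>/<file-name>.csv")
#The original listing is cut off inside the window() call; the 10-minute window and count() below are an assumed completion
stream_data = stream_data \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "10 minutes")) \
    .count()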
Code 6
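To make the watermark and window settings more concrete, here is a plain-Python sketch of how a 10-minute tumbling window groups events and how a 10-minute watermark discards late ones. It is a simplified model and not Spark's exact behaviour: Spark updates the watermark between micro-batches, whereas this sketch updates it after every event. The function names and the sample timestamps are invented for the example.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def window_start(ts, size):
    """Start of the tumbling window (aligned to the epoch) that contains ts."""
    return ts - ((ts - EPOCH) % size)

def count_by_window(events, size, lateness):
    """Count events per window, dropping events that fall behind the watermark."""
    counts = Counter()
    max_seen = None
    for ts in events:  # events are processed in arrival order
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - lateness
        if ts < watermark:
            continue  # too late: older than the watermark
        counts[window_start(ts, size)] += 1
    return counts

def at(hour, minute):
    """Sample timestamp on an arbitrary day, in UTC."""
    return datetime(2023, 1, 1, hour, minute, tzinfo=timezone.utc)

# The last event (12:05) arrives after 12:31 has been seen, so it is dropped.
events = [at(12, 3), at(12, 7), at(12, 14), at(12, 31), at(12, 5)]
counts = count_by_window(events, timedelta(minutes=10), timedelta(minutes=10))
for start in sorted(counts):
    print(start.strftime("%H:%M"), counts[start])
```

In this run the 12:00 window receives two events, the 12:10 and 12:30 windows receive one each, and the late 12:05 event is ignored.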
How to Use an Azure Databricks Notebook in an Azure Data Factory Pipeline and Configure the Data Pipeline
Image 11
1. Create ADF Pipeline
Image 12
2. Configure Data Pipeline
Image 13
3. Add a Trigger to the Pipeline
Image 14
4. Configure the trigger
Image 15
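The pipeline configured in the steps above can also be viewed as JSON in the Data Factory code view. As a rough sketch, the Python below builds a definition of that general shape for a single Databricks Notebook activity. The pipeline name, linked service name, notebook path, and parameter are hypothetical, and the field names should be checked against the JSON that Data Factory generates for your own pipeline.

```python
import json

def notebook_activity(name, linked_service, notebook_path, base_parameters=None):
    """Build a JSON-style definition of a Databricks Notebook activity."""
    return {
        "name": name,
        "type": "DatabricksNotebook",
        "linkedServiceName": {
            "referenceName": linked_service,
            "type": "LinkedServiceReference",
        },
        "typeProperties": {
            "notebookPath": notebook_path,
            # Parameters handed to the notebook when the activity runs.
            "baseParameters": base_parameters or {},
        },
    }

# All names below are hypothetical placeholders.
pipeline = {
    "name": "LoadSalesPipeline",
    "properties": {
        "activities": [
            notebook_activity(
                name="RunSalesNotebook",
                linked_service="AzureDatabricksLinkedService",
                notebook_path="/Shared/load_sales",
                base_parameters={"container": "blob-container"},
            )
        ]
    },
}

print(json.dumps(pipeline, indent=2))
```

Inside the notebook, a base parameter such as container can be read with dbutils.widgets.get("container").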
These capabilities make Azure Databricks an ideal platform for building real-time data processing solutions.
Overall, Azure Databricks provides a scalable and flexible solution for data processing and analytics, and it's definitely worth exploring if you're working with big data on the Azure platform. With its powerful tools and easy-to-use interface, Azure Databricks is a valuable addition to any data analytics toolkit.
Project Lead in Magnusminds
Hardik works as Project Lead of MSBI in India. He started his career working on SQL Server and MSBI and has 5+ years of experience. Early in his career he worked on SQL Server, SSIS, and SSRS. Hardik likes to explore technical topics in SQL Server.