Quick CSV to SQL with Azure Databricks | MagnusMinds Blog

Apr 14, 2023

In this blog, we will explore Azure Databricks, a cloud-based analytics platform, and how it can be used to parse a CSV file from Azure Storage and store the data in a database. We will also learn how to process streaming data and use a Databricks notebook in an Azure Data Factory pipeline.

 

Azure Databricks Overview

Azure Databricks is an Apache Spark-based analytics platform that provides a collaborative workspace for data scientists, data engineers, and business analysts. It is a cloud-based service that is designed to handle big data and allows users to process data at scale. Databricks also provides tools for data analysis, machine learning, and visualization. With its integration with Azure Storage, Azure Data Factory, and other Azure services, Azure Databricks can be used to build end-to-end data processing pipelines.

 

Parsing CSV File from Azure Blob Storage to Database using Azure Databricks

Azure Databricks can be used to parse CSV files from Azure Storage and then store the data in a database. Here are the steps to accomplish this:

 

Configure Various Azure Components

1. Create Azure Resource Group

Image 1

2. Create Azure Databricks Resource

Image 2

3. Create SQL Server Resource 

Image 3

4. Create SQL Database Resource

Image 4

5. Create Azure Storage Account 

Image 5

6. Create Azure Data Factory Resource

Image 6

7. Launch Databricks Resource Workspace 

Image 7

8. Create Computing Cluster 

Image 8

9. Create New Notebook 

Image 9

 

Parsing CSV File from Azure Storage to Database using Azure Databricks

With the Azure resources in place, the following notebook steps read the CSV files from Azure Storage and load them into the database:

1. Create a cluster: First, create a cluster in Azure Databricks as above. A cluster is a group of nodes that work together to process data.

2. Import all the necessary modules in the Databricks notebook

%python

from datetime import datetime, timedelta

from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions

import pandas as pd

import pymssql

import pyspark.sql

Code 1

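3. Configure the Azure Storage connection: Next, set the storage account details and create a client for the blob container as follows

#Configure Blob Connection

storage_account_name = "storage"

storage_account_access_key="***********************************"

blob_container = "blob-container"

#Create a client for the container so its blobs can be listed later

blob_service_client = BlobServiceClient(account_url="https://" + storage_account_name + ".blob.core.windows.net", credential=storage_account_access_key)

container_client = blob_service_client.get_container_client(blob_container)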

Code 2

4. Establish the Database Connection

#DB connection

conn = pymssql.connect(server='****************.database.windows.net', user='*****', password='*****', database='DataBricksDB')

cursor = conn.cursor()

Code 3
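
The connection above has the server name and credentials typed directly into the notebook. As a minimal sketch of one alternative, the helper below collects the same four values from environment variables and returns them as keyword arguments for pymssql.connect. The function name and the variable names (SQL_SERVER and so on) are hypothetical and not part of the original walkthrough. In a Databricks notebook, a secret scope read with dbutils.secrets.get(scope=..., key=...) is another common place to keep these values.

```python
import os


def load_db_settings(env=os.environ):
    """Collect pymssql.connect keyword arguments from environment variables."""
    # The variable names below are hypothetical; use whatever suits your workspace.
    required = {
        "server": "SQL_SERVER",
        "user": "SQL_USER",
        "password": "SQL_PASSWORD",
        "database": "SQL_DATABASE",
    }
    # Fail early with a clear message instead of a confusing login error later.
    missing = [name for name in required.values() if not env.get(name)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {key: env[name] for key, name in required.items()}
```

With the variables set on the cluster, the connection could then be opened with conn = pymssql.connect(**load_db_settings()).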

5. Parse CSV file: Once the storage connection is configured, you can parse the CSV files using the following code

#get a list of all blobs from the container

blob_list = []

for blob_i in container_client.list_blobs():

    blob_list.append(blob_i.name)

# print(blob_list)

df_list = []

#Generate SAS key for each file and load to the dataframe

for blob_i in blob_list:

    print(blob_i)

    sas_i = generate_blob_sas(account_name = storage_account_name,

                             container_name = blob_container,

                             blob_name = blob_i,

                             account_key = storage_account_access_key,

                             permission = BlobSasPermissions(read=True),

                             expiry = datetime.utcnow() + timedelta(hours=12))

    #Append the SAS token as the query string so the private blob can be read

    sas_url = 'https://' + storage_account_name +'.blob.core.windows.net/' + blob_container + '/' + blob_i + '?' + sas_i

    df=pd.read_csv(sas_url)

    df_list.append(df)

#Combine the per-file dataframes into the main dataframe

df_combined = pd.concat(df_list, ignore_index=True)

Code 4
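
Reading a blob from a private container over HTTPS only works when the SAS token travels with the URL as its query string. The helper below is a small sketch of that URL construction on its own, which also percent-encodes blob names that contain spaces. The function name, the sample blob name, and the token value are made up for illustration. A real token comes from generate_blob_sas.

```python
from urllib.parse import quote


def build_blob_sas_url(account_name, container, blob_name, sas_token):
    """Return an HTTPS URL for a blob with the SAS token as its query string."""
    # Percent-encode the blob name but keep "/" so virtual folders survive.
    encoded_name = quote(blob_name, safe="/")
    # Drop a leading "?" if the caller already added one.
    token = sas_token.lstrip("?")
    return (
        f"https://{account_name}.blob.core.windows.net/"
        f"{container}/{encoded_name}?{token}"
    )


# Placeholder values only; the token here is not a valid signature.
url = build_blob_sas_url(
    "storage", "blob-container", "2023/sales data.csv", "sv=2021-08-06&sig=abc"
)
print(url)
```

The resulting string can be passed to pd.read_csv in the same way as sas_url in the loop above. Because the URL carries the token, it is best kept out of notebook output and logs.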

6. Transform and Store data in a database: Finally, you can store the data in a database using the following code

#Truncate Table Sales

Truncate_Query = "IF EXISTS (SELECT * FROM sysobjects WHERE name='sales' and xtype='U') truncate table sales"

cursor.execute(Truncate_Query)

conn.commit()

# SQL Query For Table Creation

create_table_query = "IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='sales' and xtype='U') CREATE TABLE sales (REGION  varchar(max),COUNTRY  varchar(max),ITEMTYPE  varchar(max),SALESCHANNEL  varchar(max),ORDERPRIORITY  varchar(max),ORDERDATE  varchar(max),ORDERID  varchar(max),SHIPDATE  varchar(max),UNITSSOLD  varchar(max),UNITPRICE  varchar(max),UNITCOST  varchar(max),TOTALREVENUE  varchar(max),TOTALCOST  varchar(max),TOTALPROFIT  varchar(max))"

cursor.execute(create_table_query)

conn.commit()

#Insert Data From Main DataFrame

#Use %s placeholders so pymssql quotes and escapes each value

insert_query = "insert into sales (REGION,COUNTRY,ITEMTYPE,SALESCHANNEL,ORDERPRIORITY,ORDERDATE,ORDERID,SHIPDATE,UNITSSOLD,UNITPRICE,UNITCOST,TOTALREVENUE,TOTALCOST,TOTALPROFIT) values (" + ",".join(["%s"] * 14) + ")"

for rows in df_combined.itertuples(index=False,name=None):

    #Store missing values (NaN/None) as empty strings and everything else as text

    row_data = tuple('' if pd.isna(value) else str(value) for value in rows)

    cursor.execute(insert_query, row_data)

conn.commit()

Code 5
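
The fourteen column names are written out by hand in both the CREATE TABLE and the INSERT statement, so the two can drift apart when a column is added. As a sketch of one way to avoid that, the helpers below derive both statements from a single list. The helper names are hypothetical, every column is kept as varchar(max) as in the original, and the INSERT uses pymssql-style %s placeholders, so the row values are passed separately to cursor.execute.

```python
COLUMNS = [
    "REGION", "COUNTRY", "ITEMTYPE", "SALESCHANNEL", "ORDERPRIORITY",
    "ORDERDATE", "ORDERID", "SHIPDATE", "UNITSSOLD", "UNITPRICE",
    "UNITCOST", "TOTALREVENUE", "TOTALCOST", "TOTALPROFIT",
]


def build_create_table(table, columns):
    """Build a CREATE TABLE statement that only runs if the table is missing."""
    column_defs = ", ".join(f"{name} varchar(max)" for name in columns)
    return (
        f"IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{table}' and xtype='U') "
        f"CREATE TABLE {table} ({column_defs})"
    )


def build_insert(table, columns):
    """Build an INSERT statement with one %s placeholder per column."""
    placeholders = ", ".join(["%s"] * len(columns))
    return f"insert into {table} ({', '.join(columns)}) values ({placeholders})"


create_table_query = build_create_table("sales", COLUMNS)
insert_query = build_insert("sales", COLUMNS)
```

The table and column names are formatted straight into the SQL text, so this sketch is only suitable for names that you control in code, not for names taken from user input.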

As shown here, the data from all the files is loaded into the SQL Server table.

Image 10

 

An Azure Databricks notebook can also be used to process streaming data as part of an Azure Data Factory pipeline. Here are the steps to accomplish this:

1. Create a Databricks notebook: First, create a Databricks notebook in Azure Databricks. A notebook is a web-based interface for working with code and data.

2. Create a pipeline: Next, create a pipeline in Azure Data Factory with a Notebook activity to execute the notebook. A pipeline is a collection of activities that can be scheduled and run automatically.

3. Configure the activity: In the activity settings, specify the Azure Databricks linked service (which selects the cluster) and the notebook that you want to run. Also, pass the input and output locations that the notebook needs as parameters.

4. Write the code: In the Databricks notebook, write the code to process the stream data. Here is an example:

from pyspark.sql.functions import window

stream_data = spark.readStream \

    .format("csv") \

    .option("header", "true") \

    .schema("<schema>") \

    .load("/mnt/<mount-name>/<file-name>.csv")

 

stream_data = stream_data \

    .withWatermark("timestamp", "10 minutes") \

    .groupBy(window("timestamp", "10

Code 6
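
To make the grouping step more concrete, here is a plain-Python sketch of what a fixed (tumbling) time window does to event timestamps. Each event is assigned to the window that contains it, and events are then counted per window. It runs without Spark and is only an illustration. The 10-minute window size, the function names, and the sample timestamps are assumptions, and watermark handling of late events is left out.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

# Windows are aligned to the Unix epoch, so 10-minute windows start at :00, :10, :20, ...
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(event_time, minutes=10):
    """Floor a timezone-aware timestamp to the start of its tumbling window."""
    size = timedelta(minutes=minutes)
    return event_time - (event_time - EPOCH) % size


def count_per_window(event_times, minutes=10):
    """Count events per window, similar in spirit to groupBy(window(...)).count()."""
    return Counter(window_start(t, minutes) for t in event_times)


# Sample event times (made up for illustration).
events = [
    datetime(2023, 4, 14, 12, 1, tzinfo=timezone.utc),
    datetime(2023, 4, 14, 12, 9, tzinfo=timezone.utc),
    datetime(2023, 4, 14, 12, 10, tzinfo=timezone.utc),
    datetime(2023, 4, 14, 12, 25, tzinfo=timezone.utc),
]
counts = count_per_window(events)
for start in sorted(counts):
    print(start.isoformat(), counts[start])
```

Here the first two events fall into the window starting at 12:00, while the events at 12:10 and 12:25 each land in their own window.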
 

How to Use an Azure Databricks Notebook in an Azure Data Factory Pipeline and Configure the Data Flow Pipeline with It

Image 11

1. Create ADF Pipeline 

Image 12

2. Configure Data Pipeline 

Image 13

3. Add Trigger to the Pipeline

Image 14

4. Configure the trigger 

Image 15
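
The screenshots configure the pipeline through the Data Factory UI. Behind that UI, a pipeline is stored as JSON, and the sketch below builds the rough shape of a Databricks Notebook activity as a Python dictionary so that it can be inspected or printed. The activity name, linked service name, notebook path, and parameter are hypothetical. The field layout reflects the Data Factory activity JSON as best understood here and should be checked against the JSON view of your own pipeline.

```python
import json


def notebook_activity(name, linked_service, notebook_path, base_parameters=None):
    """Return a dictionary shaped like a Data Factory Databricks Notebook activity."""
    return {
        "name": name,
        "type": "DatabricksNotebook",
        # The linked service points the activity at the Databricks workspace and cluster.
        "linkedServiceName": {
            "referenceName": linked_service,
            "type": "LinkedServiceReference",
        },
        "typeProperties": {
            "notebookPath": notebook_path,
            # Values passed to the notebook when the activity runs.
            "baseParameters": base_parameters or {},
        },
    }


# All names below are placeholders for illustration.
activity = notebook_activity(
    "RunCsvToSql",
    "AzureDatabricksLinkedService",
    "/Shared/csv-to-sql",
    {"blob_container": "blob-container"},
)
print(json.dumps(activity, indent=2))
```

Inside the notebook, a base parameter such as blob_container can be read with dbutils.widgets.get("blob_container").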

 

These capabilities make Azure Databricks well suited to building real-time data processing solutions.

Overall, Azure Databricks provides a scalable and flexible solution for data processing and analytics, and it's definitely worth exploring if you're working with big data on the Azure platform. With its powerful tools and easy-to-use interface, Azure Databricks is a valuable addition to any data analytics toolkit.

Hardik Dangar

About the Author

Hardik Dangar

Project Lead at MagnusMinds

Hardik works as Project Lead of MSBI in India. He started his career working on SQL Server and MSBI and has 5+ years of experience. Early in his career he worked on SQL Server, SSIS, and SSRS. Hardik likes to explore technical topics in SQL Server.