Category - SSDT

Azure Databricks CSV to SQL
Apr 14, 2023

In this blog, we will explore Azure Databricks, a cloud-based analytics platform, and how it can be used to parse a CSV file from Azure Storage and then store the data in a database. We will also learn how to process stream data and use a Databricks notebook in an Azure Data Factory pipeline.

Azure Databricks Overview

Azure Databricks is an Apache Spark-based analytics platform that provides a collaborative workspace for data scientists, data engineers, and business analysts. It is a cloud-based service designed to process big data at scale. Databricks also provides tools for data analysis, machine learning, and visualization. Through its integration with Azure Storage, Azure Data Factory, and other Azure services, Azure Databricks can be used to build end-to-end data processing pipelines.

Parsing CSV File from Azure Blob Storage to Database using Azure Databricks

Azure Databricks can be used to parse CSV files from Azure Storage and then store the data in a database. Here are the steps to accomplish this:

Configure Various Azure Components

1. Create Azure Resource Group (Image 1)
2. Create Azure Databricks Resource (Image 2)
3. Create SQL Server Resource (Image 3)
4. Create SQL Database Resource (Image 4)
5. Create Azure Storage Account (Image 5)
6. Create Azure Data Factory Resource (Image 6)
7. Launch Databricks Resource Workspace (Image 7)
8. Create Computing Cluster (Image 8)
9. Create New Notebook (Image 9)

With these components in place, work through the following steps in the notebook:

1. Create a cluster: First, create a cluster in Azure Databricks as above. A cluster is a group of nodes that work together to process data.

2. Import all the necessary modules in the Databricks notebook:

    %python
    from datetime import datetime, timedelta
    from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
    import pandas as pd
    import pymssql
    import pyspark.sql

Code 1

3. Configure the Azure Storage connection: Next, set the storage account details that the notebook uses to reach the blob container:

    # Configure Blob connection
    storage_account_name = "storage"
    storage_account_access_key = "***********************************"
    blob_container = "blob-container"

Code 2

4. Establish the database connection:

    # DB connection
    conn = pymssql.connect(server='****************.database.windows.net',
                           user='*****',
                           password='*****',
                           database='DataBricksDB')
    cursor = conn.cursor()

Code 3

5. Parse CSV file: Once the storage connection is configured, you can parse the CSV files using the following code. It lists every blob in the container, generates a read-only SAS token for each one, and loads the file into a pandas DataFrame:

    # Connect to the container (container_client is used below to list the blobs)
    blob_service_client = BlobServiceClient(
        account_url='https://' + storage_account_name + '.blob.core.windows.net',
        credential=storage_account_access_key)
    container_client = blob_service_client.get_container_client(blob_container)

    # Get a list of all blobs in the container
    blob_list = []
    for blob_i in container_client.list_blobs():
        blob_list.append(blob_i.name)
    # print(blob_list)

    df_list = []
    # Generate a SAS token for each file and load the file into a DataFrame
    for blob_i in blob_list:
        print(blob_i)
        sas_i = generate_blob_sas(account_name=storage_account_name,
                                  container_name=blob_container,
                                  blob_name=blob_i,
                                  account_key=storage_account_access_key,
                                  permission=BlobSasPermissions(read=True),
                                  expiry=datetime.utcnow() + timedelta(hours=12))

        # Append the SAS token to the blob URL so that the file can be read
        sas_url = 'https://' + storage_account_name + '.blob.core.windows.net/' + blob_container + '/' + blob_i + '?' + sas_i
        print(sas_url)

        df = pd.read_csv(sas_url)
        df_list.append(df)

Code 4

6. Transform and store data in a database: Finally, you can combine the DataFrames and store the data in a database using the following code:

    # Combine the per-file DataFrames into the main DataFrame
    df_combined = pd.concat(df_list, ignore_index=True)

    # Truncate table sales if it already exists
    Truncate_Query = "IF EXISTS (SELECT * FROM sysobjects WHERE name='sales' and xtype='U') truncate table sales"
    cursor.execute(Truncate_Query)
    conn.commit()

    # SQL query for table creation
    create_table_query = (
        "IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='sales' and xtype='U') "
        "CREATE TABLE sales ("
        "REGION varchar(max), COUNTRY varchar(max), ITEMTYPE varchar(max), "
        "SALESCHANNEL varchar(max), ORDERPRIORITY varchar(max), ORDERDATE varchar(max), "
        "ORDERID varchar(max), SHIPDATE varchar(max), UNITSSOLD varchar(max), "
        "UNITPRICE varchar(max), UNITCOST varchar(max), TOTALREVENUE varchar(max), "
        "TOTALCOST varchar(max), TOTALPROFIT varchar(max))"
    )
    cursor.execute(create_table_query)
    conn.commit()

    # Insert data from the main DataFrame, one row per INSERT statement
    for rows in df_combined.itertuples(index=False, name=None):
        row = str(list(rows))
        row_data = row[1:-1]
        row_data = row_data.replace("nan", "''")
        row_data = row_data.replace("None", "''")
        insert_query = "insert into sales (REGION,COUNTRY,ITEMTYPE,SALESCHANNEL,ORDERPRIORITY,ORDERDATE,ORDERID,SHIPDATE,UNITSSOLD,UNITPRICE,UNITCOST,TOTALREVENUE,TOTALCOST,TOTALPROFIT) values (" + row_data + ")"
        print(insert_query)
        cursor.execute(insert_query)
    conn.commit()

Code 5

As shown here, the data from all the files is loaded into the SQL Server table. (Image 10)

An Azure Databricks notebook can also be used to process stream data in an Azure data pipeline. Here are the steps to accomplish this:

1. Create a Databricks notebook: First, create a Databricks notebook in Azure Databricks. A notebook is a web-based interface for working with code and data.

2. Create a job: Next, create a job in Azure Data Factory to execute the notebook. A job is a collection of tasks that can be scheduled and run automatically.

3. Configure the job: In the job settings, specify the Azure Databricks cluster and notebook that you want to use. Also, specify the input and output datasets.

4. Write the code: In the Databricks notebook, write the code to process the stream data. Here is an example:

    from pyspark.sql.functions import window

    stream_data = spark.readStream \
        .format("csv") \
        .option("header", "true") \
        .schema("<schema>") \
        .load("/mnt/<mount-name>/<file-name>.csv")

    stream_data = stream_data \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(window("timestamp", "10

Code 6

How to Use an Azure Databricks Notebook in an Azure Data Factory Pipeline and Configure the Data Flow Pipeline with It (Image 11)

1. Create ADF Pipeline (Image 12)
2. Configure Data Pipeline (Image 13)
3. Add Trigger to the Pipeline (Image 14)
4. Configure the Trigger (Image 15)

These capabilities make Azure Databricks a strong platform for building real-time data processing solutions. Overall, Azure Databricks provides a scalable and flexible solution for data processing and analytics, and it is worth exploring if you are working with big data on the Azure platform. With its powerful tools and easy-to-use interface, Azure Databricks is a valuable addition to any data analytics toolkit.
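The insert loop in step 6 of the CSV walkthrough builds each SQL statement by concatenating row values into a string. That approach can misbehave when a value contains a quote character or the letters "nan" (for example inside a longer word). A common alternative is a parameterized statement, where the driver handles quoting. The following is a minimal sketch of that idea, not the author's method: the helper names `build_insert_statement` and `clean_rows` are hypothetical, the three-column sample data is made up for illustration, and the `%s` placeholder style is the one pymssql accepts. Note that it sends missing values as NULL, whereas the article's loop stores them as empty strings.

```python
import math
from typing import Any, Iterable, List, Sequence, Tuple


def build_insert_statement(table: str, columns: Sequence[str]) -> str:
    """Return a parameterized INSERT with one %s placeholder per column."""
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"


def clean_rows(rows: Iterable[Sequence[Any]]) -> List[Tuple[Any, ...]]:
    """Replace float NaN (how pandas marks missing values) with None."""
    cleaned = []
    for row in rows:
        cleaned.append(tuple(
            None if isinstance(value, float) and math.isnan(value) else value
            for value in row
        ))
    return cleaned


# Hypothetical sample: three of the sales columns and two rows
columns = ["REGION", "COUNTRY", "UNITSSOLD"]
statement = build_insert_statement("sales", columns)
rows = clean_rows([("Europe", "France", 10.0), ("Asia", float("nan"), 5.0)])

print(statement)
print(rows)
```

In the notebook, the rows could come from `df_combined.itertuples(index=False, name=None)`, and `cursor.executemany(statement, rows)` followed by `conn.commit()` would send them through the existing pymssql connection. Table and column names are still interpolated into the statement text, so they should come from trusted code rather than user input.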

Create SSIS Data Flow Task Package Programmatically
Jul 27, 2020

In this article, we will review how to create an SSIS data flow task package in a console application, with an example.

Requirements

- Microsoft Visual Studio 2017
- SQL Server 2014
- SSDT

Article

Done with the above requirements? Let's start by launching Microsoft Visual Studio 2017.

Create a new Console Project with .NET Core.

After creating the new project, give it a proper name. In Project Explorer, import the relevant references and ensure that you have declared the namespaces as below:

    using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
    using Microsoft.SqlServer.Dts.Runtime;
    using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;

To import the above namespaces, we need to add the corresponding references to the project. Keep in mind that all of these references should have the same version.

After importing the namespaces, ask the user for the source connection string, the destination connection string, and the table that will be copied to the destination.

    string sourceConnectionString, destinationConnectionString, tableName;
    Console.Write("Enter Source Database Connection String: ");
    sourceConnectionString = Console.ReadLine();
    Console.Write("Enter Destination Database Connection String: ");
    destinationConnectionString = Console.ReadLine();
    Console.Write("Enter Table Name: ");
    tableName = Console.ReadLine();

After the declarations, create instances of Application and Package.

    Application app = new Application();
    Package Mipk = new Package();
    Mipk.Name = "DatabaseToDatabase";

Add an ADO.NET source connection manager to the package.

    ConnectionManager connSource;
    connSource = Mipk.Connections.Add("ADO.NET:SQL");
    connSource.ConnectionString = sourceConnectionString;
    connSource.Name = "ADO NET DB Source Connection";

Add an ADO.NET destination connection manager to the package.

    ConnectionManager connDestination;
    connDestination = Mipk.Connections.Add("ADO.NET:SQL");
    connDestination.ConnectionString = destinationConnectionString;
    connDestination.Name = "ADO NET DB Destination Connection";

Insert a data flow task into the package.

    Executable e = Mipk.Executables.Add("STOCK:PipelineTask");
    TaskHost thMainPipe = (TaskHost)e;
    thMainPipe.Name = "DFT Database To Database";
    MainPipe df = thMainPipe.InnerObject as MainPipe;

Add an ADO NET source component to the data flow task.

    IDTSComponentMetaData100 conexionAOrigen = df.ComponentMetaDataCollection.New();
    conexionAOrigen.ComponentClassID = "Microsoft.SqlServer.Dts.Pipeline.DataReaderSourceAdapter, Microsoft.SqlServer.ADONETSrc, Version=14.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91";
    conexionAOrigen.Name = "ADO NET Source";

Get the design-time instance of the component and initialize it.

    CManagedComponentWrapper instance = conexionAOrigen.Instantiate();
    instance.ProvideComponentProperties();

Specify the connection manager.

    conexionAOrigen.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connSource);
    conexionAOrigen.RuntimeConnectionCollection[0].ConnectionManagerID = connSource.ID;

Set the custom properties.

    instance.SetComponentProperty("AccessMode", 0);
    instance.SetComponentProperty("TableOrViewName", "\"dbo\".\"" + tableName + "\"");

Reinitialize the source metadata.

    instance.AcquireConnections(null);
    instance.ReinitializeMetaData();
    instance.ReleaseConnections();

Now, add the destination component to the data flow task.

    IDTSComponentMetaData100 conexionADestination = df.ComponentMetaDataCollection.New();
    conexionADestination.ComponentClassID = "Microsoft.SqlServer.Dts.Pipeline.ADONETDestination, Microsoft.SqlServer.ADONETDest, Version=14.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91";
    conexionADestination.Name = "ADO NET Destination";

Get the design-time instance of the component and initialize it.

    CManagedComponentWrapper instanceDest = conexionADestination.Instantiate();
    instanceDest.ProvideComponentProperties();

Specify the connection manager.

    conexionADestination.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connDestination);
    conexionADestination.RuntimeConnectionCollection[0].ConnectionManagerID = connDestination.ID;

Set the custom properties.

    instanceDest.SetComponentProperty("TableOrViewName", "\"dbo\".\"" + tableName + "\"");

Connect the source component to the destination component.

    IDTSPath100 union = df.PathCollection.New();
    union.AttachPathAndPropagateNotifications(conexionAOrigen.OutputCollection[0], conexionADestination.InputCollection[0]);

Reinitialize the destination metadata.

    instanceDest.AcquireConnections(null);
    instanceDest.ReinitializeMetaData();
    instanceDest.ReleaseConnections();

Map the source output columns to the destination columns. Each source column is matched to the destination column with the same name, ignoring case.

    foreach (IDTSOutputColumn100 col in conexionAOrigen.OutputCollection[0].OutputColumnCollection)
    {
        for (int i = 0; i < conexionADestination.InputCollection[0].ExternalMetadataColumnCollection.Count; i++)
        {
            string c = conexionADestination.InputCollection[0].ExternalMetadataColumnCollection[i].Name;
            if (c.ToUpper() == col.Name.ToUpper())
            {
                // Link the destination input column to the matching source column
                IDTSInputColumn100 column = conexionADestination.InputCollection[0].InputColumnCollection.New();
                column.LineageID = col.ID;
                column.ExternalMetadataColumnID = conexionADestination.InputCollection[0].ExternalMetadataColumnCollection[i].ID;
            }
        }
    }

Save the package to the file system.

    app.SaveToXml(@"D:\Workspace\SSIS\Test_DB_To_DB.dtsx", Mipk, null);

Execute the package.

    Mipk.Execute();

Conclusion

In this article, we have explained one of the alternatives for creating SSIS packages: using a .NET console application. In case you have any questions, please feel free to ask in the comment section below.
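The column-mapping step in the article pairs each source column with the destination column of the same name, ignoring case, by scanning the destination columns once per source column. The matching logic itself is independent of SSIS, so it can be sketched outside the package code. The following is an illustrative Python sketch only: the function name `map_columns_by_name`, the (ID, name) tuples, and the sample IDs are all hypothetical stand-ins and not part of the SSIS API. It uses a dictionary keyed by upper-cased name so that each source column needs a single lookup.

```python
from typing import Dict, List, Tuple

Column = Tuple[int, str]  # (column ID, column name), a stand-in for SSIS column objects


def map_columns_by_name(source_columns: List[Column],
                        destination_columns: List[Column]) -> List[Tuple[int, int]]:
    """Pair source and destination column IDs whose names match, ignoring case."""
    # Index destination columns by upper-cased name for direct lookup
    destination_ids: Dict[str, int] = {
        name.upper(): column_id for column_id, name in destination_columns
    }
    pairs = []
    for column_id, name in source_columns:
        destination_id = destination_ids.get(name.upper())
        if destination_id is not None:
            pairs.append((column_id, destination_id))
    return pairs


# Hypothetical sample columns; "Notes" has no destination counterpart
source = [(101, "OrderId"), (102, "Region"), (103, "Notes")]
destination = [(201, "REGION"), (202, "ORDERID")]
mapping = map_columns_by_name(source, destination)
print(mapping)
```

Source columns without a matching destination name are skipped, which mirrors the behavior of the nested loops in the article: no input column is created for them.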