Monday, November 30, 2020

Snowflake: Loading data with Snowpipe on Azure

Introduction

One of the interesting parts of Snowflake is Snowpipe. Snowpipe is a built-in data ingestion solution for continuously loading data into Snowflake. In this blog post I'll give a walkthrough of the installation and the settings of the various components that are needed on Azure.

This blog post walks through the steps needed to load files successfully into Snowflake with Snowpipe. Overall, the following steps are needed for Snowpipe with notifications:
  1. Create a blob storage in Azure.
  2. Create a queue in Azure.
  3. Create an event grid in Azure.
  4. Create a notification integration in Snowflake.
  5. Authenticate Snowflake for Azure.
  6. Authenticate Snowflake in AAD.
  7. Create a stage on Azure in Snowflake.
  8. Create the snowpipe in Snowflake.
  9. Test the snowpipe.

Setup of the components

1.  First we have to create a resource group as a container for the different resources that we are going to create for this blog post.



2. The next step is to create a storage account for storing the files. Pay attention to the 'Account kind': choose 'StorageV2 (general purpose v2)' here.



3. Create a container in the storage account for storing the files. Give it a proper name.



4. The next step is to create a storage queue where the event messages will be queued. Give it a proper name and click on Create.




5. Create an event grid subscription on the storage account. Do this by selecting the storage account and clicking on Events. The event grid subscription is created on the blob storage.



And enter the following options:


Press on the endpoint and the following window appears:



Fill in the values as follows:


And press on 'Confirm selection', then fill in a topic name and press Create.



In my setup the following error occurred:


It seems that I had not registered the 'Event Grid' resource provider. So go to the subscription and click on 'Resource providers'.


And here it is:


Click on Register; a message appears that the resource provider is registering, until it states that it has been registered.


And, let's try it again! Go back to the storage account, enter the values and press Create; now it succeeds.


6. Now create the notification integration between Azure and Snowflake. Execute the following statement:

create notification integration my_notification_int
  enabled = true
  type = queue
  notification_provider = azure_storage_queue
  azure_storage_queue_primary_uri = 'https://azstaccsnowpipe.queue.core.windows.net/azqueuesnowpipe'
  azure_tenant_id = '6917576b-7462-4d17-9805-747d91eec881';

Make sure that you execute this as ACCOUNTADMIN.
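If the pipe is later created with a role other than ACCOUNTADMIN, that role also needs privileges on the integration. A minimal sketch, assuming the pipe will be created with SYSADMIN:

grant usage on integration my_notification_int to role sysadmin;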

Now check whether it has been created correctly with 'show integrations'.



Now check the properties of the notification integration by using the 'DESC' command.
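For reference, these two checks look like this (a minimal sketch, using the integration name created above):

show integrations;
desc notification integration my_notification_int;
-- The DESC output contains, among others, the properties AZURE_CONSENT_URL and
-- AZURE_MULTI_TENANT_APP_NAME, which are needed in the next steps.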




7. Now authenticate the Snowflake application for Azure by opening, in a browser, the consent URL that was just retrieved with the DESC command.


8. Let's go back to Azure: now we need to approve the access of Snowflake to Azure.
Go to Azure and open the Azure Active Directory of your subscription. Go to Enterprise applications.


Click on the Snowflake application :


Copy the application name for later.


9. Give access to the Snowflake application. Go to the storage account and click on Access Control (IAM).


Click on role assignments.


Click on 'Add role assignment', select the 'Storage Queue Data Contributor' role and search for the Snowflake application to assign it this role on the storage account.


Click on Save.



10. Go to the properties of the storage account.


Copy the primary blob service endpoint to the clipboard (or an editor).


Copy the SAS token from the storage account: click on 'Shared access signature'.


Configure the settings for the SAS string correctly:


11. Now create a stage in Snowflake and check with the LS command whether the correct files are there. I changed the SAS string ;-)


The script:
create or replace stage ADVENTUREWORKS
url='azure://azstaccsnowpipe.blob.core.windows.net/azcontainersnowpipe'
credentials=(azure_sas_token='?sv=2019-12-12&ss=bfqt&srt=co&sp=rwlacx&se=2021-11-28T19:09:43Z&st=2020-11-28T11:09:43Z&spr=https&sig=NteRJmdbvstgxOSlIaqea%2BdO4c54t3Xm8HE0t1KhmE%3D');
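Listing the files on the stage is then simply (a minimal sketch, using the stage created above):

ls @ADVENTUREWORKS;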

12. Create the Snowpipe with the CREATE PIPE command:


The script:
create pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales
  from @ADVENTUREWORKS
  file_format = (type = 'CSV');
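As a side note, the post does not show the DDL of the target table. A minimal sketch of what dbo.FactInternetSales could look like, with data types assumed (keys as integers, amounts as floats, dates as timestamps, in line with the values that appear further below):

create table if not exists ADVENTUREWORKS.dbo.FactInternetSales (
    ProductKey             int,
    OrderDateKey           int,
    DueDateKey             int,
    ShipDateKey            int,
    CustomerKey            int,
    PromotionKey           int,
    CurrencyKey            int,
    SalesTerritoryKey      int,
    SalesOrderNumber       varchar,
    SalesOrderLineNumber   int,
    RevisionNumber         int,
    OrderQuantity          int,
    UnitPrice              float,
    ExtendedAmount         float,
    UnitPriceDiscountPct   float,
    DiscountAmount         float,
    ProductStandardCost    float,
    TotalProductCost       float,
    SalesAmount            float,
    TaxAmt                 float,
    Freight                float,
    CarrierTrackingNumber  varchar,
    CustomerPONumber       varchar,
    OrderDate              timestamp_ntz,
    DueDate                timestamp_ntz,
    ShipDate               timestamp_ntz
);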

13. Refresh the pipe with the following command, because the files that are already stored in the blob storage are not loaded yet. They were already there and the event is only triggered by a Create event. You can force the pipe to load these files by issuing the command ALTER PIPE ... REFRESH.

ALTER PIPE snowpipe REFRESH;

But nothing happens. What is wrong?
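Before digging into the copy history, a quick check of the pipe itself can help (a minimal sketch; SYSTEM$PIPE_STATUS returns, among other things, the execution state and the number of pending files):

select system$pipe_status('snowpipe');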

We can use the following query to check what went wrong; it shows that the number of columns in the file and in the table are out of sync.


The script:
  SELECT * FROM TABLE (information_Schema.copy_history(
        table_name=>'dbo.FactInternetSales', 
        start_time=>dateadd(hours, -1, current_timestamp())));

The error is:

"Number of columns in file (1) does not match that of the corresponding table (26),
use file format option error_on_column_count_mismatch=false to ignore this error"

Okay, let's check that.


The file is delimited with '|' instead of ','. I changed the statement by adding a field_delimiter property:
create or replace pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales
  from @ADVENTUREWORKS
  file_format = (type = 'CSV' field_delimiter = '|');

And a new error occurred. 


I decided to copy one row of the file and check whether it is possible to insert this record into the table. The file is out of sync with the table: somehow the fields CarrierTrackingNumber and CustomerPONumber are not present in the file. The file and the DDL are of different versions. Also, trying to insert '2.862.616' into a float column is not possible.
ProductKey 			= 310
OrderDateKey 			= 20101229
DueDateKey 			= 20110110
ShipDateKey 			= 20110105
CustomerKey 			= 21768
PromotionKey			= 1
CurrencyKey			= 19
SalesTerritoryKey		= 6 
SalesOrderNumber		= SO43697
SalesOrderLineNumber	        = 1
RevisionNumber			= 1
OrderQuantity			= 1
UnitPrice			= 3578.27
ExtendedAmount			= 3578.27
UnitPriceDiscountPct	        = 0
DiscountAmount			= 0
ProductStandardCost		= 21.712.942
TotalProductCost		= 21.712.942
SalesAmount			= 3578.27
TaxAmt				= 2.862.616
Freight				= 894.568
CarrierTrackingNumber	        = ?
CustomerPONumber		= ?
OrderDate			= 29/12/2010 00:00
DueDate				= 10/01/2011 00:00
ShipDate			= 05/01/2011 00:00

The insert script:

INSERT INTO dbo.FactInternetSales(
ProductKey,
OrderDateKey,
DueDateKey,
ShipDateKey,
CustomerKey,
PromotionKey,
CurrencyKey,
SalesTerritoryKey,
SalesOrderNumber,
SalesOrderLineNumber,
RevisionNumber,
OrderQuantity,
UnitPrice,
ExtendedAmount,
UnitPriceDiscountPct,
DiscountAmount,
ProductStandardCost,
TotalProductCost,
SalesAmount ,
TaxAmt ,
Freight,
--CarrierTrackingNumber,
--CustomerPONumber,
OrderDate,
DueDate,
ShipDate
)
VALUES (
310,
20101229,
20110110,
20110105,	
21768,
1,	
19,	
6,	
'SO43697',
1,	
1,	
1,	
3578.27,	
3578.27,	
0,	
0,
21712942,
21712942,
3578.27,	
2862616,	
894568,			
'29/12/2010 00:00',
'10/01/2011 00:00',	
'05/01/2011 00:00'
)



This script results in another error:

Timestamp '29/12/2010 00:00' is not recognized

After a couple of experiments with the function TO_TIMESTAMP, the expression to_timestamp($22, 'DD/MM/YYYY HH:MI') turns out to be the correct one for converting the dates into a format that Snowflake recognizes.

I also removed the dots from the float fields, just for the sake of this demo. In reality a proper conversion is needed for importing fields with thousand separators and decimals; a sketch of such a conversion is shown below.
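A minimal sketch, assuming '.' is only used as a thousand separator in these fields (this is an illustration, not part of the pipe below):

-- Strip the thousand separators and cast to a number. In the pipe's SELECT this
-- could be applied as, for example, to_number(replace($17, '.', '')).
select to_number(replace('21.712.942', '.', '')) as ProductStandardCost;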

-- Now create the snowpipe
create or replace pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales (
    ProductKey,
    OrderDateKey,
    DueDateKey,
    ShipDateKey,
    CustomerKey,
    PromotionKey,
    CurrencyKey,
    SalesTerritoryKey,
    SalesOrderNumber,
    SalesOrderLineNumber,
    RevisionNumber,
    OrderQuantity,
    UnitPrice,
    ExtendedAmount,
    UnitPriceDiscountPct,
    DiscountAmount,
    ProductStandardCost,
    TotalProductCost,
    SalesAmount ,
    TaxAmt ,
    Freight,
    --CarrierTrackingNumber,
    --CustomerPONumber,
    OrderDate,
    DueDate,
    ShipDate
  )
  from (
    SELECT 
        $1, 
        $2, 
        $3, 
        $4, 
        $5, 
        $6, 
        $7, 
        $8, 
        $9, 
        $10, 
        $11, 
        $12, 
        $13, 
        $14, 
        $15, 
        $16, 
        $17, 
        $18, 
        $19, 
        $20, 
        $21, 
    to_timestamp($22, 'DD/MM/YYYY HH:MI'),
    to_timestamp($23, 'DD/MM/YYYY HH:MI'),
    to_timestamp($24, 'DD/MM/YYYY HH:MI')
    FROM @ADVENTUREWORKS
  )
  file_format = (type = 'CSV' field_delimiter = '|');

And now the table is loaded.
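To verify, a quick count of the target table (a minimal sketch):

select count(*) from ADVENTUREWORKS.dbo.FactInternetSales;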


And the result is in the table now:




Some testing

Experiment #1
Let's put another file in the blob storage and see what happens. Here you see some of the attempts of Snowpipe loading the file into the table.


What happens if I copy another file (with another name and different content) into the container of the blob storage? Let's find out.


It will try to load the DimEmployee.csv file into the FactInternetSales table. That's not what you would like to have. One way to prevent this is to restrict the pipe to matching file names, as sketched below.
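A possible way to do that (a sketch; the PATTERN copy option with a regular expression matching only the FactInternetSales files is an assumption, not part of the original setup):

create or replace pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales
  from @ADVENTUREWORKS  -- or the SELECT with the column mapping shown earlier
  pattern = '.*FactInternetSales.*[.]csv'
  file_format = (type = 'CSV' field_delimiter = '|');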

Experiment #2
What if I deliver the same file again, but with another name? I renamed FactInternetSales.csv to FactInternetSales4.csv and uploaded the file twice into the Azure blob storage. The result is that the file is not loaded a second time.

Experiment #3
What if I change something in the file but keep the name the same? The result is that the modified file is not loaded into Snowflake. Snowpipe keeps load metadata per file name and skips files it considers already loaded, regardless of their content.

Used resources

There are several blog posts and YouTube videos about this subject on the internet that are really interesting.

Final thoughts

I like the idea of event-driven loading into a database and especially into a data warehouse. There are a lot of steps needed for loading a table into Snowflake. It seems to me, for now, that you have to set up this configuration for every file you want to load into Snowflake with Snowpipe. One solution that pops up in forums is to create one master file that contains all the data and then split the data in Snowflake. I'm not very pleased with that solution. I'm open to suggestions.

Hennie