Introduction
One of the interesting parts of Snowflake is Snowpipe, a built-in data ingestion solution for continuously loading data into Snowflake. In this blogpost I'll walk through the installation and configuration of the various components that are needed on Azure.
This blogpost describes the sequence of steps needed to load files successfully into Snowflake with Snowpipe. Overall, the following steps are needed for Snowpipe with notification integration:
- Create a blob storage in Azure.
- Create a queue in Azure.
- Create an event grid subscription in Azure.
- Create a notification integration in Snowflake.
- Authenticate Snowflake for Azure.
- Authenticate Snowflake in AAD.
- Create a stage on Azure in Snowflake.
- Create the snowpipe in Snowflake.
- Test the snowpipe.
Setup of the components
1. First we have to create a resource group as a container for the different resources that we are going to create for this blogpost.
2. The next step is to create a storage account for storing the files. Pay attention to the 'Account kind': choose StorageV2 here.
4. The next step is creating a storage queue where the blob events will be queued. Give it a proper name and click on Create.
5. Create an event grid subscription in the storage account. Do this by selecting the storage account and clicking on the container. The event grid subscription is created on the blob storage.
And enter the following options:
In my setup, the following error occurred:
It seems that I had not registered the resource provider 'Microsoft.EventGrid'. So go to the subscription and click on 'Resource providers'.
Click on Register; a message appears that the resource provider is registering, until it states that it has been registered.
Let's try it again! Go back to the storage account, enter the values again, press Create, and now it succeeds.
create notification integration my_notification_int
  enabled = true
  type = queue
  notification_provider = azure_storage_queue
  azure_storage_queue_primary_uri = 'https://azstaccsnowpipe.queue.core.windows.net/azqueuesnowpipe'
  azure_tenant_id = '6917576b-7462-4d17-9805-747d91eec881';
Make sure that you execute this as accountadmin.
Check now if it is created correctly with 'show integrations'
7. Now authenticate the Snowflake application for Azure by copying the consent URL that was just retrieved by executing the DESC command on the notification integration.
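For example (using the integration name from the script above; the URL to copy is returned in the AZURE_CONSENT_URL property):

show integrations;
desc notification integration my_notification_int;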
8. Let's go back to Azure; we now need to approve Snowflake's access to Azure.
Go to Azure and open the Azure Active Directory of your subscription. Go to Enterprise applications.
9. Give access to the Snowflake application. Go to the storage account and click on Access Control (IAM).
Click on 'Add role assignment', select the role 'Storage Queue Data Contributor' and search for the Snowflake application to assign it this role on the storage account.
10. Go to the properties of the storage account.
Copy the SAS token from the storage account: click on 'Shared access signature'.
The script:
create or replace stage ADVENTUREWORKS
  url = 'azure://azstaccsnowpipe.blob.core.windows.net/azcontainersnowpipe'
  credentials = (azure_sas_token = '?sv=2019-12-12&ss=bfqt&srt=co&sp=rwlacx&se=2021-11-28T19:09:43Z&st=2020-11-28T11:09:43Z&spr=https&sig=NteRJmdbvstgxOSlIaqea%2BdO4c54t3Xm8HE0t1KhmE%3D');
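A quick way to verify that the stage and the SAS token work (a simple sanity check, using the stage name from the script above) is to list the files in the stage:

list @ADVENTUREWORKS;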
12. Create the Snowpipe with the CREATE PIPE command:
The script:
create pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales
  from @ADVENTUREWORKS
  file_format = (type = 'CSV');
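The pipe should now be visible (a quick check):

show pipes;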
13. Refresh the pipe with the following command, because the files that are already stored in the blob storage are not loaded yet: they were there before the pipe existed, and the event is only triggered by a create event. You can force the pipe to load these files by issuing the command ALTER PIPE ... REFRESH.
ALTER PIPE snowpipe REFRESH;
But nothing happens. What is wrong?
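A first sanity check (assuming the pipe name from step 12) is the pipe status, which shows whether the pipe is running and how many files are pending:

select system$pipe_status('snowpipe');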
We can use the following query on the copy history to check why the files were not loaded; it shows that the number of columns in the file and in the table are out of sync.
The script:
SELECT *
FROM TABLE(information_schema.copy_history(
  table_name => 'dbo.FactInternetSales',
  start_time => dateadd(hours, -1, current_timestamp())));
The error is:
"Number of columns in file (1) does not match that of the corresponding table (26),
use file format option error_on_column_count_mismatch=false to ignore this error"
Okay, the file appears to be pipe-delimited, so let's recreate the pipe with the correct field delimiter.
create or replace pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales
  from @ADVENTUREWORKS
  file_format = (type = 'CSV' field_delimiter = '|');
And a new error occurred.
I decided to copy one row of the file and check whether it is possible to insert this record into the table. The file is out of sync with the table: somehow the fields CarrierTrackingNumber and CustomerPONumber are not present in the file. The file and the DDL are of different versions. Also, trying to insert '2.862.616' into a float column is not possible.
ProductKey = 310
OrderDateKey = 20101229
DueDateKey = 20110110
ShipDateKey = 20110105
CustomerKey = 21768
PromotionKey = 1
CurrencyKey = 19
SalesTerritoryKey = 6
SalesOrderNumber = SO43697
SalesOrderLineNumber = 1
RevisionNumber = 1
OrderQuantity = 1
UnitPrice = 3578.27
ExtendedAmount = 3578.27
UnitPriceDiscountPct = 0
DiscountAmount = 0
ProductStandardCost = 21.712.942
TotalProductCost = 21.712.942
SalesAmount = 3578.27
TaxAmt = 2.862.616
Freight = 894.568
CarrierTrackingNumber = ?
CustomerPONumber = ?
OrderDate = 29/12/2010 00:00
DueDate = 10/01/2011 00:00
ShipDate = 05/01/2011 00:00
The insert script:
INSERT INTO dbo.FactInternetSales(
ProductKey,
OrderDateKey,
DueDateKey,
ShipDateKey,
CustomerKey,
PromotionKey,
CurrencyKey,
SalesTerritoryKey,
SalesOrderNumber,
SalesOrderLineNumber,
RevisionNumber,
OrderQuantity,
UnitPrice,
ExtendedAmount,
UnitPriceDiscountPct,
DiscountAmount,
ProductStandardCost,
TotalProductCost,
SalesAmount ,
TaxAmt ,
Freight,
--CarrierTrackingNumber,
--CustomerPONumber,
OrderDate,
DueDate,
ShipDate
)
VALUES (
310,
20101229,
20110110,
20110105,
21768,
1,
19,
6,
'SO43697',
1,
1,
1,
3578.27,
3578.27,
0,
0,
21712942,
21712942,
3578.27,
2862616,
894568,
'29/12/2010 00:00',
'10/01/2011 00:00',
'05/01/2011 00:00'
);
This script results in another error:
Timestamp '29/12/2010 00:00' is not recognized
After a couple of experiments with the function TO_TIMESTAMP, the expression to_timestamp($22, 'DD/MM/YYYY HH:MI') turned out to be the correct one for converting the dates into a format that Snowflake recognizes.
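A quick way to test the format string in a worksheet (using the OrderDate value from the sample row above):

select to_timestamp('29/12/2010 00:00', 'DD/MM/YYYY HH:MI');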
I also removed the dots from the float fields, just for the sake of this demo. In reality, some conversion is needed for importing fields that use dots as thousands separators and a decimal part.
-- Now create the snowpipe
create or replace pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales (
    ProductKey,
    OrderDateKey,
    DueDateKey,
    ShipDateKey,
    CustomerKey,
    PromotionKey,
    CurrencyKey,
    SalesTerritoryKey,
    SalesOrderNumber,
    SalesOrderLineNumber,
    RevisionNumber,
    OrderQuantity,
    UnitPrice,
    ExtendedAmount,
    UnitPriceDiscountPct,
    DiscountAmount,
    ProductStandardCost,
    TotalProductCost,
    SalesAmount,
    TaxAmt,
    Freight,
    --CarrierTrackingNumber,
    --CustomerPONumber,
    OrderDate,
    DueDate,
    ShipDate
  )
  from (
    SELECT
      $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
      $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21,
      to_timestamp($22, 'DD/MM/YYYY HH:MI'),
      to_timestamp($23, 'DD/MM/YYYY HH:MI'),
      to_timestamp($24, 'DD/MM/YYYY HH:MI')
    FROM @ADVENTUREWORKS
  )
  file_format = (type = 'CSV' field_delimiter = '|');
And now the table is loaded; the result is visible in the table.
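A simple row count confirms the load (using the table from the pipe definition):

select count(*) from ADVENTUREWORKS.dbo.FactInternetSales;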
Some testing
Experiment #1
What happens if I copy another file (with another name and different content) into the container of the blob storage? Let's find out. Here you see some of the attempts of Snowpipe loading the file into the table.
It will try to load the dimEmployee.csv file into the FactInternetSales table. That's not what you would like to have.
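One way to guard against this (a sketch, not part of the original setup; it assumes all files for this table match a fixed name pattern, and is shown on the simple COPY form for brevity) is to restrict the pipe's COPY statement with a PATTERN clause, so the pipe only picks up matching file names:

create or replace pipe snowpipe
  auto_ingest = true
  integration = 'MY_NOTIFICATION_INT'
  as
  copy into ADVENTUREWORKS.dbo.FactInternetSales
  from @ADVENTUREWORKS
  pattern = '.*FactInternetSales.*[.]csv'
  file_format = (type = 'CSV' field_delimiter = '|');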
Experiment #2
What if I deliver the same file again, but with another name? I renamed FactInternetSales.csv to FactInternetSales4.csv and loaded the file twice into Azure blob storage. The result is that the file is not loaded a second time.
Experiment #3
What if I change something in the file but keep the name the same? The result is that the file is not loaded into Snowflake.
Used resources
There are several blogposts and YouTube videos about this subject on the internet that are really interesting:
- Official Snowflake documentation: Automating Snowpipe for Microsoft Azure Blob Storage
- YouTube: SnowPipe on Azure: Creating micro-batch data ingestion pipeline from azure blob storage to Snowflake
- LinkedIn: SnowPipe on Azure - Automating micro-batch serverless data ingestion into snowflake from Azure blob storage
- Building Snowpipe on Azure Blob Storage Using Azure Portal Web UI for Snowflake Data Warehouse
- Snowpipe 101
Final thoughts
I like the idea of event-driven loading into a database, and especially into a data warehouse. There are a lot of steps needed for loading a table into Snowflake. It seems to me that you have to set up this configuration for every file you want to load into Snowflake with Snowpipe. One solution that pops up in forums is to create one master file that contains all the data and then split the data in Snowflake. I'm not very pleased with that solution. I'm open to suggestions.
Hennie
Comments
- Is it possible to load containerA/file1 to table1 and containerA/file2 to table2 using the same snowpipe?
- Thank you! Very accurate. How could one figure out all these 100 steps on their own?
- By the way, #10 first step -> "Properties" seems to be called "Endpoints" now (June 2022).