Monday 7 December 2020

Snowflake : Streams and tasks

Introduction

The next subject in my Snowflake series is streams and tasks. Together, streams and tasks let you build and automate data processing in a CDC (change data capture) manner: when something changes in a source table, that change is picked up and applied to a target table. That's the idea. Snowpipe automatically ingests data into Snowflake, and with streams and tasks you can process that data one step further, for instance into a data warehouse. The aim of this blogpost is to show how to do that processing with streams and tasks.


This blogpost builds on a blogpost I wrote earlier about Snowpipe: Loading data with Snowpipe on Azure. For the sake of simplicity, I'm not using Snowpipe here; instead I manipulate the source table manually. If you replace these manual statements with automatic ingestion by Snowpipe, you would get the same result.

According to the blog of David Oyegoke, streams are powered by the Time Travel feature. You can query a stream like a table, and when its data is consumed in a DML statement (for example an INSERT or MERGE that commits), the offset moves forward automatically. In that sense a stream works like a queue. With tasks you can run SQL statements or call a stored procedure on a time-based schedule, and tasks can be connected to each other in a tree.
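To make the queue analogy concrete, here is a toy model in Python. It is an illustration only, not how Snowflake implements streams: it assumes a stream is nothing more than an offset into an append-only list of changes. Querying shows the pending changes without moving the offset, while consuming them in a DML statement moves the offset forward. Real streams return net changes and rely on Time Travel, which this sketch ignores; the class name ToyStream is made up.

```python
from dataclasses import dataclass


@dataclass
class ToyStream:
    """Toy stream: an offset into a shared, append-only list of changes."""
    change_log: list  # changes made to the source table, oldest first
    offset: int = 0   # index of the first change not yet consumed

    def select(self):
        # SELECT * FROM stream: shows pending changes, the offset stays put.
        return self.change_log[self.offset:]

    def consume(self):
        # Stream used in a committed DML statement (e.g. MERGE):
        # returns the pending changes and moves the offset to the end.
        pending = self.change_log[self.offset:]
        self.offset = len(self.change_log)
        return pending


log = ["change 1", "change 2"]                       # history before the stream exists
stream = ToyStream(change_log=log, offset=len(log))  # a new stream starts at "now"
log.append("change 3")                               # a change after the stream was created
print(stream.select())   # ['change 3']
print(stream.select())   # ['change 3'] (querying does not consume)
print(stream.consume())  # ['change 3']
print(stream.select())   # []
```

Starting the offset at the end of the existing log also mirrors what the demo below shows: rows that were already in the table do not appear in a freshly created stream.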


Streams demo

For this blogpost, I've created a small setup with the AdventureWorks table FactInternetSales, which I created and loaded earlier in my blogpost Loading data with Snowpipe on Azure. Let's start this setup by creating a target table, which I name FactInternetSales_target.

CREATE TABLE IF NOT EXISTS dbo.FactInternetSales_target(
	ProductKey int NOT NULL,
	OrderDateKey int NOT NULL,
	DueDateKey int NOT NULL,
	ShipDateKey int NOT NULL,
	CustomerKey int NOT NULL,
	PromotionKey int NOT NULL,
	CurrencyKey int NOT NULL,
	SalesTerritoryKey int NOT NULL,
	SalesOrderNumber nvarchar(20) NOT NULL,
	SalesOrderLineNumber tinyint NOT NULL,
	RevisionNumber tinyint NOT NULL,
	OrderQuantity smallint NOT NULL,
	UnitPrice float NOT NULL,
	ExtendedAmount float NOT NULL,
	UnitPriceDiscountPct float NOT NULL,
	DiscountAmount float NOT NULL,
	ProductStandardCost float NOT NULL,
	TotalProductCost float NOT NULL,
	SalesAmount float NOT NULL,
	TaxAmt float NOT NULL,
	Freight float NOT NULL,
	OrderDate datetime NULL,
	DueDate datetime NULL,
	ShipDate datetime NULL
);

Now let's create a stream on the source table FactInternetSales.

DROP STREAM IF EXISTS FactInternetSales_target_stream;

CREATE OR REPLACE STREAM FactInternetSales_target_stream 
    ON TABLE dbo.FactInternetSales;

You cannot give the stream the same name as the table. If you try, you get an error: SQL compilation error: Object found is of type 'TABLE', not specified type 'STREAM'.

The next step is to query the stream. You can query a stream just like a table:

SELECT * FROM FactInternetSales_target_stream;

The stream is empty right now. The rows that already exist in FactInternetSales are not added to the stream when it is created; the stream only tracks changes made after that moment.


From my previous experiments with Snowpipe, the table contains about 250K records. First, let's clear the table and see what happens in the stream.

DELETE FROM FactInternetSales;

SELECT * FROM FactInternetSales_target_stream;

The stream is now filled with records that are marked as deleted (METADATA$ACTION = 'DELETE').
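A rough Python sketch of what the stream holds after this DELETE, assuming the table is modelled as a dictionary keyed by (SalesOrderNumber, SalesOrderLineNumber) with two made-up rows and only a few columns. Each deleted row shows up once, with its old column values, METADATA$ACTION = 'DELETE' and METADATA$ISUPDATE = FALSE. Real streams also expose METADATA$ROW_ID, which is left out here; the function name delete_all is hypothetical.

```python
def delete_all(table):
    """Empty `table` and return the records a stream on it would show.

    `table` is a dict keyed by (SalesOrderNumber, SalesOrderLineNumber).
    """
    changes = [
        {**row,                          # a DELETE record carries the old values
         "METADATA$ACTION": "DELETE",
         "METADATA$ISUPDATE": False}     # a plain delete, not half of an update
        for row in table.values()
    ]
    table.clear()
    return changes


# Two made-up sample rows; only a few columns are modelled.
source = {
    ("SO43697", 1): {"SalesOrderNumber": "SO43697", "SalesOrderLineNumber": 1, "UnitPrice": 357827},
    ("SO43698", 1): {"SalesOrderNumber": "SO43698", "SalesOrderLineNumber": 1, "UnitPrice": 1000},
}
stream = delete_all(source)
print(len(stream), {r["METADATA$ACTION"] for r in stream})  # 2 {'DELETE'}
print(source)                                               # {}
```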


Merge the records

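For my experiments with streams and tasks I borrowed a MERGE statement from Lee Harrington. It reads the stream, skips the DELETE half of updated rows, and then updates, deletes or inserts rows in the target table depending on METADATA$ACTION and METADATA$ISUPDATE.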

MERGE INTO dbo.FactInternetSales_target AS TRG
USING (SELECT * 
       FROM FactInternetSales_target_stream 
       WHERE NOT (metadata$action = 'DELETE' 
        and metadata$isupdate = TRUE)) AS STR
ON TRG.SalesOrderNumber = STR.SalesOrderNumber 
    AND TRG.SalesOrderlineNumber = STR.SalesOrderLineNumber
WHEN MATCHED AND STR.metadata$action = 'INSERT'
             AND STR.metadata$isupdate = TRUE THEN 
     UPDATE SET
        TRG.ProductKey				= STR.ProductKey,
        TRG.OrderDateKey			= STR.OrderDateKey,
        TRG.DueDateKey				= STR.DueDateKey,
        TRG.ShipDateKey				= STR.ShipDateKey,
        TRG.CustomerKey				= STR.CustomerKey,
        TRG.PromotionKey			= STR.PromotionKey,
        TRG.CurrencyKey				= STR.CurrencyKey,
        TRG.SalesTerritoryKey		= STR.SalesTerritoryKey,
        TRG.SalesOrderNumber		= STR.SalesOrderNumber,
        TRG.SalesOrderLineNumber	= STR.SalesOrderLineNumber,
        TRG.RevisionNumber			= STR.RevisionNumber,
        TRG.OrderQuantity			= STR.OrderQuantity,
        TRG.UnitPrice				= STR.UnitPrice,
        TRG.ExtendedAmount			= STR.ExtendedAmount,
        TRG.UnitPriceDiscountPct	= STR.UnitPriceDiscountPct,
        TRG.DiscountAmount			= STR.DiscountAmount,
        TRG.ProductStandardCost		= STR.ProductStandardCost,
        TRG.TotalProductCost		= STR.TotalProductCost,
        TRG.SalesAmount 			= STR.SalesAmount,
        TRG.TaxAmt 					= STR.TaxAmt,
        TRG.Freight					= STR.Freight,
        TRG.OrderDate				= STR.OrderDate,
        TRG.DueDate					= STR.DueDate,
        TRG.ShipDate				= STR.ShipDate
WHEN MATCHED AND STR.metadata$action = 'DELETE' THEN DELETE
WHEN NOT MATCHED AND STR.metadata$action = 'INSERT' THEN 
    INSERT (
       ProductKey,
      OrderDateKey,
      DueDateKey,
      ShipDateKey,
      CustomerKey,
      PromotionKey,
      CurrencyKey,
      SalesTerritoryKey,
      SalesOrderNumber,
      SalesOrderLineNumber,
      RevisionNumber,
      OrderQuantity,
      UnitPrice,
      ExtendedAmount,
      UnitPriceDiscountPct,
      DiscountAmount,
      ProductStandardCost,
      TotalProductCost,
      SalesAmount ,
      TaxAmt ,
      Freight,
      OrderDate,
      DueDate,
      ShipDate
    ) 
    VALUES(
        STR.ProductKey,
        STR.OrderDateKey,
        STR.DueDateKey,
        STR.ShipDateKey,
        STR.CustomerKey,
        STR.PromotionKey,
        STR.CurrencyKey,
        STR.SalesTerritoryKey,
        STR.SalesOrderNumber,
        STR.SalesOrderLineNumber,
        STR.RevisionNumber,
        STR.OrderQuantity,
        STR.UnitPrice,
        STR.ExtendedAmount,
        STR.UnitPriceDiscountPct,
        STR.DiscountAmount,
        STR.ProductStandardCost,
        STR.TotalProductCost,
        STR.SalesAmount ,
        STR.TaxAmt ,
        STR.Freight,
        STR.OrderDate,
        STR.DueDate,
        STR.ShipDate
    );

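Executing this statement returns zeros for the number of rows inserted, updated and deleted: the stream only contains DELETE records, and there is nothing in the target table for them to match. Because the stream is used in a DML statement, it is consumed as well, so it is empty afterwards.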
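The logic of this MERGE can be mimicked in a few lines of Python to see which clause fires for which stream record. This is a row-by-row sketch with a hypothetical merge_stream function and dictionaries instead of tables; the real MERGE is set-based, so treat it as an illustration of the WHEN clauses, not an equivalent. It also reproduces the zeros: DELETE records from the stream find nothing to match in an empty target.

```python
KEY = ("SalesOrderNumber", "SalesOrderLineNumber")  # the ON clause of the MERGE


def merge_stream(target, stream_rows):
    """Apply stream records to `target` (a dict keyed by KEY) clause by clause."""
    counts = {"inserted": 0, "updated": 0, "deleted": 0}
    for row in stream_rows:
        action, is_update = row["METADATA$ACTION"], row["METADATA$ISUPDATE"]
        # USING (... WHERE NOT (action = 'DELETE' AND isupdate = TRUE)):
        # the old half of an update is skipped; its INSERT half carries the new values.
        if action == "DELETE" and is_update:
            continue
        key = tuple(row[c] for c in KEY)
        data = {c: v for c, v in row.items() if not c.startswith("METADATA$")}
        matched = key in target
        if matched and action == "INSERT" and is_update:
            target[key] = data               # WHEN MATCHED ... THEN UPDATE
            counts["updated"] += 1
        elif matched and action == "DELETE":
            del target[key]                  # WHEN MATCHED ... THEN DELETE
            counts["deleted"] += 1
        elif not matched and action == "INSERT":
            target[key] = data               # WHEN NOT MATCHED ... THEN INSERT
            counts["inserted"] += 1
        # Any other combination matches no WHEN clause and changes nothing.
    return counts


# The situation in this blogpost: the stream holds DELETE records, the target is empty.
deletes = [{"SalesOrderNumber": "SO43697", "SalesOrderLineNumber": 1,
            "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": False}]
target = {}
print(merge_stream(target, deletes))  # {'inserted': 0, 'updated': 0, 'deleted': 0}
```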


Insert a record

I wish I had used a smaller table for this blogpost ;-). Now, what happens when an INSERT statement is executed on the source table FactInternetSales?

INSERT INTO dbo.FactInternetSales (
       ProductKey,
      OrderDateKey,
      DueDateKey,
      ShipDateKey,
      CustomerKey,
      PromotionKey,
      CurrencyKey,
      SalesTerritoryKey,
      SalesOrderNumber,
      SalesOrderLineNumber,
      RevisionNumber,
      OrderQuantity,
      UnitPrice,
      ExtendedAmount,
      UnitPriceDiscountPct,
      DiscountAmount,
      ProductStandardCost,
      TotalProductCost,
      SalesAmount ,
      TaxAmt ,
      Freight,
      OrderDate,
      DueDate,
      ShipDate
    ) 
VALUES
    (
      777,
      20101229,
      20110110,
      20110105,
      21768,
      1,
      19,
      6,
      'SO43697', 
      1,
      1,
      1,
      357827,
      357827,
      0,
      0,
      21712942,
      21712942,
      357827,
      2862616,
      894568,
      to_timestamp('29/12/2010 00:00', 'DD/MM/YYYY HH:MI'),
      to_timestamp('10/01/2011 00:00', 'DD/MM/YYYY HH:MI'),
      to_timestamp('05/01/2011 00:00', 'DD/MM/YYYY HH:MI')
    );

Let's investigate the contents of the stream. It shows one record with METADATA$ACTION = 'INSERT' (and METADATA$ISUPDATE = FALSE, because this is a plain insert).



Let's execute the MERGE statement to test whether the record is inserted into the target table. Afterwards, the stream is empty.


The FactInternetSales_target table, on the other hand, now contains the record.


Update a record

The next step is to update a record in the source table. What does the stream look like after an update? Here is the UPDATE statement:

-- No WHERE clause: this updates every row, which is a single row at this point
UPDATE FactInternetSales
SET UnitPrice = 213123;
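This results in two records in the stream for the updated row: a DELETE record with the old values and an INSERT record with the new values, both with METADATA$ISUPDATE = TRUE.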

Then run the MERGE statement again. The DELETE half of the update is filtered out, and the INSERT half updates the existing row in the target table.
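A small Python sketch of why the MERGE filters on METADATA$ISUPDATE. It assumes, as Snowflake's standard streams do, that an updated row appears as a DELETE record with the old values plus an INSERT record with the new values, both flagged with METADATA$ISUPDATE = TRUE. The helper update_changes is made up for this illustration; the list comprehension mirrors the WHERE clause in the USING part of the MERGE.

```python
def update_changes(old_row, new_row):
    """Stream records for one updated row: old values out, new values in."""
    return [
        {**old_row, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
        {**new_row, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
    ]


old = {"SalesOrderNumber": "SO43697", "SalesOrderLineNumber": 1, "UnitPrice": 357827}
new = {**old, "UnitPrice": 213123}  # UPDATE FactInternetSales SET UnitPrice = 213123
stream = update_changes(old, new)

# Same filter as the USING clause: WHERE NOT (action = 'DELETE' AND isupdate = TRUE)
kept = [r for r in stream
        if not (r["METADATA$ACTION"] == "DELETE" and r["METADATA$ISUPDATE"])]
print(len(stream), len(kept), kept[0]["UnitPrice"])  # 2 1 213123
```

The single record that survives the filter is a matched INSERT with METADATA$ISUPDATE = TRUE, which is exactly the condition of the WHEN MATCHED ... THEN UPDATE clause.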




Delete a record

After the insert and the update, let's look at deleting a record. What happens in the stream?

DELETE FROM FactInternetSales;

The stream now contains a single record, and its METADATA$ACTION column indicates a DELETE.



Now execute the MERGE statement again.

The target table is now empty: the record has been deleted.




Delete the source table

While experimenting, I dropped the source table. When I tried to query the stream, an error occurred.



Recreating the stream (once the source table exists again) is enough to get it working again.


Tasks explained

Now that we have experimented a bit with streams, let's take a look at tasks. A task object defines a recurring schedule for executing a SQL statement or calling a stored procedure. Tasks can be chained together so that they execute in succession. This way you can, for instance, load a data warehouse: first the staging layer, then the data vault, and perhaps data marts. Let's see how we can use the task feature of Snowflake to automate the work we have done so far.
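The chaining of tasks can be sketched in Python as a dependency graph in which each task runs after its predecessor. The task names below are hypothetical, and the sketch only computes a valid execution order with graphlib.TopologicalSorter (Python 3.9+). In Snowflake itself, a child task is declared with the AFTER clause of CREATE TASK, and only the root task has a SCHEDULE.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical task tree: each task maps to the task(s) it runs AFTER.
task_tree = {
    "TASK_LOAD_STAGING": set(),                      # root task, the only one with a SCHEDULE
    "TASK_LOAD_DATAVAULT": {"TASK_LOAD_STAGING"},
    "TASK_LOAD_MART_SALES": {"TASK_LOAD_DATAVAULT"},
    "TASK_LOAD_MART_FINANCE": {"TASK_LOAD_DATAVAULT"},
}

# static_order() yields every task only after all of its predecessors.
order = list(TopologicalSorter(task_tree).static_order())
print(order)
```

Staging always comes first and the data vault always precedes the marts; the two data mart tasks depend only on the data vault task, so their relative order is not fixed.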

First, create the task with the CREATE TASK statement. The body of the task is the MERGE statement described earlier in this blogpost.

--- CREATE TASK
CREATE TASK TASK_FactInternetSalesEveryMinute
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE' 
AS
MERGE INTO dbo.FactInternetSales_target AS TRG
USING (SELECT * 
       FROM FactInternetSales_target_stream 
       WHERE NOT (metadata$action = 'DELETE' AND metadata$isupdate = TRUE)
) AS STR
ON TRG.SalesOrderNumber = STR.SalesOrderNumber 
    AND TRG.SalesOrderlineNumber = STR.SalesOrderLineNumber
WHEN MATCHED AND STR.metadata$action = 'INSERT'
             AND STR.metadata$isupdate = TRUE THEN 
     UPDATE SET
        TRG.ProductKey				= STR.ProductKey,
        TRG.OrderDateKey			= STR.OrderDateKey,
        TRG.DueDateKey				= STR.DueDateKey,
        TRG.ShipDateKey				= STR.ShipDateKey,
        TRG.CustomerKey				= STR.CustomerKey,
        TRG.PromotionKey			= STR.PromotionKey,
        TRG.CurrencyKey				= STR.CurrencyKey,
        TRG.SalesTerritoryKey		= STR.SalesTerritoryKey,
        TRG.SalesOrderNumber		= STR.SalesOrderNumber,
        TRG.SalesOrderLineNumber	= STR.SalesOrderLineNumber,
        TRG.RevisionNumber			= STR.RevisionNumber,
        TRG.OrderQuantity			= STR.OrderQuantity,
        TRG.UnitPrice				= STR.UnitPrice,
        TRG.ExtendedAmount			= STR.ExtendedAmount,
        TRG.UnitPriceDiscountPct	= STR.UnitPriceDiscountPct,
        TRG.DiscountAmount			= STR.DiscountAmount,
        TRG.ProductStandardCost		= STR.ProductStandardCost,
        TRG.TotalProductCost		= STR.TotalProductCost,
        TRG.SalesAmount 			= STR.SalesAmount,
        TRG.TaxAmt 					= STR.TaxAmt,
        TRG.Freight					= STR.Freight,
        TRG.OrderDate				= STR.OrderDate,
        TRG.DueDate					= STR.DueDate,
        TRG.ShipDate				= STR.ShipDate
WHEN MATCHED AND STR.metadata$action = 'DELETE' THEN DELETE
WHEN NOT MATCHED AND STR.metadata$action = 'INSERT' THEN 
    INSERT (
       ProductKey,
      OrderDateKey,
      DueDateKey,
      ShipDateKey,
      CustomerKey,
      PromotionKey,
      CurrencyKey,
      SalesTerritoryKey,
      SalesOrderNumber,
      SalesOrderLineNumber,
      RevisionNumber,
      OrderQuantity,
      UnitPrice,
      ExtendedAmount,
      UnitPriceDiscountPct,
      DiscountAmount,
      ProductStandardCost,
      TotalProductCost,
      SalesAmount ,
      TaxAmt ,
      Freight,
      OrderDate,
      DueDate,
      ShipDate
    ) 
    VALUES(
        STR.ProductKey,
        STR.OrderDateKey,
        STR.DueDateKey,
        STR.ShipDateKey,
        STR.CustomerKey,
        STR.PromotionKey,
        STR.CurrencyKey,
        STR.SalesTerritoryKey,
        STR.SalesOrderNumber,
        STR.SalesOrderLineNumber,
        STR.RevisionNumber,
        STR.OrderQuantity,
        STR.UnitPrice,
        STR.ExtendedAmount,
        STR.UnitPriceDiscountPct,
        STR.DiscountAmount,
        STR.ProductStandardCost,
        STR.TotalProductCost,
        STR.SalesAmount ,
        STR.TaxAmt ,
        STR.Freight,
        STR.OrderDate,
        STR.DueDate,
        STR.ShipDate
    );

Okay, that is done. The task is created, but why isn't it running yet? Executing SHOW TASKS helps you understand the status of the task.

SHOW TASKS;

The output shows that the task is suspended. A newly created task is always suspended until it is explicitly resumed.


To get the task running, we have to resume it:

ALTER TASK TASK_FactInternetSalesEveryMinute RESUME;
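To summarise the behaviour seen with SHOW TASKS, here is a toy scheduler in Python. ToyTask is a made-up class, not a Snowflake API; the only assumption it encodes is that a new task starts in the suspended state and fires on its schedule only after it has been resumed.

```python
from dataclasses import dataclass


@dataclass
class ToyTask:
    """Very small model of a scheduled task: it only fires while resumed."""
    name: str
    interval_minutes: int
    state: str = "suspended"  # a new task starts suspended
    runs: int = 0

    def resume(self):         # ALTER TASK ... RESUME
        self.state = "started"

    def suspend(self):        # ALTER TASK ... SUSPEND
        self.state = "suspended"

    def tick(self, minute):
        """Called once per simulated minute; runs the body when it is due."""
        if self.state == "started" and minute % self.interval_minutes == 0:
            self.runs += 1    # in Snowflake, this is where the MERGE would execute


task = ToyTask("TASK_FactInternetSalesEveryMinute", interval_minutes=1)
for minute in range(3):
    task.tick(minute)
print(task.state, task.runs)  # suspended 0

task.resume()
for minute in range(3, 6):
    task.tick(minute)
print(task.state, task.runs)  # started 3
```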

Now let's run the same experiments as before: insert a record, update a record and delete a record.

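Insert: the inserted record appears in the target table.

Update: the target row gets the new values.

Delete: nothing is left in the target table.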

The MERGE statement is no longer executed manually; the task executes it automatically. Really cool! With just a few statements, data is loaded into a data warehouse automatically.

Final Thoughts

I haven't implemented this as an SCD2 (slowly changing dimension type 2), but you can imagine that this is relatively easy to build with a MERGE statement. You could also write a stored procedure and automate the load with tasks.
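As a starting point for the SCD2 idea, here is a rough Python sketch of the rules such a load could follow, assuming hypothetical ValidFrom, ValidTo and IsCurrent columns in the target. A change closes the current version of a key instead of overwriting it, an insert or the new half of an update adds a new current version, and a delete only closes the current version. It illustrates the rules under these assumptions; it is not a MERGE statement or a finished implementation.

```python
from datetime import datetime

KEY = ("SalesOrderNumber", "SalesOrderLineNumber")


def apply_scd2(history, stream_rows, load_time):
    """Apply stream records to `history`, a list of versioned rows (SCD2 style).

    ValidFrom, ValidTo and IsCurrent are hypothetical column names.
    """
    for row in stream_rows:
        action, is_update = row["METADATA$ACTION"], row["METADATA$ISUPDATE"]
        if action == "DELETE" and is_update:
            continue  # old half of an update; the INSERT half does the work
        key = tuple(row[c] for c in KEY)
        # Close the current version of this key instead of overwriting it.
        for version in history:
            if version["IsCurrent"] and tuple(version[c] for c in KEY) == key:
                version["ValidTo"] = load_time
                version["IsCurrent"] = False
        if action == "INSERT":
            # New row, or new values of an updated row: add a new current version.
            data = {c: v for c, v in row.items() if not c.startswith("METADATA$")}
            history.append({**data, "ValidFrom": load_time, "ValidTo": None, "IsCurrent": True})
    return history


history = []
insert = {"SalesOrderNumber": "SO43697", "SalesOrderLineNumber": 1, "UnitPrice": 357827,
          "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": False}
apply_scd2(history, [insert], datetime(2020, 12, 7, 10, 0))

update = [
    {**insert, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
    {**insert, "UnitPrice": 213123, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
]
apply_scd2(history, update, datetime(2020, 12, 7, 10, 1))
print([(v["UnitPrice"], v["IsCurrent"]) for v in history])  # [(357827, False), (213123, True)]
```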

Hennie
