Introduction
The next subject in my Snowflake series is streams and tasks. Together, streams and tasks let you build and automate data processing in a change data capture (CDC) style. When something changes in a source table, that change is picked up and applied to a target table. That's the idea. With Snowpipe you can automatically ingest data into Snowflake. With streams and tasks you can then process that data one step further, for instance into a data warehouse. The aim of this blogpost is to show how to do that processing with streams and tasks.
This blogpost builds on an earlier blogpost I wrote about Snowpipe: Loading data with Snowpipe on Azure. For the sake of simplicity, I'm not using Snowpipe here. Instead, I change the source table manually. If you imagine those manual changes being replaced by automatic loads from Snowpipe, the result is the same.
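According to the blog of David Oyegoke, streams are powered by the Time Travel feature. You can query a stream like a table, and when its data is consumed, the offset is moved forward automatically. In that sense it behaves like a queue. Note that a plain SELECT does not consume a stream. The offset only moves when the stream is used in a DML statement (such as INSERT or MERGE) that commits. With tasks you can run SQL statements or call a stored procedure on a schedule, and tasks can be chained together in a tree-like manner.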
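To make the queue analogy concrete, here is a minimal sketch of the offset behavior. It uses throw-away objects that are not part of this demo: demo_src, demo_log and demo_src_stream are hypothetical names. It assumes a database and schema are already selected and that AUTOCOMMIT is on, which is the default.

```sql
-- Hypothetical scratch objects, only used to illustrate stream offsets
CREATE OR REPLACE TABLE demo_src (id INT, val STRING);
CREATE OR REPLACE TABLE demo_log (id INT, val STRING);
CREATE OR REPLACE STREAM demo_src_stream ON TABLE demo_src;

INSERT INTO demo_src VALUES (1, 'a'), (2, 'b');

-- A plain SELECT reads the change records but does NOT move the offset:
-- both queries return the same two INSERT records
SELECT id, val, METADATA$ACTION, METADATA$ISUPDATE FROM demo_src_stream;
SELECT id, val, METADATA$ACTION, METADATA$ISUPDATE FROM demo_src_stream;

-- Using the stream in a DML statement consumes it; once this statement
-- commits, the offset moves past the two change records
INSERT INTO demo_log SELECT id, val FROM demo_src_stream;

-- The stream is empty again until demo_src changes
SELECT COUNT(*) FROM demo_src_stream;
```
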
Streams demo
First, create a target table with the columns we want to copy from the source table dbo.FactInternetSales:
CREATE TABLE IF NOT EXISTS dbo.FactInternetSales_target (
    ProductKey int NOT NULL,
    OrderDateKey int NOT NULL,
    DueDateKey int NOT NULL,
    ShipDateKey int NOT NULL,
    CustomerKey int NOT NULL,
    PromotionKey int NOT NULL,
    CurrencyKey int NOT NULL,
    SalesTerritoryKey int NOT NULL,
    SalesOrderNumber nvarchar(20) NOT NULL,
    SalesOrderLineNumber tinyint NOT NULL,
    RevisionNumber tinyint NOT NULL,
    OrderQuantity smallint NOT NULL,
    UnitPrice float NOT NULL,
    ExtendedAmount float NOT NULL,
    UnitPriceDiscountPct float NOT NULL,
    DiscountAmount float NOT NULL,
    ProductStandardCost float NOT NULL,
    TotalProductCost float NOT NULL,
    SalesAmount float NOT NULL,
    TaxAmt float NOT NULL,
    Freight float NOT NULL,
    OrderDate datetime NULL,
    DueDate datetime NULL,
    ShipDate datetime NULL
);
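Next, create a stream on the source table. From this point on, the stream records every change made to dbo.FactInternetSales:
-- CREATE OR REPLACE already replaces an existing stream, so the DROP is optional
DROP STREAM IF EXISTS FactInternetSales_target_stream;
CREATE OR REPLACE STREAM FactInternetSales_target_stream ON TABLE dbo.FactInternetSales;
-- A new stream only contains changes made after it was created, so this returns no rows yet
SELECT * FROM FactInternetSales_target_stream;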
From my previous experiments with Snowpipe, the source table already contains about 250K records. First, let's clear the table and see what happens in the stream.
DELETE FROM FactInternetSales;
SELECT * FROM FactInternetSales_target_stream;
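Instead of scrolling through roughly 250K change records, it can help to summarize the stream by its metadata columns. The query below is a minimal sketch. Like any SELECT, it does not consume the stream. The expected result in the comment assumes nothing else changed the source table in the meantime.

```sql
-- Summarize the pending change records per action type.
-- After the DELETE above, expect only DELETE records with METADATA$ISUPDATE = FALSE.
SELECT METADATA$ACTION,
       METADATA$ISUPDATE,
       COUNT(*) AS change_records
FROM FactInternetSales_target_stream
GROUP BY METADATA$ACTION, METADATA$ISUPDATE;
```
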
Merge the records
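-- Apply the pending change records in the stream to the target table
MERGE INTO dbo.FactInternetSales_target AS TRG
USING (SELECT *
FROM FactInternetSales_target_stream
-- Skip the DELETE half of each update pair; the INSERT half carries the new values
WHERE NOT (metadata$action = 'DELETE'
AND metadata$isupdate = TRUE)) AS STR
ON TRG.SalesOrderNumber = STR.SalesOrderNumber
AND TRG.SalesOrderLineNumber = STR.SalesOrderLineNumber
-- Updated rows: overwrite the matching target row
WHEN MATCHED AND STR.metadata$action = 'INSERT'
AND STR.metadata$isupdate = TRUE THEN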
UPDATE SET
TRG.ProductKey = STR.ProductKey,
TRG.OrderDateKey = STR.OrderDateKey,
TRG.DueDateKey = STR.DueDateKey,
TRG.ShipDateKey = STR.ShipDateKey,
TRG.CustomerKey = STR.CustomerKey,
TRG.PromotionKey = STR.PromotionKey,
TRG.CurrencyKey = STR.CurrencyKey,
TRG.SalesTerritoryKey = STR.SalesTerritoryKey,
TRG.SalesOrderNumber = STR.SalesOrderNumber,
TRG.SalesOrderLineNumber = STR.SalesOrderLineNumber,
TRG.RevisionNumber = STR.RevisionNumber,
TRG.OrderQuantity = STR.OrderQuantity,
TRG.UnitPrice = STR.UnitPrice,
TRG.ExtendedAmount = STR.ExtendedAmount,
TRG.UnitPriceDiscountPct = STR.UnitPriceDiscountPct,
TRG.DiscountAmount = STR.DiscountAmount,
TRG.ProductStandardCost = STR.ProductStandardCost,
TRG.TotalProductCost = STR.TotalProductCost,
TRG.SalesAmount = STR.SalesAmount,
TRG.TaxAmt = STR.TaxAmt,
TRG.Freight = STR.Freight,
TRG.OrderDate = STR.OrderDate,
TRG.DueDate = STR.DueDate,
TRG.ShipDate = STR.ShipDate
-- Deleted rows: remove the matching target row
WHEN MATCHED AND STR.metadata$action = 'DELETE' THEN DELETE
-- New rows: insert them into the target
WHEN NOT MATCHED AND STR.metadata$action = 'INSERT' THEN
INSERT (
ProductKey,
OrderDateKey,
DueDateKey,
ShipDateKey,
CustomerKey,
PromotionKey,
CurrencyKey,
SalesTerritoryKey,
SalesOrderNumber,
SalesOrderLineNumber,
RevisionNumber,
OrderQuantity,
UnitPrice,
ExtendedAmount,
UnitPriceDiscountPct,
DiscountAmount,
ProductStandardCost,
TotalProductCost,
SalesAmount,
TaxAmt,
Freight,
OrderDate,
DueDate,
ShipDate
)
VALUES(
STR.ProductKey,
STR.OrderDateKey,
STR.DueDateKey,
STR.ShipDateKey,
STR.CustomerKey,
STR.PromotionKey,
STR.CurrencyKey,
STR.SalesTerritoryKey,
STR.SalesOrderNumber,
STR.SalesOrderLineNumber,
STR.RevisionNumber,
STR.OrderQuantity,
STR.UnitPrice,
STR.ExtendedAmount,
STR.UnitPriceDiscountPct,
STR.DiscountAmount,
STR.ProductStandardCost,
STR.TotalProductCost,
STR.SalesAmount,
STR.TaxAmt,
STR.Freight,
STR.OrderDate,
STR.DueDate,
STR.ShipDate
);
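The MERGE reads from the stream inside a DML statement, so the stream is consumed when the statement commits. A quick way to check this is SYSTEM$STREAM_HAS_DATA, which returns FALSE once there are no unconsumed change records. In this first run the target table was presumably still empty, because it had just been created. The DELETE records therefore had nothing to match, and the MERGE mainly moves the offset forward. The following is a sketch of both checks:

```sql
-- FALSE once the MERGE has consumed all pending change records
SELECT SYSTEM$STREAM_HAS_DATA('FACTINTERNETSALES_TARGET_STREAM');

-- Number of rows in the target after the MERGE
SELECT COUNT(*) FROM dbo.FactInternetSales_target;
```
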
Insert a record
INSERT INTO dbo.FactInternetSales (
ProductKey,
OrderDateKey,
DueDateKey,
ShipDateKey,
CustomerKey,
PromotionKey,
CurrencyKey,
SalesTerritoryKey,
SalesOrderNumber,
SalesOrderLineNumber,
RevisionNumber,
OrderQuantity,
UnitPrice,
ExtendedAmount,
UnitPriceDiscountPct,
DiscountAmount,
ProductStandardCost,
TotalProductCost,
SalesAmount ,
TaxAmt ,
Freight,
OrderDate,
DueDate,
ShipDate
)
VALUES
(
777,
20101229,
20110110,
20110105,
21768,
1,
19,
6,
'SO43697',
1,
1,
1,
357827,
357827,
0,
0,
21712942,
21712942,
357827,
2862616,
894568,
to_timestamp('29/12/2010 00:00', 'DD/MM/YYYY HH:MI'),
to_timestamp('10/01/2011 00:00', 'DD/MM/YYYY HH:MI'),
to_timestamp('05/01/2011 00:00', 'DD/MM/YYYY HH:MI')
);
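This sketch assumes the stream was consumed by the MERGE above. In that case it should now hold exactly one change record for the new row. Running the MERGE again should then take the WHEN NOT MATCHED branch and insert the row into dbo.FactInternetSales_target.

```sql
-- Expect one record: METADATA$ACTION = 'INSERT', METADATA$ISUPDATE = FALSE
SELECT METADATA$ACTION,
       METADATA$ISUPDATE,
       SalesOrderNumber,
       SalesOrderLineNumber,
       UnitPrice
FROM FactInternetSales_target_stream;
```
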
Update a record
UPDATE FactInternetSales
SET UnitPrice = 213123
WHERE SalesOrderNumber = 'SO43697'
AND SalesOrderLineNumber = 1;
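A stream records an UPDATE as a pair of change records. The first is a DELETE with the old values and the second is an INSERT with the new values, and both have METADATA$ISUPDATE = TRUE. That is why the MERGE filters out the DELETE half and handles the INSERT half in its WHEN MATCHED ... UPDATE branch.

The sketch below assumes the inserted record was already merged, so the stream was empty before this UPDATE. If it had not been merged yet, the stream would instead show a single INSERT with the new UnitPrice, because a standard stream returns the net change since its offset.

```sql
-- Expect two records for SO43697 / 1, both with METADATA$ISUPDATE = TRUE:
-- a DELETE carrying the old UnitPrice and an INSERT carrying 213123
SELECT METADATA$ACTION,
       METADATA$ISUPDATE,
       SalesOrderNumber,
       SalesOrderLineNumber,
       UnitPrice
FROM FactInternetSales_target_stream
ORDER BY METADATA$ACTION;
```
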
Delete a record
DELETE FROM FactInternetSales
WHERE SalesOrderNumber = 'SO43697'
AND SalesOrderLineNumber = 1;
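Assuming the update was merged first, the stream now holds a single DELETE record for the row, with METADATA$ISUPDATE = FALSE. Running the MERGE again should take the WHEN MATCHED ... DELETE branch. The following sketch verifies afterwards that the row is gone from the target:

```sql
-- Run after the MERGE: expect 0 once the delete has been applied to the target
SELECT COUNT(*) AS remaining_rows
FROM dbo.FactInternetSales_target
WHERE SalesOrderNumber = 'SO43697'
  AND SalesOrderLineNumber = 1;
```
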
Delete the source table
Tasks explained
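Instead of running the MERGE by hand after every change, you can schedule it with a task. The task below runs the same MERGE statement every minute on the COMPUTE_WH warehouse:
--- CREATE TASK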
CREATE TASK TASK_FactInternetSalesEveryMinute
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
MERGE INTO dbo.FactInternetSales_target AS TRG
USING (SELECT *
FROM FactInternetSales_target_stream
WHERE NOT (metadata$action = 'DELETE' AND metadata$isupdate = TRUE)
) AS STR
ON TRG.SalesOrderNumber = STR.SalesOrderNumber
AND TRG.SalesOrderLineNumber = STR.SalesOrderLineNumber
WHEN MATCHED AND STR.metadata$action = 'INSERT'
AND STR.metadata$isupdate = TRUE THEN
UPDATE SET
TRG.ProductKey = STR.ProductKey,
TRG.OrderDateKey = STR.OrderDateKey,
TRG.DueDateKey = STR.DueDateKey,
TRG.ShipDateKey = STR.ShipDateKey,
TRG.CustomerKey = STR.CustomerKey,
TRG.PromotionKey = STR.PromotionKey,
TRG.CurrencyKey = STR.CurrencyKey,
TRG.SalesTerritoryKey = STR.SalesTerritoryKey,
TRG.SalesOrderNumber = STR.SalesOrderNumber,
TRG.SalesOrderLineNumber = STR.SalesOrderLineNumber,
TRG.RevisionNumber = STR.RevisionNumber,
TRG.OrderQuantity = STR.OrderQuantity,
TRG.UnitPrice = STR.UnitPrice,
TRG.ExtendedAmount = STR.ExtendedAmount,
TRG.UnitPriceDiscountPct = STR.UnitPriceDiscountPct,
TRG.DiscountAmount = STR.DiscountAmount,
TRG.ProductStandardCost = STR.ProductStandardCost,
TRG.TotalProductCost = STR.TotalProductCost,
TRG.SalesAmount = STR.SalesAmount,
TRG.TaxAmt = STR.TaxAmt,
TRG.Freight = STR.Freight,
TRG.OrderDate = STR.OrderDate,
TRG.DueDate = STR.DueDate,
TRG.ShipDate = STR.ShipDate
WHEN MATCHED AND STR.metadata$action = 'DELETE' THEN DELETE
WHEN NOT MATCHED AND STR.metadata$action = 'INSERT' THEN
INSERT (
ProductKey,
OrderDateKey,
DueDateKey,
ShipDateKey,
CustomerKey,
PromotionKey,
CurrencyKey,
SalesTerritoryKey,
SalesOrderNumber,
SalesOrderLineNumber,
RevisionNumber,
OrderQuantity,
UnitPrice,
ExtendedAmount,
UnitPriceDiscountPct,
DiscountAmount,
ProductStandardCost,
TotalProductCost,
SalesAmount,
TaxAmt,
Freight,
OrderDate,
DueDate,
ShipDate
)
VALUES(
STR.ProductKey,
STR.OrderDateKey,
STR.DueDateKey,
STR.ShipDateKey,
STR.CustomerKey,
STR.PromotionKey,
STR.CurrencyKey,
STR.SalesTerritoryKey,
STR.SalesOrderNumber,
STR.SalesOrderLineNumber,
STR.RevisionNumber,
STR.OrderQuantity,
STR.UnitPrice,
STR.ExtendedAmount,
STR.UnitPriceDiscountPct,
STR.DiscountAmount,
STR.ProductStandardCost,
STR.TotalProductCost,
STR.SalesAmount,
STR.TaxAmt,
STR.Freight,
STR.OrderDate,
STR.DueDate,
STR.ShipDate
);
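As written, the task runs the MERGE every minute, even when there is nothing to merge. Snowflake tasks accept a WHEN condition, and SYSTEM$STREAM_HAS_DATA is the usual check here. When the condition is false, that run is skipped. The sketch below adds the condition to the existing task. A new task is created suspended, so this can be done before resuming it.

```sql
-- Only run the MERGE when the stream has unconsumed change records
ALTER TASK TASK_FactInternetSalesEveryMinute
  MODIFY WHEN SYSTEM$STREAM_HAS_DATA('FACTINTERNETSALES_TARGET_STREAM');
```
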
SHOW TASKS;
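Tasks can be chained into a tree. A child task declares AFTER <parent> instead of its own schedule and runs when its parent has finished successfully. Below is a sketch of a child task that refreshes an aggregate once the MERGE has run. The names dbo.SalesByProduct and TASK_RefreshSalesByProduct are hypothetical. The root task has to be suspended while a child is added, which is still the case at this point. The child also needs to be resumed separately.

```sql
-- Hypothetical aggregate table fed by the child task
CREATE TABLE IF NOT EXISTS dbo.SalesByProduct (
    ProductKey  int NOT NULL,
    SalesAmount float
);

-- Child task: no SCHEDULE, it runs after the root task completes
CREATE TASK TASK_RefreshSalesByProduct
  WAREHOUSE = COMPUTE_WH
  AFTER TASK_FactInternetSalesEveryMinute
AS
  INSERT OVERWRITE INTO dbo.SalesByProduct
  SELECT ProductKey, SUM(SalesAmount)
  FROM dbo.FactInternetSales_target
  GROUP BY ProductKey;

-- Child tasks are created suspended as well; resume them before the root task
ALTER TASK TASK_RefreshSalesByProduct RESUME;
```
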
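A new task is created in a suspended state, so it only starts running on its schedule once it has been resumed:
ALTER TASK TASK_FactInternetSalesEveryMinute RESUME;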
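Once the task is running, you can check its runs with the TASK_HISTORY table function in INFORMATION_SCHEMA. This assumes the database that holds the task is the current database. A task on a one-minute schedule keeps using the warehouse, so it is worth suspending it when the demo is over. Here is a sketch:

```sql
-- Most recent runs of the task, newest first
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
       TASK_NAME => 'TASK_FACTINTERNETSALESEVERYMINUTE'))
ORDER BY scheduled_time DESC
LIMIT 10;

-- Stop the schedule when you are done experimenting
ALTER TASK TASK_FactInternetSalesEveryMinute SUSPEND;
```
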