In this post i’ll describe the different aspects of developing SSIS components in C# and particularly the Derived Column component. Building packages in C# is a challenging job. Hopefully with help of this post you will be able to overcome most of the difficulties. I already published some posts about programming SSIS with C# (part I, part II, part III, part IV and part V). These were reasonable simple straight packages. In this post I’ll describe building a packages that is a bit more complicated. In this post we will implement the derived column component in a data flow task. The derived column add or replaces columns in a dataflow task.
There are certain parts of SSIS components to understand in order to build packages programmatically successful:
- Type of component (source adapter, destination adapter, transformation adapter).
- Design time/run time.
- Virtual/non virtual.
Types of components
There are three types of components: the source adapter, transformation and the destination adapter. The source adapter retrieves data from a external location and transforms this into a internal buffer format that the pipeline expects. The transformation component accepts buffers of data from the pipeline, does something with it and pushes the data further downstream. The destination adapter accepts buffers and writes this into a external location.
Design time/run time
The design time phase refers to the methods and interfaces that are called when the component is being used in a development sense. In other words, the code that is being run when the component is dragged onto the SSIS design surface, and when it’s being configured.. The run time functionality refers to the calls and interfaces that are being used when the component is actually being executed.
Below there are some typical Design time methods that I’ve been using in my SSIS code projects.
- ProvidecomponentProperties. This method is called when a component is first added to a dataflowtask and is used to initialize a component. Components should add their inputs, outputs and customproperties
- SetComponentProperty. With the setcomponentproperty method it’s possible to set properties of a component
- SetUsageType. This method deals with the columns on inputs into the component. You can use this to select a column and tells the component how you will treat each column.
- MapInputColumn. These method is used to create a relationship between input column and MetaDatacolumn. An external metadata column is an offline representation of an output or input column and can be used to downstream components to create an input. I've used this in my destination adapter where a mapping is required.
Virtual/non virtual.
In the code below i'll be using the virtual and the non-virtual columns. As far as i understand now, the virtual columns are what a component could have and the non virtual (normal) columns are the chosen columns of the component (from the virtualinputs).
The code
I’ve made one assumption in this code and that is I have the same name for the derived column and for the columnname in the database (DerivedColumn). This made it easy for mapping the columns in the destination. If you don’t have the same name then you have to build some custom code here.
Please note that the code is created based on SQL Server 2008 R2,
Add the following references to your project (Project->Add Reference):
- Microsoft.SqlServer.DTSPipelineWrap.dll
- Microsoft.SQLServer.DTSRuntimeWrap.dll
- Microsoft.SQLServer.ManagedDTS.dll
- Microsoft.SqlServer.PipelineHost.dll
Use this code on top of your script:
using DTS = Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
These are the steps that needs to be taken for succesfully generating a Derived Column component in a dataflow with C#:
- Create a application.
- Create a package.
- Add a connection for the source adapter to the package.
- Add a connection for the destination adapter to the package.
- Add a Dataflow task to the package.
- Add a source component to the dataflowtask.
- Connect the Source connection with the source component.
- Add a destination component to the dataflowtask.
- Connect the destination connection with the destination component.
- Add a Derived column component
- Create the path between the Source and the Derived Column component.
- Create the path between the Derived Column and the Destination component.
- Add a column to the derived column component.
- Setting some properties for the derived column component.
- Map the fields in the destination component.
- Write the package to a file ( you can also write this to SQL Server).
- Dispose the objects.
1. Create a application.
The first thing you have to create is a new object
Application. With an application class it's possible to discover and access package objects. It can also access collections and properties that contain information about the system.
// Create an application
DTS.Application app = new DTS.Application();
2. Create a package.
After creating the application object the next step is creating the
package object. The package is a collection of other containers, connections, tasks, transformations, variables, configurations, and precedence constraints.
// Create a package
DTS.Package pkg = new DTS.Package();
//Setting some properties
pkg.Name = @"MyProgrammedDataflowTaskWithDerivedColumn";
3. Add a connection for the source adapter to the package.
Data comes from a source and it will be pushed downstream to a destination. So a connection to the source is needed and is created with the
connectionmanager class. The connectionmanager class provides the information that you must have to connect to a datasource.
//Adding a connection to the database AdventureWorksLT2008R2
DTS.ConnectionManager ConnMgrAdvent = pkg.Connections.Add("OLEDB");
ConnMgrAdvent.ConnectionString = @"Provider=SQLOLEDB.1;" +
"Integrated Security=SSPI;Initial Catalog=AdventureWorks2008R2;" +
"Data Source=(local);";
ConnMgrAdvent.Name = @"AdventureWorks2008R2";
ConnMgrAdvent.Description = @"SSIS Connection Manager for OLEDB Source
AdventureWorks2008R2";
4. Add a connection for the destination adapter to the package.
Logically, the destination adapter needs to be created too. This is described in my former
post.
//Adding a connection to the database Import_DB
DTS.ConnectionManager ConnMgrImport_DB = pkg.Connections.Add("OLEDB");
ConnMgrImport_DB.ConnectionString = @"Provider=SQLOLEDB.1;" +
"Integrated Security=SSPI;Initial Catalog=Import_DB;" +
"Data Source=(local);";
ConnMgrImport_DB.Name = @"Import_DB";
ConnMgrImport_DB.Description = @"SSIS Connection Manager for OLEDB Source
ImportDB";
5. Add a Dataflow task to the package.
The derived column is part of the dataflow and therefore a dataflow task is needed. This is described in my former
post.
//Adding the dataflow task to the package
DTS.Executable exe = pkg.Executables.Add("STOCK:PipelineTask");
DTS.TaskHost TKHSQLHost = (DTS.TaskHost)exe;
TKHSQLHost.Name = "This is a programmed DataFlowTask";
MainPipe dataFlowTask = (MainPipe)TKHSQLHost.InnerObject;
6. Add a source component to the dataflowtask.
This is described in my former
post.
// Create the source component.
IDTSComponentMetaData100 source =
dataFlowTask.ComponentMetaDataCollection.New();
source.ComponentClassID = "DTSAdapter.OleDbSource";
CManagedComponentWrapper srcDesignTime = source.Instantiate();
srcDesignTime.ProvideComponentProperties();
7. Connect the Source connection with the source component.
This is described in my former
post.
// Assign the connection manager.
if (source.RuntimeConnectionCollection.Count > 0)
{
source.RuntimeConnectionCollection[0].ConnectionManager =
DTS.DtsConvert.GetExtendedInterface(ConnMgrAdvent);
source.RuntimeConnectionCollection[0].ConnectionManagerID =
pkg.Connections["AdventureWorks2008R2"].ID;
}
// Set the custom properties of the source.
srcDesignTime.SetComponentProperty("AccessMode", 0);
srcDesignTime.SetComponentProperty("OpenRowset", "[Production].[Product]");
// Connect to the data source, and then update the metadata for the source.
srcDesignTime.AcquireConnections(null);
srcDesignTime.ReinitializeMetaData();
srcDesignTime.ReleaseConnections();
8. Add a destination component to the dataflowtask.
This is described in my former
post.
// Create the destination component.
IDTSComponentMetaData100 destination =
dataFlowTask.ComponentMetaDataCollection.New();
destination.ComponentClassID = "DTSAdapter.OleDbDestination";
CManagedComponentWrapper destDesignTime = destination.Instantiate();
destDesignTime.ProvideComponentProperties();
9. Connect the destination connection with the destination component.
This is described in my former
post.
// Assign the connection manager.
destination.RuntimeConnectionCollection[0].ConnectionManager =
DTS.DtsConvert.GetExtendedInterface(ConnMgrImport_DB);
if (destination.RuntimeConnectionCollection.Count > 0)
{
destination.RuntimeConnectionCollection[0].ConnectionManager =
DTS.DtsConvert.GetExtendedInterface(ConnMgrImport_DB);
destination.RuntimeConnectionCollection[0].ConnectionManagerID =
pkg.Connections["Import_DB"].ID;
}
// Set the custom properties of the destination
destDesignTime.SetComponentProperty("AccessMode", 0);
destDesignTime.SetComponentProperty("OpenRowset", "[dbo].
[ProductWithDerivedColumn]");
// Connect to the data source, and then update the metadata for the source.
destDesignTime.AcquireConnections(null);
destDesignTime.ReinitializeMetaData();
destDesignTime.ReleaseConnections();
10. Add a Derived column component.
This part is new regarding my
former post. The code below adds the derived column component to the dataflow task.
//Derived Column
IDTSComponentMetaData100 derived =
dataFlowTask.ComponentMetaDataCollection.New();
derived.Name = "Derived Column Component";
derived.ComponentClassID = "DTSTransform.DerivedColumn";
CManagedComponentWrapper DesignDerivedColumns = derived.Instantiate();
DesignDerivedColumns.ProvideComponentProperties(); //design time
derived.InputCollection[0].ExternalMetadataColumnCollection.IsUsed = false;
derived.InputCollection[0].HasSideEffects = false;
//update the metadata for the derived columns
DesignDerivedColumns.AcquireConnections(null);
DesignDerivedColumns.ReinitializeMetaData();
DesignDerivedColumns.ReleaseConnections();
11. Create the path between the Source and the Derived Column component.
With this code a connection (path) is created between the source component and the derived column component.
//Create the path from source to derived columns
IDTSPath100 SourceToDerivedPath = dataFlowTask.PathCollection.New();
SourceToDerivedPath.AttachPathAndPropagateNotifications(source.OutputCollection[0],
derived.InputCollection[0]);
12. Create the path between the Derived Column and the Destination component.
With this code a connection (path) is created between the derived column component and the derived column component.
//Create the path from derived to desitination
IDTSPath100 DerivedToDestinationPath = dataFlowTask.PathCollection.New();
DerivedToDestinationPath.AttachPathAndPropagateNotifications(
derived.OutputCollection[0],destination.InputCollection[0]);
13. Add a column to the derived column component.
This code is about adding a column to the derviced column component. I just entered 10 for simplicity but anything is possible, off course.
IDTSOutputColumn100 myCol =
derived.OutputCollection[0].OutputColumnCollection.New();
myCol.Name = "DerivedColumn";
myCol.SetDataTypeProperties(
Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_I4, 0, 0, 0, 0);
myCol.ExternalMetadataColumnID = 0;
myCol.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent;
myCol.TruncationRowDisposition = DTSRowDisposition.RD_FailComponent;
IDTSCustomProperty100 myProp = myCol.CustomPropertyCollection.New();
myProp.Name = "Expression";
myProp.Value = "10";
myProp = myCol.CustomPropertyCollection.New();
myProp.Name = "FriendlyExpression";
myProp.Value = "10";
14. Setting some properties for the derived column component.
Setting some properties for the derived column component.
//Create the input columns for the transformation component
IDTSInput100 input = derived.InputCollection[0];
IDTSVirtualInput100 derivedInputVirtual = input.GetVirtualInput();
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
input.ErrorOrTruncationOperation = "";
DesignDerivedColumns.ReleaseConnections();
15.Map the fields in the destination component.
this code will map the columns in the mapping tab of the destination component. For simplicity i used the same name for the derived column and the database. Therefore it's not needed to manual mapping between the columns and the columns in the table.
// Get the destination's default input and virtual input.
IDTSInput100 destinationinput = destination.InputCollection[0];
int destinationInputID = input.ID;
IDTSVirtualInput100 vdestinationinput = destinationinput.GetVirtualInput();
// Iterate through the virtual input column collection.
foreach (IDTSVirtualInputColumn100 vColumn in
vdestinationinput.VirtualInputColumnCollection)
{
IDTSInputColumn100 vCol = destDesignTime.SetUsageType(destinationinput.ID,
vdestinationinput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
destDesignTime.MapInputColumn(destinationinput.ID, vCol.ID,
destinationinput.ExternalMetadataColumnCollection[vColumn.Name].ID);
}
16. Write the package to a file ( you can also write this to SQL Server).
This is described in my former
post.
app.SaveToXml(String.Format(@"E:\\SSISProgram\\{0}.dtsx", pkg.Name), pkg, null);
17. Dispose the objects.
This is described in my former
post.
.
pkg.Dispose();
app = null;
source = null;
destination = null;
srcDesignTime = null;
destDesignTime = null;
Conclusion
There is a lot of programming needed for implementing a derived column in a dataflow task. The lack of good documentation and tutorials for building packages in C# doesn't help a lot. So far i haven't found a good blog or site explaining this. By learning from snippets, MSDN and trying and trying it will succeed. The error codes (HRESULT) that are generated by the compiler are really terrible and gives very little debug information.
Greetz,
ps. the complete code:
// Create an application
DTS.Application app = new DTS.Application();
// Create a package
DTS.Package pkg = new DTS.Package();
//Setting some properties
pkg.Name = @"MyProgrammedDataflowTaskWithDerivedColumn";
//Adding a connection to the database AdventureWorksLT2008R2
DTS.ConnectionManager ConnMgrAdvent = pkg.Connections.Add("OLEDB");
ConnMgrAdvent.ConnectionString = @"Provider=SQLOLEDB.1;" +
"Integrated Security=SSPI;Initial Catalog=AdventureWorks2008R2;" +
"Data Source=(local);";
ConnMgrAdvent.Name = @"AdventureWorks2008R2";
ConnMgrAdvent.Description = @"SSIS Connection Manager for OLEDB Source AdventureWorks2008R2";
//Adding a connection to the database Import_DB
DTS.ConnectionManager ConnMgrImport_DB = pkg.Connections.Add("OLEDB");
ConnMgrImport_DB.ConnectionString = @"Provider=SQLOLEDB.1;" +
"Integrated Security=SSPI;Initial Catalog=Import_DB;" +
"Data Source=(local);";
ConnMgrImport_DB.Name = @"Import_DB";
ConnMgrImport_DB.Description = @"SSIS Connection Manager for OLEDB Source ImportDB";
//Adding the dataflow task to the package
DTS.Executable exe = pkg.Executables.Add("STOCK:PipelineTask");
DTS.TaskHost TKHSQLHost = (DTS.TaskHost)exe;
TKHSQLHost.Name = "This is a programmed DataFlowTask";
MainPipe dataFlowTask = (MainPipe)TKHSQLHost.InnerObject;
// Create the source component.
IDTSComponentMetaData100 source = dataFlowTask.ComponentMetaDataCollection.New();
source.ComponentClassID = "DTSAdapter.OleDbSource";
CManagedComponentWrapper srcDesignTime = source.Instantiate();
srcDesignTime.ProvideComponentProperties();
// Assign the connection manager.
if (source.RuntimeConnectionCollection.Count > 0)
{
source.RuntimeConnectionCollection[0].ConnectionManager = DTS.DtsConvert.GetExtendedInterface(ConnMgrAdvent);
source.RuntimeConnectionCollection[0].ConnectionManagerID = pkg.Connections["AdventureWorks2008R2"].ID;
}
// Set the custom properties of the source.
srcDesignTime.SetComponentProperty("AccessMode", 0);
srcDesignTime.SetComponentProperty("OpenRowset", "[Production].[Product]");
// Connect to the data source, and then update the metadata for the source.
srcDesignTime.AcquireConnections(null);
srcDesignTime.ReinitializeMetaData();
srcDesignTime.ReleaseConnections();
// Create the destination component.
IDTSComponentMetaData100 destination = dataFlowTask.ComponentMetaDataCollection.New();
destination.ComponentClassID = "DTSAdapter.OleDbDestination";
CManagedComponentWrapper destDesignTime = destination.Instantiate();
destDesignTime.ProvideComponentProperties();
// Assign the connection manager.
destination.RuntimeConnectionCollection[0].ConnectionManager = DTS.DtsConvert.GetExtendedInterface(ConnMgrImport_DB);
if (destination.RuntimeConnectionCollection.Count > 0)
{
destination.RuntimeConnectionCollection[0].ConnectionManager = DTS.DtsConvert.GetExtendedInterface(ConnMgrImport_DB);
destination.RuntimeConnectionCollection[0].ConnectionManagerID = pkg.Connections["Import_DB"].ID;
}
// Set the custom properties of the destination
destDesignTime.SetComponentProperty("AccessMode", 0);
destDesignTime.SetComponentProperty("OpenRowset", "[dbo].[ProductWithDerivedColumn]");
// Connect to the data source, and then update the metadata for the source.
destDesignTime.AcquireConnections(null);
destDesignTime.ReinitializeMetaData();
destDesignTime.ReleaseConnections();
//Derived Column
IDTSComponentMetaData100 derived = dataFlowTask.ComponentMetaDataCollection.New();
derived.Name = "Derived Column Component";
derived.ComponentClassID = "DTSTransform.DerivedColumn";
CManagedComponentWrapper DesignDerivedColumns = derived.Instantiate();
DesignDerivedColumns.ProvideComponentProperties(); //design time
derived.InputCollection[0].ExternalMetadataColumnCollection.IsUsed = false;
derived.InputCollection[0].HasSideEffects = false;
//update the metadata for the derived columns
DesignDerivedColumns.AcquireConnections(null);
DesignDerivedColumns.ReinitializeMetaData();
DesignDerivedColumns.ReleaseConnections();
//Create the path from source to derived columns
IDTSPath100 SourceToDerivedPath = dataFlowTask.PathCollection.New();
SourceToDerivedPath.AttachPathAndPropagateNotifications(source.OutputCollection[0], derived.InputCollection[0]);
//Create the path from derived to desitination
IDTSPath100 DerivedToDestinationPath = dataFlowTask.PathCollection.New();
DerivedToDestinationPath.AttachPathAndPropagateNotifications(derived.OutputCollection[0], destination.InputCollection[0]);
//derived.OutputCollection[0].TruncationRowDisposition = DTSRowDisposition.RD_NotUsed;
//derived.OutputCollection[0].ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
IDTSOutputColumn100 myCol = derived.OutputCollection[0].OutputColumnCollection.New();
myCol.Name = "DerivedColumn";
myCol.SetDataTypeProperties(Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_I4, 0, 0, 0, 0);
myCol.ExternalMetadataColumnID = 0;
myCol.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent;
myCol.TruncationRowDisposition = DTSRowDisposition.RD_FailComponent;
IDTSCustomProperty100 myProp = myCol.CustomPropertyCollection.New();
myProp.Name = "Expression";
myProp.Value = "10";
myProp = myCol.CustomPropertyCollection.New();
myProp.Name = "FriendlyExpression";
myProp.Value = "10";
//Get the output collection
//IDTSOutput100 outputd = derived.OutputCollection[0];
//Create the input columns for the transformation component
IDTSInput100 input = derived.InputCollection[0];
IDTSVirtualInput100 derivedInputVirtual = input.GetVirtualInput();
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
input.ErrorOrTruncationOperation = "";
DesignDerivedColumns.ReleaseConnections();
// Get the destination's default input and virtual input.
IDTSInput100 destinationinput = destination.InputCollection[0];
int destinationInputID = input.ID;
IDTSVirtualInput100 vdestinationinput = destinationinput.GetVirtualInput();
// Iterate through the virtual input column collection.
foreach (IDTSVirtualInputColumn100 vColumn in vdestinationinput.VirtualInputColumnCollection)
{
IDTSInputColumn100 vCol = destDesignTime.SetUsageType(destinationinput.ID, vdestinationinput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
destDesignTime.MapInputColumn(destinationinput.ID, vCol.ID, destinationinput.ExternalMetadataColumnCollection[vColumn.Name].ID);
}
// Verify that the columns have been added to the input.
foreach (IDTSInputColumn100 inputColumn in destination.InputCollection[0].InputColumnCollection)
Console.WriteLine(inputColumn.Name);
Console.Read();
app.SaveToXml(String.Format(@"E:\\SSISProgram\\{0}.dtsx", pkg.Name), pkg, null);
Console.WriteLine("Package {0} created", pkg.Name);
pkg.Dispose();
app = null;
source = null;
destination = null;
srcDesignTime = null;
destDesignTime = null;