This uses a fairly standard pattern: a query looks up all the tables in a database, a loop iterates over them, and each table is passed via parameters to a copy operation. The primary benefit of this setup is that it uses dynamic partitioning where possible and runs with very high concurrency.
This assumes your linked services and self-hosted integration runtime are already set up and ready to use.
Our first pipeline gathers all of the tables and their primary keys and sends that output as an array to the copy pipeline.
The most important part of the Lookup activity is its query, which is also the most complex piece of the whole process.
The query we are using is a union query with a GROUP BY to filter out duplicate tables: the first half of the union returns every primary key column, while the second half returns every user table with the sentinel values Z_NONE / z_no_primary_key. Because min() sorts the Z_ sentinels after most real column names, the GROUP BY keeps the actual primary key whenever one exists. Any suggestions for improving this query would be welcome!
SELECT min(table_list.TABLE_SCHEMA) AS TABLE_SCHEMA
    ,table_list.TABLE_NAME
    ,min(table_list.PRIMARY_KEY) AS PRIMARY_KEY
    ,min(table_list.TYPE) AS [TYPE]
FROM
    -- First branch: every table that has a primary key, with the key column and its data type
    (SELECT DB_NAME() AS Database_Name
        ,sc.name AS 'TABLE_SCHEMA'
        ,o.Name AS 'TABLE_NAME'
        ,c.Name AS 'PRIMARY_KEY'
        ,t.name AS 'TYPE'
    FROM sys.indexes i
    INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
    INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
    INNER JOIN sys.objects o ON i.object_id = o.object_id
    INNER JOIN sys.schemas sc ON o.schema_id = sc.schema_id
    INNER JOIN sys.types t ON c.user_type_id = t.user_type_id
    WHERE i.is_primary_key = 1
    UNION
    -- Second branch: every user table, with sentinel values in place of a primary key
    SELECT DB_NAME() AS Database_Name
        ,sc.name AS 'TABLE_SCHEMA'
        ,t.name AS 'TABLE_NAME'
        ,'PRIMARY_KEY' = 'Z_NONE'
        ,'TYPE' = 'z_no_primary_key'
    FROM sys.tables t
    INNER JOIN sys.schemas sc ON t.schema_id = sc.schema_id
    WHERE t.is_ms_shipped = 0) AS table_list
-- min() prefers real key names over the Z_ sentinels; note that grouping on
-- TABLE_NAME alone assumes table names are unique across schemas
GROUP BY table_list.TABLE_NAME
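To make the output concrete: with First row only unchecked on the Lookup activity, its output.value is a JSON array with one object per table, shaped roughly like this (the table and column names below are hypothetical):

[
    { "TABLE_SCHEMA": "dbo", "TABLE_NAME": "Orders", "PRIMARY_KEY": "OrderID", "TYPE": "int" },
    { "TABLE_SCHEMA": "dbo", "TABLE_NAME": "Customers", "PRIMARY_KEY": "CustomerName", "TYPE": "varchar" },
    { "TABLE_SCHEMA": "dbo", "TABLE_NAME": "AuditLog", "PRIMARY_KEY": "Z_NONE", "TYPE": "z_no_primary_key" }
]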
The only important setting for the TriggerCopy Execute Pipeline activity is the array parameter we pass to the called pipeline: @activity('LookupTableList').output.value
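As a minimal sketch, the TriggerCopy activity's JSON might look like the following, assuming the called pipeline is named CopyTables and its array parameter is named tableList (both names are illustrative):

{
    "name": "TriggerCopy",
    "type": "ExecutePipeline",
    "dependsOn": [
        { "activity": "LookupTableList", "dependencyConditions": [ "Succeeded" ] }
    ],
    "typeProperties": {
        "pipeline": { "referenceName": "CopyTables", "type": "PipelineReference" },
        "parameters": {
            "tableList": {
                "value": "@activity('LookupTableList').output.value",
                "type": "Expression"
            }
        }
    }
}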
When building this pipeline, the first step is to define an array parameter, tableList.
The pipeline then uses a ForEach activity to loop over the items of the tableList array, as sketched below.
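A rough sketch of the pipeline body, showing the tableList parameter and the ForEach activity (the activity name and batchCount are illustrative; ForEach runs its iterations in parallel unless isSequential is set):

{
    "parameters": {
        "tableList": { "type": "Array" }
    },
    "activities": [
        {
            "name": "ForEachTable",
            "type": "ForEach",
            "typeProperties": {
                "items": { "value": "@pipeline().parameters.tableList", "type": "Expression" },
                "isSequential": false,
                "batchCount": 20,
                "activities": [ ]
            }
        }
    ]
}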
Inside the ForEach activity, we use an If Condition activity to separate partitionable data from data that has an incompatible data type.
The expression for the activity is: @or(contains(item().TYPE, 'int'), contains(item().TYPE, 'date')), which will be true if the data type is an integer or date/datetime type.
The True and False branches call similar activities: each executes a pipeline, the difference being that one uses dynamic partitioning and the other does not. Because of that, the non-partitioned pipeline does not need the PRIMARY_KEY parameter. Make sure to uncheck Wait on completion on both Execute Pipeline activities, or you will not get as much concurrency. A sketch of both branches follows.
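Assuming the two called pipelines are named CopyPartitioned and CopyNonPartitioned (illustrative names), the branches of the If Condition might look roughly like this, with waitOnCompletion set to false on both so the iterations fan out:

"ifTrueActivities": [
    {
        "name": "CopyWithPartitioning",
        "type": "ExecutePipeline",
        "typeProperties": {
            "pipeline": { "referenceName": "CopyPartitioned", "type": "PipelineReference" },
            "waitOnCompletion": false,
            "parameters": {
                "TABLE_SCHEMA": { "value": "@item().TABLE_SCHEMA", "type": "Expression" },
                "TABLE_NAME": { "value": "@item().TABLE_NAME", "type": "Expression" },
                "PRIMARY_KEY": { "value": "@item().PRIMARY_KEY", "type": "Expression" }
            }
        }
    }
],
"ifFalseActivities": [
    {
        "name": "CopyWithoutPartitioning",
        "type": "ExecutePipeline",
        "typeProperties": {
            "pipeline": { "referenceName": "CopyNonPartitioned", "type": "PipelineReference" },
            "waitOnCompletion": false,
            "parameters": {
                "TABLE_SCHEMA": { "value": "@item().TABLE_SCHEMA", "type": "Expression" },
                "TABLE_NAME": { "value": "@item().TABLE_NAME", "type": "Expression" }
            }
        }
    }
]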
For the copy itself we have two separate pipelines. The first uses dynamic partitioning and needs all three parameters (TABLE_SCHEMA, TABLE_NAME, and PRIMARY_KEY). The Wait activity is included only to show where alerting or failure handling could be added if needed.
On the Copy activity, the settings we need to configure are the source query and the Partition column name; also make sure to select Dynamic range as the Partition option.
The query needs to include ?AdfDynamicRangePartitionCondition or it will not work.
SELECT * FROM [@{pipeline().parameters.TABLE_SCHEMA}].[@{pipeline().parameters.TABLE_NAME}] WHERE ?AdfDynamicRangePartitionCondition
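Putting that together, the source side of the Copy activity might look roughly like this in JSON (SqlSource is the SQL Server source type; swap in your own source type if it differs):

"source": {
    "type": "SqlSource",
    "sqlReaderQuery": {
        "value": "SELECT * FROM [@{pipeline().parameters.TABLE_SCHEMA}].[@{pipeline().parameters.TABLE_NAME}] WHERE ?AdfDynamicRangePartitionCondition",
        "type": "Expression"
    },
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": {
            "value": "@pipeline().parameters.PRIMARY_KEY",
            "type": "Expression"
        }
    }
}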
The only differences in the non-partitioned version are to remove the ?AdfDynamicRangePartitionCondition clause from the query and to set the Partition option to None.
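For comparison, a sketch of the non-partitioned source under the same assumptions:

"source": {
    "type": "SqlSource",
    "sqlReaderQuery": {
        "value": "SELECT * FROM [@{pipeline().parameters.TABLE_SCHEMA}].[@{pipeline().parameters.TABLE_NAME}]",
        "type": "Expression"
    },
    "partitionOption": "None"
}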