This uses a fairly standard process: query for all of the tables in a database, iterate over them, and pass each table via parameters to a copy operation. The primary benefit of this setup is that it uses dynamic partitioning where possible and runs with very high concurrency.

This assumes you already have your linked services and self-hosted integration runtime set up and ready for use.

Pipeline 1: Get Tables

Our first pipeline gathers all of the tables and their primary keys, then sends that output as an array to our second pipeline.

Our first pipeline layout

For our Lookup activity, the most important setting is the query, which is the most complex part of the whole process.

Lookup activity settings

The query we are using is a UNION query with a GROUP BY to collapse duplicate table rows. Any suggestions for improving this query would be welcome!

SELECT min(table_list.TABLE_SCHEMA) AS TABLE_SCHEMA
    ,table_list.TABLE_NAME
    ,min(table_list.PRIMARY_KEY) AS PRIMARY_KEY
    ,min(table_list.TYPE) AS [TYPE]
FROM
    -- First branch: every primary key column, with its data type
    (SELECT DB_NAME() AS Database_Name
        ,sc.name AS 'TABLE_SCHEMA'
        ,o.Name AS 'TABLE_NAME'
        ,c.Name AS 'PRIMARY_KEY'
        ,t.name AS 'TYPE'
    FROM sys.indexes i
        INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
        INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
        INNER JOIN sys.objects o ON i.object_id = o.object_id
        INNER JOIN sys.schemas sc ON o.schema_id = sc.schema_id
        INNER JOIN sys.types t ON c.user_type_id = t.user_type_id
    WHERE i.is_primary_key = 1
    UNION
    -- Second branch: every user table, with placeholder values so tables
    -- that have no primary key still appear in the results
    SELECT DB_NAME() AS Database_Name
        ,sc.name AS 'TABLE_SCHEMA'
        ,t.name AS 'TABLE_NAME'
        ,'Z_NONE' AS 'PRIMARY_KEY'
        ,'z_no_primary_key' AS 'TYPE'
    FROM sys.tables t
        INNER JOIN sys.schemas sc ON t.schema_id = sc.schema_id
    WHERE t.is_ms_shipped = 0) AS table_list
-- min() collapses each table to a single row; because the placeholders sort
-- last (Z_NONE / z_no_primary_key), real primary key values win when one exists
GROUP BY table_list.TABLE_NAME
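
For reference, the Lookup activity's underlying JSON would look roughly like this (a sketch; the dataset name SourceDatabase and source type are placeholders for your own setup, and the query is elided). The key detail is that First row only must be unchecked, which appears as firstRowOnly: false so the full array comes back:

{
    "name": "LookupTableList",
    "type": "Lookup",
    "typeProperties": {
        "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": "SELECT min(table_list.TABLE_SCHEMA) ... (the full query above)"
        },
        "dataset": {
            "referenceName": "SourceDatabase",
            "type": "DatasetReference"
        },
        "firstRowOnly": false
    }
}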

The only important setting for the TriggerCopy Execute Pipeline activity is the array parameter we are sending to the second pipeline: @activity('LookupTableList').output.value
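
In the pipeline's JSON, that Execute Pipeline activity looks roughly like this (a sketch; the referenced pipeline name IterateTables is a placeholder):

{
    "name": "TriggerCopy",
    "type": "ExecutePipeline",
    "typeProperties": {
        "pipeline": {
            "referenceName": "IterateTables",
            "type": "PipelineReference"
        },
        "parameters": {
            "tableList": {
                "value": "@activity('LookupTableList').output.value",
                "type": "Expression"
            }
        },
        "waitOnCompletion": true
    }
}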

Pipeline 2: Iterate Tables

The first step in building this pipeline is to define an array parameter, tableList.

Array parameter tableList
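
In the pipeline's JSON, the parameter definition is simply:

"parameters": {
    "tableList": {
        "type": "Array"
    }
}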

The pipeline has a ForEach activity that loops on the items of the tableList array.

Items setting for ForEach
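
A rough JSON sketch of the ForEach settings (the activity name and batch count are placeholders; the important parts are the items expression and isSequential: false, which is what gives us the concurrency). The If Condition described next goes inside the activities array:

{
    "name": "ForEachTable",
    "type": "ForEach",
    "typeProperties": {
        "items": {
            "value": "@pipeline().parameters.tableList",
            "type": "Expression"
        },
        "isSequential": false,
        "batchCount": 20,
        "activities": [ ... ]
    }
}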

Inside the ForEach activity, we use an If Condition activity to separate partitionable data from data that has an incompatible data type.

If Condition Activity

The expression for the activity is @or(contains(item().TYPE, 'int'), contains(item().TYPE, 'date')), which will be true if the data type is an integer or date/datetime type.

The True and False branches call similar Execute Pipeline activities; the difference between them is that one uses dynamic partitioning and the other doesn't, so for the non-partitioned pipeline you don't have to send the PRIMARY_KEY parameter. Make sure to uncheck Wait on completion, or you will not get as much concurrency.

Calling the Copy Tables With Dynamic Partition pipeline
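
Putting the If Condition and its two branches together, a JSON sketch might look like this (activity and pipeline names here are placeholders; note waitOnCompletion: false on both calls):

{
    "name": "CheckPartitionable",
    "type": "IfCondition",
    "typeProperties": {
        "expression": {
            "value": "@or(contains(item().TYPE, 'int'), contains(item().TYPE, 'date'))",
            "type": "Expression"
        },
        "ifTrueActivities": [{
            "name": "CallPartitionedCopy",
            "type": "ExecutePipeline",
            "typeProperties": {
                "pipeline": { "referenceName": "CopyTablesWithDynamicPartition", "type": "PipelineReference" },
                "parameters": {
                    "TABLE_SCHEMA": { "value": "@item().TABLE_SCHEMA", "type": "Expression" },
                    "TABLE_NAME": { "value": "@item().TABLE_NAME", "type": "Expression" },
                    "PRIMARY_KEY": { "value": "@item().PRIMARY_KEY", "type": "Expression" }
                },
                "waitOnCompletion": false
            }
        }],
        "ifFalseActivities": [{
            "name": "CallPlainCopy",
            "type": "ExecutePipeline",
            "typeProperties": {
                "pipeline": { "referenceName": "CopyTablesNoPartition", "type": "PipelineReference" },
                "parameters": {
                    "TABLE_SCHEMA": { "value": "@item().TABLE_SCHEMA", "type": "Expression" },
                    "TABLE_NAME": { "value": "@item().TABLE_NAME", "type": "Expression" }
                },
                "waitOnCompletion": false
            }
        }]
    }
}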

Pipeline 3/4: Table Copy Operation

For the copy operation we have two separate pipelines. The first uses dynamic partitioning and needs all three parameters: TABLE_SCHEMA, TABLE_NAME, and PRIMARY_KEY. The Wait activity is just there to show that we can add alerting/failure handling where needed.

Parameters for the Dynamic Partition Copy

In the Copy activity's settings, the query and the Partition column name are what we need to set. Also make sure to select Dynamic range as the Partition option.

Copy Activity settings

The query needs to include ?AdfDynamicRangePartitionCondition or it will not work.

SELECT * FROM [@{pipeline().parameters.TABLE_SCHEMA}].[@{pipeline().parameters.TABLE_NAME}] WHERE ?AdfDynamicRangePartitionCondition
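
In the Copy activity's JSON, the source definition pulls those settings together roughly like this (a sketch; SqlServerSource is an assumption based on copying from SQL Server over the self-hosted IR, so adjust the source type to match your connector):

"source": {
    "type": "SqlServerSource",
    "sqlReaderQuery": {
        "value": "SELECT * FROM [@{pipeline().parameters.TABLE_SCHEMA}].[@{pipeline().parameters.TABLE_NAME}] WHERE ?AdfDynamicRangePartitionCondition",
        "type": "Expression"
    },
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": {
            "value": "@pipeline().parameters.PRIMARY_KEY",
            "type": "Expression"
        }
    }
}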

The only differences for the non-partitioned version are removing ?AdfDynamicRangePartitionCondition from the query and setting the Partition option to "None".
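
A sketch of the non-partitioned version's source, for comparison (same assumptions as above):

"source": {
    "type": "SqlServerSource",
    "sqlReaderQuery": {
        "value": "SELECT * FROM [@{pipeline().parameters.TABLE_SCHEMA}].[@{pipeline().parameters.TABLE_NAME}]",
        "type": "Expression"
    },
    "partitionOption": "None"
}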
