{# Copyright (c) 2023 Coalesce. All rights reserved. This script and its associated documentation are confidential and proprietary to Coalesce. Unauthorized reproduction, distribution, or disclosure of this material is strictly prohibited. Coalesce permits you to copy and modify this script for the purposes of using with Coalsce but does not permit copying or modification for any other purpose. #}
{# == Node Type Version : 1 == #}
{# == Node Type Name : Dynamic Table Stage == #}
{# == Node Type Description : This node creates and runs dynamic table stage == #}

{#
    Overview
    --------
    Compares `currentState` with `desiredState` and renders exactly one of:
      * nothing to do            - the two states are equal, or neither applies
      * CREATE OR REPLACE        - no current state, or a change that alters the table definition
      * ALTER ... SET + REFRESH  - only warehouse / target lag / description changed
      * DROP                     - a current state exists but no desired state

    `stage`, `ref_no_link` and `get_source_transform` are not defined in this
    template; they are assumed to be supplied by the rendering environment.
#}

{% if desiredState == currentState %}

    {{ stage('Nothing to do.') }}
    select 1 = 0

{% elif desiredState %}

    {# Explicit defaults so the branch selection below never compares an undefined flag. #}
    {% set createTest = false %}
    {% set alterOnlyTest = false %}

    {# ---- Values derived from the desired state (shared by CREATE and ALTER) ---- #}

    {% set fullyQualifiedTargetDynamicTableName = ref_no_link(desiredState.node.location.name, desiredState.node.name) %}

    {# Warehouse: the config value is used only when the parameter holds the
       'DEV ENVIRONMENT' sentinel; any other parameter value is used as the warehouse name. #}
    {% if desiredState.parameters.targetDynamicTableWarehouse == 'DEV ENVIRONMENT' %}
        {% set desiredDynamicTableWarehouse = desiredState.config.warehouseName %}
    {% else %}
        {% set desiredDynamicTableWarehouse = desiredState.parameters.targetDynamicTableWarehouse %}
    {% endif %}

    {# Target lag: either DOWNSTREAM or "<value> <period>" taken from the first lag item. #}
    {% if desiredState.config.downstreamOption == true %}
        {% set desiredDynamicTableLagSpecification = 'DOWNSTREAM' %}
    {% else %}
        {% set desiredLagItems = desiredState.config.lagSpecification.get('items') %}
        {% set desiredDynamicTableLagSpecification = (desiredLagItems | map(attribute='lagValue') | first) ~ ' ' ~ (desiredLagItems | map(attribute='lagPeriod') | first) %}
    {% endif %}

    {% if currentState != undefined %}

        {# ---- Changes that require CREATE OR REPLACE instead of ALTER ---- #}

        {# General metadata #}
        {% set columnsTest = currentState.columns == desiredState.columns %}
        {% set sourcesTest = currentState.sources | count == desiredState.sources | count %}
        {% set joinTest = currentState.join == desiredState.join %}

        {# One namespace, created once, carries every value that has to survive a for-loop scope. #}
        {% set nsChecks = namespace(sourceDefinitionTest=true, storageLocationTest=true, desiredUsedStorageLocations='') %}

        {# Per-source definition test: the CREATE statement renders the join and every
           column transform of every source, so each source pair is compared, not just the first.
           Skipped when the source counts differ because sourcesTest already forces a CREATE. #}
        {% if sourcesTest %}
            {% for desiredSource in desiredState.sources %}
                {% set currentSource = currentState.sources[loop.index0] %}
                {% set currentTransforms = currentSource.columns | map(attribute='transform') | list %}
                {% set desiredTransforms = desiredSource.columns | map(attribute='transform') | list %}
                {% if currentTransforms != desiredTransforms or currentSource.join != desiredSource.join %}
                    {% set nsChecks.sourceDefinitionTest = false %}
                {% endif %}
            {% endfor %}
        {% endif %}

        {# Storage location test.
           Only changed storage location mappings (database / schema) are checked here;
           per the original notes, changed storage location names are covered by the source tests.
           The list is the target location plus the location of every dependency of every source. #}
        {% set nsChecks.desiredUsedStorageLocations = desiredState.node.location.name %}
        {% for source in desiredState.sources %}
            {% for dep in source.dependencies %}
                {% set nsChecks.desiredUsedStorageLocations = nsChecks.desiredUsedStorageLocations ~ ',' ~ dep.node.location.name %}
            {% endfor %}
        {% endfor %}
        {% set desiredUniqueStorageLocations = nsChecks.desiredUsedStorageLocations.split(',') | unique | sort | list %}

        {# The loop filter stops further comparisons once a mismatch has been recorded. #}
        {% for name in desiredUniqueStorageLocations if nsChecks.storageLocationTest == true %}
            {# Current mapping #}
            {% set currentDatabase = currentState.storageLocations | selectattr('name', 'equalto', name) | map(attribute='database') | first %}
            {% set currentSchema = currentState.storageLocations | selectattr('name', 'equalto', name) | map(attribute='schema') | first %}
            {# Desired mapping #}
            {% set desiredDatabase = desiredState.storageLocations | selectattr('name', 'equalto', name) | map(attribute='database') | first %}
            {% set desiredSchema = desiredState.storageLocations | selectattr('name', 'equalto', name) | map(attribute='schema') | first %}
            {# `~` is used so that a location missing on one side still yields a comparable string. #}
            {% if (currentDatabase ~ '.' ~ currentSchema) != (desiredDatabase ~ '.' ~ desiredSchema) %}
                {% set nsChecks.storageLocationTest = false %}
            {% endif %}
        {% endfor %}

        {# Config #}
        {% set insertStrategyTest = currentState.config.insertStrategy == desiredState.config.insertStrategy %}
        {% set groupByAllTest = currentState.config.groupByAll == desiredState.config.groupByAll %}
        {% set selectDistinctTest = currentState.config.selectDistinct == desiredState.config.selectDistinct %}
        {% set refreshTest = currentState.config.refresh_mode == desiredState.config.refresh_mode %}
        {% set initializeTest = currentState.config.initialize == desiredState.config.initialize %}

        {# Node #}
        {% set nodeNameTest = currentState.node.name == desiredState.node.name %}
        {% set nodeMaterializationType = currentState.node.materializationType == desiredState.node.materializationType %}
        {% set nodeIsMultisource = currentState.node.isMultisource == desiredState.node.isMultisource %}

        {# If any of the above tests failed, a CREATE must be run. #}
        {% set createTest = not (
            columnsTest and sourcesTest and joinTest
            and nsChecks.sourceDefinitionTest and nsChecks.storageLocationTest
            and insertStrategyTest and groupByAllTest and selectDistinctTest
            and nodeNameTest and nodeMaterializationType and nodeIsMultisource
            and initializeTest and refreshTest
        ) %}

        {# ---- Changes that only need ALTER ---- #}
        {% if createTest == false %}

            {# The current warehouse is resolved with the same parameter rule as the desired one,
               so the change test and the emitted WAREHOUSE clause cannot disagree. #}
            {% if currentState.parameters.targetDynamicTableWarehouse == 'DEV ENVIRONMENT' %}
                {% set currentDynamicTableWarehouse = currentState.config.warehouseName %}
            {% else %}
                {% set currentDynamicTableWarehouse = currentState.parameters.targetDynamicTableWarehouse %}
            {% endif %}

            {% set warehouseTest = currentDynamicTableWarehouse == desiredDynamicTableWarehouse %}
            {% set lagSpecificationTest = currentState.config.lagSpecification == desiredState.config.lagSpecification %}
            {% set downstreamOptionTest = currentState.config.downstreamOption == desiredState.config.downstreamOption %}
            {% set nodeCommentTest = currentState.node.description == desiredState.node.description %}

            {% set alterOnlyTest = not (warehouseTest and lagSpecificationTest and downstreamOptionTest and nodeCommentTest) %}
        {% endif %}

    {% endif %}

    {# ---- CREATE or ALTER ---- #}
    {% if (currentState == undefined) or (createTest == true) %}

        {{ stage('Create Dynamic Work Table', true, "sql", "create") }}
        CREATE OR REPLACE DYNAMIC TABLE {{ fullyQualifiedTargetDynamicTableName }}
            TARGET_LAG = '{{ desiredDynamicTableLagSpecification }}'
            WAREHOUSE = {{ desiredDynamicTableWarehouse }}
            REFRESH_MODE = {{ desiredState.config.refresh_mode }}
            INITIALIZE = {{ desiredState.config.initialize }}
            {# Single quotes are doubled so a description containing an apostrophe stays one SQL literal. #}
            {%- if desiredState.node.description | length > 0 %}
            COMMENT = '{{ desiredState.node.description | replace("'", "''") }}'
            {%- endif %}
        (
            {% for col in desiredState.columns %}
                "{{ col.name }}"
                {%- if col.description | length > 0 %} COMMENT '{{ col.description | escape }}'{% endif %}
                {%- if not loop.last -%}, {% endif %}
            {% endfor %}
        )
        AS
        {% for source in desiredState.sources %}
            SELECT
            {% if desiredState.config.selectDistinct %} DISTINCT {% endif %}
            {% for col in source.columns %}
                {{ get_source_transform(col) }} AS "{{ col.name }}"
                {%- if not loop.last -%}, {% endif %}
            {% endfor %}
            {{ source.join }}
            {# Sources are combined only when the insert strategy is a UNION variant. #}
            {% if not loop.last %}
                {% if desiredState.config.insertStrategy in ['UNION', 'UNION ALL'] %}
                    {{ desiredState.config.insertStrategy }}
                {% endif %}
            {% endif %}
        {% endfor %}
        {% if desiredState.config.groupByAll %}
            GROUP BY ALL
        {% endif %}

    {% elif alterOnlyTest == true %}

        {# Current target lag, built the same way as the desired one. #}
        {% if currentState.config.downstreamOption == true %}
            {% set currentDynamicTableLagSpecification = 'DOWNSTREAM' %}
        {% else %}
            {% set currentLagItems = currentState.config.lagSpecification.get('items') %}
            {% set currentDynamicTableLagSpecification = (currentLagItems | map(attribute='lagValue') | first) ~ ' ' ~ (currentLagItems | map(attribute='lagPeriod') | first) %}
        {% endif %}

        {# Decide which properties go into SET. #}
        {% set setWarehouse = desiredDynamicTableWarehouse != currentDynamicTableWarehouse %}
        {% set setTargetLag = desiredDynamicTableLagSpecification != currentDynamicTableLagSpecification %}
        {# The comment is written whenever a description exists, and additionally as an
           empty string when an existing description has been removed. #}
        {% set setComment = (desiredState.node.description | length > 0) or (currentState.node.description | length > 0) %}

        {# A SET with no properties is not valid SQL, so the stage is skipped when nothing is left to set. #}
        {% if setWarehouse or setTargetLag or setComment %}
            {{ stage('Alter Dynamic Table', true, "sql", "create") }}
            ALTER DYNAMIC TABLE {{ fullyQualifiedTargetDynamicTableName }} SET
                {% if setWarehouse %}WAREHOUSE = {{ desiredDynamicTableWarehouse }}{% endif %}
                {% if setTargetLag %}
                    {%- if desiredDynamicTableLagSpecification == 'DOWNSTREAM' %}
                TARGET_LAG = DOWNSTREAM
                    {%- else %}
                TARGET_LAG = '{{ desiredDynamicTableLagSpecification }}'
                    {%- endif %}
                {% endif %}
                {% if setComment %}COMMENT = '{{ desiredState.node.description | replace("'", "''") }}'{% endif %}
        {% endif %}

        {{ stage('Refresh Dynamic Table', true, "sql", "create") }}
        ALTER DYNAMIC TABLE {{ fullyQualifiedTargetDynamicTableName }} REFRESH

    {% else %}

        {{ stage('Nothing to do.') }}
        select 1 = 0

    {% endif %}

{% elif currentState != undefined and desiredState == undefined %}

    {# The table name comes from the current state because there is no desired state. #}
    {% set fullyQualifiedTargetDynamicTableName = ref_no_link(currentState.node.location.name, currentState.node.name) %}

    {{ stage('Drop Dynamic Table', true, "sql", "drop") }}
    DROP DYNAMIC TABLE IF EXISTS {{ fullyQualifiedTargetDynamicTableName }}

{%- else -%}

    {{ stage('Nothing to do.') }}
    select 1 = 0

{% endif %}