{#
  Run template for a Snowflake ML forecast node.

  Stages rendered, in order:
    1. Truncate the target table.
    2. Reload it from the first source using each column's source transform.
    3. (Re)create a SNOWFLAKE.ML.FORECAST model named FORECAST_<source node name>,
       trained on the target-table rows whose target column IS NOT NULL.
    4. Write predictions back into the target table:
         - config.exvar set:   forecast the rows whose target IS NULL and UPDATE
                               their forecast / bound columns in place.
         - config.exvar unset: forecast config.fcdays periods ahead and INSERT
                               the predictions as new rows with a NULL target.

  Columns flagged col.forecast / col.lower_bound / col.upper_bound receive the
  model output and are excluded from the model's input queries.
  Presumably config.exvar means "exogenous variables are present" -- confirm
  against the node definition.
#}
{% set source = sources[0] %}
{% set src_node = source.dependencies[0].node %}
{% set forecast_name = 'FORECAST_' ~ src_node.name %}

{{ stage('Truncate Forecast table') }}
TRUNCATE IF EXISTS {{ this }}

{{ stage('Populate Forecast Table with Historical Data') }}
INSERT INTO {{ this }}
(
    {%- for col in source.columns %}
    "{{ col.name }}"
    {%- if not loop.last %},{% endif %}
    {%- endfor %}
)
SELECT
    {%- for col in source.columns %}
    {{ get_source_transform(col) }} AS "{{ col.name }}"
    {%- if not loop.last %},{% endif %}
    {%- endfor %}
{{ source.join }}

{# Train on every non-output column, using only rows that have an observed target value. #}
{{ stage('Create Forecast Model Instance') }}
CREATE OR REPLACE SNOWFLAKE.ML.FORECAST {{ ref_no_link(node.location.name, forecast_name) }}(
    INPUT_DATA => SYSTEM$QUERY_REFERENCE('
        SELECT
        {%- for col in columns if not (col.forecast or col.upper_bound or col.lower_bound) %}
            "{{ col.name }}"{%- if not loop.last %},{%- endif %}
        {%- endfor %}
        FROM {{ this }}
        WHERE "{{ config.tgtcol.name }}" IS NOT NULL
    '),
    {% if config.multiseries %}SERIES_COLNAME => '{{ config.seriescol.name }}',{% endif %}
    TIMESTAMP_COLNAME => '{{ config.tscol.name }}',
    TARGET_COLNAME => '{{ config.tgtcol.name }}'
)

{{ stage('Insert Forecast data') }}
BEGIN
{% if config.exvar %}
    {# Forecast the rows that have no target value yet; they supply the timestamps (and any other input columns) to predict for. #}
    CALL {{ ref_no_link(node.location.name, forecast_name) }}!FORECAST(
        INPUT_DATA => SYSTEM$QUERY_REFERENCE('
            SELECT
            {%- for col in columns if not (col.forecast or col.upper_bound or col.lower_bound) %}
                "{{ col.name }}"{%- if not loop.last %},{%- endif %}
            {%- endfor %}
            FROM {{ this }}
            WHERE "{{ config.tgtcol.name }}" IS NULL
        '),
        {% if config.multiseries %}SERIES_COLNAME => '{{ config.seriescol.name }}',{% endif %}
        TIMESTAMP_COLNAME => '{{ config.tscol.name }}'
    );

    {#
      RESULT_SCAN(-1) reads the result of the CALL directly above, so no statement
      may be inserted between the CALL and this UPDATE.
      The result set exposes fixed column names (TS, SERIES, FORECAST, LOWER_BOUND,
      UPPER_BOUND); output columns are mapped by flag, not by the target column's
      own name, so renamed target columns still resolve.
    #}
    UPDATE {{ this }} SRC
    SET
        {%- for col in columns if col.forecast or col.lower_bound or col.upper_bound %}
        "{{ col.name }}" = RES."{% if col.forecast %}FORECAST{% elif col.lower_bound %}LOWER_BOUND{% else %}UPPER_BOUND{% endif %}"
        {%- if not loop.last %},{%- endif %}
        {%- endfor %}
    FROM TABLE(RESULT_SCAN(-1)) RES
    WHERE RES."TS" = SRC."{{ config.tscol.name }}"
        {%- if config.multiseries %}
        AND RES."SERIES" = SRC."{{ config.seriescol.name }}"
        {%- endif %};
{% else %}
    {# Forecast a fixed horizon; the named argument matches the other calls in this template. #}
    CALL {{ ref_no_link(node.location.name, forecast_name) }}!FORECAST(
        FORECASTING_PERIODS => {{ config.fcdays }}
    );

    {#
      Append the predictions as new rows. RESULT_SCAN(-1) reads the CALL directly
      above, so keep the two statements adjacent.
      NOTE(review): the final fallback branch renders the column's source transform
      against the forecast result set, which only exposes the fixed forecast output
      columns -- confirm that no such column can occur in this mode, or that its
      transform does not reference source columns.
    #}
    INSERT INTO {{ this }}
    (
        {%- for col in source.columns %}
        "{{ col.name }}"
        {%- if not loop.last %},{% endif %}
        {%- endfor %}
    )
    SELECT
        {%- for col in source.columns %}
        {% if config.multiseries and col.id == config.seriescol.id %}RES."SERIES"
        {%- elif col.id == config.tscol.id %}RES."TS"
        {%- elif col.id == config.tgtcol.id %}NULL
        {%- elif col.forecast %}RES."FORECAST"
        {%- elif col.lower_bound %}RES."LOWER_BOUND"
        {%- elif col.upper_bound %}RES."UPPER_BOUND"
        {%- else %}{{ get_source_transform(col) }} AS "{{ col.name }}"
        {%- endif %}{%- if not loop.last %},{%- endif %}
        {%- endfor %}
    FROM TABLE(RESULT_SCAN(-1)) RES;
{% endif %}
END