Temporary data in Workflows
To improve performance and to let you inspect the results of intermediate steps, Workflows makes use of temporary data objects that are stored, by default, under a so-called workflows_temp schema/dataset in your data warehouse. Note that at the time of creating the connection to your data warehouse, you can also provide a custom location for storing the temporary data generated by Workflows.
This system of intermediate tables functions as a cache, avoiding the re-execution of all the steps in a workflow when they have not been modified: when you re-run a workflow, only the steps that have changed, or whose inputs have changed, are executed again.
In order for CARTO to be able to execute workflows in your data warehouse and temporarily store the data generated in intermediate steps of your analysis, there is a specific set of permissions required on the connection selected when creating your workflow.
Since CARTO Workflows can be used with different providers (Google BigQuery, Snowflake, Amazon Redshift, and PostgreSQL), each with its own terminology, the recommended setup for each data warehouse is described below.

Google BigQuery
The service account or user account used for creating the connection must have the following required roles:
- BigQuery Data Editor
- BigQuery Job User
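If you prefer to manage permissions with SQL once the workflows_temp dataset exists, the Data Editor role can also be granted at the dataset level using BigQuery's DCL statements; BigQuery Job User is a project-level role and still needs to be granted through IAM. A minimal sketch with hypothetical project, dataset, and service account names:

```sql
-- Hypothetical names; adjust the project, dataset and service account to your setup.
-- BigQuery Job User cannot be granted at dataset level and is assumed to be
-- granted on the project through IAM.
GRANT `roles/bigquery.dataEditor`
ON SCHEMA `my-project.workflows_temp`
TO "serviceAccount:carto-connection@my-project.iam.gserviceaccount.com";
```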
Snowflake

```sql
GRANT USAGE ON DATABASE DATABASE_NAME TO ROLE ROLE_NAME;
GRANT CREATE SCHEMA ON DATABASE DATABASE_NAME TO ROLE ROLE_NAME;

-- If workflows_temp schema was created by another user
GRANT ALL PRIVILEGES ON SCHEMA DATABASE_NAME.WORKFLOWS_TEMP TO ROLE ROLE_NAME;
```
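As a quick check after running the grants, you can list what the connection role has access to; a sketch with hypothetical CARTO_DB and CARTO_ROLE names (the second statement only works once the WORKFLOWS_TEMP schema exists):

```sql
-- Hypothetical names: database CARTO_DB, role CARTO_ROLE.
SHOW GRANTS TO ROLE CARTO_ROLE;                -- privileges held by the role
SHOW GRANTS ON SCHEMA CARTO_DB.WORKFLOWS_TEMP; -- privileges on the temp schema
```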
Amazon Redshift

```sql
GRANT CREATE ON DATABASE database_name TO user_name;

-- If the workflows_temp schema was created by another user
GRANT ALL ON SCHEMA workflows_temp TO user_name;
```
PostgreSQL

```sql
GRANT CREATE ON DATABASE database_name TO user_name;

-- If the schema was created by another user
GRANT ALL ON SCHEMA workflows_temp TO user_name;
```
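To verify the setup on PostgreSQL (the same functions also exist in Redshift), you can check the privileges of the connection user; a sketch assuming a hypothetical carto_user:

```sql
-- Hypothetical user name; the schema check is only meaningful once
-- the workflows_temp schema exists.
SELECT has_database_privilege('carto_user', current_database(), 'CREATE');
SELECT has_schema_privilege('carto_user', 'workflows_temp', 'CREATE');
```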
When your workflow runs in BigQuery (or the CARTO Data Warehouse), these tables are temporary and will be automatically removed after 30 days. However, on the rest of the data warehouse platforms, a specific configuration is currently needed on the user's side in order to clean up the intermediate tables generated by CARTO Workflows.
Below, you can find code and instructions for automating the process of cleaning up the intermediate tables in each data warehouse:
Snowflake
The code shown below will add a task that regularly removes all tables in the WORKFLOWS_TEMP schema of $database$ that are older than 30 days (please replace this placeholder with the name of the database where you are running CARTO Workflows, everywhere it appears). You will also have to replace the $warehouse$ placeholder with the name of your warehouse.

Note that EXECUTE IMMEDIATE $$ ... $$ is only needed if you use Snowflake's Classic Console; you can omit it in the newer Snowsight interface.

The code uses Snowflake tasks to execute the clean-up periodically (every day at 0h UTC). The Snowflake documentation on tasks includes some useful examples.
```sql
EXECUTE IMMEDIATE
$$
CREATE TASK IF NOT EXISTS $database$.WORKFLOWS_TEMP.CLEAR_CACHE
  SCHEDULE = 'USING CRON 0 0 * * * UTC'
  WAREHOUSE = $warehouse$
  AS
    DECLARE
      statements STRING;
    BEGIN
      LET res resultset := (EXECUTE IMMEDIATE '
        SELECT
          LISTAGG(''DROP TABLE '' || TABLE_CATALOG || ''.'' || TABLE_SCHEMA || ''.'' || TABLE_NAME || '';'')
        FROM
          $database$.INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = ''WORKFLOWS_TEMP''
          AND CREATED < DATEADD(''day'', -30, CURRENT_TIMESTAMP)
      ');
      LET cur_res CURSOR FOR res;
      OPEN cur_res;
      FETCH cur_res INTO statements;
      EXECUTE IMMEDIATE 'BEGIN\n' || :statements || '\nEND';
    END;
$$
```
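Note that Snowflake creates tasks in a suspended state, so after creating CLEAR_CACHE it typically has to be resumed before the daily schedule takes effect (this requires the account-level EXECUTE TASK privilege, or an appropriate role):

```sql
-- Newly created tasks are suspended by default; resume it so the schedule runs.
ALTER TASK $database$.WORKFLOWS_TEMP.CLEAR_CACHE RESUME;

-- Optional: check the task state and schedule.
SHOW TASKS IN SCHEMA $database$.WORKFLOWS_TEMP;
```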
Redshift

In order to remove the intermediate tables, you should create a scheduled query in Redshift that removes tables older than 30 days.

Scheduled queries are only available for provisioned Redshift clusters. If you are using Redshift Serverless, you will have to execute the clean-up manually (e.g. by calling the procedure defined below) or find other external means of executing it periodically.

The following procedure should be created in your Redshift cluster to delete tables older than 30 days (you will have to replace all occurrences of the $database$ placeholder with the name of your database):

```sql
CREATE OR REPLACE PROCEDURE $database$.workflows_temp._clear_cache_fn() as
$$
DECLARE
  statement RECORD;
  query VARCHAR(MAX);
BEGIN
  -- Build one DROP TABLE statement per intermediate table older than 30 days.
  query := 'SELECT
    \'DROP TABLE $database$.workflows_temp.\' || relname || \';\' as statement
  FROM
    pg_class_info
  LEFT JOIN
    pg_namespace
  ON
    pg_class_info.relnamespace = pg_namespace.oid
  WHERE
    reltype != 0
    AND TRIM(nspname) = \'workflows_temp\'
    AND datediff(day, relcreationtime, current_date) > 30
  ';
  -- Execute each generated DROP statement.
  FOR statement IN EXECUTE query
  LOOP
    EXECUTE statement.statement;
  END LOOP;
END;
$$ LANGUAGE plpgsql;
```
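Before scheduling it, you may want to preview which tables the procedure would drop; the query below reuses the same filter that the procedure applies internally (run it while connected to the database that replaces $database$):

```sql
-- Preview the intermediate tables that _clear_cache_fn() would drop.
SELECT relname, relcreationtime
FROM pg_class_info
LEFT JOIN pg_namespace ON pg_class_info.relnamespace = pg_namespace.oid
WHERE reltype != 0
  AND TRIM(nspname) = 'workflows_temp'
  AND datediff(day, relcreationtime, current_date) > 30;
```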
After that, you should define a Redshift scheduled query with the following CALL to execute the previous procedure once per day:

```sql
CALL $database$.workflows_temp._clear_cache_fn();
```
PostgreSQL

To remove the intermediate tables in PostgreSQL, you need to schedule a daily job using the pg_cron extension to execute the following code periodically.

Here are some resources to learn how to use pg_cron on different managed services based on PostgreSQL:

- Azure Database for PostgreSQL supports pg_cron since version 11 and provides some usage examples.
The code shown below must be scheduled in a job, replacing all occurrences of the $database$ placeholder with the name of your database:

```sql
CREATE OR REPLACE PROCEDURE $database$.workflows_temp._clear_cache_fn()
LANGUAGE plpgsql
AS
$$
DECLARE
  drops TEXT;
BEGIN
  -- For every expired entry, build a DROP TABLE statement plus a DELETE that
  -- removes its row from the table_creation_time registry (the two adjacent
  -- string literals on consecutive lines are concatenated by PostgreSQL).
  SELECT
    STRING_AGG(
      'DROP TABLE IF EXISTS ' || tablename::text || ';'
      'DELETE FROM $database$.workflows_temp.table_creation_time WHERE tablename = ''' || tablename::text || '''',
      ';'
    ) drops
  INTO
    drops
  FROM
    $database$.workflows_temp.table_creation_time
  WHERE
    tablename LIKE '%workflows_temp.workflow\_%' AND
    EXTRACT(epoch FROM current_timestamp - "timestamp")/86400 > 30;
  IF drops IS NOT NULL THEN
    EXECUTE drops;
  END IF;
END;
$$;
```
Before scheduling a task to execute the procedure, you'll need to have pg_cron installed and enabled on your cluster. After following the instructions for your database provider, and if you have not done so as part of those instructions, you'll need to execute the following in the postgres database of your cluster:

```sql
CREATE EXTENSION pg_cron;
```

This requires superuser privileges, so you may need to request it from your database administrator.

For regular users to be able to schedule jobs, a superuser will also need to run the following in the postgres database (replacing the $user$ placeholder first):

```sql
GRANT USAGE ON SCHEMA cron TO $user$;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA cron TO $user$;
```
After that, you should set up a scheduled task that runs the procedure once per day, using the pg_cron extension and the following statement in the postgres database of your cluster:

```sql
SELECT cron.schedule_in_database('workflows-temp-cleanup', '0 0 * * *', $$CALL $database$.workflows_temp._clear_cache_fn()$$, '$database$');
```
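You can confirm that the job has been registered, and later inspect its executions, by querying pg_cron's metadata tables in the postgres database; a minimal check (cron.job_run_details is available in recent pg_cron versions):

```sql
-- Jobs currently registered with pg_cron.
SELECT jobid, jobname, schedule, database, command, active
FROM cron.job;

-- Recent executions and their status.
SELECT jobid, status, start_time, end_time
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 10;
```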
To remove the schedule, this can be executed in the postgres database:

```sql
SELECT cron.unschedule('workflows-temp-cleanup');
```
If a temporary table has been generated as the output of one of a workflow's components, it will not be created again when the workflow is re-run later, unless the structure of the workflow or the parameters used to configure it have changed; that is, unless the workflow itself causes the table to be different and updating it is needed.
However, if one of the source tables used in the workflow (tables not generated by the workflows, but already existing in the database) changes and you re-run the workflow, intermediate tables might not be correctly updated.
If your workflow uses a BigQuery or Snowflake connection, the workflow will detect those changes and update all intermediate tables accordingly. For Redshift and PostgreSQL, however, such a mechanism is not implemented. If you change your source tables, you will have to manually drop the intermediate tables before re-running your workflow. Otherwise, those intermediate tables will not be updated.
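As a reference, this is a minimal sketch of how you could generate the DROP statements for those intermediate tables manually in PostgreSQL, assuming they follow the workflow_ naming pattern used by the clean-up procedure above; review the generated statements before running them:

```sql
-- Generate DROP statements for Workflows' intermediate tables.
-- Assumes the workflow_ table name prefix used by the clean-up procedure above.
SELECT 'DROP TABLE IF EXISTS workflows_temp.' || tablename || ';'
FROM pg_tables
WHERE schemaname = 'workflows_temp'
  AND tablename LIKE 'workflow\_%';
```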