
How to handle temporary tables

When executing a workflow, CARTO generates an intermediate table with the result of each component in the workflow. These intermediate tables act as a cache that avoids re-executing the whole workflow: when you re-run a workflow, only the components that have changed, or whose inputs have changed, are executed again.
When your workflow runs in BigQuery (or CARTO Data Warehouse), these tables are temporary and are automatically removed after 30 days. On the rest of the data warehouse platforms, however, a specific configuration is currently needed on the user's side to clean up the intermediate tables generated by CARTO Workflows.
Below, you can find code and instructions for automating the process of cleaning up the intermediate tables in each data warehouse:
Snowflake
Redshift
PostgreSQL
Snowflake

The code shown below adds a task that regularly removes all tables in $database$ that are older than 30 days. Replace the $database$ placeholder with the name of the database where you are running CARTO Workflows, and the $warehouse$ placeholder with the name of your warehouse.
You should use Snowflake tasks to execute it periodically. There are some useful examples here.
EXECUTE IMMEDIATE
$$
CREATE TASK IF NOT EXISTS $database$.WORKFLOWS_TEMP.CLEAR_CACHE
  -- Run at midnight UTC on the first day of each month (roughly every 30 days)
  SCHEDULE = 'USING CRON 0 0 1 * * UTC'
  WAREHOUSE = $warehouse$
AS
DECLARE
  statements STRING;
BEGIN
  -- Build a list of DROP TABLE statements, one per table older than 30 days
  LET res RESULTSET := (EXECUTE IMMEDIATE '
    SELECT
      LISTAGG(''DROP TABLE '' || TABLE_CATALOG || ''.'' || TABLE_SCHEMA || ''.'' || TABLE_NAME || '';'')
    FROM
      $database$.INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = ''WORKFLOWS_TEMP''
      AND CREATED < DATEADD(''day'', -30, CURRENT_TIMESTAMP)
  ');
  LET cur_res CURSOR FOR res;
  OPEN cur_res;
  FETCH cur_res INTO statements;
  -- Skip execution when there are no tables to drop (statements would be NULL)
  IF (statements IS NOT NULL) THEN
    EXECUTE IMMEDIATE 'BEGIN\n' || :statements || '\nEND';
  END IF;
END;
$$
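Note that Snowflake tasks are created in a suspended state, so the task will not run on schedule until you resume it. A minimal follow-up, assuming the task name used above:

-- The task is created suspended; resume it so the schedule takes effect
ALTER TASK $database$.WORKFLOWS_TEMP.CLEAR_CACHE RESUME;
-- Optionally, trigger a first run manually to verify that it works
EXECUTE TASK $database$.WORKFLOWS_TEMP.CLEAR_CACHE;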
Redshift

To remove the intermediate tables, you should create a scheduled query in Redshift that removes datasets older than 30 days.
For this purpose, create the following procedure in your Redshift cluster (replacing the $database$ placeholder with the name of your database):
CREATE OR REPLACE PROCEDURE $database$.workflows_temp._clear_cache_fn() AS
$$
DECLARE
  statement RECORD;
  query VARCHAR(MAX);
BEGIN
  -- Build one DROP TABLE statement per table in workflows_temp older than 30 days
  query := 'SELECT
    \'DROP TABLE $database$.workflows_temp.\' || relname || \';\' AS statement
  FROM
    pg_class_info
  LEFT JOIN
    pg_namespace
  ON
    pg_class_info.relnamespace = pg_namespace.oid
  WHERE
    reltype != 0
    AND TRIM(nspname) = \'workflows_temp\'
    AND datediff(day, relcreationtime, current_date) > 30
  ';
  -- Run each DROP statement returned by the query
  FOR statement IN EXECUTE query
  LOOP
    EXECUTE statement.statement;
  END LOOP;
END;
$$ LANGUAGE plpgsql;
After that, you should define a Redshift scheduled query with the following CALL to execute the previous procedure once per day:
CALL $database$.workflows_temp._clear_cache_fn();
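If you want to verify the cleanup, the following query (a convenience sketch using the same system views as the procedure above) lists the tables that remain in the workflows_temp schema together with their creation time:

-- List intermediate tables in workflows_temp with their creation time
SELECT relname, relcreationtime
FROM pg_class_info
LEFT JOIN pg_namespace
ON pg_class_info.relnamespace = pg_namespace.oid
WHERE reltype != 0
AND TRIM(nspname) = 'workflows_temp'
ORDER BY relcreationtime;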
PostgreSQL

To remove the intermediate tables in PostgreSQL, you need to schedule a daily job using the pg_cron extension to execute the code below periodically.
Here are some resources to learn how to use pg_cron on different managed services based on PostgreSQL:
a) Amazon RDS. Usage guide available here.
b) Amazon Aurora PostgreSQL supports pg_cron since version 12.
c) Google Cloud SQL. Setup instructions here.
d) Azure Database for PostgreSQL supports pg_cron since version 11 and provides some usage examples.
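As a general sketch (the exact steps vary by provider, so refer to the guides above; managed services typically expose these settings through parameter groups or flags rather than postgresql.conf), enabling pg_cron involves loading the extension at server start and installing it in the database that will own the jobs. The database name below is a placeholder:

-- postgresql.conf: load pg_cron at server start
-- shared_preload_libraries = 'pg_cron'
-- cron.database_name = 'your_database'  -- placeholder; defaults to 'postgres'
-- After restarting the server, install the extension in that database:
CREATE EXTENSION IF NOT EXISTS pg_cron;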
The code shown below must be scheduled in a job, replacing the $database$ placeholder with the name of your database:
CREATE OR REPLACE PROCEDURE $database$.workflows_temp._clear_cache_fn()
LANGUAGE plpgsql
AS
$$
DECLARE
  drops TEXT;
BEGIN
  -- Build one DROP TABLE statement per intermediate table older than 30 days
  SELECT
    STRING_AGG('DROP TABLE IF EXISTS ' || tablename::text, ';')
  INTO
    drops
  FROM
    $database$.workflows_temp.TABLE_CREATION_TIME
  WHERE
    EXTRACT(epoch FROM current_timestamp - timestamp)/86400 > 30;
  -- Skip execution when there is nothing to drop (drops would be NULL)
  IF drops IS NOT NULL THEN
    EXECUTE drops;
  END IF;
END;
$$;
After that, you should set up a scheduled task that runs it once per day using the pg_cron extension and the following statement:
SELECT cron.schedule('0 0 * * *', $$CALL $database$.workflows_temp._clear_cache_fn()$$);
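You can check that the job was registered by querying the cron.job metadata table, and remove it later with cron.unschedule if it is no longer needed:

-- List registered jobs with their schedules and commands
SELECT jobid, schedule, command FROM cron.job;
-- Remove a job by its id when it is no longer needed:
-- SELECT cron.unschedule(<jobid>);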