Working with semi-structured data in Snowflake provides great advantages, especially around continuous ingestion. Let’s take a look at a simple way of having a single ingestion endpoint, landing the raw data in a VARIANT column, and splitting it into multiple tables with a single insert. This approach can be combined with Snowpipe for automated ingestion and Streams to process only changes.

Let’s define a sample table with a single VARIANT field, and add a Stream on that table so we can process only changes.

CREATE TABLE MULTI_SENSOR_INGEST
   (
     READING_DATA VARIANT
   );

CREATE OR REPLACE STREAM STR_MULTI_SENSOR_INGEST 
    ON TABLE MULTI_SENSOR_INGEST 
    APPEND_ONLY = TRUE;

Now let’s define our 3 destination tables.

CREATE OR REPLACE TABLE PRESSURE_READINGS
(
    READING_TIMESTAMP TIMESTAMP_TZ,
    VALUE DECIMAL(4,0) 
);
CREATE OR REPLACE TABLE TEMPERATURE_READINGS
(
    READING_TIMESTAMP TIMESTAMP_TZ,
    VALUE DECIMAL(4,0) 
);
CREATE OR REPLACE TABLE HUMIDITY_READINGS
(
    READING_TIMESTAMP TIMESTAMP_TZ,
    VALUE DECIMAL(4,2) 
);

Now, for this sample, let’s generate 10k random records of sensor data for three sensor types: pressure, humidity, and temperature. The sensor type will determine the destination table into which each reading should be inserted. I am using the GENERATOR function along with OBJECT_CONSTRUCT to simulate data that might have arrived through Snowpipe.

INSERT INTO MULTI_SENSOR_INGEST
WITH GENERATED_DATA AS 
(
    -- Pick one of the three sensor types at random for each generated row
    SELECT DECODE(uniform(1,3,random()), 
                1,'temperature',
                2,'pressure',
                3,'humidity') AS "sensor_type",
        -- Build a reading object whose keys depend on the sensor type
        CASE WHEN "sensor_type" = 'temperature' 
        THEN object_construct('temperature', uniform(65,99,random())::decimal(4,0),
                              'timestamp', dateadd(millisecond,uniform(0,1000,random()),current_timestamp(2)))
        WHEN "sensor_type" = 'pressure' 
        THEN object_construct('psi',uniform(135,200,random())::decimal(4,0),
                              'timestamp',dateadd(millisecond,uniform(0,1000,random()),current_timestamp(2)))
        WHEN "sensor_type" = 'humidity' 
        THEN object_construct('hValue',uniform(25::float,99::float,random())::decimal(4,2),
                              'timestamp',dateadd(millisecond,uniform(0,1000,random()),current_timestamp(2)))
        END AS "reading"
    FROM TABLE(generator(rowcount=>10000)) 
)
-- Wrap both columns into a single VARIANT object per row
SELECT OBJECT_CONSTRUCT(*) AS READING_DATA FROM GENERATED_DATA;

We can now process this data from the Stream created previously. Processing from a stream helps reduce latency because only rows that have been inserted and not yet processed are read. Let’s take a look at the generated data with a sample of 10 rows.
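
A simple query along these lines gives a quick peek at a few of the generated rows and the keys inside each reading object:

-- Preview 10 of the generated rows and the sensor type each one represents
SELECT READING_DATA:sensor_type::VARCHAR AS SENSOR_TYPE,
       READING_DATA:reading              AS READING
FROM MULTI_SENSOR_INGEST
LIMIT 10;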

Processing the Multi-Table Insert

The following script evaluates the sensor_type field in the JSON object to determine the destination table for each insert. Having a single point of ingestion can be beneficial on certain occasions, especially when upstream processing lands aggregated multi-table data in one common location. The same multi-table approach can also be used to route inserts to multiple targets based on other conditions, numeric thresholds among them.

INSERT ALL
    WHEN READING_DATA:sensor_type::VARCHAR = 'pressure' 
        THEN INTO PRESSURE_READINGS 
        (
         READING_TIMESTAMP,
         VALUE
        )
        VALUES
        (
            READING_DATA:reading:timestamp::TIMESTAMP_TZ,
            READING_DATA:reading:psi::decimal(4,0)
        )
    WHEN READING_DATA:sensor_type::VARCHAR = 'temperature' 
        THEN INTO TEMPERATURE_READINGS 
        (
            READING_TIMESTAMP,
            VALUE
        )
        VALUES
        (
            READING_DATA:reading:timestamp::TIMESTAMP_TZ,
            READING_DATA:reading:temperature::decimal(4,0)
        )
    WHEN READING_DATA:sensor_type::VARCHAR = 'humidity' 
        THEN INTO HUMIDITY_READINGS 
        (
            READING_TIMESTAMP,
            VALUE
        )
        VALUES
        (
            READING_DATA:reading:timestamp::TIMESTAMP_TZ,
            READING_DATA:reading:hValue::decimal(4,2)
        )
SELECT
    READING_DATA
FROM
    STR_MULTI_SENSOR_INGEST;

Now the results…
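
A quick sanity check along these lines shows how the 10k rows were distributed across the three destination tables:

-- Row counts per destination table after processing the stream
SELECT 'PRESSURE'    AS SENSOR_TYPE, COUNT(*) AS ROWS_LOADED FROM PRESSURE_READINGS
UNION ALL
SELECT 'TEMPERATURE', COUNT(*) FROM TEMPERATURE_READINGS
UNION ALL
SELECT 'HUMIDITY',    COUNT(*) FROM HUMIDITY_READINGS;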

One of the benefits of multi-table inserts is parallel processing. A single operation inserts into multiple tables, which reduces the time a transaction needs to be held and removes the need to iterate through a list of tables. Look at the execution plan below: the insert node processed the records for all 3 tables in a single step.

To summarize, multi-table inserts are a really neat alternative for processing data for multiple destinations at once, especially data that has a single source and needs to be segregated. Combining this workflow with Snowflake options like Snowpipe and Streams can be a really powerful option; a rough sketch of one way to automate the whole flow is shown below. Enjoy!
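
As an illustration only, the stream processing above could be wrapped in a task so the multi-table insert runs automatically whenever the stream has unprocessed rows. The task name, warehouse, and schedule here are assumptions, not part of the setup above:

-- Hypothetical task (name, warehouse and schedule are assumptions) that re-runs
-- the same multi-table insert whenever the stream has unprocessed rows.
CREATE OR REPLACE TASK TSK_PROCESS_SENSOR_READINGS
    WAREHOUSE = COMPUTE_WH          -- assumed warehouse name
    SCHEDULE  = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('STR_MULTI_SENSOR_INGEST')
AS
INSERT ALL
    WHEN READING_DATA:sensor_type::VARCHAR = 'pressure'
        THEN INTO PRESSURE_READINGS (READING_TIMESTAMP, VALUE)
        VALUES (READING_DATA:reading:timestamp::TIMESTAMP_TZ, READING_DATA:reading:psi::decimal(4,0))
    WHEN READING_DATA:sensor_type::VARCHAR = 'temperature'
        THEN INTO TEMPERATURE_READINGS (READING_TIMESTAMP, VALUE)
        VALUES (READING_DATA:reading:timestamp::TIMESTAMP_TZ, READING_DATA:reading:temperature::decimal(4,0))
    WHEN READING_DATA:sensor_type::VARCHAR = 'humidity'
        THEN INTO HUMIDITY_READINGS (READING_TIMESTAMP, VALUE)
        VALUES (READING_DATA:reading:timestamp::TIMESTAMP_TZ, READING_DATA:reading:hValue::decimal(4,2))
SELECT READING_DATA FROM STR_MULTI_SENSOR_INGEST;

-- Tasks are created suspended, so resume it to start the schedule
ALTER TASK TSK_PROCESS_SENSOR_READINGS RESUME;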
