Snowflake Connector in Python (Remotely)

INSERT INTO SQL statements to Snowflake Tables

Priyansh Khodiyar
4 min read · Feb 23, 2024

I had a use case where I had to test a piece of software (built on top of Snowflake) against massive data: 10+ tables, each with anywhere from 10k to 2 million rows.

What did I do?

So I had to write a script to generate that much data, and the data, although generated, should at least be meaningful. I couldn't find a one-stop solution on the internet, so I ended up writing a Python script myself.

The script is modeled on Snowflake's QUERY_HISTORY view and its schema (the data type of each column), and populates a table with however many rows of data you want.

Here’s a brief overview of the code.

  1. Various functions that generate random (but meaningful) values: start_date, end_date, version numbers, hash values, random strings, fixed random choices, etc.
  2. A generate_insert_statements() function that builds INSERT statements from that data.
  3. A main() function that connects to the Snowflake account using snowflake.connector and executes all the insert statements.

The Code:

import snowflake.connector
import uuid
import random
import secrets
from datetime import datetime, timedelta, timezone

# the number of rows of data you want
number_of_rows = 2

def date():
    desired_date = datetime(2024, 1, 10, tzinfo=timezone.utc) + timedelta(days=random.randint(1, 10))

    # Generate a random time within the day
    random_hours = random.randint(0, 23)
    random_minutes = random.randint(0, 59)
    random_seconds = random.randint(0, 59)
    random_microseconds = random.randint(0, 999999)

    # Create the datetime object with the random time
    created_at_value = desired_date.replace(
        hour=random_hours,
        minute=random_minutes,
        second=random_seconds,
        microsecond=random_microseconds,
    )
    # formatted_created_at = created_at_value.strftime("%Y-%m-%d %H:%M:%S.%f %z")
    return created_at_value

def end_date():
    return date() + timedelta(days=random.randint(1, 10))

def start_date():
    return date() + timedelta(days=1)

# generate random version numbers for sample data
def generate_random_version():
    major = random.randint(1, 10)  # Adjust the range as needed
    minor = random.randint(0, 9)
    patch = random.randint(0, 9)

    return f'{major}.{minor}.{patch}'

def generate_random_fixed_length_hash():
    # Generate random bytes
    length = 40
    random_bytes = secrets.token_bytes(length // 2)

    # Convert bytes to hexadecimal
    hash_value = ''.join(format(byte, '02x') for byte in random_bytes)

    return hash_value

# return random but meaningful data of various types according to each column type
def generate_fake_data():
    WAREHOUSE_NAME = f"WAREHOUSE_NAME_{random.randint(1, 100)}"
    QUERY_TYPE = random.choice(['RENAME_WAREHOUSE', 'CREATE_ROLE', 'GRANT', 'CREATE_TABLE', 'GET_FILES', 'REMOVE_FILES', 'SHOW', 'ALTER_WAREHOUSE_SUSPEND', 'DESCRIBE', 'SELECT', 'PUT_FILES', 'UNLOAD', 'CREATE_TASK', 'USE', 'SET', 'UNKNOWN', 'CREATE', 'CREATE_TABLE_AS_SELECT', 'ALTER', 'EXECUTE_TASK', 'CREATE_USER', 'LIST_FILES'])

    ERROR_CODE = random.choice([90082.0, 1003.0, 904.0, 90109.0, None, 2140.0, 1131.0, 711.0, 90230.0, 2141.0, 604.0, 393901.0, 2027.0, 2003.0, 2211.0, 2043.0, 630.0, 90105.0, 2049.0, 3540.0, 90106.0])

    return {
        'QUERY_ID': str(uuid.uuid4()),
        'QUERY_TEXT': 'QUERY',
        'SCHEMA_ID': random.randint(1, 2000),
        'SCHEMA_NAME': random.choice(['PUBLIC', 'ACCOUNT_USAGE', None]),
        'QUERY_TYPE': QUERY_TYPE,
        'SESSION_ID': random.randint(13312270001, 13312379999),
        'WAREHOUSE_NAME': WAREHOUSE_NAME,
        'QUERY_TAG': f"QUERY_TAG_{random.randint(1, 100)}",
        'EXECUTION_STATUS': random.choice(['FAIL', 'SUCCESS', 'INCIDENT', None]),
        'ERROR_CODE': ERROR_CODE,
        'START_TIME': start_date(),
        'TOTAL_ELAPSED_TIME': random.randint(1, 10439011),
        'PERCENTAGE_SCANNED_FROM_CACHE': round(random.uniform(0, 1), 10),
        'OUTBOUND_DATA_TRANSFER_CLOUD': random.choice(['AWS', 'AZURE', 'GCP', None]),
        'RELEASE_VERSION': generate_random_version(),
        'QUERY_HASH': generate_random_fixed_length_hash(),
        'QUERY_PARAMETERIZED_HASH': generate_random_fixed_length_hash(),
    }

# generate insert statements from the fake data and store them in insert_statements
def generate_insert_statements():
    insert_statements = []
    for _ in range(number_of_rows):
        fake_data = generate_fake_data()

        insert_statement = f"""
        INSERT INTO <DATABASE_NAME>.PUBLIC.<TABLE_NAME> (
            "QUERY_ID",
            ...
            "QUERY_PARAMETERIZED_HASH_VERSION"
        ) VALUES (
            '{fake_data["QUERY_ID"]}',
            ...
            '{fake_data["QUERY_PARAMETERIZED_HASH_VERSION"]}'
        );
        """
        insert_statements.append(insert_statement)
    return insert_statements

def main():
    insert_statements = generate_insert_statements()
    # replace these with your own Snowflake account credentials
    with snowflake.connector.connect(
        **{
            "account": "abcde.central-india.azure",
            "user": "Username",
            "role": "accountadmin",
            "password": "password",
            "warehouse": "default_warehouse",
        }
    ) as con:
        # execute the insert statements one by one
        for statement in insert_statements:
            con.cursor().execute(statement)

main()

What does the code do, Priyansh?

generate random data → connect to Snowflake with the account and warehouse info you provided → populate your tables there.

Prerequisites:

  1. A separate database (preferably).
  2. Create your table(s) from the Snowflake SQL notebook itself (or from Python; see the sketch after this list).
  3. Execute the code remotely.
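If you want to create the table from Python rather than the notebook, a minimal sketch looks like this. The column list is trimmed and the types are illustrative guesses, so adjust them to the schema you actually need; the connection values are the same placeholders used in main() above.

import snowflake.connector

# Hedged sketch: trimmed column list with illustrative types.
# <DATABASE_NAME> / <TABLE_NAME> are placeholders, as in the insert script.
ddl = """
CREATE TABLE IF NOT EXISTS <DATABASE_NAME>.PUBLIC.<TABLE_NAME> (
    "QUERY_ID"                      VARCHAR,
    "QUERY_TEXT"                    VARCHAR,
    "QUERY_TYPE"                    VARCHAR,
    "ERROR_CODE"                    NUMBER,
    "START_TIME"                    TIMESTAMP_LTZ,
    "PERCENTAGE_SCANNED_FROM_CACHE" FLOAT,
    "QUERY_HASH"                    VARCHAR
)
"""

with snowflake.connector.connect(
    account="abcde.central-india.azure",
    user="Username",
    password="password",
    role="accountadmin",
    warehouse="default_warehouse",
) as con:
    con.cursor().execute(ddl)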

Why not use the Snowflake SQL notebook itself?

The code would be slightly different there, running it in Snowflake might incur some cost as well, and you might want to integrate the Python code into your web or mobile app anyway.

Note: The above approach works, but data ingestion into Snowflake tables is slow (around 8k rows in 60 minutes).
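If you want to stick with direct inserts, one thing that should help (a sketch, not what I used) is letting the connector batch the rows with executemany() and bound parameters instead of sending one INSERT string per row. The column list here is trimmed for illustration; extend it to cover your full schema.

rows = [generate_fake_data() for _ in range(number_of_rows)]

# Hedged sketch: only three columns are shown; extend the column list,
# placeholders, and tuples to match your table.
params = [(r["QUERY_ID"], r["QUERY_TYPE"], r["START_TIME"]) for r in rows]

insert_sql = (
    'INSERT INTO <DATABASE_NAME>.PUBLIC.<TABLE_NAME> '
    '("QUERY_ID", "QUERY_TYPE", "START_TIME") VALUES (%s, %s, %s)'
)

with snowflake.connector.connect(
    account="abcde.central-india.azure",
    user="Username",
    password="password",
    role="accountadmin",
    warehouse="default_warehouse",
) as con:
    con.cursor().executemany(insert_sql, params)

With executemany(), the connector can combine the bound rows into bulk inserts, so you are not paying a round trip per row.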

A SECOND APPROACH

Populate my local Postgres DB, export the data as CSV/JSON files (in chunks of roughly 50 MB each), and then upload them to the Snowflake tables.

Here’s how to run the same code, but instead of inserting the data into Snowflake, we populate our local DB. This is blazing fast, as the ingestion rate depends only on the power of your local machine.

Make sure the database and tables are already created and of the right data types.

import psycopg2

# do not forget to change this number
number_of_rows = 75000

# establish the connection (replace with your own Postgres credentials)
conn = psycopg2.connect(
    database="DATABASE_NAME", user='postgres', password='1234', host='127.0.0.1', port='5432'
)

def generate_insert_statements():
    # same code as above
    ...

if __name__ == "__main__":
    insert_statements = generate_insert_statements()

    # create a cursor from the PostgreSQL connection object `conn`
    with conn.cursor() as cursor:
        for statement in insert_statements:
            cursor.execute(statement)

    # Commit the changes
    conn.commit()
    # Close the connection
    conn.close()
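The export and upload legs are not shown above. Here is a hedged sketch of one way to do them: dump the Postgres table to CSV with COPY ... TO STDOUT, then stage and load the file with PUT and COPY INTO through the Snowflake connector. The file path, table/database names, and COPY options are illustrative, so adapt them to your setup.

import psycopg2
import snowflake.connector

CSV_PATH = "/tmp/query_history.csv"  # illustrative path

# 1) Export the populated Postgres table to a local CSV
pg = psycopg2.connect(
    database="DATABASE_NAME", user='postgres', password='1234', host='127.0.0.1', port='5432'
)
with pg.cursor() as cur, open(CSV_PATH, "w") as f:
    cur.copy_expert("COPY <TABLE_NAME> TO STDOUT WITH CSV HEADER", f)
pg.close()

# 2) Stage the CSV and load it into the Snowflake table
with snowflake.connector.connect(
    account="abcde.central-india.azure",
    user="Username",
    password="password",
    role="accountadmin",
    warehouse="default_warehouse",
) as con:
    cur = con.cursor()
    cur.execute("USE SCHEMA <DATABASE_NAME>.PUBLIC")
    cur.execute(f"PUT file://{CSV_PATH} @%<TABLE_NAME> AUTO_COMPRESS=TRUE")
    cur.execute("""
        COPY INTO <TABLE_NAME>
        FROM @%<TABLE_NAME>
        FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
    """)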

That’s it for today! Comment for queries.

Find me on LinkedIn. Stay safe.

