1. Design
Snowflake
│
├── log_utils.py  ← your logging module (imported)
│
└── Python Stored Procedure (Snowpark)
    ├── import log_utils
    ├── logger.debug(...)
    └── INSERT logs into table
2. Log table
-- Destination table for log records written by the Snowpark logging handler.
-- All three values are always supplied by the handler's INSERT, so NULLs are
-- rejected rather than silently stored.
CREATE TABLE IF NOT EXISTS spark_logs (
    created_at TIMESTAMP_NTZ NOT NULL,  -- creation time of the log record
    level      STRING        NOT NULL,  -- logging level name, e.g. DEBUG
    message    STRING        NOT NULL   -- rendered log message text
);
3. Logging module (log_utils.py)
import logging
from datetime import datetime, timezone
# -----------------------------
# Simple Snowflake logging handler (Snowpark only)
# -----------------------------
class SnowflakeHandler(logging.Handler):
    """Logging handler that inserts each record as one row into ``spark_logs``.

    Every emitted record becomes a parameterized
    ``INSERT INTO spark_logs (created_at, level, message)`` executed through
    the Snowpark session supplied at construction time.

    Args:
        session: Snowpark session used to run the INSERT statements.
    """

    def __init__(self, session):
        super().__init__()
        self.session = session

    def emit(self, record):
        """Write ``record`` to ``spark_logs``; never raises.

        ``created_at`` is stored as a naive UTC timestamp so the value does
        not depend on the time zone of the process doing the logging. When
        the record carries exception info (e.g. ``logger.exception(...)``),
        the formatted traceback is appended to the message so it is not lost.

        Args:
            record: the ``logging.LogRecord`` to persist.
        """
        try:
            message = record.getMessage()
            if record.exc_info:
                # getMessage() alone drops the traceback; keep it with the row.
                formatter = self.formatter or logging.Formatter()
                message = f"{message}\n{formatter.formatException(record.exc_info)}"

            # record.created is epoch seconds; convert explicitly in UTC and
            # strip tzinfo so the bound value stays a naive timestamp.
            created_at = datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).replace(tzinfo=None)

            self.session.sql(
                """
                INSERT INTO spark_logs (created_at, level, message)
                VALUES (?, ?, ?)
                """,
                [created_at, record.levelname, message],
            ).collect()
        except Exception as e:
            # Only safe fallback inside Snowflake: a logging failure must not
            # break the code that is being logged.
            print(f"Failed to write log to Snowflake: {e}")
def get_logger(session):
    """Return the shared ``snowpark_logger`` logger wired to ``session``.

    On first use the logger is set to DEBUG and given a ``SnowflakeHandler``
    bound to ``session``. On later calls no second handler is added; instead
    any ``SnowflakeHandler`` already attached is re-pointed at the ``session``
    passed in, so rows are not written through a session from an earlier call.

    Args:
        session: Snowpark session the handler uses to insert log rows.

    Returns:
        logging.Logger: the logger named ``snowpark_logger``.
    """
    logger = logging.getLogger("snowpark_logger")

    if logger.handlers:
        # logging.getLogger returns the same object for the same name, so the
        # handler from a previous call is still attached: refresh its session
        # rather than stacking another handler (which would duplicate rows).
        for existing in logger.handlers:
            if isinstance(existing, SnowflakeHandler):
                existing.session = session
        return logger

    logger.setLevel(logging.DEBUG)
    handler = SnowflakeHandler(session)
    # NOTE: SnowflakeHandler.emit stores record.getMessage(), so this pattern
    # does not change the text written to the message column.
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)
    return logger
4. Upload the Python file to a Snowflake stage
-- Create the stage only if it is missing: CREATE OR REPLACE would drop the
-- stage together with every file already uploaded to it.
CREATE STAGE IF NOT EXISTS python_libs;

-- Run the PUT below from the SnowSQL client (it uploads a local file).
-- OVERWRITE=TRUE makes a re-upload replace an older copy of log_utils.py;
-- AUTO_COMPRESS=FALSE keeps the .py file name the procedure imports.
PUT file://log_utils.py @python_libs AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
5. Snowpark Python stored procedure using logging
-- Smoke-test procedure: imports log_utils from the python_libs stage, writes
-- three DEBUG records through it and returns 'OK'.
CREATE OR REPLACE PROCEDURE snowpark_logging_test()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@python_libs/log_utils.py')
HANDLER = 'run'
AS
$$
import log_utils


def run(session):
    """Procedure entry point.

    Args:
        session: Snowpark session passed to the handler function.

    Returns:
        str: the literal "OK" once all log calls have been made.
    """
    logger = log_utils.get_logger(session)
    logger.debug("Procedure started")

    # Tiny generated DataFrame, used only to have something to count and log.
    df = session.range(0, 3)
    logger.debug(f"Row count = {df.count()}")

    logger.debug("Procedure finished")
    return "OK"
$$;
6. Run and verify
-- Run the test procedure, then list what it logged, newest first.
CALL snowpark_logging_test();

SELECT
    created_at,
    level,
    message
FROM spark_logs
ORDER BY created_at DESC;