DB-hub Technology 未分类 Snowpark Logging

Snowpark Logging

1.Design
Snowflake
│
├── log_utils.py        ← your logging module (imported)
│
└── Python Stored Procedure (Snowpark)
     ├── import log_utils
     ├── logger.debug(...)
     └── INSERT logs into table

2.Log table
-- One row per log record written by the Snowpark logging handler.
-- Every insert supplies all three values, so the columns are declared
-- NOT NULL to reject incomplete rows from any other writer.
CREATE TABLE IF NOT EXISTS spark_logs (
    created_at TIMESTAMP_NTZ NOT NULL,  -- time the record was created (no time zone stored)
    level      STRING        NOT NULL,  -- logging level name, e.g. DEBUG
    message    STRING        NOT NULL   -- rendered log message
);
3.Logging module
import logging
from datetime import datetime

# -----------------------------
# Simple Snowflake logging handler (Snowpark only)
# -----------------------------
class SnowflakeHandler(logging.Handler):
    """Logging handler that stores each record as a row in ``spark_logs``.

    Writing is best-effort: if the insert fails, the error is reported with
    ``print`` and is not raised into the code that issued the log call.

    Args:
        session: Object exposing ``sql(query, params)`` whose result has a
            ``collect()`` method (a Snowpark session in this module's
            intended use).
    """

    def __init__(self, session):
        super().__init__()
        self.session = session

    def emit(self, record):
        """Insert one record as (created_at, level, message).

        Args:
            record: The ``logging.LogRecord`` to persist.
        """
        try:
            message = record.getMessage()

            # getMessage() returns only the interpolated message text, so a
            # traceback attached via logger.exception(...) / exc_info=True
            # (or stack_info=True) would otherwise be lost. Append it.
            if record.exc_info or record.stack_info:
                formatter = self.formatter or logging.Formatter()
                if record.exc_info:
                    message = f"{message}\n{formatter.formatException(record.exc_info)}"
                if record.stack_info:
                    message = f"{message}\n{formatter.formatStack(record.stack_info)}"

            self.session.sql(
                """
                INSERT INTO spark_logs (created_at, level, message)
                VALUES (?, ?, ?)
                """,
                [
                    # Naive datetime in the process's local time zone.
                    datetime.fromtimestamp(record.created),
                    record.levelname,
                    message,
                ],
            ).collect()
        except Exception as e:
            # Only safe fallback inside Snowflake
            print(f"Failed to write log to Snowflake: {e}")


def get_logger(session):
    """Return the shared ``snowpark_logger`` logger, writing through ``session``.

    On the first call (when the logger has no handlers) the logger level is
    set to DEBUG and a ``SnowflakeHandler`` bound to ``session`` is attached.
    On later calls no handler is added, but any ``SnowflakeHandler`` already
    attached is re-pointed at the ``session`` passed in.

    Args:
        session: Session object handed to ``SnowflakeHandler``.

    Returns:
        The ``logging.Logger`` named ``"snowpark_logger"``.
    """
    logger = logging.getLogger("snowpark_logger")

    # logging.getLogger returns the same object for a given name for the life
    # of the interpreter, so a handler installed by an earlier call may still
    # hold that call's session. Rebind it instead of ignoring the argument.
    for existing in logger.handlers:
        if isinstance(existing, SnowflakeHandler):
            existing.session = session

    if not logger.handlers:
        logger.setLevel(logging.DEBUG)

        handler = SnowflakeHandler(session)
        formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - %(message)s"
        )
        handler.setFormatter(formatter)

        logger.addHandler(handler)

    return logger
4.Upload Python file to Snowflake stage
-- Create the stage only when it is missing, so files already uploaded to it
-- are not dropped each time this script is re-run.
CREATE STAGE IF NOT EXISTS python_libs;

-- Run the PUT below from a client such as the SnowSQL CLI (snowsql).
-- AUTO_COMPRESS = FALSE keeps the staged file name as log_utils.py, the name
-- referenced by the procedure's IMPORTS clause.
-- OVERWRITE = TRUE replaces a previously uploaded copy of the module.
PUT file://log_utils.py @python_libs AUTO_COMPRESS = FALSE OVERWRITE = TRUE;
5.Snowpark Python Stored Procedure using logging
-- Demo procedure: imports log_utils from the python_libs stage and writes
-- three DEBUG records through the logger returned by log_utils.get_logger.
-- Returns the string 'OK'.
CREATE OR REPLACE PROCEDURE snowpark_logging_test()
RETURNS STRING
LANGUAGE PYTHON
-- Quoted string, as in Snowflake's documented CREATE PROCEDURE syntax.
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@python_libs/log_utils.py')
HANDLER = 'run'
AS
$$
import log_utils

def run(session):
    # Handler entry point; receives the session and returns "OK".
    logger = log_utils.get_logger(session)

    logger.debug("Procedure started")

    df = session.range(0, 3)
    logger.debug(f"Row count = {df.count()}")

    logger.debug("Procedure finished")

    return "OK"
$$;
6.Run and verify
CALL snowpark_logging_test();

-- Newest entries first. Columns are listed explicitly so the output does not
-- change shape if spark_logs later gains columns.
SELECT
    created_at,
    level,
    message
FROM spark_logs
ORDER BY created_at DESC;

Related Post