
PySpark SQLite Logging

1. Design overview

This post shows how to write PySpark application logs into a SQLite table using Python's standard logging module.

The key idea is:
Use a custom logging.Handler that writes log records into SQLite, and attach it to your PySpark driver logger.

  • Python logging runs on the driver, not on Spark executors
  • SQLite is not safe for concurrent multi-process writes
  • Therefore:
    ✅ Log driver-side events (job start/end, stage logic, exceptions)
    ❌ Do NOT log inside map, foreach, udf, etc. (see the sketch after this list)
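
To make the last point concrete, here is a minimal sketch of the rule (names are illustrative):

from pyspark.sql import SparkSession
import logging

logger = logging.getLogger("driver")

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.range(100)

# OK: count() returns to the driver, so this log call runs driver-side
logger.info("row count = %d", df.count())

# NOT OK: the lambda below would run on executors, where the SQLite
# handler does not exist and concurrent writes could corrupt the file
# df.foreach(lambda row: logger.info("row = %s", row))

spark.stop()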
2. Install PySpark
python -m pip install pyspark==3.5.1
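
To confirm the interpreter picked up the expected build:

import pyspark
print(pyspark.__version__)  # expect 3.5.1 for the pin above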
3. Configure Java 17
sudo alternatives --install /usr/bin/java java /usr/lib/jvm/java-17-openjdk/bin/java 1
sudo alternatives --install /usr/bin/javac javac /usr/lib/jvm/java-17-openjdk/bin/javac 1
sudo alternatives --config java
sudo alternatives --config javac

# verify
java -version
javac -version
4. Verify that Spark and Java are fully working
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LoggingTest").getOrCreate()

# Simple test
data = [("Alice", 1), ("Bob", 2)]
df = spark.createDataFrame(data, ["name", "id"])

df.show()

spark.stop()

This confirms that PySpark is fully working with your setup:

  • Python 3.11 ✅
  • Java 17 ✅
  • PySpark 3.5+ ✅
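
If you want to print all three versions from a single session, a small sketch (note that _jvm is an internal py4j gateway, so treat this only as a debugging aid):

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("VersionCheck").getOrCreate()
print("Python :", sys.version.split()[0])
print("PySpark:", spark.version)
# Java version via the internal py4j gateway (may change between releases)
print("Java   :", spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))
spark.stop()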
5. Create the SQLite table
CREATE TABLE IF NOT EXISTS spark_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT,
    level TEXT,
    logger TEXT,
    message TEXT,
    module TEXT,
    function TEXT,
    line INTEGER
);
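
The same DDL can be run once from Python with the standard sqlite3 module, for example:

import sqlite3

conn = sqlite3.connect("spark_logs.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS spark_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        created_at TEXT,
        level TEXT,
        logger TEXT,
        message TEXT,
        module TEXT,
        function TEXT,
        line INTEGER
    )
""")
conn.commit()
conn.close()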
6. Custom SQLite logging handler
import logging
import sqlite3
import threading
from datetime import datetime

# -----------------------------
# Simple SQLite logging handler
# -----------------------------
class SQLiteHandler(logging.Handler):
    def __init__(self, db_path):
        super().__init__()
        self.db_path = db_path
        # Serialize SQLite writes across driver threads
        self.db_lock = threading.Lock()
        # Ensure the table exists (same schema as step 5)
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spark_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT,
                level TEXT,
                logger TEXT,
                message TEXT,
                module TEXT,
                function TEXT,
                line INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def emit(self, record):
        try:
            with self.db_lock:
                conn = sqlite3.connect(self.db_path)
                conn.execute(
                    "INSERT INTO spark_logs "
                    "(created_at, level, logger, message, module, function, line) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        datetime.fromtimestamp(record.created).isoformat(),
                        record.levelname,
                        record.name,
                        # format() includes the traceback for logger.exception();
                        # record.getMessage() alone would drop it
                        self.format(record),
                        record.module,
                        record.funcName,
                        record.lineno,
                    ),
                )
                conn.commit()
                conn.close()
        except Exception as e:
            print(f"Failed to write log: {e}")

Why the lock?

  • It serializes writes when several driver threads log at once
  • SQLite only allows one writer at a time
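
The handler can be sanity-checked without Spark at all; a minimal sketch (the database file name is arbitrary):

import logging

# Assumes the SQLiteHandler class from above is in scope
handler = SQLiteHandler("handler_test.db")
log = logging.getLogger("handler_test")
log.setLevel(logging.INFO)
log.addHandler(handler)

log.info("hello from the handler")
log.warning("a warning, too")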
7. Use it in your PySpark application
import logging
import os
import sqlite3
import threading
from datetime import datetime
from pyspark.sql import SparkSession

# -----------------------------
# Simple SQLite logging handler
# (identical to step 6)
# -----------------------------
class SQLiteHandler(logging.Handler):
    def __init__(self, db_path):
        super().__init__()
        self.db_path = db_path
        # Serialize SQLite writes across driver threads
        self.db_lock = threading.Lock()
        # Ensure the table exists (same schema as step 5)
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spark_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT,
                level TEXT,
                logger TEXT,
                message TEXT,
                module TEXT,
                function TEXT,
                line INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def emit(self, record):
        try:
            with self.db_lock:
                conn = sqlite3.connect(self.db_path)
                conn.execute(
                    "INSERT INTO spark_logs "
                    "(created_at, level, logger, message, module, function, line) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        datetime.fromtimestamp(record.created).isoformat(),
                        record.levelname,
                        record.name,
                        self.format(record),  # includes traceback for logger.exception()
                        record.module,
                        record.funcName,
                        record.lineno,
                    ),
                )
                conn.commit()
                conn.close()
        except Exception as e:
            print(f"Failed to write log: {e}")


# -----------------------------
# Main
# -----------------------------
if __name__ == "__main__":
    # Prevent Spark JVM freeze
    os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
    os.environ["HADOOP_OPTS"] = "-Djava.library.path=/nonexistent"
    os.environ["SPARK_NO_DAEMONIZE"] = "1"

    # Start Spark
    spark = SparkSession.builder \
        .appName("LoggingTestSimple") \
        .master("local[1]") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .config("spark.driver.host", "127.0.0.1") \
        .getOrCreate()

    print("Spark started successfully")

    # Set up the SQLite logger
    sqlite_handler = SQLiteHandler("spark_logs.db")
    logger = logging.getLogger("pyspark")
    logger.setLevel(logging.INFO)
    logger.addHandler(sqlite_handler)
    # Keep records out of the root logger's console handlers
    logger.propagate = False

    # Test logging
    logger.info("Spark app started")
    df = spark.range(5)
    logger.info(f"Row count = {df.count()}")

    try:
        1 / 0
    except Exception:
        logger.exception("Test exception")

    logger.info("Spark app finished")

    # Stop Spark
    spark.stop()
    print("Done! Check 'spark_logs.db' for log entries.")

8. Run
Save the step-7 script as logging_test.py, then run:
python logging_test.py
9. Verify
import sqlite3

conn = sqlite3.connect("spark_logs.db")
for row in conn.execute("SELECT * FROM spark_logs"):
    print(row)
conn.close()
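
Because logger.exception() records at ERROR level, failed runs can be pulled out with a filter, for example:

import sqlite3

conn = sqlite3.connect("spark_logs.db")
for row in conn.execute(
    "SELECT created_at, message FROM spark_logs WHERE level = 'ERROR'"
):
    print(row)
conn.close()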
