1. Design overview
This guide shows how to write PySpark application logs into a SQLite table using Python's logging module.
The key idea is:
Use a custom logging.Handler that writes log records into SQLite, and attach it to your PySpark driver logger.
- Python logging runs on the driver, not on the Spark executors
- SQLite is not safe for concurrent multi-process writes
- Therefore:
  ✅ Log driver-side events (job start/end, stage logic, exceptions)
  ❌ Do NOT log inside map, foreach, udf, etc. (see the sketch after this list)
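To make the do/don't concrete, here is a minimal, self-contained sketch. The logger name driver_demo, the UDF shout, and the plain StreamHandler are illustrative stand-ins; the SQLite handler built in section 6 would take the StreamHandler's place.

import logging

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("DriverSideLoggingDemo").getOrCreate()

logger = logging.getLogger("driver_demo")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())  # stand-in for the SQLiteHandler from section 6

df = spark.createDataFrame([("Alice", 1), ("Bob", 2)], ["name", "id"])

# OK: driver-side logging around transformations and actions
logger.info("Starting enrichment job")
result = df.withColumn("name_upper", F.upper("name"))
logger.info(f"Enriched rows: {result.count()}")

# NOT OK: a UDF body runs in executor Python workers, where the driver's
# handler is never attached, so log calls there never reach the database
@F.udf("string")
def shout(value):
    # logger.info(f"processing {value}")  # do not log here
    return value.upper()

spark.stop()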
2. Install PySpark
python -m pip install pyspark==3.5.1
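A quick way to confirm which version the interpreter will actually import:

python -c "import pyspark; print(pyspark.__version__)"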
3. Configure Java 17
sudo alternatives --install /usr/bin/java java /usr/lib/jvm/java-17-openjdk/bin/java 1
sudo alternatives --install /usr/bin/javac javac /usr/lib/jvm/java-17-openjdk/bin/javac 1
sudo alternatives --config java
sudo alternatives --config javac
# verify
java -version
javac -version
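If PySpark still launches an older JVM after switching alternatives, pointing JAVA_HOME at the same installation usually resolves it. The path below assumes the java-17-openjdk location used above; adjust it for your distribution.

export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH="$JAVA_HOME/bin:$PATH"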
4. Verify that Spark and Java are fully working
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LoggingTest").getOrCreate()
# Simple test
data = [("Alice", 1), ("Bob", 2)]
df = spark.createDataFrame(data, ["name", "id"])
df.show()
spark.stop()
This confirms that PySpark is fully working with your setup:
- Python 3.11 ✅
- Java 17 ✅
- PySpark 3.5+ ✅
5. Create the SQLite table
CREATE TABLE IF NOT EXISTS spark_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT,
    level TEXT,
    logger TEXT,
    message TEXT,
    module TEXT,
    function TEXT,
    line INTEGER
);
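The handler in section 6 runs this same DDL on startup, so creating the table manually is optional. If you prefer to set up the database up front, a one-off snippet like this works (spark_logs.db matches the filename used in section 7):

import sqlite3

conn = sqlite3.connect("spark_logs.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS spark_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        created_at TEXT,
        level TEXT,
        logger TEXT,
        message TEXT,
        module TEXT,
        function TEXT,
        line INTEGER
    )
""")
conn.commit()
conn.close()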
6. Custom SQLite logging handler
import logging
import sqlite3
import threading
from datetime import datetime

# -----------------------------
# Simple SQLite logging handler
# -----------------------------
class SQLiteHandler(logging.Handler):
    def __init__(self, db_path):
        super().__init__()
        self.db_path = db_path
        # Serialize writes from the driver's threads
        self.db_lock = threading.Lock()
        # Ensure the table exists (same schema as section 5)
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spark_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT,
                level TEXT,
                logger TEXT,
                message TEXT,
                module TEXT,
                function TEXT,
                line INTEGER
            )
        """)
        conn.commit()
        conn.close()

    def emit(self, record):
        try:
            with self.db_lock:
                conn = sqlite3.connect(self.db_path)
                conn.execute(
                    "INSERT INTO spark_logs "
                    "(created_at, level, logger, message, module, function, line) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        datetime.fromtimestamp(record.created).isoformat(),
                        record.levelname,
                        record.name,
                        record.getMessage(),
                        record.module,
                        record.funcName,
                        record.lineno,
                    ),
                )
                conn.commit()
                conn.close()
        except Exception as e:
            print(f"Failed to write log: {e}")
Why the lock?
- It serializes writes when multiple driver threads log at the same time
- SQLite only allows one writer at a time
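Before wiring the handler into Spark, you can smoke-test it on its own. This is a minimal sketch that assumes the SQLiteHandler class above is defined in the same file or session; the logger name and database filename are arbitrary.

import logging

handler = SQLiteHandler("handler_smoke_test.db")
log = logging.getLogger("handler_smoke_test")
log.setLevel(logging.INFO)
log.addHandler(handler)

log.info("hello from a plain Python process")
log.warning("warnings are captured too")
# Inspect the result with: sqlite3 handler_smoke_test.db "SELECT * FROM spark_logs;"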
7. Use it in your PySpark application
import logging
import os

from pyspark.sql import SparkSession

# -----------------------------
# SQLite logging handler
# -----------------------------
# Paste the SQLiteHandler class from section 6 here (with its imports),
# or keep it in its own module and import it.
# -----------------------------
# Main
# -----------------------------
if __name__ == "__main__":
    # Prevent Spark JVM freeze
    os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
    os.environ["HADOOP_OPTS"] = "-Djava.library.path=/nonexistent"
    os.environ["SPARK_NO_DAEMONIZE"] = "1"

    # Start Spark
    spark = SparkSession.builder \
        .appName("LoggingTestSimple") \
        .master("local[1]") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .config("spark.driver.host", "127.0.0.1") \
        .getOrCreate()
    print("Spark started successfully")

    # Setup logger
    sqlite_handler = SQLiteHandler("spark_logs.db")
    logger = logging.getLogger("pyspark")
    logger.setLevel(logging.INFO)
    logger.addHandler(sqlite_handler)
    logger.propagate = False

    # Test logging
    logger.info("Spark app started")
    df = spark.range(5)
    logger.info(f"Row count = {df.count()}")

    try:
        1 / 0
    except Exception:
        logger.exception("Test exception")

    logger.info("Spark app finished")

    # Stop Spark
    spark.stop()
    print("Done! Check 'spark_logs.db' for log entries.")
8. Run
Save the script from section 7 as logging_test.py, then run it:
python logging_test.py
9. Verify the log entries
import sqlite3

conn = sqlite3.connect("spark_logs.db")
for row in conn.execute("SELECT * FROM spark_logs"):
    print(row)
conn.close()
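A couple of follow-up queries that become handy once the table fills up (column names follow the schema from section 5):

-- Most recent entries first
SELECT created_at, level, message FROM spark_logs ORDER BY id DESC LIMIT 10;

-- Entry counts per log level
SELECT level, COUNT(*) AS n FROM spark_logs GROUP BY level;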