Handy helpers for swimming in the sea of logs with PySpark.
Setting up
All imports
from typing import Iterable, Optional, Callable, Any
from pyspark.sql import DataFrame, Row
from pyspark import RDD
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
from matplotlib import pyplot as plt
# %matplotlib widget
import re
from datetime import datetime
import json
import itertools
import collections
Running locally
!python -m pip install -U pip
!python -m pip install pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.getOrCreate()
sc: SparkContext = spark.sparkContext
Dealing with logs structurally
Loading from raw text files
def load_logs(
path: str,
log_regex: re.Pattern,
log_schema: dict[str, tuple[DataType, Callable[[str], Any]]],
) -> DataFrame:
'''
Loads all log text files into Spark.
The returned `DataFrame` will have all the columns listed in `log_schema`,
plus the followings:
* `raw`: the original log line
* `seq`: a monotonically increasing field to preserve the ordering in the
original log file
* `log_source`: the path to the file where the line of log came from
Lines that do not match `log_regex` will be discarded.
Parameters
----------
path
The path where PySpark should read the log text files.
log_regex
A Python regex where its named capture groups correspond to those specified
in `log_schema`.
log_schema
The basic schema of logs, described as a mapping from log field names to
their corresponding PySpark `DataType`s, accompanied with a conversion
function that converts the string captured from the log line to value of
its corresponding Python type.
'''
@F.udf(returnType=StructType([StructField(n, t) for (n, (t, _)) in log_schema.items()]))
def extract_log(line: str):
m = log_regex.match(line)
if m is None:
return None
return tuple(conv(m.group(name)) for name, (_, conv) in log_schema.items())
return (
spark.read.text(path)
.withColumns({
'raw': F.col('value'),
'seq': F.monotonically_increasing_id(),
'log_source': F.input_file_name(),
'__packed__': extract_log('value'),
})
.filter('__packed__ is not null')
.withColumns({c: F.col(f'__packed__.{c}') for c in log_schema})
.drop('__packed__')
.select(*log_schema, 'raw', 'seq', 'log_source')
)
Classifying and extracting events
def extract_events(
logs: DataFrame, content_col_name: str,
event_schema: dict[str, tuple[re.Pattern, dict[str, Callable[[str], Any]]]],
event_type_col_name: str = 'type',
param_col_name: str = 'param',
) -> DataFrame:
'''
Extract events and their structured parameters.
This procedure will add the follwing columns to the input `DataFrame`:
* Event type: the type of the events specified as keys of `event_schema`
* Event parameters: the extracted event parameters serialized as JSON string
Log entries that do not match any of the given event schema will be discarded.
Parameters
----------
logs
The `DataFrame` containing all logs.
content_col_name
The column name of log contents in `logs`.
event_schema : event_type --> (event_regex, param_name --> convert_fn)
The schema definitions of events, described as a mapping from event type
names to their parameter schemas. Parameter schema is described as a tuple
where the first element is the regex, the second is a mapping from
parameter names to their type conversion functions.
event_type_col_name
The column name of the added field where event type is stored.
param_col_name
The column name of the added field where event parameters are stored.
Defaults to "param".
'''
ExpandType = StructType([
StructField(event_type_col_name, StringType()),
StructField(param_col_name, StringType()),
])
@F.udf(returnType=ExpandType)
def match_event(cont: str):
for event_type, (event_regex, param_schema) in event_schema.items():
m = event_regex.match(cont)
if m is None:
continue
return (
event_type,
json.dumps({
name: conv(m.group(name))
for name, conv in param_schema.items()
})
)
return None
return (
logs
.withColumn('__expand__', match_event(content_col_name))
.filter('__expand__ is not null')
.withColumns({f: F.col(f'__expand__.{f}') for f in ExpandType.fieldNames()})
.drop('__expand__')
.select(*ExpandType.fieldNames(), *logs.schema.fieldNames())
)