Flatten Nested Array In Spark Dataframe
I'm reading in some JSON in the form: {'a': [{'b': {'c': 1, 'd': 2}}]} That is, the array items are unnecessarily nested. Because this nesting occurs inside an array, the answers given for flattening plain struct columns do not apply directly.
Solution 1:
You can use the higher-order function `transform`:
# Rewrite each array element {b: {c, d}} into a flat struct {b_c, b_d} using
# Spark SQL's higher-order transform() function (available since Spark 2.4).
# The lambda variable x ranges over the elements of array column "a".
df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
Solution 2:
Using the method presented in the accepted answer I wrote a function to recursively unnest a dataframe (recursing into nested arrays as well):
from pyspark.sql.types import ArrayType, StructType
def flatten(df, sentinel="x"):
    """Recursively flatten all nested structs (including structs inside
    arrays) of *df* into top-level columns.

    A single Spark SQL expression is generated that wraps every leaf field
    in one big struct, renaming each leaf to the underscore-joined path of
    its ancestors (e.g. ``a.b.c`` becomes ``a_b_c``).  Arrays of structs are
    handled with the higher-order ``transform()`` function, flattening each
    element in place.

    NOTE(review): the original text of this block was mangled by scraping
    (fused keywords, multiple statements collapsed onto one line); this is
    the straightforward syntactic reconstruction of the evident logic.

    :param df: input DataFrame with (possibly) nested schema.
    :param sentinel: variable name used for the transform() lambda and for
        the temporary wrapping struct; must not collide with a column name.
    :return: a new DataFrame with a flat schema.
    """

    def _gen_flatten_expr(schema, indent, parents, last, transform=False):
        # Yields (indent_level, text) pairs that together form the SQL
        # expression for one struct level.  `parents` is the dotted path to
        # this level; `last` says whether a trailing comma must be omitted.
        def handle(field, last):
            path = parents + (field.name,)
            alias = (
                " as "
                # Inside a transform() lambda the sentinel itself is not
                # part of the output name, hence path[1:].
                + "_".join(path[1:] if transform else path)
                + ("," if not last else "")
            )
            if isinstance(field.dataType, StructType):
                # Plain nested struct: recurse, extending the dotted path.
                yield from _gen_flatten_expr(
                    field.dataType, indent, path, last, transform
                )
            elif (
                isinstance(field.dataType, ArrayType)
                and isinstance(field.dataType.elementType, StructType)
            ):
                # Array of structs: flatten each element via transform().
                yield indent, "transform("
                yield indent + 1, ".".join(path) + ","
                yield indent + 1, sentinel + " -> struct("
                yield from _gen_flatten_expr(
                    field.dataType.elementType,
                    indent + 2,
                    (sentinel,),  # inside the lambda, paths start at the sentinel
                    True,
                    True,
                )
                yield indent + 1, ")"
                yield indent, ")" + alias
            else:
                # Leaf field: plain dotted reference plus its alias.
                yield (indent, ".".join(path) + alias)

        try:
            *fields, last_field = schema.fields
        except ValueError:
            # Empty struct: nothing to emit.
            pass
        else:
            for field in fields:
                yield from handle(field, False)
            yield from handle(last_field, last)

    lines = []
    for indent, line in _gen_flatten_expr(df.schema, 0, (), True):
        spaces = " " * 4 * indent
        lines.append(spaces + line)
    # Wrap everything in one struct so a single selectExpr() suffices,
    # then explode that struct back into top-level columns.
    expr = "struct(" + "\n".join(lines) + ") as " + sentinel
    return df.selectExpr(expr).select(sentinel + ".*")
Solution 3:
Simplified Approach:
from pyspark.sql.functions import col
def flatten_df(nested_df):
    """Flatten every (possibly nested) struct column of *nested_df* into
    top-level columns.

    Each leaf field is selected by its full dotted path and aliased to the
    underscore-joined path (``a.b.c`` -> ``a_b_c``).  Arrays are NOT
    descended into — only struct columns are flattened.

    :param nested_df: DataFrame whose schema may contain nested structs.
    :return: a new DataFrame with one top-level column per leaf field.
    """
    # Worklist of (path-prefix, sub-frame) pairs still to be examined.
    pending = [((), nested_df)]
    selected = []
    while pending:
        prefix, frame = pending.pop()
        for name, dtype in frame.dtypes:
            full_path = prefix + (name,)
            if dtype.startswith("struct"):
                # Struct column: project its fields and queue them for a
                # later pass under the extended prefix.
                pending.append((full_path, frame.select(name + ".*")))
            else:
                # Leaf column: reference it by dotted path on the original
                # frame, renamed with underscores.
                selected.append(
                    col(".".join(full_path)).alias("_".join(full_path))
                )
    return nested_df.select(selected)
ref: https://docs.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema
Post a Comment for "Flatten Nested Array In Spark Dataframe"