
Flatten Nested Array In Spark Dataframe

I'm reading in some JSON of the form: {"a": [{"b": {"c": 1, "d": 2}}]} That is, the array items are unnecessarily nested. Now, because this happens inside an array, the answers given for flattening plain struct columns don't apply directly.

Solution 1:

You can use transform:

df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
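
Here transform() (available in Spark SQL since 2.4) maps each array element x to a new struct, pulling the fields up out of the intermediate b struct. A quick sketch against the sample JSON from the question, assuming an active SparkSession named spark (the show() output below is approximate and may vary by Spark version):

df = spark.read.json(
    spark.sparkContext.parallelize(['{"a": [{"b": {"c": 1, "d": 2}}]}'])
)
df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
df2.show(truncate=False)
# +--------+
# |a       |
# +--------+
# |[{1, 2}]|
# +--------+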

Solution 2:

Using the method presented in the accepted answer, I wrote a function to recursively unnest a dataframe (recursing into nested arrays as well):

from pyspark.sql.types import ArrayType, StructType

def flatten(df, sentinel="x"):
    def _gen_flatten_expr(schema, indent, parents, last, transform=False):
        def handle(field, last):
            path = parents + (field.name,)
            # The alias is the underscore-joined path; inside a transform()
            # the sentinel variable is dropped from the name. A trailing
            # comma separates struct fields except after the last one.
            alias = (
                " as "
                + "_".join(path[1:] if transform else path)
                + ("," if not last else "")
            )
            if isinstance(field.dataType, StructType):
                # Recurse into structs, extending the dotted path.
                yield from _gen_flatten_expr(
                    field.dataType, indent, path, last, transform
                )
            elif isinstance(field.dataType, ArrayType) and isinstance(
                field.dataType.elementType, StructType
            ):
                # Arrays of structs are rewritten with transform(),
                # flattening each element into a new struct.
                yield indent, "transform("
                yield indent + 1, ".".join(path) + ","
                yield indent + 1, sentinel + " -> struct("
                yield from _gen_flatten_expr(
                    field.dataType.elementType,
                    indent + 2,
                    (sentinel,),
                    True,
                    True,
                )
                yield indent + 1, ")"
                yield indent, ")" + alias
            else:
                # Leaf column: emit its dotted path with the flat alias.
                yield (indent, ".".join(path) + alias)

        try:
            *fields, last_field = schema.fields
        except ValueError:
            pass  # empty struct: nothing to yield
        else:
            for field in fields:
                yield from handle(field, False)
            yield from handle(last_field, last)

    lines = []
    for indent, line in _gen_flatten_expr(df.schema, 0, (), True):
        spaces = " " * 4 * indent
        lines.append(spaces + line)

    # Wrap everything in one struct, then unpack it to get flat columns.
    expr = "struct(" + "\n".join(lines) + ") as " + sentinel
    return df.selectExpr(expr).select(sentinel + ".*")
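
For the dataframe from the question, this generates the same expression as Solution 1. A minimal usage sketch, again assuming an active SparkSession named spark and Spark 2.4+ for transform():

df = spark.read.json(
    spark.sparkContext.parallelize(['{"a": [{"b": {"c": 1, "d": 2}}]}'])
)
flatten(df).printSchema()
# root
#  |-- a: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- b_c: long (nullable = true)
#  |    |    |-- b_d: long (nullable = true)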

Solution 3:

Simplified Approach:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    # Iterative walk over struct columns: each stack entry pairs the path
    # of parent field names with a dataframe projected to that level.
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        # Non-struct columns are selected by their dotted path and
        # renamed with underscores (e.g. a.b.c -> a_b_c).
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        # Struct columns are expanded one level and pushed for later.
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

ref: https://docs.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema
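
Note that this function flattens struct columns only; array columns (including the array from the original question) pass through untouched, so you would still combine it with transform() or explode() for that case. A minimal usage sketch on a struct-only schema, assuming an active SparkSession named spark:

import json

data = {"b": {"c": 1, "d": 2}, "e": 3}
df = spark.read.json(spark.sparkContext.parallelize([json.dumps(data)]))
print(flatten_df(df).columns)
# e is kept as-is; the b struct is expanded into b_c and b_d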
