I have a melted table of the form:
+------+---------+--------------+------------+--------------+
| time | channel | value_double | value_long | value_string |
+------+---------+--------------+------------+--------------+
| 0 | A | 1.1 | null | null |
| 0 | B | null | 1 | null |
| 0 | C | null | null | "foo" |
| 1 | A | 2.1 | null | null |
| 1 | B | null | 2 | null |
| 1 | C | null | null | "bar" |
| 2 | A | 3.1 | null | null |
| 2 | B | null | 3 | null |
| 2 | C | null | null | "foobar" |
+------+---------+--------------+------------+--------------+
And I'd like to pivot this table to be:
+------+-----+---+----------+
| time | A | B | C |
+------+-----+---+----------+
| 0 | 1.1 | 1 | "foo" |
| 1 | 2.1 | 2 | "bar" |
| 2 | 3.1 | 3 | "foobar" |
+------+-----+---+----------+
I've got something along the lines of:
df.groupBy("time").pivot("channel").agg(...)
But I'm struggling to fill the agg function to aggregate across the different values. I've tried coalesce, but it runs into errors because of the distinct types between the columns.
Some key points:
- The three value columns have distinct types (double, long, and string)
- The typing is consistent per channel
- There is always one and only one value column with data per row
Is this possible with PySpark/SparkSQL?
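For reference, a minimal sketch of how the sample melted DataFrame could be constructed (the column types are assumed from the tables above):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed schema: double, long and string value columns, exactly one populated per row.
df = spark.createDataFrame(
    [
        (0, "A", 1.1, None, None),
        (0, "B", None, 1, None),
        (0, "C", None, None, "foo"),
        (1, "A", 2.1, None, None),
        (1, "B", None, 2, None),
        (1, "C", None, None, "bar"),
        (2, "A", 3.1, None, None),
        (2, "B", None, 3, None),
        (2, "C", None, None, "foobar"),
    ],
    "time long, channel string, value_double double, value_long long, value_string string",
)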
Comments:
- Are channels A, B, C always guaranteed to have non-null values that are all the same type? – Derek O
- Yes, they are. They will always have one and only one non-null value of the same type. – GolferDude

4 Answers

Use coalesce and first functions together:
from pyspark.sql import functions as F
...
df = df.groupBy('time').pivot('channel').agg(F.first(F.coalesce('value_double', 'value_long', 'value_string')))
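If coalesce still complains about mismatched types, one hedged workaround is to cast every value column to string before coalescing (the pivoted columns then come out as strings and may need a per-channel cast back afterwards):
from pyspark.sql import functions as F

# Cast all value columns to a common type (string) so coalesce accepts them:
value_cols = ["value_double", "value_long", "value_string"]
combined = F.coalesce(*[F.col(c).cast("string") for c in value_cols])

df_pivoted = df.groupBy("time").pivot("channel").agg(F.first(combined))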
Breaking down the steps for better understanding:
from pyspark.sql.functions import col, when, first

# Identify the appropriate value column for each channel:
df = df.withColumn("value",
                   when(col("channel") == "A", col("value_double"))
                   .when(col("channel") == "B", col("value_long"))
                   .otherwise(col("value_string")))

# Then, pivot the DataFrame:
df_pivoted = df.groupBy("time").pivot("channel").agg(first("value"))
df_pivoted.show()
EDIT: leveraging dynamic type inference and conversion, for cases where the number of channels is high and an explicit per-channel mapping is not practical.
from datetime import datetime

from pyspark.sql.functions import first, udf

# Define a function to dynamically infer the appropriate data type.
# (to_date/to_timestamp are Column functions and cannot parse plain Python
# values inside a UDF, so standard-library parsing is used here instead.)
def infer_data_type(value):
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return float(value)
    except (TypeError, ValueError):
        pass
    try:
        # handles ISO date/timestamp strings such as "2024-11-16" or "2024-11-16 04:33:00"
        return datetime.fromisoformat(str(value))
    except ValueError:
        return str(value)

# Create a UDF for the data type inference:
infer_type_udf = udf(infer_data_type)

# Pivot the DataFrame, inferring the data type for each value:
df_pivoted = df.groupBy("time").pivot("channel").agg(first(infer_type_udf("value")))
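Note that a Python UDF without an explicit returnType yields string columns, so the pivoted result may still need a per-channel cast; a minimal sketch, assuming the A/B/C types from the question:
from pyspark.sql.functions import col

# Assumed per-channel types: A -> double, B -> long, C stays string.
df_typed = (df_pivoted
            .withColumn("A", col("A").cast("double"))
            .withColumn("B", col("B").cast("long")))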
If it is guaranteed that there is exactly one non-null value for each channel at each time, you could restructure the DataFrame before pivoting:
from pyspark.sql import functions as F

# One pivoted DataFrame per value column, keeping only rows where that column is populated:
df_filtered = []
for value_col in ["value_double", "value_long", "value_string"]:
    df_filtered.append(df.select("time", "channel", value_col).dropna()
                         .groupby("time").pivot("channel").agg(F.max(value_col)))

# Join the per-type pivots back together on time:
for i, df_curr in enumerate(df_filtered):
    if i == 0:
        df_all = df_curr
    else:
        df_all = df_all.join(df_curr, on=["time"], how="inner")
Result:
+----+---+---+------+
|time| A| B| C|
+----+---+---+------+
| 1|2.1| 2| bar|
| 2|3.1| 3|foobar|
| 0|1.1| 1| foo|
+----+---+---+------+
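As a side note, the join loop can be written more compactly with functools.reduce; a minimal sketch assuming the same df_filtered list:
from functools import reduce

# Inner-join all per-type pivots on "time":
df_all = reduce(lambda left, right: left.join(right, on="time", how="inner"), df_filtered)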
Use colRegex to dynamically identify the value_-prefixed columns, then create a map column (m) from the value columns:
from pyspark.sql import functions as F

cols = df.select(df.colRegex(r'`value_.*`')).columns
m = F.create_map(*[y for c in cols for y in (F.lit(c), F.col(c))])

df1 = df.select('time', 'channel', m.alias('m'))
# df1.show()
# +----+-------+--------------------------------------------------------------------+
# |time|channel|m |
# +----+-------+--------------------------------------------------------------------+
# |0 |A |{value_double -> 1.1, value_long -> null, value_string -> null} |
# |0 |B |{value_double -> null, value_long -> 1.0, value_string -> null} |
# |0 |C |{value_double -> null, value_long -> null, value_string -> "foo"} |
# |1 |A |{value_double -> 2.1, value_long -> null, value_string -> null} |
# |1 |B |{value_double -> null, value_long -> 2.0, value_string -> null} |
# |1 |C |{value_double -> null, value_long -> null, value_string -> "bar"} |
# |2 |A |{value_double -> 3.1, value_long -> null, value_string -> null} |
# |2 |B |{value_double -> null, value_long -> 3.0, value_string -> null} |
# |2 |C |{value_double -> null, value_long -> null, value_string -> "foobar"}|
# +----+-------+--------------------------------------------------------------------+
Use map_filter to remove the null key-value pairs:
df1 = df1.withColumn('m', F.map_filter('m', lambda k, v: ~F.isnull(v)))
# df1.show()
# +----+-------+--------------------------+
# |time|channel|m |
# +----+-------+--------------------------+
# |0 |A |{value_double -> 1.1} |
# |0 |B |{value_long -> 1.0} |
# |0 |C |{value_string -> "foo"} |
# |1 |A |{value_double -> 2.1} |
# |1 |B |{value_long -> 2.0} |
# |1 |C |{value_string -> "bar"} |
# |2 |A |{value_double -> 3.1} |
# |2 |B |{value_long -> 3.0} |
# |2 |C |{value_string -> "foobar"}|
# +----+-------+--------------------------+
Pivot the DataFrame by time and channel:
df1 = df1.groupBy('time').pivot('channel').agg(F.first('m'))
# df1.show()
# +----+---------------------+-------------------+--------------------------+
# |time|A |B |C |
# +----+---------------------+-------------------+--------------------------+
# |0 |{value_double -> 1.1}|{value_long -> 1.0}|{value_string -> "foo"} |
# |1 |{value_double -> 2.1}|{value_long -> 2.0}|{value_string -> "bar"} |
# |2 |{value_double -> 3.1}|{value_long -> 3.0}|{value_string -> "foobar"}|
# +----+---------------------+-------------------+--------------------------+
Use map_values to extract the value from each mapping:
df1 = df1.select('time', *[F.map_values(c)[0].alias(c) for c in df1.columns[1:]])
# df1.show()
# +----+---+---+--------+
# |time|A |B |C |
# +----+---+---+--------+
# |0 |1.1|1.0|"foo" |
# |1 |2.1|2.0|"bar" |
# |2 |3.1|3.0|"foobar"|
# +----+---+---+--------+
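Since create_map forces all values into a single common type, it is worth checking the resulting schema and, if needed, casting the pivoted columns back to their per-channel types; a minimal sketch, assuming the A/B/C types from the question:
df1.printSchema()  # inspect what common type the map values were coerced to

# Assumed per-channel types: A -> double, B -> long, C stays string.
df1 = (df1
       .withColumn('A', F.col('A').cast('double'))
       .withColumn('B', F.col('B').cast('long')))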