Databricks DLT DataFrame - How to use Schemas
I'm new to Databricks Delta Live Tables and DataFrames, and I'm confused about how to use schemas when reading from a stream. I'm doing table-to-table streaming. One of our requirements is to have comments on the table columns that can be viewed from the DBX Catalog Overview page.
The input table has the following columns (trimmed down for brevity):
from pyspark.sql.types import StructType, StructField, StringType, FloatType

message_schema = StructType([
    StructField("bayId", StringType(), True, {"comment": "Bay ID"}),
    StructField("storeId", StringType(), True, {"comment": "Store ID"}),
    StructField("message", StringType(), True, {"comment": "Message content"}),
])
The message column above contains a stringified JSON object, with the following fields (trimmed down for brevity):
event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ])),
])
Here's my code to read the table from the stream and build the DF:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

df = spark.readStream.table("tablename")
df = df.where(
    col("MESSAGE").isNotNull()
).select(
    col("BAYID"),
    col("STOREID"),
    F.from_json(col("MESSAGE"), message_schema).alias("MESSAGE")
).select(
    col("BAYID"),
    col("STOREID"),
    F.from_json(col("MESSAGE.message"), event_schema).alias("EVENT")
).select(
    F.expr("uuid()").alias("ID"),
    col("BAYID").alias("BAY_ID"),
    col("STOREID").alias("STORE_ID"),
    col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
    col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
)
After I run this, when I view the Catalog overview of the output table, the PLAYER_DEXTERITY_NAME and ATTACK_ANGLE_NBR columns show the comments that I set in the schema. However, the BAY_ID and STORE_ID columns do not have my comments.
I was able to add the comments by appending the following code after the block above:
df = (
    df
    .withMetadata("BAY_ID", {"comment": "Bay ID"})
    .withMetadata("STORE_ID", {"comment": "Store ID"})
)
However, for the sake of consistency, I would like to set the comments in the schema itself. How can I do that? What am I doing wrong?
UPDATE:
In the first answer below, it was suggested to set the schema on @dlt.table(). However, we need to use @dlt.view(), which does not allow specifying a schema.
However, I see that there is a schema() method on the DataStreamReader, so I tried this:
df = (
    spark.readStream
    .schema(message_schema)
    .table(
        f"{bronze_catalog}.{bronze_schema}.{bronze_table_name}",
    )
)
Unfortunately, though, this made no difference in the tables.
Asked Nov 19, 2024 at 2:58 by Westy; edited Nov 19, 2024 at 21:27.
Comment: Are you using the schema while reading the table? – JayashankarGS, Nov 19, 2024 at 3:50
1 Answer
You can pass the schema to the @dlt.table decorator, as shown below.
sales_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("number_of_line_items", StringType(), True),
    StructField("order_datetime", StringType(), True),
    StructField("order_number", LongType(), True),
])

@dlt.table(
    comment="Raw data on sales",
    schema=sales_schema)
def sales():
    return ("...")
In your case, create a schema for the DataFrame that the DLT function returns, and pass that schema to the decorator.
from pyspark.sql.types import *
from pyspark.sql import functions as F
import dlt

output_schema = StructType([
    StructField("ID", StringType(), True, {"comment": "Unique identifier for the row"}),
    StructField("BAY_ID", StringType(), True, {"comment": "Bay ID"}),
    StructField("STORE_ID", StringType(), True, {"comment": "Store ID"}),
    StructField("PLAYER_DEXTERITY_NAME", StringType(), True, {"comment": "Player dexterity"}),
    StructField("ATTACK_ANGLE_NBR", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
])

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ])),
])

@dlt.table(
    comment="Raw data on sales",
    schema=output_schema)
def sales():
    df = spark.readStream.table("tablename")
    df = (
        df.where(F.col("MESSAGE").isNotNull())
        .select(
            F.col("BAYID"),
            F.col("STOREID"),
            F.from_json(F.col("MESSAGE"), event_schema).alias("EVENT"),
        )
        .select(
            F.expr("uuid()").alias("ID"),
            F.col("BAYID").alias("BAY_ID"),
            F.col("STOREID").alias("STORE_ID"),
            F.col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
            F.col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR"),
        )
    )
    return df
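To check which columns actually carry a comment before looking at the Catalog page, the field metadata can be read back from a schema object. The sketch below is only an illustration: column_comments and missing_comments are hypothetical helper names, and they assume nothing more than an object with .fields whose items have .name and a dict-like .metadata, which is the shape of a PySpark StructType. Judging from the code in the question, BAYID and STOREID are selected straight from the source table and never pass through from_json, which would explain why the comments in message_schema do not reach them.

```python
def column_comments(schema):
    """Map each top-level column name to its comment (None if unset).

    Assumes `schema.fields` is iterable and every field exposes `.name`
    and a dict-like `.metadata`, as a PySpark StructType does.
    """
    return {
        field.name: (field.metadata or {}).get("comment")
        for field in schema.fields
    }


def missing_comments(schema):
    """Return the names of top-level columns that have no comment."""
    return [name for name, text in column_comments(schema).items() if not text]
```

Inside the pipeline function this could be called as missing_comments(df.schema) just before the return, or as missing_comments(output_schema) to confirm that every output column is documented.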
Below is the sample data used.
sample_data = [
    ("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
    ("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'),
    ("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}'),
]
[Screenshots omitted: the query output, and the column comments shown in the Overview tab.]
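For the @dlt.view() case raised in the question's update, where a schema cannot be passed to the decorator, one possible way to keep the comments in a single place is to hold them in one mapping and apply it with withMetadata, the call the question already reports as working. This is a sketch under assumptions: OUTPUT_COMMENTS and with_comments are hypothetical names, not part of DLT or PySpark, and the comment texts are copied from the schemas above.

```python
# Hypothetical single source of truth for the output-column comments.
OUTPUT_COMMENTS = {
    "ID": "Unique identifier for the row",
    "BAY_ID": "Bay ID",
    "STORE_ID": "Store ID",
    "PLAYER_DEXTERITY_NAME": "Player dexterity",
    "ATTACK_ANGLE_NBR": "The vertical angle at which the club head approaches the ball",
}


def with_comments(df, comments):
    """Attach a comment to every listed column that exists in `df`.

    Relies only on `df.columns` and `df.withMetadata(name, metadata)`.
    Columns named in `comments` but absent from `df` are skipped.
    Note that withMetadata sets the column's metadata to the dict given here.
    """
    present = set(df.columns)
    for name, text in comments.items():
        if name in present:
            df = df.withMetadata(name, {"comment": text})
    return df
```

The view function would then end with return with_comments(df, OUTPUT_COMMENTS), so every column gets its comment the same way regardless of whether it came from from_json or directly from the source table.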