Apache Beam pipeline not running to completion after applying windowing and a trigger, and no errors thrown
Hello, I am in the process of creating an Apache Beam data pipeline that runs on GCP using Dataflow as the runner. My code below does not throw any errors, but no data is written into BigQuery. When I check the job execution graph there is no activity on the "Write to BigQuery" step of the pipeline, even though I can see data coming in from the Pub/Sub step. I have even gone ahead and added a trigger, but still no data comes out. Please assist. Thanks.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam import window
import typing
import numpy as np
import pandas as pd


class BmsSchema(typing.NamedTuple):
    can_data_frame_1: typing.Optional[str]


beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json
        all_columns = ["can_data_frame_1"]
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)
        yield {all_columns[0]: main_dict[all_columns[0]]}


def run():
    options = PipelineOptions(
        project='dwingestion',
        runner='DataflowRunner',
        streaming=True,
        temp_location='gs://....../temp',
        staging_location='gs://.........../staging',
        region='europe-west1',
        job_name='.........streaming-pipeline-dataflow',
        save_main_session=True,
        flags=['--allow_unsafe_triggers']
    )
    options.view_as(StandardOptions).streaming = True

    input_subscription = 'projects/..._data_streaming'

    table_schema = {
        "fields": [
            {"name": "current_mA", "type": "INTEGER", "mode": "NULLABLE"}
        ]
    }

    with beam.Pipeline(options=options) as p:
        messages = (
            p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | 'Apply Fixed Window' >> beam.WindowInto(
                window.FixedWindows(60),
                trigger=beam.trigger.AfterWatermark(),
                allowed_lateness=window.Duration(10),
                accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
            )
            | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
            | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
        )

        # Convert the messages to a DataFrame
        df = to_dataframe(messages)

        # Extract and process the 'current_mA' field
        df['current_mA'] = df['can_data_frame_1'].str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
        df['current_mA'] = df['current_mA'].where(df['current_mA'] < 0x8000, df['current_mA'] - 0x10000)
        df['current_mA'] = df['current_mA'] * 10

        # Convert back to PCollection and map to dictionaries
        transformed_pcol = (
            to_pcollection(df)
            | 'Log Transformed PCollection' >> beam.Map(lambda x: (print(f"Transformed Row: {x}"), x)[1])  # Debugging
            | 'Convert to Dict with Native Types' >> beam.Map(lambda row: {
                "current_mA": int(row.current_mA) if row.current_mA is not None else None
            })
        )

        # Write to BigQuery
        transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
            table='..........table_test_all_columns_04',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://......_template/temp'
        )


if __name__ == '__main__':
    run()
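For reference, a quick local sanity check of the parser (with a made-up payload; the real messages come from the Pub/Sub subscription) shows that it emits one dict per message and keeps only the can_data_frame_1 key:

parser = ParsePubSubMessage()
sample = b'{"can_data_frame_1": "0012FFE8", "some_other_field": 1}'
print(list(parser.process(sample)))
# [{'can_data_frame_1': '0012FFE8'}]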
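For completeness, the Beam trigger API also supports early firings, which make each window emit partial results on processing time instead of waiting for the watermark to pass the end of the window. A sketch of the same windowing step with a 10-second early firing (everything else mirrors the code above):

| 'Apply Fixed Window' >> beam.WindowInto(
    window.FixedWindows(60),
    trigger=beam.trigger.AfterWatermark(early=beam.trigger.AfterProcessingTime(10)),
    allowed_lateness=window.Duration(10),
    accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)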
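To make the current_mA arithmetic concrete: for a hypothetical frame '0012FFE8', characters [4:8] are 'FFE8' = 65512; since 65512 >= 0x8000, it is reinterpreted as the 16-bit two's-complement value 65512 - 0x10000 = -24, and scaling by 10 gives -240. The same three steps in plain pandas:

s = pd.Series(["0012FFE8", "00120010", None])  # hypothetical frames
v = s.str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
v = v.where(v < 0x8000, v - 0x10000)  # 16-bit two's complement
print((v * 10).tolist())  # [-240, 160, 0]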
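Side note: WriteToBigQuery also accepts the compact string form of the same one-field schema, so the dict above could equivalently be written as:

table_schema = 'current_mA:INTEGER'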
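Also for reference, the current_mA computation does not strictly require the DataFrame round-trip; the same arithmetic can be written as a plain beam.Map over the parsed dicts (a sketch with a hypothetical helper name, handy for checking whether elements stall in to_dataframe/to_pcollection rather than in the BigQuery write):

def to_bq_row(parsed):
    # parsed is the dict emitted by ParsePubSubMessage
    raw = parsed['can_data_frame_1']
    value = int(raw[4:8], 16) if raw is not None else 0
    if value >= 0x8000:  # 16-bit two's complement
        value -= 0x10000
    return {'current_mA': value * 10}

# ... | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
#     | 'To BQ Row' >> beam.Map(to_bq_row)
#     | 'Write to BigQuery' >> WriteToBigQuery(...)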