Apache Beam write to BigQuery JSON payload error

I'm writing a Dataflow pipeline that splits a stream into tables dynamically named from the event_name and event_date in the data.

The tables are being created with the correct names, but the data fails to write, with the formatting error below:

"Unknown name "json" at 'rows[0]': Proto field is not repeating, cannot start list"

The print-record stage produces this log immediately before WriteToBigQuery is called, which looks correct to me:

About to write to BigQuery - Table: PROJECT_ID:DATASET_NAME.TABLE_NAME, Record: [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}]

(For clarity, I have also tried removing the square brackets, with the same result.)

Here is the pipeline code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import logging

def log_before_write(element):
    table_name, record = element
    logging.info(f"About to write to BigQuery - Table: {table_name}, Record: {record}")
    return element 

class SplitByParameter(beam.DoFn):
    def process(self, element):
        event_name = element['event_name']
        event_date = element['event_date']
        yield (event_name, event_date, element)

def format_table_name(element):
    event_name, event_date, record = element
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name, record


def split_records(element):
    table_name, record = element

    json_record = [{
        'event_name': str(record.get('event_name', '')) if record.get('event_name') is not None else '',
        'event_date': str(record.get('event_date', '')) if record.get('event_date') is not None else '',
        'user_id': str(record.get('user_id', '')) if record.get('user_id') is not None else '',
        'platform': str(record.get('platform', '')) if record.get('platform') is not None else ''
    }]

    yield (table_name, json_record)

def print_record(record):
    logging.info(f"Record before WriteToBigQuery: {record}")
    return record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    # Read from BigQuery, apply windowing, and process records
    (p
     | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=f'''
        SELECT *
        FROM `PROJECT_ID.DATASET.TABLE`
        WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
       ''', use_standard_sql=True)
     | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window
     | 'SplitByParameter' >> beam.ParDo(SplitByParameter())  # Split by event_name and event_date
     | 'FormatTableName' >> beam.Map(format_table_name)  # Format the table name
     | 'LogBeforeFlatMap' >> beam.Map(lambda x: logging.info(f'Before FlatMap: {x}') or x)
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'LogBeforeWrite' >> beam.Map(log_before_write)
     | 'PrintRecord' >> beam.Map(print_record)  # Print records before writing to BigQuery
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=lambda x: x[0],  # Table name is the first element of the tuple
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Not sure where to go from here...

I've tried a bunch of different formats, but there are only so many configurations that have a chance of working.

asked Nov 18, 2024 at 10:54 by Rick Rickles; edited Nov 18, 2024 at 21:26 by danronmoon
  • I do not think you can pass (table_name, json_record) to WriteToBigQuery. If your table name does not change, follow this example: github.com/GoogleCloudPlatform/dataflow-cookbook/blob/main/…. If your table name is dynamic based on the input data, check this: github.com/GoogleCloudPlatform/dataflow-cookbook/blob/main/… – XQ Hu, Nov 18, 2024 at 16:08

1 Answer


WriteToBigQuery expects each element of its input PCollection to be a dict of row fields, but in this pipeline it receives a tuple of (table_name, [row_dict]). That extra nesting is what BigQuery rejects with the "Proto field is not repeating, cannot start list" error.

The PCollection passed to WriteToBigQuery needs to contain the row dicts themselves, and the table-name function should generate the table name from each row dict, for example:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import logging

def format_table_name(element):
    event_name = element['event_name']
    event_date = element['event_date']
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name


def split_records(element):
    json_record = {
        'event_name': str(element.get('event_name', '')) if element.get('event_name') is not None else '',
        'event_date': str(element.get('event_date', '')) if element.get('event_date') is not None else '',
        'user_id': str(element.get('user_id', '')) if element.get('user_id') is not None else '',
        'platform': str(element.get('platform', '')) if element.get('platform') is not None else ''
    }

    yield json_record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    (p
     | beam.Create([
         {
            'event_name': 'scroll',
            'event_date': '20241118',
            'user_id': 'user1',
            'platform': 'WEB'
        },
        {
            'event_name': 'click',
            'event_date': '20241117',
            'user_id': 'user2',
            'platform': 'WEB'
        }
     ])
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=format_table_name,  # Callable invoked per row to derive the destination table
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
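
For completeness, here is a minimal sketch of how the original streaming pipeline might be adapted to the same pattern: plain row dicts flow into WriteToBigQuery, and the table callable derives the destination from each row. The PROJECT_ID / DATASET placeholders and the simplified query are assumptions carried over from the question, and the ReadFromBigQuery-plus-windowing combination is kept only to mirror the original code.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
import logging

def format_table_name(element):
    # Called by WriteToBigQuery once per row to pick the destination table.
    event_name = str(element.get('event_name', '')).replace(' ', '_')
    event_date = str(element.get('event_date', '')).replace(' ', '_')
    return f'PROJECT_ID:DATASET.{event_name}_{event_date}'

def to_row(element):
    # Emit one plain dict per element; this is the shape WriteToBigQuery expects.
    yield {
        'event_name': str(element.get('event_name') or ''),
        'event_date': str(element.get('event_date') or ''),
        'user_id': str(element.get('user_id') or ''),
        'platform': str(element.get('platform') or ''),
    }

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    (p
     | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
           query='SELECT * FROM `PROJECT_ID.DATASET.TABLE`',
           use_standard_sql=True)
     | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window
     | 'ToRows' >> beam.FlatMap(to_row)  # One dict per row, no table name attached
     | 'LogBeforeWrite' >> beam.Map(lambda row: logging.info(f'Row before write: {row}') or row)
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=format_table_name,  # Evaluated per row
           schema=schema,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

The key design choice is that the table argument is a callable evaluated per row, so the table name no longer needs to travel alongside the record, and CREATE_IF_NEEDED lets BigQuery create each PROJECT_ID:DATASET.{event_name}_{event_date} table on demand.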
