I am trying to run the following code:
CREATE TABLE IF NOT EXISTS some_source_table
(
    myField1 VARCHAR,
    myField2 VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'demo',
    'properties.bootstrap.servers' = '***',
    'properties.group.id' = 'some-id-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval' = '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);
CREATE TABLE IF NOT EXISTS some_sink_table
(
    myField1 VARCHAR,
    myField2 VARCHAR,
    PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'demosink',
    'properties.bootstrap.servers' = '***',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);
INSERT INTO some_sink_table SELECT * FROM some_source_table;
I can see that I am consuming data in the Confluent platform, but no data is produced to the demosink topic.
I have tried adding the following dependencies to the example given here:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
I use Flink version 1.19.1. Am I missing a dependency, or are some of my dependencies incompatible? I have also tried copying the flink-sql-connector-kafka and flink-clients .jar files into the lib folder in the Dockerfile.
Asked Nov 28, 2024 at 9:56 by user28527275
Do you have any error messages? Normally you can't read a standard Kafka topic ('connector' = 'kafka') and insert directly into 'upsert-kafka'. 'upsert-kafka' tables require a PRIMARY KEY constraint to be defined. – Niko Commented Nov 28, 2024 at 10:13
I get: Error connecting to node b5-{BOOTSTRAP_SERVER} (id: 5 rack: 2) java.net.UnknownHostException: b5-{BOOTSTRAP_SERVER}: Name or service not known. I get this error message with b0-, b1-, b2- and b5-. I have the PRIMARY KEY ('myField1') NOT ENFORCED in upsert-kafka. Is this not how you add a PRIMARY KEY? @Niko – user28527275 Commented Nov 28, 2024 at 11:21
I don't know the real problem with your bootstrap server. Try adding the error log to your post. Flink probably can't reach the servers because your Kafka container isn't on the same network as Flink. You have a PRIMARY KEY in your sink table, but your source table doesn't have one. Read this doc for a better understanding. – Niko Commented Nov 28, 2024 at 13:09
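On the PRIMARY KEY syntax raised in the comments, here is a minimal sketch that reuses the table and column names from the question. Flink SQL quotes identifiers with backticks, while single quotes denote string literals, so the form in the question (backticks) is the one to use rather than the single-quoted form written in the comment. As far as the Flink documentation describes it, the key is declared on the upsert-kafka table itself. The security options are left out here only for brevity.

```sql
-- Sketch only: security options are omitted; names are taken from the question.
CREATE TABLE IF NOT EXISTS some_sink_table
(
    myField1 VARCHAR,
    myField2 VARCHAR,
    -- Backticks quote an identifier; 'myField1' in single quotes would be a string literal.
    PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'demosink',
    'properties.bootstrap.servers' = '***',
    'key.format' = 'json',
    'value.format' = 'json'
);
```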
1 Answer
The problem was with properties.sasl.jaas.config. Duplicate of this.
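The answer does not say what exactly was wrong with properties.sasl.jaas.config, so the following is only a sketch of two commonly reported pitfalls, not a confirmed fix for this case. It assumes (a) that the JAAS username and password values need to be wrapped in double quotes, and (b) that when the flink-sql-connector-kafka fat jar is used, the Kafka client classes are relocated, so the login module may need the org.apache.flink.kafka.shaded prefix. The '***' placeholders are kept from the question.

```sql
-- Sketch under the assumptions stated above; try variant A first, then variant B.
CREATE TABLE IF NOT EXISTS some_sink_table
(
    myField1 VARCHAR,
    myField2 VARCHAR,
    PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'demosink',
    'properties.bootstrap.servers' = '***',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    -- Variant A: unshaded class name, credentials in double quotes.
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";',
    -- Variant B (use instead of A if the login module class cannot be found with the fat jar):
    -- 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);
```

The same value would apply to the source table, since both tables in the question carry an identical properties.sasl.jaas.config entry.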
Article tags: Inserting data to kafka topic with Flink SQL using the kubernetes operator, Stack Overflow