I'm trying to connect to Kafka using Confluent.Kafka 2.6.0 on .NET 7:

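using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class ProducerExtensions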
{
    public static IServiceCollection AddProducer(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddTransient<IMessageProducer, MessageProducer>();

        var kafkaConfiguration = configuration.BindFromAppConfig<KafkaConfiguration>();
        services.AddSingleton(kafkaConfiguration);
        services.AddSingleton(new ProducerBuilder<string, object>(new ProducerConfig
            {
                BootstrapServers = "SensetiveData_BootstrapServers",
                MessageTimeoutMs = 300000,
                SocketTimeoutMs = 300000,
                TransactionTimeoutMs = 300000,
                SocketConnectionSetupTimeoutMs = 300000,
                RequestTimeoutMs = 300000,
                Partitioner = Partitioner.Consistent,
                EnableSslCertificateVerification = true,

                // Security and SASL
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.OAuthBearer,
                SaslOauthbearerTokenEndpointUrl = "SensetiveData_TokenUrl",
                SaslOauthbearerClientId = "SensetiveData_ClientId",
                SaslOauthbearerClientSecret = "SensetiveData_ClientSecret",
                // SSL
                SslCaLocation = "C:\\kafka_2.13-3.7.0\\SensetiveData_CertificationName.pem",
                Debug = "all" // Enables detailed logging.
            })
            .SetValueSerializer(Serializers.ObjectCommandSerializer)
            .Build());
        return services;
    }
}

The Confluent.Kafka logs show the following (sensitive data has been redacted):

%7|1731932229.571|SASL|[SensetiveData_ClientId]#producer-1| [thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
%7|1731932229.571|OPENSSL|[SensetiveData_ClientId]#producer-1| [thrd:app]: Using OpenSSL version OpenSSL 3.0.8 7 Feb 2023 (0x30000080, librdkafka built with 0x30000080)
%7|1731932229.576|SSL|[SensetiveData_ClientId]#producer-1| [thrd:app]: Loading CA certificate(s) from file SensetiveData_PemCertificateLocation
%7|1731932229.582|BROKER|[SensetiveData_ClientId]#producer-1| [thrd:app]: sasl_ssl://[SensetiveData_BootstrapServers]/bootstrap: Added new broker with NodeId -1
%7|1731932229.582|BRKMAIN|[SensetiveData_ClientId]#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1731932229.582|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:app]: sasl_ssl://[SensetiveData_BootstrapServers]/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1731932229.582|BRKMAIN|[SensetiveData_ClientId]#producer-1| [thrd:sasl_ssl://[SensetiveData_BootstrapServers]/bootstra]: sasl_ssl://[SensetiveData_BootstrapServers]/bootstrap: Enter main broker thread
%7|1731932229.583|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:sasl_ssl://[SensetiveData_BootstrapServers]/bootstra]: sasl_ssl://[SensetiveData_BootstrapServers]/bootstrap: Received CONNECT op
%7|1731932229.583|STATE|[SensetiveData_ClientId]#producer-1| [thrd:sasl_ssl://[SensetiveData_BootstrapServers]/bootstra]: sasl_ssl://[SensetiveData_BootstrapServers]/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1731932229.584|BROADCAST|[SensetiveData_ClientId]#producer-1| [thrd:sasl_ssl://[SensetiveData_BootstrapServers]/bootstra]: Broadcasting state change
%7|1731932229.583|INIT|[SensetiveData_ClientId]#producer-1| [thrd:app]: librdkafka v2.6.0 (0x20600ff) [SensetiveData_ClientId]#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0xfffff)
%7|1731932229.585|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]: Client configuration:
%7|1731932229.585|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   client.id = [SensetiveData_ClientId]
%7|1731932229.585|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   client.software.name = confluent-kafka-dotnet
%7|1731932229.585|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   client.software.version = 2.6.0
%7|1731932229.585|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   metadata.broker.list = [SensetiveData_BootstrapServers]
%7|1731932229.586|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,telemetry,all
%7|1731932229.586|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   socket.timeout.ms = 300000
%7|1731932229.586|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   socket.connection.setup.timeout.ms = 300000
%7|1731932229.587|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   default_topic_conf = 00000219A2526950
%7|1731932229.587|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   security.protocol = sasl_ssl
%7|1731932229.587|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   ssl.ca.location = SensetiveData_PemCertificateLocation
%7|1731932229.587|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   enable.ssl.certificate.verification = true
%7|1731932229.588|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   sasl.mechanisms = OAUTHBEARER
%7|1731932229.588|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   sasl.oauthbearer.client.id = [SensetiveData_ClientId]
%7|1731932229.588|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   sasl.oauthbearer.client.secret = [SensetiveData_ClientSecret]
%7|1731932229.588|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   sasl.oauthbearer.token.endpoint.url = [SensetiveData_TokenUrl]
%7|1731932229.589|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   transaction.timeout.ms = 300000
%7|1731932229.589|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   dr_msg_cb = 00007FFDB05433A4
%7|1731932229.589|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]: Default topic configuration:
%7|1731932229.589|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   request.timeout.ms = 300000
%7|1731932229.589|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   message.timeout.ms = 300000
%7|1731932229.589|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   partitioner = consistent
[2024-11-18 15:17:09 INF] ProjectName.Kafka has been started
[2024-11-18 15:17:10 INF] Now listening on: http://localhost:8000
[2024-11-18 15:17:10 INF] Application started. Press Ctrl+C to shut down.
[2024-11-18 15:17:10 INF] Hosting environment: Local
[2024-11-18 15:17:10 INF] Content root path: C:\Users\SensetiveData_UserName\repos\projectname-kafka\src\ProjectName.Kafka
%7|1731932230.032|TOPIC|[SensetiveData_ClientId]#producer-1| [thrd:app]: New local topic: [SensetiveData_TopicName]
%7|1731932230.032|TOPPARNEW|[SensetiveData_ClientId]#producer-1| [thrd:app]: NEW [SensetiveData_TopicName][-1] 00000219A24DC990 refcnt 00000219A24DCA20 (at rd_kafka_topic_new0:488)
%7|1731932230.032|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]: Topic "[SensetiveData_TopicName]" configuration (default_topic_conf):
%7|1731932230.033|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   request.timeout.ms = 300000
%7|1731932230.033|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   message.timeout.ms = 300000
%7|1731932230.033|CONF|[SensetiveData_ClientId]#producer-1| [thrd:app]:   partitioner = consistent
%7|1731932230.591|NOINFO|[SensetiveData_ClientId]#producer-1| [thrd:main]: Topic [SensetiveData_TopicName]metadata information unknown
%7|1731932230.591|NOINFO|[SensetiveData_ClientId]#producer-1| [thrd:main]: Topic [SensetiveData_TopicName]partition count is zero: should refresh metadata
%7|1731932230.592|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1731932230.593|METADATA|[SensetiveData_ClientId]#producer-1| [thrd:main]: Hinted cache of 1/1 topic(s) being queried
%7|1731932230.594|METADATA|[SensetiveData_ClientId]#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): refreshunavailable topics: no usable brokers
%7|1731932230.594|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 47ms: no cluster connection
%7|1731932231.603|NOINFO|[SensetiveData_ClientId]#producer-1| [thrd:main]: Topic [SensetiveData_TopicName]metadata information unknown
%7|1731932231.603|NOINFO|[SensetiveData_ClientId]#producer-1| [thrd:main]: Topic [SensetiveData_TopicName]partition count is zero: should refresh metadata
%7|1731932231.604|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1731932231.604|METADATA|[SensetiveData_ClientId]#producer-1| [thrd:main]: Hinted cache of 1/1 topic(s) being queried
%7|1731932231.604|METADATA|[SensetiveData_ClientId]#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): refreshunavailable topics: no usable brokers
%7|1731932231.604|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
[2024-11-18 15:17:12 INF] Application is shutting down...
[2024-11-18 15:17:12 INF] Stopping CQRS engine...
%7|1731932232.618|NOINFO|[SensetiveData_ClientId]#producer-1| [thrd:main]: Topic [SensetiveData_TopicName]metadata information unknown
%7|1731932232.619|NOINFO|[SensetiveData_ClientId]#producer-1| [thrd:main]: Topic [SensetiveData_TopicName]partition count is zero: should refresh metadata
%7|1731932232.620|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1731932232.620|METADATA|[SensetiveData_ClientId]#producer-1| [thrd:main]: Hinted cache of 1/1 topic(s) being queried
%7|1731932232.621|METADATA|[SensetiveData_ClientId]#producer-1| [thrd:main]: Skipping metadata refresh of 1 topic(s): refreshunavailable topics: no usable brokers
%7|1731932232.621|CONNECT|[SensetiveData_ClientId]#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 48ms: no cluster connection

I can connect to Kafka as a producer with a .ps1 script:

$Client_ID = "SensetiveData_ClientId"
$Secret = "SensetiveData_ClientSecret"
$OIDC = "SensetiveData_TokenUrl"
$Bootstrap = "SensetiveData_BootstrapServers"
$Topic = "SensetiveData_TopicName"
 
C:\kafka_2.13-3.7.0\bin\windows\kafka-console-producer.bat `
--bootstrap-server $Bootstrap `
--topic $Topic `
--producer-property client.id="$Client_ID" `
--producer-property security.protocol="SASL_SSL" `
--producer-property sasl.mechanism="OAUTHBEARER" `
--producer-property ssl.truststore.type="PEM" `
--producer-property ssl.truststore.location="C:\kafka_2.13-3.7.0\SensetiveData_CertificationName.pem" `
--producer-property sasl.jaas.config="org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.token.endpoint.uri='$OIDC' oauth.client.secret='$Secret' oauth.client.id='$Client_ID';" `
--producer-property sasl.login.callback.handler.class="io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"

So I can connect to Kafka as both a producer and a consumer via the .ps1 script, but I can't connect through Confluent.Kafka, even though I configured everything the same way as in the script.

What can I do to fix this?
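
One difference between the two setups that may be worth checking: the console producer obtains its token through the Strimzi login callback handler, while librdkafka only uses `sasl.oauthbearer.token.endpoint.url`, `sasl.oauthbearer.client.id` and `sasl.oauthbearer.client.secret` when `sasl.oauthbearer.method` is set to `oidc`. That property does not appear in the configuration dump above, which suggests it is still at its default value. The sketch below is a minimal, untested variant of the configuration with that property set and an error handler attached. The class name `OidcProducerSketch` and the `string` value type are placeholders for illustration, and whether this resolves the "no usable brokers" loop is an assumption, not a confirmed fix.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper for illustration; not part of the original project.
public static class OidcProducerSketch
{
    public static IProducer<string, string> Build()
    {
        var config = new ProducerConfig
        {
            // Placeholder values mirror the redacted ones in the question.
            BootstrapServers = "SensetiveData_BootstrapServers",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.OAuthBearer,

            // Assumption: switching from the default method to OIDC lets
            // librdkafka fetch the token itself from the endpoint below.
            SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
            SaslOauthbearerTokenEndpointUrl = "SensetiveData_TokenUrl",
            SaslOauthbearerClientId = "SensetiveData_ClientId",
            SaslOauthbearerClientSecret = "SensetiveData_ClientSecret",

            SslCaLocation = "C:\\kafka_2.13-3.7.0\\SensetiveData_CertificationName.pem",
            Debug = "security,broker"
        };

        return new ProducerBuilder<string, string>(config)
            // Surface client-level errors (for example authentication failures)
            // instead of relying only on the debug log stream.
            .SetErrorHandler((_, e) =>
                Console.Error.WriteLine($"Kafka error: {e.Code} {e.Reason} (fatal: {e.IsFatal})"))
            .Build();
    }
}
```

If the token endpoint requires a scope, `SaslOauthbearerScope` can be set in the same initializer. The narrower `Debug` value is only there to keep the log output focused on the connection and authentication steps.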
