I am using Testcontainers to set up Zookeeper and Kafka. The containers seem to start, but I am seeing errors in the Kafka log. I am also trying to create a topic and produce a message, without success.

2023-09-06 10:03:52 [2023-09-06 08:03:52,569] WARN [Controller id=1, targetBrokerId=1] Error connecting to node kafka_817e3e9d-9da9-42b7-ade0-cf4bbd7c791c:29092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
2023-09-06 10:03:52 java.net.UnknownHostException: kafka_817e3e9d-9da9-42b7-ade0-cf4bbd7c791c
2023-09-06 10:03:52 at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
2023-09-06 10:03:52 at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
2023-09-06 10:03:52 at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
2023-09-06 10:03:52 at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
2023-09-06 10:03:52 at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
2023-09-06 10:03:52 at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
2023-09-06 10:03:52 at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
2023-09-06 10:03:52 at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)


C#
		private static async Task StartupTask()
		{
			var containerName = Guid.NewGuid().ToString();
			var zookeeperContainerName = $"zookeeper_{containerName}";
			var zookeeperContainer = new ContainerBuilder()
				// Set the image for the container
				.WithImage("confluentinc/cp-zookeeper:latest")
				.WithName(zookeeperContainerName)
				// Bind port 2181 of the container to 
                // a random port on the host.
				.WithPortBinding(2181, true)
				.WithEnvironment(new Dictionary<string, string>
				{
					{"ZOOKEEPER_CLIENT_PORT", "2181"},
					{"ZOOKEEPER_TICK_TIME", "2000"}

				})
				.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(2181))
				.Build();

			await zookeeperContainer.StartAsync()
				.ConfigureAwait(false);

			var zookeeperHostPort = 
                zookeeperContainer.GetMappedPublicPort(2181);
			var kafkaContainerName = $"kafka_{containerName}";
			var kafkaHostPort = FindFreePort();
			var kafkaContainer = new ContainerBuilder()
				// Set the image for the container
				.WithImage("confluentinc/cp-kafka:latest")
				.WithName(kafkaContainerName)
				.WithPortBinding(kafkaHostPort, 9092)
				.WithEnvironment(new Dictionary<string, string>
				{
					{"KAFKA_BROKER_ID", "1"},
					{"KAFKA_ZOOKEEPER_CONNECT", 
                      $"host.docker.internal:{zookeeperHostPort}"},

					{"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
                     "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT"},
					{"KAFKA_LISTENERS", 
                     "PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092"},
					{"KAFKA_ADVERTISED_LISTENERS", 
                      $"PLAINTEXT://localhost:{kafkaHostPort},PLAINTEXT_INTERNAL://{kafkaContainerName}:29092"},
					{"KAFKA_INTER_BROKER_LISTENER_NAME", 
                     "PLAINTEXT_INTERNAL"},
					{"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1"},
					{"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"},
					{"KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true"},
					{"KAFKA_DELETE_TOPIC_ENABLE", "true"},
					{"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"},

				})
				.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092))
				.Build();

			await kafkaContainer.StartAsync()
				.ConfigureAwait(false);
		}
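
The FindFreePort helper called above is not shown in the post. A minimal sketch of what it might look like, assuming it simply asks the operating system for an unused TCP port (the class name PortHelper is hypothetical):

```csharp
using System.Net;
using System.Net.Sockets;

public static class PortHelper
{
    // Hypothetical stand-in for the FindFreePort helper used in the question.
    // Binding to port 0 lets the OS pick a currently unused port.
    public static int FindFreePort()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            // LocalEndpoint now carries the port the OS assigned.
            return ((IPEndPoint)listener.LocalEndpoint).Port;
        }
        finally
        {
            // Release the port so that Docker can bind it afterwards.
            listener.Stop();
        }
    }
}
```

A fixed host port is needed here because KAFKA_ADVERTISED_LISTENERS has to contain the port before the container starts, so the random binding used for Zookeeper cannot be used directly. Note that there is a small window between releasing the port and Docker binding it in which another process could take it.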


To publish messages, I am using the following:

C#
	// Continues inside StartupTask, after the Kafka container has started.
	var topicName = "automation";
	var bootstrapServers = $"localhost:{kafkaHostPort}";

	await PublishMessage(topicName, bootstrapServers);
}

private static async Task PublishMessage(string topicName, string bootstrapServers)
{
	var config = new ProducerConfig
	{
		BootstrapServers = bootstrapServers
	};

	using var producer = new ProducerBuilder<Null, string>(config).Build();
	try
	{
		var result = await producer.ProduceAsync(
			topicName,
			new Message<Null, string> { Value = "my message" });
	}
	catch (ProduceException<Null, string> e)
	{
		Console.WriteLine($"Delivery failed: {e.Error.Reason}");
	}
}


What I have tried:

I tried publishing a message directly, and also creating the topic manually before publishing:

C#
private static async Task CreateKafkaTopic(string topicName, string bootstrapServers)
{
	var config = new AdminClientConfig
	{
		BootstrapServers = bootstrapServers
	};

	using var adminClient = new AdminClientBuilder(config)
		.Build();

	await adminClient.CreateTopicsAsync(new[]
	{
		new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 }
	});
}
Updated 6-Sep-23 8:16am
Comments
Richard MacCutchan 6-Sep-23 4:40am    
These are the messages that need attention:
2023-09-06 10:03:52 [2023-09-06 08:03:52,569] WARN [Controller id=1, targetBrokerId=1] Error connecting to node kafka_817e3e9d-9da9-42b7-ade0-cf4bbd7c791c:29092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
2023-09-06 10:03:52 java.net.UnknownHostException: kafka_817e3e9d-9da9-42b7-ade0-cf4bbd7c791c


Try the following:

1 - Add the Testcontainers and Kafka client dependencies to your project.

2 - Start the Zookeeper and Kafka containers using Testcontainers in your test setup method.

3 - In your test method, use the Kafka clients to create a Kafka topic and produce a message.
 
 
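I found the problem: the Kafka container builder was missing this call:
.WithHostname(kafkaContainerName)
With the hostname set, the broker can resolve the name it advertises on the PLAINTEXT_INTERNAL listener, which is the name that appears in the UnknownHostException in the log.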
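
For reference, a sketch of where the call might sit in the Kafka builder from the question. The wrapper class and method (KafkaContainerSketch, BuildKafka) and their parameters are hypothetical and only make the fragment self-contained; the environment values are copied from the question, and the explanation in the comments is an assumption about Docker's default behaviour rather than something confirmed in the thread.

```csharp
using System.Collections.Generic;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;

public static class KafkaContainerSketch
{
    // Hypothetical wrapper around the builder chain from the question.
    public static IContainer BuildKafka(
        string kafkaContainerName, int kafkaHostPort, int zookeeperHostPort)
    {
        return new ContainerBuilder()
            .WithImage("confluentinc/cp-kafka:latest")
            .WithName(kafkaContainerName)
            // Assumption: without an explicit hostname the container's own
            // hostname is not the container name, so the broker cannot
            // resolve the name it advertises to itself.
            .WithHostname(kafkaContainerName)
            .WithPortBinding(kafkaHostPort, 9092)
            .WithEnvironment(new Dictionary<string, string>
            {
                {"KAFKA_BROKER_ID", "1"},
                {"KAFKA_ZOOKEEPER_CONNECT", $"host.docker.internal:{zookeeperHostPort}"},
                {"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                    "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT"},
                {"KAFKA_LISTENERS", "PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092"},
                // The internal listener advertises the container name, which
                // is why that name must be resolvable inside the container.
                {"KAFKA_ADVERTISED_LISTENERS",
                    $"PLAINTEXT://localhost:{kafkaHostPort},PLAINTEXT_INTERNAL://{kafkaContainerName}:29092"},
                {"KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT_INTERNAL"},
                {"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1"},
                {"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"},
                {"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"},
                {"KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true"},
                {"KAFKA_DELETE_TOPIC_ENABLE", "true"}
            })
            .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092))
            .Build();
    }
}
```

The returned container would then be started with StartAsync, as in the question.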
 
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


