C# Client for the Apache Kafka Bus

9 Mar 2014
C# client for the Apache Kafka bus 0.8

Introduction

This article is about a C# client that connects to the Apache Kafka bus. I needed the client to explore the concept of µ-services but could not find any C# implementation. See Fred George's presentations on micro-service architecture here.

Background

I was missing a client to be able to test the Apache Kafka bus from my C# applications. There are clients available in several other languages on the Apache Kafka client page here, but none written in C#. I found a C# implementation on GitHub, but it was 3 years old and not compatible with the 0.8 version of the bus. With that code as a starting point (great thanks to the author of that code!), I started to write my own implementation compatible with Kafka 0.8. There were major changes in the protocol between 0.7 and 0.8, but there is a good description of the protocol at the Kafka wiki.

Using the Code

Everything on the Kafka bus is related to topics. A topic is a named instance of a message log on the bus. Messages are produced to a topic and consumed from a topic. Messages on a topic can be split into several partitions on the broker, so messages have to be addressed by topic name and partition number. The broker holds a log of messages, so it's possible to retrieve older messages. Each message has an offset.

The lower levels of the client code can handle batches of topics and partitions, but I decided to make the top level access layer more or less single topic on single partition to keep it simple.

The low level access to the bus is handled by the Connector class. It has the following methods:

  • Metadata - Get metadata for one or more topics
  • Offset - Find start and current message offset for one or more topics
  • Fetch - Get messages from the bus on one or more topics
  • Produce - Put messages on the bus on one or more topics

The Metadata method has an interesting side effect: it will create a topic if the topic is not present and the broker is configured to auto-create topics. This is the only way I found to create topics from my client code.

General Design / Overview

Design

The objects are split in request and response objects. Each object has knowledge about serialization to or from a byte buffer. There is a class for each type of request (Metadata, Offset, Fetch and Produce). Each request contains a list of topics to act upon. There are different types of topic classes and partition classes to mirror the data structure on the bus.

Requests

All the request objects implement an IRequestBuffer interface that handles serialization to a byte buffer:

public interface IRequestBuffer
{
   List<byte> GetRequestBytes();
}  
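
The request classes that follow call a helper named BitWorks.GetBytesReversed, which is not listed in this article. Kafka's wire protocol encodes integers in big-endian (network) byte order, while BitConverter returns bytes in the machine's native order. The following is a minimal sketch of what such a helper might look like; the class name BigEndian and its methods are assumptions made for illustration and are not the actual BitWorks code.

```csharp
using System;

// Hypothetical stand-in for the BitWorks helper (assumed, not the author's code).
public static class BigEndian
{
    // 16-bit integer in network byte order (most significant byte first).
    public static byte[] GetBytes(short value)
    {
        return ToNetworkOrder(BitConverter.GetBytes(value));
    }

    // 32-bit integer in network byte order.
    public static byte[] GetBytes(int value)
    {
        return ToNetworkOrder(BitConverter.GetBytes(value));
    }

    // 64-bit integer in network byte order.
    public static byte[] GetBytes(long value)
    {
        return ToNetworkOrder(BitConverter.GetBytes(value));
    }

    // BitConverter uses the native byte order, so the bytes are reversed
    // only when the machine is little-endian.
    private static byte[] ToNetworkOrder(byte[] bytes)
    {
        if (BitConverter.IsLittleEndian)
            Array.Reverse(bytes);
        return bytes;
    }
}
```

Checking BitConverter.IsLittleEndian keeps the output identical on big-endian hardware, where an unconditional reverse would produce the wrong byte order.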

OffsetRequest

There is one request object for each method and they have the same basic structure so I will use the OffsetRequest as an example. The request contains a list of topics or actually a list of objects implementing IRequestBuffer. The correlation id is used to identify the request and is returned in the response. This can be useful when accessing the bus asynchronously to connect a response back to a request. The ClientId shows up in the log on the server.

/// <summary>
/// Constructs a request to send to Kafka.
/// </summary>
public class OffsetRequest : IRequestBuffer
{
    private readonly Request request;
    private List<IRequestBuffer> topics = new List<IRequestBuffer>();
 
    /// <summary>
    /// The latest time constant. Specify -1 to receive the latest offsets
    /// </summary>
    public static readonly long LatestTime = -1L;
 
    /// <summary>
    /// The earliest time constant. Specify -2 to receive the earliest available offset. 
    /// Note that because offsets are pulled in descending order, asking for the earliest
    /// offset will always return you a single element.
    /// </summary>
    public static readonly long EarliestTime = -2L;
 
    public OffsetRequest(int correlationId, string clientId)
    {
        request = new Request(Request.Offset, correlationId,clientId);
    }
 
    public OffsetTopic AddTopic(string topicName)
    {
        var topic = new OffsetTopic(topicName);
        topics.Add(topic);
        return topic;
    }
 
    /// <summary>
    /// Adds a topic with a single partition to the offset request.
    /// </summary>
    /// <param name="topicName">The topic to request offsets for.</param>
    /// <param name="partitionId">The partition to request offsets for.</param>
    /// <param name="time">The time from which to request offsets.</param>
    /// <param name="maxNumberOfOffsets">The maximum number of offsets to return.</param>
    public OffsetTopic AddTopic(string topicName, int partitionId, long time, int maxNumberOfOffsets)
    {
        var topic = AddTopic(topicName);
        topic.AddPartition(partitionId, time, maxNumberOfOffsets);
        return topic;
    }
 
    public List<byte> GetRequestBytes()
    {
        var requestBuffer = new List<byte>();
        // Get request base: ApiKey, ApiVersion, CorrelationId, ClientId
        requestBuffer.AddRange(request.GetRequestBytes());
 
        // Add Offset request base: ReplicaId
        requestBuffer.AddRange(BitWorks.GetBytesReversed(Request.ReplicaId));
 
        if (topics.Count > 0)
        {
            // Add Topic count and all topics including partitions
            requestBuffer.AddRange(BitWorks.GetBytesReversed(topics.Count));
            foreach (var topic in topics)
            {
                requestBuffer.AddRange(topic.GetRequestBytes());
            }
        }
 
        // Prefix the request with its size in bytes (the size field itself is not counted)
        requestBuffer.InsertRange(0, BitWorks.GetBytesReversed(Convert.ToInt32(requestBuffer.Count)));
        return requestBuffer;
    }
}  
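
The Request base class used above is not listed, so the bytes that reach the wire can be hard to picture. In the Kafka 0.8 protocol every request starts with a 4-byte size, followed by ApiKey (int16), ApiVersion (int16), CorrelationId (int32) and ClientId (an int16 length followed by the string bytes). Below is a self-contained sketch of that framing; the RequestFraming class and its method names are hypothetical and only illustrate the layout, they are not part of the client.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Illustrative sketch of Kafka 0.8 request framing (hypothetical names).
public static class RequestFraming
{
    // ApiKey of the Offset request in the Kafka protocol.
    public const short OffsetApiKey = 2;

    // Header: ApiKey, ApiVersion, CorrelationId, ClientId.
    public static byte[] BuildHeader(short apiKey, short apiVersion, int correlationId, string clientId)
    {
        var buffer = new List<byte>();
        AddInt16(buffer, apiKey);
        AddInt16(buffer, apiVersion);
        AddInt32(buffer, correlationId);
        // Strings are written as an int16 byte count followed by the bytes.
        byte[] client = Encoding.UTF8.GetBytes(clientId);
        AddInt16(buffer, (short)client.Length);
        buffer.AddRange(client);
        return buffer.ToArray();
    }

    // Prefix a request with its size; the 4 size bytes are not counted.
    public static byte[] Frame(byte[] request)
    {
        var framed = new List<byte>(request.Length + 4);
        AddInt32(framed, request.Length);
        framed.AddRange(request);
        return framed.ToArray();
    }

    private static void AddInt16(List<byte> buffer, short value)
    {
        buffer.Add((byte)((value >> 8) & 0xFF));
        buffer.Add((byte)(value & 0xFF));
    }

    private static void AddInt32(List<byte> buffer, int value)
    {
        buffer.Add((byte)((value >> 24) & 0xFF));
        buffer.Add((byte)((value >> 16) & 0xFF));
        buffer.Add((byte)((value >> 8) & 0xFF));
        buffer.Add((byte)(value & 0xFF));
    }
}
```

For example, a header built with ApiKey 2, version 0, correlation id 7 and client id "ab" is 12 bytes long, and framing it yields 16 bytes starting with 00 00 00 0C.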

Topic

The Topic class is the base class. It holds a topic name and a list of partitions, each of which is an object that implements IRequestBuffer. When the topic is serialized, this list is iterated and each instance is serialized to the byte buffer, regardless of whether it is an OffsetPartition or a FetchPartition object.

 public class Topic : IRequestBuffer
    {
        private readonly String topicName;
        private List<IRequestBuffer> partitions = new List<IRequestBuffer>();
 
        public List<IRequestBuffer> Partitions
        {
            get { return partitions; }
            set { partitions = value; }
        }
 
        public Topic(String name)
        {
            topicName = name;
        }
 
        /// <summary>
        /// Serialize the topic and all contained partitions to a byte buffer
        /// </summary>
        /// <returns>Byte buffer containing the topic and partition data</returns>
        public List<byte> GetRequestBytes()
        {
            var request = new List<byte>();
            if (topicName == null)
                return request;
            request.AddRange(BitWorks.GetBytesReversed((short)topicName.Length));
            request.AddRange(Encoding.ASCII.GetBytes(topicName));
            if (Partitions.Count > 0)
            {
                request.AddRange(BitWorks.GetBytesReversed(Partitions.Count));
                foreach (IRequestBuffer partition in Partitions)
                {
                    request.AddRange(partition.GetRequestBytes());
                }
            }
            return request;
        }
    } 

OffsetTopic

The OffsetTopic contains a Topic and a method to add partitions to that contained topic.

  public class OffsetTopic : IRequestBuffer
    {
        private readonly Topic topic;
 
        public OffsetTopic(String topicName)
        {
            topic = new Topic(topicName);
        }
 
        public OffsetPartition AddPartition(int partitionId, long time, int maxNumberOfOffsets)
        {
            var partition = new OffsetPartition(partitionId, time, maxNumberOfOffsets);
            topic.Partitions.Add(partition);
            return partition;
        }
 
        public List<byte> GetRequestBytes()
        {
            var request = new List<byte>();
            request.AddRange(topic.GetRequestBytes());
            return request;
        }
    } 

OffsetPartition

public class OffsetPartition : IRequestBuffer
{
   public int MaxNuberOfOffsets { get; set; }
   public long Time { get; set; }
 
   private readonly Partition partition;
 
   public OffsetPartition(int partitionId, long time, int maxNuberOfOffsets)
   {
      partition = new Partition(partitionId);
      Time = time;
      MaxNuberOfOffsets = maxNuberOfOffsets;
   }
 
   public List<byte> GetRequestBytes()
   {
      var request = new List<byte>();
      request.AddRange(partition.GetRequestBytes());
      request.AddRange(BitWorks.GetBytesReversed(Time));
      request.AddRange(BitWorks.GetBytesReversed(MaxNuberOfOffsets));
      return request;
   }
}   

Connection

The Connection object handles sending requests to the bus and receiving the responses. I will just show the handling of an offset request as an example.

/// <summary>
/// Get a list of valid offsets (up to maxSize) before the given time.
/// </summary>
/// <param name="request">The offset request.</param>
/// <returns>List of offsets, in descending order.</returns>
public OffsetResponse GetOffsetResponseBefore(OffsetRequest request)
{
    using (var connection = new KafkaConnection(server, port))
    {
        connection.Write(request.GetRequestBytes().ToArray());
 
        int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(connection.Read(4)), 0);
 
        if (dataLength == 0)
            return null;
        byte[] data = connection.Read(dataLength);
        var offsetResponse = new OffsetResponse(data);
        return offsetResponse;
    }
} 
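
The method above opens a connection per request and reads the response synchronously, so the correlation id is not needed to pair them. Once several requests share one connection asynchronously, the correlation id returned in each response can be used to find the waiting caller. The sketch below shows one possible way to do that; PendingRequests and its members are hypothetical names and are not part of the client.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper that pairs responses with requests by correlation id.
public class PendingRequests
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> pending =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
    private int lastCorrelationId;

    // Reserve a new correlation id and return it together with a task
    // that completes when the matching response arrives.
    public int Register(out Task<byte[]> response)
    {
        int correlationId = Interlocked.Increment(ref lastCorrelationId);
        var completion = new TaskCompletionSource<byte[]>();
        pending[correlationId] = completion;
        response = completion.Task;
        return correlationId;
    }

    // Called by the reader when a response has been received.
    // Returns false if no request is waiting for that correlation id.
    public bool Complete(int correlationId, byte[] payload)
    {
        TaskCompletionSource<byte[]> completion;
        if (!pending.TryRemove(correlationId, out completion))
            return false;
        return completion.TrySetResult(payload);
    }
}
```

The caller would pass the id from Register to the request constructor, and a reader loop would call Complete with the correlation id parsed from the first four bytes of each response.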

KafkaConnection

The KafkaConnection object just wraps a standard TcpClient and handles writing and reading on that connection.
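
The KafkaConnection code is not listed here, so how its Read method is implemented is not shown. One detail that matters for any wrapper around a TCP stream is that Stream.Read may return fewer bytes than requested, which means a length-prefixed response has to be read in a loop. Here is a minimal sketch of such a loop; the StreamReading class and the ReadExactly name are assumptions for illustration.

```csharp
using System;
using System.IO;

// Hypothetical helper: read exactly 'count' bytes or fail.
public static class StreamReading
{
    public static byte[] ReadExactly(Stream stream, int count)
    {
        var buffer = new byte[count];
        int total = 0;
        while (total < count)
        {
            // Read may deliver only part of what was asked for.
            int read = stream.Read(buffer, total, count - total);
            if (read == 0)
                throw new EndOfStreamException(
                    "Stream ended after " + total + " of " + count + " bytes.");
            total += read;
        }
        return buffer;
    }
}
```

With a TcpClient, the stream argument would be the NetworkStream returned by GetStream(): first read the 4-byte size, then read that many bytes for the response.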

Response

The response handling is built in a similar way to the requests, but instead of serializing the objects to a byte buffer, the objects are constructed by parsing them from the buffer.

OffsetResponse

using System;
using System.Collections.Generic;
using System.Linq;
using Kafka.Client.Util;
 
public class OffsetResponse
{
    private readonly int correlationId;
 
    public int CorrelationId
    {
        get { return correlationId; }
    }
 
    private readonly Dictionary<string, List<OffsetPartition>> 
    offsetPartitions = new Dictionary<string, List<OffsetPartition>>();
 
    public OffsetResponse(byte[] data)
    {
        var dataOffset = BufferReader.Read(data, 0, out correlationId);
 
        int numTopics;
        dataOffset = BufferReader.Read(data, dataOffset, out numTopics);
 
        for (int topicIndex = 0; topicIndex < numTopics; topicIndex++)
        {
            String topicName;
            dataOffset = BufferReader.Read(data, dataOffset, out topicName);
 
            int partitionCount;
            dataOffset = BufferReader.Read(data, dataOffset, out partitionCount);
            var offsetPartitionList = new List<OffsetPartition>(partitionCount);
 
            for (var i = 0; i < partitionCount; i++)
            {
                int partitionId;
                dataOffset = BufferReader.Read(data, dataOffset, out partitionId);
 
                short errorCode;
                dataOffset = BufferReader.Read(data, dataOffset, out errorCode);
 
                int offsetCount;
                dataOffset = BufferReader.Read(data, dataOffset, out offsetCount);
                var partition = new OffsetPartition(errorCode, partitionId);
                for (var offsetIndex = 0; offsetIndex < offsetCount; offsetIndex++)
                {
                    long offset;
                    dataOffset = BufferReader.Read(data, dataOffset, out offset);
                    partition.Add(offset);
                }
                offsetPartitionList.Add(partition);
            }
            offsetPartitions.Add(topicName, offsetPartitionList);
        }
    }
 
    public List<long> Offsets(string topicName, int partitionId)
    {
        return offsetPartitions[topicName][partitionId].Offsets;
    }
    public short Errorcode(string topicName, int partitionId)
    {
        return offsetPartitions[topicName][partitionId].ErrorCode;
    }
    public List<string> Topics()
    {
        return new List<string>(offsetPartitions.Keys);
    }
 
    public List<int> Partitions(string topicName)
    {
        return offsetPartitions[topicName].Select(offsetPartition => offsetPartition.PartitionId).ToList();
    }
}
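
The parsing above depends on BufferReader.Read, which takes the buffer and the current position, fills an out parameter, and returns the position of the next field. The helper is not listed in the article, so the following is only a sketch of how such overloads could be written for the big-endian integers and length-prefixed strings of the Kafka protocol. BufferReaderSketch is a hypothetical name.

```csharp
using System;
using System.Text;

// Hypothetical sketch of the BufferReader overloads (not the author's code).
public static class BufferReaderSketch
{
    // Each overload returns the offset of the next unread byte.
    public static int Read(byte[] data, int offset, out short value)
    {
        value = unchecked((short)((data[offset] << 8) | data[offset + 1]));
        return offset + 2;
    }

    public static int Read(byte[] data, int offset, out int value)
    {
        value = (data[offset] << 24) | (data[offset + 1] << 16)
              | (data[offset + 2] << 8) | data[offset + 3];
        return offset + 4;
    }

    public static int Read(byte[] data, int offset, out long value)
    {
        long result = 0;
        for (int i = 0; i < 8; i++)
            result = (result << 8) | data[offset + i];
        value = result;
        return offset + 8;
    }

    // Strings are an int16 byte count followed by the bytes; -1 means null.
    public static int Read(byte[] data, int offset, out string value)
    {
        short length;
        offset = Read(data, offset, out length);
        if (length < 0)
        {
            value = null;
            return offset;
        }
        value = Encoding.UTF8.GetString(data, offset, length);
        return offset + length;
    }
}
```

Returning the new offset from every call is what lets the constructor above thread a single dataOffset variable through the whole response.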

High Level API

I implemented another level on top of the Connector to simplify the implementation of my µ-services. This level is mostly about fetching and producing messages.

BusConnector

The BusConnector implements the high level API for consuming and producing messages. It can create a message stream that consumes messages continually from the bus, and it has a produce method that sends messages to the bus.

KafkaMessageStream

The KafkaMessageStream implements IEnumerable<KafkaMessage> and can be used in a foreach loop like this:

var stream = busConnector.CreateMessageStream(topicName, partitionId, KafkaMessageStream.StreamStart.Beginning);
foreach (var kafkaMessage in stream)
{
   Console.WriteLine(kafkaMessage);
   if (Console.KeyAvailable == true)
   {
          ConsoleKeyInfo info = Console.ReadKey();
   }
} 
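
The internals of KafkaMessageStream are not shown in this article. To illustrate how a never-ending stream can be exposed through IEnumerable, here is a small sketch built on a C# iterator: it polls a fetch function, yields each message, and stops when Close is called. PollingStream, its constructor parameters, and the use of plain strings as messages are all assumptions. It also assumes that offsets advance by one per message.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Hypothetical polling stream (a sketch, not the KafkaMessageStream code).
public class PollingStream : IEnumerable<string>
{
    private readonly Func<long, IList<string>> fetch;
    private readonly int pollIntervalMs;
    private long nextOffset;
    private volatile bool closed;

    // fetch returns the messages available from the given offset,
    // or an empty list when nothing new has arrived.
    public PollingStream(Func<long, IList<string>> fetch, long startOffset, int pollIntervalMs)
    {
        this.fetch = fetch;
        this.nextOffset = startOffset;
        this.pollIntervalMs = pollIntervalMs;
    }

    public void Close()
    {
        closed = true;
    }

    public IEnumerator<string> GetEnumerator()
    {
        while (!closed)
        {
            IList<string> batch = fetch(nextOffset);
            if (batch.Count == 0)
            {
                // Nothing new yet: wait before polling again.
                Thread.Sleep(pollIntervalMs);
                continue;
            }
            foreach (string message in batch)
            {
                if (closed)
                    yield break;
                nextOffset++;
                yield return message;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
```

Because the iterator only advances when the consumer asks for the next element, a slow foreach body naturally throttles how often the broker is polled.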

MessageConsumer

The message consumer is a threaded consumer that picks messages from the bus and sends them to a delegate until stopped.

public class MessageConsumer : IKafkaMessageConsumer
{
    private Worker worker;
    /// <summary>
    /// Start a consumer thread on a topic that sends messages to the handler
    /// </summary>
    /// <param name="busConnector">Connection to a Kafka bus</param>
    /// <param name="topicName">Name of topic to fetch</param>
    /// <param name="partitionId">Partition to fetch</param>
    /// <param name="startOffset">Offset to start fetching from. 
    /// 0 = From first message. -1 = Next message. >0 = At offset</param>
    /// <param name="consumeDelegate">Handler invoked for each consumed message</param>
    public void Start(KafkaBusConnector busConnector, string topicName, 
        int partitionId, long startOffset, ConsumeDelegate consumeDelegate)
    {
        KafkaMessageStream messageStream;
        if (startOffset == 0)
            messageStream = busConnector.CreateMessageStream
                (topicName, partitionId, KafkaMessageStream.StreamStart.Beginning);
        else if (startOffset < 0)
            messageStream = busConnector.CreateMessageStream(topicName, partitionId,
                                                             KafkaMessageStream.StreamStart.Next);
        else
            messageStream = busConnector.CreateMessageStream(topicName, partitionId, startOffset);
 
        worker = new Worker(messageStream, consumeDelegate);
        var workerThread = new Thread(worker.ConsumeMessages);
        workerThread.Start();
    }
 
    public void Pause()
    {
        throw new NotImplementedException();
    }
 
    public void Resume()
    {
        throw new NotImplementedException();
    }
 
    public void Stop()
    {
        worker.Cancel();
    }
 
    internal class Worker
    {
        // Written by the thread calling Stop and read by the worker thread
        private volatile bool cancel;
        private readonly KafkaMessageStream messageStream;
        private readonly ConsumeDelegate handler;
 
        internal Worker(KafkaMessageStream messageStream, ConsumeDelegate handler)
        {
            this.messageStream = messageStream;
            this.handler = handler;
        }
 
        internal void Cancel()
        {
            messageStream.Close();
            cancel = true;
        }
 
        public void ConsumeMessages()
        {
            foreach (var message in messageStream)
            {
                if (cancel)
                {
                    messageStream.Close();
                    break;
                }
                if (handler != null)
                    handler.Invoke(message);
            }
        }
    }
}

The MessageConsumer can be used like this:

static void DumpMessage(KafkaMessage message)
{
    Console.WriteLine(message);
}
static void Main(string[] args)
{
    const int partitionId = 0;
 
    if (args.Length < 2)
    {
        Usage();
        return;
    }
 
    var serverAddress = args[0].Split(':')[0];
    var serverPort = Convert.ToInt32(args[0].Split(':')[1]);
    var topicName = args[1];
    long startOffset = 5;
    if (args.Length > 2)
        startOffset = Convert.ToInt64(args[2]);
 
    var busConnector = new KafkaBusConnector(serverAddress, serverPort, "c# KafkaConsume util");
 
    IKafkaMessageConsumer consumer = new MessageConsumer();
    consumer.Start(busConnector, topicName, partitionId, startOffset, DumpMessage);
 
    while (Console.KeyAvailable == false)
    {
        Thread.Sleep(100);
    }
    consumer.Stop();
}  

Produce Method

The produce method is written to send string data to a specific topic/partition. It will create the topic if the topic doesn't exist at the broker and then send the message. The current implementation does not handle a change of leader broker because I just have one broker right now. The usage is plain and simple:

public short Produce(string topicName, int partitionId, string data) 
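
The article does not say what the returned short means. Assuming it is the error code from the Kafka produce response, it could be interpreted with the error codes defined by the Kafka protocol. The sketch below lists the common 0.8 codes and flags the two that point to a leader change, which is the case the current implementation does not handle. The KafkaErrorCode enum and the ProduceResult class are hypothetical names, not part of the client.

```csharp
using System;

// Error codes as defined by the Kafka 0.8 protocol.
public enum KafkaErrorCode : short
{
    Unknown = -1,
    NoError = 0,
    OffsetOutOfRange = 1,
    InvalidMessage = 2,
    UnknownTopicOrPartition = 3,
    InvalidMessageSize = 4,
    LeaderNotAvailable = 5,
    NotLeaderForPartition = 6,
    RequestTimedOut = 7
}

// Hypothetical helper for interpreting the value returned by Produce,
// assuming that value is the protocol error code.
public static class ProduceResult
{
    public static bool IsSuccess(short errorCode)
    {
        return errorCode == (short)KafkaErrorCode.NoError;
    }

    // True when the broker reports that leadership is missing or has moved,
    // which suggests refreshing metadata before trying again.
    public static bool IsLeadershipError(short errorCode)
    {
        switch ((KafkaErrorCode)errorCode)
        {
            case KafkaErrorCode.LeaderNotAvailable:
            case KafkaErrorCode.NotLeaderForPartition:
                return true;
            default:
                return false;
        }
    }
}
```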

Future

There are some parts to improve and add when needed:

  • Handle request of multiple topics in the upper layers
  • Detect and handle leading broker and partition for a topic
  • Error logging to the bus

Points of Interest

I spent some time trying to find out how to create a new topic from the client, including trying to understand the Java API written in Scala. It wasn't until I started up Wireshark and ran the Java client tests that I found that the producer starts with a metadata request and that this request triggered a topic create on the broker. It actually makes sense to do a metadata request to find the leader and partition for the topic.

There will be more writing about my experience and usage of the Kafka bus on my blog.

Source Code

The source code can be downloaded from GitHub.

History

  • 2014-03-10 First version
  • 2014-05-08 Link to source code added

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

MiNioC
Software Developer (Senior)
Sweden
I'm a developer working with Java and C# on everything from small projects to large Java enterprise banking projects.

Last Updated 10 Mar 2014
Article Copyright 2014 by MiNioC