Introduction to Kafka with .NET: From Setup to Implementation

Apache Kafka is a powerful, distributed event-streaming platform used for building real-time data pipelines and applications. It allows systems to produce and consume messages efficiently, making it an essential tool for modern applications requiring reliable communication and data processing.

This guide is designed for beginners to understand and implement Kafka with .NET. By following the steps, you will learn how to set up Kafka using Docker and build both a producer and consumer application in .NET for a simple notification system.


1. Introduction to Apache Kafka

What is Kafka?

  • Distributed event streaming platform
  • Handles real-time data feeds
  • High-throughput, fault-tolerant, horizontally scalable

Core Concepts

  1. Topics
    • Category/feed name for messages
    • Can have multiple partitions
    • Messages are stored in sequential order within each partition
  2. Partitions
    • Topics are divided into partitions
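    • Each partition is an ordered, immutable sequence of messages
    • A message with a key is assigned to a partition by hashing the key, so messages with the same key land in the same partition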
  3. Producers
    • Write messages to topics
    • Can choose partition assignment
    • Async or sync message delivery
  4. Consumers
    • Read messages from topics
    • Grouped into consumer groups
    • Each partition is read by exactly one consumer in the group
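
To make the key-to-partition rule concrete, here is a minimal sketch of hash-based partition selection. It is only an illustration: the PartitionSketch class and its FNV-1a hash are assumptions chosen for brevity, and real Kafka clients use their own hash functions, so the partition numbers will not match a real cluster. The point is that the same key always maps to the same partition.

```csharp
using System;
using System.Text;

// Hypothetical illustration, not part of any Kafka client library.
public static class PartitionSketch
{
    // Stable FNV-1a hash over the key's UTF-8 bytes.
    // Real Kafka clients use different hash functions.
    public static uint Fnv1a(string key)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a key to a partition index in the range [0, partitionCount).
    public static int ChoosePartition(string key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(key) % (uint)partitionCount);
    }

    public static void Main()
    {
        // The key "123" appears twice and is routed to the same partition both times.
        foreach (var userId in new[] { "123", "456", "123" })
            Console.WriteLine($"key {userId} -> partition {ChoosePartition(userId, 3)}");
    }
}
```

Because ordering is only guaranteed inside a partition, choosing a key such as the user ID keeps each user's messages in order.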

Use Cases

  • Event-driven architectures
  • Real-time streaming
  • Log aggregation
  • Message queuing
  • Metrics collection

2. Setting Up Kafka with Docker

Docker Compose File

Save this YAML file as docker-compose.yml:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0   # Pinned to match the Kafka image version
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - kafka-net
      
  kafka:
    image: confluentinc/cp-kafka:7.5.0   # Pinned: newer major releases no longer support ZooKeeper mode
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge

Steps to Set Up

Start containers:

docker-compose up -d

Verify Kafka:

# Check running containers:
docker ps

# List topics (initially empty); "kafka" is the service name from docker-compose.yml:
docker-compose exec kafka kafka-topics --list --bootstrap-server localhost:9092

3. Producer Application Implementation

Goal

The producer sends notifications (messages) to a Kafka topic.

Implementation

1. Define the Notification Model

Represents the structure of data sent to Kafka:

using System;

namespace NotificationAPI.Model
{
    public class Notification
    {
        public string UserId { get; set; } = string.Empty;   // Recipient identifier
        public string Message { get; set; } = string.Empty;  // Message content
        public string Type { get; set; } = string.Empty;     // Notification category
        public DateTime Timestamp { get; set; }              // Creation time
    }
}
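
As a quick check of what actually travels through Kafka, the following sketch serializes a notification to JSON and reads it back with System.Text.Json. The Notification class is redeclared here (without its namespace) only so the snippet compiles on its own, and NotificationJsonSketch is a hypothetical helper name.

```csharp
using System;
using System.Text.Json;

// Copy of the model above, repeated so this sketch is self-contained.
public class Notification
{
    public string UserId { get; set; } = string.Empty;
    public string Message { get; set; } = string.Empty;
    public string Type { get; set; } = string.Empty;
    public DateTime Timestamp { get; set; }
}

// Hypothetical helper for illustration only.
public static class NotificationJsonSketch
{
    // With default options, property names are written exactly as declared (PascalCase).
    public static string ToJson(Notification notification) =>
        JsonSerializer.Serialize(notification);

    // Returns null when the JSON payload is the literal "null".
    public static Notification? FromJson(string json) =>
        JsonSerializer.Deserialize<Notification>(json);

    public static void Main()
    {
        var original = new Notification
        {
            UserId = "123",
            Message = "Test notification",
            Type = "Info",
            Timestamp = new DateTime(2025, 1, 25, 10, 0, 0, DateTimeKind.Utc)
        };

        string json = ToJson(original);
        Console.WriteLine(json);

        Notification? copy = FromJson(json);
        Console.WriteLine($"Round trip kept UserId: {copy?.UserId == original.UserId}");
    }
}
```

Both sides must agree on this shape: if the producer and consumer use different serializer options (for example, different property naming), fields can silently come back empty.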

2. Add Kafka Configuration

Include Kafka details in appsettings.json:

{
  "Kafka": {
    "BootstrapServers": "localhost:9092",  // Kafka connection address
    "Topic": "notifications"               // Topic name for messages
  }
}
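
The colon in keys such as "Kafka:BootstrapServers" is how .NET configuration addresses nested JSON sections. The sketch below shows that lookup using an in-memory configuration source in place of appsettings.json, so it runs without a file on disk. KafkaConfigSketch and its Require helper are hypothetical names, and the snippet assumes the Microsoft.Extensions.Configuration package is referenced.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical helper for illustration only.
public static class KafkaConfigSketch
{
    // Builds the same two settings as appsettings.json, flattened to colon-separated keys.
    public static IConfiguration Build() =>
        new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string?>
            {
                ["Kafka:BootstrapServers"] = "localhost:9092",
                ["Kafka:Topic"] = "notifications"
            })
            .Build();

    // Fails fast with a clear message instead of passing null on to the Kafka client.
    public static string Require(IConfiguration config, string key) =>
        config[key] ?? throw new InvalidOperationException($"Missing configuration value '{key}'.");

    public static void Main()
    {
        IConfiguration config = Build();
        Console.WriteLine(Require(config, "Kafka:BootstrapServers"));
        Console.WriteLine(Require(config, "Kafka:Topic"));
    }
}
```

A missing key returns null from the indexer rather than throwing, which is why a small guard like Require can save debugging time later.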

3. KafkaProducer Service

Handles message production and topic creation:

using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Configuration;

public class KafkaProducer : IDisposable
{
    private readonly ProducerConfig _config;
    private readonly string _topic;
    private readonly IProducer<Null, string> _producer;

    // Constructor: Initializes Kafka configuration and one long-lived producer
    public KafkaProducer(IConfiguration configuration)
    {
        // Set up connection to Kafka broker
        _config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };

        // Get topic name from configuration
        _topic = configuration["Kafka:Topic"]
            ?? throw new InvalidOperationException("Kafka:Topic is not configured.");

        // Producers are thread-safe and costly to create, so build one and reuse it
        _producer = new ProducerBuilder<Null, string>(_config).Build();
    }

    // Method: Sends message to Kafka topic
    public async Task ProduceMessageAsync(string message)
    {
        try
        {
            // Send message to topic
            var result = await _producer.ProduceAsync(_topic,
                new Message<Null, string> { Value = message });

            // Log success with message location
            Console.WriteLine($"Delivered to: {result.TopicPartitionOffset}");
        }
        catch (ProduceException<Null, string> ex)
        {
            // Log the error, then rethrow so the caller does not report success
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
            throw;
        }
    }

    // Method: Creates topic if it doesn't exist
    public async Task CreateTopicIfNotExists()
    {
        // Create admin client for topic management
        using var adminClient = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = _config.BootstrapServers }
        ).Build();

        try
        {
            // Create topic with specified configuration
            await adminClient.CreateTopicsAsync(new TopicSpecification[] {
                new TopicSpecification {
                    Name = _topic,              // Topic name
                    ReplicationFactor = 1,      // Single replica for development
                    NumPartitions = 1           // Single partition
                }
            });
        }
        catch (CreateTopicsException ex)
            when (ex.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // Topic already exists: nothing to do. Any other error propagates.
        }
    }

    // Flush pending messages and release the producer on shutdown
    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
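
The producer above sends messages with a Null key, so Kafka is free to place them on any partition. If per-user ordering matters, one option is to use the user ID as the message key. The sketch below shows only how such a message could be built and sent. KeyedMessageSketch is a hypothetical name, and switching to string keys is an assumption that would also require the consumer to use ConsumerBuilder<string, string> instead of ConsumerBuilder<Null, string>.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper for illustration only.
public static class KeyedMessageSketch
{
    // Builds a message whose key is the user ID, so one user's
    // notifications always go to the same partition.
    public static Message<string, string> Build(string userId, string json) =>
        new Message<string, string> { Key = userId, Value = json };

    // Sends the keyed message with an existing producer (needs a running broker).
    public static Task<DeliveryResult<string, string>> SendAsync(
        IProducer<string, string> producer, string topic, string userId, string json) =>
        producer.ProduceAsync(topic, Build(userId, json));

    public static void Main()
    {
        // Building the message needs no broker, so this part runs anywhere.
        var message = Build("123", "{\"UserId\":\"123\"}");
        Console.WriteLine($"Key: {message.Key}, Value: {message.Value}");
    }
}
```

The producer itself would then be created with new ProducerBuilder<string, string>(config).Build() so that its key type matches the message.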

4. API Controller

Defines a POST endpoint to produce notifications:

using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using NotificationAPI.Model;

[ApiController]
[Route("api/[controller]")]   // Resolves to /api/notification
public class NotificationController : ControllerBase
{
    private readonly KafkaProducer _producer;

    // Constructor: Injects KafkaProducer service
    public NotificationController(KafkaProducer producer)
    {
        _producer = producer;
    }

    // Endpoint: Receives and publishes notifications
    [HttpPost]
    public async Task<IActionResult> SendNotification([FromBody] Notification notification)
    {
        // Serialize notification to JSON and send to Kafka
        await _producer.ProduceMessageAsync(JsonSerializer.Serialize(notification));
        return Ok();
    }
}

5. Startup Configuration

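Register the KafkaProducer as a singleton. The factory runs the first time the service is resolved, so the topic is created on the first request rather than at application startup:

// Register KafkaProducer as Singleton
builder.Services.AddSingleton<KafkaProducer>(sp =>
{
    var producer = new KafkaProducer(builder.Configuration);
    // Create the topic when the singleton is first resolved (blocks until done)
    producer.CreateTopicIfNotExists().GetAwaiter().GetResult();
    return producer;
});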
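
A singleton factory only runs when the service is first resolved, so a registration like this one does its topic creation lazily. If the topic should be created when the host starts instead, one possible approach is a small hosted service. The TopicInitializer class below is a hypothetical helper, not part of the original project, and the snippet assumes the Microsoft.Extensions.Hosting.Abstractions package (included with ASP.NET Core).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical helper: runs a one-off initialization delegate when the host starts.
public sealed class TopicInitializer : IHostedService
{
    private readonly Func<Task> _initialize;

    public TopicInitializer(Func<Task> initialize)
    {
        _initialize = initialize ?? throw new ArgumentNullException(nameof(initialize));
    }

    // Called by the host during startup; awaits the supplied delegate.
    public Task StartAsync(CancellationToken cancellationToken) => _initialize();

    // Nothing to clean up on shutdown.
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

// Possible registration in Program.cs (assumes the KafkaProducer class from this guide):
// builder.Services.AddSingleton<KafkaProducer>();
// builder.Services.AddHostedService(sp =>
//     new TopicInitializer(sp.GetRequiredService<KafkaProducer>().CreateTopicIfNotExists));
```

This keeps the blocking GetAwaiter().GetResult() call out of the dependency injection factory, at the cost of one extra class.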

4. Consumer Application Implementation

Goal

The consumer listens to a Kafka topic and processes incoming messages.

Implementation

Kafka Consumer Logic

using System;
using System.Text.Json;
using System.Threading;
using Confluent.Kafka;
using NotificationAPI.Model;

class Program
{
    static void Main(string[] args)
    {
        // Configure consumer settings
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",     // Kafka server address
            GroupId = "notification-group",          // Consumer group name
            AutoOffsetReset = AutoOffsetReset.Earliest  // Start from oldest message
        };

        // Cancel the consume loop on Ctrl+C so the consumer can shut down cleanly
        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;   // Keep the process alive until cleanup finishes
            cts.Cancel();
        };

        // Create consumer instance
        using var consumer = new ConsumerBuilder<Null, string>(config).Build();

        // Subscribe to topic
        consumer.Subscribe("notifications");

        try
        {
            // Continuous message consumption loop
            while (true)
            {
                // Block until a message arrives or cancellation is requested
                var result = consumer.Consume(cts.Token);

                // Deserialize JSON message to Notification object
                var notification = JsonSerializer.Deserialize<Notification>(
                    result.Message.Value
                );

                // Display notification details
                Console.WriteLine($"User: {notification?.UserId}");
                Console.WriteLine($"Message: {notification?.Message}");
                Console.WriteLine($"Type: {notification?.Type}");
                Console.WriteLine($"Time: {notification?.Timestamp}");
                Console.WriteLine("-------------------");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C was pressed: fall through to cleanup
        }
        finally
        {
            // Leave the consumer group and commit final offsets
            consumer.Close();
        }
    }
}
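
The loop above assumes every message is valid JSON. A single malformed payload would throw a JsonException and stop the consumer. A minimal sketch of a more defensive parse step follows. SafeDeserializeSketch and TryParse are hypothetical names, and the Notification class is redeclared only so the snippet compiles on its own.

```csharp
using System;
using System.Text.Json;

// Copy of the model, repeated so this sketch is self-contained.
public class Notification
{
    public string UserId { get; set; } = string.Empty;
    public string Message { get; set; } = string.Empty;
    public string Type { get; set; } = string.Empty;
    public DateTime Timestamp { get; set; }
}

// Hypothetical helper for illustration only.
public static class SafeDeserializeSketch
{
    // Returns false instead of throwing when the payload is empty,
    // is not valid JSON, or deserializes to null.
    public static bool TryParse(string? json, out Notification? notification)
    {
        notification = null;
        if (string.IsNullOrWhiteSpace(json))
            return false;

        try
        {
            notification = JsonSerializer.Deserialize<Notification>(json);
            return notification != null;
        }
        catch (JsonException)
        {
            return false;
        }
    }

    public static void Main()
    {
        foreach (var payload in new[] { "{\"UserId\":\"123\"}", "not json" })
        {
            if (TryParse(payload, out var n))
                Console.WriteLine($"Parsed notification for user {n!.UserId}");
            else
                Console.WriteLine($"Skipping malformed message: {payload}");
        }
    }
}
```

Inside the consume loop, a failed parse could be logged and skipped so that one bad message does not block the rest of the partition.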

5. Testing

  1. Start Docker containers:
docker-compose up -d
  2. Start Producer (NotificationAPI):
cd NotificationAPI
dotnet run
  3. Start Consumer:
cd NotificationConsumer
dotnet run
  4. Send test notification using Swagger UI or curl:
curl -X POST https://localhost:5001/api/notification \
-H "Content-Type: application/json" \
-d '{
    "userId": "123",
    "message": "Test notification",
    "type": "Info",
    "timestamp": "2025-01-25T10:00:00Z"
}'

Example Output

Producer Logs
Delivered to: notifications [[0]] @3
Consumer Logs
User: 123
Message: Test notification
Type: Info
Time: 1/25/2025 10:00:00 AM
-------------------

The exact date format on the Time line depends on the machine's culture settings.

By following this guide, you have learned the basics of Apache Kafka, set up a Kafka environment using Docker, and implemented producer and consumer applications in .NET. This beginner-friendly walkthrough equips you with the foundational knowledge to build real-time, event-driven applications using Kafka. With this understanding, you’re ready to explore more advanced Kafka concepts and use cases in your projects.
