Apache Kafka is a powerful, distributed event-streaming platform used for building real-time data pipelines and applications. It allows systems to produce and consume messages efficiently, making it an essential tool for modern applications requiring reliable communication and data processing.
This guide is designed for beginners to understand and implement Kafka with .NET. By following the steps, you will learn how to set up Kafka using Docker and build both a producer and consumer application in .NET for a simple notification system.

1. Introduction to Apache Kafka
What is Kafka?
- Distributed event streaming platform
- Handles real-time data feeds
- High-throughput, fault-tolerant, horizontally scalable
Core Concepts
- Topics
  - Category/feed name for messages
  - Can have multiple partitions
  - Messages stored in sequential order
- Partitions
  - Topics are divided into partitions
  - Each partition is an ordered, immutable sequence
  - Messages are assigned to a partition based on their key (see the sketch after this list)
- Producers
  - Write messages to topics
  - Can choose partition assignment
  - Async or sync message delivery
- Consumers
  - Read messages from topics
  - Grouped into consumer groups
  - Each partition is read by one consumer in the group
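To make the key-to-partition mapping concrete, here is a minimal producer sketch using the Confluent.Kafka NuGet package (the same client library used later in this guide); the topic name and key are placeholder values:

using Confluent.Kafka;

// Minimal sketch: messages that share the same key are hashed to the same partition,
// so they keep their relative order for that key.
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();

// "user-123" is a placeholder key; both messages land on the same partition.
await producer.ProduceAsync("demo-topic", new Message<string, string> { Key = "user-123", Value = "first" });
await producer.ProduceAsync("demo-topic", new Message<string, string> { Key = "user-123", Value = "second" });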
Use Cases
- Event-driven architectures
- Real-time streaming
- Log aggregation
- Message queuing
- Metrics collection
2. Setting Up Kafka with Docker
Docker Compose File
Save this YAML file as docker-compose.yml:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - kafka-net
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka   # fixed name so the docker exec commands below can target "kafka"
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Address advertised to clients; localhost works for apps running on the host machine
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge
Steps to Set Up
Start containers:
docker-compose up -d
Verify Kafka:
# Check running containers:
docker ps
# List topics (initially empty):
docker exec -it kafka kafka-topics --list --bootstrap-server localhost:9092
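If you would rather verify the broker from .NET, a quick metadata query works as well. A minimal sketch, assuming the Confluent.Kafka NuGet package is installed (the same package the producer and consumer below depend on):

using Confluent.Kafka;

// Minimal connectivity check: request cluster metadata from the broker.
var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var adminClient = new AdminClientBuilder(config).Build();

// Throws a KafkaException if the broker cannot be reached within the timeout.
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
foreach (var broker in metadata.Brokers)
    Console.WriteLine($"Connected to broker {broker.BrokerId} at {broker.Host}:{broker.Port}");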
3. Producer Application Implementation
Goal
The producer sends notifications (messages) to a Kafka topic.
Implementation
1. Define the Notification Model
Represents the structure of data sent to Kafka:
namespace NotificationAPI.Model
{
    public class Notification
    {
        public string UserId { get; set; }      // Recipient identifier
        public string Message { get; set; }     // Message content
        public string Type { get; set; }        // Notification category
        public DateTime Timestamp { get; set; } // Creation time
    }
}
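For reference, the producer below sends this model as a JSON string, so a serialized Notification looks roughly like the following (the values here are only examples):

using System.Text.Json;
using NotificationAPI.Model;

var notification = new Notification
{
    UserId = "123",
    Message = "Test notification",
    Type = "Info",
    Timestamp = DateTime.UtcNow
};

// Prints something like:
// {"UserId":"123","Message":"Test notification","Type":"Info","Timestamp":"2025-01-25T10:00:00Z"}
Console.WriteLine(JsonSerializer.Serialize(notification));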
2. Add Kafka Configuration
Include Kafka details in appsettings.json:
{
  "Kafka": {
    "BootstrapServers": "localhost:9092", // Kafka connection address
    "Topic": "notifications"              // Topic name for messages
  }
}
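If you prefer strongly typed settings over string keys such as "Kafka:Topic", the same section can also be bound to a small options class. A brief sketch (the KafkaSettings class name is illustrative; the guide itself keeps using IConfiguration directly):

// Illustrative options class mirroring the "Kafka" section of appsettings.json.
public class KafkaSettings
{
    public string BootstrapServers { get; set; } = "";
    public string Topic { get; set; } = "";
}

// In Program.cs:
// builder.Services.Configure<KafkaSettings>(builder.Configuration.GetSection("Kafka"));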
3. KafkaProducer Service
Handles message production and topic creation:
using Confluent.Kafka;
using Confluent.Kafka.Admin;

public class KafkaProducer
{
    private readonly ProducerConfig _config;
    private readonly string _topic;

    // Constructor: initializes the Kafka configuration
    public KafkaProducer(IConfiguration configuration)
    {
        // Set up the connection to the Kafka broker
        _config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };
        // Get the topic name from configuration
        _topic = configuration["Kafka:Topic"];
    }

    // Sends a message to the Kafka topic
    public async Task ProduceMessageAsync(string message)
    {
        // Create a new producer instance
        using var producer = new ProducerBuilder<Null, string>(_config).Build();
        try
        {
            // Send the message to the topic
            var result = await producer.ProduceAsync(_topic,
                new Message<Null, string> { Value = message });
            // Log success with the message location (topic, partition, offset)
            Console.WriteLine($"Delivered to: {result.TopicPartitionOffset}");
        }
        catch (ProduceException<Null, string> ex)
        {
            // Handle and log any delivery errors
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
        }
    }

    // Creates the topic if it doesn't exist
    public async Task CreateTopicIfNotExists()
    {
        // Create an admin client for topic management
        using var adminClient = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = _config.BootstrapServers }
        ).Build();
        try
        {
            // Create the topic with the specified configuration
            await adminClient.CreateTopicsAsync(new TopicSpecification[] {
                new TopicSpecification {
                    Name = _topic,          // Topic name
                    ReplicationFactor = 1,  // Single replica for development
                    NumPartitions = 1       // Single partition
                }
            });
        }
        catch (CreateTopicsException ex)
        {
            // Ignore "topic already exists"; rethrow anything else
            if (ex.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
            {
                throw;
            }
        }
    }
}
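A note on the design: ProduceMessageAsync builds and disposes a new producer for every message, which keeps the example easy to follow but adds connection overhead on each request. A common refinement is to hold one producer for the lifetime of the singleton service; a minimal sketch of that variation, using the same configuration keys as above:

using Confluent.Kafka;

// Variation: hold a single long-lived producer instead of building one per message.
public class KafkaProducer : IDisposable
{
    private readonly IProducer<Null, string> _producer;
    private readonly string _topic;

    public KafkaProducer(IConfiguration configuration)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
        _topic = configuration["Kafka:Topic"];
    }

    public Task ProduceMessageAsync(string message) =>
        _producer.ProduceAsync(_topic, new Message<Null, string> { Value = message });

    // Flush pending messages and release the connection when the app shuts down.
    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(5));
        _producer.Dispose();
    }
}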
4. API Controller
Defines a POST endpoint to produce notifications:
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using NotificationAPI.Model;

[ApiController]
[Route("api/[controller]")] // Exposes POST /api/notification
public class NotificationController : ControllerBase
{
    private readonly KafkaProducer _producer;

    // Constructor: injects the KafkaProducer service
    public NotificationController(KafkaProducer producer)
    {
        _producer = producer;
    }

    // Endpoint: receives and publishes notifications
    [HttpPost]
    public async Task<IActionResult> SendNotification([FromBody] Notification notification)
    {
        // Serialize the notification to JSON and send it to Kafka
        await _producer.ProduceMessageAsync(JsonSerializer.Serialize(notification));
        return Ok();
    }
}
5. Startup Configuration
Register the KafkaProducer as a singleton and initialize the topic:
// Register KafkaProducer as a singleton
builder.Services.AddSingleton<KafkaProducer>(sp =>
{
    var producer = new KafkaProducer(builder.Configuration);
    // Create the topic on startup
    producer.CreateTopicIfNotExists().GetAwaiter().GetResult();
    return producer;
});
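For context, this registration sits in the producer API's Program.cs (the .NET 6+ minimal hosting model). A sketch of the surrounding file, with Swagger setup omitted for brevity and assuming KafkaProducer is visible from this file (same namespace or an appropriate using):

var builder = WebApplication.CreateBuilder(args);

// MVC controllers for the NotificationController endpoint
builder.Services.AddControllers();

// Register KafkaProducer as a singleton and create the topic on startup
builder.Services.AddSingleton<KafkaProducer>(sp =>
{
    var producer = new KafkaProducer(builder.Configuration);
    producer.CreateTopicIfNotExists().GetAwaiter().GetResult();
    return producer;
});

var app = builder.Build();

app.MapControllers();
app.Run();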
4. Consumer Application Implementation
Goal
The consumer listens to a Kafka topic and processes incoming messages.
Implementation
Kafka Consumer Logic
using Confluent.Kafka;
using NotificationAPI.Model;
using System.Text.Json;

class Program
{
    static void Main(string[] args)
    {
        // Configure consumer settings
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",       // Kafka broker address
            GroupId = "notification-group",            // Consumer group name
            AutoOffsetReset = AutoOffsetReset.Earliest // Start from the oldest message
        };

        // Create consumer instance (Null key matches the producer, which sends keyless messages)
        using var consumer = new ConsumerBuilder<Null, string>(config).Build();
        // Subscribe to the topic
        consumer.Subscribe("notifications");

        // Allow Ctrl+C to break out of the consume loop
        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // Prevent immediate process termination
            cts.Cancel();
        };

        try
        {
            // Continuous message consumption loop
            while (true)
            {
                // Read the next message (throws OperationCanceledException when Ctrl+C is pressed)
                var result = consumer.Consume(cts.Token);
                // Deserialize the JSON message to a Notification object
                var notification = JsonSerializer.Deserialize<Notification>(
                    result.Message.Value
                );
                // Display notification details
                Console.WriteLine($"User: {notification?.UserId}");
                Console.WriteLine($"Message: {notification?.Message}");
                Console.WriteLine($"Type: {notification?.Type}");
                Console.WriteLine($"Time: {notification?.Timestamp}");
                Console.WriteLine("-------------------");
            }
        }
        catch (OperationCanceledException)
        {
            // Clean shutdown: commit offsets and leave the consumer group
            consumer.Close();
        }
    }
}
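As an optional refinement, the JSON handling can be moved into the consumer itself by plugging in a custom value deserializer, so Consume returns Notification objects directly. A brief sketch (the JsonDeserializer<T> class name is illustrative):

using System.Text.Json;
using Confluent.Kafka;
using NotificationAPI.Model;

// Illustrative IDeserializer<T> implementation that parses the message value as JSON.
public class JsonDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return default!;
        return JsonSerializer.Deserialize<T>(data)!;
    }
}

// Usage: the consumer now yields Notification objects instead of raw JSON strings.
// using var consumer = new ConsumerBuilder<Null, Notification>(config)
//     .SetValueDeserializer(new JsonDeserializer<Notification>())
//     .Build();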
5. Testing
- Start Docker containers:
docker-compose up -d
- Start Producer (NotificationAPI):
cd NotificationAPI
dotnet run
- Start Consumer:
cd NotificationConsumer
dotnet run
- Send test notification using Swagger UI or curl:
curl -X POST https://localhost:5001/api/notification \
-H "Content-Type: application/json" \
-d '{
"userId": "123",
"message": "Test notification",
"type": "Info",
"timestamp": "2025-01-25T10:00:00Z"
}'
Example Output
Producer Logs
Delivered to: notifications[0]@3
Consumer Logs
User: 123
Message: Test notification
Type: Info
Time: 2025-01-25T10:00:00Z
-------------------
By following this guide, you have learned the basics of Apache Kafka, set up a Kafka environment using Docker, and implemented producer and consumer applications in .NET. This beginner-friendly walkthrough equips you with the foundational knowledge to build real-time, event-driven applications using Kafka. With this understanding, you’re ready to explore more advanced Kafka concepts and use cases in your projects.