Raw Confluent.Kafka Client

Spend real time with Confluent.Kafka before any abstraction. Build producers and consumers by hand. Understand every config option you set.

Why Start with the Raw Client

At some point, a Kafka-backed .NET service starts misbehaving in production. Messages are delayed. A consumer group rebalances unexpectedly. A producer silently drops messages during a restart. You open the issue tracker for MassTransit or the Confluent Schema Registry client, read through a dozen threads, and eventually land on a comment pointing at a librdkafka configuration option — linger.ms, max.poll.interval.ms, acks — that someone had left at its default. The fix is one line. But unless you already understand what those options do and where they live in the stack, that comment is nearly incomprehensible.

This is the core argument for starting with the raw Confluent.Kafka client before reaching for any abstraction. The raw client is not a stepping stone you leave behind — it is the substrate that every higher-level library runs on top of, and the one whose behavior ultimately determines what your service does in production.

The Abstraction Stack Is Real, and It Has a Floor

When you use MassTransit with Kafka, or the Confluent Schema Registry client, or any of the growing number of .NET libraries that integrate with Kafka, you are working with wrappers around Confluent.Kafka. Those wrappers configure a producer or consumer on your behalf, set up serialization, manage consumer group lifecycles, and expose a friendlier interface. That friendliness is genuinely valuable — for day-to-day development work, you probably want those abstractions.

But abstractions do not eliminate the underlying behavior; they encapsulate it. When something goes wrong at the abstraction layer, the failure is almost always a consequence of what the underlying Confluent.Kafka client is doing — a delivery timeout because delivery.timeout.ms is too low, a consumer that never commits because the abstraction chose manual offset management and the calling code doesn't realize it, or a rebalance storm because session.timeout.ms is tuned for a different workload.

┌──────────────────────────────────────────┐
│         Your Application Code            │
├──────────────────────────────────────────┤
│  MassTransit / Schema Registry Client /  │
│  Other Higher-Level Abstractions         │
├──────────────────────────────────────────┤
│         Confluent.Kafka (.NET)           │  ← this lesson
├──────────────────────────────────────────┤
│     librdkafka (native C library)        │
├──────────────────────────────────────────┤
│      Kafka Broker (TCP/SASL/TLS)         │
└──────────────────────────────────────────┘

When you understand the Confluent.Kafka layer — its configuration model, its threading assumptions, its delivery guarantees, its error surfaces — debugging the abstraction layers above it becomes tractable. The issue in MassTransit becomes "the consumer isn't committing offsets because the abstraction is calling Consume without a cancellation token and then exiting before the commit happens," not an inscrutable mystery.

💡 Mental Model: Think of Confluent.Kafka as the engine in a car. MassTransit and similar libraries are the dashboard and the automatic transmission — they make driving easier, but when the engine light comes on, you need to understand what the engine is actually doing.

Every Config Option Has a Measurable Effect

The Confluent.Kafka client exposes its configuration through plain C# objects — ProducerConfig, ConsumerConfig, and AdminClientConfig — that map almost directly to the underlying librdkafka configuration properties. The defaults are chosen for reasonable general-purpose behavior. But "reasonable general-purpose" is not the same as "correct for your workload."

Consider a simple producer instantiated with only a bootstrap server:

// A producer with only the minimum required config.
// Every option not explicitly set takes a default from librdkafka.
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092"
    // Not set: Acks, LingerMs, BatchSize, CompressionType,
    // MessageTimeoutMs, MessageMaxBytes, and many more.
};

using var producer = new ProducerBuilder<string, string>(config).Build();

This compiles and runs. But the omitted options control fundamental behaviors:

  • acks (default: all) determines how many broker replicas must acknowledge a message before the client considers it delivered. Some teams intentionally set it to 1 or 0 for throughput-sensitive workloads where occasional message loss is acceptable. A choice is being made either way.
  • linger.ms (default: 5) controls how long the client waits to accumulate messages into a batch before sending. A value of 0 minimizes latency at the cost of throughput; higher values improve batching efficiency.
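  • delivery.timeout.ms (an alias for librdkafka's message.timeout.ms, exposed as MessageTimeoutMs on ProducerConfig; default: 300000, five minutes) is the ceiling on how long the client keeps trying to deliver a message, including time spent in the local queue and in retries, before giving up.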
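
As a sketch, the same three options can be written out explicitly so the choice is visible in code review. The values below simply restate librdkafka's documented defaults, and the broker address is a placeholder.

```csharp
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092", // placeholder address
    Acks = Acks.All,                     // acks: wait for all in-sync replicas
    LingerMs = 5,                        // linger.ms: batching delay in milliseconds
    MessageTimeoutMs = 300000            // message.timeout.ms (alias: delivery.timeout.ms)
};

using var producer = new ProducerBuilder<string, string>(config).Build();
```

Nothing changes at runtime compared with the minimal config; the difference is that each default is now a recorded decision rather than an accident.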

🎯 Key Principle: Configuration ignorance is not neutrality. Every config option you don't set still takes a value — the default — and that default shapes the behavior of your system.

💡 Real-World Example: A team discovers their consumer is being repeatedly removed from its group and triggering rebalances under load. The root cause is max.poll.interval.ms, the maximum time allowed between calls to Consume before the consumer is considered stalled and removed from the group. Their message processing is slow enough to exceed this window. A developer who has never seen max.poll.interval.ms in the raw client has no frame of reference for what it means when it appears in a MassTransit issue thread.

What This Lesson Covers — and What Comes Next

This lesson focuses on two things: understanding the architecture of Confluent.Kafka and getting a working producer and consumer off the ground. Producer configuration — delivery guarantee knobs, batching options, compression — is covered in the dedicated Producer Configuration lesson that follows. Offset management, consumer group coordination, and manual commits are covered in Consumer & Manual Commits. Both assume the foundation built here.

⚠️ Common Mistake: The impulse when starting with Kafka in .NET is to reach immediately for an integration package — MassTransit.Kafka, for instance — add it to the DI container, and start producing and consuming through its API. This works, right up until it doesn't. When something goes wrong in the abstraction, the developer who skipped the raw client has to reverse-engineer both the abstraction's behavior and the underlying client's behavior simultaneously.

🤔 Did you know? The ProducerConfig and ConsumerConfig classes are generated from the librdkafka configuration documentation. Property names map almost one-to-one: linger.ms in librdkafka is LingerMs in ProducerConfig. This symmetry makes cross-referencing the lower-level documentation practical whenever you need it.
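
The name mapping can be observed directly, because the config classes also expose string-keyed Get and Set methods. A minimal sketch, assuming a placeholder broker address (no broker is contacted, since no client is built):

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092", // placeholder address
    LingerMs = 20
};

// The typed property is stored under its librdkafka name.
Console.WriteLine(config.Get("linger.ms"));

// Any librdkafka property can also be set by its documented name.
config.Set("compression.type", "lz4");

// Enumerating the config lists every key/value pair set so far.
foreach (var pair in config)
    Console.WriteLine($"{pair.Key} = {pair.Value}");
```

This is handy when an issue thread quotes a librdkafka property and you want to confirm what your application is actually passing down.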

How Confluent.Kafka Fits into the Kafka Ecosystem

When you install the Confluent.Kafka NuGet package, you are not getting a pure .NET implementation of the Kafka protocol. You are getting a thin managed wrapper around librdkafka, a battle-tested C library that handles the actual TCP connections, protocol framing, and internal queuing. That distinction explains a category of behaviors that can otherwise seem like inexplicable quirks: native memory that doesn't show up in .NET heap dumps, exception stack traces that bottom out in unmanaged code, and platform-specific native binaries landing in your runtimes/ folder at publish time.

The librdkafka Layer

Librdkafka powers a large share of non-JVM Kafka clients — Python's confluent-kafka, Go's confluent-kafka-go, and Confluent.Kafka for .NET all sit on top of it. The NuGet package ships with precompiled native binaries for each supported platform (linux-x64, linux-arm64, osx, win-x64, and so on). At runtime, the managed assembly uses P/Invoke to call into the appropriate native binary.

┌────────────────────────────────────────────┐
│            Your .NET Application           │
├────────────────────────────────────────────┤
│         Confluent.Kafka (managed C#)       │
│   IProducer<K,V>  IConsumer<K,V>  IAdmin   │
├────────────────────────────────────────────┤
│      librdkafka (native C library)         │
│   Protocol framing, TCP pooling, queuing   │
├────────────────────────────────────────────┤
│         Kafka Broker(s) over TCP           │
└────────────────────────────────────────────┘

The practical consequences of this layering:

  • Native memory: The internal message queue and socket buffers live outside the managed heap. If your producer accumulates a large backlog, the .NET GC won't see that pressure — librdkafka surfaces an error when its queue limit is hit, not an OutOfMemoryException.
  • Platform binaries: Publishing a self-contained app or applying aggressive trimming requires ensuring the correct native binary is included. Misconfigured Docker images can strip it, producing a DllNotFoundException at startup rather than a compile error.
  • Unmanaged exceptions: Some error conditions originate inside librdkafka and are marshaled back to .NET as KafkaException or surfaced through error callbacks. The error codes and reason strings on those exceptions deserve careful reading rather than just logging the exception type.
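
To make the last point concrete, here is a sketch of wiring the builder's error and log handlers so that conditions raised inside librdkafka are visible from managed code. The broker address is a placeholder and writing to the console stands in for whatever logging the application uses.

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder

using var producer = new ProducerBuilder<string, string>(config)
    // Client-level errors (for example, all brokers down) arrive here, not as exceptions.
    .SetErrorHandler((_, error) =>
        Console.Error.WriteLine(
            $"Kafka error: code={error.Code} fatal={error.IsFatal} reason={error.Reason}"))
    // librdkafka's own log lines, useful when a problem originates in native code.
    .SetLogHandler((_, log) =>
        Console.WriteLine($"[{log.Level}] {log.Facility}: {log.Message}"))
    .Build();
```

Logging Code, IsFatal, and Reason together is usually more useful than the exception type, because most failures share the same few exception classes.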

The Three Primary Types

The managed API surfaces three interfaces that together cover nearly every Kafka use case. Each one is a distinct client with its own connection lifecycle — understanding this prevents a common class of resource and behavior bugs.

IProducer<TKey, TValue>

IProducer<TKey, TValue> publishes records to Kafka topics. It holds a persistent TCP connection pool to the brokers, manages an internal in-memory queue of pending messages, and owns a background I/O thread inherited from librdkafka. You build one via ProducerBuilder<TKey, TValue>.

IConsumer<TKey, TValue>

IConsumer<TKey, TValue> reads records and manages group coordination — heartbeating, partition assignment, rebalancing — while tracking offset state locally until you commit. Like the producer, it owns its own connection lifecycle.

IAdminClient

IAdminClient handles cluster-level operations: creating and deleting topics, describing configurations, listing consumer groups, and inspecting cluster metadata. It is less frequently instantiated in application code but essential for tooling and integration tests that set up topic fixtures.
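
A typical integration-test fixture might look like the following sketch. The topic name, partition count, and replication factor are illustrative assumptions suited to a single-broker development cluster.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var admin = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build(); // placeholder

try
{
    await admin.CreateTopicsAsync(new[]
    {
        new TopicSpecification
        {
            Name = "orders-test",   // hypothetical topic name
            NumPartitions = 3,
            ReplicationFactor = 1   // only sensible on a single-broker dev cluster
        }
    });
    Console.WriteLine("Topic created.");
}
catch (CreateTopicsException ex)
    when (ex.Results[0].Error.Code == ErrorCode.TopicAlreadyExists)
{
    // The fixture already exists from an earlier run; nothing to do.
    Console.WriteLine("Topic already exists.");
}
```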

┌─────────────────────────────────────────────────────────────┐
│                  Application Process                        │
│                                                             │
│  ┌──────────────────────┐  ┌──────────────────────────┐     │
│  │  IProducer<K,V>      │  │  IConsumer<K,V>          │     │
│  │  - Internal queue    │  │  - Group coordinator     │     │
│  │  - I/O thread        │  │  - Offset tracking       │     │
│  │  - Connection pool   │  │  - Connection pool       │     │
│  └──────────┬───────────┘  └──────────────┬───────────┘     │
│             │                             │                 │
│  ┌──────────┴─────────────────────────────┴───────────┐     │
│  │              IAdminClient (optional)               │     │
│  │  - Separate connection lifecycle                   │     │
│  └────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────┘
         │                        │
         ▼                        ▼
   Kafka Broker               Kafka Broker

🎯 Key Principle: These three types are not cheap to construct. Each establishes real broker connections when first used. Treat them as long-lived, application-scoped objects — instantiate once, reuse throughout, and dispose explicitly when your application shuts down.

Serialization as a First-Class Concern

The generic type parameters on IProducer<TKey, TValue> and IConsumer<TKey, TValue> are not just syntactic sugar. At construction time, the builder requires a concrete ISerializer<T> for the producer side and IDeserializer<T> for the consumer side. The library ships built-in implementations for byte[], string, int, long, float, double, and Null.

// Producer with string keys and byte[] values.
// The serializer pair is resolved at build time, not per-message.
using var producer = new ProducerBuilder<string, byte[]>(
    new ProducerConfig { BootstrapServers = "localhost:9092" })
    .SetKeySerializer(Serializers.Utf8)        // built-in ISerializer<string>
    .SetValueSerializer(Serializers.ByteArray) // built-in ISerializer<byte[]>
    .Build();

For custom types, you implement ISerializer<T> directly:

public sealed class OrderEventSerializer : ISerializer<OrderEvent>
{
    public byte[] Serialize(OrderEvent data, SerializationContext context)
    {
        // context carries topic name and whether this is a key or value,
        // useful if a single serializer needs topic-aware behavior.
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }
}

using var producer = new ProducerBuilder<string, OrderEvent>(
    new ProducerConfig { BootstrapServers = "localhost:9092" })
    .SetKeySerializer(Serializers.Utf8)
    .SetValueSerializer(new OrderEventSerializer())
    .Build();

The SerializationContext parameter carries the topic name and a MessageComponentType enum (Key or Value). This is particularly relevant if you later adopt the Schema Registry client, which uses the topic name to look up the correct schema — the plumbing is already in place at the raw client level.
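
The consumer side mirrors this with IDeserializer<T>. The sketch below assumes a hypothetical OrderEvent record and JSON encoding via System.Text.Json; neither is prescribed by the library.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical event type, defined here only to keep the sketch self-contained.
public sealed record OrderEvent(string OrderId, decimal Amount);

public sealed class OrderEventDeserializer : IDeserializer<OrderEvent>
{
    public OrderEvent Deserialize(
        ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // isNull distinguishes a null payload (for example, a tombstone) from an empty one.
        if (isNull) return null!;

        return JsonSerializer.Deserialize<OrderEvent>(data)
            ?? throw new InvalidOperationException(
                $"JSON payload on topic {context.Topic} deserialized to null");
    }
}
```

It is registered the same way as a serializer, by calling SetValueDeserializer(new OrderEventDeserializer()) on a ConsumerBuilder<string, OrderEvent>.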

💡 Real-World Example: A common pattern is to start with IProducer<string, byte[]> and serialize JSON manually, then migrate to a custom ISerializer<T> backed by a binary format (Protobuf, Avro) later. Because serialization is registered at construction time rather than per-message, that migration is a one-line change in the builder.

The Internal I/O Thread and Message Buffering

When you call Produce() or ProduceAsync(), the message does not go to the broker immediately. Librdkafka writes it into an internal in-memory queue, and a background I/O thread drains that queue, batches messages by topic-partition, and sends them to the broker over TCP.

Your Code
   │
   │  Produce() / ProduceAsync()
   ▼
┌─────────────────────────────────┐
│   librdkafka Internal Queue     │  ← native memory
│   [msg1][msg2][msg3]...         │
└──────────────┬──────────────────┘
               │  batched by linger.ms / batch.size
               ▼
┌─────────────────────────────────┐
│   Background I/O Thread         │
│   (owned by librdkafka)         │
└──────────────┬──────────────────┘
               │  TCP
               ▼
          Kafka Broker
               │
               │  Acknowledgment (acks=1 or acks=all)
               ▼
┌─────────────────────────────────┐
│   Delivery Report Callback      │
│   or ProduceAsync Task result   │
└─────────────────────────────────┘

This buffering model has concrete implications:

  • A successful Produce() call means the message entered the queue, not that it reached the broker. The delivery guarantee only holds once the delivery callback fires or the ProduceAsync task completes with a non-error result.
  • Disposing a producer without flushing silently discards messages still in the queue. This is important enough to flag here so the mental model is correct from the start — the bootstrapping section returns to it in detail.
  • The background thread is not observable from managed code. You cannot await it or observe it in TPL tooling. It surfaces results back exclusively through callbacks and returned Task values.

🤔 Did you know? On the consumer side, librdkafka's background threads also handle fetches, heartbeats to the group coordinator, and offset commits when you use auto-commit. Because heartbeats keep flowing even if your code stops calling Consume(), a second liveness check exists: max.poll.interval.ms. If Consume() is not called within that interval, the client assumes the application has stalled and leaves the group, which triggers a rebalance. This is why the Consume() loop doubles as the "poll loop" that keeps the consumer in its group.

At this point the architecture resolves into a coherent picture: the NuGet package is a managed surface over a native library; the three primary types each own independent connections and threads; serialization is wired in at construction time; and the actual wire I/O is always asynchronous with respect to your Produce() calls, mediated by an internal queue and a background thread. With this in mind, the minimal working examples that follow read as deliberate choices rather than boilerplate.

Bootstrapping a Producer and Consumer: A Minimal Working Example

The gap between "I have Confluent.Kafka installed" and "I have a producer and consumer that work correctly" is smaller than it looks — but it contains several decisions that are easy to get wrong silently. A dropped message or a consumer that never receives anything won't throw an exception; you simply won't see the data you expected. This section walks through the minimal correct code for both sides of the wire, stopping at each non-obvious decision to explain what's actually happening.

Configuring the Clients

ProducerConfig and ConsumerConfig are plain C# POCOs. You populate them before building the client, and the builder validates them at construction time rather than at the first send or receive call — front-loading configuration errors is the correct tradeoff.

For a producer, exactly one field is non-negotiable: BootstrapServers, the comma-separated list of host:port pairs the client uses to discover the cluster. (After initial connection, the client learns the full broker topology from Kafka itself — this is just the entry point.)

For a consumer, two fields are required: BootstrapServers and GroupId, which names the consumer group this instance belongs to. The group identity governs partition assignment, offset tracking, and rebalance behavior. Leaving GroupId empty will throw at build time — a consumer without a group is a malformed consumer.

// Minimal producer configuration
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092"
    // All other settings take librdkafka defaults.
    // Producer Configuration (the follow-on lesson) covers
    // batching, compression, and delivery guarantees in depth.
};

// Minimal consumer configuration
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-consumer-group",
    // AutoOffsetReset determines behavior when no committed offset
    // exists for this group+partition pair.
    AutoOffsetReset = AutoOffsetReset.Earliest
};

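AutoOffsetReset is technically optional, but omitting it leaves you with the client default, Latest: a group with no committed offsets starts at the end of each partition and never sees messages produced before it joined. Setting it explicitly eliminates a class of "why did I miss messages?" bugs, so treat it as required in practice.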

Building the Clients

ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> are the correct and only public construction path for their respective clients. The concrete implementation types are internal to the library — you cannot new them up directly. The builder pattern lets the library enforce invariants and configure the underlying librdkafka handle before the object is handed to you.

// Build a producer that uses string keys and string values.
using IProducer<string, string> producer =
    new ProducerBuilder<string, string>(producerConfig)
        .Build();

// Build a consumer with matching types.
using IConsumer<string, string> consumer =
    new ConsumerBuilder<string, string>(consumerConfig)
        .Build();

Notice the using declarations. Both IProducer<TKey, TValue> and IConsumer<TKey, TValue> implement IDisposable, and Dispose does real work — it tears down the librdkafka handle and closes TCP connections. On the producer side, flush behavior at dispose deserves special attention.

Disposing Correctly: The Producer Flush Problem

As established in the architecture section, librdkafka buffers messages internally and sends them in batches on a background thread. This creates a hazard at shutdown: if you call Dispose on the producer without first calling Flush, messages still in the internal queue are silently dropped. No exception is raised.

// ✅ Explicit flush before dispose — the safe shutdown pattern.
producer.Flush(TimeSpan.FromSeconds(10));
producer.Dispose();

// When using a 'using' block, flush explicitly before the scope ends:
using (var p = new ProducerBuilder<string, string>(producerConfig).Build())
{
    // ... produce messages ...
    p.Flush(TimeSpan.FromSeconds(10)); // flush BEFORE the using block exits
} // Dispose called here by the using statement

⚠️ Common Mistake: Relying on Dispose alone to drain the producer buffer. Dispose gives you no control over how long to wait and no report of what was left undelivered, so under load or with a slow broker, pending messages can be lost without a trace. Call Flush yourself, with a timeout that fits your workload.
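
Flush(TimeSpan) also returns a value worth checking: the approximate number of messages still waiting when the timeout elapsed. A short sketch, with a placeholder broker address and an arbitrary ten-second timeout:

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder
using var producer = new ProducerBuilder<string, string>(config).Build();

// ... produce messages ...

// The return value is the remaining queue length, a rough count of
// messages not yet sent to or acknowledged by the broker.
int remaining = producer.Flush(TimeSpan.FromSeconds(10));
if (remaining > 0)
{
    Console.Error.WriteLine(
        $"{remaining} message(s) were still undelivered after the flush timeout.");
}
```

Logging a non-zero result at shutdown turns a silent loss into something you can alert on.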

Sending a Message: ProduceAsync vs. Fire-and-Forget

Once you have a producer, sending a message uses either ProduceAsync (awaitable, returns a DeliveryResult<TKey, TValue>) or Produce with a delivery handler callback. Both paths are legitimate; the choice depends on your throughput requirements.

Awaiting ProduceAsync

try
{
    var deliveryResult = await producer.ProduceAsync(
        topic: "my-topic",
        message: new Message<string, string>
        {
            Key = "order-id-123",
            Value = "{\"amount\": 42.00}"
        });

    Console.WriteLine(
        $"Delivered to partition {deliveryResult.Partition}, "
        + $"offset {deliveryResult.Offset}");
}
catch (ProduceException<string, string> ex)
{
    Console.Error.WriteLine($"Delivery failed: {ex.Error.Reason}");
}

The tradeoff: awaiting each ProduceAsync introduces per-message latency. You wait for a broker acknowledgment before moving to the next message. For high-throughput scenarios, this round-trip cost adds up.
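
One way to keep ProduceAsync while avoiding the per-message round trip is to start many sends and await them as a group. The following is a sketch under assumed names (topic, keys, and the count of 100 are arbitrary), not a tuned implementation.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder
using var producer = new ProducerBuilder<string, string>(config).Build();

// Start every send first so the client can batch them, then await the group.
// ToList() forces the sends to start now rather than lazily.
var sends = Enumerable.Range(0, 100)
    .Select(i => producer.ProduceAsync(
        "my-topic",
        new Message<string, string> { Key = $"order-{i}", Value = i.ToString() }))
    .ToList();

try
{
    var results = await Task.WhenAll(sends);
    Console.WriteLine($"Delivered {results.Length} messages.");
}
catch (ProduceException<string, string> ex)
{
    // Only the first failure is rethrown here; inspect each task for the full picture.
    Console.Error.WriteLine($"At least one delivery failed: {ex.Error.Reason}");
}
```

Each pending task holds memory until it completes, so for very large volumes the callback-based Produce is usually the better fit.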

Fire-and-Forget with a Delivery Handler

The Produce overload with an Action<DeliveryReport<TKey, TValue>> callback is non-blocking. The call returns immediately; the client invokes your callback on the producer's background poll thread when the broker acknowledges (or rejects) the message.

// Non-blocking produce. The callback is invoked on the producer's
// background poll thread when the broker responds.
producer.Produce(
    topic: "my-topic",
    message: new Message<string, string>
    {
        Key = "order-id-456",
        Value = "{\"amount\": 17.50}"
    },
    deliveryHandler: report =>
    {
        if (report.Error.IsError)
            Console.Error.WriteLine($"Delivery failed: {report.Error.Reason}");
        else
            Console.WriteLine($"Delivered to offset {report.Offset}");
    });

// Messages are still in the internal queue here — flush before disposing.
producer.Flush(TimeSpan.FromSeconds(10));

⚠️ Common Mistake: Assuming the callback fires before Produce returns. It does not. The callback is asynchronous with respect to your calling thread. Any state the callback closes over must be thread-safe.
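
Produce can also fail synchronously when the local queue is full. A common way to apply backpressure, sketched here with placeholder topic and message values, is to catch that specific error, give delivery reports a moment to drain the queue, and retry.

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder
using var producer = new ProducerBuilder<string, string>(config).Build();

var message = new Message<string, string> { Key = "order-id-789", Value = "{}" };

while (true)
{
    try
    {
        producer.Produce("my-topic", message, report =>
        {
            if (report.Error.IsError)
                Console.Error.WriteLine($"Delivery failed: {report.Error.Reason}");
        });
        break; // the message was accepted into the local queue
    }
    catch (ProduceException<string, string> ex)
        when (ex.Error.Code == ErrorCode.Local_QueueFull)
    {
        // Queue is full: wait briefly for delivery reports to free space, then retry.
        producer.Poll(TimeSpan.FromMilliseconds(100));
    }
}

producer.Flush(TimeSpan.FromSeconds(10));
```

The 100 ms wait is an arbitrary choice; a production loop would likely also bound the number of retries.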

                  ProduceAsync                     Produce + Callback
──────────────────────────────────────────────────────────────────────────────
Blocking?         Awaitable (async)                Non-blocking
Throughput        Lower per-message                Higher per-message
Error handling    try/catch on ProduceException    Check report.Error in callback
Best for          Simplicity, low-volume           High-throughput pipelines

Subscribing and Consuming

On the consumer side, there are two ways to assign partitions: Subscribe and Assign. They are not interchangeable.

Subscribe registers the consumer with the group coordinator at the broker. The coordinator distributes partitions across all consumers sharing the same GroupId and triggers rebalances when consumers join or leave. This is the standard path for most applications.

Assign bypasses the group coordinator entirely and pins the consumer to a specific TopicPartition or set of them. Offset commits still work, but partition assignment is no longer managed automatically. Assign is appropriate for specific scenarios — replaying a partition from a known offset during debugging, for instance — but is not a substitute for Subscribe.
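
The replay scenario might be sketched as follows. The group name, partition number, and starting offset are illustrative assumptions; auto-commit is disabled so the replay does not move any committed offsets.

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder address
    GroupId = "replay-debug",            // hypothetical; the client still requires a group id
    EnableAutoCommit = false             // a replay should not move committed offsets
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();

// Pin the consumer to partition 0 of "my-topic", starting at offset 42.
consumer.Assign(new TopicPartitionOffset("my-topic", new Partition(0), new Offset(42)));

// The TimeSpan overload returns null when nothing arrives within the timeout.
var result = consumer.Consume(TimeSpan.FromSeconds(5));
Console.WriteLine(result is null
    ? "No message within 5 seconds."
    : $"offset={result.Offset} value={result.Message.Value}");

consumer.Close();
```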

After Subscribe, the consumer loop calls Consume with a cancellation token. Each call drives the client's internal poll mechanism, which resets the max.poll.interval.ms timer that proves the application is still processing, runs any rebalance callbacks, and delivers the next available message.

// Subscribe BEFORE the consume loop.
consumer.Subscribe("my-topic");

using var cts = new CancellationTokenSource();

try
{
    while (!cts.IsCancellationRequested)
    {
        var result = consumer.Consume(cts.Token);

        if (result is null) continue;

        Console.WriteLine(
            $"Received: key={result.Message.Key} "
            + $"value={result.Message.Value} "
            + $"partition={result.Partition} "
            + $"offset={result.Offset}");
    }
}
catch (OperationCanceledException)
{
    // Normal shutdown path when the CancellationToken is cancelled.
}
finally
{
    // Close() commits offsets and notifies the group coordinator
    // before Dispose tears down the connection.
    consumer.Close();
}

Two things in this snippet deserve explicit attention.

First, consumer.Close() in the finally block. Calling Close before Dispose sends a LeaveGroup request to the broker, triggering an immediate rebalance rather than waiting for session.timeout.ms to expire. Skipping Close means other consumers in the group wait out the full timeout before picking up those partitions.

Second, the CancellationToken passed to Consume. Without it, the loop has no clean mechanism to stop from the outside — the CancellationToken overload throws OperationCanceledException when cancelled, which is the idiomatic .NET exit path. The consequences of omitting it are covered in the next section.
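
Something still has to cancel that token. In a console application, one simple option (an assumption about the hosting model, not a requirement of the client) is to wire it to Ctrl+C:

```csharp
using System;
using System.Threading;

using var cts = new CancellationTokenSource();

// On Ctrl+C, cancel the token instead of letting the runtime terminate the process.
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the finally block can run Close()
    cts.Cancel();
};

// Pass cts.Token to consumer.Consume(...) in the consume loop.
Console.WriteLine("Press Ctrl+C to stop.");
```

Hosted services get an equivalent token from the host, so this wiring is only needed in bare console programs.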

Common Mistakes When Using the Raw Client

Working directly with Confluent.Kafka rewards attention to detail. The library gives you precise control over every aspect of message production and consumption, but that precision cuts both ways: real behaviors — connection pooling, internal threading, non-exception error paths — that higher-level abstractions handle quietly are now your responsibility. The four mistakes covered here are concrete, mechanism-driven, and expensive to diagnose after the fact.

Mistake 1: Creating a New IProducer Per Message

The most immediately costly mistake is treating IProducer<TKey, TValue> as a lightweight, stateless utility — constructing one, calling Produce, then disposing it before the next message.

As established in the architecture section, each IProducer instance maintains its own broker connections and an internal message queue backed by native memory. Every new ProducerBuilder<...>().Build() call allocates a native handle and its buffers, starts librdkafka's background threads, and has to connect to the bootstrap brokers and fetch cluster metadata before the first message can be sent. Disposing tears all of that down.

Per-message producer (wrong)
──────────────────────────────────────────────────────────────
  Message 1  →  [Build] [TCP handshake] [Produce] [Flush] [Dispose]
  Message 2  →  [Build] [TCP handshake] [Produce] [Flush] [Dispose]
        ↑                   ↑
   Wasted allocation    Network RTT on every message

Shared producer (correct)
──────────────────────────────────────────────────────────────
  [Build once] [TCP handshake once]
  Message 1  →  [Produce → internal queue]
  Message 2  →  [Produce → internal queue]  →  [batch to broker]
  [Flush on shutdown] [Dispose]

Beyond throughput, there is a resource leak dimension. Because the internal queue is native memory, the GC cannot observe or collect it. Production systems can exhaust file descriptors or native heap long before the managed heap shows any pressure.

✅ Instantiate one IProducer per application (or per logical pipeline), register it in your DI container as a singleton, and dispose it explicitly on shutdown after calling Flush.
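
One possible shape for that registration is sketched below, using Microsoft.Extensions.DependencyInjection. KafkaProducerHolder is a hypothetical wrapper introduced here so that the container's normal disposal also performs the flush; it is not part of Confluent.Kafka.

```csharp
using System;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddSingleton(new ProducerConfig { BootstrapServers = "localhost:9092" }); // placeholder
services.AddSingleton<KafkaProducerHolder>();

// Disposing the provider disposes the holder, which flushes and then disposes the producer.
using var provider = services.BuildServiceProvider();
var holder = provider.GetRequiredService<KafkaProducerHolder>();
Console.WriteLine($"Shared producer: {holder.Producer.Name}");

// Hypothetical wrapper: owns the single producer and flushes it on shutdown.
public sealed class KafkaProducerHolder : IDisposable
{
    public IProducer<string, string> Producer { get; }

    public KafkaProducerHolder(ProducerConfig config) =>
        Producer = new ProducerBuilder<string, string>(config).Build();

    public void Dispose()
    {
        // Drain the internal queue before tearing down the native handle.
        Producer.Flush(TimeSpan.FromSeconds(10));
        Producer.Dispose();
    }
}
```

Application code takes a dependency on the holder (or on an interface in front of it) and never constructs a producer itself.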

Mistake 2: Ignoring the Error on Delivery Reports and Consume Exceptions

Kafka failures reach your code through several different channels in Confluent.Kafka, and each is easy to miss if you are only watching one of them:

  • ProduceAsync reports a failed delivery by throwing ProduceException<TKey, TValue>. The DeliveryResult<TKey, TValue> it returns on success has no Error property.
  • Produce with a delivery handler reports a failed delivery through the Error property of the DeliveryReport<TKey, TValue> passed to the callback. Nothing is thrown for a failed delivery, so a missing or empty handler means failures vanish. (Produce can still throw synchronously, for example with ErrorCode.Local_QueueFull.)
  • Consume reports failures by throwing ConsumeException. ConsumeResult<TKey, TValue> has no Error property either.
  • Client-level conditions that are not tied to a single call, such as all brokers being unreachable, go to the handler registered with SetErrorHandler on the builder.

In every case the payload is an Error object carrying an IsError boolean, an IsFatal boolean, a Code of type ErrorCode, and a human-readable Reason.

// ⚠️ WRONG: no delivery handler, so a failed delivery is never observed
producer.Produce("orders", new Message<string, string>
{
    Key = order.Id,
    Value = JsonSerializer.Serialize(order)
});
// If the broker rejects the message or it times out, nobody finds out.

// ✅ CORRECT: check report.Error before treating the message as delivered
producer.Produce("orders", new Message<string, string>
{
    Key = order.Id,
    Value = JsonSerializer.Serialize(order)
},
report =>
{
    if (report.Error.IsError)
    {
        logger.LogError(
            "Delivery failed for order {OrderId}: [{Code}] {Reason}",
            order.Id, report.Error.Code, report.Error.Reason);
    }
});

On the consumer side, the equivalent mistake is catching ConsumeException (or a bare Exception) and carrying on without looking at ex.Error. A non-fatal error, such as a deserialization failure on a single record, can be logged and skipped; a fatal error means the consumer instance is no longer usable and should be allowed to propagate.

ConsumeResult<string, string> result;
try
{
    result = consumer.Consume(cancellationToken);
}
catch (ConsumeException ex) when (!ex.Error.IsFatal)
{
    logger.LogWarning(
        "Non-fatal consume error: [{Code}] {Reason}",
        ex.Error.Code, ex.Error.Reason);
    continue;
}

if (result.IsPartitionEOF)
{
    // Reached the end of a partition (only reported when EnablePartitionEof is set).
    // Normal, not an error.
    continue;
}

ProcessMessage(result.Message);

💡 Pro Tip: The Error.Code enum is your primary diagnostic tool. ErrorCode.Local_QueueFull tells you the producer's internal queue is saturated, a signal worth alerting on. ErrorCode.Local_MsgTimedOut in a delivery report means the message exceeded message.timeout.ms without being acknowledged. A null return from Consume(TimeSpan), by contrast, is just a timeout and is expected under low-traffic conditions.

Mistake 3: Calling Consume in a Tight Loop Without a Cancellation Token

IConsumer.Consume with no cancellation token blocks the calling thread indefinitely until a message is available or a fatal error occurs. The problem surfaces at deployment: when the orchestrator sends SIGTERM and expects the application to drain and exit within a grace period, a loop blocked on Consume() with no cancellation path forces either waiting for the next message or escalating to SIGKILL.

// ⚠️ WRONG: blocks indefinitely, no way to stop gracefully
while (true)
{
    var result = consumer.Consume(); // infinite block
    Process(result);
}

// ✅ CORRECT: pass the application's cancellation token
while (!stoppingToken.IsCancellationRequested)
{
    ConsumeResult<string, string>? result;
    try
    {
        result = consumer.Consume(stoppingToken);
    }
    catch (OperationCanceledException)
    {
        break;
    }

    if (result == null) continue;
    Process(result);
}

consumer.Close();

Graceful shutdown sequence
──────────────────────────────────────────────────────────────
  Orchestrator       Application            Kafka Broker
      │                   │                       │
      │──── SIGTERM ─────▶│                       │
      │                   │ CancellationToken     │
      │                   │  .Cancel()            │
      │                   │                       │
      │                   │ Consume() throws      │
      │                   │ OperationCanceled     │
      │                   │                       │
      │                   │ consumer.Close()      │
      │                   │──── LeaveGroup ──────▶│
      │                   │                       │ Reassigns partitions
      │                   │◀─── Confirm ──────────│  immediately
      │                   │                       │
      │                   │ consumer.Dispose()    │
      │◀──── Process exit─│                       │

💡 Real-World Example: In a hosted .NET BackgroundService, the stoppingToken passed to ExecuteAsync is exactly the token to use here. When the host begins shutdown, it cancels that token, which unblocks Consume and lets ExecuteAsync return cleanly.
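
A minimal sketch of that pattern, assuming the Microsoft.Extensions.Hosting package and an injected ConsumerConfig that already sets BootstrapServers and GroupId. The class name OrdersConsumerService, the "orders" topic, and the Process method are placeholders for illustration. Because Consume blocks, this sketch moves the loop onto a dedicated thread, which is one reasonable choice rather than the only one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical worker: names and topic are placeholders.
public sealed class OrdersConsumerService : BackgroundService
{
    private readonly ConsumerConfig _config;

    public OrdersConsumerService(ConsumerConfig config) => _config = config;

    // Consume blocks, so run the loop on its own thread (LongRunning)
    // instead of holding up host startup or a thread-pool thread.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Factory.StartNew(
            () => RunLoop(stoppingToken),
            stoppingToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

    private void RunLoop(CancellationToken stoppingToken)
    {
        using var consumer = new ConsumerBuilder<string, string>(_config).Build();
        consumer.Subscribe("orders");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Throws OperationCanceledException when the host begins shutdown.
                // ConsumeException handling is omitted to keep the sketch short.
                var result = consumer.Consume(stoppingToken);
                Process(result);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown.
        }
        finally
        {
            consumer.Close(); // leave the group before Dispose runs at scope exit
        }
    }

    private static void Process(ConsumeResult<string, string> result) =>
        Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
}
```

Registering it with services.AddHostedService<OrdersConsumerService>() ties the consume loop's lifetime to the host's.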

Mistake 4: Assuming IConsumer Is Thread-Safe

The threading models of IProducer and IConsumer are deliberately asymmetric: IProducer.Produce and IProducer.ProduceAsync are thread-safe; IConsumer methods are not thread-safe, and calling them concurrently produces undefined behavior.

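The producer's thread-safety comes from librdkafka's internal lock-protected queue. The consumer's poll-offset-commit cycle is a stateful sequence: each Consume call advances the consumer's position on a partition, and rebalance handlers run on the thread that calls Consume. (Group heartbeats are sent from a librdkafka background thread; max.poll.interval.ms, not the heartbeat, is what detects a stalled poll loop.) Interleaving calls from several threads makes it impossible to know which offsets have actually been processed, which can lead to silent data loss or incorrect offset commits.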

// ⚠️ WRONG: sharing one consumer across threads
for (int i = 0; i < workerCount; i++)
{
    Task.Run(() =>
    {
        while (true)
        {
            var result = consumer.Consume(stoppingToken); // ← race condition
            Process(result);
        }
    });
}

// ✅ CORRECT option A: one consumer per thread
// Each instance with the same GroupId is a member of the same group.
for (int i = 0; i < workerCount; i++)
{
    var threadConsumer = new ConsumerBuilder<string, string>(config).Build();
    threadConsumer.Subscribe("orders");

    Task.Run(() =>
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var result = threadConsumer.Consume(stoppingToken);
                if (result == null) continue;
                Process(result);
            }
        }
        catch (OperationCanceledException) { }
        finally
        {
            threadConsumer.Close();
            threadConsumer.Dispose();
        }
    });
}

// ✅ CORRECT option B: single consumer thread feeds a channel
// Consume on one thread; processing parallelism is downstream.
var channel = Channel.CreateBounded<ConsumeResult<string, string>>(capacity: 256);

_ = Task.Run(async () =>
{
    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var result = consumer.Consume(stoppingToken);
            if (result != null)
                await channel.Writer.WriteAsync(result, stoppingToken);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        channel.Writer.Complete();
        consumer.Close();
        consumer.Dispose();
    }
});

await Parallel.ForEachAsync(
    channel.Reader.ReadAllAsync(stoppingToken),
    new ParallelOptions { MaxDegreeOfParallelism = workerCount, CancellationToken = stoppingToken },
    async (result, ct) => await ProcessAsync(result, ct) // forward the token so in-flight work can be cancelled
);

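Option B is worth understanding because it keeps offset management simple: only one thread ever touches IConsumer, so commit calls are serialized without additional synchronization, provided the workers hand completed offsets back to that thread rather than calling Commit themselves. Keep in mind that parallel workers finish out of order, so the highest processed offset is not necessarily safe to commit.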

                         IProducer                       IConsumer
───────────────────────────────────────────────────────────────────────────────
Thread-safe methods      Produce, ProduceAsync           None
Shared across threads    Safe (by design)                Undefined behavior
Recommended model        Singleton, concurrent callers   One instance per thread

Key Takeaways and What Comes Next

The Architecture Is the Explanation

Confluent.Kafka is a managed wrapper around librdkafka, a C library that handles all network I/O, message buffering, and protocol implementation. When you see behavior that feels surprising from a .NET perspective — native memory pressure under high throughput, exceptions that don't trace into managed code, a background thread that persists when you think the client is idle — the explanation is almost always in librdkafka's architecture.

This also explains why disposal matters more than it does for purely managed clients. The background thread and internal queue are native resources. Calling Dispose without first calling Flush on a producer doesn't block until in-flight messages are delivered — it discards whatever is still in the internal buffer, silently, with no exception or log warning.
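
The flush-then-dispose order can be sketched as a small helper. ProducerShutdown and FlushThenDispose are hypothetical names introduced here, and the timeout you pass is your own choice. Flush(TimeSpan) and Dispose are the client's own methods.

```csharp
using System;
using Confluent.Kafka;

public static class ProducerShutdown
{
    // Hypothetical helper: drains the producer's internal queue, then releases
    // the native handle. The return value is the number of messages still
    // queued when the timeout expired; 0 means nothing was left behind.
    public static int FlushThenDispose<TKey, TValue>(
        IProducer<TKey, TValue> producer, TimeSpan timeout)
    {
        try
        {
            return producer.Flush(timeout);
        }
        finally
        {
            // Dispose runs even if Flush throws, so the native resources
            // are not leaked on the error path.
            producer.Dispose();
        }
    }
}
```

A caller would use it as `int remaining = ProducerShutdown.FlushThenDispose(producer, TimeSpan.FromSeconds(10));` and log or alert when remaining is greater than zero.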

Long-Lived, Stateful Objects — Not Request-Scoped

Both IProducer<TKey, TValue> and IConsumer<TKey, TValue> are long-lived objects that own TCP connection pools, internal queues, and background threads. Treating them as request-scoped objects multiplies construction cost with every operation and leaks native resources in the interval before cleanup.

🧠 Mnemonic: Think of IProducer and IConsumer the way you think of HttpClient — one instance shared across the lifetime of the application, not one per call.

// ✅ Register as a singleton in your DI container
public static IServiceCollection AddKafkaProducer(
    this IServiceCollection services,
    IConfiguration configuration)
{
    services.AddSingleton<IProducer<string, string>>(sp =>
    {
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };
        return new ProducerBuilder<string, string>(config).Build();
    });

    // Ensure Flush is called during application shutdown, not per-request.
    services.AddHostedService<KafkaShutdownService>();

    return services;
}
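
The registration above refers to KafkaShutdownService without showing it. One possible shape is sketched below, assuming the Microsoft.Extensions.Hosting package. The 10-second timeout and the stderr logging are assumptions for illustration, not values from the lesson.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Sketch only: flushes the singleton producer once, when the host stops.
public sealed class KafkaShutdownService : IHostedService
{
    private readonly IProducer<string, string> _producer;

    public KafkaShutdownService(IProducer<string, string> producer) =>
        _producer = producer;

    public Task StartAsync(CancellationToken cancellationToken) =>
        Task.CompletedTask;

    public Task StopAsync(CancellationToken cancellationToken)
    {
        // Flush blocks until queued messages are delivered or the timeout
        // expires. The DI container disposes the singleton producer afterwards,
        // so this service does not call Dispose itself.
        int remaining = _producer.Flush(TimeSpan.FromSeconds(10)); // assumed timeout
        if (remaining > 0)
            Console.Error.WriteLine($"{remaining} message(s) still queued at shutdown");
        return Task.CompletedTask;
    }
}
```

The generic host stops hosted services in reverse registration order by default, so registering this service before any hosted service that produces messages should let the flush run after those services have stopped.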

The IConsumer follows the same rule. Each new IConsumer instance is a new group member from the broker's perspective, which triggers a partition rebalance. Creating one per message continuously destabilizes the consumer group.

Serialization, Errors, and Cancellation Are Your Responsibility

The Confluent.Kafka client leaves serialization, error inspection, and shutdown handling to your code. Three places this bites developers most often:

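Serialization: ISerializer<T> and IDeserializer<T> must be registered on the builder for any type without a built-in default. The built-ins cover byte[], string, and a handful of primitives; your own message types are never covered. The compiler cannot catch a missing registration, so the mistake only surfaces at runtime.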

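Error surfaces: Not every failure arrives as an exception from the call you are making. Consume throws ConsumeException and ProduceAsync throws ProduceException, but the callback-based Produce reports per-message failures through the Error property of the DeliveryReport passed to its delivery handler, and client-level errors such as unreachable brokers go to the handler registered with SetErrorHandler. Code that only wraps the consume loop in a try/catch misses both of those channels. Checking report.Error.IsError in the delivery handler and registering an error handler is the minimum.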

Cancellation: The Consume(CancellationToken) overload is the correct shutdown path. Without it, or at least a short Consume(TimeSpan) timeout with a token check between polls, your consumer service cannot honor IHostedService.StopAsync within the host's shutdown timeout.
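
The first two responsibilities can be wired up on the builder in a few lines. This is a sketch under stated assumptions: OrderCreated, JsonValueSerializer, ProducerSetup, and the "orders" topic are hypothetical names, JSON via System.Text.Json is just one possible encoding, and writing to stderr stands in for whatever logging you actually use. It assumes the 1.x/2.x client API, where the Produce callback receives a DeliveryReport.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

public sealed record OrderCreated(string OrderId, decimal Total); // hypothetical payload

// Hypothetical serializer: encodes any T as UTF-8 JSON.
public sealed class JsonValueSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}

public static class ProducerSetup
{
    public static IProducer<string, OrderCreated> Build(ProducerConfig config) =>
        new ProducerBuilder<string, OrderCreated>(config)
            // Custom value type: the serializer must be registered explicitly.
            .SetValueSerializer(new JsonValueSerializer<OrderCreated>())
            // Client-level errors arrive here rather than as exceptions.
            .SetErrorHandler((_, e) =>
                Console.Error.WriteLine(
                    $"Kafka error {e.Code} (fatal={e.IsFatal}): {e.Reason}"))
            .Build();

    public static void Send(IProducer<string, OrderCreated> producer, OrderCreated order) =>
        producer.Produce(
            "orders", // assumed topic name
            new Message<string, OrderCreated> { Key = order.OrderId, Value = order },
            report =>
            {
                // Per-message failures are reported here, not thrown.
                if (report.Error.IsError)
                    Console.Error.WriteLine(
                        $"Delivery failed: {report.Error.Code} - {report.Error.Reason}");
            });
}
```

The consumer side mirrors this with SetValueDeserializer and its own SetErrorHandler call on ConsumerBuilder.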

What Changes in Your Mental Model

Concept               ❌ Before                           ✅ After
──────────────────────────────────────────────────────────────────────────────────────────────────────────────
Client architecture   "It's just a .NET library"         Managed wrapper around librdkafka; native memory and threads involved
Object lifetime       Create per message / per request   Singleton; one instance per application lifetime
Disposal              using block is enough              Flush first, then Dispose; Close before Dispose for consumers
Serialization         The client handles it              Caller-registered ISerializer / IDeserializer; no default for custom types
Error handling        try/catch around Consume           Also check Error.IsError on delivery reports and register an error handler; non-fatal errors skip exceptions
Cancellation          Optional                           Required for graceful shutdown; use Consume(CancellationToken) overload

What Comes Next

Producer Configuration takes the ProducerConfig object — populated here with only BootstrapServers — and covers every knob governing delivery guarantees and batching: acks, linger.ms, batch.size, max.in.flight.requests.per.connection, idempotence, and their interactions. That lesson assumes you understand the librdkafka buffer and the producer's internal queue, both of which you now do.

Consumer & Manual Commits covers offset management: the difference between EnableAutoCommit and manual StoreOffset / Commit calls, what happens to uncommitted offsets during a rebalance, and how to avoid the duplicate-processing and message-loss failure modes that offset management errors produce. That lesson assumes you understand the distinction between Subscribe and Assign, and why the consumer is a stateful group member — both of which you now do.

🤔 Did you know? The Confluent.Kafka consumer's offset commit logic interacts with the group coordinator on the broker, not just local state. Calling Commit() sends a network request — it's not a local bookmark. This is why manual commit timing matters as much as it does, and why Consumer & Manual Commits warrants its own dedicated lesson.