Akka.Streams.Kafka
After mastering raw Confluent.Kafka, move to Akka.Streams.Kafka for backpressured reactive pipelines. This is where Kafka consumption becomes ergonomic and safe.
Why Akka.Streams.Kafka: From Raw Consumer to Reactive Pipeline
If you've spent time with Confluent.Kafka's raw consumer API, you know the poll loop intimately. You call consumer.Consume(timeout), hand the result to your processing logic, and repeat — forever. It feels straightforward until the day your downstream database slows down, your enrichment service starts timing out, or a burst of messages arrives faster than your handler can process them. At that point, a question that seemed settled reopens itself: who decides how fast messages flow through your system? With a raw poll loop, the answer is often "nobody in particular" — and that gap is exactly what Akka.Streams.Kafka was designed to close.
The Poll Loop Problem
The standard Confluent.Kafka consumer model looks roughly like this:
```csharp
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("orders");

while (!cancellationToken.IsCancellationRequested)
{
    // Kafka delivers messages at whatever rate the broker allows.
    // Nothing here communicates how fast ProcessOrder can actually run.
    var result = consumer.Consume(cancellationToken);
    ProcessOrder(result.Message.Value); // Could be slow, could throw
    consumer.Commit(result);            // manual commit, typically paired with EnableAutoCommit = false
}
```
This pattern has a hidden structural flaw: nothing in it connects the rate at which Kafka can deliver messages to the rate at which ProcessOrder completes. The loop stays in step only because each iteration blocks, and when the two speeds disagree you face an unpleasant choice:
- Buffer in memory: Spin up a Channel<T> or ConcurrentQueue<T> between polling and processing. Now you're managing buffer bounds, overflow policies, and memory pressure yourself. If the queue grows unbounded, you're trading latency for potential out-of-memory crashes.
- Block the poll loop: Call ProcessOrder synchronously before polling again (as the loop above does). This keeps memory usage flat, but Kafka's max.poll.interval.ms timer is ticking. Exceed it and the consumer is treated as failed and removed from the group, triggering a rebalance.
- Drop work: Add a bounded queue with a drop policy. Simple, but you've accepted message loss without making that decision explicitly.
None of these options is wrong in every context, but none of them is automatic. Each requires coordination machinery that has nothing to do with your business logic. Consumer lag metrics often reveal this problem in production: lag spikes that appear whenever a downstream dependency slows, then gradually shrink when it recovers, are the signature of a poll loop with no way to communicate back-pressure to the broker.
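The first option is easiest to see in code. Below is a minimal sketch of hand-rolled buffering with System.Threading.Channels. No Kafka client is involved: strings stand in for ConsumeResult values, a loop plays the poll loop, and Task.Delay plays a slow ProcessOrder. PollBufferSketch and RunAsync are hypothetical names. The point is that the overflow policy (FullMode) is a decision you make and own.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: a "poll loop" hands records to a slow handler through a bounded
// channel. Strings stand in for ConsumeResult<string, string>; no Kafka client is used.
public static class PollBufferSketch
{
    public static async Task<List<string>> RunAsync(int count, int capacity, BoundedChannelFullMode fullMode)
    {
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            FullMode = fullMode,   // Wait = stall the poller; DropWrite/DropOldest/DropNewest = lose work
            SingleReader = true,
            SingleWriter = true
        });

        var processed = new List<string>();

        // The "ProcessOrder" side: slower than the poller.
        var handler = Task.Run(async () =>
        {
            await foreach (var record in channel.Reader.ReadAllAsync())
            {
                await Task.Delay(5);   // e.g. a database write
                processed.Add(record);
            }
        });

        // The "poll loop" side: produces as fast as it can.
        for (var i = 0; i < count; i++)
        {
            // With Wait, this suspends until the handler frees a slot.
            // With a Drop* mode, it completes immediately and the channel may discard an item.
            await channel.Writer.WriteAsync($"order-{i}");
        }

        channel.Writer.Complete();
        await handler;
        return processed;
    }
}
```

With Wait, the writer suspends. That is the "block the poll loop" option in disguise. With DropWrite, each write returns immediately, and the handler sees only part of the stream.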
Backpressure: Downstream Demand Flows Upstream
Backpressure is the mechanism by which a pipeline stage signals to the stage feeding it that it's ready to accept more work. Rather than a producer pushing at maximum rate, a consumer pulls at the rate it can sustain. The key insight is directional: demand signals flow upstream, data flows downstream.
```
[Kafka Topic] ←── demand signal ──── [Deserializer] ←── demand signal ──── [DB Writer]
      │                                     │
      └──────── message data ──────────────►└──────── message data ──────────►
```
When the DB Writer is busy, it stops requesting new records. The Deserializer, receiving no demand from downstream, stops requesting records from Kafka. Consumption from Kafka pauses automatically, not because you wrote code to pause it, but because the pipeline's structure enforces it.
This isn't a new idea. The Reactive Streams specification formalizes this demand protocol so that libraries from different authors can compose safely. Akka.Streams implements this specification, which means a Kafka source, a database sink, and an HTTP flow written independently can all participate in the same back-pressure contract without custom glue code.
Key Principle: Backpressure doesn't make your pipeline faster — it makes it honest. Instead of hiding the speed mismatch inside a buffer, it surfaces the constraint and lets the slowest stage govern the whole pipeline's throughput.
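The demand rule at the heart of Reactive Streams fits in a few lines. The sketch below mirrors the request(n) idea from the specification. DemandDrivenSource is a hypothetical type: it is neither the Reactive.Streams package nor Akka's implementation. It emits no more elements than downstream has asked for, so demand travels upstream through Request and data travels downstream only while demand remains.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: mirrors the Reactive Streams request(n) idea, but is NOT the
// Reactive.Streams package or Akka's implementation. Strings stand in for records.
public sealed class DemandDrivenSource
{
    private readonly Queue<string> _available;   // records sitting in the "topic"
    private readonly Action<string> _onNext;      // downstream element handler
    private long _demand;                          // outstanding demand signalled by downstream

    public int Emitted { get; private set; }

    public DemandDrivenSource(IEnumerable<string> available, Action<string> onNext)
    {
        _available = new Queue<string>(available);
        _onNext = onNext;
    }

    // Downstream calls this to say "I can take n more": demand flows upstream...
    public void Request(long n)
    {
        if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n), "demand must be positive");
        _demand += n;

        // ...and data flows downstream only while unfulfilled demand remains.
        while (_demand > 0 && _available.Count > 0)
        {
            _demand--;
            Emitted++;
            _onNext(_available.Dequeue());
        }
    }
}
```

Nothing is emitted until Request is called. After Request(2), exactly two elements have reached the handler, however many are waiting in the source.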
How Akka.Streams Models a Pipeline
Akka.Streams represents a pipeline as a graph composed of three primary building blocks:
| Shape | Role | Kafka Mapping |
|---|---|---|
| Source<T, M> | Produces elements; has an outlet, no inlet | KafkaConsumer.PlainSource or CommittableSource |
| Flow<In, Out, M> | Transforms elements; has both inlet and outlet | SelectAsync, KafkaProducer.FlexiFlow |
| Sink<T, M> | Consumes elements; has an inlet, no outlet | KafkaProducer.PlainSink, Sink.Ignore |
A working pipeline is built by connecting these shapes and then materializing the graph — the step that allocates resources, wires up actors, and starts data flowing. Until materialization, you're working with a blueprint that can be reused (though accidentally materializing it twice is a pitfall covered later in this lesson).
Here's a minimal illustration of shape composition:
```csharp
// Blueprint construction: nothing runs yet.
var source = KafkaConsumer.PlainSource(
    consumerSettings,
    Subscriptions.Topics("orders")
);

// OrderEvent is an application-defined type; JsonSerializer comes from System.Text.Json.
var parseFlow = Flow.Create<ConsumeResult<string, string>>()
    .Select(record => JsonSerializer.Deserialize<OrderEvent>(record.Message.Value));

var logSink = Sink.ForEach<OrderEvent>(evt =>
    Console.WriteLine($"Received order {evt.OrderId}"));

// Materializing the graph: NOW it runs, NOW Kafka polling begins.
var control = source
    .Via(parseFlow)
    .ToMaterialized(logSink, Keep.Left)
    .Run(materializer);

// `control` is IControl: the handle for graceful shutdown, covered next.
```
```
KafkaConsumer.PlainSource
        │
        │ ConsumeResult<string, string>
        ▼
parseFlow (Select)
        │
        │ OrderEvent
        ▼
logSink (ForEach)
```
Every arrow in this diagram is a back-pressured channel. If logSink is busy, parseFlow receives no demand and holds its current element. PlainSource then receives no demand from parseFlow and stops requesting new records from Kafka, so records the pipeline can't yet process wait in the topic instead of piling up in application memory.
Kafka can participate as any of the three shapes depending on what your pipeline needs:
- As a Source: KafkaConsumer.PlainSource or CommittableSource reads records and emits them downstream.
- As a Sink: KafkaProducer.PlainSink accepts ProducerRecord instances as a terminal stage.
- As a Flow: KafkaProducer.FlexiFlow writes to Kafka and passes a result (delivery metadata plus an optional pass-through value) downstream, enabling patterns like committing a consumer offset only after a successful produce.
Trade-offs You Accept
Akka.Streams.Kafka is not a free upgrade from raw Confluent.Kafka. The ergonomics come with concrete costs worth naming honestly.
Actor system overhead. Running Akka.Streams requires an ActorSystem — a non-trivial runtime with its own thread pool, dispatcher configuration, and lifecycle. For a microservice with simple processing logic and a single topic, this overhead may genuinely not be worth it. Raw Confluent.Kafka with a bounded Channel<T> is often sufficient.
Steeper learning curve. The graph DSL, materialization semantics, supervision strategies, and dispatcher model are a genuine investment. Developers familiar with LINQ and async/await often find the first week with Akka.Streams disorienting — particularly around how exceptions propagate and how threading is managed.
Reactive streams coordination cost. The demand-signaling protocol adds overhead compared to a tight polling loop. For trivial per-message work at extreme throughput, this overhead can be measurable. Akka.Streams uses fusing — collapsing adjacent stages into a single actor — to reduce this significantly, but it doesn't eliminate it. The practical threshold where ergonomics clearly outweigh the cost is pipelines doing meaningful per-message I/O work, where coordination overhead is negligible relative to I/O latency.
| Reach for Akka.Streams.Kafka when... | Stick with raw Confluent.Kafka when... |
|---|---|
| Processing involves I/O per message | Processing is pure CPU-bound computation |
| You need composable back-pressure | You need the lowest possible latency overhead |
| Pipeline has multiple transformation stages | Operational simplicity is the top priority |
| Error handling must be per-stage | Team is new to reactive streams concepts |
The shift from a raw poll loop to a declarative graph isn't just a style preference — it's a change in where the invariants of your pipeline are enforced. In a poll loop, correct behavior depends on the discipline of the code you write. In a backpressured graph, correct behavior emerges from the structure of the graph itself.
Core Abstractions: Consumer Sources, Producer Sinks, and the Materialized Control Handle
Akka.Streams.Kafka surfaces exactly three concerns at its API boundary: reading from Kafka, writing to Kafka, and controlling a running stream from outside of it. Understanding what each abstraction emits, accepts, or returns — and when to reach for one variant over another — is the foundation everything else in this library builds on.
Consumer Sources: PlainSource vs. CommittableSource
On the consumption side, the library gives you two factory methods on KafkaConsumer, and the choice between them is a delivery-guarantee decision you make at graph construction time, not at runtime.
KafkaConsumer.PlainSource emits ConsumeResult<TKey, TValue>, the same record type you'd receive from a raw Confluent.Kafka poll call. The source itself never commits offsets. Whether offsets advance at all depends on the client's auto-commit settings (enable.auto.commit and auto.commit.interval.ms), which Akka.Streams.Kafka's default ConsumerSettings leave disabled. This makes PlainSource appropriate when you want the simplest possible integration and either (a) you're processing idempotent, read-only analytics where re-reading or losing a few messages on restart is acceptable, or (b) you're managing offset storage yourself in an external system.
KafkaConsumer.CommittableSource emits CommittableMessage<TKey, TValue>. That wrapper bundles the ConsumeResult together with an ICommittableOffset handle you can pass downstream and commit, individually or in batches. Committing through this handle advances the Kafka consumer group offset in a way your application controls explicitly. This is the entry point for at-least-once delivery: you only commit an offset after your downstream stage has successfully processed the message, so a crash before the commit causes Kafka to re-deliver from the last committed position.
```csharp
// PlainSource: no commit path in the graph; simplest form
var plainSource = KafkaConsumer.PlainSource(
    consumerSettings,
    Subscriptions.Topics("orders")
);
// Emits: ConsumeResult<string, string>

// CommittableSource: explicit offset control for at-least-once delivery
var committableSource = KafkaConsumer.CommittableSource(
    consumerSettings,
    Subscriptions.Topics("orders")
);
// Emits: CommittableMessage<string, string>
// Each element carries msg.Record (the ConsumeResult) and msg.CommitableOffset
// (Akka.NET spells it with a single "t"); commit that offset only after processing.
```
Key Principle: The type the source emits tells you everything about its commit contract. ConsumeResult means no explicit commit path exists in the graph. CommittableMessage means a commit path exists and you are responsible for exercising it.
⚠️ Switching from CommittableSource to PlainSource to simplify a pipeline silently drops at-least-once guarantees. With auto-commit enabled, a crash-and-restart resumes from wherever the auto-commit timer last landed, which may be ahead of what your downstream actually processed. This surfaces as silent message loss, not an exception. With auto-commit disabled, nothing is committed at all, and a restart replays from the last committed offset (or from auto.offset.reset).
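The commit contract is easiest to see in a toy model. The sketch below involves no Kafka client at all. It keeps a partition log as an array and a committed offset stored the way Kafka stores it (the next offset to read), and it processes each record before committing it. AtLeastOnceSimulation and its crashAfter parameter are hypothetical; crashAfter simulates a crash that lands between the side effect and the commit.

```csharp
using System.Collections.Generic;

// Illustrative simulation, no Kafka involved: "process, then commit" over one partition.
public sealed class AtLeastOnceSimulation
{
    private readonly string[] _log;                      // the partition's records
    public long CommittedOffset { get; private set; }    // next offset to read, as Kafka stores it
    public List<string> Processed { get; } = new List<string>();

    public AtLeastOnceSimulation(string[] log) => _log = log;

    // Consumes from the committed offset. crashAfter simulates a crash after handling
    // that record but before committing it.
    public void Run(long? crashAfter = null)
    {
        for (var offset = CommittedOffset; offset < _log.Length; offset++)
        {
            Processed.Add(_log[offset]);      // side effect happens first
            if (offset == crashAfter) return; // crash: the commit below never runs
            CommittedOffset = offset + 1;     // commit only after successful processing
        }
    }
}
```

Crashing after offset 2 and then restarting processes r2 twice. That duplicate is the price of at-least-once delivery, and it is why handlers in such pipelines should be idempotent.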
Subscription Strategies and Offset-Reset Behavior
Both source factories accept an ISubscription object that controls how the consumer is assigned to partitions. The Subscriptions static class provides factory methods for three strategies, each implying different behavior around partition assignment and offset initialization.
Topic-name subscription (Subscriptions.Topics("orders", "payments")) registers the consumer with the broker's group coordinator, which assigns partitions dynamically and rebalances when group membership changes. When the group has no committed offset for a partition (on first consumption, for example), the starting position comes from the auto.offset.reset setting in your ConsumerSettings, typically earliest or latest.
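Pattern subscription (Subscriptions.TopicPattern("^order.*")) works like topic-name subscription, except that the consumer client matches topic names from cluster metadata against a regular expression. The underlying librdkafka client treats a subscription as a regex only when it starts with ^, so keep that prefix. New topics matching the pattern are picked up on a metadata refresh, without a code change.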
Explicit partition assignment (Subscriptions.Assignment(new TopicPartition("orders", 0))) bypasses group-based assignment entirely, pinning your application to specific partitions. Crucially, that means no automatic rebalancing: if this instance dies, no other group member picks up its partitions. Unless you say otherwise, the consumer starts from the group's committed offset (if one exists) or falls back to auto.offset.reset. To pin an exact starting position, use Subscriptions.AssignmentWithOffset.
```csharp
// Explicit partition + starting offset: start reading partition 0 from offset 500
var assignmentSub = Subscriptions.AssignmentWithOffset(
    new TopicPartitionOffset("orders", partition: 0, offset: new Offset(500))
);
var source = KafkaConsumer.PlainSource(consumerSettings, assignmentSub);
```
Mental Model: Think of the three strategies as a dial from fully managed (topic name) to fully manual (explicit assignment with offset). As you turn the dial toward manual, you gain precision and give up automatic rebalancing.
Producer Sinks and FlexiFlow
On the production side, the library offers two complementary shapes depending on whether Kafka writing is the terminal step of your graph or a midpoint you need to pass through.
KafkaProducer.PlainSink is a terminal Sink<ProducerRecord<TKey, TValue>, Task>. Use this when producing to Kafka is the final action of the pipeline and you don't need the delivery receipt downstream.
KafkaProducer.FlexiFlow is a pass-through Flow<IEnvelope<TKey, TValue, TPassThrough>, IResults<TKey, TValue, TPassThrough>, NotUsed>. It accepts envelopes (typically built with ProducerMessage.Single), writes the records they contain, and emits a result for each one. That result carries the delivery metadata (the topic, partition, and offset Kafka assigned) plus the pass-through value you attached. Use FlexiFlow when you need to act after the write is acknowledged: updating a database cursor, committing a CommittableOffset, or auditing the exact offset.
```
PlainSink                                   FlexiFlow
records ──► [KafkaProducer] ──► (done)      envelopes ──► [KafkaProducer] ──► results
terminal sink                               pass-through; emits a receipt per record
```
The practical difference is clearest in a consume-transform-produce pipeline with explicit offset commits. PlainSink loses the signal for when the Kafka write succeeded, so you cannot safely commit the consumer offset only after producer confirmation. FlexiFlow gives you that acknowledgment in the stream.
```csharp
// FlexiFlow: pass-through; the third type parameter is the pass-through type
var flexiFlow = KafkaProducer.FlexiFlow<string, string, ICommittableOffset>(producerSettings);

var (control, streamCompletion) = committableSource
    .Select(msg => ProducerMessage.Single(
        new ProducerRecord<string, string>("results", msg.Record.Message.Value),
        passThrough: msg.CommitableOffset))             // carry offset through for later commit
    .Via(flexiFlow)
    .Select(result => (ICommittable)result.PassThrough) // only reached after Kafka acknowledged the write
    .ToMaterialized(Committer.Sink(committerSettings), Keep.Both)
    .Run(materializer);
```
The IControl Materialized Value
Every consumer source in Akka.Streams.Kafka materializes an IControl object — the bridge between your declarative, running graph and the imperative world outside it.
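IControl exposes a small set of lifecycle members:
- Stop(): stops emitting new records and completes the source, so elements already emitted keep flowing through downstream stages. The underlying Kafka consumer stays open, which means offsets for those elements can still be committed.
- Shutdown(): stops the source and closes the underlying Kafka consumer.
- DrainAndShutdown(streamCompletion): combines the two. It calls Stop(), waits for the stream-completion Task you pass in, then shuts the consumer down. This drain-then-shutdown order is the safe choice for graceful shutdown.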
To get hold of IControl, materialize the source using ToMaterialized with a Keep.Left or Keep.Both strategy:
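```csharp
var (control, streamCompletion) = KafkaConsumer
    .CommittableSource(consumerSettings, Subscriptions.Topics("orders"))
    .Via(processingFlow)
    .ToMaterialized(Committer.Sink(committerSettings), Keep.Both)
    .Run(materializer);

// Wire a drain-then-shutdown sequence to your application shutdown signal.
// ApplicationStopping callbacks run synchronously: an async lambda would return at
// its first await and let the host carry on, so block until each step completes.
appLifetime.ApplicationStopping.Register(() =>
{
    control.Stop().GetAwaiter().GetResult();     // 1. stop fetching; in-flight messages keep flowing
    streamCompletion.GetAwaiter().GetResult();   // 2. wait for them to be processed and committed
    control.Shutdown().GetAwaiter().GetResult(); // 3. close the Kafka consumer
    // now safe to shut down infrastructure
});
```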
streamCompletion is a Task that completes (or faults) when the stream terminates for any reason. IControl also exposes IsShutdown, a Task that completes once the consumer has been shut down. Together they make a liveness signal: if either is already completed when your health check runs, the stream is gone and a restart is warranted.
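A health check needs little more than those two Tasks. The helper below is hypothetical. It takes IsShutdown and the stream-completion Task as plain Task values, so the logic can be tested without a broker, and reports whether the pipeline is still alive.

```csharp
using System.Threading.Tasks;

// Hypothetical helper: turns IControl.IsShutdown and the stream-completion Task
// (from Keep.Both) into a liveness verdict. It takes plain Tasks so it can be
// exercised without Kafka or Akka.
public static class StreamLiveness
{
    public static (bool Alive, string Reason) Check(Task isShutdown, Task streamCompletion)
    {
        // Check for a fault first: a faulted Task also reports IsCompleted == true.
        if (streamCompletion.IsFaulted)
            return (false, "stream failed: " + streamCompletion.Exception?.GetBaseException().Message);
        if (streamCompletion.IsCompleted)
            return (false, "stream completed");
        if (isShutdown.IsCompleted)
            return (false, "consumer shut down");
        return (true, "running");
    }
}
```

A health-check endpoint could call Check and report unhealthy whenever Alive is false. The actual restart is then left to your orchestrator or supervisor.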
Actor System Lifecycle and In-Flight Messages
Because Akka.Streams runs on top of an Akka actor system, the stream's runtime is coupled to that system's lifecycle. If you call ActorSystem.Terminate() while messages are still in flight, the stream stages receive no graceful shutdown signal — the underlying actor machinery simply stops.
Messages that have been fetched from Kafka but not yet committed will be re-delivered from the last committed offset on restart. This is correct for at-least-once semantics, but only if you're using CommittableSource faithfully.
```
ActorSystem.Terminate() ──► stream stages stop immediately
                                   │
        messages in buffer: NOT committed
                                   ▼
        Kafka re-delivers from last committed offset on restart
```
The correct shutdown sequence:
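```csharp
await control.Stop();      // 1. stop the source; in-flight messages keep flowing
await streamCompletion;    // 2. let them finish (and commit their offsets)
await control.Shutdown();  // 3. close the Kafka consumer
await system.Terminate();  // 4. only now tear down the actor system
```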
⚠️ Calling ActorSystem.Terminate() in a finally block without first stopping the source and awaiting stream completion means every message in flight at that moment will be re-processed on the next startup. Under high concurrency, that window can cover a surprisingly large number of messages.
Abstractions at a Glance
| Abstraction | Emits / Accepts | Use When |
|---|---|---|
| PlainSource | Emits ConsumeResult<K,V> | Auto-commit or external offset store; idempotent workloads |
| CommittableSource | Emits CommittableMessage<K,V> | At-least-once delivery; explicit commit after processing |
| PlainSink | Accepts ProducerRecord<K,V> | Kafka write is the terminal step; no downstream receipt needed |
| FlexiFlow | Accepts envelopes, emits results | Need write acknowledgment downstream (e.g., to commit consumer offset) |
| IControl | Imperative handle | Graceful shutdown (Stop(), await completion, then Shutdown()), liveness check (IsShutdown) |
These five types are the primary vocabulary of any Akka.Streams.Kafka pipeline. Everything else — commit batching, transactional producers, restart supervision — builds on top of these primitives.
Wiring a Basic End-to-End Pipeline in C#
With the conceptual scaffolding in place, it's time to assemble a real pipeline and watch the pieces click together. This section builds a complete consume-transform-produce pipeline from scratch: consuming from one Kafka topic, applying an async transformation, and producing to another.
Configuring ConsumerSettings and ProducerSettings
Before any graph can run, Akka.Streams.Kafka needs to know how to reach the broker, which consumer group to join, and how to serialize and deserialize messages. These concerns live in ConsumerSettings<TKey, TValue> and ProducerSettings<TKey, TValue> — immutable configuration objects you construct once and pass into graph stages.
```csharp
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;

// Bootstrap the actor system: streams live inside this system.
var system = ActorSystem.Create("order-pipeline");
var materializer = system.Materializer();

// Deserializer/serializer selection is a compile-time type decision.
var consumerSettings = ConsumerSettings<Null, string>
    .Create(system, null, Deserializers.Utf8)
    .WithGroupId("order-processor")
    .WithBootstrapServers("localhost:9092")
    .WithProperty("auto.offset.reset", "earliest");

// Each materialized producer stage creates one Kafka producer from these settings
// and reuses it for every message, so you don't pay for a new connection per message.
var producerSettings = ProducerSettings<Null, string>
    .Create(system, null, Serializers.Utf8)
    .WithBootstrapServers("localhost:9092");
```
A few points worth noting. The generic type parameters determine which serializer and deserializer types you must supply, so that choice is made at compile time. The .WithProperty() method is a thin pass-through to the underlying Confluent.Kafka (librdkafka) configuration, so any librdkafka property name, such as auto.offset.reset above, is valid. Prefer loading as much as possible from HOCON configuration, and use .With*() fluent overrides only for values that genuinely vary at runtime.
Building the Graph: Source → Flow → Sink
```
┌───────────────────────────────────────────────────────────────────┐
│                        Akka.Streams Graph                         │
│                                                                   │
│  KafkaConsumer          Flow                  KafkaProducer       │
│  ┌─────────────┐    ┌──────────────┐         ┌──────────────┐     │
│  │ PlainSource │───▶│ SelectAsync  │────────▶│  PlainSink   │     │
│  │ (orders-in) │    │ (enrich msg) │         │ (orders-out) │     │
│  └─────────────┘    └──────────────┘         └──────────────┘     │
│         │                                             │           │
│         │◀──── backpressure demand signal ────────────│           │
└───────────────────────────────────────────────────────────────────┘
```
Here is the complete pipeline wired in code:
```csharp
var (control, streamCompletion) = KafkaConsumer
    .PlainSource(consumerSettings, Subscriptions.Topics("orders-in"))
    // SelectAsync runs up to `parallelism` transforms concurrently.
    // Keeping this async is critical; see the pitfalls section.
    .SelectAsync(parallelism: 4, async message =>
    {
        // Simulate an I/O-bound enrichment step, e.g., a database lookup.
        var enriched = await EnrichOrderAsync(message.Message.Value);
        return new ProducerRecord<Null, string>("orders-out", enriched);
    })
    .ToMaterialized(
        KafkaProducer.PlainSink(producerSettings),
        Keep.Both // retain both IControl and the sink's completion Task
    )
    .Run(materializer);

static async Task<string> EnrichOrderAsync(string rawOrder)
{
    await Task.Delay(10); // replace with actual DB/HTTP call
    return $"ENRICHED:{rawOrder}";
}
```
The Keep.Both argument to ToMaterialized returns two useful values: the IControl handle from the consumer source and a Task that completes when the stream finishes or faults.
This example uses PlainSource, which does not commit offsets. As established in the previous section, PlainSource is appropriate when offset management is external or when you're experimenting; for production at-least-once delivery, swap in CommittableSource.
Using SelectAsync for I/O-Bound Work
SelectAsync is the correct choice any time the transformation involves I/O — HTTP calls, database reads, cache lookups. The parallelism parameter sets how many async Tasks can be in-flight simultaneously from this stage.
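Key Principle: The parallelism factor is not a thread count; it is a concurrency ceiling on async Tasks. Because these are genuinely async operations, four concurrent enrichment calls typically hold no threads while awaiting I/O. SelectAsync also emits results in input order, so one slow call can hold back faster ones queued behind it (SelectAsyncUnordered relaxes that). Tuning parallelism means finding the point at which adding more concurrency stops improving throughput or starts overloading the downstream dependency.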
⚠️ Calling .Result or .Wait() inside a synchronous Select instead of using SelectAsync blocks a dispatcher thread for the duration of each I/O operation, quickly starving the thread pool and stalling the entire stream. This pitfall is covered in depth in the next section.
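The parallelism ceiling and the ordering behavior can both be sketched in plain .NET. This is not how SelectAsync is implemented. The hypothetical SelectOrderedAsync below caps in-flight calls with a SemaphoreSlim and keeps results in input order with Task.WhenAll. Unlike SelectAsync, it creates a waiting Task per input up front instead of pulling elements on demand, so it only illustrates the ceiling and the ordering.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Not Akka code: a plain-.NET sketch of "at most `parallelism` async calls in flight,
// results kept in input order".
public static class BoundedConcurrencySketch
{
    public static async Task<(TOut[] Results, int MaxInFlight)> SelectOrderedAsync<TIn, TOut>(
        IEnumerable<TIn> inputs, int parallelism, Func<TIn, Task<TOut>> transform)
    {
        using var slots = new SemaphoreSlim(parallelism, parallelism);
        var sync = new object();
        var inFlight = 0;
        var maxInFlight = 0;

        async Task<TOut> RunOne(TIn item)
        {
            await slots.WaitAsync();            // wait asynchronously for a free slot
            try
            {
                lock (sync)
                {
                    inFlight++;
                    maxInFlight = Math.Max(maxInFlight, inFlight);
                }
                return await transform(item);   // I/O-bound work; no thread held while awaiting
            }
            finally
            {
                lock (sync) { inFlight--; }
                slots.Release();
            }
        }

        // WhenAll returns results in the order the tasks were supplied, so output order
        // matches input order even when later items finish first.
        var results = await Task.WhenAll(inputs.Select(item => RunOne(item)).ToList());
        return (results, maxInFlight);
    }
}
```

In the test below, later items have shorter delays and finish first, yet the results still come back in input order and never more than four calls overlap.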
Wiring IControl to a Shutdown Hook
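```csharp
// In a hosted service, this belongs in StopAsync.
// ProcessExit handlers are synchronous: an async lambda would return at its first
// await and the process could exit mid-drain, so block until each step finishes.
AppDomain.CurrentDomain.ProcessExit += (_, _) =>
{
    control.Stop().GetAwaiter().GetResult();     // stop fetching; in-flight elements keep flowing
    streamCompletion.GetAwaiter().GetResult();   // wait for the stream to fully terminate
    control.Shutdown().GetAwaiter().GetResult(); // close the Kafka consumer
    system.Terminate().GetAwaiter().GetResult(); // only now tear down the actor system
};

// Block the main thread until the stream completes normally or faults.
await streamCompletion;
```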
The ordering (stop the source, await completion, shut down the consumer, then terminate the actor system) ensures that every message the source had already fetched gets the chance to pass through every stage before the runtime disappears. In a CommittableSource pipeline, it also gets the chance to commit its offset.
In a Kubernetes deployment, a pod receives SIGTERM and then, once terminationGracePeriodSeconds (30 seconds by default) has elapsed, SIGKILL. A consumer group member that shuts down cleanly within that window (stop, drain, then Shutdown()) commits its in-progress work and releases its partition assignment before exiting. The group can then rebalance without duplicating messages already processed.
Running Locally and Observing the Pipeline
The fastest way to verify that the pipeline behaves as expected is to run it against a single-node Kafka broker. Docker Compose works well here. Partition assignment, consumer group coordination, and offset tracking all function identically on a single node, as long as internal topics such as __consumer_offsets are configured with a replication factor of 1.
Once the application is running, verify three things:
1. Partition assignment. The broker's coordinator logs will show the consumer group forming and partitions being assigned.
2. Message flow. Use kafka-console-producer to inject messages into orders-in, then watch orders-out:
```shell
# Produce test messages to orders-in (type one message per line at the > prompt)
kafka-console-producer --bootstrap-server localhost:9092 --topic orders-in
>order-001
>order-002

# In another terminal, consume from orders-out to verify transformation
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic orders-out --from-beginning
# Expected output:
# ENRICHED:order-001
# ENRICHED:order-002
```
3. Consumer group lag. With PlainSource (and auto-commit disabled), nothing is committed, so the tool has no committed position to measure lag against and shows - instead of a number. That is a correct and expected observation, and it reinforces why PlainSource fits only stateless or externally managed offset scenarios.
```shell
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group order-processor \
  --describe
# Sample output:
# GROUP            TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor  orders-in  0          -               47              -
```
The - in CURRENT-OFFSET with PlainSource confirms that no offsets are being committed. If you swap in CommittableSource with a committer stage, real numbers will appear in those columns as offsets are committed in batches.
Consumer lag is one of the most actionable Kafka health signals available. A lag value that grows monotonically means your pipeline's processing throughput is below the producer's publish rate — which in Akka.Streams terms usually means either the SelectAsync parallelism ceiling is too low, or the downstream sink is the bottleneck.
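The lag arithmetic behind kafka-consumer-groups is simple enough to sketch. Both helpers below are hypothetical. Lag subtracts the committed offset from the log-end offset and returns null where the tool prints -. GrowsMonotonically flags, from a series of lag samples, the "always increasing" pattern just described.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helpers mirroring how kafka-consumer-groups reports lag.
public static class ConsumerLag
{
    // LAG = LOG-END-OFFSET - CURRENT-OFFSET (the committed offset). With no committed
    // offset the tool prints "-" because there is nothing to subtract; null models that.
    public static long? Lag(long logEndOffset, long? committedOffset) =>
        committedOffset.HasValue ? Math.Max(0L, logEndOffset - committedOffset.Value) : (long?)null;

    // True when every sample is strictly higher than the previous one: throughput is
    // persistently below the publish rate.
    public static bool GrowsMonotonically(IReadOnlyList<long> lagSamples)
    {
        if (lagSamples.Count < 2) return false;
        for (var i = 1; i < lagSamples.Count; i++)
            if (lagSamples[i] <= lagSamples[i - 1]) return false;
        return true;
    }
}
```

Using the sample output above: a log-end offset of 47 with no committed offset yields null. Had the group committed offset 40, the lag would be 7.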
Common Pitfalls When Starting with Akka.Streams.Kafka
Adopting Akka.Streams.Kafka after working with raw Confluent.Kafka is not a drop-in migration — it is a shift in mental model. The mistakes covered here share a common trait: they do not cause immediate, loud failures. Instead, they produce subtle misbehavior — stalled pipelines, silently re-processed messages, duplicate consumers, or streams that die on a single bad payload.
Pitfall 1: Blocking Inside a Stream Stage
The most frequent mistake made by developers coming from imperative Confluent.Kafka code is treating a stream stage as a thread. In a raw consumer loop, calling an async method synchronously with .Result or .Wait() is ugly but functional — each loop iteration runs on a dedicated thread. Inside an Akka.Streams stage, that assumption breaks completely.
Stream stages execute on an Akka dispatcher — a shared thread pool. When a synchronous Select stage calls .Result on a Task, it occupies one of those shared threads for the full duration of the awaited operation. Under any real load, all available dispatcher threads can be consumed by waiting, leaving no threads to process downstream demand signals or upstream fetch results. The stream stalls — not with an error, just silence.
```csharp
// ❌ WRONG: blocking inside a synchronous Select stage
var badStream = KafkaConsumer
    .PlainSource(consumerSettings, Subscriptions.Topics("orders"))
    .Select(msg =>
    {
        // Blocks a dispatcher thread for the full HTTP round-trip duration.
        var response = httpClient.PostAsync("/process", new StringContent(msg.Message.Value)).Result;
        return response.StatusCode;
    })
    .RunWith(Sink.Ignore<HttpStatusCode>(), materializer);

// ✅ CORRECT: non-blocking async work via SelectAsync
var goodStream = KafkaConsumer
    .PlainSource(consumerSettings, Subscriptions.Topics("orders"))
    .SelectAsync(parallelism: 4, async msg =>
    {
        // Dispatcher thread is released while the HTTP call is in flight.
        var response = await httpClient.PostAsync("/process", new StringContent(msg.Message.Value));
        return response.StatusCode;
    })
    .RunWith(Sink.Ignore<HttpStatusCode>(), materializer);
```
⚠️ The same problem applies to custom GraphStage implementations that call blocking APIs in their OnPush handlers. If you are writing a custom stage, ensure any async work uses GetAsyncCallback to safely re-enter the stage after a Task completes.
Pitfall 2: Forgetting That PlainSource Does Not Commit Offsets
PlainSource is attractive for its simplicity. Developers sometimes switch to it to reduce boilerplate, then carry that choice forward into production code.
The distinction is not about what Kafka tracks but about what gets written back. PlainSource with enable.auto.commit = false (the default in Akka.Streams.Kafka's ConsumerSettings) means offsets are never committed, so restarts re-process messages. With enable.auto.commit = true, commits happen on the client's auto-commit timer regardless of whether your processing succeeded, trading silent duplication for silent loss.
Consumer restart with PlainSource (auto-commit off):

```
Partition offset log:
  [0] [1] [2] [3] [4] [5] [6] [7]
               ^               ^
               |               last produced
  Last committed offset = 3
  (never advanced by PlainSource)

On restart: consumer reads from offset 3 again.
Messages 3-7 are reprocessed silently.
```
Switching from CommittableSource to PlainSource for simplicity is safe only in read-once, throwaway pipelines. Any pipeline that must survive restarts needs CommittableSource and explicit offset commits.
Pitfall 3: Accidentally Materializing the Same Blueprint Multiple Times
In Akka.Streams, a graph blueprint is an immutable description of a pipeline — not the pipeline itself. Calling .Run() on a blueprint materializes it, creating a live, running pipeline with its own actor hierarchy, Kafka consumer instance, and consumer group membership.
If you hold a reference to the blueprint and call .Run() more than once — inside a retry loop, a controller action, or a startup method invoked twice — you create multiple independent consumers in the same consumer group:
```
Single blueprint materialized twice: what Kafka sees

Consumer Group: "order-processors"
┌─────────────────────────────────────────────────┐
│  Instance A (materialization 1)                 │
│  Assigned partitions: [0, 1]                    │
│                                                 │
│  Instance B (materialization 2)  ← unintended   │
│  Assigned partitions: [2, 3]                    │
└─────────────────────────────────────────────────┘

Effect: each instance processes only half the topic.
Shutting down one instance triggers another rebalance.
```
```csharp
// ProcessAsync is a hypothetical Func<ConsumeResult<string, string>, Task<Done>>.

// ❌ WRONG: blueprint stored and Run() called more than once
var blueprint = KafkaConsumer
    .PlainSource(consumerSettings, Subscriptions.Topics("orders"))
    .SelectAsync(4, ProcessAsync)
    .ToMaterialized(Sink.Ignore<Done>(), Keep.Left);

var control1 = blueprint.Run(materializer); // consumer 1 joins group
var control2 = blueprint.Run(materializer); // consumer 2 joins group: unintended!

// ✅ CORRECT: materialize exactly once, hold the IControl reference
var control = KafkaConsumer
    .PlainSource(consumerSettings, Subscriptions.Topics("orders"))
    .SelectAsync(4, ProcessAsync)
    .ToMaterialized(Sink.Ignore<Done>(), Keep.Left)
    .Run(materializer);
```
Pro Tip: Guard materialization with a flag or a structured startup sequence. In a hosted service, materialize in StartAsync. In StopAsync, call Stop() on the IControl, await stream completion, then call Shutdown().
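One way to implement that guard is a small materialize-once wrapper. MaterializeOnce is a hypothetical helper, not an Akka type. In a hosted service you might construct it with a delegate such as () => graph.Run(materializer) and call TryStart from StartAsync. A second call then cannot add another consumer to the group.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of Akka: runs a materialization delegate at most once,
// even if StartAsync (or whatever calls it) is invoked twice or concurrently.
public sealed class MaterializeOnce<THandle> where THandle : class
{
    private readonly Func<THandle> _materialize;
    private THandle _handle;
    private int _started; // 0 = not yet claimed, 1 = claimed

    public MaterializeOnce(Func<THandle> materialize) => _materialize = materialize;

    // Returns true only for the single caller that actually materialized the graph.
    public bool TryStart(out THandle handle)
    {
        if (Interlocked.Exchange(ref _started, 1) == 1)
        {
            // Someone else already claimed it; may be null if they are still materializing.
            handle = Volatile.Read(ref _handle);
            return false;
        }

        handle = _materialize();            // e.g. () => graph.Run(materializer), returning IControl
        Volatile.Write(ref _handle, handle);
        return true;
    }
}
```

Interlocked.Exchange makes the claim atomic, so two racing callers cannot both materialize. The loser simply receives the existing handle (or null while the winner is still starting).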
Pitfall 4: Buffer Size Misconfiguration That Makes the Stream Appear to Stall
A pipeline that runs correctly in integration tests can appear to stall entirely in production — emitting no messages for seconds at a time, then bursting. A common root cause is the interaction between two independent buffer settings developers rarely tune together.
max.partition.fetch.bytes caps how much data the broker returns per partition per fetch request. akka.stream.materializer.max-input-buffer-size controls how many elements Akka buffers at each async boundary within the stream graph.
The stall scenario: if max.partition.fetch.bytes is very small relative to average message size, the broker returns few messages per fetch. Meanwhile, if the Akka materializer's input buffer fills because the downstream stage is slow, the stream stops pulling from the source entirely. The two effects compound.
```
Stall interaction:

Kafka Broker
   │  fetch.min.bytes = 1 (default)
   │  max.partition.fetch.bytes = 1048576 (default)
   ▼
PlainSource  (max-input-buffer-size = 16 elements, the default)
   │
   │ ← Buffer full: no downstream demand
   │ ← Source stops requesting records from the broker
   ▼
SelectAsync  (parallelism: 4, all slots occupied by slow ops)
   ▼
Sink

Net effect: no new records are fetched, so consumer lag grows
despite the stream technically "running" without errors.
```
Approach these settings together:
- Increase
akka.stream.materializer.max-input-buffer-sizeif downstream processing is consistently slower than fetch throughput. - Tune
max.partition.fetch.bytesto match realistic message sizes. - Consider
fetch.min.bytes = 1in latency-sensitive pipelines, at the cost of more frequent small fetches.
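The interaction between the two settings is easier to reason about with rough numbers. The helpers below are hypothetical back-of-the-envelope code, not part of Confluent.Kafka or Akka: they approximate records per partition fetch as max.partition.fetch.bytes divided by the average message size (ignoring record-batch headers and compression) and floor the result at one record, since the broker still returns an oversized first batch so the consumer can make progress.

```csharp
using System;

// Hypothetical estimation helpers; real fetch sizes also depend on batching and compression.
public static class FetchMath
{
    // Approximate number of records returned per partition per fetch.
    public static int EstimateRecordsPerFetch(int maxPartitionFetchBytes, int avgMessageBytes)
    {
        if (maxPartitionFetchBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxPartitionFetchBytes), "Must be positive.");
        if (avgMessageBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(avgMessageBytes), "Must be positive.");
        // At least one record: an oversized first batch is still returned.
        return Math.Max(1, maxPartitionFetchBytes / avgMessageBytes);
    }

    // Fetch round-trips needed to refill an input buffer of the given size.
    public static int FetchesToFillBuffer(int bufferElements, int recordsPerFetch)
    {
        if (bufferElements <= 0)
            throw new ArgumentOutOfRangeException(nameof(bufferElements), "Must be positive.");
        if (recordsPerFetch <= 0)
            throw new ArgumentOutOfRangeException(nameof(recordsPerFetch), "Must be positive.");
        return (bufferElements + recordsPerFetch - 1) / recordsPerFetch;   // ceiling division
    }
}
```

With the default 1 MiB limit and 4 KiB messages, one fetch (about 256 records) refills a 16-element buffer; shrink max.partition.fetch.bytes to 8 KiB and each fetch carries about 2 records, so refilling the same buffer takes 8 round-trips.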
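⚠️ Input buffer sizes in Akka.Streams must be powers of two. Do not assume a value such as 20 will be quietly rounded up to 32: Akka's stream interpreter validates buffer sizes and can reject a non-power-of-two value with an exception when the stream is materialized. Configure the power of two you actually intend, and use that number in your tuning arithmetic.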
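Because the effective buffer size is always a power of two, it helps to do the rounding yourself when you derive a size from throughput estimates. A minimal helper (hypothetical, not an Akka API):

```csharp
using System;

// Hypothetical tuning helpers for choosing a valid buffer size.
public static class BufferMath
{
    public static bool IsPowerOfTwo(int n) => n > 0 && (n & (n - 1)) == 0;

    // Smallest power of two that is >= requested, e.g. 20 -> 32, 16 -> 16.
    public static int NextPowerOfTwo(int requested)
    {
        // Upper bound keeps the doubling below from overflowing int.
        if (requested < 1 || requested > (1 << 30))
            throw new ArgumentOutOfRangeException(nameof(requested));
        int value = 1;
        while (value < requested)
            value <<= 1;   // double until we reach or pass the request
        return value;
    }
}
```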
Pitfall 5: Not Handling Failures with RestartSource or Supervision
Akka.Streams propagates failures as stream termination signals: when an unhandled exception escapes a stage, it tears down the entire pipeline. A single malformed message can kill the consumer, stop all partition processing, and potentially leave offsets uncommitted.
Two complementary tools prevent this: supervision strategies and RestartSource. They solve different problems.
A supervision strategy intercepts exceptions thrown within a stage and decides whether to Stop the stream, Resume (drop the offending element and continue), or Restart the stage. Supervision is the right tool for per-message errors like deserialization failures.
// Decider is Akka.Streams.Supervision.Decider, a delegate from Exception to Directive.
// DeserializationException stands for your own deserialization error type.
Decider decider = ex =>
    ex is DeserializationException
        ? Directive.Resume   // drop the bad message, log externally, continue
        : Directive.Stop;    // unknown exceptions still bring down the stream
// Apply the decider as the default supervision strategy for this materializer.
var materializerWithSupervision = system.Materializer(
    ActorMaterializerSettings.Create(system).WithSupervisionStrategy(decider));
KafkaConsumer
    .PlainSource(consumerSettings, Subscriptions.Topics("orders"))
    .SelectAsync(4, async msg => await DeserializeAndProcessAsync(msg))
    .RunWith(Sink.Ignore<Unit>(), materializerWithSupervision);
⚠️ Directive.Resume silently drops the element. If the bad message must be preserved for inspection, route it to a dead-letter topic before the exception propagates: either inside the SelectAsync body with a try/catch, or from the supervision decider via a side-channel actor. The decider receives only the exception, so the second option works only if your exception type carries the offending record.
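The try/catch option can be factored into a small wrapper that the SelectAsync body calls. This is a hypothetical plain-C# sketch: DeserializationException mirrors the exception name used in the decider example, and the sendToDeadLetter delegate stands in for whatever produces the record to your dead-letter topic.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical application exception, matching the name used in the decider example.
public sealed class DeserializationException : Exception
{
    public DeserializationException(string message) : base(message) { }
}

public static class DeadLetterRouting
{
    // Returns true if the record was processed, false if it was dead-lettered.
    // Only DeserializationException is caught; any other exception still escapes
    // the SelectAsync stage, where the supervision decider sees it.
    public static async Task<bool> ProcessOrDeadLetterAsync<TRecord>(
        TRecord record,
        Func<TRecord, Task> process,
        Func<TRecord, Exception, Task> sendToDeadLetter)
    {
        try
        {
            await process(record);
            return true;
        }
        catch (DeserializationException ex)
        {
            // In a real pipeline this would produce the raw record to a dead-letter topic.
            await sendToDeadLetter(record, ex);
            return false;
        }
    }
}
```

Catching only the one exception type keeps the layering intact: bad payloads are preserved, while unexpected failures still reach the decider and, ultimately, RestartSource.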
RestartSource is the outer defense: it wraps a source factory and automatically recreates the source when it fails or completes unexpectedly, with configurable back-off. This is the right tool for connection-level failures (broker unavailable, authentication timeout) rather than per-message logic errors.
var restartingSource = RestartSource.WithBackoff(
minBackoff: TimeSpan.FromSeconds(3),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2,
maxRestarts: -1, // -1 = unlimited restarts
sourceFactory: () =>
KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics("orders"))
);
restartingSource
.SelectAsync(4, ProcessAsync)
.RunWith(Sink.Ignore<Unit>(), materializer);
Note that RestartSource recreates the Kafka consumer on each restart, which means the new consumer seeks to the last committed offset — another reason the PlainSource vs. CommittableSource decision matters so much for resilience.
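To get a feel for what the back-off parameters produce, the sketch below computes restart delays following the exponential back-off with jitter that Akka's back-off supervision uses: the delay doubles from minBackoff on each restart, is capped at maxBackoff, and is then multiplied by a jitter factor between 1 and 1 + randomFactor. Treat it as an approximation for planning, not as Akka's exact implementation; the random draw is passed in explicitly so the sketch is deterministic.

```csharp
using System;

// Approximate model of exponential back-off with jitter (not Akka's source code).
public static class BackoffSketch
{
    // restartCount: 0 for the first restart. random01: a random draw in [0, 1).
    public static TimeSpan Delay(int restartCount, TimeSpan minBackoff, TimeSpan maxBackoff,
                                 double randomFactor, double random01)
    {
        if (restartCount < 0) throw new ArgumentOutOfRangeException(nameof(restartCount));
        double exponential = minBackoff.Ticks * Math.Pow(2, restartCount); // doubles per restart
        double capped = Math.Min(maxBackoff.Ticks, exponential);           // capped at maxBackoff
        double jitter = 1.0 + random01 * randomFactor;                      // jitter applied after the cap
        return TimeSpan.FromTicks((long)(capped * jitter));
    }
}
```

With minBackoff 3 s, maxBackoff 30 s and randomFactor 0.2, successive delays run roughly 3, 6, 12, 24, then 30 seconds, each stretched by up to 20%. Because jitter is applied after the cap, a delay can slightly exceed maxBackoff.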
Key Principle: Layer your failure handling: use supervision strategies for per-message exceptions, and RestartSource for source-level failures. Using only one leaves either message-level or infrastructure-level failures unhandled.
Summary and What Comes Next
What Changed in Your Mental Model
The shift from raw Confluent.Kafka to Akka.Streams.Kafka isn't primarily about syntax. It's about where control over consumption rate lives.
In an imperative poll loop, consumption rate is a tuning parameter: you adjust fetch settings such as fetch.min.bytes and max.partition.fetch.bytes, call Consume() in a loop, and hope your processing keeps pace. When it doesn't, you must choose between unbounded buffering, blocking, or dropping work. The loop has no mechanism to push back against Kafka.
Akka.Streams.Kafka replaces this with a graph that enforces backpressure structurally. Downstream stages signal demand upstream through the reactive streams protocol, so the Kafka source only fetches the next batch when something downstream is ready to process it. Consumption rate becomes a function of actual downstream capacity — not a number you guessed at configuration time.
Imperative model:
poll() → buffer → process → (buffer overflows if slow)
[Rate controlled by: config]
Reactive graph model:
Sink demands element
→ Flow processes if demand exists
→ Source fetches from Kafka only then
[Rate controlled by: downstream capacity]
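The key rule in the reactive diagram is that the source never runs ahead of outstanding demand. The toy model below is plain C#, not how Akka implements this internally (Akka adds buffers at async boundaries and signals demand in batches), but it makes the rule concrete: records leave the simulated broker only while downstream has requested more than it has received.

```csharp
using System;
using System.Collections.Generic;

// Toy model of demand-driven emission; the queue stands in for records waiting in Kafka.
public sealed class DemandDrivenSource
{
    private readonly Queue<int> _broker;
    private long _demand;   // requested by downstream but not yet delivered

    public DemandDrivenSource(IEnumerable<int> records) => _broker = new Queue<int>(records);

    public int Fetched { get; private set; }   // records pulled from the "broker" so far
    public long OutstandingDemand => _demand;

    // Downstream signals capacity for n more elements.
    public IReadOnlyList<int> Request(int n)
    {
        if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n));
        _demand += n;
        var emitted = new List<int>();
        // Pull only while there is unmet demand; without a Request, nothing is fetched.
        while (_demand > 0 && _broker.Count > 0)
        {
            emitted.Add(_broker.Dequeue());
            Fetched++;
            _demand--;
        }
        return emitted;
    }
}
```

Note that nothing is fetched until the first Request call: the rate is set entirely by how often, and for how much, downstream asks.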
Three Structural Decisions Made at Graph Construction Time
1. Source type encodes your delivery guarantee. The choice between PlainSource and CommittableSource is not revisable at runtime — it changes the element type flowing through the entire graph. Before writing any graph, answer: "If this process crashes after processing a message but before doing anything else, is it correct to process that message again on restart?"
- If yes (at-least-once is required): use CommittableSource.
- If the application genuinely doesn't care (stateless, or external offset tracking): PlainSource is appropriate.
- If exactly-once is required: use CommittableSource with transactional producers (covered in the next lesson).
2. IControl is the imperative escape hatch; keep it narrow. The three meaningful things IControl exposes are Stop() (signal the source to stop emitting), DrainAndShutdown(streamCompletion) (stop emitting, wait for the given stream-completion task so in-flight messages finish, then shut the consumer down), and IsShutdown (a Task that completes once the consumer has shut down, usable as a liveness signal). It is for interacting with a running stream from outside the graph. It is not for driving stream logic from within the graph.
3. Blueprint ≠ instance. A graph definition is a blueprint. Calling .Run() materializes it, creating actual resources. Calling .Run() twice on the same blueprint creates two independent consumers. This is correct when you want parallelism; it is a costly mistake when accidental.
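The crash question behind decision 1 is easier to answer with the commit ordering in front of you. The toy simulation below uses no Kafka at all: it runs the processing side effect first, commits afterwards (Kafka's convention is that the committed offset is the next offset to read), crashes once between the two steps, and then resumes from the last committed offset, as a restarted consumer would.

```csharp
using System;
using System.Collections.Generic;

// Toy at-least-once simulation: process first, commit second.
public static class AtLeastOnceSimulation
{
    // Returns every offset whose processing side effect ran, in order, across
    // a first run that crashes once and a restarted second run.
    public static List<int> Run(int recordCount, int crashAfterProcessingOffset)
    {
        var processed = new List<int>();
        int committed = 0;   // Kafka convention: next offset to read

        // First run: process, then commit; crash in between at one offset.
        for (int offset = committed; offset < recordCount; offset++)
        {
            processed.Add(offset);                           // side effect first
            if (offset == crashAfterProcessingOffset) break; // crash before commit
            committed = offset + 1;                          // commit after success
        }

        // Restart: a new consumer resumes from the last committed offset.
        for (int offset = committed; offset < recordCount; offset++)
        {
            processed.Add(offset);
            committed = offset + 1;
        }
        return processed;
    }
}
```

Run(5, 2) yields 0, 1, 2, 2, 3, 4: offset 2 is processed twice but nothing is lost. If that duplicate is acceptable, at-least-once delivery fits; if it is not, you need the exactly-once machinery.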
Pitfall Summary
| # | Symptom | Root Cause | Fix |
|---|---|---|---|
| 1 | Stream stalls under load | .Result/.Wait() in Select | Use SelectAsync |
| 2 | Messages replayed on restart | PlainSource with no commits | Switch to CommittableSource |
| 3 | Partial throughput, rebalances | Blueprint materialized twice | Materialize once, guard startup |
| 4 | Intermittent stall, no error | Buffer/fetch size mismatch | Tune both together |
| 5 | One bad message kills pipeline | No supervision or RestartSource | Layer both strategies |
The Canonical Shutdown Pattern
This structure should appear in every production consumer you write. The specific operator chain will change; the shutdown wire-up should not.
var (control, done) = KafkaConsumer
.PlainSource(consumerSettings, Subscriptions.Topics("orders"))
.SelectAsync(4, async msg =>
{
await ProcessOrderAsync(msg.Message.Value);
return msg;
})
.ToMaterialized(Sink.Ignore<ConsumeResult<string, string>>(), Keep.Both)
.Run(materializer);
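// Turn Ctrl+C into an awaitable signal instead of letting it kill the process.
var shutdownRequested = new TaskCompletionSource<bool>(
    TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;                     // keep the process alive so the stream can drain
    shutdownRequested.TrySetResult(true);
};
await shutdownRequested.Task;           // keep consuming until shutdown is requested
await control.Stop();                   // stop emitting new messages from the source
await done;                             // wait for in-flight messages to finish processing
await control.Shutdown();               // then close the underlying Kafka consumer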
await system.Terminate();
What Comes Next: The Committable Source Pipeline
The next lesson takes CommittableSource and builds the full offset management story on top of it:
- Offset commit batching: Accumulating CommittableOffset instances and committing them as a group using CommittableOffsetBatch, making high-throughput at-least-once delivery practical without flooding the Kafka coordinator.
- Transactional producers: The KafkaProducer transactional API allows atomically producing to an output topic and committing the input offset in a single transaction.
- Exactly-once semantics (EOS): Built on transactional producers and CommittableSource, eliminating the duplicate-processing window that at-least-once delivery leaves open.
All three are direct extensions of the CommittableSource foundation introduced here. Before starting that lesson, sketch on paper the graph shape you'd use for at-least-once delivery with batched commits: CommittableSource → processing flow → Batch → commit. The next lesson will show you the exact Akka.Streams.Kafka operators that realize each of those stages.
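If you want something concrete to check your sketch against, the plain-C# model below captures the core idea behind commit batching without using any Akka.Streams.Kafka types: fold many processed offsets into one entry per partition, keep only the highest, and commit that offset plus one. The class name and shape are hypothetical; the real operators arrive in the next lesson.

```csharp
using System;
using System.Collections.Generic;

// Conceptual sketch of offset batching (not Akka.Streams.Kafka's CommittableOffsetBatch).
public sealed class OffsetBatchSketch
{
    private readonly Dictionary<(string Topic, int Partition), long> _highest =
        new Dictionary<(string Topic, int Partition), long>();

    public int Count { get; private set; }   // offsets folded into this batch

    public void Add(string topic, int partition, long offset)
    {
        var key = (topic, partition);
        // Offsets within a partition are ordered, so only the highest one matters.
        if (!_highest.TryGetValue(key, out long current) || offset > current)
            _highest[key] = offset;
        Count++;
    }

    // One entry per partition: highest processed offset + 1 (the next offset to read).
    public IReadOnlyDictionary<(string Topic, int Partition), long> OffsetsToCommit()
    {
        var result = new Dictionary<(string Topic, int Partition), long>();
        foreach (var entry in _highest)
            result[entry.Key] = entry.Value + 1;
        return result;
    }
}
```

Four processed messages across two partitions collapse into two commit entries, which is why batching keeps commit traffic proportional to partitions rather than to messages.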