Dynamic Kafka Consumer Design for Task-Level Separation
In distributed systems, decoupling message producers and consumers is often desirable, allowing for scalability, independent deployment, and fault isolation. However, as systems evolve, specific use cases arise that require fine-grained control over message consumption based on key types within a single Kafka topic. In this blog, we’ll explore how we approached this problem, discuss the alternative solutions we considered, and document the final approach we implemented.
This post continues the principles we discussed in Dynamic Task Scheduling Using Flex Task Scheduler, extending the focus to Kafka consumers handling complex task workflows.
The Problem
Our Kafka topic, `kubernetes`, serves as a single source of task-related messages for various workflows. The producers, which are external to our team, use message keys to distinguish different task types. For example:
- `kubernetes.analyze-nodes`
- `kubernetes.cleanup-nodes`
- `kubernetes.process-events`
Initially, a single consumer group handled all the messages. However, as the system grew, we encountered a challenge:
- Some task types, such as `cleanup-nodes`, are long-running (hours) and slow down the processing of other tasks.
- Producers cannot be changed at this stage to create separate topics for task types.
This left us with a single topic, `kubernetes`, but the need to segregate long-running tasks (`cleanup-nodes`) from the rest of the workload. We had to ensure they could run independently while avoiding interference or backlog.
Initial Exploration: Options We Considered
When faced with the challenge of separating long-running tasks (`cleanup-nodes`) from other task types within a single Kafka topic, we considered the following options:
1. Separate Topics per Task Type
The most straightforward and scalable approach would have been to split tasks into separate Kafka topics based on their types. For example:
- `kubernetes.analyze-nodes`
- `kubernetes.cleanup-nodes`
- `kubernetes.process-events`
This approach has clear advantages:
- Each topic can be processed independently by its own consumer group.
- No filtering logic is required in the consumer, as each consumer only subscribes to the relevant topic.
- Scaling can be fine-tuned for each task type — for example, you could allocate more resources to handle high-throughput topics or long-running tasks.
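For comparison, here is a minimal sketch (using the same Confluent.Kafka client as the implementation further below) of what a consumer dedicated to one of these topics could look like; the group name is illustrative:

using System;
using Confluent.Kafka;

// Hypothetical consumer for a dedicated per-task-type topic. No key
// filtering is needed: the topic itself defines the task type.
var config = new ConsumerConfig
{
    GroupId = "cleanup-nodes-group", // illustrative group name
    BootstrapServers = "kafka:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("kubernetes.cleanup-nodes"); // dedicated topic

while (true)
{
    var result = consumer.Consume();
    // Every message here is a cleanup-nodes task by construction.
    Console.WriteLine($"Cleanup task key: {result.Message.Key}");
}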
However, this was not feasible in our case because:
- Right now, the producer uses a single topic (`kubernetes`) with different message keys for task types.
- Changing the topic structure would require significant coordination between teams, making it impractical in the short term.
2. Partitioning by Key
Kafka’s partitioning mechanism guarantees that all messages with the same key are routed to the same partition within a topic. This allows for logical grouping of messages at the partition level. For example:
- Messages with the key `kubernetes.cleanup-nodes` always end up in the same partition.
- Consumers in the same group are assigned partitions, and only one consumer processes each partition.
We considered leveraging this mechanism to separate processing by key:
- A specific consumer could handle the partition containing `cleanup-nodes`, while others handled the remaining partitions.
- This would give us a degree of isolation, as `cleanup-nodes` messages would not interfere with messages in other partitions.
Challenges with this approach:
1. Partition Ownership is by Partition, Not Key:
- Kafka assigns partitions to consumers, not individual keys. A single partition might contain multiple keys (`kubernetes.cleanup-nodes`, `kubernetes.analyze-nodes`, etc.), so a consumer would still need to filter messages by key.
- There’s no way to “force” a specific consumer to handle only one key unless you can guarantee that the key maps uniquely to a partition.
2. Partition-to-Key Mapping is Opaque:
- Kafka’s default partitioners map a key to a partition by hashing it and taking the result modulo the partition count (murmur2 in the Java client). Without explicit control over the partitioning logic, it can be difficult to ensure a predictable mapping of keys to partitions.
- If the number of partitions changes (e.g., if the topic is reconfigured to increase partitions), the mapping of keys to partitions may change unpredictably.
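To make the remapping risk concrete, here is a small sketch of the hash-modulo scheme the default partitioners follow. The hash below is a stand-in (the Java client actually uses murmur2), purely for illustration:

using System;

// The default mapping has the shape: partition = hash(keyBytes) % partitionCount.
// GetHashCode() stands in for the real murmur2 hash here.
static int PartitionFor(string key, int partitionCount) =>
    (key.GetHashCode() & 0x7fffffff) % partitionCount;

// The same key can land on a different partition once the topic grows,
// e.g. from 6 to 8 partitions, so key-to-partition assumptions break.
Console.WriteLine(PartitionFor("kubernetes.cleanup-nodes", 6));
Console.WriteLine(PartitionFor("kubernetes.cleanup-nodes", 8));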
Why We Didn’t Choose This Approach:
- While partitioning by key works in principle, it adds significant complexity, especially when partition-to-key mapping isn’t under direct control.
- The filtering logic required to handle multiple keys within a partition makes the approach less efficient than a true key-based separation.
3. Filtering by Message Key
Another option we explored was to use filtering logic within the consumers themselves. In this approach:
- All consumers in the group subscribe to the `kubernetes` topic and consume all messages.
- Each consumer filters messages based on their key. For example, Consumer 1 processes all keys except `cleanup-nodes`, while Consumer 2 processes only `cleanup-nodes`.
This approach is simple to implement:
- The filtering logic can be added directly to the consumer code.
- The Kafka producer and topic structure remain unchanged.
Challenges with this approach:
1. Resource Waste:
- All consumers receive all messages in the topic, even if they only process a subset of them. This leads to wasted network and compute resources, especially in high-throughput scenarios.
- For example, a consumer processing only `cleanup-nodes` would still receive and discard all other task types, creating inefficiencies.
2. Consumer Lag:
- In a high-volume topic, the filtering consumer might lag behind as it discards irrelevant messages. This could lead to backpressure or delays in processing the relevant messages.
- If a significant portion of the messages are irrelevant to a particular consumer, the consumer spends more time skipping messages than processing them.
3. Scaling Challenges:
- Adding more consumers to the group doesn’t improve performance for a specific key type, as all consumers receive all messages. In other words, scaling consumers doesn’t help if each consumer still has to discard most of the messages.
Why We Didn’t Choose This Approach:
- The inefficiency of having all consumers process all messages, only to discard most of them, made this approach less desirable.
- While it might work for low-throughput scenarios, it doesn’t scale well when handling high message volumes or long-running tasks.
Our Approach: Dynamic Consumer Groups for Key-Based Separation
We ultimately decided to split the processing logic by creating multiple consumer groups for the same topic. Each group processes the same topic but filters and handles specific message keys independently. Here’s how it works:
Key Features of the Approach
1. Separate Consumer Groups:
- Each consumer group has its own offset management, enabling independent processing of messages.
- For example, `group-1` processes all message keys except `cleanup-nodes`, while `group-2` processes only `cleanup-nodes`.
2. Generic Consumer Logic:
- The consumer application remains the same for all groups.
- Filtering logic is controlled via an external configuration (e.g., environment variables).
- The `GROUP_ID` for Kafka is passed dynamically, ensuring the same code can be deployed across different groups.
3. Deployment-Specific Configuration:
- In Kubernetes, the `GROUP_ID` is injected via environment variables in the deployment YAML.
- In bare-metal deployments, the `GROUP_ID` is passed via Ansible playbooks or command-line arguments.
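For illustration, the deployment for the cleanup group might inject this configuration as follows; the deployment and image names are hypothetical, and only GROUP_ID, TOPIC, and PROCESS_KEY_FILTER are read by the consumer shown in the next section:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-consumer-cleanup        # hypothetical name
spec:
  replicas: 1
  selector:
    matchLabels:
      app: task-consumer-cleanup
  template:
    metadata:
      labels:
        app: task-consumer-cleanup
    spec:
      containers:
        - name: consumer
          image: registry.example.com/task-consumer:latest  # hypothetical image
          env:
            - name: GROUP_ID
              value: "group-2"
            - name: TOPIC
              value: "kubernetes"
            - name: PROCESS_KEY_FILTER
              value: "cleanup-nodes"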
Implementation
Consumer Code
Here’s how the consumer application was designed to dynamically determine its behavior based on configuration:
using System;
using Confluent.Kafka;

// The group id and key filter come from the environment, so the same
// binary can be deployed for every consumer group.
var groupId = Environment.GetEnvironmentVariable("GROUP_ID")
    ?? throw new InvalidOperationException("GROUP_ID is not set.");
var topic = Environment.GetEnvironmentVariable("TOPIC") ?? "kubernetes";
var processKeyFilter = Environment.GetEnvironmentVariable("PROCESS_KEY_FILTER")
    ?? "all"; // Default: process all keys

var config = new ConsumerConfig
{
    GroupId = groupId,
    BootstrapServers = "kafka:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);
Console.WriteLine($"Consumer started with GROUP_ID={groupId} and PROCESS_KEY_FILTER={processKeyFilter}");

while (true)
{
    var consumeResult = consumer.Consume();

    // Filter messages dynamically based on the environment variable.
    // Skipped messages still advance this group's offsets; they remain
    // available to the other group, which tracks its own offsets.
    if (processKeyFilter == "cleanup-nodes" && consumeResult.Message.Key != "cleanup-nodes")
    {
        continue; // Skip non-cleanup-nodes keys
    }
    if (processKeyFilter == "not-cleanup-nodes" && consumeResult.Message.Key == "cleanup-nodes")
    {
        continue; // Skip cleanup-nodes
    }

    Console.WriteLine($"Processing message with key: {consumeResult.Message.Key}");
    // Your processing logic here
}
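With this in place, both groups run the same binary with different settings: `group-1` with `GROUP_ID=group-1` and `PROCESS_KEY_FILTER=not-cleanup-nodes`, and `group-2` with `GROUP_ID=group-2` and `PROCESS_KEY_FILTER=cleanup-nodes`. Both subscribe to the same `kubernetes` topic but commit their offsets independently.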
Key Benefits of the Approach
1. Separation of Concerns: Long-running tasks (`cleanup-nodes`) are completely decoupled from other tasks, avoiding interference or backlogs.
2. Generic and Configurable: The consumer application is generic and can handle different scenarios by simply changing environment variables.
3. Scalability: Both consumer groups can scale independently, with different replicas handling their respective workloads.
Comparison Summary
Let us recap the differences between filtering within a single consumer group and splitting the workload across multiple consumer groups.
Filtering Within a Single Consumer Group
- How it Works: Consumers in the same group are assigned partitions. Each consumer filters irrelevant messages, but still receives all messages from assigned partitions.
- Resource Impact:
- High network usage, as all messages are consumed, even if irrelevant to the consumer.
- CPU overhead, as consumers spend time discarding unwanted messages, slowing down relevant processing.
- Limitations: Consumers waste resources filtering irrelevant messages, leading to inefficiencies and slower processing.
Dynamic Consumer Groups (Recommended Approach)
- How it Works: Separate consumer groups handle different parts of the workload (e.g., `group-1` processes all but `cleanup-nodes`, while `group-2` processes `cleanup-nodes`). Each group independently consumes and filters messages for its scope.
- Advantages:
- Separation of Concerns: Each group focuses on specific tasks without competing for resources within the same group.
- Efficiency: No redundant resource usage within a group — messages are processed only by the relevant group.
- Scalability: Each group can scale independently based on workload demands (e.g., adding consumers to `group-2` for cleanup tasks without impacting `group-1`).
Key Difference
Dynamic consumer groups distribute processing logically, improving efficiency, scalability, and resource utilization, unlike the single-group approach that wastes bandwidth and CPU cycles on filtering irrelevant messages.
Additional Considerations
- Message Ordering: If strict message ordering is required for all message types, using separate consumer groups might not be suitable, as each group processes messages independently.
Final Thoughts
While separating tasks into different topics is ideal, constraints such as lack of producer control may force you to work within a single topic. This blog demonstrates how dynamic consumer group configuration can help you achieve task-level separation without compromising flexibility or scalability.