How to Drop Messages in HiveMQ Programmatically
In certain scenarios, you may want to filter and drop MQTT messages before they are delivered to subscribers or forwarded to external systems such as Kafka. HiveMQ provides extension points to programmatically implement this logic.
This article demonstrates two approaches:
Dropping messages before they reach specific MQTT clients using the Outbound Publish Interceptor.
Dropping messages before they are forwarded to Kafka using a custom MQTT-to-Kafka transformer.
Instructions
1. Dropping Messages Before MQTT Delivery
The HiveMQ Extension SDK allows you to intercept outbound MQTT publishes with fine-grained control. This is useful if you want to deliver messages only to specific subscribers based on dynamic criteria.
Use Case
Only deliver messages to a limited group of MQTT clients (e.g., internal processing services), and suppress them for others based on:
Client ID
Topic name
Payload content
Implementation: Outbound Publish Interceptor
public class PreventPublishOutboundInterceptor implements PublishOutboundInterceptor {
private static final Set<String> restrictedClientIds = Set.of("subscriber-client-1", "subscriber-client-2");
private static final Set<String> restrictedTopics = Set.of("sensor/data", "device/metrics");
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
Services.extensionExecutorService().submit(() -> {
try {
final String clientId = input.getClientInformation().getClientId();
final String topic = input.getPublishPacket().getTopic();
final Optional<ByteBuffer> payloadBuffer = output.getPublishPacket().getPayload();
if (restrictedClientIds.contains(clientId) && restrictedTopics.contains(topic)) {
if (payloadBuffer.isEmpty()) {
output.preventPublishDelivery();
} else {
final String payloadString = StandardCharsets.UTF_8.decode(payloadBuffer.get()).toString();
if (!payloadString.contains("importantFieldName")) {
output.preventPublishDelivery();
}
}
}
} finally {
async.resume();
}
});
}
}
Behavior
Messages to non-restricted clients and topics are delivered as normal.
Messages to restricted clients and topics are dropped if the payload does not contain
"importantFieldName"
.
2. Dropping Messages Before Kafka Forwarding
The HiveMQ Enterprise Extension for Kafka allows custom transformation logic before publishing MQTT messages to Kafka.
Use Case
Drop messages before they are converted into Kafka records if they do not meet certain criteria.
Implementation: Custom MQTT-to-Kafka Transformer
public class DroppingTransformer implements MqttToKafkaTransformer {
private static final Set<String> restrictedClientIds = Set.of("subscriber-client-1", "subscriber-client-2");
private static final Set<String> restrictedTopics = Set.of("sensor/data", "device/metrics");
@Override
public void transformMqttToKafka(
final @NotNull MqttToKafkaInput input,
final @NotNull MqttToKafkaOutput output) {
final String clientId = input.getMqttInformation().getClientInformation().getClientId();
final String topic = input.getPublishPacket().getTopic();
final Optional<ByteBuffer> payloadBuffer = input.getPublishPacket().getPayload();
if (restrictedClientIds.contains(clientId) && restrictedTopics.contains(topic)) {
if (payloadBuffer.isEmpty()) {
output.setKafkaRecords(List.of());
return;
}
final String payloadString = StandardCharsets.UTF_8.decode(payloadBuffer.get()).toString();
if (!payloadString.contains("importantFieldName")) {
output.setKafkaRecords(List.of());
return;
}
}
final KafkaRecord kafkaRecord = output.newKafkaRecordBuilder()
.topic(topic)
.key(input.getPublishPacket().getResponseTopic().orElse("unknown"))
.value(payloadBuffer.orElse(ByteBuffer.allocate(0)))
.build();
output.setKafkaRecords(List.of(kafkaRecord));
}
}
Behavior
Kafka records are only created if the message meets all filtering criteria.
All other messages are silently ignored at the transformation level.
Summary
Use Case | Extension Type | Filtering Criteria | Result |
---|---|---|---|
MQTT Client Filtering | Outbound Publish Interceptor | Client ID, Topic, Payload | Prevent message delivery |
Kafka Forwarding Filtering | MQTT-to-Kafka Transformer | Client ID, Topic, Payload | Prevent record creation |
These techniques give you precise control over message flow within your HiveMQ broker and its integrations. For production deployments, be sure to test the logic thoroughly and handle edge cases like missing payloads or malformed content.