How to Drop Messages in HiveMQ Programmatically

In certain scenarios, you may want to filter and drop MQTT messages before they are delivered to subscribers or forwarded to external systems such as Kafka. HiveMQ provides extension points to programmatically implement this logic.

This article demonstrates two approaches:

  • Dropping messages before MQTT delivery, using an outbound publish interceptor from the HiveMQ Extension SDK

  • Dropping messages before Kafka forwarding, using a custom transformer for the HiveMQ Enterprise Extension for Kafka

 Instructions


1. Dropping Messages Before MQTT Delivery

The HiveMQ Extension SDK allows you to intercept outbound MQTT publishes with fine-grained control. This is useful if you want to deliver messages only to specific subscribers based on dynamic criteria.

Use Case

Only deliver messages to a limited group of MQTT clients (e.g., internal processing services) and suppress them for others, based on the following criteria (a standalone sketch of this filter rule follows the list):

  • Client ID

  • Topic name

  • Payload content
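
Both implementations below apply the same filter rule. For reference, here is a standalone sketch of that rule (the class MessageFilter and its method shouldDrop are illustrative helpers, not part of any HiveMQ SDK):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;

// Illustrative helper that mirrors the filtering rule used by both examples in this article.
final class MessageFilter {

    private static final Set<String> RESTRICTED_CLIENT_IDS = Set.of("subscriber-client-1", "subscriber-client-2");
    private static final Set<String> RESTRICTED_TOPICS = Set.of("sensor/data", "device/metrics");

    static boolean shouldDrop(final String clientId, final String topic, final Optional<ByteBuffer> payload) {
        if (!RESTRICTED_CLIENT_IDS.contains(clientId) || !RESTRICTED_TOPICS.contains(topic)) {
            return false; // not restricted: always deliver
        }
        if (payload.isEmpty()) {
            return true; // restricted and no payload to inspect: drop
        }
        // Decode a read-only copy so the original buffer's position is left untouched.
        final String payloadString = StandardCharsets.UTF_8.decode(payload.get().asReadOnlyBuffer()).toString();
        return !payloadString.contains("importantFieldName");
    }
}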

Implementation: Outbound Publish Interceptor

import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.async.Async;
import com.hivemq.extension.sdk.api.async.TimeoutFallback;
import com.hivemq.extension.sdk.api.interceptor.publish.PublishOutboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishOutboundInput;
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishOutboundOutput;
import com.hivemq.extension.sdk.api.services.Services;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;

public class PreventPublishOutboundInterceptor implements PublishOutboundInterceptor {

    private static final Set<String> restrictedClientIds = Set.of("subscriber-client-1", "subscriber-client-2");
    private static final Set<String> restrictedTopics = Set.of("sensor/data", "device/metrics");

    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input,
                                  final @NotNull PublishOutboundOutput output) {
        // Make the output async so the payload check can run off the broker's event loop.
        // If the task does not finish within 10 seconds, the publish is not delivered (FAILURE fallback).
        final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        Services.extensionExecutorService().submit(() -> {
            try {
                final String clientId = input.getClientInformation().getClientId();
                final String topic = input.getPublishPacket().getTopic();
                final Optional<ByteBuffer> payloadBuffer = output.getPublishPacket().getPayload();

                if (restrictedClientIds.contains(clientId) && restrictedTopics.contains(topic)) {
                    if (payloadBuffer.isEmpty()) {
                        // No payload to inspect: drop the message for restricted clients/topics.
                        output.preventPublishDelivery();
                    } else {
                        final String payloadString = StandardCharsets.UTF_8.decode(payloadBuffer.get()).toString();
                        if (!payloadString.contains("importantFieldName")) {
                            output.preventPublishDelivery();
                        }
                    }
                }
            } finally {
                // Always resume, otherwise the publish stays blocked until the timeout expires.
                async.resume();
            }
        });
    }
}
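
The interceptor takes effect once it is added to a client's context. Below is a minimal sketch of an extension main class that registers it for every connecting client (the class name MyExtensionMain is illustrative):

import com.hivemq.extension.sdk.api.ExtensionMain;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartOutput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
import com.hivemq.extension.sdk.api.services.Services;

public class MyExtensionMain implements ExtensionMain {

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {
        // Add the outbound publish interceptor to every client that connects.
        Services.initializerRegistry().setClientInitializer(
                (initializerInput, clientContext) ->
                        clientContext.addPublishOutboundInterceptor(new PreventPublishOutboundInterceptor()));
    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput input, final @NotNull ExtensionStopOutput output) {
        // Nothing to clean up in this sketch.
    }
}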

Behavior

  • Messages to clients or topics outside the restricted sets are delivered as normal.

  • Messages to restricted clients on restricted topics are dropped if the payload is empty or does not contain "importantFieldName".
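
To observe this behavior, a quick check can be run with any MQTT client. The sketch below assumes the open-source HiveMQ MQTT Client library and a broker running on localhost; only the message containing "importantFieldName" should reach the restricted subscriber:

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class DropCheck {

    public static void main(String[] args) throws InterruptedException {
        // Subscriber uses one of the restricted client IDs, so the interceptor applies to it.
        final Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
                .identifier("subscriber-client-1")
                .serverHost("localhost")
                .buildBlocking();
        subscriber.connect();
        subscriber.subscribeWith().topicFilter("sensor/data").send();

        final Mqtt5BlockingClient publisher = Mqtt5Client.builder()
                .identifier("publisher-client")
                .serverHost("localhost")
                .buildBlocking();
        publisher.connect();

        try (final Mqtt5BlockingClient.Mqtt5Publishes publishes =
                     subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
            // This message lacks the marker field and should be dropped by the interceptor.
            publisher.publishWith().topic("sensor/data")
                    .payload("{\"other\":1}".getBytes(StandardCharsets.UTF_8)).send();
            // This message contains the marker field and should be delivered.
            publisher.publishWith().topic("sensor/data")
                    .payload("{\"importantFieldName\":42}".getBytes(StandardCharsets.UTF_8)).send();

            // Only the second publish is expected to arrive.
            System.out.println(publishes.receive(5, TimeUnit.SECONDS));
        }

        publisher.disconnect();
        subscriber.disconnect();
    }
}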


2. Dropping Messages Before Kafka Forwarding

The HiveMQ Enterprise Extension for Kafka allows you to apply custom transformation logic before MQTT messages are published to Kafka.

Use Case

Drop messages before they are converted into Kafka records if they do not meet certain criteria.

Implementation: Custom MQTT-to-Kafka Transformer

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class DroppingTransformer implements MqttToKafkaTransformer {

    private static final Set<String> restrictedClientIds = Set.of("subscriber-client-1", "subscriber-client-2");
    private static final Set<String> restrictedTopics = Set.of("sensor/data", "device/metrics");

    @Override
    public void transformMqttToKafka(final @NotNull MqttToKafkaInput input,
                                     final @NotNull MqttToKafkaOutput output) {
        final String clientId = input.getMqttInformation().getClientInformation().getClientId();
        final String topic = input.getPublishPacket().getTopic();
        final Optional<ByteBuffer> payloadBuffer = input.getPublishPacket().getPayload();

        if (restrictedClientIds.contains(clientId) && restrictedTopics.contains(topic)) {
            if (payloadBuffer.isEmpty()) {
                // No payload to inspect: drop the message by producing no Kafka records.
                output.setKafkaRecords(List.of());
                return;
            }
            final String payloadString = StandardCharsets.UTF_8.decode(payloadBuffer.get()).toString();
            if (!payloadString.contains("importantFieldName")) {
                // Marker field missing: drop the message by producing no Kafka records.
                output.setKafkaRecords(List.of());
                return;
            }
        }

        // Message passed the filter: build exactly one Kafka record for it.
        // The MQTT response topic (if present) is used as the Kafka record key.
        final KafkaRecord kafkaRecord = output.newKafkaRecordBuilder()
                .topic(topic)
                .key(input.getPublishPacket().getResponseTopic().orElse("unknown"))
                .value(payloadBuffer.orElse(ByteBuffer.allocate(0)))
                .build();
        output.setKafkaRecords(List.of(kafkaRecord));
    }
}

Behavior

  • A Kafka record is created only when the message passes the filter: either the client ID or topic is not restricted, or the payload contains "importantFieldName".

  • All other messages are dropped at the transformation level by setting an empty record list, so no Kafka record is produced.


Summary

Use Case                     Extension Type                  Filtering Criteria          Result
MQTT Client Filtering        Outbound Publish Interceptor    Client ID, Topic, Payload   Prevent message delivery
Kafka Forwarding Filtering   MQTT-to-Kafka Transformer       Client ID, Topic, Payload   Prevent record creation

These techniques give you precise control over message flow within your HiveMQ broker and its integrations. For production deployments, be sure to test the logic thoroughly and handle edge cases like missing payloads or malformed content.
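
As one example of handling malformed content, payload decoding could be made strict so that non-UTF-8 payloads are detected explicitly rather than silently replaced with substitution characters. The sketch below is illustrative and not part of the examples above (the helper containsMarker is a hypothetical name):

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

final class StrictPayloadCheck {

    // Strict UTF-8 decoding: undecodable payloads raise an exception instead of being
    // replaced, so they can be treated as "malformed" and dropped explicitly.
    static boolean containsMarker(final ByteBuffer payload, final String marker) {
        try {
            final String decoded = StandardCharsets.UTF_8
                    .newDecoder()
                    .decode(payload.asReadOnlyBuffer())
                    .toString();
            return decoded.contains(marker);
        } catch (final CharacterCodingException e) {
            return false; // malformed payloads do not match, so the message gets dropped
        }
    }
}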

