Usage of ConfluentKafka Output Connector with Event Objects
The following example demonstrates the delivery of events to the opensearch output connector
[1]:
%%bash
docker compose -f ../../../../../examples/compose/docker-compose.yml down -v
docker compose -f ../../../../../examples/compose/docker-compose.yml up -d kafka
Container kafka Stopping
Container kafka Stopped
Container kafka Removing
Container kafka Removed
Network compose_kafka Removing
Network compose_kafka Removed
Network compose_kafka Creating
Network compose_kafka Created
Container kafka Creating
Container kafka Created
Container kafka Starting
Container kafka Started
[3]:
from typing import Iterator
from logprep.factory import Factory
from logprep.util.time import TimeParser
from logprep.ng.connector.confluent_kafka.output import ConfluentKafkaOutput
from logprep.ng.event.log_event import LogEvent
from logprep.ng.event.event_state import EventStateType
import logging
import sys
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
stream=sys.stdout
)
config = {
"type": "ng_confluentkafka_output",
"topic": "consumer",
"flush_timeout": 300,
"send_timeout": 0,
"kafka_config": {
"bootstrap.servers": "127.0.0.1:9092"
}
}
confluent_kafka_output: ConfluentKafkaOutput = Factory.create({"my_kafka": config})
events: Iterator = [
LogEvent({"message": f"Event {i}", "@timestamp": str(TimeParser.now())}, original=b"", state=EventStateType.PROCESSED)
for i in range(10)
]
# store events in the Opensearch output
for event in events:
confluent_kafka_output.store(event)
# event goes to state STORED_IN_OUTPUT first and then after callback from librdkafka it will be changed to DELIVERED
# assert events[-1].state == EventStateType.STORED_IN_OUTPUT
# Flush the output to ensure all events are sent
confluent_kafka_output.shut_down()
DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:33.325385+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:33.325411+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:33.325420+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:33.325426+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:33.325432+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:33.325438+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:33.325444+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:33.325450+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:33.325456+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:33.325462+00:00'} to topic consumer
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 0, offset 10
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 0
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 1
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 2
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 0
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 1
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 2
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 0
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 1
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 2
INFO:KafkaOutput:Producer flushed successfully. 0 messages remaining.
[4]:
print(f"Events total: {len(events)}")
print(f"Events in delivered state: {len([e for e in events if e.state == EventStateType.DELIVERED])}")
print(f"Events not delivered: {[event for event in events if event.state != EventStateType.DELIVERED]}")
# Verify that all events are delivered
for event in events:
assert event.state == EventStateType.DELIVERED, f"Event {event.data['message']} not delivered | State: {event.state}"
Events total: 10
Events in delivered state: 10
Events not delivered: []
The following case demonstrates error handling in the confluent_kafka output. We try to send to a non existing topic. This should provoke an error for that unknown topic or partition.
[5]:
events: Iterator = [
LogEvent({"message": f"Event {i}", "@timestamp": str(TimeParser.now())}, original=b"", state=EventStateType.PROCESSED)
for i in range(10)
]
# store events in the Confluent Kafka output
for event in events:
confluent_kafka_output.store_custom(event, "non_existent_topic")
# Flush the output to ensure all events are sent
confluent_kafka_output.flush()
# Verify that all events are delivered
for event in events:
assert event.state == EventStateType.FAILED
assert len(event.errors) == 1
print (f"Event {event.data['message']} failed with error: {event.errors[0]}")
DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:44.018526+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:44.018555+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:44.018564+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:44.018571+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:44.018577+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:44.018584+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:44.018590+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:44.018596+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:44.018602+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:44.018608+00:00'} to topic non_existent_topic
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
INFO:KafkaOutput:Producer flushed successfully. 0 messages remaining.
Event Event 0 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 1 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 2 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 3 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 4 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 5 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 6 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 7 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 8 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 9 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}