Usage of ConfluentKafka Output Connector with Event Objects

The following example demonstrates the delivery of events to the ConfluentKafka output connector.

[1]:
%%bash
docker compose -f ../../../../../examples/compose/docker-compose.yml down -v
docker compose -f ../../../../../examples/compose/docker-compose.yml up -d kafka

 Container kafka  Stopping
 Container kafka  Stopped
 Container kafka  Removing
 Container kafka  Removed
 Network compose_kafka  Removing
 Network compose_kafka  Removed
 Network compose_kafka  Creating
 Network compose_kafka  Created
 Container kafka  Creating
 Container kafka  Created
 Container kafka  Starting
 Container kafka  Started
[3]:
from typing import Iterator
from logprep.factory import Factory
from logprep.util.time import TimeParser
from logprep.ng.connector.confluent_kafka.output import ConfluentKafkaOutput
from logprep.ng.event.log_event import LogEvent
from logprep.ng.event.event_state import EventStateType
import logging
import sys

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stdout
)


config = {
    "type": "ng_confluentkafka_output",
    "topic": "consumer",
    "flush_timeout": 300,
    "send_timeout": 0,
    "kafka_config": {
        "bootstrap.servers": "127.0.0.1:9092"
    }
}

confluent_kafka_output: ConfluentKafkaOutput = Factory.create({"my_kafka": config})


events: list[LogEvent] = [
    LogEvent({"message": f"Event {i}", "@timestamp": str(TimeParser.now())}, original=b"", state=EventStateType.PROCESSED)
    for i in range(10)
]

# store events in the Confluent Kafka output
for event in events:
    confluent_kafka_output.store(event)

# Each event first enters state STORED_IN_OUTPUT; once librdkafka invokes the delivery callback, the state changes to DELIVERED
# assert events[-1].state == EventStateType.STORED_IN_OUTPUT

# Shut down the output; this flushes the producer so that all events are sent
confluent_kafka_output.shut_down()

DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:33.325385+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:33.325411+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:33.325420+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:33.325426+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:33.325432+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:33.325438+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:33.325444+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:33.325450+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:33.325456+00:00'} to topic consumer
DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:33.325462+00:00'} to topic consumer
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 0, offset 10
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 0
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 1
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 2
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 0
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 1
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 2
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 0
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 1
DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 2
INFO:KafkaOutput:Producer flushed successfully. 0 messages remaining.
[4]:
print(f"Events total: {len(events)}")
print(f"Events in delivered state: {len([e for e in events if e.state == EventStateType.DELIVERED])}")

print(f"Events not delivered: {[event for event in events if event.state != EventStateType.DELIVERED]}")


# Verify that all events are delivered
for event in events:
    assert event.state == EventStateType.DELIVERED, f"Event {event.data['message']} not delivered | State: {event.state}"
Events total: 10
Events in delivered state: 10
Events not delivered: []
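
The state transitions in the cell above (stored first, delivered only after the producer reports back) can be illustrated without a broker. The following is a minimal sketch, not logprep's implementation: `State`, `Event`, `FakeProducer`, and `store` are hypothetical stand-ins for `EventStateType`, `LogEvent`, the librdkafka producer, and the connector's `store` method. It assumes that delivery reports are only handed to the callback when the producer is flushed.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Optional


class State(Enum):
    """Hypothetical stand-in for logprep's EventStateType."""

    PROCESSED = auto()
    STORED_IN_OUTPUT = auto()
    DELIVERED = auto()
    FAILED = auto()


@dataclass
class Event:
    """Hypothetical stand-in for LogEvent."""

    data: dict
    state: State = State.PROCESSED
    errors: list = field(default_factory=list)


class FakeProducer:
    """Toy producer: queues messages and reports results only on flush()."""

    def __init__(self, known_topics: set) -> None:
        self.known_topics = known_topics
        self._pending = []

    def produce(self, topic: str, on_delivery: Callable[[Optional[str]], None]) -> None:
        # Nothing is reported yet; the callback is kept until flush().
        self._pending.append((topic, on_delivery))

    def flush(self) -> int:
        # Invoke every queued callback with None (success) or an error string.
        while self._pending:
            topic, on_delivery = self._pending.pop(0)
            error = None if topic in self.known_topics else "Unknown topic or partition"
            on_delivery(error)
        return len(self._pending)


def store(producer: FakeProducer, event: Event, topic: str) -> None:
    """Hand the event to the producer and register a per-event callback."""

    def on_delivery(error: Optional[str]) -> None:
        if error is None:
            event.state = State.DELIVERED
        else:
            event.errors.append(error)
            event.state = State.FAILED

    producer.produce(topic, on_delivery)
    event.state = State.STORED_IN_OUTPUT


producer = FakeProducer(known_topics={"consumer"})
events = [Event({"message": f"Event {i}"}) for i in range(3)]
for event in events:
    store(producer, event, "consumer")

states_before_flush = [event.state for event in events]
remaining = producer.flush()
states_after_flush = [event.state for event in events]
```

In this sketch every event is in `STORED_IN_OUTPUT` before the flush and in `DELIVERED` afterwards, which mirrors why the notebook checks the final state only after the output has been flushed.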

The following case demonstrates error handling in the confluent_kafka output. We try to send events to a nonexistent topic, which should provoke an "Unknown topic or partition" error for every event.

[5]:
events: list[LogEvent] = [
    LogEvent({"message": f"Event {i}", "@timestamp": str(TimeParser.now())}, original=b"", state=EventStateType.PROCESSED)
    for i in range(10)
]

# store events in the Confluent Kafka output
for event in events:
    confluent_kafka_output.store_custom(event, "non_existent_topic")

# Flush the output to ensure all events are sent
confluent_kafka_output.flush()
# Verify that all events failed and that each carries exactly one error
for event in events:
    assert event.state == EventStateType.FAILED
    assert len(event.errors) == 1
    print(f"Event {event.data['message']} failed with error: {event.errors[0]}")
DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:44.018526+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:44.018555+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:44.018564+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:44.018571+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:44.018577+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:44.018584+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:44.018590+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:44.018596+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:44.018602+00:00'} to topic non_existent_topic
DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:44.018608+00:00'} to topic non_existent_topic
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
INFO:KafkaOutput:Producer flushed successfully. 0 messages remaining.
Event Event 0 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 1 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 2 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 3 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 4 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 5 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 6 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 7 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 8 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event Event 9 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
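
When many events fail for the same reason, a per-event printout like the one above becomes repetitive. The following is a minimal sketch of condensing such errors into counts. `FailedEvent` and `summarize_errors` are hypothetical helpers, not part of logprep, and the error is represented as a plain string copied from the output above rather than as a real `KafkaError` object.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class FailedEvent:
    """Hypothetical stand-in for a LogEvent in the FAILED state."""

    message: str
    errors: list = field(default_factory=list)


def summarize_errors(events) -> Counter:
    """Count how often each error message occurs across all events."""
    return Counter(str(error) for event in events for error in event.errors)


# Error text taken from the notebook output, stored here as a plain string.
unknown_topic = (
    'KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,'
    'str="Broker: Unknown topic or partition"}'
)
failed = [FailedEvent(f"Event {i}", [unknown_topic]) for i in range(10)]

summary = summarize_errors(failed)
for error, count in summary.items():
    print(f"{count} event(s) failed with: {error}")
```

With real events, the same function could be applied to any objects that expose an `errors` list, since it only relies on that attribute and on `str()` of each error.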