How can I delete all the messages from a Kafka topic?

0 votes

I created a Kafka topic and pushed large messages to it. Now I am getting an error saying:

kafka.common.InvalidMessageSizeException: invalid message size

I want to purge the topic so that I can then set the fetch.size for it correctly. How can I do that?

Jul 9, 2018 in Apache Kafka by coldcode
• 2,020 points
38,507 views

12 answers to this question.

+1 vote

To purge the Kafka topic, you need to change the retention time of that topic. The default retention time is 168 hours, i.e. 7 days. So, you have to change the retention time to 1 second, after which the messages from the topic will be deleted. Then, you can go ahead and change the retention time of the topic back to 168 hours.

To change the retention of the topic:

kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config retention.ms=1000
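Once the old segments have been removed, set the retention back. A minimal sketch, assuming the same topic name and ZooKeeper address as above (note that the log cleaner only checks every log.retention.check.interval.ms, 5 minutes by default, so the purge is not instantaneous):

kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config retention.ms=604800000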
answered Jul 9, 2018 by Shubham
• 13,300 points
0 votes

To delete manually:

  1. Shutdown the cluster
  2. Clean the Kafka log dir (specified by the log.dir attribute in the Kafka config file) as well as the ZooKeeper data (see the sketch after this list)
  3. Restart the cluster
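A rough sketch of these steps, assuming a single-broker setup started from the Kafka distribution's own scripts and the default /tmp paths (adjust the paths to your own log.dir and ZooKeeper dataDir settings):

bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
# WARNING: this wipes ALL topics and offsets on this broker
rm -rf /tmp/kafka-logs/*
# ZooKeeper data (dataDir from zookeeper.properties)
rm -rf /tmp/zookeeper/version-2
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties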
answered Nov 27, 2018 by Kailash
Not a good way

How do we know which topics have been consumed and which have not, so that only the consumed ones are deleted?

Is there any way to delete topics after they have been consumed?
0 votes

Follow these steps:

  1. Stop kafka
  2. Clean the Kafka log specific to the partition. Kafka stores its log files in the format "logDir/topic-partition", so for a topic named "MyTopic" the log for partition id 0 will be stored in /tmp/kafka-logs/MyTopic-0, where /tmp/kafka-logs is specified by the log.dir attribute (see the sketch after this list)
  3. Restart kafka
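For example, to wipe just partition 0 of "MyTopic" (a sketch assuming the default /tmp/kafka-logs log dir and the distribution's start/stop scripts):

bin/kafka-server-stop.sh
# remove only this topic-partition's log directory
rm -rf /tmp/kafka-logs/MyTopic-0
bin/kafka-server-start.sh -daemon config/server.properties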
answered Nov 27, 2018 by Kesh
+1 vote

Add one line to server.properties file under config folder:

delete.topic.enable=true

then, run this command:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
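Once the broker has finished the deletion, the topic should no longer appear in the topic list; you can check with:

bin/kafka-topics.sh --zookeeper localhost:2181 --list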
answered Nov 27, 2018 by Lavanya
0 votes

This script will empty a topic:

#!/bin/bash
# Temporarily drop the topic's retention to 1 second so the broker purges its
# messages, then remove the override so the topic returns to the broker default.
echo "Enter name of topic to empty:"
read topicName
"$Kafka_Home"/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topicName" --add-config retention.ms=1000
# Note: the retention check runs every log.retention.check.interval.ms
# (5 minutes by default), so the purge may take longer than this sleep.
sleep 5
"$Kafka_Home"/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topicName" --delete-config retention.ms
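To use it, save the script (for example as purge_topic.sh, an illustrative name), make it executable, and run it with Kafka_Home pointing at your installation:

chmod +x purge_topic.sh
./purge_topic.sh
# it will prompt: Enter name of topic to empty: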
answered Nov 27, 2018 by Ninja
0 votes

Try this:

  • stop kafka
  • delete the topic's log files: rm -rf /tmp/kafka-logs/newTopic-*
answered Nov 27, 2018 by Alison
0 votes

One possible reason for this error is that your consumer's fetch size is smaller than the largest message it is trying to consume. Try increasing fetch.size so that it is larger than the max.message.size configured on your producers.
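For reference, on newer Kafka versions the equivalent settings look roughly like the sketch below; the property names are from current Kafka, and the sizes are only examples, so adjust them to your own message sizes:

# consumer configuration: how much the consumer will fetch per partition / per request
max.partition.fetch.bytes=10485760
fetch.max.bytes=52428800
# broker configuration (server.properties): largest record batch accepted
message.max.bytes=10485760
# producer configuration: largest request the producer will send
max.request.size=10485760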

answered Nov 27, 2018 by Bilal
0 votes

Adding to @Bilal's answer, another possible reason for this error is that the log on the server is corrupted. To verify this, try running the DumpLogSegments tool on the log segments for that topic/partition and see if it reports a problem.
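For example, something like this (the segment file name and path are just placeholders for whatever exists under your topic-partition directory):

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /tmp/kafka-logs/MyTopic-0/00000000000000000000.log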

answered Nov 27, 2018 by Haider
0 votes
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181
answered Dec 10, 2018 by Bhargav
0 votes

What I did was change the per-topic retention size at runtime:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
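After the partitions have shrunk, you can remove the override so the topic falls back to the broker default (retention.bytes is -1, i.e. unlimited, by default). A sketch using the newer kafka-configs.sh tool:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my_topic --delete-config retention.bytes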
answered Dec 10, 2018 by Lalla

0 votes
I don't know.
answered Jan 30 by anonymous
0 votes

As of Kafka 2.3.0, there is an alternate way to soft-delete the messages in a Kafka topic (the old approach of altering topic configs with kafka-topics.sh is deprecated).

Update retention.ms to 1 second (1000 ms), wait a minute or so for the old segments to be deleted, then set it back to the default of 7 days (168 hours, i.e. 604,800,000 ms).

Soft deletion (retention.ms=1000), using kafka-configs.sh:

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.


Setting back to the default of 7 days (168 hours, retention.ms=604800000):

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
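You can confirm the current override with --describe (same topic and ZooKeeper address as above):

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --describe --entity-type topics --entity-name kafka_topic3p3r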
answered Sep 8 by Brajkishore
• 220 points
