There are some files with the suffix ".compact" in the sink folder of Spark checkpoint location.
The files compact all the history EsSinkStatus logs since the first index action.

I found the reason is that org.elasticsearch.spark.sql.streaming.EsSinkMetadataLog#compactLogs() returns all the input logs rather than drops the expired ones.
override def compactLogs(logs: Seq[EsSinkStatus]): Seq[EsSinkStatus] = logs
I think it's better to add a code snippet which filter the logs and drop the expired ones.