4747
4848import java .io .IOException ;
4949import java .util .LinkedHashMap ;
50+ import java .util .function .Supplier ;
5051
5152/** RocksDB compaction filter utils for state with TTL. */
5253public class RocksDbTtlCompactFiltersManager {
@@ -160,15 +161,27 @@ public void configCompactFilter(
160161
161162 private static class ListElementFilterFactory <T >
162163 implements FlinkCompactionFilter .ListElementFilterFactory {
163- private final TypeSerializer <T > serializer ;
164+ // {@See #createListElementFilter}.
165+ private final ThreadLocalSerializerProvider <T > threadLocalSerializer ;
164166
165167 private ListElementFilterFactory (TypeSerializer <T > serializer ) {
166- this .serializer = serializer ;
168+ ClassLoader contextClassLoader = null ;
169+ try {
170+ contextClassLoader = Thread .currentThread ().getContextClassLoader ();
171+ } catch (Throwable e ) {
172+ LOG .info ("Cannot get context classloader for list state's compaction filter." , e );
173+ }
174+ threadLocalSerializer =
175+ new ThreadLocalSerializerProvider <>(serializer , contextClassLoader );
167176 }
168177
169178 @ Override
170179 public FlinkCompactionFilter .ListElementFilter createListElementFilter () {
171- return new ListElementFilter <>(serializer );
180+ // This method will be invoked by native code multiple times when creating compaction
181+ // filter. And the created filter will be shared by multiple background threads.
182+ // Make sure the serializer is thread-local and has classloader set for each thread
183+ // correctly and individually.
184+ return new ListElementFilter <>(threadLocalSerializer );
172185 }
173186 }
174187
@@ -186,21 +199,22 @@ public long currentTimestamp() {
186199 }
187200
188201 private static class ListElementFilter <T > implements FlinkCompactionFilter .ListElementFilter {
189- private final TypeSerializer <T > serializer ;
190- private DataInputDeserializer input ;
202+ private final ThreadLocalSerializerProvider <T > threadLocalSerializer ;
203+ private final DataInputDeserializer input ;
191204
192- private ListElementFilter (TypeSerializer <T > serializer ) {
193- this .serializer = serializer ;
205+ private ListElementFilter (ThreadLocalSerializerProvider <T > serializer ) {
206+ this .threadLocalSerializer = serializer ;
194207 this .input = new DataInputDeserializer ();
195208 }
196209
197210 @ Override
198211 public int nextUnexpiredOffset (byte [] bytes , long ttl , long currentTimestamp ) {
199212 input .setBuffer (bytes );
200213 int lastElementOffset = 0 ;
214+ TypeSerializer <T > serializer = threadLocalSerializer .get ();
201215 while (input .available () > 0 ) {
202216 try {
203- long timestamp = nextElementLastAccessTimestamp ();
217+ long timestamp = nextElementLastAccessTimestamp (serializer );
204218 if (!TtlUtils .expired (timestamp , ttl , currentTimestamp )) {
205219 break ;
206220 }
@@ -213,7 +227,8 @@ public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
213227 return lastElementOffset ;
214228 }
215229
216- private long nextElementLastAccessTimestamp () throws IOException {
230+ private long nextElementLastAccessTimestamp (TypeSerializer <T > serializer )
231+ throws IOException {
217232 TtlValue <?> ttlValue = (TtlValue <?>) serializer .deserialize (input );
218233 if (input .available () > 0 ) {
219234 input .skipBytesToRead (1 );
@@ -222,6 +237,37 @@ private long nextElementLastAccessTimestamp() throws IOException {
222237 }
223238 }
224239
240+ private static class ThreadLocalSerializerProvider <T > implements Supplier <TypeSerializer <T >> {
241+ // Multiple background threads may share the same filter instance, so we need to make sure
242+ // the serializer is thread-local, and every thread has its own instance with classloader.
243+ private final ThreadLocal <TypeSerializer <T >> threadLocalSerializer ;
244+
245+ public ThreadLocalSerializerProvider (
246+ TypeSerializer <T > serializer , ClassLoader classLoader ) {
247+ this .threadLocalSerializer =
248+ ThreadLocal .withInitial (
249+ () -> {
250+ setClassloaderIfNeeded (classLoader );
251+ return serializer .duplicate ();
252+ });
253+ }
254+
255+ private void setClassloaderIfNeeded (ClassLoader classLoader ) {
256+ // The classloader that should be set to the current thread when deserializing.
257+ // The reason why we should set classloader is that the serializer may be Kryo
258+ // serializer which needs user classloader to load user classes.
259+ // See FLINK-16686 for more details.
260+ if (classLoader != null ) {
261+ Thread .currentThread ().setContextClassLoader (classLoader );
262+ }
263+ }
264+
265+ @ Override
266+ public TypeSerializer <T > get () {
267+ return threadLocalSerializer .get ();
268+ }
269+ }
270+
225271 public void disposeAndClearRegisteredCompactionFactories () {
226272 for (FlinkCompactionFilterFactory factory : compactionFilterFactories .values ()) {
227273 IOUtils .closeQuietly (factory );
0 commit comments