Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update DispatchQueuePriority.java #1826

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 42 additions & 37 deletions TMessagesProj/src/main/java/org/telegram/DispatchQueuePriority.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,37 @@

public class DispatchQueuePriority {

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new PriorityBlockingQueue<>(10, new Comparator<Runnable>() {

@Override
public int compare(Runnable o1, Runnable o2) {
int priority1 = 1;
int priority2 = 1;
if (o1 instanceof PriorityRunnable) {
priority1 = ((PriorityRunnable) o1).priority;
}
if (o2 instanceof PriorityRunnable) {
priority2 = ((PriorityRunnable) o2).priority;
}
return priority2 - priority1;
}
})) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
CountDownLatch latch = pauseLatch;
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
FileLog.e(e);
}
}
}
};

private final ThreadPoolExecutor threadPoolExecutor;
private volatile CountDownLatch pauseLatch;

public DispatchQueuePriority(String threadName) {
this.threadPoolExecutor = new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new PriorityBlockingQueue<>(10, new PriorityRunnableComparator())
) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
awaitPauseLatch();
}
};
}

private void awaitPauseLatch() {
if (pauseLatch != null) {
try {
pauseLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
FileLog.e(e);
}
}
}

public static Runnable wrap(Runnable runnable, int priority) {
if (priority == 1) {
return runnable;
} else {
return new PriorityRunnable(priority, runnable);
}
return priority == 1 ? runnable : new PriorityRunnable(priority, runnable);
}

public void postRunnable(Runnable runnable) {
Expand All @@ -65,10 +56,9 @@ public Runnable postRunnable(Runnable runnable, int priority) {
}

public void cancelRunnable(Runnable runnable) {
if (runnable == null) {
return;
if (runnable != null) {
threadPoolExecutor.remove(runnable);
}
threadPoolExecutor.remove(runnable);
}

public void pause() {
Expand All @@ -81,7 +71,7 @@ public void resume() {
CountDownLatch latch = pauseLatch;
if (latch != null) {
latch.countDown();
pauseLatch = null;
pauseLatch = null; // Allow future pauses
}
}

Expand All @@ -99,4 +89,19 @@ public void run() {
runnable.run();
}
}

private static class PriorityRunnableComparator implements Comparator<Runnable> {
@Override
public int compare(Runnable o1, Runnable o2) {
int priority1 = getPriority(o1);
int priority2 = getPriority(o2);
return Integer.compare(priority2, priority1); // Higher priority first
}

private int getPriority(Runnable runnable) {
return (runnable instanceof PriorityRunnable)
? ((PriorityRunnable) runnable).priority
: 1; // Default priority
}
}
}