Optimize ThreadPoolManager (#11206)

This commit is contained in:
kanghailin 2023-10-13 10:31:25 +08:00 committed by GitHub
parent a5c0a60a1a
commit 49f34868ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 70 deletions

View File

@ -44,8 +44,6 @@ public final class ThreadPoolManager {
private Map<String, Map<String, Set<ExecutorService>>> resourcesManager; private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
private Map<String, Object> lockers = new ConcurrentHashMap<>(8);
private static final ThreadPoolManager INSTANCE = new ThreadPoolManager(); private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
private static final AtomicBoolean CLOSED = new AtomicBoolean(false); private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
@ -78,20 +76,13 @@ public final class ThreadPoolManager {
* @param executor {@link ExecutorService} * @param executor {@link ExecutorService}
*/ */
public void register(String namespace, String group, ExecutorService executor) { public void register(String namespace, String group, ExecutorService executor) {
if (!resourcesManager.containsKey(namespace)) { resourcesManager.compute(namespace, (namespaceKey, map) -> {
lockers.putIfAbsent(namespace, new Object());
}
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
if (map == null) { if (map == null) {
map = new HashMap<>(8); map = new HashMap<>(8);
map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
resourcesManager.put(namespace, map);
return;
} }
map.computeIfAbsent(group, key -> new HashSet<>()).add(executor); map.computeIfAbsent(group, groupKey -> new HashSet<>()).add(executor);
} return map;
});
} }
/** /**
@ -101,12 +92,10 @@ public final class ThreadPoolManager {
* @param group group name * @param group group name
*/ */
public void deregister(String namespace, String group) { public void deregister(String namespace, String group) {
if (resourcesManager.containsKey(namespace)) { resourcesManager.computeIfPresent(namespace, (key, map) -> {
final Object monitor = lockers.get(namespace); map.remove(group);
synchronized (monitor) { return map;
resourcesManager.get(namespace).remove(group); });
}
}
} }
/** /**
@ -117,15 +106,13 @@ public final class ThreadPoolManager {
* @param executor {@link ExecutorService} * @param executor {@link ExecutorService}
*/ */
public void deregister(String namespace, String group, ExecutorService executor) { public void deregister(String namespace, String group, ExecutorService executor) {
if (resourcesManager.containsKey(namespace)) { resourcesManager.computeIfPresent(namespace, (namespaceKey, map) -> {
final Object monitor = lockers.get(namespace); map.computeIfPresent(group, (groupKey, set) -> {
synchronized (monitor) { set.remove(executor);
final Map<String, Set<ExecutorService>> subResourceMap = resourcesManager.get(namespace); return set;
if (subResourceMap.containsKey(group)) { });
subResourceMap.get(group).remove(executor); return map;
} });
}
}
} }
/** /**
@ -134,22 +121,15 @@ public final class ThreadPoolManager {
* @param namespace namespace * @param namespace namespace
*/ */
public void destroy(final String namespace) { public void destroy(final String namespace) {
final Object monitor = lockers.get(namespace); Map<String, Set<ExecutorService>> map = resourcesManager.remove(namespace);
if (monitor == null) { if (map != null) {
return; for (Set<ExecutorService> set : map.values()) {
} for (ExecutorService executor : set) {
synchronized (monitor) {
Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace);
if (subResource == null) {
return;
}
for (Map.Entry<String, Set<ExecutorService>> entry : subResource.entrySet()) {
for (ExecutorService executor : entry.getValue()) {
ThreadUtils.shutdownThreadPool(executor); ThreadUtils.shutdownThreadPool(executor);
} }
set.clear();
} }
resourcesManager.get(namespace).clear(); map.clear();
resourcesManager.remove(namespace);
} }
} }
@ -160,21 +140,16 @@ public final class ThreadPoolManager {
* @param group group * @param group group
*/ */
public void destroy(final String namespace, final String group) { public void destroy(final String namespace, final String group) {
final Object monitor = lockers.get(namespace); resourcesManager.computeIfPresent(namespace, (namespaceKey, map) -> {
if (monitor == null) { map.computeIfPresent(group, (groupKey, set) -> {
return; for (ExecutorService executor : set) {
} ThreadUtils.shutdownThreadPool(executor);
synchronized (monitor) { }
Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace); set.clear();
if (subResource == null) { return null;
return; });
} return map;
Set<ExecutorService> waitDestroy = subResource.get(group); });
for (ExecutorService executor : waitDestroy) {
ThreadUtils.shutdownThreadPool(executor);
}
resourcesManager.get(namespace).remove(group);
}
} }
/** /**
@ -194,9 +169,4 @@ public final class ThreadPoolManager {
public Map<String, Map<String, Set<ExecutorService>>> getResourcesManager() { public Map<String, Map<String, Set<ExecutorService>>> getResourcesManager() {
return resourcesManager; return resourcesManager;
} }
@JustForTest
public Map<String, Object> getLockers() {
return lockers;
}
} }

View File

@ -33,44 +33,35 @@ public class ThreadPoolManagerTest {
manager.register(namespace, group, executor); manager.register(namespace, group, executor);
Assert.assertTrue(manager.getResourcesManager().containsKey(namespace)); Assert.assertTrue(manager.getResourcesManager().containsKey(namespace));
Assert.assertEquals(1, manager.getResourcesManager().get(namespace).get(group).size()); Assert.assertEquals(1, manager.getResourcesManager().get(namespace).get(group).size());
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.register(namespace, group, ExecutorFactory.newSingleExecutorService()); manager.register(namespace, group, ExecutorFactory.newSingleExecutorService());
Assert.assertEquals(2, manager.getResourcesManager().get(namespace).get(group).size()); Assert.assertEquals(2, manager.getResourcesManager().get(namespace).get(group).size());
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.destroy(namespace, group); manager.destroy(namespace, group);
Assert.assertFalse(manager.getResourcesManager().get(namespace).containsKey(group)); Assert.assertFalse(manager.getResourcesManager().get(namespace).containsKey(group));
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.register(namespace, group, executor); manager.register(namespace, group, executor);
manager.destroy(namespace); manager.destroy(namespace);
Assert.assertFalse(manager.getResourcesManager().containsKey(namespace)); Assert.assertFalse(manager.getResourcesManager().containsKey(namespace));
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.register(namespace, group, executor); manager.register(namespace, group, executor);
manager.deregister(namespace, group, ExecutorFactory.newSingleExecutorService()); manager.deregister(namespace, group, ExecutorFactory.newSingleExecutorService());
Assert.assertEquals(1, manager.getResourcesManager().get(namespace).get(group).size()); Assert.assertEquals(1, manager.getResourcesManager().get(namespace).get(group).size());
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.deregister(namespace, group, executor); manager.deregister(namespace, group, executor);
Assert.assertEquals(0, manager.getResourcesManager().get(namespace).get(group).size()); Assert.assertEquals(0, manager.getResourcesManager().get(namespace).get(group).size());
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.register(namespace, group, executor); manager.register(namespace, group, executor);
manager.deregister(namespace, group); manager.deregister(namespace, group);
Assert.assertFalse(manager.getResourcesManager().get(namespace).containsKey(group)); Assert.assertFalse(manager.getResourcesManager().get(namespace).containsKey(group));
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.register(namespace, group, executor); manager.register(namespace, group, executor);
manager.register(namespace, group, ExecutorFactory.newSingleExecutorService()); manager.register(namespace, group, ExecutorFactory.newSingleExecutorService());
ThreadPoolManager.shutdown(); ThreadPoolManager.shutdown();
Assert.assertFalse(manager.getResourcesManager().containsKey(namespace)); Assert.assertFalse(manager.getResourcesManager().containsKey(namespace));
Assert.assertTrue(manager.getLockers().containsKey(namespace));
manager.destroy(namespace); manager.destroy(namespace);
manager.destroy(namespace, group); manager.destroy(namespace, group);
Assert.assertFalse(manager.getResourcesManager().containsKey(namespace)); Assert.assertFalse(manager.getResourcesManager().containsKey(namespace));
Assert.assertTrue(manager.getLockers().containsKey(namespace));
} }
} }