diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java b/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java index 61283c0fb..70e28ceda 100644 --- a/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java +++ b/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java @@ -19,6 +19,7 @@ package com.alibaba.nacos.core.notify; import com.alibaba.nacos.common.utils.ConcurrentHashSet; +import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.core.notify.listener.SmartSubscribe; import com.alibaba.nacos.core.notify.listener.Subscribe; @@ -57,6 +58,8 @@ public class DefaultPublisher extends Thread implements EventPublisher { @Override public void init(Class type, int bufferSize) { + setDaemon(true); + setName("nacos.publisher-" + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; this.queue = new ArrayBlockingQueue<>(bufferSize); @@ -184,15 +187,18 @@ public class DefaultPublisher extends Thread implements EventPublisher { @Override public void notifySubscriber(final Subscribe subscribe, final Event event) { + + final String sourceName = event.getClass().getName(); + final String targetName = subscribe.subscribeType().getName(); + + if (!Objects.equals(sourceName, targetName)) { + return; + } + LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscribe); - final Runnable job = () -> { - try { - subscribe.onEvent(event); - } - catch (ClassCastException ignore) { } - }; + final Runnable job = () -> subscribe.onEvent(event); final Executor executor = subscribe.executor(); if (Objects.nonNull(executor)) { executor.execute(job); diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java index e1a324399..7c2ce1f6d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java @@ -52,61 +52,10 @@ public class NotifyCenter { private static BiFunction, Integer, EventPublisher> BUILD_FACTORY = null; - private final EventPublisher sharePublisher = new EventPublisher() { - - private EventPublisher target; - - @Override - public void init(Class type, int bufferSize) { - target = BUILD_FACTORY.apply(type, bufferSize); - } - - @Override - public long currentEventSize() { - return target.currentEventSize(); - } - - @Override - public void addSubscribe(Subscribe subscribe) { - target.addSubscribe(subscribe); - } - - @Override - public void unSubscribe(Subscribe subscribe) { - target.unSubscribe(subscribe); - } - - @Override - public boolean publish(Event event) { - return target.publish(event); - } - - @Override - public void notifySubscriber(Subscribe subscribe, Event event) { - // Is to handle a SlowEvent, because the event shares an event - // queue and requires additional filtering logic - if (filter(subscribe, event)) { - return; - } - target.notifySubscriber(subscribe, event); - } - - @Override - public void shutdown() { - target.shutdown(); - } - - private boolean filter(final Subscribe subscribe, final Event event) { - final String sourceName = event.getClass().getName(); - final String targetName = subscribe.subscribeType().getName(); - LoggerUtils.printIfDebugEnabled(LOGGER, "source event name : {}, target event name : {}", sourceName, targetName); - return !Objects.equals(sourceName, targetName); - } - - }; - private static final NotifyCenter INSTANCE = new NotifyCenter(); + private EventPublisher sharePublisher; + /** * Publisher management container */ @@ -145,7 +94,7 @@ public class NotifyCenter { }; } - INSTANCE.sharePublisher.init(SlowEvent.class, SHATE_BUFFER_SIZE); + INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHATE_BUFFER_SIZE); ShutdownUtils.addShutdownHook(new Thread(() -> { shutdown(); }));