fix: fix publish event may casue class-cast-exception

This commit is contained in:
chuntaojun 2020-05-25 14:58:29 +08:00
parent ad2b4fb969
commit 6c5f7cfeca
2 changed files with 15 additions and 60 deletions

View File

@ -19,6 +19,7 @@
package com.alibaba.nacos.core.notify; package com.alibaba.nacos.core.notify;
import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.core.notify.listener.SmartSubscribe; import com.alibaba.nacos.core.notify.listener.SmartSubscribe;
import com.alibaba.nacos.core.notify.listener.Subscribe; import com.alibaba.nacos.core.notify.listener.Subscribe;
@ -57,6 +58,8 @@ public class DefaultPublisher extends Thread implements EventPublisher {
@Override @Override
public void init(Class<? extends Event> type, int bufferSize) { public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type; this.eventType = type;
this.queueMaxSize = bufferSize; this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize); this.queue = new ArrayBlockingQueue<>(bufferSize);
@ -184,15 +187,18 @@ public class DefaultPublisher extends Thread implements EventPublisher {
@Override @Override
public void notifySubscriber(final Subscribe subscribe, final Event event) { public void notifySubscriber(final Subscribe subscribe, final Event event) {
final String sourceName = event.getClass().getName();
final String targetName = subscribe.subscribeType().getName();
if (!Objects.equals(sourceName, targetName)) {
return;
}
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, LOGGER.debug("[NotifyCenter] the {} will received by {}", event,
subscribe); subscribe);
final Runnable job = () -> { final Runnable job = () -> subscribe.onEvent(event);
try {
subscribe.onEvent(event);
}
catch (ClassCastException ignore) { }
};
final Executor executor = subscribe.executor(); final Executor executor = subscribe.executor();
if (Objects.nonNull(executor)) { if (Objects.nonNull(executor)) {
executor.execute(job); executor.execute(job);

View File

@ -52,61 +52,10 @@ public class NotifyCenter {
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> BUILD_FACTORY = null; private static BiFunction<Class<? extends Event>, Integer, EventPublisher> BUILD_FACTORY = null;
private final EventPublisher sharePublisher = new EventPublisher() {
private EventPublisher target;
@Override
public void init(Class<? extends Event> type, int bufferSize) {
target = BUILD_FACTORY.apply(type, bufferSize);
}
@Override
public long currentEventSize() {
return target.currentEventSize();
}
@Override
public void addSubscribe(Subscribe subscribe) {
target.addSubscribe(subscribe);
}
@Override
public void unSubscribe(Subscribe subscribe) {
target.unSubscribe(subscribe);
}
@Override
public boolean publish(Event event) {
return target.publish(event);
}
@Override
public void notifySubscriber(Subscribe subscribe, Event event) {
// Is to handle a SlowEvent, because the event shares an event
// queue and requires additional filtering logic
if (filter(subscribe, event)) {
return;
}
target.notifySubscriber(subscribe, event);
}
@Override
public void shutdown() {
target.shutdown();
}
private boolean filter(final Subscribe subscribe, final Event event) {
final String sourceName = event.getClass().getName();
final String targetName = subscribe.subscribeType().getName();
LoggerUtils.printIfDebugEnabled(LOGGER, "source event name : {}, target event name : {}", sourceName, targetName);
return !Objects.equals(sourceName, targetName);
}
};
private static final NotifyCenter INSTANCE = new NotifyCenter(); private static final NotifyCenter INSTANCE = new NotifyCenter();
private EventPublisher sharePublisher;
/** /**
* Publisher management container * Publisher management container
*/ */
@ -145,7 +94,7 @@ public class NotifyCenter {
}; };
} }
INSTANCE.sharePublisher.init(SlowEvent.class, SHATE_BUFFER_SIZE); INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHATE_BUFFER_SIZE);
ShutdownUtils.addShutdownHook(new Thread(() -> { ShutdownUtils.addShutdownHook(new Thread(() -> {
shutdown(); shutdown();
})); }));