Merge pull request #2874 from alibaba/hotfix_notify_cast_error

fix: fix the class cast problem
This commit is contained in:
liaochuntao 2020-05-24 22:32:18 +08:00 committed by GitHub
commit aad542a098
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 88 additions and 74 deletions

View File

@ -29,4 +29,10 @@ public final class LoggerUtils {
}
}
public static void printIfInfoEnabled(Logger logger, String s, Object... args) {
if (logger.isInfoEnabled()) {
logger.info(s, args);
}
}
}

View File

@ -15,13 +15,13 @@
*/
package com.alibaba.nacos.config.server.model.event;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.SlowEvent;
/**
* @author <a href="mailto:liaochunyhm@live.com">liaochuntao</a>
*/
@SuppressWarnings("PMD.ClassNamingShouldBeCamelRule")
public class RaftDBErrorEvent implements Event {
public class RaftDBErrorEvent implements SlowEvent {
private static final long serialVersionUID = 101591819161802336L;

View File

@ -155,7 +155,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
this.transactionTemplate = dataSourceService.getTransactionTemplate();
// Registers a Derby Raft state machine failure event for node degradation processing
NotifyCenter.registerToPublisher(RaftDBErrorEvent.class, 8);
NotifyCenter.registerToSharePublisher(RaftDBErrorEvent.class);
NotifyCenter.registerSubscribe(new Subscribe<RaftDBErrorEvent>() {
@Override

View File

@ -175,6 +175,7 @@ public class ServerMemberManager
String oldAddress = event.getOldIp() + ":" + port;
String newAddress = event.getNewIp() + ":" + port;
ServerMemberManager.this.localAddress = newAddress;
ApplicationUtils.setLocalAddress(localAddress);
Member self = ServerMemberManager.this.self;
self.setIp(event.getNewIp());

View File

@ -29,6 +29,7 @@ import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static com.alibaba.nacos.core.notify.NotifyCenter.RING_BUFFER_SIZE;
@ -51,7 +52,8 @@ public class DefaultPublisher extends Thread implements EventPublisher {
private final ConcurrentHashSet<Subscribe> subscribes = new ConcurrentHashSet<>();
private int queueMaxSize = -1;
private BlockingQueue<Event> queue;
private long lastEventSequence = -1L;
private volatile Long lastEventSequence = -1L;
private final AtomicReferenceFieldUpdater<DefaultPublisher, Long> updater = AtomicReferenceFieldUpdater.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
@Override
public void init(Class<? extends Event> type, int bufferSize) {
@ -102,7 +104,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
}
final Event event = queue.take();
receiveEvent(event);
lastEventSequence = Math.max(lastEventSequence, event.sequence());
updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
}
catch (Throwable ex) {
@ -124,22 +126,15 @@ public class DefaultPublisher extends Thread implements EventPublisher {
@Override
public boolean publish(Event event) {
checkIsStart();
try {
this.queue.put(event);
return true;
}
catch (InterruptedException ignore) {
Thread.interrupted();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn(
"Unable to plug in due to interruption, synchronize sending time, event : {}",
event);
receiveEvent(event);
return true;
}
catch (Throwable ex) {
LOGGER.error("[NotifyCenter] publish {} has error : {}", event, ex);
return false;
}
return true;
}
void checkIsStart() {
@ -192,7 +187,12 @@ public class DefaultPublisher extends Thread implements EventPublisher {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event,
subscribe);
final Runnable job = () -> subscribe.onEvent(event);
final Runnable job = () -> {
try {
subscribe.onEvent(event);
}
catch (ClassCastException ignore) { }
};
final Executor executor = subscribe.executor();
if (Objects.nonNull(executor)) {
executor.execute(job);

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.core.notify;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.ShutdownUtils;
import com.alibaba.nacos.core.notify.listener.SmartSubscribe;
import com.alibaba.nacos.core.notify.listener.Subscribe;
@ -51,48 +52,6 @@ public class NotifyCenter {
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> BUILD_FACTORY = null;
static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "com.alibaba.nacos.core.notify.ringBufferSize";
RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384);
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "com.alibaba.nacos.core.notify.shareBufferSize";
SHATE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024);
ServiceLoader<EventPublisher> loader = ServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = loader.iterator();
if (iterator.hasNext()) {
BUILD_FACTORY = (cls, buffer) -> {
loader.reload();
EventPublisher publisher = ServiceLoader.load(EventPublisher.class).iterator().next();
publisher.init(cls, buffer);
return publisher;
};
} else {
BUILD_FACTORY = (cls, buffer) -> {
EventPublisher publisher = new DefaultPublisher();
publisher.init(cls, buffer);
return publisher;
};
}
}
private static final NotifyCenter INSTANCE = new NotifyCenter();
/**
* Publisher management container
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
/**
* Multi-event listening list
*/
private final Set<SmartSubscribe> smartSubscribes = new ConcurrentHashSet<>();
private final EventPublisher sharePublisher = new EventPublisher() {
private EventPublisher target;
@ -138,14 +97,61 @@ public class NotifyCenter {
}
private boolean filter(final Subscribe subscribe, final Event event) {
final String sourceName = event.getClass().getCanonicalName();
final String targetName = subscribe.subscribeType()
.getCanonicalName();
final String sourceName = event.getClass().getName();
final String targetName = subscribe.subscribeType().getName();
LoggerUtils.printIfDebugEnabled(LOGGER, "source event name : {}, target event name : {}", sourceName, targetName);
return !Objects.equals(sourceName, targetName);
}
};
private static final NotifyCenter INSTANCE = new NotifyCenter();
/**
* Publisher management container
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
/**
* Multi-event listening list
*/
private final Set<SmartSubscribe> smartSubscribes = new ConcurrentHashSet<>();
static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "com.alibaba.nacos.core.notify.ringBufferSize";
RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384);
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "com.alibaba.nacos.core.notify.shareBufferSize";
SHATE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024);
ServiceLoader<EventPublisher> loader = ServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = loader.iterator();
if (iterator.hasNext()) {
BUILD_FACTORY = (cls, buffer) -> {
loader.reload();
EventPublisher publisher = ServiceLoader.load(EventPublisher.class).iterator().next();
publisher.init(cls, buffer);
return publisher;
};
} else {
BUILD_FACTORY = (cls, buffer) -> {
EventPublisher publisher = new DefaultPublisher();
publisher.init(cls, buffer);
return publisher;
};
}
INSTANCE.sharePublisher.init(SlowEvent.class, SHATE_BUFFER_SIZE);
ShutdownUtils.addShutdownHook(new Thread(() -> {
shutdown();
}));
}
@JustForTest
public static Map<String, EventPublisher> getPublisherMap() {
return INSTANCE.publisherMap;
@ -169,13 +175,6 @@ public class NotifyCenter {
return INSTANCE.sharePublisher;
}
static {
INSTANCE.sharePublisher.init(SlowEvent.class, SHATE_BUFFER_SIZE);
ShutdownUtils.addShutdownHook(new Thread(() -> {
shutdown();
}));
}
private static final AtomicBoolean closed = new AtomicBoolean(false);
public static void shutdown() {

View File

@ -46,6 +46,10 @@ public class NotifyCenter_ITCase {
}
static {
System.setProperty("com.alibaba.nacos.core.notify.shareBufferSize", "8");
}
@Test
public void test_a_event_can_listen() throws Exception {
@ -216,14 +220,15 @@ public class NotifyCenter_ITCase {
NotifyCenter.registerToSharePublisher(SlowE1.class);
NotifyCenter.registerToSharePublisher(SlowE2.class);
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch1 = new CountDownLatch(30);
CountDownLatch latch2 = new CountDownLatch(30);
String[] values = new String[] {null, null};
NotifyCenter.registerSubscribe(new Subscribe<SlowE1>() {
@Override
public void onEvent(SlowE1 event) {
System.out.println(event);
values[0] = event.info;
latch1.countDown();
}
@ -237,6 +242,7 @@ public class NotifyCenter_ITCase {
NotifyCenter.registerSubscribe(new Subscribe<SlowE2>() {
@Override
public void onEvent(SlowE2 event) {
System.out.println(event);
values[1] = event.info;
latch2.countDown();
}
@ -247,11 +253,13 @@ public class NotifyCenter_ITCase {
}
});
NotifyCenter.publishEvent(new SlowE1());
NotifyCenter.publishEvent(new SlowE2());
for (int i = 0; i < 30; i ++) {
NotifyCenter.publishEvent(new SlowE1());
NotifyCenter.publishEvent(new SlowE2());
}
latch1.await(10_000L, TimeUnit.MILLISECONDS);
latch2.await(10_000L, TimeUnit.MILLISECONDS);
latch1.await();
latch2.await();
Assert.assertEquals("SlowE1", values[0]);
Assert.assertEquals("SlowE2", values[1]);