@ -19,6 +19,7 @@ import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.MD5Utils ;
import com.alibaba.nacos.common.utils.Observable ;
import com.alibaba.nacos.common.utils.Observer ;
import com.alibaba.nacos.common.utils.ThreadUtils ;
import com.alibaba.nacos.config.server.constant.Constants ;
import com.alibaba.nacos.config.server.manager.TaskManager ;
import com.alibaba.nacos.config.server.model.ConfigInfo ;
@ -45,10 +46,10 @@ import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupExceptio
import com.alibaba.nacos.core.utils.ApplicationUtils ;
import com.alibaba.nacos.core.utils.GlobalExecutor ;
import com.alibaba.nacos.core.utils.InetUtils ;
import com.alibaba.nacos.core.utils.TimerContext ;
import org.apache.commons.lang3.StringUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import javax.annotation.PostConstruct ;
@ -65,6 +66,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.atomic.AtomicReference ;
import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog ;
@ -84,8 +86,8 @@ public class DumpService {
* Here you inject the dependent objects constructively , ensuring that some
* of the dependent functionality is initialized ahead of time
*
* @param persistService { @link PersistService }
* @param memberManager { @link ServerMemberManager }
* @param persistService { @link PersistService }
* @param memberManager { @link ServerMemberManager }
* @param protocolManager { @link ProtocolManager }
* /
public DumpService ( PersistService persistService , ServerMemberManager memberManager ,
@ -103,12 +105,8 @@ public class DumpService {
return memberManager ;
}
public ProtocolManager getProtocolManager ( ) {
return protocolManager ;
}
@PostConstruct
protected void init ( ) throws Exception {
protected void init ( ) throws Throwable {
DynamicDataSource . getInstance ( ) . getDataSource ( ) ;
DumpProcessor processor = new DumpProcessor ( this ) ;
@ -132,6 +130,9 @@ public class DumpService {
. info ( " With embedded distributed storage, you need to wait for "
+ " the underlying master to complete before you can perform the dump operation. " ) ;
AtomicReference < Throwable > errorReference = new AtomicReference < > ( null ) ;
CountDownLatch waitDumpFinish = new CountDownLatch ( 1 ) ;
/ / watch path = > / nacos_config / leader / has value ?
Observer observer = new Observer ( ) {
@ -142,8 +143,16 @@ public class DumpService {
if ( Objects . isNull ( arg ) ) {
return ;
}
dumpOperate ( processor , dumpAllProcessor , dumpAllBetaProcessor ,
dumpAllTagProcessor ) ;
try {
dumpOperate ( processor , dumpAllProcessor , dumpAllBetaProcessor ,
dumpAllTagProcessor ) ;
}
catch ( Throwable ex ) {
errorReference . set ( ex ) ;
}
finally {
waitDumpFinish . countDown ( ) ;
}
protocol . protocolMetaData ( )
. unSubscribe ( Constants . CONFIG_MODEL_RAFT_GROUP ,
com . alibaba . nacos . consistency . cp . Constants . LEADER_META_DATA ,
@ -156,116 +165,134 @@ public class DumpService {
com . alibaba . nacos . consistency . cp . Constants . LEADER_META_DATA ,
observer ) ;
/ / When all dump is complete , allow the following flow
/ / We must wait for the dump task to complete the callback operation before
/ / continuing with the initialization
ThreadUtils . latchAwait ( waitDumpFinish ) ;
/ / If an exception occurs during the execution of the dump task , the exception
/ / needs to be thrown , triggering the node to start the failed process
final Throwable ex = errorReference . get ( ) ;
if ( Objects . nonNull ( ex ) ) {
throw ex ;
}
}
else {
dumpOperate ( processor , dumpAllProcessor , dumpAllBetaProcessor , dumpAllTagProcessor ) ;
dumpOperate ( processor , dumpAllProcessor , dumpAllBetaProcessor ,
dumpAllTagProcessor ) ;
}
}
private void dumpOperate ( DumpProcessor processor , DumpAllProcessor dumpAllProcessor ,
DumpAllBetaProcessor dumpAllBetaProcessor ,
DumpAllTagProcessor dumpAllTagProcessor ) {
LogUtil . defaultLog . warn ( " DumpService start " ) ;
TimerContext . start ( " config dump job " ) ;
try {
LogUtil . defaultLog . warn ( " DumpService start " ) ;
Runnable dumpAll = ( ) - > dumpAllTaskMgr
. addTask ( DumpAllTask . TASK_ID , new DumpAllTask ( ) ) ;
Runnable dumpAll = ( ) - > dumpAllTaskMgr
. addTask ( DumpAllTask . TASK_ID , new DumpAllTask ( ) ) ;
Runnable dumpAllBeta = ( ) - > dumpAllTaskMgr
. addTask ( DumpAllBetaTask . TASK_ID , new DumpAllBetaTask ( ) ) ;
Runnable dumpAllBeta = ( ) - > dumpAllTaskMgr
. addTask ( DumpAllBetaTask . TASK_ID , new DumpAllBetaTask ( ) ) ;
Runnable clearConfigHistory = ( ) - > {
log . warn ( " clearConfigHistory start " ) ;
if ( canExecute ( ) ) {
try {
Timestamp startTime = getBeforeStamp ( TimeUtils . getCurrentTime ( ) ,
24 * getRetentionDays ( ) ) ;
int totalCount = persistService
. findConfigHistoryCountByTime ( startTime ) ;
if ( totalCount > 0 ) {
int pageSize = 1000 ;
int removeTime = ( totalCount + pageSize - 1 ) / pageSize ;
log . warn (
" clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{} " ,
new Object [ ] { startTime , totalCount , pageSize ,
removeTime } ) ;
while ( removeTime > 0 ) {
/ / 分页删除 , 以免批量太大报错
persistService . removeConfigHistory ( startTime , pageSize ) ;
removeTime - - ;
Runnable clearConfigHistory = ( ) - > {
log . warn ( " clearConfigHistory start " ) ;
if ( canExecute ( ) ) {
try {
Timestamp startTime = getBeforeStamp ( TimeUtils . getCurrentTime ( ) ,
24 * getRetentionDays ( ) ) ;
int totalCount = persistService
. findConfigHistoryCountByTime ( startTime ) ;
if ( totalCount > 0 ) {
int pageSize = 1000 ;
int removeTime = ( totalCount + pageSize - 1 ) / pageSize ;
log . warn (
" clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{} " ,
startTime , totalCount , pageSize , removeTime ) ;
while ( removeTime > 0 ) {
/ / 分页删除 , 以免批量太大报错
persistService . removeConfigHistory ( startTime , pageSize ) ;
removeTime - - ;
}
}
}
}
catch ( Throwable e ) {
log . error ( " clearConfigHistory error " , e ) ;
}
}
} ;
try {
dumpConfigInfo ( dumpAllProcessor ) ;
/ / 更新beta缓存
LogUtil . defaultLog . info ( " start clear all config-info-beta. " ) ;
DiskUtil . clearAllBeta ( ) ;
if ( persistService . isExistTable ( BETA_TABLE_NAME ) ) {
dumpAllBetaProcessor
. process ( DumpAllBetaTask . TASK_ID , new DumpAllBetaTask ( ) ) ;
}
/ / 更新Tag缓存
LogUtil . defaultLog . info ( " start clear all config-info-tag. " ) ;
DiskUtil . clearAllTag ( ) ;
if ( persistService . isExistTable ( TAG_TABLE_NAME ) ) {
dumpAllTagProcessor . process ( DumpAllTagTask . TASK_ID , new DumpAllTagTask ( ) ) ;
}
/ / add to dump aggr
List < ConfigInfoChanged > configList = persistService . findAllAggrGroup ( ) ;
if ( configList ! = null & & ! configList . isEmpty ( ) ) {
total = configList . size ( ) ;
List < List < ConfigInfoChanged > > splitList = splitList ( configList ,
INIT_THREAD_COUNT ) ;
for ( List < ConfigInfoChanged > list : splitList ) {
MergeAllDataWorker work = new MergeAllDataWorker ( list ) ;
work . start ( ) ;
}
log . info ( " server start, schedule merge end. " ) ;
}
}
catch ( Exception e ) {
LogUtil . fatalLog
. error ( " Nacos Server did not start because dumpservice bean construction failure : \ n "
+ e . getMessage ( ) , e . getCause ( ) ) ;
throw new RuntimeException (
" Nacos Server did not start because dumpservice bean construction failure : \ n "
+ e . getMessage ( ) ) ;
}
if ( ! ApplicationUtils . getStandaloneMode ( ) ) {
Runnable heartbeat = ( ) - > {
String heartBeatTime = TimeUtils . getCurrentTime ( ) . toString ( ) ;
/ / write disk
try {
DiskUtil . saveHeartBeatToDisk ( heartBeatTime ) ;
}
catch ( IOException e ) {
LogUtil . fatalLog . error ( " save heartbeat fail " + e . getMessage ( ) ) ;
catch ( Throwable e ) {
log . error ( " clearConfigHistory error " , e ) ;
}
}
} ;
TimerTaskService . scheduleWithFixedDelay ( heartbeat , 0 , 10 , TimeUnit . SECONDS ) ;
try {
dumpConfigInfo ( dumpAllProcessor ) ;
long initialDelay = new Random ( ) . nextInt ( INITIAL_DELAY_IN_MINUTE ) + 10 ;
LogUtil . defaultLog . warn ( " initialDelay:{} " , initialDelay ) ;
/ / 更新beta缓存
LogUtil . defaultLog . info ( " start clear all config-info-beta. " ) ;
DiskUtil . clearAllBeta ( ) ;
if ( persistService . isExistTable ( BETA_TABLE_NAME ) ) {
dumpAllBetaProcessor
. process ( DumpAllBetaTask . TASK_ID , new DumpAllBetaTask ( ) ) ;
}
/ / 更新Tag缓存
LogUtil . defaultLog . info ( " start clear all config-info-tag. " ) ;
DiskUtil . clearAllTag ( ) ;
if ( persistService . isExistTable ( TAG_TABLE_NAME ) ) {
dumpAllTagProcessor
. process ( DumpAllTagTask . TASK_ID , new DumpAllTagTask ( ) ) ;
}
TimerTaskService . scheduleWithFixedDelay ( dumpAll , initialDelay ,
DUMP_ALL_INTERVAL_IN_MINUTE , TimeUnit . MINUTES ) ;
/ / add to dump aggr
List < ConfigInfoChanged > configList = persistService . findAllAggrGroup ( ) ;
if ( configList ! = null & & ! configList . isEmpty ( ) ) {
total = configList . size ( ) ;
List < List < ConfigInfoChanged > > splitList = splitList ( configList ,
INIT_THREAD_COUNT ) ;
for ( List < ConfigInfoChanged > list : splitList ) {
MergeAllDataWorker work = new MergeAllDataWorker ( list ) ;
work . start ( ) ;
}
log . info ( " server start, schedule merge end. " ) ;
}
}
catch ( Exception e ) {
LogUtil . fatalLog
. error ( " Nacos Server did not start because dumpservice bean construction failure : \ n "
+ e . getMessage ( ) , e . getCause ( ) ) ;
throw new RuntimeException (
" Nacos Server did not start because dumpservice bean construction failure : \ n "
+ e . getMessage ( ) ) ;
}
if ( ! ApplicationUtils . getStandaloneMode ( ) ) {
Runnable heartbeat = ( ) - > {
String heartBeatTime = TimeUtils . getCurrentTime ( ) . toString ( ) ;
/ / write disk
try {
DiskUtil . saveHeartBeatToDisk ( heartBeatTime ) ;
}
catch ( IOException e ) {
LogUtil . fatalLog . error ( " save heartbeat fail " + e . getMessage ( ) ) ;
}
} ;
TimerTaskService . scheduleWithFixedDelay ( dumpAllBeta , initialDelay ,
DUMP_ALL_INTERVAL_IN_MINUTE , TimeUnit . MINUTES ) ;
TimerTaskService
. scheduleWithFixedDelay ( heartbeat , 0 , 10 , TimeUnit . SECONDS ) ;
long initialDelay = new Random ( ) . nextInt ( INITIAL_DELAY_IN_MINUTE ) + 10 ;
LogUtil . defaultLog . warn ( " initialDelay:{} " , initialDelay ) ;
TimerTaskService . scheduleWithFixedDelay ( dumpAll , initialDelay ,
DUMP_ALL_INTERVAL_IN_MINUTE , TimeUnit . MINUTES ) ;
TimerTaskService . scheduleWithFixedDelay ( dumpAllBeta , initialDelay ,
DUMP_ALL_INTERVAL_IN_MINUTE , TimeUnit . MINUTES ) ;
}
TimerTaskService
. scheduleWithFixedDelay ( clearConfigHistory , 10 , 10 , TimeUnit . MINUTES ) ;
}
finally {
TimerContext . end ( LogUtil . dumpLog ) ;
}
TimerTaskService
. scheduleWithFixedDelay ( clearConfigHistory , 10 , 10 , TimeUnit . MINUTES ) ;
}