中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

網(wǎng)站建設(shè)和網(wǎng)絡(luò)推廣是干嘛廣告做到百度第一頁

網(wǎng)站建設(shè)和網(wǎng)絡(luò)推廣是干嘛,廣告做到百度第一頁,企業(yè)網(wǎng)站管理源碼,html5韓國網(wǎng)站模板1.設(shè)計 前面已經(jīng)完成了netty的集成,接下來就是借助netty完成選舉就行了。 針對選舉,我們用到了VotRequestMessage、VotRespMessage、當節(jié)點下線時NodeOfflineMessage、NodeOnlineMessage、NodeOnlineRespMessage 1.1 節(jié)點詳細的交互 1.2 對所有消息的…

1.設(shè)計

前面已經(jīng)完成了netty的集成,接下來就是借助netty完成選舉就行了。

針對選舉,我們用到了VotRequestMessage、VotRespMessage、當節(jié)點下線時NodeOfflineMessage、NodeOnlineMessage、NodeOnlineRespMessage

1.1 節(jié)點詳細的交互

1.2 對所有消息的處理使用策略模式

由于我們是Spring的應(yīng)用,我可以借助Spring的容器完成對消息類型的選擇

1.2.1 創(chuàng)建 IMessageService 接口

所有消息的處理都實現(xiàn)此接口

public interface IMessageService {byte getMessageType();void execute(ChannelHandlerContext ctx, DttaskMessage message);}

1.2.2 DttaskMessage

@Data
public class DttaskMessage {public static final byte COMMON_RESP = 0X00;public static final byte PING = 0X01;public static final byte PONG = 0X02;public static final byte VOTING = 0X03;public static final byte VOT_RESP = 0X04;public static final byte NODE_OFFLINE = 0X05;public static final byte NODE_ONLINE = 0X06;public static final byte NODE_ONLINE_RESP = 0X07;// 類型private byte type;// 消息實際信息private String info;public static DttaskMessage buildPingMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(PING);dttaskMessage.setInfo(JSON.toJSONString(new PingMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildPongMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(PONG);dttaskMessage.setInfo(JSON.toJSONString(new PongMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildCommonRespMessage(String message, boolean successFlag) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(COMMON_RESP);dttaskMessage.setInfo(JSON.toJSONString(new CommonRespMessage(message, successFlag)));return dttaskMessage;}public static DttaskMessage buildNodeOnlineRespMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(NODE_ONLINE_RESP);dttaskMessage.setInfo(JSON.toJSONString(new NodeOnlineRespMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildNodeOnlineMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(NODE_ONLINE);dttaskMessage.setInfo(JSON.toJSONString(new NodeOnlineMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildNodeOfflineMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(NODE_OFFLINE);dttaskMessage.setInfo(JSON.toJSONString(new VotRespMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildVotRespMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(VOT_RESP);dttaskMessage.setInfo(JSON.toJSONString(new VotRespMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildVotRequestMessage(Long lastControllerServerId, long fromServerId, long serverId, int version) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(VOTING);dttaskMessage.setInfo(JSON.toJSONString(new VotRequestMessage(lastControllerServerId, fromServerId, serverId, version)));return dttaskMessage;}}

1.2.3 VotingMessageService

這里以VotingMessageService為例,它實現(xiàn)了IMessageService接口,完成投票信息的處理。其它的*MessageService也是同樣的,這里就不一一舉例了,可以在 com.swsm.dttask.server.service.message包下查看

@Slf4j
@Component
public class VotingMessageService implements IMessageService {@Overridepublic byte getMessageType() {return DttaskMessage.VOTING;}@Overridepublic void execute(ChannelHandlerContext ctx, DttaskMessage message) {Channel channel = ctx.channel();VotRequestMessage votRequestMessage = JSON.parseObject(message.getInfo(), VotRequestMessage.class);long fromServerId = votRequestMessage.getFromServerId();Long lastControllerServerId = votRequestMessage.getLastControllerServerId();ServerInfo.addOtherNode(fromServerId, channel);boolean addRes = ServerInfo.addVotResult(votRequestMessage.getVersion(), votRequestMessage.getServerId());if (!addRes) {log.info("丟棄以前版本的投票信息={}", votRequestMessage);return;}// 歸票VotResult votResult = ServerInfo.getVotResult();Map<Long, Integer> votMap = votResult.getVotMap();for (Map.Entry<Long, Integer> entry : votMap.entrySet()) {long controllerServerId = entry.getKey();if (votMap.get(controllerServerId) >= ServerInfo.getVotMax()) {// 歸票成功log.info("本節(jié)點={}是controller", controllerServerId);ServerInfo.setStatus(ServerStatus.RUNNING);Controller controller = ServerInfo.initController();for (Long otherServerId : ServerInfo.getOtherNodeIds()) {Channel otherNodeChannel = ServerInfo.getChannelByServerId(otherServerId);otherNodeChannel.writeAndFlush(DttaskMessage.buildVotRespMessage(controllerServerId));}return;}}}
}

1.2.4 MessageServiceManager

Spring托管的bean,里面會將所有實現(xiàn)了IMessageService接口的類都管理起來,并在消息到來時進行選擇

@Slf4j
@Component
public class MessageServiceManager {@Autowired(required = false)private List<IMessageService> messageServices;private Map<Byte, IMessageService> messageServiceMap = new HashMap<>();@PostConstructpublic void init() {if (messageServices != null) {for (IMessageService messageService : messageServices) {messageServiceMap.put(messageService.getMessageType(), messageService);}}}public IMessageService chooseMessageService(byte messageType) {if (messageServiceMap.containsKey(messageType)) {return messageServiceMap.get(messageType);}return messageServiceMap.get(Byte.MIN_VALUE);}}

1.2.5 借助netty實現(xiàn)節(jié)點與節(jié)點直接的心跳

@Slf4j
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {  @Override  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  if (evt instanceof IdleStateEvent) {  IdleStateEvent event = (IdleStateEvent) evt;  if (event.state() == IdleState.READER_IDLE) {  log.warn("讀取空閑...");} else if (event.state() == IdleState.WRITER_IDLE) {log.warn("寫入空閑...");} else if (event.state() == IdleState.ALL_IDLE) {Long serverId = ServerInfo.getServerIdByChannelId(ctx.channel().id());log.warn("serverId={}與server通信讀取或?qū)懭肟臻e...", serverId);if (serverId != null) {ctx.writeAndFlush(DttaskMessage.buildPingMessage(serverId));} else {ctx.close();}}}  }  
}

1.2.6 ServerInfo類完成所有信息的管理

ServerInfo類完成所有信息的管理,它里面會存儲很多系統(tǒng)運行需要的數(shù)據(jù)如:當前節(jié)點信息(myNodeInfo)、Channel和節(jié)點的關(guān)系(nodeChannelMap、nodeChannelServerIdMap)、系統(tǒng)狀態(tài)(status)、其它節(jié)點信息(otherNodeInfoMap);

ServerInfo還肩負著確定節(jié)點角色(Controller或Follower)以及初始化角色的任務(wù)

@Slf4j
public class ServerInfo {private ServerInfo() {}private static NioEventLoopGroup bossGroup;public static void setBossGroup(NioEventLoopGroup bg) {bossGroup = bg;}public static NioEventLoopGroup getBossGroup() {return bossGroup;}private static NioEventLoopGroup workerGroup;private static Channel serverChannel;private static Bootstrap connectOtherNodeBootStrap;private static ServerBootstrap bootstrapForClient;private static NodeInfo myNodeInfo;private static Map<Long, Channel> nodeChannelMap = new ConcurrentHashMap<>();private static Map<ChannelId, Long> nodeChannelServerIdMap = new ConcurrentHashMap<>();private static volatile ServerStatus status;private static Map<Long, NodeInfo> otherNodeInfoMap = new ConcurrentHashMap<>();private static VotResult votResult = new VotResult();private static Controller controller;private static Follower follower;public static void init() {RedisUtil redisUtil = BeanUseHelper.redisUtil();DttaskServerConfig dttaskServerConfig = BeanUseHelper.dttaskServerConfig();long localServerId = dttaskServerConfig.getServerId();ServerInfo.setStatus(ServerStatus.STARTING);Long controllerServerId = redisUtil.getLongValue(Constant.RedisConstants.DTTASK_CONTROLLER);if (controllerServerId == null) {log.info("當前啟動狀態(tài)為:未確定controller");initNodeInfoByConfig();Long minServerId = ServerInfo.getMinNodeId();if (minServerId == localServerId) {log.info("就當前一個節(jié)點:{},此節(jié)點就是controller", localServerId);ServerInfo.setStatus(ServerStatus.RUNNING);} else {log.info("有多個節(jié)點,節(jié)點狀態(tài)應(yīng)為VOTING");ServerInfo.setStatus(ServerStatus.VOTING);}} else {log.info("當前啟動狀態(tài)為:已確定controller");ServerInfo.refreshNodeInfoByRedis();InetSocketAddress address = dttaskServerConfig.getServerInfoMap().get(localServerId);ServerInfo.setMyNodeInfo(localServerId, address.getHostString(), address.getPort(), null);ServerInfo.setStatus(ServerStatus.IDENTIFYING);}}private static void initNodeInfoByConfig() {DttaskServerConfig dttaskServerConfig = BeanUseHelper.dttaskServerConfig();long localServerId = dttaskServerConfig.getServerId();Map<Long, InetSocketAddress> serverInfoMap = dttaskServerConfig.getServerInfoMap();for (Map.Entry<Long, InetSocketAddress> entry : serverInfoMap.entrySet()) {long id = entry.getKey();InetSocketAddress address = serverInfoMap.get(id);if (localServerId != id) {ServerInfo.addOtherNode(id, address.getHostString(), address.getPort());} else {ServerInfo.setMyNodeInfo(localServerId, address.getHostString(), address.getPort(), null);}}}public static Controller initController() {long localServerId = ServerInfo.getServerId();RedisUtil redisUtil = BeanUseHelper.redisUtil();log.info("初始化本節(jié)點={}controller信息...", ServerInfo.getServerId());ServerInfo.setRole(ServerRole.CONTROLLER);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_CONTROLLER, localServerId);ServerInfo.setOtherNodeRole(localServerId);ServerInfo.refreshRedisNodeInfo();controller = Controller.getInstance();return controller;}public static Follower initFollower() {log.info("初始化本節(jié)點={}follower信息...", ServerInfo.getServerId());RedisUtil redisUtil = BeanUseHelper.redisUtil();Long controllerServerId = redisUtil.getLongValue(Constant.RedisConstants.DTTASK_CONTROLLER);if (controllerServerId == null) {log.error("init follower時,controller還沒有確定...");throw new BusinessException("init follower時,controller還沒有確定...");}ServerInfo.setRole(ServerRole.FOLLOWER);ServerInfo.setStatus(ServerStatus.RUNNING);ServerInfo.setOtherNodeRole(controllerServerId);follower = Follower.getInstance();return follower;}public static void setMyNodeInfo(long serverId, String ip, int port, ServerRole serverRole) {NodeInfo nodeInfo = new NodeInfo();nodeInfo.setServerId(serverId);nodeInfo.setIp(ip);nodeInfo.setPort(port);nodeInfo.setServerRole(serverRole);myNodeInfo = nodeInfo;}public static long getServerId() {return myNodeInfo.getServerId();}public static NodeInfo getMyNodeInfo() {return myNodeInfo;}public static Map<Long, NodeInfo> getOtherNodeInfoMap() {return otherNodeInfoMap;}public static int getVotMax() {return otherNodeInfoMap.size();}public static VotResult getVotResult() {return votResult;}public static synchronized void cacheChannelAnsServerIdRel(long serverId, Channel channel) {nodeChannelMap.put(serverId, channel);nodeChannelServerIdMap.put(channel.id(), serverId);}public static Channel getChannelByServerId(Long serverId) {return nodeChannelMap.get(serverId);}public static synchronized void removeChannel(ChannelId channelId) {Long serverId = nodeChannelServerIdMap.get(channelId);if (serverId != null) {log.info("刪除和節(jié)點id={}的連接", serverId);nodeChannelServerIdMap.remove(channelId);nodeChannelMap.remove(serverId);otherNodeInfoMap.remove(serverId);}}public static synchronized void refreshRedisNodeInfo() {if (myNodeInfo != null && getRole() != null && getRole().isController()) {RedisUtil redisUtil = BeanUseHelper.redisUtil();Map<Long, NodeInfo> otherNodeInfoMap = ServerInfo.getOtherNodeInfoMap();List<NodeInfo> nodeInfoList = new ArrayList<>();nodeInfoList.addAll(otherNodeInfoMap.values());nodeInfoList.add(ServerInfo.getMyNodeInfo());log.info("controller刷新節(jié)點信息到redis:{}", nodeInfoList);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_NODE_INFO, JSON.toJSONString(nodeInfoList));}}public static Long getServerIdByChannelId(ChannelId channelId) {return nodeChannelServerIdMap.get(channelId);}public static synchronized boolean addVotResult(int version, long chooseServerId) {Integer curVersion = votResult.getVersion();if (version < votResult.getVersion()) {log.info("版本={}已失效,當前的為:{}", version, curVersion);return false;}votResult.setVersion(version);if (votResult.getVotMap().containsKey(chooseServerId)) {votResult.getVotMap().put(chooseServerId, votResult.getVotMap().get(chooseServerId) + 1);} else {votResult.getVotMap().put(chooseServerId, 1);}return true;}public static void addOtherNode(long serverId, String ip, int port) {NodeInfo nodeInfo = new NodeInfo();nodeInfo.setServerId(serverId);nodeInfo.setIp(ip);nodeInfo.setPort(port);otherNodeInfoMap.put(serverId, nodeInfo);}public static void addOtherNode(long serverId, Channel channel) {addOtherNode(serverId, channel, null);}public static void addOtherNode(long serverId, Channel channel, ServerRole serverRole) {InetSocketAddress address = BeanUseHelper.dttaskServerConfig().getServerInfoMap().get(serverId);if (address == null) {throw new BusinessException(CharSequenceUtil.format("id={}的沒有配置在文件中", serverId));}if (channel != null && ServerInfo.getServerIdByChannelId(channel.id()) == null) {ServerInfo.cacheChannelAnsServerIdRel(serverId, channel);}NodeInfo nodeInfo = new NodeInfo();nodeInfo.setServerId(serverId);nodeInfo.setIp(address.getHostString());nodeInfo.setPort(address.getPort());nodeInfo.setServerRole(serverRole);otherNodeInfoMap.put(serverId, nodeInfo);}public static NodeInfo getNodeInfo(long serverId) {return otherNodeInfoMap.get(serverId);}public static Set<Long> getOtherNodeIds() {return otherNodeInfoMap.keySet();}public static long getMinNodeId() {if (getOtherNodeIds().isEmpty()) {return ServerInfo.getServerId();}return Collections.min(getOtherNodeIds());}public static void setOtherNodeRole(long controllerServerId) {for (Map.Entry<Long, NodeInfo> entry : otherNodeInfoMap.entrySet()) {long serverId = entry.getKey();if (serverId == controllerServerId) {otherNodeInfoMap.get(serverId).setServerRole(ServerRole.CONTROLLER);} else {otherNodeInfoMap.get(serverId).setServerRole(ServerRole.FOLLOWER);}}}public static ServerRole getRole() {return myNodeInfo.getServerRole();}public static void setStatus(ServerStatus s) {status = s;}public static void setRole(ServerRole r) {myNodeInfo.setServerRole(r);}public static Set<Channel> getOtherNodeChannel(Long serverId) {Set<Channel> res = new HashSet<>();for (Map.Entry<Long, Channel> entry : nodeChannelMap.entrySet()) {long id = entry.getKey();if (!Objects.equals(serverId, id)) {res.add(nodeChannelMap.get(id));}}return res;}/*** 這個方法針對,本節(jié)點并沒有和要斷開節(jié)點有連接的* @param offlineServerId 掉線的節(jié)點id*/public static void removeNode(long offlineServerId) {log.info("刪除掉線節(jié)點id={}", offlineServerId);otherNodeInfoMap.remove(offlineServerId);}public static void refreshNodeInfoByRedis() {RedisUtil redisUtil = BeanUseHelper.redisUtil();Object obj = redisUtil.getCacheObject(Constant.RedisConstants.DTTASK_NODE_INFO);if (obj != null) {List<NodeInfo> nodeInfoList = JSON.parseObject(obj.toString(),new TypeReference<List<NodeInfo>>() {}.getType());for (NodeInfo nodeInfo : nodeInfoList) {otherNodeInfoMap.put(nodeInfo.getServerId(), nodeInfo);}}}public static boolean isIdentifying() {return status.isIdentifying();}public static boolean isVoting() {return status.isVoting();}public static boolean isRunning() {return status.isRunning();}public static void setWorkerGroup(NioEventLoopGroup bg) {workerGroup = bg;}public static NioEventLoopGroup getWorkerGroup() {return workerGroup;}public static void setServerChannel(Channel ch) {serverChannel = ch;}public static Channel getServerChannel() {return serverChannel;}public static void setConnectOtherNodeBootStrap(Bootstrap bs) {connectOtherNodeBootStrap = bs;}public static Bootstrap getConnectOtherNodeBootStrap() {return connectOtherNodeBootStrap;}public static void setBootstrapForClient(ServerBootstrap sbs) {bootstrapForClient = sbs;}public static ServerBootstrap getBootstrapForClient() {return bootstrapForClient;}}

1.2.7 SpringInitRunner -- 增加選舉的邏輯

SpringInitRunner前面只啟動了netty等待連接,這里將完成選舉,以及當就自己一個節(jié)點則就認為自己是controller

@Component
@Slf4j
public class SpringInitRunner implements CommandLineRunner {@Autowiredprivate DttaskServerConfig dttaskServerConfig;@Autowiredprivate NetworkService networkService;@Autowiredprivate MessageServiceManager messageServiceManager;@Autowiredprivate RedisUtil redisUtil;@PostConstructpublic void init() {initServerBootStrap();initConnectOtherNodeBootStrap();}private void initConnectOtherNodeBootStrap() {ServerInfo.setConnectOtherNodeBootStrap(new Bootstrap());ServerInfo.getConnectOtherNodeBootStrap().group(new NioEventLoopGroup(4)).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(new DttaskMessageDecoder(MESSAGE_MAX_SIZE, MESSAGE_LENGTH_FILED_OFFSET, MESSAGE_LENGTH_FILED_LENGTH));socketChannel.pipeline().addLast(new IdleStateHandler(dttaskServerConfig.getReadIdleSecondTime(),dttaskServerConfig.getWriteIdleSecondTime(),dttaskServerConfig.getAllIdleSecondTime()));socketChannel.pipeline().addLast(new DttaskMessageEncoder());socketChannel.pipeline().addLast(new ServerClientChannelHandler(networkService, redisUtil, messageServiceManager));}});}private void initServerBootStrap() {ServerInfo.setBossGroup(new NioEventLoopGroup(4));ServerInfo.setWorkerGroup(new NioEventLoopGroup(8));ServerInfo.setBootstrapForClient(new ServerBootstrap());ServerInfo.getBootstrapForClient().group(ServerInfo.getBossGroup(), ServerInfo.getWorkerGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(new DttaskMessageDecoder(MESSAGE_MAX_SIZE, MESSAGE_LENGTH_FILED_OFFSET, MESSAGE_LENGTH_FILED_LENGTH));socketChannel.pipeline().addLast(new DttaskMessageEncoder());IdleStateHandler idleStateHandler = new IdleStateHandler(dttaskServerConfig.getReadIdleSecondTime(), dttaskServerConfig.getWriteIdleSecondTime(), dttaskServerConfig.getAllIdleSecondTime());socketChannel.pipeline().addLast(idleStateHandler);socketChannel.pipeline().addLast(new HeartBeatServerHandler());socketChannel.pipeline().addLast(new ServerClientChannelHandler(networkService, redisUtil, messageServiceManager));}});}@Overridepublic void run(String... args) {log.info("spring啟動完成,接下來啟動 netty");ServerInfo.init();try {log.info("啟動監(jiān)聽其它節(jié)點端請求的服務(wù)端...");ServerInfo.setServerChannel(ServerInfo.getBootstrapForClient().bind(dttaskServerConfig.listenerPort()).sync().channel());} catch (Exception e) {log.error("啟動 監(jiān)聽其它節(jié)點請求的服務(wù)端出現(xiàn)異常", e);System.exit(-1);}try {log.info("連接controller或開始vote...");if (ServerInfo.isIdentifying()) {log.info("連接controller...");RedisUtil redisUtil = BeanUseHelper.redisUtil();long controllerServerId = redisUtil.getLongValue(Constant.RedisConstants.DTTASK_CONTROLLER);networkService.connectController(controllerServerId);} else if (ServerInfo.isVoting()){log.info("開始vote...");long minNodeId = ServerInfo.getMinNodeId();networkService.startVote(null, minNodeId);} else if (ServerInfo.isRunning()) {log.info("已確認本節(jié)點={}就是controller...", ServerInfo.getServerId());Controller controller = ServerInfo.initController();}}  catch (Exception e) {log.error("連接controller或開始vote出現(xiàn)異常", e);System.exit(-1);}log.info("netty 啟動成功...");}@PreDestroypublic void shutdown() {if (ServerInfo.getOtherNodeIds().isEmpty()) {redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_CONTROLLER, null);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_NODE_INFO, null);}try {ServerInfo.getServerChannel().close().sync();} catch (InterruptedException e) {log.error("dttask-server netty shutdown 出現(xiàn)異常", e);Thread.currentThread().interrupt();} finally {ServerInfo.getWorkerGroup().shutdownGracefully();ServerInfo.getBossGroup().shutdownGracefully();}}
}

1.2.8 ServerClientChannelHandler -- 完成投票等消息處理

前面ServerClientChannelHandler只是完成了基本框架,代碼較少,現(xiàn)在要在這里添加處理每個消息的邏輯,放心,代碼也不多,因為我們已經(jīng)對消息處理進行了拆分,只要加入一個策略選擇就可以。

這里不得不感嘆一下:策略模式的功能,否則這里將會有一大堆if else。

當節(jié)點與節(jié)點的通信斷開時,會觸發(fā)channelInactive和exceptionCaught方法,我們需要在這里處理斷開的業(yè)務(wù)邏輯,注意斷開的業(yè)務(wù)邏輯需要判斷斷開的是Controller還是Follower,這里的處理邏輯不同。

@Slf4j
public class ServerClientChannelHandler extends SimpleChannelInboundHandler<DttaskMessage> {private NetworkService networkService;private MessageServiceManager messageServiceManager;private RedisUtil redisUtil;public ServerClientChannelHandler(NetworkService networkService, RedisUtil redisUtil, MessageServiceManager messageServiceManager) {super();this.networkService = networkService;this.redisUtil = redisUtil;this.messageServiceManager = messageServiceManager;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("channelActive={}", ctx.channel().id());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, DttaskMessage message) throws Exception {log.info("收到客戶端的請求:{}", message);messageServiceManager.chooseMessageService(message.getType()).execute(ctx, message);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channelInactive...");stopChannel(ctx, null);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.warn("exceptionCaught...", cause);stopChannel(ctx, cause);}private void stopChannel(ChannelHandlerContext ctx, Throwable cause) {Channel channel = ctx.channel();long localServerId = ServerInfo.getServerId();Long serverId = ServerInfo.getServerIdByChannelId(ctx.channel().id());if (serverId == null) {return;}if (cause != null) {log.error("nodeId={}與本節(jié)點id={}通信出現(xiàn)異常", serverId, localServerId, cause);} else {log.error("nodeId={}與本節(jié)點id={}通信失效", serverId, localServerId);}if (channel.isActive()) {channel.close();}// 判斷下線的是follower 還是 controllerNodeInfo nodeInfo = ServerInfo.getNodeInfo(serverId);if (!nodeInfo.getServerRole().isController()) {log.info("下線的是follower,id={}", serverId);Set<Channel> otherNodeChannels = ServerInfo.getOtherNodeChannel(serverId);for (Channel otherNodeChannel : otherNodeChannels) {otherNodeChannel.writeAndFlush(DttaskMessage.buildNodeOfflineMessage(serverId));}ServerInfo.removeChannel(channel.id());ServerInfo.refreshRedisNodeInfo();} else {log.info("下線的是controller,id={}", serverId);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_CONTROLLER, null);ServerInfo.removeChannel(channel.id());long minNodeId = ServerInfo.getMinNodeId();if (minNodeId != localServerId) {// 重新選舉networkService.startVote(serverId, minNodeId);} else {// 當前就只剩自己一個節(jié)點ServerInfo.setStatus(ServerStatus.RUNNING);Controller controller = ServerInfo.initController();}ServerInfo.refreshRedisNodeInfo();}}
}

2. 驗證

2.1 建立3個節(jié)點的配置

注意:Server.port需要是不一樣的

注意:節(jié)點的serverId也要和serverInfo匹配

2.2 idea建立針對3個配置的啟動Service

2.3 依次啟動驗證

可以按照本文最起那面的 節(jié)點詳細交互圖的 步驟進行測試。

  • 確保redis,mysql都已ok
  • 依次啟動3個節(jié)點,可以看到1為controller,2 3為follower,redis的key --- 完成選舉

  • 下線3號節(jié)點

  • 下線1號節(jié)點

2號節(jié)點稱為Controller

  • 上線1號節(jié)點、3號節(jié)點

所有節(jié)點啟動完成,這時2是C、1 3是F

  • 停止2號節(jié)點

至此完成了所有驗證

http://www.risenshineclean.com/news/51796.html

相關(guān)文章:

  • 做網(wǎng)站維護的收入怎么確認做專業(yè)搜索引擎優(yōu)化
  • 湖南建設(shè)廳網(wǎng)站二建注銷推廣代理平臺登錄
  • web網(wǎng)站做二級標題是什么意思網(wǎng)絡(luò)廣告策劃書模板范文
  • 泰安哪里可以做網(wǎng)站河南網(wǎng)站推廣優(yōu)化
  • 燕郊網(wǎng)站建設(shè)社群營銷平臺有哪些
  • vue做的網(wǎng)站有什么徐州網(wǎng)站優(yōu)化
  • 不會被封的網(wǎng)站誰做搜索優(yōu)化seo
  • 宣傳片拍攝合同模板杭州百度快照優(yōu)化公司
  • 做網(wǎng)站銷售大概多少錢色盲測試圖片
  • 中文 域名的網(wǎng)站seo網(wǎng)站搭建是什么
  • asp c 網(wǎng)站開發(fā)百度發(fā)視頻步驟
  • 網(wǎng)站 設(shè)計要求營銷咨詢公司
  • 投票活動網(wǎng)站怎么做搜索引擎廣告圖片
  • 佛山建設(shè)外貿(mào)網(wǎng)站seo技術(shù)自學
  • 山東新華電腦學院學網(wǎng)站開發(fā)如何制作一個自己的網(wǎng)頁網(wǎng)站
  • 網(wǎng)站制作的報價大約是多少香港疫情最新情況
  • 寧波seo網(wǎng)站建設(shè)費用企業(yè)推廣平臺
  • o2o商城網(wǎng)站建設(shè)供應(yīng)可以直接進入的輿情網(wǎng)站
  • 手機app設(shè)計網(wǎng)站沈陽seo搜索引擎
  • 網(wǎng)站域名備案密碼seo產(chǎn)品優(yōu)化免費軟件
  • 買2g 空間做下載網(wǎng)站網(wǎng)頁制作軟件dw
  • 網(wǎng)站建設(shè)中的需求報告功能企業(yè)策劃書
  • 網(wǎng)站banner圖的作用公司網(wǎng)站與推廣
  • seo排名技術(shù)教程seo排名軟件價格
  • 做網(wǎng)站好的網(wǎng)站建設(shè)公司排名青島百度推廣優(yōu)化怎么做的
  • 網(wǎng)站域名查詢地址做百度推廣銷售怎么找客戶
  • 中山市小欖新意網(wǎng)站設(shè)計有限公司太原網(wǎng)站建設(shè)制作
  • wordpress geogebraseo刷排名工具
  • 做本地網(wǎng)站需要的軟件網(wǎng)盤資源共享群吧
  • 淘寶上做網(wǎng)站排名的是真的嗎口碑營銷案例2022