優(yōu)化網(wǎng)站作用永久免費(fèi)域名申請(qǐng)
前言
據(jù)上一篇,設(shè)計(jì)方案的分析,綜合考慮,最終決定,客戶(hù)端采用agent方案實(shí)現(xiàn),具本原因不再贅述,
感覺(jué)興趣的小伙伴可以回頭了解一下.該篇主要講java agent的實(shí)現(xiàn),灰度agent客戶(hù)端的基礎(chǔ)框架實(shí)現(xiàn)
java agent的介紹
java Agent 是一種允許你在 Java 應(yīng)用程序啟動(dòng)時(shí)或運(yùn)行時(shí)修改其字節(jié)碼的技術(shù)。
它通過(guò) JVM 提供的 java.lang.instrument 包來(lái)實(shí)現(xiàn),可以用于各種用途,
如性能監(jiān)控、代碼增強(qiáng)、AOP(面向切面編程)、日志記錄等。
Java Agent 的核心功能是能夠在類(lèi)加載到 JVM 之前對(duì)其進(jìn)行修改,
而無(wú)需修改應(yīng)用程序的源代碼。啟動(dòng)時(shí)代理(Premain Agent):在 JVM 啟動(dòng)時(shí)加載的代理。你需要通過(guò) -javaagent 參數(shù)指定代理的 JAR 文件路徑,并提供一個(gè) premain 方法作為入口點(diǎn)。
public static void premain(String agentArgs, Instrumentation inst) {// 在這里注冊(cè) ClassFileTransformer 或執(zhí)行其他初始化邏輯
}
基礎(chǔ)實(shí)現(xiàn)
agent的實(shí)現(xiàn)的基礎(chǔ),是基于開(kāi)源項(xiàng)目skywalking的插件核心實(shí)現(xiàn),沒(méi)必要重復(fù)造輪子,要做的是專(zhuān)注
去實(shí)現(xiàn)灰度相關(guān)的功能.
agent端 http服務(wù)實(shí)現(xiàn)
為了方便灰度管理端,給agent端實(shí)時(shí)推送數(shù)據(jù),agent端,基于netty實(shí)現(xiàn)單簡(jiǎn)的http服務(wù),; 會(huì)提供接口,供服務(wù)端對(duì)實(shí)例狀態(tài)的控制,以及其它服務(wù)實(shí)例狀態(tài)的推送;實(shí)例在收發(fā)消息或路由時(shí),依賴(lài)這些信息作相應(yīng)處理。
如果不太了解這塊功能的作用,可以去第一篇看一下整個(gè)系統(tǒng)的架構(gòu)圖;下需要agent http server代碼片段
SimpleChannelInboundHandler agentServerHttpInboundHandler = new AgentServerHttpInboundHandler();
agentChannelInitializer = new AgentServerChannelInitializer(agentServerHttpInboundHandler);
Thread serverThread = new Thread(new RunnableWithExceptionProtection(() -> {LOGGER.info("dbcat gray agent netty server.");ServerBootstrap serverBootstrap = new ServerBootstrap();bossGroup = new NioEventLoopGroup(1, new DefaultNamedThreadFactory("server-boss"));workerGroup = new NioEventLoopGroup(4, new DefaultNamedThreadFactory("server-worker"));serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(agentChannelInitializer);int port = Config.Agent.SERVER_PORT;try {ChannelFuture f = serverBootstrap.bind(port).sync();LOGGER.info("dbcat gray agent server started, port is {}.", port);f.channel().closeFuture().sync();LOGGER.info("dbcat gray agent server closed, port is {}.", port);} catch (InterruptedException e) {LOGGER.error("dbcat gray agent server start failed", e);Thread.currentThread().interrupt();}
}, t -> {
}), "gray-agent-server");
serverThread.setDaemon(true);
serverThread.start();
agent端api接口
上面實(shí)現(xiàn)agent端的http服務(wù),灰度服務(wù)端如果需要控制agent客戶(hù)端,必需還要提供api接口,才能進(jìn)行交互
http 接口實(shí)現(xiàn)片段
public class AgentServerHttpInboundHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final ILog log = LogManager.getLogger(AgentServerHttpInboundHandler.class);private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {//收到http請(qǐng)求QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());String requestUri = queryStringDecoder.rawPath();String method = request.method().name();if (!"POST".equals(method.toUpperCase())) {complete(ctx, request, RestResult.buildFailure(HttpResponseStatus.METHOD_NOT_ALLOWED.toString(),HttpResponseStatus.METHOD_NOT_ALLOWED.code()));return;}//通過(guò)url配置接口對(duì)應(yīng)用實(shí)現(xiàn)Endpoint endpoint = EndpointManager.getEndpoint(requestUri);if (endpoint == null) {complete(ctx, request, RestResult.buildFailure(HttpResponseStatus.NOT_FOUND.toString(),HttpResponseStatus.NOT_FOUND.code()));return;}Throwable throwable = null;RestResult result = null;try {ByteBuf content = request.content();byte[] requestBodyBytes = new byte[content.readableBytes()];content.readBytes(requestBodyBytes);String requestBodyString = new String(requestBodyBytes, "UTF-8");ParameterizedType parameterizedType = ((ParameterizedType) endpoint.getClass().getGenericInterfaces()[0]);Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();Object data = GSON.fromJson(requestBodyString,actualTypeArguments[0]);//調(diào)用相應(yīng)的接口result = endpoint.invoke(data);} catch (Throwable e) {log.error(String.format("接口響應(yīng)異常%s", requestUri), e);throwable = e;} finally {if (throwable == null) {complete(ctx, request, result);} else {complete(ctx, request, RestResult.buildFailure(throwable.getMessage(),HttpResponseStatus.INTERNAL_SERVER_ERROR.code()));}}}}
Endpoint api 接口類(lèi),通過(guò)spi 方式加載實(shí)現(xiàn)
public class EndpointManager {private static final Map<String, Endpoint> ENDPOINT_INVOKER_MAP = new HashMap<>();static {for (final Endpoint endpointInvoker : ServiceLoader.load(Endpoint.class, AgentClassLoader.getDefault())) {ENDPOINT_INVOKER_MAP.put(endpointInvoker.path(), endpointInvoker);}}public static Endpoint<?, ?> getEndpoint(String requestUri) {return ENDPOINT_INVOKER_MAP.get(requestUri);}}
Endpoint 的實(shí)某個(gè)實(shí)現(xiàn),如更新實(shí)例狀態(tài)接口
public class EnvStatusUpdateEndpoint implements Endpoint<InstanceEnvUpdateRequest, List<ExecuteResponse>> {private static final ILog log = LogManager.getLogger(EnvStatusUpdateEndpoint.class);@Overridepublic String path() {return "/env-status/update";}@Overridepublic RestResult<List<ExecuteResponse>> invoke(InstanceEnvUpdateRequest request) throws Exception {log.info("更新實(shí)例信息");ServerInstance.getInstance().setEnvStatus(request.getEnvStatus());List<ExecuteResponse> executeResponses = MQConnectionManager.restartAll(request.getStrategy());return RestResult.buildSuccess(executeResponses);}
}
到此,agent客戶(hù)端的基礎(chǔ)框加已實(shí)現(xiàn),下一篇將開(kāi)始介紹相關(guān)核心組件灰度的增強(qiáng)實(shí)現(xiàn)
硬廣
最后,不要臉給大家安利一款mysql監(jiān)控軟件: 安裝方便,消耗低,可視化,傻瓜式操作,可以監(jiān)控慢日志詳情、cpu、內(nèi)存、連接數(shù)、tps 等信息
體驗(yàn)演示
下載地址