濟(jì)南做網(wǎng)站最好的公司鎮(zhèn)江網(wǎng)站制作公司
之前寫的兩篇關(guān)于基于 trace_id
的鏈路追蹤的文章:
- 基于trace_id的鏈路追蹤(含F(xiàn)eign、Hystrix、線程池等場景)
- 基于trace_id的鏈路追蹤(ForkJoinPool場景)
一、引言
在之前的文章中,我們討論了基于 trace_id
的鏈路追蹤的常見場景。然而,最近我意識到在微服務(wù)架構(gòu)中,我們還缺少對一個非常常見場景的探討:在網(wǎng)關(guān)中如何處理 trace_id
,尤其是在 Reactor 異步模式下的處理。因此,我決定記錄下這些思考和解決方案。
二、具體場景
在Spring Cloud Gateway網(wǎng)關(guān)中,我們需要實(shí)現(xiàn)請求訪問日志的打印功能,以便更好地排查問題。具體的實(shí)現(xiàn)方式包括兩個全局過濾器:
- TraceIdGlobalFilter:實(shí)現(xiàn)
trace_id
全局?jǐn)r截(先執(zhí)行)。 - AccessLogGlobalFilter:實(shí)現(xiàn)請求訪問日志的打印(后執(zhí)行)。
在正常情況下,這兩個過濾器可以打印請求的 request
日志和 response
日志,并且日志中都包含相同的 trace_id
。然而,在開發(fā)調(diào)試過程中,我發(fā)現(xiàn)了一種異常情況:request
日志中總能打印出 trace_id
,而 response
日志中則有時能打印出 trace_id
,有時卻不能。這導(dǎo)致了 request
日志和 response
日志無法關(guān)聯(lián)的問題。
三、分析
1. 為什么 response
日志沒有打印 trace_id
?
通過分析日志,我發(fā)現(xiàn)打印 response
日志的線程與打印 request
日志的線程并不是同一個線程?;诖?#xff0c;我們可以判斷,trace_id
沒有傳遞到打印 response
日志的線程中。
2. 為什么 trace_id
沒有傳遞到打印 response
日志的線程中?
我們知道 Spring Cloud Gateway 是基于 WebFlux Reactor 異步模式實(shí)現(xiàn)的,因此一個請求的 request
和 response
可能由不同的線程來執(zhí)行。在 TraceIdGlobalFilter
中,我們使用了 MDC來傳遞 trace_id
。然而,MDC 在普通的多線程環(huán)境中有效,但在 Reactor 異步模式下并不起作用。這是因?yàn)?Reactor 異步模式需要通過另外一種方式來傳遞 trace_id
。
四、解決方案
在 WebFlux Reactor 異步模式下,我們需要使用 reactor.util.context.Context
來傳遞 trace_id。核心邏輯如下:
透傳 trace_id
: 通過 Mono.contextWrite(context)
往 context 中設(shè)置 trace_id
。
取出 trace_id
: 通過 Flux.deferContextual(context)
從 context 中獲取 trace_id
。
具體實(shí)現(xiàn)代碼示例如下:
// 設(shè)置 trace_id
Mono.contextWrite(context -> context.put("trace_id", traceId));// 獲取 trace_id
Flux.deferContextual(context -> {String traceId = context.get("trace_id");// 可將 traceId 設(shè)置到MDC中供當(dāng)前線程使用return Flux.just(traceId);
});
通過這種方式,我們可以確保 trace_id
在整個請求處理鏈路中都能被正確傳遞和使用,解決了 request
日志和 response
日志斷聯(lián)的問題。
五、具體代碼
TraceIdGlobalFilter
/*** trace_id 全局?jǐn)r截器*/
@Slf4j
@Component
public class TraceIdGlobalFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String traceId = request.getHeaders().getFirst(TraceConsts.TRACE_ID);// trace_idtraceId = MdcUtil.attachTraceId(traceId);// 將traceId傳遞給下游微服務(wù)String finalTraceId = traceId;Consumer<HttpHeaders> headersConsumer = httpHeaders -> {httpHeaders.set(TraceConsts.TRACE_ID, finalTraceId);};ServerHttpRequest requestNew = exchange.getRequest().mutate().headers(headersConsumer).build();return chain.filter(exchange.mutate().request(requestNew).build()).doFinally(s -> {// 清除MDCMdcUtil.detachTraceId();});}@Overridepublic int getOrder() {return -100;}}
AccessLogGlobalFilter
/*** 請求訪問日志 全局?jǐn)r截器*/
@Slf4j
@Component
public class AccessLogGlobalFilter implements GlobalFilter, Ordered {/*** gateway access log 日志開關(guān)* <p>* 特別注意:高并發(fā)業(yè)務(wù)場景下,可以關(guān)閉日志來提升性能*/@Value("${com.gateway.access.log.enabled:true}")private boolean logEnabled;private final HandlerStrategies handlerStrategies = HandlerStrategies.withDefaults();@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {StopWatch stopWatch = new StopWatch();stopWatch.start();ServerHttpRequest httpRequest = exchange.getRequest();// 日志開關(guān),直接進(jìn)入下一個Filterif (!logEnabled) {return chain.filter(exchange).then(Mono.fromRunnable(() -> {stopWatch.stop();// 為了方便排查問題,還是打印一個簡單的日志if (log.isDebugEnabled()) {log.debug("請求參數(shù) [{}] [{}] query:{}, time: {} ms", httpRequest.getURI().getPath(), httpRequest.getMethod(), httpRequest.getURI().getRawQuery(), stopWatch.getTotalTimeMillis());}}));}// Request 處理ServerRequest request = ServerRequest.create(exchange, handlerStrategies.messageReaders());// header 參數(shù)HttpHeaders httpHeaders = request.headers().asHttpHeaders();// 是否為文件上傳,若是文件上傳,則不打印bodyboolean isFile = null != httpHeaders.getContentType() && AccessLogUtil.isBinayBodyData(httpHeaders.getContentType().toString());// response 包裝ServerHttpResponseDecorator responseDecorator = responseDecoratorAndRecordLog(exchange, stopWatch);if (isFile) {// 打印請求日志this.reqLog(request, isFile, null);// 執(zhí)行過濾器return chain.filter(exchange.mutate().request(request.exchange().getRequest()).response(responseDecorator).build())// 從最初的Mono本身解析一個值,并將其放入上下文context中,以便下游可以通過上下文context API訪問它// webflux reactor 異步模式下:通過 contextWrite 往context中設(shè)置trace_id.contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));}Mono<String> modifiedBody = request.bodyToMono(String.class).defaultIfEmpty(CommonConsts.NULL).flatMap(body -> {// 打印請求日志this.reqLog(request, isFile, body);return Mono.just(body);});// 通過 BodyInserter 插入 body(支持修改body), 避免 request body 只能獲取一次// BodyInserters.fromPublisher 不支持文件上傳,所以不能用BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);HttpHeaders headers = new HttpHeaders();headers.putAll(exchange.getRequest().getHeaders());headers.remove(HttpHeaders.CONTENT_LENGTH);CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {// request 包裝ServerHttpRequestDecorator requestDecorator = requestDecorator(exchange, headers, outputMessage);// 執(zhí)行過濾器return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build())// 從最初的Mono本身解析一個值,并將其放入上下文context中,以便下游可以通過上下文context API訪問它// webflux reactor 異步模式下:通過 contextWrite 往context中設(shè)置trace_id.contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));}));}@Overridepublic int getOrder() {return -90;}/*** 打印 request log*/private void reqLog(ServerRequest request, boolean isFile, String body) {// URL query 參數(shù)String queryString = request.uri().getRawQuery();// header 參數(shù)HttpHeaders headers = request.headers().asHttpHeaders();String headersParams = headersToString(headers);if (isFile) {if (log.isInfoEnabled()) {log.info("請求參數(shù) [{}] [{}] query:{}, headers:{}", request.uri().getPath(), request.methodName(), queryString, headersParams);}return;}// request body 長度處理,避免太長,打印耗性能String requestBody = AccessLogUtil.fixFieldAndReplaceWhite(body, AccessLogUtil.DEF_MAX_LEN);if (log.isInfoEnabled()) {log.info("請求參數(shù) [{}] [{}] query:{}, headers:{}, body:{}", request.uri().getPath(), request.methodName(), queryString, headersParams, requestBody);}}/*** 過濾headers,避免打印過多的日志*/private String headersToString(HttpHeaders headers) {Map<String, String> map = new HashMap<String, String>();for (Map.Entry<String, List<String>> entry : headers.entrySet()) {if (RequestParamUtil.containsHeader(entry.getKey())) {map.put(entry.getKey(), entry.getValue().toString());}}return JSON.toJSONString(map);}/*** Request裝飾器,重新計算 headers*/private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange, HttpHeaders headers,CachedBodyOutputMessage outputMessage) {return new ServerHttpRequestDecorator(exchange.getRequest()) {@Overridepublic HttpHeaders getHeaders() {long contentLength = headers.getContentLength();HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.putAll(super.getHeaders());if (contentLength > 0) {httpHeaders.setContentLength(contentLength);} else {httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");}return httpHeaders;}@Overridepublic Flux<DataBuffer> getBody() {return outputMessage.getBody();}};}/*** Response裝飾器,記錄響應(yīng)日志* <p>* 通過 DataBufferFactory 解決響應(yīng)體分段傳輸問題。*/private ServerHttpResponseDecorator responseDecoratorAndRecordLog(ServerWebExchange exchange, StopWatch stopWatch) {ServerHttpResponse response = exchange.getResponse();DataBufferFactory bufferFactory = response.bufferFactory();return new ServerHttpResponseDecorator(response) {@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {stopWatch.stop();if (!(body instanceof Flux)) {return super.writeWith(body);}// 獲取響應(yīng)類型String responseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);if (AccessLogUtil.isBinayBodyData(responseContentType)) {if (log.isInfoEnabled()) {log.info("響應(yīng)參數(shù): time {} ms", stopWatch.getTotalTimeMillis());}return super.writeWith(body);}// info及以上日志級別才做如下處理if (log.isInfoEnabled()) {Flux<? extends DataBuffer> fluxBody = Flux.from(body).flatMap(dataBuffer -> Flux.deferContextual(context -> {// webflux reactor 異步模式下:通過 deferContextual 取出context中的trace_idMdcUtil.putTraceId(context.get(TraceConsts.TRACE_ID));if (log.isDebugEnabled()) {log.debug("spring cloud gateway webflux reactor 異步模式下,透傳trace_id: {}", MdcUtil.getTraceId());}return Flux.just(dataBuffer);})).doFinally(signalType -> {// 清理掉trace_idMdcUtil.removeTraceId();});return super.writeWith(fluxBody.buffer().map(dataBuffers -> {// 合并多個流集合,解決返回體分段傳輸DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();DataBuffer join = dataBufferFactory.join(dataBuffers);byte[] content = new byte[join.readableByteCount()];join.read(content);// 釋放掉內(nèi)存DataBufferUtils.release(join);String responseBody = new String(content, StandardCharsets.UTF_8);// response body 長度處理,避免太長,打印耗性能responseBody = AccessLogUtil.fixFieldAndReplaceWhite(responseBody, AccessLogUtil.DEF_MAX_LEN);log.info("響應(yīng)參數(shù): {}, time {} ms", responseBody, stopWatch.getTotalTimeMillis());return bufferFactory.wrap(content);}));}return super.writeWith(body);}};}
}