快捷搜索:  汽车  科技

分布式链路追踪技术选型(分布式系统链路监控实践)

分布式链路追踪技术选型(分布式系统链路监控实践)基于雪花算法实现的分布式唯一ID,组成方式为:- SnowFlake IDUUID一共36bit,组成方式为8-4-4-4-12。优点是java类库提供API,使用简单,但存在一定性能与安全问题,如造成MYSQL页分裂等。- 数据库自增主键使用自增主键。优点同样是使用方便,但频繁操作数据库会产生性能瓶颈。

随着分布式与微服务的发展,系统复杂度指数式上升,对服务的鲁棒性要求也日渐增高。本文采取自研(代码可控,符合需求的情况下)的方式,探究分布式服务监控中的链路监控,并介绍相关开源产品。

文章第一部分介绍目前分布式ID主流生成方案,作为链路traceId,并基于数据库 号段模式进行实现。

后续部分进入链路监控,探究如何保证分布式系统的可观测性(Observability)与可控制性(Controllability) 以实现 “可以由其外部输出推断其内部状态的程度”。本部分主要采用transmittableThreadlocal、MDC等实现。

一、分布式唯一ID1.1 常用方式

- UUID

UUID一共36bit,组成方式为8-4-4-4-12。优点是java类库提供API,使用简单,但存在一定性能与安全问题,如造成MYSQL页分裂等。

- 数据库自增主键

使用自增主键。优点同样是使用方便,但频繁操作数据库会产生性能瓶颈。

- SnowFlake ID

基于雪花算法实现的分布式唯一ID,组成方式为:

分布式链路追踪技术选型(分布式系统链路监控实践)(1)

符号位一般不变,时间戳与序列号保证递增,时间戳单位为ms,序列号位数表示一毫秒内可产生的序列数,工作进程位表示机器描述标识,可以通过zk持久顺序节点实现。雪花算法优点是性能较高,但存在时钟回拨问题,可能产生重复ID。

- MongoDB ObjectID

类似于SnowFlake ID,组成结构为:

分布式链路追踪技术选型(分布式系统链路监控实践)(2)

包含了时间戳、机器描述、进程号、自增序列。

1.2 自研实现

综合自身业务以及系统的可控性,这里我们使用数据库 号段模式进行实现。 核心流程:

1.2.1 创建数据表

表结构:

CREATE TABLE `id_factory_config` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id' `business_code` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '业务编码 作为语义化前缀' `init_value` bigint(13) DEFAULT NULL COMMENT '初始值' `current_start` bigint(20) DEFAULT NULL COMMENT '本次id起始值' `current_threshold` bigint(20) DEFAULT NULL COMMENT '本次id段阈值' `step` int(11) DEFAULT NULL COMMENT '步长,可动态调整' `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号' `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' `create_user` varchar(60) DEFAULT NULL COMMENT '创建用户' `update_user` varchar(60) DEFAULT NULL COMMENT '修改用户' PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

对应entity:

@TableName("id_factory_config") @Data public class IdFactoryConfig { @TableId(type = IdType.AUTO) private Integer id; private String businessCode; private Long initValue; private Long currentStart; private Long currentThreshold; private Integer step; private Integer version; private Date createTime; private Date updateTime; private String createUser; private String updateUser; }

各字段含义,参考字段注释与后续代码。

对应mapper:

/** * @author WinsonWu * @create 2022-08-26 9:39 * @desc 分布式id配置查询 * **/ @Mapper public interface IdFactoryConfigMapper extends BaseMapper<IdFactoryConfig> { @Select("select * from id_factory_config") List<IdFactoryConfig> selectAll(); @Select("select * from id_factory_config where business_code=#{businessCode} limit 1 for update") IdFactoryConfig selectOneForUpdate(@Param("businessCode") String businessCode); @Update("UPDATE id_factory_config set current_threshold=#{currentThreshold} current_start=#{currentStart} version=version 1 where id=#{id} and version=#{version}") Integer updateCurrentThreshold(@Param("currentThreshold") long currentThreshold @Param("currentStart") long currentStart @Param("id") int id @Param("version") int version); }

数据库配置:

分布式链路追踪技术选型(分布式系统链路监控实践)(3)

1.2.2 代码实现1.2.2.1 接口定义

/** * @Description: ID创建工厂 * @Author: WinsonWu * @Date: 2022/8/25 16:38 **/ public interface IdFactory { /** * get distributed sequence id * @return id */ String getSeqId(String businessCode); }1.2.2.2 接口实现

实现思路:

->系统初始化加载id段

-> CAS更新数据库

-> 获取id

-> 提前异步更新

-> 过载兜底保护

/** * @Description: ID工厂实现 * @Author: WinsonWu * @Date: 2022/8/25 16:41 **/ @Slf4j @Component public class SimpleIdFactory implements IdFactory { /** * 序列字典 */ private static Map<String LocalSeqId> localSeqMap; @Resource private IdFactoryConfigMapper idFactoryConfigMapper; @Autowired @Qualifier("poolForUpdate") private ThreadPoolExecutor poolForUpdate; @PostConstruct public void initFactoryConfig() { // 查询全量配置 List<IdFactoryConfig> idFactoryConfigs = idFactoryConfigMapper.selectAll(); List<IdFactoryConfig> refreshList = new ArrayList<>(); for (IdFactoryConfig idFactoryConfig : idFactoryConfigs) { // 重启时抛弃旧数据 refreshList.add(updateFactoryConfig(idFactoryConfig)); } localSeqMap = new ConcurrentHashMap<>(refreshList.size()); for (IdFactoryConfig idFactoryConfig : refreshList) { LocalSeqId localSeqId = new LocalSeqId(); localSeqId.setCurrentThreshold(new AtomicLong(idFactoryConfig.getCurrentThreshold())); localSeqId.setCurrentId(new AtomicLong(idFactoryConfig.getCurrentStart())); localSeqId.setStep(idFactoryConfig.getStep()); localSeqId.setPrefix(idFactoryConfig.getBusinessCode()); localSeqMap.put(idFactoryConfig.getBusinessCode() localSeqId); } } /** * @param idFactoryConfig * @return */ private IdFactoryConfig updateFactoryConfig(IdFactoryConfig idFactoryConfig) { int updateResult; int retryCount = 0; while (true) { // permit CAS 10 times if (retryCount >= 10) { //todo 告警 log.error("retry too much!"); return null; } try { IdFactoryConfig newIdFactoryConfig = idFactoryConfigMapper.selectOneForUpdate(idFactoryConfig.getBusinessCode()); long currentThreshold = idFactoryConfig.getCurrentThreshold(); long step = idFactoryConfig.getStep(); updateResult = idFactoryConfigMapper.updateCurrentThreshold(currentThreshold step currentThreshold newIdFactoryConfig.getId() newIdFactoryConfig.getVersion()); if (updateResult > 0) { return newIdFactoryConfig; } else { retryCount ; } } catch (Exception e) { //todo 告警 log.error("error occurred: " e); } } } @Override public String getSeqId(String businessCode) { // 直接从本地缓存中提取id数据 final LocalSeqId localSeqId = localSeqMap.get(businessCode); if (Objects.isNull(localSeqId)) { log.error("business code not exists at MEM" businessCode); // todo 告警 return null; } AtomicLong currentId = localSeqId.getCurrentId(); // 使用超过80%,则异步更新 //todo 也可以考虑添加一个缓冲区,互相备份缓冲 if (localSeqId.getCurrentThreshold().get() - currentId.get() < 0.2 * localSeqId.getStep()) { poolForUpdate.submit(() -> { int updateResult = -1; //如果更新失败,进行重试,五次仍然失败,则放弃 for (int i = 0; i < 5; i ) { IdFactoryConfig newIdFactoryConfig = idFactoryConfigMapper.selectOneForUpdate(businessCode); long currentThreshold = newIdFactoryConfig.getCurrentThreshold(); long currentStart = newIdFactoryConfig.getCurrentStart(); long step = newIdFactoryConfig.getstep(); updateResult = idFactoryConfigMapper.updateCurrentThreshold(currentThreshold step currentThreshold newIdFactoryConfig.getId() newIdFactoryConfig.getVersion()); if (updateResult > 0) { LocalSeqId newLocalSeqId = new LocalSeqId(); newLocalSeqId.setCurrentId(new AtomicLong(currentStart)); newLocalSeqId.setPrefix(businessCode); newLocalSeqId.setStep(newIdFactoryConfig.getStep()); newLocalSeqId.setCurrentThreshold(new AtomicLong(currentThreshold)); localSeqMap.put(businessCode newLocalSeqId); break; } } }); } // 过载保护 if (localSeqMap.get(businessCode).getCurrentId().get() >= localSeqMap.get(businessCode).getCurrentThreshold().get() - 1) { synchronized (this) { if (localSeqMap.get(businessCode).getCurrentId().get() >= localSeqMap.get(businessCode).getCurrentThreshold().get() - 1) { //阻塞更新数据库 int updateResult = -1; int retryCount = 0; //如果更新失败,进行重试 while(true) { if (retryCount >= 10){ // todo 告警 log.error("retry too much!"); } IdFactoryConfig newIdFactoryConfig = idFactoryConfigMapper.selectOneForUpdate(businessCode); long currentThreshold = newIdFactoryConfig.getCurrentThreshold(); long currentStart = newIdFactoryConfig.getCurrentStart(); long step = newIdFactoryConfig.getStep(); updateResult = idFactoryConfigMapper.updateCurrentThreshold(currentThreshold step currentThreshold newIdFactoryConfig.getId() newIdFactoryConfig.getVersion()); if (updateResult > 0) { LocalSeqId newLocalSeqId = new LocalSeqId(); newLocalSeqId.setCurrentId(new AtomicLong(currentStart)); newLocalSeqId.setPrefix(businessCode); newLocalSeqId.setStep(newIdFactoryConfig.getStep()); newLocalSeqId.setCurrentThreshold(new AtomicLong(currentThreshold)); localSeqMap.put(businessCode newLocalSeqId); break; }else { retryCount ; } } } } } String result = businessCode localSeqMap.get(businessCode).getCurrentId().getAndAdd(1); return result; } }

LocalSeqId:

@Data public class LocalSeqId { /** * 当前缓存id起始值 */ private AtomicLong currentId; /** * 达到这个阈值就需要进行更新 */ private AtomicLong currentThreshold; /** * 步长 */ private int step; /** * 前缀 */ private String prefix; }

实现优化点较多,如文章中包含的告警内容、id资源浪费,以及step的动态更新,具体可以参考leaf实现,这里附参考架构图:

分布式链路追踪技术选型(分布式系统链路监控实践)(4)

1.2.3 并发测试1.2.3.1 定义线程池

/** * @author winsonWu * @date 2022.08.26 * @desc */ @Configuration public class ThreadPoolUtil { /** * get core num */ private static int corePoolSize = Runtime.getRuntime().availableProcessors() * 3; /** * max thread num */ private static int maximumPoolSize = corePoolSize; /** * thread keep alive time */ private static long keepAliveTime = 1; /** * thread keep alive time unit */ private static TimeUnit unit = TimeUnit.HOURS; /** * use array blocking queue to avoid out of memory */ private static BlockingQueue<Runnable> queueForUpdate = new ArrayBlockingQueue<>(10); /** * use array blocking queue to avoid out of memory */ private static BlockingQueue<Runnable> queueForFuture = new ArrayBlockingQueue<>(500); /** * default thread factory */ private static ThreadFactory threadFactory = Executors.defaultThreadFactory(); /** * reject policy */ private static RejectedExecutionHandler handlerForUpdate = new ThreadPoolExecutor.DiscardPolicy(); /** * reject policy */ private static RejectedExecutionHandler handlerForFuture = new ThreadPoolExecutor.CallerRunsPolicy(); @Bean("poolForCompletableFuture") public ThreadPoolExecutor poolForCompletableFuture(){ return new ThreadPoolExecutor( corePoolSize maximumPoolSize keepAliveTime unit queueForFuture threadFactory handlerForFuture); } @Bean("poolForUpdate") public ThreadPoolExecutor poolForUpdate(){ return new ThreadPoolExecutor( corePoolSize maximumPoolSize keepAliveTime unit queueForUpdate threadFactory handlerForUpdate); } }1.2.3.2 单元测试代码

... @Autowired @Qualifier("poolForCompletableFuture") private ThreadPoolExecutor poolForCompletableFuture; @Resource private SimpleIdFactory simpleIdFactory; @Test public void testSeqId(){ StopWatch stopwatch = new StopWatch(); stopwatch.start("id生成器"); List<CompletableFuture<String>> futures = new ArrayList<>(10000); List<String> results = new ArrayList<>(10000); String businessCode = "order"; for (int i=0; i<10000; i ){ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> simpleIdFactory.getSeqId(businessCode) poolForCompletableFuture); futures.add(future); } CompletableFuture .allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v t)-> futures.forEach(eachFuture -> { results.add(eachFuture.getNow(null)); })).join(); // 去重 List<String> finalList =new ArrayList<>(10000); for(String each : results) { if (!finalList.contains(each)) finalList.add(each); } // 去重后,最终对象数量为10000 Assertions.assertEquals(finalList.stream().collect(Collectors.toSet()).size() 10000); stopwatch.stop(); System.out.println(stopwatch.prettyPrint()); }1.2.3.3 单元测试结果

分布式链路追踪技术选型(分布式系统链路监控实践)(5)

这里生成10000个id,耗时654ms。

1.3 开源实现

开源实现较多,我们这里介绍下gayhub stars较多且使用简单的美团leaf。

leaf实现了号段与雪花算法两种方式。号段模式使用了双buffer优化(提前加载id段),提升ID获取效率;雪花算法模式提供了时钟回拨校验(等待与报错告警),避免重复生成ID。下面提供架构图,方便理解。

号段模式架构图:

分布式链路追踪技术选型(分布式系统链路监控实践)(6)

雪花模式架构图:

分布式链路追踪技术选型(分布式系统链路监控实践)(7)

二、监控

分布式链路追踪技术选型(分布式系统链路监控实践)(8)

实现系统的可观测性(Observability)与可控制性(Controllability),主要包含三个方面——事件日志、链路追踪和聚合度量。

  • 日志(Logging):日志的职责是记录离散事件,通过这些记录事后分析出程序的行为,譬如曾经调用过什么方法,曾经操作过哪些数据,等等。打印日志被认为是程序中最简单的工作之一,调试问题时常有人会说“当初这里记得打点日志就好了”,可见这就是一项举手之劳的任务。输出日志的确很容易,但收集和分析日志却可能会很复杂,面对成千上万的集群节点,面对迅速滚动的事件信息,面对数以 TB 计算的文本,传输与归集都并不简单。对大多数程序员来说,分析日志也许就是最常遇见也最有实践可行性的“大数据系统”了。

常用的实现思路是通过异步线程收集,并持久化到存储介质,再通过搜索引擎进行索引,最终使用可视化界面进行展示搜索。常用的开源框架为Elastic Stack:

分布式链路追踪技术选型(分布式系统链路监控实践)(9)

  • 追踪(Tracing):单体系统时代追踪的范畴基本只局限于[栈追踪]
  • (en.wikipedia.org/wiki/Stack_… Tracing),调试程序时,在 IDE 打个断点,看到的 Call Stack 视图上的内容便是追踪;编写代码时,处理异常调用了 Exception::printStackTrace()方法,它输出的堆栈信息也是追踪。微服务时代,追踪就不只局限于调用栈了,一个外部请求需要内部若干服务的联动响应,这时候完整的调用轨迹将跨越多个服务,同时包括服务间的网络传输信息与各个服务内部的调用堆栈信息,因此,分布式系统中的追踪在国内常被称为“全链路追踪”(后文就直接称“链路追踪”了),许多资料中也称它为“分布式追踪”(Distributed Tracing)。追踪的主要目的是排查故障,如分析调用链的哪一部分、哪个方法出现错误或阻塞,输入输出是否符合预期,等等。

该模块我们使用transmittableThreadlocal、MDC针对各个系统与中间件定制化开发,也可以使用javaAgent ASM的方式进行无侵入监控。

  • 度量(Metrics):度量是指对系统中某一类信息的统计聚合。譬如,证券市场的每一只股票都会定期公布财务报表,通过财报上的营收、净利、毛利、资产、负债等等一系列数据来体现过去一个财务周期中公司的经营状况,这便是一种信息聚合。Java 天生自带有一种基本的度量,就是由虚拟机直接提供的 JMX(Java Management eXtensions)度量,诸如内存大小、各分代的用量、峰值的线程数、垃圾收集的吞吐量、频率,等等都可以从 JMX 中获得。度量的主要目的是监控(Monitoring)和预警(Alert),如某些度量指标达到风险阈值时触发事件,以便自动处理或者呼叫程序员。

对于系统而言,通过异步线程收集的数据,可以进行相关业务指标定义,通过批处理、流处理聚合,进行统一可视化展示或监控预警。

各类型产品介绍:

分布式链路追踪技术选型(分布式系统链路监控实践)(10)

2.1 链路追踪(tracing)

现代分布式链路追踪公认的起源是 Google 在 2010 年发表的论文《Dapper : a Large-Scale Distributed Systems Tracing Infrastructure》,这篇论文介绍了 Google 从 2004 年开始使用的分布式追踪系统 Dapper 的实现原理。此后,所有业界有名的追踪系统,无论是国外 Twitter 的Zipkin、Naver 的Pinpoint(Naver 是 Line 的母公司,Pinpoint 出现其实早于 Dapper 论文发表,在 Dapper 论文中还提到了 Pinpoint),抑或是国内阿里的鹰眼、大众点评的CAT、个人开源的SkyWalking(后进入 Apache 基金会孵化毕业)都受到 Dapper 论文的直接影响。

pinpoint追踪示例:

分布式链路追踪技术选型(分布式系统链路监控实践)(11)

2.1.1 实现思路

本文模拟环境为:

springboot网关应用 -> http应用 -> rpc应用 -> 消息队列

  • 网关应用到http使用httpclient拦截器实现;
  • http应用到rpc应用使用dubbo filter实现;
  • rpc应用到rocketMQ应用使用钩子函数实现;
2.1.2 实现代码2.1.2.1 http服务

构建基于threadlocal的上下文容器;

public class CommonRequestContext { private static final ThreadLocal<Map<Object Object>> requestContentMap = new TransmittableThreadLocal<Map<Object Object>>(){ @Override protected Map<Object Object> initialValue() { return new HashMap<>(); } @Override public Map<Object Object> copy(Map<Object Object> parentValue) { return parentValue != null ? new HashMap<>(parentValue) : null; } }; public static void put(Object key Object value) { requestContentMap.get().put(key value); } public static Object get(Object key){ return requestContentMap.get().get(key); } public static void clear() { requestContentMap.remove(); } }

这里我们使用阿里开源的TransmittableThreadLocal,方便父子线程传递,也解决了InheritableThreadLocal在线程池场景无法传递参数到子线程的问题。

TransmittableThreadLocal基于InheritableThreadLocal实现,但通过快照和holder管理了父子线程的变量。

构建httpclient:

/** * initialize HttpClient * */ @Configuration @ConditionalOnProperty(value = "ins.httpclient.enabled" matchIfMissing = true) @EnableConfigurationProperties(HttpClientCfg.class) public class HttpClientAutoConfiguration { @ConditionalOnMissingBean(CloseableHttpClient.class) @Bean(name = "defaultHttpClient" destroyMethod = "close") public CloseableHttpClient apacheHttpClient(HttpClientCfg cfg) { RequestConfig config = RequestConfig.custom() .setConnectTimeout(cfg.getConnectTimeout()) .setSocketTimeout(cfg.getSoTimeout()) .build(); HttpClientBuilder builder = HttpClients.custom() .addInterceptorFirst((HttpRequest request HttpContext context) -> { request.addHeader("traceId" String.valueOf(CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST))); }) .setDefaultRequestConfig(config); if (StringUtils.isNotEmpty(cfg.getUserAgent())) { builder.setUserAgent(cfg.getUserAgent()); } builder.setConnectionTimeToLive(cfg.getConnTimeToLive() TimeUnit.MILLISECONDS); builder.setMaxConnTotal(cfg.getMaxTotalConnections()).setMaxConnPerRoute(cfg.getDefaultMaxConnectionsPerHost()); return builder.build(); } @DependsOn("defaultHttpClient") @Bean public HttpClientHelper initHttpClientHelper(CloseableHttpClient httpClient) { return HttpClientHelper.setHttpClient(httpClient); } }

可以看到这段代码将traceId通过header传递到了http服务:

... .addInterceptorFirst((HttpRequest request HttpContext context) -> { request.addHeader("traceId" String.valueOf(CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST))); ...

接下来我们通过filter来接受并解析traceId:

/** * @Description: TODO * @Author: WinsonWu * @Date: 2022/8/27 12:22 **/ @Slf4j public class TracingFilter implements Filter { @Override public void doFilter(ServletRequest servletRequest ServletResponse servletResponse FilterChain filterChain) throws IOException ServletException { HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest; String traceId = httpServletRequest.getHeader("traceId"); MDC.put("traceId" traceId); filterChain.doFilter(servletRequest servletResponse); } }

注册我们的filter:

@Configuration public class WebConfig implements WebMvcConfigurer { @Bean public TracingFilter channelFilter(){ return new TracingFilter(); } }

添加测试controller:

/** * @Description: TODO * @Author: WinsonWu * @Date: 2022/8/27 12:00 **/ @RestController("/") @Slf4j public class TestController { @RequestMapping("tracing") public String helloTracing(){ log.info("trace id is coming"); return "hello tracing"; } }

logback配置:

<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false"> <property name="logPath" value="/usr/local/simulation/log"/> <!-- 控制台输出 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> <pattern> %X{traceId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> <!-- 按照每天生成日志文件 --> <appender name="FILEINFOLOG" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>ERROR</level> <!--过滤掉error的级别--> <onMatch>DENY</onMatch> <onMismatch>ACCEPT</onMismatch> </filter> <encoder> <pattern> %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n </pattern> </encoder> <!--滚动策略--> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--路径--> <fileNamePattern>${logPath}/simulation_info.%d.log</fileNamePattern> </rollingPolicy> </appender> <appender name="FILEERRORLOG" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>ERROR</level> <!--//打印error--> </filter> <encoder> <pattern> %X{traceId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n </pattern> </encoder> <!--滚动策略--> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--路径--> <fileNamePattern>${logPath}/simulation_error.%d.log</fileNamePattern> </rollingPolicy> </appender> <!--myibatis log configure--> <logger name="com.apache.ibatis" level="TRACE"/> <logger name="java.sql.Connection" level="DEBUG"/> <logger name="java.sql.Statement" level="DEBUG"/> <logger name="java.sql.PreparedStatement" level="DEBUG"/> <!-- 日志输出级别 --> <root level="INFO"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILEINFOLOG" /> <appender-ref ref="FILEERRORLOG" /> </root> </configuration>

注意%X{traceId},这里是取MDC中的变量,放入pattern中。

测试http服务:

@Test public void testTracing() throws IOException { final String orderSeqId = simpleIdFactory.getSeqId("order"); CommonRequestContext.put(RequestContentConstants.COMMON_REQUEST orderSeqId); MDC.put("traceId" orderSeqId); log.info("invoke"); String result = HttpClientHelper.getInstance().get("http://localhost:8080/tracing" null); log.info("result"); }

测试结果:

发起端:

分布式链路追踪技术选型(分布式系统链路监控实践)(12)

被调用端:

分布式链路追踪技术选型(分布式系统链路监控实践)(13)

2.1.1.2 rpc服务

rpc中间件,这里使用dubbo,traceId的传递,我们通过dubbo filter实现,核心代码如下:

服务消费方filter:

@Activate(group = CommonConstants.CONSUMER) @Slf4j public class DubboConsumerTraceFilter implements Filter { @Override public Result invoke(Invoker<?> invoker Invocation invocation) throws RpcException { String traceId = (String) CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST); try { if (!Objects.isNull(traceId)) { invocation.getAttachments().put(String.valueOf(RequestContentConstants.COMMON_REQUEST) traceId); } } catch (Exception e) { e.printStackTrace(); } return invoker.invoke(invocation); } }

服务提供方filter:

@Activate(group = CommonConstants.PROVIDER) public class DubboProviderTraceFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(DubboProviderTraceFilter.class); @Override public Result invoke(Invoker<?> invoker Invocation invocation) throws RpcException { try { String attachment = invocation.getAttachment(String.valueOf(RequestContentConstants.COMMON_REQUEST)); if (StringUtils.isNotEmpty(attachment)) { MDC.put("traceId" attachment); } } catch (Exception e) { e.printStackTrace(); } return invoker.invoke(invocation); } }2.1.2.3 消息队列

这里我们演示rockerMQ,这里我们通过hook实现。核心代码如下:

producerHook:

public class ProducerSendMessageHook implements SendMessageHook { @Override public String hookName() { return "CommonMessageHook"; } @Override public void sendMessageBefore(SendMessageContext sendMessageContext) { String traceId = (String) CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST); if(StringUtils.isNotEmpty(traceId)){ sendMessageContext.getMessage().putUserProperty(RequestContentConstants.COMMON_REQUEST.name() traceId); } } @Override public void sendMessageAfter(SendMessageContext sendMessageContext) { } }

consumerHook:

public class ConsumerPullMessageHook implements FilterMessageHook { @Override public String hookName() { return "ConsumerPullMessageHook"; } @Override public void filterMessage(FilterMessageContext filterMessageContext) { Iterator<MessageExt> iterator = filterMessageContext.getMsgList().iterator(); while (iterator.hasNext()) { MessageExt messageExt = iterator.next(); String traceId = messageExt.getUserProperty(RequestContentConstants.COMMON_REQUEST.name()); if (StringUtils.isNotEmpty(traceId)) { CommonRequestContext.put(RequestContentConstants.COMMON_REQUEST traceId); MDC.put("traceId" traceId); } } } }2.1.3 总结

至此我们就实现了基本的链路监控功能,在符合业务需求的情况下,采取自研的方式,可有效提高代码的可控制性,大型分布式系统,建议使用skywalking等开源组件。

作者:WinsonWu
链接:https://juejin.cn/post/7136423390398644231
来源:稀土掘金

猜您喜欢: