Loading... 代码已上传到:[https://github.com/laolunsi/spring-boot-examples](https://github.com/laolunsi/spring-boot-examples),springboot -> springcloud,从入门到进阶!<br /> <br />上一节介绍了如何利用 nacos 的配置中心功能来实现 Gateway 动态路由,实现起来很简单,只要引入 nacos-config 的依赖,然后重写 RouteDefinitionRepository 接口即可。这种实现的缺点也很明显 —— 统一维护一个路由配置文件会对整个微服务网关的安全性造成威胁。<br />想象一下, 某位新来的同事不经意写错了格式,比如 json 的 `}` ,就会导致所有路由都不可用。这样是比较危险。<br /> <br />这一节介绍存储自定义路由信息到 MySQL,然后通过 Gateway 提供的方法更新缓存数据。为了提高请求速率,这里将使用一个二级缓存(内存 + Redis)的功能。<br /> --- <a name="iKgmx"></a> ## 1. 创建网关服务 <br />首先根据上一节中的 AppRoute 实体类创建一张 MySQL 表: ```java CREATE TABLE `app_route` ( `id` int(11) NOT NULL AUTO_INCREMENT, `routeId` varchar(255) NOT NULL, `order` int(11) DEFAULT NULL, `uri` varchar(255) NOT NULL, `predicates` text, `filters` text, `updateTime` datetime NOT NULL, `delete` tinyint(1) NOT NULL DEFAULT '0', PRIMARY KEY (`id`,`routeId`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ``` <br />同上一节创建一个 parent 项目,然后在其下创建一个网关服务,添加 redis 和 mybatis 相关的依赖: ```xml <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> </dependencies> ``` <br />下面我们改造一下上一节在网关服务中的一些配置:<br /> <br />application.yml: ```yaml server: port: 8502 spring: application: name: gateway-demo222 cloud: nacos: discovery: server-addr: 127.0.0.1:8848 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useSSL=false username: root password: root redis: host: localhost password: port: 6379 database: 10 management: endpoints: web: exposure: include: health,info,gateway mybatis: mapper-locations: classpath:mapper/*.xml configuration: map-underscore-to-camel-case: true # 自定义参数 gateway: dynamicRoute: dataId: 'yq_routes' group: 'YQ_GATEWAY' ``` <br />创建 AppRoute 的 DAO 和对应的 Mybatis SQL 语句,AppRouteDAO 如: ```java @Mapper @Component public interface AppRouteDAO { @Select("select * from app_route") List<AppRoute> findAll(); @Select("select * from app_route where routeId = #{routeId} AND `delete` = 0 LIMIT 1") AppRoute findByRouteId(String routeId); @Select("select * from app_route where id = #{id} AND `delete` = 0") AppRoute findById(Integer id); boolean update(AppRoute route); boolean insert(AppRoute route); boolean delete(AppRoute route); } ``` PS: 其它的语句在 /resources/mapper/AppRouteDAO.xml 中。<br /> ```java @Service public class RouteHandler implements ApplicationEventPublisherAware, CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(RouteHandler.class); private ApplicationEventPublisher publisher; @Autowired private AppRouteService appRouteService; @Autowired private CacheRouteDefinitionRepository cacheRouteDefinitionRepository; @Autowired private RouteDefinitionCacheService routeDefinitionCacheService; @Override public void run(String... args) throws Exception { log.info("首次初始化路由...."); this.loadRouteConfig(); } @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } public void loadRouteConfig() { log.info("加载路由配置..."); Flux<RouteDefinition> definitionFlux = cacheRouteDefinitionRepository.getRouteDefinitions(); new Thread(() -> { List<String> existRouteIds = definitionFlux.toStream().map(RouteDefinition::getId).collect(Collectors.toList()); // 也可以用下面这种方法,就不需要 new Thread() 了: // List<String> existRouteIds = routeDefinitionCacheService.getRouteDefinitions().stream().map(RouteDefinition::getId).collect(Collectors.toList()); List<AppRoute> appRouteList = appRouteService.findAll(); if (appRouteList != null && appRouteList.size() > 0) { appRouteList.forEach(a -> { if (BooleanUtils.isTrue(a.getDelete()) && existRouteIds.contains(a.getRouteId())) { deleteRoute(a.getRouteId()); } else { RouteDefinition routeDefinition = a.parseToRoute(); System.out.println("s: " + JSONObject.toJSONString(routeDefinition)); if (routeDefinition != null) { cacheRouteDefinitionRepository.save(Mono.just(routeDefinition)).subscribe(); } } }); } this.publisher.publishEvent(new RefreshRoutesEvent(this)); }).start(); } public void deleteRoute(String routeId) { log.info("删除路由:" + routeId); cacheRouteDefinitionRepository.delete(Mono.just(routeId)).subscribe(); this.publisher.publishEvent(new RefreshRoutesEvent(this)); } } ``` > 注意,这里直接用 cacheRouteDefinitionRepository.getRouteDefinitions(),在通过接口更新路由信息后调用此方法时,会出现以下异常: > java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-3 > > 这个问题的详细解释我还没找到,目前大概理解是 WebFlux 中的异步数据 Flux<T> 被同步的操作调用时,会抛出 blocking 异常。 > 解决办法 > 1. new 一个线程来从 Flux<T> 中获取数据,然后执行操作 > 2. 这里不使用 cacheRouteDefinitionRepository.getRouteDefinitions() 了,而是直接用routeDefinitionCacheService.getRouteDefinitions() <br />RouteDefinitionCacheService,该接口定义了 RouteDefinition 的本地存储和 Redis 存储,这样可以避免每一次读取路由信息都要访问数据库的问题。 ```java @Service public class RouteDefinitionCacheServiceImpl implements RouteDefinitionCacheService { /** * 本地缓存 */ private static ConcurrentHashMap<String, RouteDefinition> definitionMap = new ConcurrentHashMap<>(); /** * redis 缓存地址 */ public static String SPACE = GatewayConfig.NACOS_DATA_ID + ":" + GatewayConfig.NACOS_GROUP_ID; @Autowired private RedisTemplate redisTemplate; @Override public List<RouteDefinition> getRouteDefinitions() { List<RouteDefinition> list = new ArrayList<>(); if (definitionMap.size() > 0) { return new ArrayList<>(definitionMap.values()); } else { redisTemplate.opsForHash().values(SPACE) .stream().forEach(r -> { RouteDefinition route = JSONObject.parseObject(r.toString(), RouteDefinition.class); list.add(route); definitionMap.put(route.getId(), route); }); return list; } } @Override public boolean saveAll(List<RouteDefinition> definitions) { if (definitions != null && definitions.size() > 0) { definitions.forEach(this::save); return true; } return false; } @Override public boolean has(String routeId) { return definitionMap.containsKey(routeId) ? true : redisTemplate.opsForHash().hasKey(SPACE, routeId); } @Override public boolean delete(String routeId) { if (has(routeId)) { definitionMap.remove(routeId); redisTemplate.opsForHash().delete(SPACE, routeId); return true; } return false; } @Override public boolean save(RouteDefinition r) { if (r != null && StringUtils.isNotBlank(r.getId())) { definitionMap.put(r.getId(), r); redisTemplate.opsForHash().put(SPACE, r.getId(), JSONObject.toJSONString(r)); return true; } return false; } } ``` <br />CacheRouteDefinitionRepository,以 RouteDefinitionCacheService 为基础,是 RouteDefinitionRepository 的实现类。该类直接给 Gateway 定义了读取路由信息的方式。 ```java @Service public class CacheRouteDefinitionRepository implements RouteDefinitionRepository { @Autowired private RouteDefinitionCacheService cacheService; @Override public Flux<RouteDefinition> getRouteDefinitions() { List<RouteDefinition> list = cacheService.getRouteDefinitions(); return Flux.fromIterable(list); } @Override public Mono<Void> save(Mono<RouteDefinition> route) { return route.flatMap(r -> { cacheService.save(r); return Mono.empty(); }); } @Override public Mono<Void> delete(Mono<String> routeId) { return routeId.flatMap(id -> { if (cacheService.has(id)) { cacheService.delete(id); return Mono.empty(); } return Mono.defer(() -> Mono.error(new NotFoundException("未找到路由配置:" + routeId))); }); } } ``` <br />现在我们已经定义好了 MySQL 中的 app_route 表,设计了程序中存储 RouteDefinition 的二级缓存(Redis + Mysql),下面的问题就是:如何将 MySQL 中的 app_route 和缓存结合起来? --- <a name="lGd2u"></a> ## 2. 连接 Mysql 与 GateWay <br />首先,项目启动时,系统应该自动加载数据库中的所有路由信息:<br /> <br />编写一个启动类,当项目启动完成后,将初始化路由信息: ```java @Component public class StartListener { private static final Logger logger = LoggerFactory.getLogger(StartListener.class); @Autowired private RouteDefinitionCacheService cacheService; @Autowired private AppRouteService routeService; @PostConstruct public void init() { logger.info("初始化路由数据..."); List<AppRoute> routeList = routeService.findAll(); if (routeList != null && routeList.size() > 0) { cacheService.saveAll(routeList.stream().map(AppRoute::parseToRoute).collect(Collectors.toList())); } } } ``` <br />然后,我们创建一个 AppRouteAction** **接口类,定义对路由信息的增删改查接口: ```java @RestController @RequestMapping(value = "app/route") public class AppRouteAction { @Autowired private AppRouteService appRouteService; @GetMapping(value = "list") public JsonResult list() { JsonResult jsonResult = new JsonResult(true); jsonResult.put("routeList", appRouteService.findAll()); return jsonResult; } @PostMapping(value = "") public JsonResult save(AppRoute route) { if (route == null || StringUtils.isBlank(route.getRouteId())) { return new JsonResult(false, "id不能为空"); } else if (StringUtils.isBlank(route.getUri())) { return new JsonResult(false, "uri不能为空"); } AppRoute oldRoute = null; if (route.getId() != null) { oldRoute = appRouteService.findById(route.getId()); if (oldRoute == null || oldRoute.getId() == null) { return new JsonResult(false, "数据不存在或已被删除"); } } AppRoute sameRouteIdObj = appRouteService.findByRouteId(route.getRouteId()); if (sameRouteIdObj != null && sameRouteIdObj.getId() != null) { if (route.getId() == null) { return new JsonResult(false, "已存在相同 RouteId 的配置"); } } route.setPredicates(route.getPredicates() != null ? route.getPredicates().trim() : null); route.setFilters(route.getFilters() != null ? route.getFilters().trim() : null); boolean res = appRouteService.saveOrUpdate(route); return new JsonResult(res, res ? "操作成功" : "操作失败"); } @DeleteMapping(value = "{routeId}") public JsonResult delete(@PathVariable("routeId") String routeId) { AppRoute route = appRouteService.findByRouteId(routeId); if (route == null || StringUtils.isBlank(route.getRouteId())) { return new JsonResult(false, "路由不存在"); } boolean res = appRouteService.delete(route); return new JsonResult(res, res ? "操作成功" : "操作失败"); } } ``` <br /> <br />在路由信息被改变时,程序将通过上面的 RouteHandler 去通知更新路由信息,具体的逻辑代码在 AppRouteService 中: ```java @Service public class AppRouteServiceImpl implements AppRouteService { private static final Logger logger = LoggerFactory.getLogger(AppRouteService.class); @Autowired private AppRouteDAO appRouteDAO; @Autowired private RouteDefinitionCacheService cacheService; @Autowired private RouteHandler routeHandler; @Override public List<AppRoute> findAll() { return appRouteDAO.findAll(); } @Override public boolean saveOrUpdate(AppRoute route) { route.setUpdateTime(new Date()); AppRoute oldRoute = appRouteDAO.findById(route.getId()); boolean res = false; if (oldRoute != null && oldRoute.getId() != null) { res = appRouteDAO.update(route); } else { res = appRouteDAO.insert(route); } if (res) { logger.info("更新缓存,通知网关重新加载路由信息..."); cacheService.save(route.parseToRoute()); routeHandler.loadRouteConfig(); } return res; } @Override public boolean delete(AppRoute route) { route.setUpdateTime(new Date()); boolean res = appRouteDAO.delete(route); if (res) { logger.info("更新缓存,通知网关重新加载路由信息..."); cacheService.save(route.parseToRoute()); routeHandler.loadRouteConfig(); } return res; } @Override public AppRoute findByRouteId(String routeId) { return appRouteDAO.findByRouteId(routeId); } @Override public AppRoute findById(Integer id) { return appRouteDAO.findById(id); } } ``` <br />到此为止,一个完整的动态路由网关项目已经搭建完毕了,具体的代码请查看:[https://github.com/laolunsi/spring-boot-examples](https://github.com/laolunsi/spring-boot-examples) 下面我们测试一下。<br /> --- <a name="Vtysl"></a> ## 3. 测试 <br />同上一节创建一个简单的测试服务,取名为 demo,在数据库添加对应配置,如: ```sql INSERT INTO `app_route` VALUES ('1', 'demo', '8003', 'lb://demo', '[{\"name\":\"Path\",\"args\":{\"pattern\":\"/api/demo2/**\"}}]', '[{\"name\":\"StripPrefix\",\"args\":{\"parts\":\"2\"}}]', '2020-08-09 20:54:39', '0'); ``` <br />启动 redis、nacos、网关服务,我们可以看到项目启动后加载了路由信息:<br /> ![file](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1596978358556.png) <br /> <br />下面启动 demo 服务,测试一下路由是否正常:<br /> ![file](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1596978390796.png) <br /> <br />下面测试一下路由信息被修改后,网关服务是否会自动更新路由信息。<br />通过刚才编写的接口,修改 demo 服务的路由配置,如:<br /> ![file](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1596978445876.png) <br /> <br />我们可以看到网关服务的控制台出现:<br /> ![file](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1596978468893.png) <br /> <br />说明新的路由信息被加载了。<br />测试旧的地址发现 404,然后根据新的配置访问接口:<br />![file](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1596978486155.png) <br />OK !~<br /> --- <br />好了,本节内容就到此为止啦!<br />这两篇文章分别介绍了基于 Nacos 和 基于 mysql 来实现的动态路由功能,而这篇文章更提供了一个完整的路由信息管理的实现,想来应付业务场景已经足够了。<br /> <br />个人水平有限,如文章有错误之处还请指正。有疑问也可以联系我,希望对你有所帮助!<br />thanks. Last modification:August 11, 2020 © Allow specification reprint Support Appreciate the author AliPayWeChat Like 0 请作者喝杯肥宅快乐水吧!
One comment
你好,想转载你的文章,请问是否可以提供一下原文markdown,非常感谢!