代码已上传到:https://github.com/laolunsi/spring-boot-examples,springboot -> springcloud,从入门到进阶!


上一节介绍了如何利用 nacos 的配置中心功能来实现 Gateway 动态路由,实现起来很简单,只要引入 nacos-config 的依赖,然后重写 RouteDefinitionRepository 接口即可。这种实现的缺点也很明显 —— 统一维护一个路由配置文件会对整个微服务网关的安全性造成威胁。
想象一下, 某位新来的同事不经意写错了格式,比如 json 的 } ,就会导致所有路由都不可用。这样是比较危险。


这一节介绍存储自定义路由信息到 MySQL,然后通过 Gateway 提供的方法更新缓存数据。为了提高请求速率,这里将使用一个二级缓存(内存 + Redis)的功能。


1. 创建网关服务


首先根据上一节中的 AppRoute 实体类创建一张 MySQL 表:

CREATE TABLE `app_route` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `routeId` varchar(255) NOT NULL,
  `order` int(11) DEFAULT NULL,
  `uri` varchar(255) NOT NULL,
  `predicates` text,
  `filters` text,
  `updateTime` datetime NOT NULL,
  `delete` tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`,`routeId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;


同上一节创建一个 parent 项目,然后在其下创建一个网关服务,添加 redis 和 mybatis 相关的依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>


下面我们改造一下上一节在网关服务中的一些配置:


application.yml:

server:
  port: 8502

spring:
  application:
    name: gateway-demo222
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useSSL=false
    username: root
    password: root

  redis:
    host: localhost
    password:
    port: 6379
    database: 10

management:
  endpoints:
    web:
      exposure:
        include: health,info,gateway

mybatis:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    map-underscore-to-camel-case: true

# 自定义参数
gateway:
  dynamicRoute:
    dataId: 'yq_routes'
    group: 'YQ_GATEWAY'


创建 AppRoute 的 DAO 和对应的 Mybatis SQL 语句,AppRouteDAO 如:

@Mapper
@Component
public interface AppRouteDAO {

    @Select("select * from app_route")
    List<AppRoute> findAll();

    @Select("select * from app_route where routeId = #{routeId} AND `delete` = 0 LIMIT 1")
    AppRoute findByRouteId(String routeId);

    @Select("select * from app_route where id = #{id} AND `delete` = 0")
    AppRoute findById(Integer id);

    boolean update(AppRoute route);

    boolean insert(AppRoute route);

    boolean delete(AppRoute route);

}

PS: 其它的语句在 /resources/mapper/AppRouteDAO.xml 中。

@Service
public class RouteHandler implements ApplicationEventPublisherAware, CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(RouteHandler.class);

    private ApplicationEventPublisher publisher;

    @Autowired
    private AppRouteService appRouteService;

    @Autowired
    private CacheRouteDefinitionRepository cacheRouteDefinitionRepository;

    @Autowired
    private RouteDefinitionCacheService routeDefinitionCacheService;

    @Override
    public void run(String... args) throws Exception {
        log.info("首次初始化路由....");
        this.loadRouteConfig();
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    public void loadRouteConfig() {
        log.info("加载路由配置...");

        Flux<RouteDefinition> definitionFlux = cacheRouteDefinitionRepository.getRouteDefinitions();
        new Thread(() -> {
            List<String> existRouteIds = definitionFlux.toStream().map(RouteDefinition::getId).collect(Collectors.toList());
            // 也可以用下面这种方法,就不需要 new Thread() 了:
            // List<String> existRouteIds = routeDefinitionCacheService.getRouteDefinitions().stream().map(RouteDefinition::getId).collect(Collectors.toList());

            List<AppRoute> appRouteList = appRouteService.findAll();
            if (appRouteList != null && appRouteList.size() > 0) {
                appRouteList.forEach(a -> {
                    if (BooleanUtils.isTrue(a.getDelete()) && existRouteIds.contains(a.getRouteId())) {
                        deleteRoute(a.getRouteId());
                    } else {
                        RouteDefinition routeDefinition = a.parseToRoute();
                        System.out.println("s: " + JSONObject.toJSONString(routeDefinition));
                        if (routeDefinition != null) {
                            cacheRouteDefinitionRepository.save(Mono.just(routeDefinition)).subscribe();
                        }
                    }
                });
            }

            this.publisher.publishEvent(new RefreshRoutesEvent(this));
        }).start();

    }

    public void deleteRoute(String routeId) {
        log.info("删除路由:" + routeId);
        cacheRouteDefinitionRepository.delete(Mono.just(routeId)).subscribe();
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
    }
}

注意,这里直接用 cacheRouteDefinitionRepository.getRouteDefinitions(),在通过接口更新路由信息后调用此方法时,会出现以下异常:
       java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-3

这个问题的详细解释我还没找到,目前大概理解是 WebFlux 中的异步数据 Flux<T> 被同步的操作调用时,会抛出 blocking 异常。
解决办法

  1. new 一个线程来从 Flux<T> 中获取数据,然后执行操作
  2. 这里不使用 cacheRouteDefinitionRepository.getRouteDefinitions() 了,而是直接用routeDefinitionCacheService.getRouteDefinitions()


RouteDefinitionCacheService,该接口定义了 RouteDefinition 的本地存储和 Redis 存储,这样可以避免每一次读取路由信息都要访问数据库的问题。

@Service
public class RouteDefinitionCacheServiceImpl implements RouteDefinitionCacheService {

    /**
     * 本地缓存
     */
    private static ConcurrentHashMap<String, RouteDefinition> definitionMap = new ConcurrentHashMap<>();

    /**
     * redis 缓存地址
     */
    public static String SPACE = GatewayConfig.NACOS_DATA_ID + ":" + GatewayConfig.NACOS_GROUP_ID;

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public List<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> list = new ArrayList<>();
        if (definitionMap.size() > 0) {
            return new ArrayList<>(definitionMap.values());
        } else {
            redisTemplate.opsForHash().values(SPACE)
                    .stream().forEach(r -> {
                        RouteDefinition route = JSONObject.parseObject(r.toString(), RouteDefinition.class);
                list.add(route);
                definitionMap.put(route.getId(), route);
            });
            return list;
        }
    }

    @Override
    public boolean saveAll(List<RouteDefinition> definitions) {
        if (definitions != null && definitions.size() > 0) {
            definitions.forEach(this::save);
            return true;
        }
        return false;
    }

    @Override
    public boolean has(String routeId) {
        return definitionMap.containsKey(routeId) ? true : redisTemplate.opsForHash().hasKey(SPACE, routeId);
    }

    @Override
    public boolean delete(String routeId) {
        if (has(routeId)) {
            definitionMap.remove(routeId);
            redisTemplate.opsForHash().delete(SPACE, routeId);
            return true;
        }
        return false;
    }

    @Override
    public boolean save(RouteDefinition r) {
        if (r != null && StringUtils.isNotBlank(r.getId())) {
            definitionMap.put(r.getId(), r);
            redisTemplate.opsForHash().put(SPACE, r.getId(), JSONObject.toJSONString(r));
            return true;
        }
        return false;
    }
}


CacheRouteDefinitionRepository,以 RouteDefinitionCacheService 为基础,是 RouteDefinitionRepository 的实现类。该类直接给 Gateway 定义了读取路由信息的方式。

@Service
public class CacheRouteDefinitionRepository implements RouteDefinitionRepository {

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> list = cacheService.getRouteDefinitions();
        return Flux.fromIterable(list);
    }

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap(r -> {
            cacheService.save(r);
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
           if (cacheService.has(id)) {
               cacheService.delete(id);
               return Mono.empty();
           }

           return Mono.defer(() -> Mono.error(new NotFoundException("未找到路由配置:" + routeId)));
        });
    }
}


现在我们已经定义好了 MySQL 中的 app_route 表,设计了程序中存储 RouteDefinition 的二级缓存(Redis + Mysql),下面的问题就是:如何将 MySQL 中的 app_route 和缓存结合起来?


2. 连接 Mysql 与 GateWay


首先,项目启动时,系统应该自动加载数据库中的所有路由信息:


编写一个启动类,当项目启动完成后,将初始化路由信息:

@Component
public class StartListener {

    private static final Logger logger = LoggerFactory.getLogger(StartListener.class);

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Autowired
    private AppRouteService routeService;

    @PostConstruct
    public void init() {
        logger.info("初始化路由数据...");
        List<AppRoute> routeList = routeService.findAll();
        if (routeList != null && routeList.size() > 0) {
            cacheService.saveAll(routeList.stream().map(AppRoute::parseToRoute).collect(Collectors.toList()));
        }
    }
}


然后,我们创建一个 AppRouteAction 接口类,定义对路由信息的增删改查接口:

@RestController
@RequestMapping(value = "app/route")
public class AppRouteAction {
    @Autowired
    private AppRouteService appRouteService;

    @GetMapping(value = "list")
    public JsonResult list() {
        JsonResult jsonResult = new JsonResult(true);
        jsonResult.put("routeList", appRouteService.findAll());
        return jsonResult;
    }

    @PostMapping(value = "")
    public JsonResult save(AppRoute route) {
        if (route == null || StringUtils.isBlank(route.getRouteId())) {
            return new JsonResult(false, "id不能为空");
        } else if (StringUtils.isBlank(route.getUri())) {
            return new JsonResult(false, "uri不能为空");
        }

        AppRoute oldRoute = null;
        if (route.getId() != null) {
            oldRoute = appRouteService.findById(route.getId());
            if (oldRoute == null || oldRoute.getId() == null) {
                return new JsonResult(false, "数据不存在或已被删除");
            }
        }

        AppRoute sameRouteIdObj = appRouteService.findByRouteId(route.getRouteId());
        if (sameRouteIdObj != null && sameRouteIdObj.getId() != null) {
            if (route.getId() == null) {
                return new JsonResult(false, "已存在相同 RouteId 的配置");
            }
        }
        route.setPredicates(route.getPredicates() != null ? route.getPredicates().trim() : null);
        route.setFilters(route.getFilters() != null ? route.getFilters().trim() : null);

        boolean res = appRouteService.saveOrUpdate(route);
        return new JsonResult(res, res ? "操作成功" : "操作失败");
    }

    @DeleteMapping(value = "{routeId}")
    public JsonResult delete(@PathVariable("routeId") String routeId) {
        AppRoute route = appRouteService.findByRouteId(routeId);
        if (route == null || StringUtils.isBlank(route.getRouteId())) {
            return new JsonResult(false, "路由不存在");
        }

        boolean res = appRouteService.delete(route);
        return new JsonResult(res, res ? "操作成功" : "操作失败");
    }
}




在路由信息被改变时,程序将通过上面的 RouteHandler 去通知更新路由信息,具体的逻辑代码在 AppRouteService 中:

@Service
public class AppRouteServiceImpl implements AppRouteService {

    private static final Logger logger = LoggerFactory.getLogger(AppRouteService.class);

    @Autowired
    private AppRouteDAO appRouteDAO;

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Autowired
    private RouteHandler routeHandler;

    @Override
    public List<AppRoute> findAll() {
        return appRouteDAO.findAll();
    }

    @Override
    public boolean saveOrUpdate(AppRoute route) {
        route.setUpdateTime(new Date());
        AppRoute oldRoute = appRouteDAO.findById(route.getId());
        boolean res = false;
        if (oldRoute != null && oldRoute.getId() != null) {
            res = appRouteDAO.update(route);
        } else {
            res = appRouteDAO.insert(route);
        }

        if (res) {
            logger.info("更新缓存,通知网关重新加载路由信息...");
            cacheService.save(route.parseToRoute());
            routeHandler.loadRouteConfig();
        }

        return res;
    }

    @Override
    public boolean delete(AppRoute route) {
        route.setUpdateTime(new Date());
        boolean res = appRouteDAO.delete(route);
        if (res) {
            logger.info("更新缓存,通知网关重新加载路由信息...");
            cacheService.save(route.parseToRoute());
            routeHandler.loadRouteConfig();
        }
        return res;
    }

    @Override
    public AppRoute findByRouteId(String routeId) {
        return appRouteDAO.findByRouteId(routeId);
    }

    @Override
    public AppRoute findById(Integer id) {
        return appRouteDAO.findById(id);
    }
}


到此为止,一个完整的动态路由网关项目已经搭建完毕了,具体的代码请查看:https://github.com/laolunsi/spring-boot-examples 下面我们测试一下。


3. 测试


同上一节创建一个简单的测试服务,取名为 demo,在数据库添加对应配置,如:

INSERT INTO `app_route` 
VALUES 
('1', 'demo', '8003', 'lb://demo', '[{\"name\":\"Path\",\"args\":{\"pattern\":\"/api/demo2/**\"}}]', '[{\"name\":\"StripPrefix\",\"args\":{\"parts\":\"2\"}}]', '2020-08-09 20:54:39', '0');


启动 redis、nacos、网关服务,我们可以看到项目启动后加载了路由信息:

file




下面启动 demo 服务,测试一下路由是否正常:

file



下面测试一下路由信息被修改后,网关服务是否会自动更新路由信息。
通过刚才编写的接口,修改 demo 服务的路由配置,如:

file



我们可以看到网关服务的控制台出现:

file



说明新的路由信息被加载了。
测试旧的地址发现 404,然后根据新的配置访问接口:
file


OK !~



好了,本节内容就到此为止啦!
这两篇文章分别介绍了基于 Nacos 和 基于 mysql 来实现的动态路由功能,而这篇文章更提供了一个完整的路由信息管理的实现,想来应付业务场景已经足够了。


个人水平有限,如文章有错误之处还请指正。有疑问也可以联系我,希望对你有所帮助!
thanks.

Last modification:August 11th, 2020 at 12:30 pm
请作者喝杯肥宅快乐水吧!