类MOOC系统开发3-课程发布模块开发
模块功能介绍
1、(机构段)课程预览
2、(平台端)课程审核(机构请求发布,自动审核,后台审核—>发布成功)发布成功,用户端可查看发布的课程
3、(用户端)查看发布课程
课程预览
课程预览是一个单独的门户页面
结构
说明如下:
1、点击课程预览,通过Nginx、后台服务网关请求内容管理服务进行课程预览。
2、内容管理服务查询课程相关信息进行整合,并通过模板引擎技术在服务端渲染生成页面,返回给浏览器。
3、通过课程预览页面点击”马上学习“打开视频播放页面。
4、视频播放页面通过Nginx请求后台服务网关,查询课程信息展示课程计划目录,请求媒资服务查询课程计划绑定的视频文件地址,在线浏览播放视频。
模板引擎Freemarker
可以看到,根据前边的数据模型分析,课程预览就是把课程的相关信息进行整合,在课程预览界面进行展示,课程预览界面与课程发布的课程详情界面一致。
项目采用模板引擎技术实现课程预览界面。什么是模板引擎?
早期我们采用的jsp技术就是一种模板引擎技术,简单来说就是通过jsp浏览的页面的框架以及搭好,需要后端渲染(渲染的过程就是填充数据-模型的过程)
所以模板引擎就是:模板+数据=输出,Jsp页面就是模板,页面中嵌入的jsp标签就是数据(模型),两者相结合输出html网页。
可以说这种模式就不是前后端分离了,后端直接返回的是整个界面。
常用的java模板引擎还有哪些?
Jsp、Freemarker、Thymeleaf 、Velocity 等。
本项目采用Freemarker作为模板引擎技术。
Freemarker官方地址:http://freemarker.foofun.cn/
配置
在内容管理接口工层 添加Freemarker与SpringBoot的整合包
<!-- Spring Boot 对结果视图 Freemarker 集成 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
在nacos为内容管理接口层配置freemarker,公用配置组新加一个freemarker-config-dev.yaml (资料中已做)
那么现在就是看配置信息:
template-loader-path: classpath:/templates/ #页面模板位置(默认为 classpath:/templates/)
这里规定了模板加载(存放)的位置,类路径也就是resources路径,那么我们再这个路径下新建templates文件夹
再微服务的配置文件中引入 freemarker-config-dev.yaml 配置,则:
在内容管理的api工程中引入共享配置
添加模板
添加模板,在resources下创建templates目录,添加test.ftl模板文件
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello World!</title>
</head>
<body>
Hello ${name}!
</body>
</html>
填充数据
编写controller方法,准备模型数据
package com.xuecheng.content.api;
import java.util.Map;
@Controller
public class FreemarkerController {
@GetMapping("/testfreemarker")
public ModelAndView test(){
ModelAndView modelAndView = new ModelAndView();
//设置模型数据
modelAndView.addObject("name","小明");
//设置模板名称
modelAndView.setViewName("test"); // 刚才的模板名称
return modelAndView;
}
}
测试
启动内容管理接口工程,访问http://localhost:63040/content/testfreemarker
freemarker提供很多指令用于解析各种类型的数据模型,参考地址:http://freemarker.foofun.cn/ref_directives.html
成功
静态资源
配置主站
在课程预览界面上要加载css、js、图片等内容,这里部署nginx来访问这些静态资源,对于SpringBoot服务的动态资源由Nginx去代理请求,如下图:
本机安装Nginx,直接用黑马提供的资料
运行nginx.exe,访问localhost,可以看到Nginx默认页面则启动成功(若失败,请检查是否存在中文目录或者端口占用情况)
前端资源:解压黑马提供的xc-ui-pc-static-portal.zip文件,并记录解压的位置,在第5步修改Nginx配置时,需要指定其所在目录
现在我们要实现:输入域名能够访问门户网址,这其实还涉及了 DNS解析,为了演示,我们仅修改host文件:
这就相当于一个简单的DNS过程,当浏览器访问上面的几个网址时,全部将域名解析为本地主机
在Nginx的conf/nginx.conf文件中配置如下内容
解释 :一个server相当于一个虚拟主机
是主机就需要配置域名,如果什么都不配置,代理的就是localhost 端口
配置生效:方法1、进入任务管理器,杀死nginx的两个进程 重启 方法 2、命令行 nginx.exe -s reload
- server_name www.51xuecheng.cn localhost;
浏览器访问任意一个域名都能进入nignx 请求网站资源
默认输入域名后访问的时nignx 目录下 /html/下的资源
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
#外界访问 访问了一个端口号为80的虚拟主机,
server {
listen 80;
server_name www.51xuecheng.cn localhost;
#浏览器访问任意一个域名都能进入nignx 请求网站资源
ssi on;
# 把页头页尾放进主页
ssi_silent_errors on;
location / {
alias E:/homework/practice/CodePractice/JAVA/XCProj/xc-ui-pc-static-portal/;
index index.html index.htm;
}
#静态资源
location /static/img/ {
alias E:/homework/practice/CodePractice/JAVA/XCProj/xc-ui-pc-static-portal/img/;
}
location /static/css/ {
alias E:/homework/practice/CodePractice/JAVA/XCProj/xc-ui-pc-static-portal/css/;
}
location /static/js/ {
alias E:/homework/practice/CodePractice/JAVA/XCProj/xc-ui-pc-static-portal/js/;
}
location /static/plugins/ {
alias E:/homework/practice/CodePractice/JAVA/XCProj/xc-ui-pc-static-portal/plugins/;
add_header Access-Control-Allow-Origin http://ucenter.51xuecheng.cn;
add_header Access-Control-Allow-Credentials true;
add_header Access-Control-Allow-Methods GET;
}
location /plugins/ {
alias E:/homework/practice/CodePractice/JAVA/XCProj/xc-ui-pc-static-portal/plugins/;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
这样访问 本机80端口就能访问静态界面啦
访问课程详情页
http://localhost/course/course_template.html
访问的其实时尚未被后端渲染的数据。
文件服务器
在进行课程预览时需要展示课程的图片,在线插放课程视频,课程图片、视频这些都在MinIO文件系统存储,下边统一由Nginx代理,通过文件服务域名统一访问。如下图:
host文件添加 127.0.0.1 www.51xuecheng.cn file.51xuecheng.cn
在nginx.conf中配置文件服务器的代理地址
# server 上面加
#文件服务 被代理的网址
upstream fileserver{
server 192.168.101.65:9000 weight=10;
}
这里支配了一台机器,如果以后有多台机器,可以这样
upstream fileserver{
server 192.168.101.65:9000 weight=10;
server 192.168.101.64:9001 weight=10;
server 192.168.101.65:9002 weight=10;
}
server {
listen 80;
server_name file.51xuecheng.cn; # 外界访问的域名,文件服务的域名
#charset koi8-r;
ssi on;
ssi_silent_errors on;
#access_log logs/host.access.log main;
location /video { # 根据下属规则,当请求 域名/video 时 其实nignx将请求转发给 fileserver fileserver在上面
proxy_pass http://fileserver;
}
location /mediafiles {
proxy_pass http://fileserver;
}
}
访问通过 指定域名访问 文件是文件系统里面的视频
正常访问!
视频播放页面
进入课程详情页面,点击马上学习或课程目录下的小节的名称将打开视频播放页面。
首先在nginx.conf中配置视频播放页面的地址
location /course/preview/learning.html {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/course/learning.html;
}
location /course/search.html {
root D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal;
}
location /course/learning.html {
root D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal;
}
下边需要配置learning.html页面的视频播放路径来测试视频播放页面,找到learning.html页面中videoObject对象的定义处,配置视频的播放地址。
这样点击详情视频就能播放绑定的视频了
课程预览界面接口
将课程详情页作为一个模板,往里面填充数据。
准备模板
响应页面到浏览器使用freemarker模板引擎技术实现,首先从课程资料目录下获取课程预览页面course_template.html,拷贝至内容管理的接口工程的resources/templates下,并将其在本目录复制一份命名为course_template.ftl
定义接口
@GetMapping("/coursepreview/{courseId}")
public ModelAndView preview(@PathVariable("courseId") Long courseId){
ModelAndView modelAndView = new ModelAndView();
modelAndView.addObject("model",null);
modelAndView.setViewName("course_template");
return modelAndView;
}
简单访问一下:
发现能简单访问到,但是没有样式,样式都在门户网站里的静态资源里面
F12 发现 他访问静态资源直接访问本服域名路径下的静态资源;也就是说访问静态资源的域名和远端访问的域名绑定了
如何访问呢?统一使用门户网站网址访问所有资源
反向代理
可以用niginx代理内容管理的地址,实际访问门户网站(被代理)
怎么实现呢?
#后台网关
upstream gatewayserver{
server 127.0.0.1:63010 weight=10;
}
#门户虚拟主机
server {
listen 80;
server_name www.51xuecheng.cn localhost;
....
#api
location /api/ {
proxy_pass http://gatewayserver/;
}
....
重启
访问:http://www.51xuecheng.cn/api/content/coursepreview/124
可以正常加载样式了。
接口开发
在使用freemarker渲染生成视图时需要数据模型,此数据模型包括了基本信息、营销信息、课程计划、师资等信息。
所以首先定义一个数据模型类:
@Data
@ToString
public class CoursePreviewDto {
//课程基本信息,课程营销信息
CourseBaseInfoDto courseBase;
//课程计划信息
List<TeachplanDto> teachplans;
//师资信息暂时不加...
}
Service接口
public interface CoursePublishService {
public CoursePreviewDto getCoursePreviewInfo(Long courseId);
}
接口实现如下
@Service
public class CoursePublishServiceImpl implements CoursePublishService {
@Autowired
CourseBaseInfoService courseBaseInfoService;
@Autowired
TeachplanService teachplanService;
@Override
public CoursePreviewDto getCoursePreviewInfo(Long courseId) {
//课程基本信息、营销信息
CourseBaseInfoDto courseBaseInfo = courseBaseInfoService.getCourseBaseInfo(courseId);
//课程计划信息
List<TeachplanDto> teachplanTree= teachplanService.findTeachplanTree(courseId);
CoursePreviewDto coursePreviewDto = new CoursePreviewDto();
coursePreviewDto.setCourseBase(courseBaseInfo);
coursePreviewDto.setTeachplans(teachplanTree);
return coursePreviewDto;
}
}
接口层完善
@Autowired
CoursePublishService coursePublishService;
@GetMapping("/coursepreview/{courseId}")
public ModelAndView preview(@PathVariable("courseId") Long courseId){
//获取课程预览信息
CoursePreviewDto coursePreviewInfo = coursePublishService.getCoursePreviewInfo(courseId);
ModelAndView modelAndView = new ModelAndView();
modelAndView.addObject("model",coursePreviewInfo);
modelAndView.setViewName("course_template");
return modelAndView;
}
前后端联调
重启前端工程,
进入课程列表点击”预览”按钮,
正常打开课程预览页面http://www.51xuecheng.cn/api/content/coursepreview/124
编写模板
模型数据准备好后下一步将模型数据填充到course_template.ftl上,填充时注意不要一次填充太多,一边填充一边刷新调试。
完整的course_template.ftl模板在课程资料目录下,差不多学会了freemarker标签的使用方法,将课程资料下的course_template.ftl覆盖自己的工程下的course_template.ftl。
- freemarker提供了很多指令用于解析各种类型的数据模型,还有if等标签,参考地址:http://freemarker.foofun.cn/ref_directives.html
- 修改模板后需要编译
测试成功!
视频播放页面接口
在此页面需要从后台获取课程信息、根据课程计划获取对应的视频地址,下边编写这两个接口:
获取课程信息接口:/open/content/course/whole/{courseId}
/open/content/course/whole/课程id
响应:同课程预览service接口返回数据
根据课程计划获取视频地址接口:/open/media/preview/{mediaId}
/open/media/preview/课程计划id
响应:
{"code":0,"msg":"success","result":"视频的url","successful":true}
1、在nginx配置如下地址
#openapi
location /open/content/ {
proxy_pass http://gatewayserver/content/open/;
}
location /open/media/ {
proxy_pass http://gatewayserver/media/open/;
}
配置运行nginx.exe -s reload加载nginx的配置文件
接口定义
内容管理
@Api(value = "课程公开查询接口",tags = "课程公开查询接口")
@RestController
@RequestMapping("/open")
public class CourseOpenController {
@Autowired
private CourseBaseInfoService courseBaseInfoService;
@Autowired
private CoursePublishService coursePublishService;
@GetMapping("/course/whole/{courseId}")
public CoursePreviewDto getPreviewInfo(@PathVariable("courseId") Long courseId) {
//获取课程预览信息
CoursePreviewDto coursePreviewInfo = coursePublishService.getCoursePreviewInfo(courseId);
return coursePreviewInfo;
}
}
媒资管理
@Api(value = "媒资文件管理接口",tags = "媒资文件管理接口")
@RestController
@RequestMapping("/open")
public class MediaOpenController {
@Autowired
MediaFileService mediaFileService;
@ApiOperation("预览文件")
@GetMapping("/preview/{mediaId}")
public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId){
MediaFiles mediaFiles = mediaFileService.getFileById(mediaId);
if(mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())){
XueChengPlusException.cast("视频还没有转码处理");
}
return RestResponse.success(mediaFiles.getUrl());
}
}
测试
定义好后,启动内容管理、媒资管理、后台服务网关服务,测试视频播放页面是否可以正常获取课程计划,点击具体的课程计划是否正常可以播放视频。
面试
为什么要用Freemarker静态化?如何做的?
页面静态化是指使用模板引擎技术将一个动态网页生成html静态页面,满足以下条件可以考虑使用静态化
- 该页面被访问频率高,例如:商品信息展示、讲师介绍页面
- 页面上数据变化频率低,例如:商品发布后对商品信息的修改频率低、讲师介绍信息修改频率低
- 静态化的技术很多,Freemarker是一个成熟的开源的模板引擎工具,简单易用,功能强大,本项目使用Freemarker将课程信息静态化
- 使用Freemarker的标签编写课程信息的模板
- 调用接口获取木板上需要的模型数据
- 调用Freemarker的API生成静态页面
- 生成的静态页面最终会上传到文件系统方便访问
提交审核
需求分析
根据模块需求分析,课程发布前先要审核,审核通过方可发布。下图是课程审核及发布的流程图
- 一门课程新增后,它的审核状态为
未提交
,发布状态为未发布
- 课程信息编辑完成,教学机构人员进行
提交审核
操作,此时课程的审核状态为已提交
- 当课程状态为
已提交
时,运营平台人员对课程进行审核 - 运营平台人员审核课程,结果有两个:审核通过、审核不通过
- 课程审核后不管状态是否通过,教学机构都可以再次修改课程并提交审核,此时课程状态为
已提交
,运营平台人员再次审核课程 - 课程审核通过,教学机构人员可以发布课程,发布成功后,课程的发布状态为
已发布
- 课程发布后,通过
下架
操作可以更改课程发布状态为下架
- 课程下架后,通过
上架
操作可以再次发布课程,上架后课程发布状态为发布
数据模型
通过业务流程分析,现在我们思考:课程提交审核后,还允许修改课程吗?
- 如果允许修改,可能会引发冲突 (审核的课程和修改的课程时同一份),本来审核完毕了,但是机构修改了课程(加了一些不合规的内容,这样倒霉的就是平台人员)
- 如果不允许修改,大部分审核周期都很长,想修改只能等到这一轮审核完,效率低
- 解决方案:
- 提供一个预发布表(相当于sql的快照读一样):审核人员审核的是快照;审核通过后,机构发布的也是快照(预发布表的内容)
- 如果审核过程中,机构修改了课程信息,那么修改的不是预发布表。而是原始表的信息,当再次提交时,如果平台正在审核,就暂时不能提交。 载体提交时还是将基本表信息转到预发布表,将审核状态改为审核中,发布状态改为未发布
- 审核通过需要1、将预发布表发表以及课程基本信息状态为审核通过 。 发布时,发布的不是原始表而是快照表,修改不影响快照
接口定义
提交审核
请求网址: http://localhost:8601/api/content/courseaudit/commit/1
请求方法: POST
- 请求路径:/content/courseaudit/commit/{courseId}
- 请求方式:POST
- 从请求路径可以看出,该接口需要定义在content-api下
@PostMapping("/courseaudit/commit/{courseId}")
public void commitAudit(@PathVariable Long courseId) {
}
接口开发
DAO开发
- 使用自动生成的Mapper即可
- 通过上面的分析,我们要查询的内容如下
- 根据传入的courseId,查询课程基本信息、课程营销信息、课程计划信息,并汇总成课程预发布信息
- 向课程预发布表中插入我们汇总好的信息,如果已经存在,则更新,并状态为
已提交
- 更新课程基本信息表的审核状态为
已提交
- 约束
- 未提交或审核完后,方可提交审核
- 本机构只允许提交本机构的课程
- 没有上传图片,不允许提交审核
- 没有添加课程计划,不允许提交审核
service
public void commitAudit(Long companyId,Long courseId);
实现
@Override
public void commitAudit(Long companyId, Long courseId) {
//约束校验
CourseBase courseBase = courseBaseMapper.selectById(courseId);
//课程审核状态
String auditStatus = courseBase.getAuditStatus();
//当前审核状态为已提交不允许再次提交
if("202003".equals(auditStatus)){
XuechengPlusException.cast("当前为等待审核状态,审核完成可以再次提交。");
}
//本机构只允许提交本机构的课程
if(!courseBase.getCompanyId().equals(companyId)){
XuechengPlusException.cast("不允许提交其它机构的课程。");
}
//课程图片是否填写
if(StringUtils.isEmpty(courseBase.getPic())){
XuechengPlusException.cast("提交失败,请上传课程图片");
}
//添加课程预发布记录
CoursePublishPre coursePublishPre = new CoursePublishPre();
//课程基本信息加部分营销信息
CourseBaseInfoDto courseBaseInfo = courseBaseInfoService.getCourseBaseInfo(courseId);
BeanUtils.copyProperties(courseBaseInfo,coursePublishPre);
//课程营销信息
CourseMarket courseMarket = courseMarketMapper.selectById(courseId);
//转为json
String courseMarketJson = JSON.toJSONString(courseMarket);
//将课程营销信息json数据放入课程预发布表
coursePublishPre.setMarket(courseMarketJson);
//查询课程计划信息
List<TeachplanDto> teachplanTree = teachplanService.findTeachplanTree(courseId);
if(teachplanTree.size()<=0){
XuechengPlusException.cast("提交失败,还没有添加课程计划");
}
//转json
String teachplanTreeString = JSON.toJSONString(teachplanTree);
coursePublishPre.setTeachplan(teachplanTreeString);
//设置预发布记录状态,已提交
coursePublishPre.setStatus("202003");
//教学机构id
coursePublishPre.setCompanyId(companyId);
//提交时间
coursePublishPre.setCreateDate(LocalDateTime.now());
CoursePublishPre coursePublishPreUpdate = coursePublishPreMapper.selectById(courseId);
if(coursePublishPreUpdate == null){
//添加课程预发布记录
coursePublishPreMapper.insert(coursePublishPre);
}else{
coursePublishPreMapper.updateById(coursePublishPre);
}
//更新课程基本表的审核状态
courseBase.setAuditStatus("202003");
courseBaseMapper.updateById(courseBase);
}
完善
@ResponseBody
@PostMapping ("/courseaudit/commit/{courseId}")
public void commitAudit(@PathVariable("courseId") Long courseId){
Long companyId = 1232141425L;
coursePublishService.commitAudit(companyId,courseId);
}
课程发布
在网站上展示课程信息需要解决课程信息显示的性能问题,如果速度慢(排除网速)会影响用户的体验性。
如何去快速搜索课程?
打开课程详情页面仍然去查询数据库可行吗?
为了提高网站的速度需要将课程信息进行缓存,并且要将课程信息加入ES索引库 方便搜索,下图显示了课程发布后课程信息的流转情况
1、向内容管理数据库的课程发布表存储课程发布信息,更新课程基本信息表中发布状态为已发布。
2、向Redis存储课程缓存信息。
3、向Elasticsearch存储课程索引信息。
4、请求分布文件系统存储课程静态化页面(即html页面),实现快速浏览课程详情页面。
课程发布表的数据来源于课程预发布表,它们的结构基本一样,只是课程发布表中的状态是课程发布状态,如下图:
redis中的课程缓存信息是将课程发布表中的数据转为json进行存储。
elasticsearch中的课程索引信息是根据搜索需要将课程名称、课程介绍等信息进行索引存储。
MinIO中存储了课程的静态化页面文件(html网页),查看课程详情是通过文件系统去浏览课程详情页面。
分布式事务框架搭建
分布式事务技术方案
什么是分布式事务?
现在的需求是课程发布操作后将数据写入数据库、redis、elasticsearch、MinIO四个地方,这四个地方已经不限制在一个数据库内,是由四个分散的服务去提供,与这四个服务去通信需要网络通信,而网络存在不可到达性,这种分布式系统环境下,通过与不同的服务进行网络通信去完成事务称之为分布式事务。
begin transaction;
//1.本地数据库操作:张三减少金额
//2.远程调用:让李四增加金额
commit transation;
可以设想,当远程调用让李四增加金额成功了,
由于网络问题远程调用并没有返回,
此时本地事务以为提交失败(其实成功了)就回滚了张三减少金额的操作,
此时张三和李四的数据就不一致了。
说白了就是远程调用是网络出问题了,这种情况
什么是CAP理论
CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。
CAP理论要强调的是在分布式系统中这三点不可能全部满足,由于是分布式系统就要满足分区容忍性,因为服务之间难免出现网络异常,不能因为局部网络异常导致整个系统不可用。
满足P那么C和A不能同时满足:
比如我们添加一个用户小明的信息,该信息先添加到结点1中,再同步到结点2中,如下图:
如果要满足C一致性,必须等待小明的信息同步完成系统才可用(否则会出现请求到结点2时查询不到数据,违反了一致性),在信息同步过程中系统是不可用的,所以满足C的同时无法满足A。
如果要满足A可用性,要时刻保证系统可用就不用等待信息同步完成,此时系统的一致性无法满足。
所以在分布式系统中进行分布式事务控制,要么保证CP、要么保证AP。
CP的场景:满足C舍弃A,强调一致性(绝对安全。一般是转账)。
跨行转账:一次转账请求要等待双方银行系统都完成整个事务才算完成,只要其中一个失败另一方执行回滚操作。
开户操作:在业务系统开户同时要在运营商开户,任何一方开户失败该用户都不可使用,所以要满足CP。
AP的场景:满足A舍弃C,强调可用性。
订单退款,今日退款成功,明日账户到账,只要用户可以接受在一定时间内到账即可。
注册送积分,注册成功积分在24分到账。
在实际应用中符合AP的场景较多,其实虽然AP舍弃C一致性,实际上最终数据还是达到了一致,也就满足了最终一致性,所以业界定义了BASE理论。
什么是BASE理论?
BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。
分布式事务控制有哪些常用的技术方案?
实现CP就是要实现强一致性:
使用Seata框架基于AT模式实现
使用Seata框架基于TCC模式实现。
实现AP则要保证最终数据一致性:
使用消息队列通知的方式去实现,通知失败自动重试,达到最大失败次数需要人工处理;
使用任务调度的方案,启动任务调度将课程信息由数据库同步到elasticsearch、MinIO、redis中。
课程发布的事务控制方案
学习了这么多的理论,回到课程发布,执行课程发布操作后要向数据库、redis、elasticsearch、MinIO写四份数据,这个场景用哪种方案?
应该强调可用性AP,最终保证一致性即可
课程发布操作后,先更新数据库中的课程发布状态,更新后向redis、elasticsearch、MinIO写课程信息,只要在一定时间内最终向redis、elasticsearch、MinIO写数据成功即可。
步骤:
方案名称: 本地消息表+定时任务调度的方案来实现最终数据的一致性。
1、执行发布操作,内容管理服务存储课程发布表的同时向消息表添加一条“课程发布任务”。这里使用本地事务保证课程发布信息保存成功,同时消息表也保存成功。
2、任务调度服务定时调度内容管理服务扫描消息表,由于课程发布操作后向消息表插入一条课程发布任务,此时扫描到一条任务。
3、拿到任务开始执行任务,分别向redis、elasticsearch及文件系统存储数据。
4、任务完成后删除消息表记录。
发布接口定义
根据课程发布的分布式事务控制方案,课程发布操作首先通过本地事务向课程发布表写入课程发布信息并向消息表插入一条消息,这里定义的课程发布接口要实现该功能。
在内容管理接口工程中定义课程发布接口。
@ApiOperation("课程发布")
@ResponseBody
@PostMapping ("/coursepublish/{courseId}")
public void coursepublish(@PathVariable("courseId") Long courseId){
}
接口开发(消息/任务表同步)
课程发布操作对数据库操作如下(整体是一个事务):
1、向课程发布表course_publish插入一条记录,记录来源于课程预发布表,如果存在则更新,发布状态为:已发布。
2、更新course_base表的课程发布状态为:已发布
3、删除课程预发布表的对应记录。
4、向mq_message消息表插入一条消息,消息类型为:course_publish
后续就是 向 redis ES MinIO插入数据,xxljob循环遍历定时任务来保证最终一致性,这里如果出现问题就是运维的事情了
约束:
1、课程审核通过方可发布。
2、本机构只允许发布本机构的课程。
service
public void publish(Long companyId,Long courseId);
实现
@Transactional
@Override
public void publish(Long companyId, Long courseId) {
//约束校验
//查询课程预发布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if(coursePublishPre == null){
XuechengPlusException.cast("请先提交课程审核,审核通过才可以发布");
}
//本机构只允许提交本机构的课程
if(!coursePublishPre.getCompanyId().equals(companyId)){
XuechengPlusException.cast("不允许提交其它机构的课程。");
}
//课程审核状态
String auditStatus = coursePublishPre.getStatus();
//审核通过方可发布
if(!"202004".equals(auditStatus)){
XuechengPlusException.cast("操作失败,课程审核通过方可发布。");
}
//保存课程发布信息
saveCoursePublish(courseId);
//保存消息表
saveCoursePublishMessage(courseId);
//删除课程预发布表对应记录
coursePublishPreMapper.deleteById(courseId);
}
/**
* @description 保存课程发布信息
* @param courseId 课程id
* @return void
* @author Mr.M
* @date 2022/9/20 16:32
*/
private void saveCoursePublish(Long courseId){
//整合课程发布信息
//查询课程预发布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if(coursePublishPre == null){
XuechengPlusException.cast("课程预发布数据为空");
}
CoursePublish coursePublish = new CoursePublish();
//拷贝到课程发布对象
BeanUtils.copyProperties(coursePublishPre,coursePublish);
coursePublish.setStatus("203002");
CoursePublish coursePublishUpdate = coursePublishMapper.selectById(courseId);
if(coursePublishUpdate == null){
coursePublishMapper.insert(coursePublish);
}else{
coursePublishMapper.updateById(coursePublish);
}
//更新课程基本表的发布状态
CourseBase courseBase = courseBaseMapper.selectById(courseId);
courseBase.setStatus("203002");
courseBaseMapper.updateById(courseBase);
}
/**
* @description 保存消息表记录,稍后实现
* @param courseId 课程id
* @return void
* @author Mr.M
* @date 2022/9/20 16:32
*/
private void saveCoursePublishMessage(Long courseId){
}
完善
@ApiOperation("课程发布")
@ResponseBody
@PostMapping ("/coursepublish/{courseId}")
public void coursepublish(@PathVariable("courseId") Long courseId){
Long companyId = 1232141425L;
coursePublishService.publish(companyId,courseId);
}
先测试约束条件:
1、在未提交审核时进行课程发布测试。
2、在课程未审核通过时进行发布。
正常流程测试:
1、提交审核课程
2、手动修改课程预发布表与课程基本信息的审核状态为审核通过。
3、执行课程发布
4、观察课程发布表记录是否正常,课程预发布表记录已经删除,课程基本信息表与课程发布表的发布状态为”发布“。
使用前后端联调方式测试。
消息处理SDK(分布式任务框架搭建)
在发布任务,将一个任务放进消息表后,后续xxljob调用其他三个任务组件处理消息,有关于消息处理的步骤有哪些?
- 新增消息表
- 扫描消息表
- 更新消息表
- 删除消息表
本质上,所有这种需要满足AP(可用性优先、但要满足最终一致性)的,使用 本地消息表+任务调度的方式 都需要这些消息处理步骤,那么
- 如果在每个地方都实现一套针对消息定时扫描、处理的逻辑,基本上都是重复的,软件的复用性太低、成本太高
- 如何解决这个问题?
- 我们可以将消息处理相关的逻辑做成一个通用的东西,如果将消息处理做成一个SDK工具包,相比较通用服务,不仅可以解决将消息处理通用化的需求,还可以降低成本
- 拿课程发布任务举例,执行课程发布任务是要向Redis、索引库等同步数据,其他任务的执行逻辑是不同的,所以执行任务在SDK中不用实现,只需要提供一个抽象方法由具体的执行任务方去实现
在视频处理章节介绍的视频处理的幂等性方案,这里可以采用类似方案
- 任务执行完成后,从消息表删除
- 如果消息表的状态是完成或不存在,则无需执行
如何保证任务不重复执行?
- 任务调度采用分片广播,根据分片参数去获取处理任务,配置调度过期策略为
忽略
,配置任务阻塞处理策略为丢弃后续调度
- 任务调度采用分片广播,根据分片参数去获取处理任务,配置调度过期策略为
- 例如课程发布任务需要执行3个同步操作:存储课程到Redis、存储课程到索引库、存储课程页面到MinIO。如果其中一个小任务已经完成,也不应该去重复执行这个小任务,那么该如何设计呢?(顺序性?)
- 将小任务作为任务的不同阶段,在消息表中设立阶段状态
- 每完成一个阶段,就在对应的阶段状态字段打上标记,即使大任务还没有完成,重新执行大任务时,也会跳过执行完毕了的小任务
综上所述,除了消息表的基本的增、删、改、查的接口外,消息SDK还具有如下接口功能:
package com.xuecheng.messagesdk.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.xuecheng.messagesdk.model.po.MqMessage;
import java.util.List;
/**
* <p>
* 服务类
* </p>
*
* @author Mr.M
* @since 2022-09-21
*/
public interface MqMessageService extends IService<MqMessage> {
/**
* @description 扫描消息表记录,采用与扫描视频处理表相同的思路
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param count 扫描记录数
* @return java.util.List 消息记录
*/
public List<MqMessage> getMessageList(int shardIndex, int shardTotal, String messageType,int count);
public MqMessage addMessage(String messageType,String businessKey1,String businessKey2,String businessKey3);
public int completed(long id);
public int completedStageOne(long id);
public int completedStageTwo(long id);
public int completedStageThree(long id);
public int completedStageFour(long id);
public int getStageOne(long id);
public int getStageTwo(long id);
public int getStageThree(long id);
public int getStageFour(long id);
}
这些SDK已经写好了,我们仅需将资料中的SDK微服务拷贝到 项目代码目录下
消息SDK提供消息处理抽象类,此抽象类供使用方去继承使用,
首先谁用谁添加依赖
如在内容管理服务用
那么刚才往消息表里面写数据可以用sdk工具包的内容,补全往课程发布表添加项目的函数
private void saveCoursePublishMessage(Long courseId){
MqMessage mqMessage = mqMessageService.addMessage("course_publish", String.valueOf(courseId), null, null);
if(mqMessage==null){
XueChengPlusException.cast(CommonError.UNKOWN_ERROR);
}
}
ok 插入消息完成
后面该处理消息了
课程发布任务处理
关于消息的处理,SDK工具包也帮我们写了一部分
package com.xuecheng.messagesdk.service;
import com.xuecheng.messagesdk.model.po.MqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.concurrent.*;
/**
* @author Mr.M
* @version 1.0
* @description 消息处理抽象类
* @date 2022/9/21 19:44
*/
@Slf4j
@Data
public abstract class MessageProcessAbstract {
@Autowired
MqMessageService mqMessageService;
/**
* @param mqMessage 执行任务内容
* @return boolean true:处理成功,false处理失败
* @description 任务处理
* @author Mr.M
* @date 2022/9/21 19:47
*/
public abstract boolean execute(MqMessage mqMessage);
/**
* @description 扫描消息表多线程执行任务
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param messageType 消息类型
* @param count 一次取出任务总数
* @param timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒
* @return void
* @author Mr.M
* @date 2022/9/21 20:35
*/
public void process(int shardIndex, int shardTotal, String messageType,int count,long timeout) {
try {
//扫描消息表获取任务清单
List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);
//任务个数
int size = messageList.size();
log.debug("取出待处理消息"+size+"条");
if(size<=0){
return ;
}
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
messageList.forEach(message -> {
threadPool.execute(() -> {
log.debug("开始任务:{}",message);
//处理任务
try {
boolean result = execute(message);
if(result){
log.debug("任务执行成功:{})",message);
//更新任务状态,删除消息表记录,添加到历史表
int completed = mqMessageService.completed(message.getId());
if (completed>0){
log.debug("任务执行成功:{}",message);
}else{
log.debug("任务执行失败:{}",message);
}
}
} catch (Exception e) {
e.printStackTrace();
log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);
}
//计数
countDownLatch.countDown();
log.debug("结束任务:{}",message);
});
});
//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(timeout,TimeUnit.SECONDS);
System.out.println("结束....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
可以看到,执行逻辑(任务分配逻辑)和之前的处理视频一致,只不过,这里的execute()函数就是具体的执行过程,需要我们自己取编写,
所以秩序继承这个抽象类作为一个模板来重写他的具体任务执行逻辑即可
现在我们来直接编写执行器任务
service工程导入xxljob依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
在内容service下新建jobhandler(存放任务)
编写任务需要继承刚才的抽象类MessageProcessAbstract(抽象类帮我们分配任务)
下面简单写一下三个小任务的伪代码(具体实现后面再写)
@Slf4j
@Component
public class CoursePublishTask extends MessageProcessAbstract {
//课程发布任务处理
@Override
public boolean execute(MqMessage mqMessage) {
//获取消息相关的业务信息
String businessKey1 = mqMessage.getBusinessKey1();
long courseId = Int.parseInt(businessKey1);
//1、课程静态化 将课程预览静态资源放在MinIO
generateCourseHtml(mqMessage,courseId);
//2、将课程信息存储在 ES 索引库里面
saveCourseIndex(mqMessage,courseId);
//3、将课程信息存储在 Redis里面
saveCourseCache(mqMessage,courseId);
return true;
}
//生成课程静态化页面并上传至文件系统
public void generateCourseHtml(MqMessage mqMessage,long courseId){
log.debug("开始进行课程静态化,课程id:{}",courseId);
//消息id
Long id = mqMessage.getId();
//消息处理的service
MqMessageService mqMessageService = this.getMqMessageService();
//消息幂等性处理(设置该阶段的状态)
int stageOne = mqMessageService.getStageOne(id);
if(stageOne >0){
log.debug("课程静态化已处理直接返回,课程id:{}",courseId);
return ;
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//保存第一阶段状态
mqMessageService.completedStageOne(id);
}
//将课程信息缓存至redis
public void saveCourseCache(MqMessage mqMessage,long courseId){
log.debug("将课程信息缓存至redis,课程id:{}",courseId);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//保存课程索引信息
public void saveCourseIndex(MqMessage mqMessage,long courseId){
log.debug("保存课程索引信息,课程id:{}",courseId);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
添加调度入口:
//任务调度入口
@XxlJob("CoursePublishJobHandler")
public void coursePublishJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.debug("shardIndex="+shardIndex+",shardTotal="+shardTotal);
//参数:分片序号、分片总数、消息类型、一次最多取到的任务数量、一次任务调度执行的超时时间
process(shardIndex,shardTotal,"course_publish",30,60);
}
在xxl-job配置任务
nacos配置xxljob执行器信息,才能被发现
xxl:
job:
admin:
addresses: http://192.168.101.65:18088/xxl-job-admin/
executor:
appname: course-publish-job
address:
ip:
port: 10999
logpath: /data/applogs/xxl-job-jobhandler
logretentiondays: 30
accessToken: default_token
xxljob还有个config,直接复制 mediaservice里面的
添加课程发布执行器
捯饬了半天终于上报成功了
在xxl-job添加任务
- JobHandler中填写@XxlJob注解中的内容
启动任务测试,控制台可以看到发布任务执行中...
字样
当然记得在发布表上添加数据哈
测试成功
至此,分布式事务的大体框架搭建完成
下面开始编写分布式任务的具体任务
子任务:页面静态化
页面静态化介绍
课程预览功能通过模板引擎技术在页面模板中填充数据,生成html页面,这个过程是当客户端请求服务器时服务器才开始渲染生成html页面,最后响应给浏览器,服务端渲染的并发能力是有限的。
页面静态化则强调将生成html页面的过程提前,提前使用模板引擎技术生成html页面,当客户端请求时直接请求html页面,由于是静态页面可以使用nginx、apache等高性能的web服务器访问,并发性能高。(就不用请求web服务器了)
什么时候能用页面静态化技术?
当数据变化不频繁,一旦生成静态页面很长一段时间内很少变化,此时可以使用页面静态化。因为如果数据变化频繁,一旦改变就需要重新生成静态页面,导致维护静态页面的工作量很大。
根据课程发布的业务需求,虽然课程发布后仍可以修改课程信息,但需要经过课程审核,且修改频度不大,所以适合使用页面静态化。
静态化测试
下边使用freemarker技术对页面静态化生成html页面。
在内容管理service工程中添加freemarker依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
编写测试方法
package com.xuecheng.content;
@SpringBootTest
public class FreemarkerTest {
@Autowired
CoursePublishService coursePublishService;
//测试页面静态化
@Test
public void testGenerateHtmlByTemplate() throws IOException, TemplateException {
//配置freemarker
Configuration configuration = new Configuration(Configuration.getVersion());
//加载模板
//选指定模板路径,classpath下templates下
//得到classpath路径
String classpath = this.getClass().getResource("/").getPath();
configuration.setDirectoryForTemplateLoading(new File(classpath + "/templates/"));
//设置字符编码
configuration.setDefaultEncoding("utf-8");
//指定模板文件名称
Template template = configuration.getTemplate("course_template.ftl");
//准备数据
CoursePreviewDto coursePreviewInfo = coursePublishService.getCoursePreviewInfo(2L);
Map<String, Object> map = new HashMap<>();
map.put("model", coursePreviewInfo);
//静态化
//参数1:模板,参数2:数据模型
String content = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);
System.out.println(content);
//将静态化内容输出到文件中
InputStream inputStream = IOUtils.toInputStream(content);
//输出流
FileOutputStream outputStream = new FileOutputStream("D:\\develop\\test.html");
IOUtils.copy(inputStream, outputStream);
}
}
185行报错了,明天醒过来在改bug吧
又又又发现了新bug,通过nignx访问http://www.51xuecheng.cn/api/content/coursepreview/1#发现无法访问了
最后发现,原因是开了梯子之后改了我的代理
把这个取消勾选即可
太蠢了。
但原来的bug还是没有解决。
解决:
报错信息来看是:由于在模板的第183行中的表达式${secondNode.teachplanMedia.teachplanId!''}
中的secondNode.teachplanMedia
为null或不存在所导致的。
存在,并采取相应的处理措施。你可以在${secondNode.teachplanMedia.teachplanId!''}
表达式之前添加一个条件检查,例如:
perlCopy code<#if secondNode.teachplanMedia??>
<li><a href="http://www.51xuecheng.cn/course/preview/learning.html?id=${model.courseBase.id!''}&chapter=${secondNode.teachplanMedia.teachplanId!''}" target="_blank">${secondNode.pname!''}</a></li>
</#if>
这样,如果secondNode.teachplanMedia
为null或不存在,整个<li>
元素将被跳过,避免出现空指针异常。
终于通了,哭了
成功生成静态页面:
上传文件测试(内容管理任务远程调用媒资管理上传服务)
微服务之间难免会存在远程调用,在Spring Cloud中可以使用Feign进行远程调用,
Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign
其作用就是帮助我们优雅的实现http请求的发送,解决上面提到的问题。
下边先准备Feign的开发环境:
1、在内容管理content-service工程添加依赖:
注意,只是openfeign还不能支持文件参数,还需要导入一个组件才能支持 multipart传参
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Spring Cloud 微服务远程调用 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
<!--feign支持Multipart格式传参-->
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form-spring</artifactId>
<version>3.8.0</version>
</dependency>
解释一下,discovery 是将自己的服务像上报,以及获取其他实例地址,后面的不用解释
2、远程调用时可能发生熔断(下级服务宕机),需要打开熔断开关
单独定义一个文件,在nacos配置feign-dev.yaml公用配置文件
feign:
hystrix:
enabled: true
circuitbreaker:
enabled: true
hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 30000 #熔断超时时间
ribbon:
ConnectTimeout: 60000 #连接超时时间
ReadTimeout: 60000 #读超时时间
MaxAutoRetries: 0 #重试次数
MaxAutoRetriesNextServer: 1 #切换实例的重试次数
3、在调用方(centent service )以及配调用方 (media api) 都引入共享配置
shared-configs:
- data-id: feign-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
4、在内容管理service工程配置feign支持Multipart,拷贝课程资料下的MultipartSupportConfig 到content-service工程下的config包下。
被调用方
现在需要将课程的静态文件上传到minio,
但仅仅使用 上传图片接口是不行的,他的储存方式是 按照年月日存储
现在呢,我想单独存储到course目录下objectname为”课程id.html”,那么
在原有的上传文件接口需要增加一个参数 objectname。
方案一: 可以在原来上传文件的基础上 增加一个参数,增加objectname (@RequestParam required =false),判断不为空就传到指定路径
如果为空,那就按照年月日格式上传。
方案二:可以重新编写一个函数
那么对原来的接口做如下改变(方案1)
@ApiOperation("上传文件")
@RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile filedata,
@RequestParam(value= "objectName",required=false) String objectName) throws IOException{
//....
}
service接口也增加一个参数:
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath,String objectName);
实现:
修改原有uploadFile方法,判断如果objectName为空则采取年月日样式的路径方式。
//存储到minio中的对象名(带目录)
if(StringUtils.isEmpty(objectName)){
objectName = defaultFolderPath + fileMd5 + extension;
}
// String objectName = defaultFolderPath + fileMd5 + extension;
调用方
编写远程调用接口
在contentservice 下创建feignclient 包,用于存放调用方接口函数
创建接口
如果相信远程调用,首先在接口上加 feignclient相关注解
1、调用方需要知道要调用的接口是什么?
当然是这个
@ApiOperation("上传文件")
@RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
@ResponseBody
public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile filedata,
@RequestParam(value = "folder",required=false) String folder,
@RequestParam(value = "objectName",required=false) String objectName)
把这个接口函数复制到调用接口里面,后续修改。
1.1、补全请求路径,@RequestMapping 路径改为 “media/upload/coursefile” ,因为原来微服务配置里面帮忙在前面加了,但是在理作为被调用方,不会自动加
1.2、在接口上面添加@FeignClient() 注解,并在参数中指定调用的微服务名称@FeignClient(“media-api”)
2、原始的Feign不支持Multipart格式的传参,所以还需要加上配置类configuration = MultipartSupportConfig.class
最终接口上的注解应该为 @FeignClient(value = “media-api”,configuration = MultipartSupportConfig.class)
完整应该长这样
发现返回值爆红,是因为还没导报,我们暂时先不导,将其转成String
编写一个调用类(测试)
@SpringBootTest
public class FeignUploadTest {
@Autowired
MediaServiceClient mediaServiceClient;
//远程调用,上传文件
@Test
public void test() {
MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(new File("D:\\develop\\test.html"));
mediaServiceClient.uploadFile(multipartFile,"course","test.html");
}
}
最后执行还要在启动类上加上@EnableFeignClients注解!!!
因为这里是单元测试,只需要在 service test里面的启动类加上即可
@SpringBootApplication
@EnableFeignClients(basePackages={"com.xuecheng.content.feignclient"})
public class XC_ContentServiceApplication {
public static void main(String[] args) {
SpringApplication.run(XC_ContentServiceApplication.class,args);
}
}
报错,说找不到服务
1、排查配置:
调用方和被调用方都需要将自己的服务上报。(配置discovery 以及 config)
2、依赖
测试成功
然后我们试试直接访问资源
输入网址
http://192.168.101.65:9000/mediafiles/course/test124.html
可以的,但是没有样式,要想访问样式只能从主站走,那么
试着用nignx访问
那么需要增加一个主站下的代理
# 课程详情页的静态界面
# www.51xuecheng.cn/course/test124.html ->file.51xuecheng.cn/mediafiles/course/test124.html
location /course/ {
proxy_pass http://fileserver/mediafiles/course/; # 后面的斜杠一定不能少!!!!!
}
成功!!!!(又遇到梯子把我的代理给屏蔽了的情况,气死我了)
熔断降级
微服务中难免存在服务之间的远程调用,当微服务运行不正常会导致无法正常调用微服务,此时会出现异常,如果这种异常不去处理可能导致雪崩效应。
如何解决由于微服务异常引起的雪崩效应呢?
可以采用熔断、降级的方法去解决。
熔断降级是两个步骤
熔断:下游服务有问题,触发熔断(相当于异常),可以通过负载均衡访问其他下游服务实例
降级:熔断触发,且没有可以使用的下游服务实例,走走另一套处理逻辑,或者走另一套服务,而不去再调用原来的熔断的服务。
开启熔断(调用方和被调用方都)
前面其实已经做过了引入通用配置,配置里面开启熔断
feign:
hystrix:
enabled: true
circuitbreaker:
enabled: true
设置熔断的超时时间,为了防止一次处理时间较长触发熔断这里还需要设置请求和连接的超时时间,如下:
hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 30000 #熔断超时时间
ribbon:
ConnectTimeout: 60000 #连接超时时间
ReadTimeout: 60000 #读超时时间
MaxAutoRetries: 0 #重试次数
MaxAutoRetriesNextServer: 1 #切换实例的重试次数
熔断是开启熔断远程调用出 超时/切换实例/异常后/ 自动触发的,而降级需要我们编写降级处理逻辑,
有两种方法实现降级
降级方案一 fallback:
在定义 (@feignclient)feignclient接口的时候,在注解上添加 fallback 属性,fallback 指定一个类(.class),而这个类 实现了 feignclient接口。
缺点:无法取出熔断抛出的异常
@FeignClient(value = "media-api",configuration = MultipartSupportConfig.class,fallback = MediaServiceClientFallback.class)
public interface MediaServiceClient {
@RequestMapping(value = "media/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public String upload(@RequestPart("filedata") MultipartFile filedata,
@RequestParam(value = "folder",required=false) String folder,
@RequestParam(value = "objectName",required=false) String objectName);
}
public class MediaServiceClientFallback implements MediaServiceClient{
@Override
public String upload(MultipartFile filedata, String folder, String objectName) {
throw new XuechengPlusException("调用失败");
// return null;
}
}
降级方案二 : fallbackfactory
在定义 (@feignclient)feignclient接口的时候,在注解上添加 fallbackFactory 属性, fallbackFactory 指定一个工厂类实习类,而这个工厂类实现一个fallbackFactory 工厂接口,接口需要指定feignclient接口 作为泛型, 其次工厂实体类需要重写里面的create方法
create方法自带一个Trowable 参数,这个参数其实就是 下游微服务爆出的异常。,其次create方法的返回值是 feignclient接口
也就是需要 也就是你需要返回一个 实现了feignclient接口的类(也就是书写处理逻辑的地方),一般写一个匿名内部类,重写降级处理逻辑方法
@FeignClient(value = "media-api",configuration = MultipartSupportConfig.class,fallbackFactory = MediaServiceClientFallbackFactory.class)
@Slf4j
@Component
public class MediaServiceClientFallbackFactory implements FallbackFactory<MediaServiceClient> {
@Override
public MediaServiceClient create(Throwable throwable) {
return new MediaServiceClient(){
@Override
public String upload(MultipartFile upload, String folder,String objectName) {
//降级方法
log.debug("调用媒资管理服务上传文件时发生熔断,异常信息:{}",throwable.toString(),throwable);
return null;
}
};
}
}
降级处理逻辑:
返回一个null对象,上游服务请求接口得到一个null说明执行了降级处理。
测试:
停止媒资管理服务或人为制造异常观察是否执行降级逻辑。
完善页面静态化任务
首先定义好页面静态化 以及 上传静态化文件到minio 这两个 函数
在课程发布的service编写这两部分内容,最后通过消息去调度执行
public File generateCourseHtml(Long courseId);
public void uploadCourseHtml(Long courseId,File file);
接口实现
@Override
public File generateCourseHtml(Long courseId) {
//静态化文件
File htmlFile = null;
try {
//配置freemarker
Configuration configuration = new Configuration(Configuration.getVersion());
//加载模板
//选指定模板路径,classpath下templates下
//得到classpath路径
String classpath = this.getClass().getResource("/").getPath();
configuration.setDirectoryForTemplateLoading(new File(classpath + "/templates/"));
//设置字符编码
configuration.setDefaultEncoding("utf-8");
//指定模板文件名称
Template template = configuration.getTemplate("course_template.ftl");
//准备数据
CoursePreviewDto coursePreviewInfo = this.getCoursePreviewInfo(courseId);
Map<String, Object> map = new HashMap<>();
map.put("model", coursePreviewInfo);
//静态化
//参数1:模板,参数2:数据模型
String content = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);
// System.out.println(content);
//将静态化内容输出到文件中
InputStream inputStream = IOUtils.toInputStream(content);
//创建静态化文件
htmlFile = File.createTempFile("course",".html");
log.debug("课程静态化,生成静态文件:{}",htmlFile.getAbsolutePath());
//输出流
FileOutputStream outputStream = new FileOutputStream(htmlFile);
IOUtils.copy(inputStream, outputStream);
} catch (Exception e) {
log.error("课程静态化异常:{}",e.toString());
XuechengPlusException.cast("课程静态化异常");
}
return htmlFile;
}
@Override
public void uploadCourseHtml(Long courseId, File file) {
try{
MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(file);
String course = mediaServiceClient.upload(multipartFile,"", "course/"+courseId+".html");
if(course==null){
XuechengPlusException.cast("上传静态文件异常");
}
}
catch (Exception ex){
log.error("课程静态化文件上传异常:{}",ex.toString());
XuechengPlusException.cast("课程静态化文件上传异常");
}
}
任务完善
@Autowired
CoursePublishService coursePublishService;
public void generateCourseHtml(MqMessage mqMessage,long courseId){
log.debug("开始进行课程静态化,课程id:{}",courseId);
//消息id
Long id = mqMessage.getId();
//消息处理的service
MqMessageService mqMessageService = this.getMqMessageService();
//消息幂等性处理
int stageOne = mqMessageService.getStageOne(id);
if(stageOne == 1){
log.debug("课程静态化已处理直接返回,课程id:{}",courseId);
return ;
}
//生成静态化页面
File file = coursePublishService.generateCourseHtml(courseId);
//上传静态化页面
if(file!=null){
coursePublishService.uploadCourseHtml(courseId,file);
}else{
XuechengPlusException.cast("生成的静态文件为空");
}
//保存第一阶段状态
mqMessageService.completedStageOne(id);
}
注意还需要开启启动类(api工程中)上的 Feign 的那个注解
@EnableFeignClients(basePackages={"com.xuecheng.content.feignclient"})
api 引入熔断配置
测试 124 课程
删除课程发布表的124行
更改课程基本信息表的124课程,修改其 (发布)状态 status 为 203001 (未发布) 审核状态audit status 为 202004(未审核)
稍微修改一下课程信息,如我改了名称
修改后保存,提交审核
点击后界面弹出提交成功
可以在预发布表看到这门课程
审核
人工修改他的审核状态 审核状态改为202004 审核通过
并修改基本信息表审核状态为通过
发布
点击的同时他会他预发布表的相关信息写道发布表以及信息表
先查看发布表,有了
再查看信息表
都有啦
那么启动任务调度,让他扫描消息
似乎是启动成功了:
查看minio
yeeeeeeeeeeh!!
别急,为了保证消息幂等性,看看消息是否还存在?,发现没有啦,跑通!!!
子任务:课程搜索
采用全文检索的方式取搜索数据
传统搜索(先找文章再找词),先找文档(行数据)在分别判断行数据的每一个属性(全文)里面是否包含关键字
全文检索(先找词再找文章),集体是实现是计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引(倒排索引),指明该词在哪个文章中出现?以及文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。
这种全文检索技术我们使用ElasticSearch实现,
课程搜索也要将课程信息建立索引,在课程发布时建立课程索引,索引建立好用户可通过搜索网页去查询课程信息。
所以课程搜索这部分模块,我们有两个任务,
1、机构端 在数据插入时(课程发布时),创建(添加到)索引 (这也是我们子任务之一)
2、用户端 在搜索数据时(搜索课程时),利用索引返回文章(这里返回课程号,到时候之间用niginx返回静态化后的课程数据)(虽然是子任务之外的,但是为了行文逻辑流畅就放这里了)
ES KB环境部署
elasticsearch和kibana我之前已经安装在docker里面了,直接用即可
docker restart elasticsearch
docker restart kibana
试试通过kibanana浏览一下 端口号 5601
可以通过以下方式查看已经建立的索引
嗯。这还是几个月以前玩剩下的
索引相当于MySQL中的表,Elasticsearch与MySQL之间概念的对应关系见下表:
要使用elasticsearch需要建立索引,Mapping相当于表结构,Mapping创建后其字段不能删除(要删除只能删除整个表。。。)
总体而言记住上面的与数据库的对应关系表就行。
那么我们先创建这个发布课程的这个索引,
先复制以下 http命令
PUT /course-publish
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"companyId": {
"type": "keyword"
},
"companyName": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"name": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"users": {
"index": false,
"type": "text"
},
"tags": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"mt": {
"type": "keyword"
},
"mtName": {
"type": "keyword"
},
"st": {
"type": "keyword"
},
"stName": {
"type": "keyword"
},
"grade": {
"type": "keyword"
},
"teachmode": {
"type": "keyword"
},
"pic": {
"index": false,
"type": "text"
},
"description": {
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"type": "text"
},
"createDate": {
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
},
"status": {
"type": "keyword"
},
"remark": {
"index": false,
"type": "text"
},
"charge": {
"type": "keyword"
},
"price": {
"type": "scaled_float",
"scaling_factor": 100
},
"originalPrice": {
"type": "scaled_float",
"scaling_factor": 100
},
"validDays": {
"type": "integer"
}
}
}
}
粘贴运行即可
成功
通过 GET /_cat/indices?v 查询所有的索引,查找course-publish是否创建成功。
通过GET /course-publish/_mapping 查询course-publish的索引结构。
部署搜索工程
拷贝黑马提供的搜索工程到项目根目录
修改配置文件 上报服务、接收配置
nacos配置
server: servlet: context-path: /search port: 53080 elasticsearch: hostlist: 192.168.101.128:9200 course: index: course-publish source_fields: id,companyId,companyName,name,users,grade,mt,mtName,st,stName,charge,pic,price,originalPrice,description,teachmode,validDays,createDate
启动网关、搜索服务测试一下
### 添加课程索引
POST {{search_host}}/search/index/course
Content-Type: application/json
{
"charge" : "201000",
"companyId" : 100000,
"companyName" : "北京黑马程序",
"createDate" : "2022-09-25 09:36:11",
"description" : "《Spring编程思想》是2007年6月1日机械工业出版社出版的图书,作者是埃克尔,译者是陈昊鹏。主要内容本书赢得了全球程序员的广泛赞誉,即使是最晦涩的概念,在Bruce Eckel的文字亲和力和小而直接的编程示例面前也会化解于无形。从Java的基础语法到最高级特性(深入的面向对象概念、多线程、自动项目构建、单元测试和调试等),本书都能逐步指导你轻松掌握。从本书获得的各项大奖以及来自世界各地的读者评论中,不难看出这是一本经典之作",
"grade" : "204001",
"id" : 102,
"mt" : "1-3",
"mtName" : "编程开发",
"name" : "Spring编程思想",
"originalPrice" : 200.0,
"pic" : "/mediafiles/2022/09/20/1d0f0e6ed8a0c4a89bfd304b84599d9c.png",
"price" : 100.0,
"remark" : "没有备注",
"st" : "1-3-2",
"stName" : "Java语言",
"status" : "203002",
"tags" : "没有标签",
"teachmode" : "200002",
"validDays" : 222
}
### 搜索课程
GET {{search_host}}/search/course/list?pageNo=1&keywords=spring
Content-Type: application/json
进入前端搜索界面http://www.51xuecheng.cn/course/search.html
可以看到刚才添加的那一条。
简单看看搜索工程代码
配置类
config包下只有一个ElasticSearchConfig,主要是提供了一个Java的客户端来操作ES的,将其注册为一个bean
@Configuration
public class ElasticsearchConfig {
// 1. 从nacos中读取es的地址,不过我这里只部署了单体ES,它这里可能是ES集群
@Value("${elasticsearch.hostlist}")
private String hostlist;
@Bean
public RestHighLevelClient restHighLevelClient() {
//2. 解析hostlist配置信息
String[] split = hostlist.split(",");
//3. 创建HttpHost数组,其中存放es主机和端口的配置信息
HttpHost[] httpHostArray = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String item = split[i];
httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
}
//4. 创建RestHighLevelClient客户端
return new RestHighLevelClient(RestClient.builder(httpHostArray));
}
}
不过只部署一个节点的话,可以简化一下代码
@Configuration
public class ElasticsearchConfig {
@Value("${elasticsearch.hostlist}")
private String hostlist;
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(RestClient.builder(hostlist));
}
}
Controller
课程索引接口
- 该接口根据课程信息来创建索引
@Api(value = "课程信息索引接口", tags = "课程信息索引接口")
@RestController
@RequestMapping("/index")
public class CourseIndexController {
@Value("${elasticsearch.course.index}")
private String courseIndexStore;
@Autowired
IndexService indexService;
@ApiOperation("添加课程索引")
@PostMapping("course")
public Boolean add(@RequestBody CourseIndex courseIndex) {
Long id = courseIndex.getId();
if (id == null) {
XueChengPlusException.cast("课程id为空");
}
Boolean result = indexService.addCourseIndex(courseIndexStore, String.valueOf(id), courseIndex);
if (!result) {
XueChengPlusException.cast("添加课程索引失败");
}
return true;
}
}
课程搜索接口
@Api(value = "课程搜索接口", tags = "课程搜索接口")
@RestController
@RequestMapping("/course")
public class CourseSearchController {
@Autowired
CourseSearchService courseSearchService;
@ApiOperation("课程搜索列表")
@GetMapping("/list")
public SearchPageResultDto<CourseIndex> list(PageParams pageParams, SearchCourseParamDto searchCourseParamDto) {
return courseSearchService.queryCoursePubIndex(pageParams, searchCourseParamDto);
}
}
dto
定义了两个dto类,一个是接收搜索课程参数的
@Data
@ToString
public class SearchCourseParamDto {
//关键字
private String keywords;
//大分类
private String mt;
//小分类
private String st;
//难度等级
private String grade;
}
另一个是网站门户显示的
@Data
@ToString
public class SearchPageResultDto<T> extends PageResult {
//大分类列表
List<String> mtList;
//小分类列表
List<String> stList;
public SearchPageResultDto(List<T> items, long counts, long page, long pageSize) {
super(items, counts, page, pageSize);
}
}
service
定义了两个Service接口
课程索引Service,提供了三个接口,添加索引、更新索引、删除索引
public interface IndexService {
Boolean addCourseIndex(String indexName, String id, Object object);
Boolean updateCourseIndex(String indexName, String id, Object object);
Boolean deleteCourseIndex(String indexName, String id);
}
搜索
public interface CourseSearchService {
SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto searchCourseParamDto);
}
Impl
对应的两个实现类
课程索引接口实现
@Slf4j
@Service
public class IndexServiceImpl implements IndexService {
@Autowired
RestHighLevelClient client;
@Override
public Boolean addCourseIndex(String indexName, String id, Object object) {
String jsonString = JSON.toJSONString(object);
IndexRequest indexRequest = new IndexRequest(indexName).id(id);
//指定索引文档内容
indexRequest.source(jsonString, XContentType.JSON);
//索引响应对象
IndexResponse indexResponse = null;
try {
indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("添加索引出错:{}", e.getMessage());
e.printStackTrace();
XueChengPlusException.cast("添加索引出错");
}
String name = indexResponse.getResult().name();
System.out.println(name);
return name.equalsIgnoreCase("created") || name.equalsIgnoreCase("updated");
}
@Override
public Boolean updateCourseIndex(String indexName, String id, Object object) {
String jsonString = JSON.toJSONString(object);
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
updateRequest.doc(jsonString, XContentType.JSON);
UpdateResponse updateResponse = null;
try {
updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("更新索引出错:{}", e.getMessage());
e.printStackTrace();
XueChengPlusException.cast("更新索引出错");
}
DocWriteResponse.Result result = updateResponse.getResult();
return result.name().equalsIgnoreCase("updated");
}
@Override
public Boolean deleteCourseIndex(String indexName, String id) {
//删除索引请求对象
DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
//响应对象
DeleteResponse deleteResponse = null;
try {
deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("删除索引出错:{}", e.getMessage());
e.printStackTrace();
XueChengPlusException.cast("删除索引出错");
}
//获取响应结果
DocWriteResponse.Result result = deleteResponse.getResult();
return result.name().equalsIgnoreCase("deleted");
}
}
搜索实现
@Slf4j
@Service
public class CourseSearchServiceImpl implements CourseSearchService {
@Value("${elasticsearch.course.index}")
private String courseIndexStore;
@Value("${elasticsearch.course.source_fields}")
private String sourceFields;
@Autowired
RestHighLevelClient client;
@Override
public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {
//设置索引
SearchRequest searchRequest = new SearchRequest(courseIndexStore);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//source源字段过虑
String[] sourceFieldsArray = sourceFields.split(",");
searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
if (courseSearchParam == null) {
courseSearchParam = new SearchCourseParamDto();
}
//关键字
if (StringUtils.isNotEmpty(courseSearchParam.getKeywords())) {
//匹配关键字
MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(courseSearchParam.getKeywords(), "name", "description");
//设置匹配占比
multiMatchQueryBuilder.minimumShouldMatch("70%");
//提升另个字段的Boost值
multiMatchQueryBuilder.field("name", 10);
boolQueryBuilder.must(multiMatchQueryBuilder);
}
//过虑
if (StringUtils.isNotEmpty(courseSearchParam.getMt())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("mtName", courseSearchParam.getMt()));
}
if (StringUtils.isNotEmpty(courseSearchParam.getSt())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("stName", courseSearchParam.getSt()));
}
if (StringUtils.isNotEmpty(courseSearchParam.getGrade())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("grade", courseSearchParam.getGrade()));
}
//分页
Long pageNo = pageParams.getPageNo();
Long pageSize = pageParams.getPageSize();
int start = (int) ((pageNo - 1) * pageSize);
searchSourceBuilder.from(start);
searchSourceBuilder.size(Math.toIntExact(pageSize));
//布尔查询
searchSourceBuilder.query(boolQueryBuilder);
//高亮设置
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font class='eslight'>");
highlightBuilder.postTags("</font>");
//设置高亮字段
highlightBuilder.fields().add(new HighlightBuilder.Field("name"));
searchSourceBuilder.highlighter(highlightBuilder);
//请求搜索
searchRequest.source(searchSourceBuilder);
//聚合设置
buildAggregation(searchRequest);
SearchResponse searchResponse = null;
try {
searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
log.error("课程搜索异常:{}", e.getMessage());
return new SearchPageResultDto<CourseIndex>(new ArrayList(), 0, 0, 0);
}
//结果集处理
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
//记录总数
TotalHits totalHits = hits.getTotalHits();
//数据列表
List<CourseIndex> list = new ArrayList<>();
for (SearchHit hit : searchHits) {
String sourceAsString = hit.getSourceAsString();
CourseIndex courseIndex = JSON.parseObject(sourceAsString, CourseIndex.class);
//取出source
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
//课程id
Long id = courseIndex.getId();
//取出名称
String name = courseIndex.getName();
//取出高亮字段内容
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
if (highlightFields != null) {
HighlightField nameField = highlightFields.get("name");
if (nameField != null) {
Text[] fragments = nameField.getFragments();
StringBuffer stringBuffer = new StringBuffer();
for (Text str : fragments) {
stringBuffer.append(str.string());
}
name = stringBuffer.toString();
}
}
courseIndex.setId(id);
courseIndex.setName(name);
list.add(courseIndex);
}
SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits.value, pageNo, pageSize);
//获取聚合结果
List<String> mtList = getAggregation(searchResponse.getAggregations(), "mtAgg");
List<String> stList = getAggregation(searchResponse.getAggregations(), "stAgg");
pageResult.setMtList(mtList);
pageResult.setStList(stList);
return pageResult;
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("mtAgg")
.field("mtName")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("stAgg")
.field("stName")
.size(100)
);
}
private List<String> getAggregation(Aggregations aggregations, String aggName) {
// 4.1.根据聚合名称获取聚合结果
Terms brandTerms = aggregations.get(aggName);
// 4.2.获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3.遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 4.4.获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
}
索引管理
emmm。看到实现那部分就有点头大了,尤其是搜索那部分,什么高亮,聚合,几乎都忘了差不多了,来个小复习
REST API
添加文档
索引(结构mapping)创建好就可以向其中添加文档,此时elasticsearch会根据索引的mapping配置对有些字段进行分词。
语法:
POST /{索引库名}/_doc/{id}
{
json ...
}
如:
POST /course-publish/_doc/103
{
"charge" : "201001",
"companyId" : 100000,
"companyName" : "北京黑马程序",
"createDate" : "2022-09-25 09:36:11",
"description" : "HTML/CSS",
"grade" : "204001",
"id" : 102,
"mt" : "1-1",
"mtName" : "前端开发",
"name" : "Html参考大全",
"originalPrice" : 200.0,
"pic" : "/mediafiles/2022/09/20/e726b71ba99c70e8c9d2850c2a7019d7.jpg",
"price" : 100.0,
"remark" : "没有备注",
"st" : "1-1-1",
"stName" : "HTML/CSS",
"status" : "203002",
"tags" : "没有标签",
"teachmode" : "200002",
"validDays" : 222
}
如果要修改文档的内容可以使用上边相同的方法,如果没有则添加,如果存在则更新。
查询文档
语法如下
GET /{索引库名称}/_doc/{id}
修改文档,
分为全量修改和增量修改
可以看到全量修改 和 增加共用一套api
POST /{索引库名}/_doc/{id}
{
json ...
}
增量修改
POST /{索引库名}/_update/{id}
{
"doc":{
"email":"BestApex@Apex.net",
"info":"恐怖G7人--马文"
}
}
接口定义
- 当课程发布时,请求添加课程接口,添加课程信息到索引
- 当课程下架时,请求删除课程接口,从索引中删除课程信息
- 这里先实现添加课程接口
根据索引的mapping结构构建po类
@Data
public class CourseIndex implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
private Long id;
/**
* 机构ID
*/
private Long companyId;
/**
* 公司名称
*/
private String companyName;
/**
* 课程名称
*/
private String name;
/**
* 适用人群
*/
private String users;
/**
* 标签
*/
private String tags;
/**
* 大分类
*/
private String mt;
/**
* 大分类名称
*/
private String mtName;
/**
* 小分类
*/
private String st;
/**
* 小分类名称
*/
private String stName;
/**
* 课程等级
*/
private String grade;
/**
* 教育模式
*/
private String teachmode;
/**
* 课程图片
*/
private String pic;
/**
* 课程介绍
*/
private String description;
/**
* 发布时间
*/
@JSONField(format="yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createDate;
/**
* 状态
*/
private String status;
/**
* 备注
*/
private String remark;
/**
* 收费规则,对应数据字典--203
*/
private String charge;
/**
* 现价
*/
private Float price;
/**
* 原价
*/
private Float originalPrice;
/**
* 课程有效期天数
*/
private Integer validDays;
}
接口
创建添加课程索引接口
@Api(value = "课程信息索引接口", tags = "课程信息索引接口")
@RestController
@RequestMapping("/index")
public class CourseIndexController {
@ApiOperation("添加课程索引")
@PostMapping("/course")
public Boolean add(@RequestBody CourseIndex courseIndex) {
return null;
}
}
接口开发
定义service接口,请求ES添加课程信息
- 注意:为了适应其他文档信息,需要将添加文档定义为通用的接口,此接口不仅适应添加课程,还适应添加其他信息
/**
* 课程索引service
*/
public interface IndexService {
/**
* 添加索引
* @param indexName 索引名称
* @param id 主键
* @param object 索引对象
* @return true:添加成功;false:添加失败
*/
Boolean addCourseIndex(String indexName, String id, Object object);
}
实现
@Override
public Boolean addCourseIndex(String indexName, String id, Object object) {
// 1. 创建request对象
IndexRequest request = new IndexRequest(indexName).id(id);
// 2. 准备请求参数,对应DSL语句中的JSON文档,所以要把对象序列化为JSON格式
String jsonString = JSON.toJSONString(object);
request.source(jsonString, XContentType.JSON);
IndexResponse response = null;
try {
// 3. 发送请求
response = client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.debug("添加索引出错:{}", e.getMessage());
XueChengPlusException.cast("添加索引出错");
}
// 4. 获取请求结果
String result = response.getResult().name();
// 若文档不存在,则为created,若文档存在,则为updated,若两者均不是,就是出错了
return "UPDATED".equalsIgnoreCase(result) || "CREATED".equalsIgnoreCase(result);
}
接口完善
- 完善Controller层,调用Service接口
@Api(value = "课程信息索引接口", tags = "课程信息索引接口")
@RestController
@RequestMapping("/index")
public class CourseIndexController {
@Autowired
private IndexService indexService;
@Value("${elasticsearch.course.index}")
private String courseIndexName;
@ApiOperation("添加课程信息文档")
@PostMapping("/course")
public Boolean add(@RequestBody CourseIndex courseIndex) {
Long id = courseIndex.getId();
if (id == null) {
XueChengPlusException.cast("课程id为空");
}
Boolean result = indexService.addCourseIndex(courseIndexName, String.valueOf(id), courseIndex);
// 当结果既不是created,又不是updated时,就会报这个错误
if (!result) {
XueChengPlusException.cast("添加课程索引失败!");
}
return true;
}
}
接口测试
### 新增课程文档测试
POST {{search_host}}/search/index/course
Content-Type: application/json
{
"charge" : "201000",
"companyId" : 100000,
"companyName" : "测试CompanyName",
"createDate" : "2023-03-07 15:41:44",
"description" : "《测试测试测试测试》",
"grade" : "204001",
"id" : 102,
"mt" : "1-3",
"mtName" : "编程开发",
"name" : "Java编程思想",
"originalPrice" : 200.0,
"pic" : "/mediafiles/2022/09/20/1d0f0e6ed8a0c4a89bfd304b84599d9c.png",
"price" : 100.0,
"remark" : "没有备注",
"st" : "1-3-2",
"stName" : "Java语言",
"status" : "203002",
"tags" : "没有标签",
"teachmode" : "200002",
"validDays" : 222
}
搜索
需求分析
- 索引信息维护完成下一步定义搜索接口,搜索课程信息
- 首先要搞清楚搜索功能的需求,进入学成在线首页的搜索页面
- 根据搜索页面可知需求如下
- 根据一级分类、二级分类搜索课程信息
- 根据关键字搜索课程信息,搜索方式为全文检索,关键字需要匹配课程的名称、内容
- 根据难度等级搜索课程
- 搜索页面分页显示
- 技术点
- 整体采用布尔查询
- 根据关键字搜索,采用MultiMatchQuery,搜索name、description字段
- 根据分类、课程等级搜索采用过滤器实现
- 分页查询
- 高亮显示
- 为什么课程分类、课程等级等查询使用过滤器方式?
- 使用关键字查询需要计算相关度算分,
- 根据课程分类、课程等级去查询不需要计算相关度得分,
- 使用过滤器实现根据课程分类、课程等级查询的过程不会计算相关度算分、效率更高
接口定义
定义搜索条件DTO类
@Data
@ToString
public class SearchCourseParamDto {
//关键字(搜索条件)
private String keywords;
//大分类
private String mt;
//小分类
private String st;
//难度等级
private String grade;
}
为了适应后期的扩展,定义搜索结果类,让其继承PageResult
@Data
@ToString
public class SearchPageResultDto<T> extends PageResult {
public SearchPageResultDto(List<T> items, long counts, long page, long pageSize) {
super(items, counts, page, pageSize);
}
}
定义接口如下
@Api(value = "课程搜索接口", tags = "课程搜索接口")
@RestController
@RequestMapping("/course")
public class CourseSearchController {
@Autowired
CourseSearchService courseSearchService;
@ApiOperation("课程搜索列表")
@GetMapping("/list")
public SearchPageResultDto<CourseIndex> list(PageParams pageParams, SearchCourseParamDto searchCourseParamDto) {
return null;
}
}
接口开发
定义Service接口
/**
* 课程搜索service
*/
public interface CourseSearchService {
/**
* 搜索课程列表
* @param pageParams 分页参数
* @param searchCourseParamDto 搜索条件
* @return
*/
SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto searchCourseParamDto);
}
接口实现,由于搜索接口的内容比较多,所以这里分几步实现
- 实现根据分页搜索
@Value("${elasticsearch.course.index}")
private String courseIndexName;
@Value("${elasticsearch.course.source_fields}")
private String sourceFields;
@Autowired
RestHighLevelClient client;
@Override
public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {
// 1. 准备Request对象
SearchRequest request = new SearchRequest(courseIndexName);
// 2. 组织DSL参数,这里使用布尔查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
String[] sourceFieldsArray = sourceFields.split(",");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// sourceFieldsArray指定要返回的字段,new String[]{}指定不返回的字段
searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
// 3. 分页
Long pageNo = pageParams.getPageNo();
Long pageSize = pageParams.getPageSize();
// 3.1 指定起始查询位置和查询条数
int start = (int) ((pageNo - 1) * pageSize);
searchSourceBuilder.from(start)
.size(Math.toIntExact(pageSize));
// 4. 布尔查询
searchSourceBuilder.query(boolQuery);
request.source(searchSourceBuilder);
// 5. 发送请求,获取响应结果
SearchResponse response = null;
try {
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.debug("课程搜索异常:{}", e.getMessage());
return new SearchPageResultDto<>(new ArrayList<>(), 0, 0, 0);
}
// 6. 解析响应
SearchHits searchHits = response.getHits();
// 6.1 获取总条数
long totalHits = searchHits.getTotalHits().value;
// 6.2 获取文档数组
SearchHit[] hits = searchHits.getHits();
ArrayList<CourseIndex> list = new ArrayList<>();
// 6.3 遍历
for (SearchHit hit : hits) {
// 获取文档source
String jsonCourseString = hit.getSourceAsString();
// 转为CourseIndex对象,加入到集合中
CourseIndex courseIndex = JSON.parseObject(jsonCourseString, CourseIndex.class);
list.add(courseIndex);
}
// 7. 封装结果
SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits, pageNo, pageSize);
return pageResult;
}
测试
# 网关中的服务名应与bootstrap中配置的服务名保持一致
- id: search
uri: lb://search
predicates:
- Path=/search/**
根据条件搜索
下面实现根据关键字、一级分类、二级分类、难度等级搜索
@Override
public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {
// 1. 准备Request对象
SearchRequest request = new SearchRequest(courseIndexName);
// 2. 组织DSL参数,这里使用布尔查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
String[] sourceFieldsArray = sourceFields.split(",");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// sourceFieldsArray指定要返回的字段,new String[]{}指定不返回的字段
searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
// 3. 分页
Long pageNo = pageParams.getPageNo();
Long pageSize = pageParams.getPageSize();
+ // 3.1 指定起始查询位置和查询条数
+ int start = (int) ((pageNo - 1) * pageSize);
+ searchSourceBuilder.from(start)
+ .size(Math.toIntExact(pageSize));
+ // 3.2 指定条件查询
+ if (courseSearchParam == null) {
+ courseSearchParam = new SearchCourseParamDto();
+ }
+ // 3.2.1 匹配关键字
+ if (StringUtils.isNotEmpty(courseSearchParam.getKeywords())){
+ String keywords = courseSearchParam.getKeywords();
+ boolQuery.must(QueryBuilders
+ .multiMatchQuery(keywords,"name", "description")
+ .minimumShouldMatch("70%")
+ .field("name", 10));
+ }
+ // 3.2.2 匹配大分类
+ if (StringUtils.isNotEmpty(courseSearchParam.getMt())){
+ boolQuery.filter(QueryBuilders
+ .termQuery("mtName", courseSearchParam.getMt()));
+ }
+ // 3.2.3 匹配小分类
+ if (StringUtils.isNotEmpty(courseSearchParam.getSt())){
+ boolQuery.filter(QueryBuilders
+ .termQuery("stName", courseSearchParam.getSt()));
+ }
+ // 3.2.4 匹配难度
+ if (StringUtils.isNotEmpty(courseSearchParam.getGrade())){
+ boolQuery.filter(QueryBuilders
+ .termQuery("grade", courseSearchParam.getGrade()));
+ }
// 4. 布尔查询
searchSourceBuilder.query(boolQuery);
request.source(searchSourceBuilder);
// 5. 发送请求,获取响应结果
SearchResponse response = null;
try {
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.debug("课程搜索异常:{}", e.getMessage());
return new SearchPageResultDto<>(new ArrayList<>(), 0, 0, 0);
}
// 6. 解析响应
SearchHits searchHits = response.getHits();
// 6.1 获取总条数
long totalHits = searchHits.getTotalHits().value;
// 6.2 获取文档数组
SearchHit[] hits = searchHits.getHits();
ArrayList<CourseIndex> list = new ArrayList<>();
// 6.3 遍历
for (SearchHit hit : hits) {
// 获取文档source
String jsonCourseString = hit.getSourceAsString();
// 转为CourseIndex对象,加入到集合中
CourseIndex courseIndex = JSON.parseObject(jsonCourseString, CourseIndex.class);
list.add(courseIndex);
}
// 7. 封装结果
SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits, pageNo, pageSize);
return pageResult;
}
聚合搜索
- 搜索页面上显示的一级分类、二级分类来源于搜索结果,使用聚合搜索实现找到搜索结果中的一级分类、二级分类
- 首先在搜索结果DTO类中添加一级分类、二级分类列表
@Data
@ToString
public class SearchPageResultDto<T> extends PageResult {
+ //大分类列表
+ List<String> mtList;
+ //小分类列表
+ List<String> stList;
public SearchPageResultDto(List<T> items, long counts, long page, long pageSize) {
super(items, counts, page, pageSize);
}
}
@Override
public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {
// 1. 准备Request对象
SearchRequest request = new SearchRequest(courseIndexName);
// 2. 组织DSL参数,这里使用布尔查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
String[] sourceFieldsArray = sourceFields.split(",");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// sourceFieldsArray指定要返回的字段,new String[]{}指定不返回的字段
searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
// 3. 分页
Long pageNo = pageParams.getPageNo();
Long pageSize = pageParams.getPageSize();
// 3.1 指定起始查询位置和查询条数
int start = (int) ((pageNo - 1) * pageSize);
searchSourceBuilder.from(start)
.size(Math.toIntExact(pageSize));
// 3.2 指定条件查询
if (courseSearchParam == null) {
courseSearchParam = new SearchCourseParamDto();
}
// 3.2.1 匹配关键字
if (StringUtils.isNotEmpty(courseSearchParam.getKeywords())){
String keywords = courseSearchParam.getKeywords();
boolQuery.must(QueryBuilders
.multiMatchQuery(keywords,"name", "description")
.minimumShouldMatch("70%")
.field("name", 10));
}
// 3.2.2 匹配大分类
if (StringUtils.isNotEmpty(courseSearchParam.getMt())){
boolQuery.filter(QueryBuilders
.termQuery("mtName", courseSearchParam.getMt()));
}
// 3.2.3 匹配小分类
if (StringUtils.isNotEmpty(courseSearchParam.getSt())){
boolQuery.filter(QueryBuilders
.termQuery("stName", courseSearchParam.getSt()));
}
// 3.2.4 匹配难度
if (StringUtils.isNotEmpty(courseSearchParam.getGrade())){
boolQuery.filter(QueryBuilders
.termQuery("grade", courseSearchParam.getGrade()));
}
// 4. 布尔查询
searchSourceBuilder.query(boolQuery);
request.source(searchSourceBuilder);
+ // 聚合设置
+ buildAggregation(request);
// 5. 发送请求,获取响应结果
SearchResponse response = null;
try {
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.debug("课程搜索异常:{}", e.getMessage());
return new SearchPageResultDto<>(new ArrayList<>(), 0, 0, 0);
}
// 6. 解析响应
SearchHits searchHits = response.getHits();
// 6.1 获取总条数
long totalHits = searchHits.getTotalHits().value;
// 6.2 获取文档数组
SearchHit[] hits = searchHits.getHits();
ArrayList<CourseIndex> list = new ArrayList<>();
// 6.3 遍历
for (SearchHit hit : hits) {
// 获取文档source
String jsonCourseString = hit.getSourceAsString();
// 转为CourseIndex对象,加入到集合中
CourseIndex courseIndex = JSON.parseObject(jsonCourseString, CourseIndex.class);
list.add(courseIndex);
}
// 7. 封装结果
+ List<String> mtList = getAggregation(response.getAggregations(), "mtAgg");
+ List<String> stList = getAggregation(response.getAggregations(), "stAgg");
SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits, pageNo, pageSize);
+ pageResult.setMtList(mtList);
+ pageResult.setStList(stList);
return pageResult;
}
+ private void buildAggregation(SearchRequest request) {
+ request.source().aggregation(AggregationBuilders
+ .terms("mtAgg")
+ .field("mtName")
+ .size(100)
+ );
+ request.source().aggregation(AggregationBuilders
+ .terms("stAgg")
+ .field("stName")
+ .size(100)
+ );
+ }
+ /**
+ * 根据聚合名称获取聚合结果
+ * @param aggregations 聚合对象
+ * @param aggName 聚合名称
+ * @return 聚合结果,返回List集合
+ */
+ private List<String> getAggregation(Aggregations aggregations, String aggName) {
+ // 1. 根据聚合名称获取聚合结果
+ Terms brandTerms = aggregations.get(aggName);
+ // 2. 获取buckets
+ List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
+ // 3. 遍历
+ List<String> brandList = new ArrayList<>();
+ for (Terms.Bucket bucket : buckets) {
+ // 4. 获取key
+ String key = bucket.getKeyAsString();
+ // 5. 加入到集合中
+ brandList.add(key);
+ }
+ return brandList;
+ }
聚合搜索测试
进入搜索界面,观察搜索请求的响应内容中是否存在mtList和stList
- 注意:当选中一个一级分类时,才会显示二级分类
高亮设置
最后实现关键字在课程名称中高亮显示
@Override
public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {
log.debug("ES条件查询并响应分页结果");
// 1. 准备Request对象
SearchRequest request = new SearchRequest(courseIndexName);
// 2. 组织DSL参数,这里使用布尔查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
String[] sourceFieldsArray = sourceFields.split(",");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// sourceFieldsArray指定要返回的字段,new String[]{}指定不返回的字段
searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
// 3. 分页
Long pageNo = pageParams.getPageNo();
Long pageSize = pageParams.getPageSize();
// 3.1 指定起始查询位置和查询条数
int start = (int) ((pageNo - 1) * pageSize);
searchSourceBuilder.from(start)
.size(Math.toIntExact(pageSize));
// 3.2 指定条件查询
if (courseSearchParam == null) {
courseSearchParam = new SearchCourseParamDto();
}
// 3.2.1 匹配关键字
if (StringUtils.isNotEmpty(courseSearchParam.getKeywords())) {
String keywords = courseSearchParam.getKeywords();
boolQuery.must(QueryBuilders
.multiMatchQuery(keywords, "name", "description")
.minimumShouldMatch("70%")
.field("name", 10));
}
// 3.2.2 匹配大分类
if (StringUtils.isNotEmpty(courseSearchParam.getMt())) {
boolQuery.filter(QueryBuilders
.termQuery("mtName", courseSearchParam.getMt()));
}
// 3.2.3 匹配小分类
if (StringUtils.isNotEmpty(courseSearchParam.getSt())) {
boolQuery.filter(QueryBuilders
.termQuery("stName", courseSearchParam.getSt()));
}
// 3.2.4 匹配难度
if (StringUtils.isNotEmpty(courseSearchParam.getGrade())) {
boolQuery.filter(QueryBuilders
.termQuery("grade", courseSearchParam.getGrade()));
}
// 4. 布尔查询
searchSourceBuilder.query(boolQuery);
+ // 高亮
+ searchSourceBuilder.highlighter(new HighlightBuilder()
+ .field("name")
+ .preTags("<font class='eslight'>")
+ .postTags("</font>"));
request.source(searchSourceBuilder);
// 聚合设置
buildAggregation(request);
// 5. 发送请求,获取响应结果
SearchResponse response = null;
try {
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.debug("课程搜索异常:{}", e.getMessage());
return new SearchPageResultDto<>(new ArrayList<>(), 0, 0, 0);
}
// 6. 解析响应
SearchHits searchHits = response.getHits();
// 6.1 获取总条数
long totalHits = searchHits.getTotalHits().value;
// 6.2 获取文档数组
SearchHit[] hits = searchHits.getHits();
ArrayList<CourseIndex> list = new ArrayList<>();
// 6.3 遍历
for (SearchHit hit : hits) {
// 获取文档source
String jsonCourseString = hit.getSourceAsString();
// 转为CourseIndex对象
CourseIndex courseIndex = JSON.parseObject(jsonCourseString, CourseIndex.class);
+ // 获取高亮
+ Map<String, HighlightField> highlightFields = hit.getHighlightFields();
+ log.debug("获取高亮:{}", highlightFields);
+ String name = courseIndex.getName();
+ // 健壮性判断
+ if (!CollectionUtils.isEmpty(highlightFields)) {
+ // 获取高亮字段结果
+ HighlightField highlightField = highlightFields.get("name");
+ log.debug("成功获取高亮字段");
+ // 健壮性判断
+ if (highlightField != null) {
+ log.debug("取出高亮结果数组的第一个");
+ // 取出高亮结果数组的第一个
+ name = highlightField.getFragments()[0].toString();
+ }
+ }
+ courseIndex.setName(name);
list.add(courseIndex);
}
// 7. 封装结果
List<String> mtList = getAggregation(response.getAggregations(), "mtAgg");
List<String> stList = getAggregation(response.getAggregations(), "stAgg");
SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits, pageNo, pageSize);
pageResult.setMtList(mtList);
pageResult.setStList(stList);
return pageResult;
}
高亮设置测试
课程发布任务完善
需求分析
- 执行课程发布操作后,由消息处理机制向ElasticSearch索引保存课程信息
- 由内容管理服务远程调用搜索服务,添加课程信息索引
- 搜索服务请求ElasticSearch添加课程信息
课程索引任务开发
- 在内容管理服务中添加FeignClient
@FeignClient(value = "search",fallbackFactory = SearchServiceClientFallbackFactory.class)
public interface SearchServiceClient {
@PostMapping("/search/index/course")
public Boolean add(@RequestBody CourseIndex courseIndex);
}
@Slf4j
@Component
public class SearchServiceClientFallbackFactory implements FallbackFactory<SearchServiceClient> {
@Override
public SearchServiceClient create(Throwable throwable) {
return new SearchServiceClient() {
@Override
public Boolean add(CourseIndex courseIndex) {
throwable.printStackTrace();
log.debug("调用搜索发生熔断走降级方法,熔断异常:", throwable.getMessage());
return false;
}
};
}
}
编写课程索引任务执行方法
完善CoursePublishTask类中的saveCourseIndex方法
//保存课程索引信息
public void saveCourseIndex(MqMessage mqMessage,long courseId){
log.debug("保存课程索引信息,课程id:{}",courseId);
//消息id
Long id = mqMessage.getId();
//消息处理的service
MqMessageService mqMessageService = this.getMqMessageService();
//消息幂等性处理
int stageTwo = mqMessageService.getStageTwo(id);
if(stageTwo > 0){
log.debug("课程索引已处理直接返回,课程id:{}",courseId);
return ;
}
Boolean result = saveCourseIndex(courseId);
if(result){
//保存第一阶段状态
mqMessageService.completedStageTwo(id);
}
}
private Boolean saveCourseIndex(Long courseId) {
//取出课程发布信息
CoursePublish coursePublish = coursePublishMapper.selectById(courseId);
//拷贝至课程索引对象
CourseIndex courseIndex = new CourseIndex();
BeanUtils.copyProperties(coursePublish,courseIndex);
//远程调用搜索服务api添加课程信息到索引
Boolean add = searchServiceClient.add(courseIndex);
if(!add){
XueChengPlusException.cast("添加索引失败");
}
return add;
}
5.5.3 测试
测试流程如下:
1、启动elasticsearch、kibana。
2、启动网关、内容管理、搜索服务、nginx。
3、启动xxl-job调度中心。
4、在任务调度中心开始课程发布任务。
5、发布一门课程,页面提示操作成功,查看发布课程任务是否写到任务表。
6、经过任务调度将课程信息写入索引。
7、通过门户进入搜索页面,查看课程信息是否展示。
扩展:课程信息索引同步
通过向索引中添加课程信息最终实现了课程的搜索,我们发现课程信息是先保存在关系数据库中,而后再写入索引,这个过程是将关系数据中的数据同步到elasticsearch索引中的过程,可以简单成为索引同步。
通常项目中使用elasticsearch需要完成索引同步,索引同步的方法很多:
实时性高
1、针对实时性非常高的场景需要满足数据的及时同步,可以同步调用,或使用Canal去实现。
1)方案一:手动写,同步调用。同步调用即在向MySQL写数据后远程调用搜索服务的接口写入索引,此方法简单但是耦合代码太高。
2)方案二:可以使用一个中间的软件canal解决耦合性的问题,但存在学习与维护成本。
canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,实现将MySQL的数据同步到消息队列、Elasticsearch、其它数据库等,应用场景十分丰富。
它的地址:
github地址:https://github.com/alibaba/canal
版本下载地址:https://github.com/alibaba/canal/releases
文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal基于mysql的binlog技术实现数据同步,什么是binlog,它是一个文件,二进制格式,记录了对数据库更新的SQL语句,向数据库写数据的同时向binlog文件里记录对应的sql语句。当数据库服务器发生了故障就可以使用binlog文件对数据库进行恢复。
所以,使用canal是需要开启mysql的binlog写入功能,Canal工作原理如下
1、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump
协议
2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3、canal 解析 binary log 对象(原始为 byte 流)
详细使用Canal进行索引同步的步骤参考:Canal实现索引同步.pdf
实时性要求不高
2、当索引同步的实时性要求不高时可用的技术比较多,比如:MQ、Logstash、任务调度等。
MQ:向mysql写数据的时候向mq写入消息,搜索服务监听MQ,收到消息后写入索引。使用MQ的优势是代码解耦,但是需要处理消息可靠性的问题有一定的技术成本,做到消息可靠性需要做到生产者投递成功、消息持久化以及消费者消费成功三个方面,另外还要做好消息幂等性问题。
Logstash: 开源实时日志分析平台 ELK包括Elasticsearch、Kibana、Logstash,Logstash负责收集、解析和转换日志信息,可以实现MySQL与Elasticsearch之间的数据同步。也可以实现解耦合并且是官方推荐,但需要增加学习与维护成本。
任务调度:向mysql写数据的时候记录修改记录,开启一个定时任务根据修改记录将数据同步到Elasticsearch。
根据本项目的需求,课程发布后信息同步的实时性要求不高,从提交审核到发布成功一般两个工作日完成。综合比较以上技术方案本项目的索引同步技术使用任务调度的方法。
1、课程发布向消息表插入记录。
2、由任务调度程序通过消息处理SDK对消息记录进行处理。
3、向elasticsearch索引中保存课程信息。
如何向向elasticsearch索引中保存课程信息?
执行流程如下:
由内容管理服务远程调用搜索服务添加课程信息索引,搜索服务再请求elasticsearch向课程索引中添加文档。
面试
ElasticSearch是怎么使用的
本项目使用ElasticSearch开发搜索服务,步骤如下
- 创建索引(相当于数据库表),将课程信息添加到索引库,对课程信息进行分词,存储到索引库
- 开发一个搜索服务,编写课程搜索接口,调用ElasticSearch的rest接口根据关键字、课程分类等信息进行搜索
如何保证索引同步?
本项目时使用本地任务表 + xxl-job任务调度进行索引同步,具体流程如下
- 添加或修改或删除课程的同时,向任务表插入一条记录,这条记录就记录了是添加还是修改还是删除了哪个课程
- 任务调度定时扫描任务表,根据任务表的内容对课程信息进行同步
- 如果添加了课程,将课程添加到索引库
- 如果修改了课程,就修改索引库的课程
- 如果是删除了课程,将课程信息从索引库删除