定时任务
This commit is contained in:
@@ -82,6 +82,8 @@ public class ImCallHistoryController {
|
||||
|
||||
@PostMapping("/callHangUp")
|
||||
public ResponseVO callHangUp(@RequestBody ImCallHistoryEntity imCallHistoryEntity) {
|
||||
imCallHistoryEntity.setStatus("1");
|
||||
|
||||
callHistoryService.updateBySessionId(imCallHistoryEntity);
|
||||
|
||||
return ResponseVO.successResponse("");
|
||||
|
||||
@@ -46,11 +46,20 @@ public class ImCallHistoryService {
|
||||
|
||||
callHistoryMapper.insert(callHistory);
|
||||
}
|
||||
|
||||
public List<ImCallHistoryEntity> queryListByStatus(String status){
|
||||
QueryWrapper<ImCallHistoryEntity> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("status",status);
|
||||
List<ImCallHistoryEntity> imCallHistoryEntityList = callHistoryMapper.selectList(queryWrapper);
|
||||
return imCallHistoryEntityList;
|
||||
}
|
||||
|
||||
|
||||
public void updateBySessionId(ImCallHistoryEntity callHistory){
|
||||
Date _now=new Date();
|
||||
QueryWrapper<ImCallHistoryEntity> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("session_id",callHistory.getSessionId());
|
||||
queryWrapper.eq("status","1");
|
||||
queryWrapper.eq("status",callHistory.getStatus());
|
||||
|
||||
// end_time
|
||||
ImCallHistoryEntity imCallHistory = callHistoryMapper.selectOne(queryWrapper);
|
||||
|
||||
@@ -25,4 +25,6 @@ public class ImRoomEntity {
|
||||
// publish 开始推流 hangup 结束推流
|
||||
private String cmd;
|
||||
private Integer offline;
|
||||
private Integer checkTimes;
|
||||
|
||||
}
|
||||
|
||||
@@ -61,11 +61,34 @@ public class ImRoomService {
|
||||
|
||||
}
|
||||
|
||||
public void deleteRoomById(Integer id){
|
||||
QueryWrapper<ImRoomEntity> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("id",id);
|
||||
imRoomMapper.delete(queryWrapper);
|
||||
}
|
||||
|
||||
public void deleteAllUserFromRoom(String roomId){
|
||||
QueryWrapper<ImRoomEntity> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("room_id",roomId);
|
||||
imRoomMapper.delete(queryWrapper);
|
||||
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void updateRoomInfo(ImRoomEntity entity) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
public List<ImRoomEntity> getAllRoomList(){
|
||||
QueryWrapper<ImRoomEntity> queryWrapper = new QueryWrapper<>();
|
||||
|
||||
List<ImRoomEntity> roomEntityList = imRoomMapper.selectList(queryWrapper);
|
||||
|
||||
return roomEntityList;
|
||||
}
|
||||
|
||||
|
||||
public List<ImRoomEntity> getRoomInfo(ImRoomEntity entity){
|
||||
QueryWrapper<ImRoomEntity> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("room_id",entity.getRoomId());
|
||||
@@ -83,6 +106,11 @@ public class ImRoomService {
|
||||
if (roomEntityList.size()>0){
|
||||
imRoomMapper.delete(queryWrapper);
|
||||
}
|
||||
}
|
||||
|
||||
public void addCheckTimes(ImRoomEntity entity,Integer times){
|
||||
entity.setCheckTimes(times);
|
||||
imRoomMapper.updateById(entity);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.lld.im.service.taskScheduler;
|
||||
|
||||
import com.lld.im.service.call.dao.ImCallHistoryEntity;
|
||||
import com.lld.im.service.call.dao.mapper.ImCallHistoryMapper;
|
||||
import com.lld.im.service.call.service.ImCallHistoryService;
|
||||
import com.lld.im.service.room.dao.ImRoomEntity;
|
||||
import com.lld.im.service.room.service.ImRoomService;
|
||||
import com.lld.im.service.user.service.ImUserService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class CallHistoryTask {
|
||||
@Autowired
|
||||
ImCallHistoryService callHistoryService;
|
||||
|
||||
@Autowired
|
||||
ImUserService imUserService;
|
||||
|
||||
@Autowired
|
||||
ImCallHistoryMapper callHistoryMapper;
|
||||
|
||||
@Autowired
|
||||
ImRoomService roomService;
|
||||
|
||||
|
||||
@Scheduled(cron="0/10 * * * * ? ")
|
||||
public void executeTask(){
|
||||
List<ImCallHistoryEntity> callHistoryEntities = callHistoryService.queryListByStatus("1");
|
||||
for(ImCallHistoryEntity callHistoryEntity: callHistoryEntities){
|
||||
ImRoomEntity imRoomEntity = new ImRoomEntity();
|
||||
imRoomEntity.setRoomId(callHistoryEntity.getRoomId());
|
||||
List<ImRoomEntity> serviceRoomInfo = roomService.getRoomInfo(imRoomEntity);
|
||||
if(serviceRoomInfo.size()<2){
|
||||
log.info("{}房间里面的流少于2路,关掉全部视频流并且判断异常中断",callHistoryEntity.getSessionId());
|
||||
//房间里面的流少于2路,关掉全部视频流并且判断异常中断
|
||||
closeAll(callHistoryEntity,serviceRoomInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void closeAll(ImCallHistoryEntity callHistoryEntity,List<ImRoomEntity> roomList){
|
||||
callHistoryEntity.setStatus("3");
|
||||
callHistoryService.updateBySessionId(callHistoryEntity);
|
||||
|
||||
if(roomList.size()>0){
|
||||
roomService.deleteAllUserFromRoom(roomList.get(0).getRoomId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,20 +1,85 @@
|
||||
package com.lld.im.service.taskScheduler;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.lld.im.common.utils.HttpRequestUtils;
|
||||
import com.lld.im.service.room.dao.ImRoomEntity;
|
||||
import com.lld.im.service.room.service.ImRoomService;
|
||||
import com.lld.im.service.taskScheduler.model.StreamApiResponse;
|
||||
import com.lld.im.service.taskScheduler.model.StreamEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class SRSStreamTask {
|
||||
@Scheduled(cron="0/5 * * * * ? ")
|
||||
public void executeTask(){
|
||||
String ip = System.getenv("SRS_HOST");
|
||||
//
|
||||
// log.error("213132");
|
||||
// log.info("========定时任务");
|
||||
|
||||
@Value("${custom.host}")
|
||||
private String host;
|
||||
@Autowired
|
||||
HttpRequestUtils httpRequestUtils;
|
||||
|
||||
|
||||
@Autowired
|
||||
ImRoomService roomService;
|
||||
|
||||
|
||||
@Scheduled(cron="0/2 * * * * ? ")
|
||||
public void executeTask() throws Exception {
|
||||
log.info("====================== 执行定时任务 SRSStreamTask");
|
||||
String _url = "http://"+host+":1985/api/v1/streams";
|
||||
String res = httpRequestUtils.doGet(_url);
|
||||
|
||||
ObjectMapper om =new ObjectMapper();
|
||||
om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false);
|
||||
om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);//不存在的字段,不被序列化
|
||||
|
||||
StreamApiResponse response=om.readValue(res, StreamApiResponse.class);
|
||||
|
||||
log.info(JSONObject.toJSONString(response));
|
||||
|
||||
|
||||
List<ImRoomEntity> roomList = roomService.getAllRoomList();
|
||||
for (ImRoomEntity roomEntity:roomList){
|
||||
if(StringUtils.isBlank(roomEntity.getWebrtcUrl())){
|
||||
roomService.deleteRoomById(roomEntity.getId());
|
||||
}else {
|
||||
Boolean hasStream=false;
|
||||
List<StreamEntity> streamEntityList = response.getStreams();
|
||||
for (StreamEntity streamEntity:streamEntityList){
|
||||
String webRTCUrl=streamEntity.getTcUrl()+"/"+streamEntity.getName();
|
||||
if(StringUtils.equals(roomEntity.getWebrtcUrl(),webRTCUrl)){
|
||||
hasStream=true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// 如果连续检查三次,最后一次流不存在,则在数据库中移除流
|
||||
if(!hasStream){
|
||||
roomService.addCheckTimes(roomEntity,roomEntity.getCheckTimes()+1);
|
||||
if(roomEntity.getCheckTimes()>4){
|
||||
roomService.deleteRoomById(roomEntity.getId());
|
||||
}
|
||||
}else {
|
||||
roomService.addCheckTimes(roomEntity,0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.lld.im.service.taskScheduler.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class StreamApiResponse {
|
||||
private String code;
|
||||
private String service;
|
||||
|
||||
private String pid;
|
||||
|
||||
private List<StreamEntity> streams;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.lld.im.service.taskScheduler.model;
|
||||
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class StreamEntity {
|
||||
private String id;
|
||||
|
||||
// 例如: livestream
|
||||
private String name;
|
||||
// 例如: live
|
||||
private String app;
|
||||
|
||||
private String url;
|
||||
private String tcUrl;
|
||||
|
||||
private Publish publish;
|
||||
|
||||
@Data
|
||||
class Publish {
|
||||
private boolean active;
|
||||
private String cid;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,3 +1,3 @@
|
||||
spring:
|
||||
profiles:
|
||||
active: dev
|
||||
active: prod
|
||||
|
||||
Reference in New Issue
Block a user