From 6f690c1987fb2067fe2b390ff4cad0ba309bbff6 Mon Sep 17 00:00:00 2001 From: rowger Date: Mon, 4 Dec 2023 19:39:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/ImCallHistoryController.java | 2 + .../call/service/ImCallHistoryService.java | 11 ++- .../lld/im/service/room/dao/ImRoomEntity.java | 2 + .../service/room/service/ImRoomService.java | 30 ++++++- .../taskScheduler/CallHistoryTask.java | 59 ++++++++++++++ .../service/taskScheduler/SRSStreamTask.java | 79 +++++++++++++++++-- .../model/StreamApiResponse.java | 16 ++++ .../taskScheduler/model/StreamEntity.java | 26 ++++++ .../src/main/resources/application.yml | 2 +- 9 files changed, 217 insertions(+), 10 deletions(-) create mode 100644 hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/CallHistoryTask.java create mode 100644 hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamApiResponse.java create mode 100644 hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamEntity.java diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/call/controller/ImCallHistoryController.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/call/controller/ImCallHistoryController.java index 6eadaf8..02a4b12 100644 --- a/hs-im-server/im-service/src/main/java/com/lld/im/service/call/controller/ImCallHistoryController.java +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/call/controller/ImCallHistoryController.java @@ -82,6 +82,8 @@ public class ImCallHistoryController { @PostMapping("/callHangUp") public ResponseVO callHangUp(@RequestBody ImCallHistoryEntity imCallHistoryEntity) { + imCallHistoryEntity.setStatus("1"); + callHistoryService.updateBySessionId(imCallHistoryEntity); return ResponseVO.successResponse(""); diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/call/service/ImCallHistoryService.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/call/service/ImCallHistoryService.java index a46382c..6b01dfb 100644 --- a/hs-im-server/im-service/src/main/java/com/lld/im/service/call/service/ImCallHistoryService.java +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/call/service/ImCallHistoryService.java @@ -46,11 +46,20 @@ public class ImCallHistoryService { callHistoryMapper.insert(callHistory); } + + public List queryListByStatus(String status){ + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("status",status); + List imCallHistoryEntityList = callHistoryMapper.selectList(queryWrapper); + return imCallHistoryEntityList; + } + + public void updateBySessionId(ImCallHistoryEntity callHistory){ Date _now=new Date(); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("session_id",callHistory.getSessionId()); - queryWrapper.eq("status","1"); + queryWrapper.eq("status",callHistory.getStatus()); // end_time ImCallHistoryEntity imCallHistory = callHistoryMapper.selectOne(queryWrapper); diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/room/dao/ImRoomEntity.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/room/dao/ImRoomEntity.java index 7c2be7d..d36306a 100644 --- a/hs-im-server/im-service/src/main/java/com/lld/im/service/room/dao/ImRoomEntity.java +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/room/dao/ImRoomEntity.java @@ -25,4 +25,6 @@ public class ImRoomEntity { // publish 开始推流 hangup 结束推流 private String cmd; private Integer offline; + private Integer checkTimes; + } diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/room/service/ImRoomService.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/room/service/ImRoomService.java index 822a576..fe80906 100644 --- a/hs-im-server/im-service/src/main/java/com/lld/im/service/room/service/ImRoomService.java +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/room/service/ImRoomService.java @@ -61,11 +61,34 @@ public class ImRoomService { } + public void deleteRoomById(Integer id){ + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("id",id); + imRoomMapper.delete(queryWrapper); + } + + public void deleteAllUserFromRoom(String roomId){ + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("room_id",roomId); + imRoomMapper.delete(queryWrapper); + + } @Transactional public void updateRoomInfo(ImRoomEntity entity) { } + + + public List getAllRoomList(){ + QueryWrapper queryWrapper = new QueryWrapper<>(); + + List roomEntityList = imRoomMapper.selectList(queryWrapper); + + return roomEntityList; + } + + public List getRoomInfo(ImRoomEntity entity){ QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("room_id",entity.getRoomId()); @@ -83,6 +106,11 @@ public class ImRoomService { if (roomEntityList.size()>0){ imRoomMapper.delete(queryWrapper); } - } + + public void addCheckTimes(ImRoomEntity entity,Integer times){ + entity.setCheckTimes(times); + imRoomMapper.updateById(entity); + } + } diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/CallHistoryTask.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/CallHistoryTask.java new file mode 100644 index 0000000..b583d57 --- /dev/null +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/CallHistoryTask.java @@ -0,0 +1,59 @@ +package com.lld.im.service.taskScheduler; + +import com.lld.im.service.call.dao.ImCallHistoryEntity; +import com.lld.im.service.call.dao.mapper.ImCallHistoryMapper; +import com.lld.im.service.call.service.ImCallHistoryService; +import com.lld.im.service.room.dao.ImRoomEntity; +import com.lld.im.service.room.service.ImRoomService; +import com.lld.im.service.user.service.ImUserService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Slf4j +@Service +public class CallHistoryTask { + @Autowired + ImCallHistoryService callHistoryService; + + @Autowired + ImUserService imUserService; + + @Autowired + ImCallHistoryMapper callHistoryMapper; + + @Autowired + ImRoomService roomService; + + + @Scheduled(cron="0/10 * * * * ? ") + public void executeTask(){ + List callHistoryEntities = callHistoryService.queryListByStatus("1"); + for(ImCallHistoryEntity callHistoryEntity: callHistoryEntities){ + ImRoomEntity imRoomEntity = new ImRoomEntity(); + imRoomEntity.setRoomId(callHistoryEntity.getRoomId()); + List serviceRoomInfo = roomService.getRoomInfo(imRoomEntity); + if(serviceRoomInfo.size()<2){ + log.info("{}房间里面的流少于2路,关掉全部视频流并且判断异常中断",callHistoryEntity.getSessionId()); + //房间里面的流少于2路,关掉全部视频流并且判断异常中断 + closeAll(callHistoryEntity,serviceRoomInfo); + } + } + } + + @Transactional(rollbackFor = Exception.class) + public void closeAll(ImCallHistoryEntity callHistoryEntity,List roomList){ + callHistoryEntity.setStatus("3"); + callHistoryService.updateBySessionId(callHistoryEntity); + + if(roomList.size()>0){ + roomService.deleteAllUserFromRoom(roomList.get(0).getRoomId()); + } + } + + +} diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/SRSStreamTask.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/SRSStreamTask.java index 11a4a7d..f5567e5 100644 --- a/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/SRSStreamTask.java +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/SRSStreamTask.java @@ -1,20 +1,85 @@ package com.lld.im.service.taskScheduler; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.lld.im.common.utils.HttpRequestUtils; +import com.lld.im.service.room.dao.ImRoomEntity; +import com.lld.im.service.room.service.ImRoomService; +import com.lld.im.service.taskScheduler.model.StreamApiResponse; +import com.lld.im.service.taskScheduler.model.StreamEntity; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; +import java.util.List; + @Slf4j @Service public class SRSStreamTask { - @Scheduled(cron="0/5 * * * * ? ") - public void executeTask(){ - String ip = System.getenv("SRS_HOST"); -// -// log.error("213132"); -// log.info("========定时任务"); - } + + @Value("${custom.host}") + private String host; + @Autowired + HttpRequestUtils httpRequestUtils; + + + @Autowired + ImRoomService roomService; + + + @Scheduled(cron="0/2 * * * * ? ") + public void executeTask() throws Exception { + log.info("====================== 执行定时任务 SRSStreamTask"); + String _url = "http://"+host+":1985/api/v1/streams"; + String res = httpRequestUtils.doGet(_url); + + ObjectMapper om =new ObjectMapper(); + om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false); + om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);//不存在的字段,不被序列化 + + StreamApiResponse response=om.readValue(res, StreamApiResponse.class); + + log.info(JSONObject.toJSONString(response)); + + + List roomList = roomService.getAllRoomList(); + for (ImRoomEntity roomEntity:roomList){ + if(StringUtils.isBlank(roomEntity.getWebrtcUrl())){ + roomService.deleteRoomById(roomEntity.getId()); + }else { + Boolean hasStream=false; + List streamEntityList = response.getStreams(); + for (StreamEntity streamEntity:streamEntityList){ + String webRTCUrl=streamEntity.getTcUrl()+"/"+streamEntity.getName(); + if(StringUtils.equals(roomEntity.getWebrtcUrl(),webRTCUrl)){ + hasStream=true; + break; + } + } + + + + // 如果连续检查三次,最后一次流不存在,则在数据库中移除流 + if(!hasStream){ + roomService.addCheckTimes(roomEntity,roomEntity.getCheckTimes()+1); + if(roomEntity.getCheckTimes()>4){ + roomService.deleteRoomById(roomEntity.getId()); + } + }else { + roomService.addCheckTimes(roomEntity,0); + } + + } + + } + + } } diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamApiResponse.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamApiResponse.java new file mode 100644 index 0000000..8a4efcd --- /dev/null +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamApiResponse.java @@ -0,0 +1,16 @@ +package com.lld.im.service.taskScheduler.model; + +import lombok.Data; + +import java.util.List; + +@Data +public class StreamApiResponse { + private String code; + private String service; + + private String pid; + + private List streams; + +} diff --git a/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamEntity.java b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamEntity.java new file mode 100644 index 0000000..e17328e --- /dev/null +++ b/hs-im-server/im-service/src/main/java/com/lld/im/service/taskScheduler/model/StreamEntity.java @@ -0,0 +1,26 @@ +package com.lld.im.service.taskScheduler.model; + + +import lombok.Data; + +@Data +public class StreamEntity { + private String id; + + // 例如: livestream + private String name; + // 例如: live + private String app; + + private String url; + private String tcUrl; + + private Publish publish; + + @Data + class Publish { + private boolean active; + private String cid; + } + +} diff --git a/hs-im-server/im-service/src/main/resources/application.yml b/hs-im-server/im-service/src/main/resources/application.yml index 3d7808a..9f96606 100644 --- a/hs-im-server/im-service/src/main/resources/application.yml +++ b/hs-im-server/im-service/src/main/resources/application.yml @@ -1,3 +1,3 @@ spring: profiles: - active: dev + active: prod