
主播(OBS/APP) --(RTMP/SRT)--> 流媒体(SRS / ZLMediaKit) --(HLS/HTTP-FLV/WebRTC)--> CDN --> 观众
│
on_publish/on_close HTTP callbacks
│
后端(Java Spring Boot)
┌─────────────┬──────────────┬─────────────┐
│ 房间管理 │ 鉴权/Token │ WebSocket IM │
│ MySQL/Redis │ 异步转码队列 │ (Netty/Spring)│
└─────────────┴──────────────┴─────────────┘-- users
CREATE TABLE users (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL,
password_hash VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- rooms
CREATE TABLE rooms (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
owner_id BIGINT NOT NULL,
title VARCHAR(255),
stream_key VARCHAR(128) NOT NULL UNIQUE,
is_live TINYINT DEFAULT 0,
hls_url VARCHAR(512),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- streams
CREATE TABLE streams (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
room_id BIGINT NOT NULL,
start_at DATETIME,
stop_at DATETIME,
record_url VARCHAR(512),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);java-live/
├─ pom.xml
├─ src/main/java/com/example/live/
│ ├─ LiveApplication.java
│ ├─ controller/
│ │ ├─ HookController.java # on_publish / on_close
│ │ └─ PlayTokenController.java
│ ├─ model/
│ │ ├─ Room.java
│ │ └─ Stream.java
│ ├─ repository/
│ │ ├─ RoomRepository.java
│ │ └─ StreamRepository.java
│ ├─ service/
│ │ ├─ StreamService.java
│ │ └─ TokenService.java
│ └─ websocket/
│ ├─ WebSocketConfig.java
│ └─ ChatHandler.java
├─ src/main/resources/application.yml
└─ Dockerfilepom.xml(关键依赖)<project ...>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>java-live</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
</dependencies>
</project>application.ymlserver:
port: 8080
spring:
datasource:
url: jdbc:mysql://mysql:3306/live?useSSL=false&serverTimezone=UTC
username: root
password: rootpwd
jpa:
hibernate:
ddl-auto: update
show-sql: true
app:
token-secret: somesecretkey
play-token-expire: 300Room.javapackage com.example.live.model;
import lombok.*;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Table(name="rooms")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Room {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long ownerId;
private String title;
@Column(unique=true)
private String streamKey;
private Boolean isLive = false;
private String hlsUrl;
private LocalDateTime createdAt = LocalDateTime.now();
}Stream.javapackage com.example.live.model;
import lombok.*;
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
@Table(name="streams")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Stream {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long roomId;
private LocalDateTime startAt;
private LocalDateTime stopAt;
private String recordUrl;
private LocalDateTime createdAt = LocalDateTime.now();
}RoomRepository.java
package com.example.live.repository;
import com.example.live.model.Room;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.Optional;
public interface RoomRepository extends JpaRepository<Room, Long> {
Optional<Room> findByStreamKey(String streamKey);
}StreamRepository.java
package com.example.live.repository;
import com.example.live.model.Stream;
import org.springframework.data.jpa.repository.JpaRepository;
public interface StreamRepository extends JpaRepository<Stream, Long> {}package com.example.live.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
@Service
public class TokenService {
@Value("${app.token-secret}")
private String secret;
@Value("${app.play-token-expire}")
private int expire;
public String generatePlayToken(Long roomId) throws Exception {
long exp = System.currentTimeMillis()/1000 + expire;
String payload = roomId + "|" + exp;
String sig = hmacSha256(payload, secret);
String token = Base64.getUrlEncoder().withoutPadding().encodeToString((payload + "|" + sig).getBytes());
return token;
}
public Long verifyPlayToken(String token) throws Exception {
String raw = new String(Base64.getUrlDecoder().decode(token));
String[] parts = raw.split("\\|");
if(parts.length != 3) return null;
Long roomId = Long.valueOf(parts[0]);
long exp = Long.parseLong(parts[1]);
String sig = parts[2];
if(System.currentTimeMillis()/1000 > exp) return null;
String expected = hmacSha256(parts[0]+"|"+parts[1], secret);
if(!expected.equals(sig)) return null;
return roomId;
}
private String hmacSha256(String data, String key) throws Exception {
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(key.getBytes(), "HmacSHA256"));
byte[] bytes = mac.doFinal(data.getBytes());
return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
}
}HookController:流媒体回调(on_publish / on_close)package com.example.live.controller;
import com.example.live.model.Room;
import com.example.live.model.Stream;
import com.example.live.repository.RoomRepository;
import com.example.live.repository.StreamRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.Optional;
@RestController
@RequestMapping("/hook")
public class HookController {
@Autowired private RoomRepository roomRepo;
@Autowired private StreamRepository streamRepo;
// nginx-rtmp 或 SRS on_publish 回调调用:POST name=streamKey ...
@PostMapping("/on_publish")
public ResponseEntity<String> onPublish(@RequestParam String name) {
Optional<Room> opt = roomRepo.findByStreamKey(name);
if(!opt.isPresent()) return ResponseEntity.status(403).body("Forbidden");
Room room = opt.get();
room.setIsLive(true);
roomRepo.save(room);
Stream s = new Stream();
s.setRoomId(room.getId());
s.setStartAt(LocalDateTime.now());
streamRepo.save(s);
return ResponseEntity.ok("OK");
}
@PostMapping("/on_close")
public ResponseEntity<String> onClose(@RequestParam String name) {
Optional<Room> opt = roomRepo.findByStreamKey(name);
if(!opt.isPresent()) return ResponseEntity.status(403).body("Forbidden");
Room room = opt.get();
room.setIsLive(false);
roomRepo.save(room);
Stream last = streamRepo.findAll().stream()
.filter(st -> st.getRoomId().equals(room.getId()))
.reduce((first, second) -> second).orElse(null);
if(last != null) {
last.setStopAt(LocalDateTime.now());
streamRepo.save(last);
// dispatch async job to process recording (ffmpeg, upload)
}
return ResponseEntity.ok("OK");
}
}PlayTokenControllerpackage com.example.live.controller;
import com.example.live.model.Room;
import com.example.live.repository.RoomRepository;
import com.example.live.service.TokenService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Optional;
@RestController
@RequestMapping("/rooms")
public class PlayTokenController {
@Autowired private RoomRepository roomRepo;
@Autowired private TokenService tokenService;
@GetMapping("/{id}/play-token")
public ResponseEntity<?> getPlayToken(@PathVariable Long id) throws Exception {
Optional<Room> opt = roomRepo.findById(id);
if(!opt.isPresent()) return ResponseEntity.badRequest().body("room not found");
String token = tokenService.generatePlayToken(id);
return ResponseEntity.ok().body(new PlayTokenResponse(token, opt.get().getHlsUrl()));
}
static class PlayTokenResponse {
public String token; public String hlsUrl;
public PlayTokenResponse(String t, String h) { token=t; hlsUrl=h; }
}
}WebSocketConfig.java
package com.example.live.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new ChatHandler(), "/ws/chat")
.setAllowedOrigins("*");
}
}ChatHandler.java
package com.example.live.websocket;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.CopyOnWriteArraySet;
public class ChatHandler extends TextWebSocketHandler {
private static CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.add(session);
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 简单广播
for(WebSocketSession s : sessions){
if(s.isOpen()) s.sendMessage(message);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
sessions.remove(session);
}
}生产环境建议用 Netty/Swoole 或基于 Redis Pub/Sub 的分布式 WebSocket 集群,ChatHandler 仅供本地测试。
Dockerfile(Java app)FROM openjdk:17-jdk-slim
VOLUME /tmp
COPY target/java-live-0.0.1-SNAPSHOT.jar app.jar
ENTRYPOINT ["java","-jar","/app.jar"]docker-compose.yml(包含 MySQL + SRS + app)version: '3.7'
services:
mysql:
image: mysql:8
environment:
MYSQL_ROOT_PASSWORD: rootpwd
MYSQL_DATABASE: live
ports: ['3306:3306']
srs:
image: ossrs/srs:4
ports:
- "1935:1935"
- "1985:1985"
- "8080:8080"
volumes:
- ./srs/conf:/usr/local/srs/conf
app:
build: .
depends_on: ['mysql','srs']
ports: ['8080:8080']srs/conf/srs.conf 最简)listen 1935;
max_connections 1000;
vhost __defaultVhost__ {
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv [vhost]/[app]/[stream].flv;
}
hls {
enabled on;
hls_path ./objs/nginx/html;
}
http_hooks {
enabled on;
on_publish http://app:8080/hook/on_publish;
on_unpublish http://app:8080/hook/on_close;
}
}注意:
on_publish/on_unpublish的 URL 根据 Docker 服务名app配置。
on_close 回调里触发异步任务(消息队列)由专用转码节点执行 ffmpeg -i rtmp://... -c:v copy ... 等命令,再上传到 OSS/S3 并写 streams.record_url。
示例转码命令(合并录制 TS):
ffmpeg -i "http://srs:8080/live/ROOM.flv" -c:v copy -c:a aac /data/records/ROOM.mp4on_publish 服务端校验,支持 IP 白名单或短期签名。

mvn package 构建 jar。
docker-compose up --build(会启动 MySQL、SRS、app)。
stream_key(或用 Admin 插入一条 rooms)。
rtmp://{docker_host}:1935/live/{stream_key}
on_publish 到后端,后端保存流记录并返回 200 => 推流开始。
http://{srs_host}:8080/live/{stream_key}.flv 或通过前端 flv.js 播放;也可访问 HLS m3u8。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。