首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >java直播系统源码,java直播系统开发,java直播系统设计,直播平台开发及架构设计

java直播系统源码,java直播系统开发,java直播系统设计,直播平台开发及架构设计

原创
作者头像
钠斯音视频开发-林经理
发布2025-11-03 14:22:57
发布2025-11-03 14:22:57
1510
举报
文章被收录于专栏:音视频开发音视频开发

1. 总体架构(简短)

代码语言:javascript
复制
主播(OBS/APP) --(RTMP/SRT)--> 流媒体(SRS / ZLMediaKit) --(HLS/HTTP-FLV/WebRTC)--> CDN --> 观众
                                       │
                         on_publish/on_close HTTP callbacks
                                       │
                                   后端(Java Spring Boot)
                 ┌─────────────┬──────────────┬─────────────┐
                 │ 房间管理    │ 鉴权/Token    │ WebSocket IM │
                 │ MySQL/Redis │ 异步转码队列  │ (Netty/Spring)│
                 └─────────────┴──────────────┴─────────────┘

2. 关键数据表(MySQL)

代码语言:javascript
复制
-- users
CREATE TABLE users (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  username VARCHAR(64) NOT NULL,
  password_hash VARCHAR(255),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- rooms
CREATE TABLE rooms (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  owner_id BIGINT NOT NULL,
  title VARCHAR(255),
  stream_key VARCHAR(128) NOT NULL UNIQUE,
  is_live TINYINT DEFAULT 0,
  hls_url VARCHAR(512),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- streams
CREATE TABLE streams (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  room_id BIGINT NOT NULL,
  start_at DATETIME,
  stop_at DATETIME,
  record_url VARCHAR(512),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

3. 项目结构(Spring Boot)

代码语言:javascript
复制
java-live/
├─ pom.xml
├─ src/main/java/com/example/live/
│   ├─ LiveApplication.java
│   ├─ controller/
│   │   ├─ HookController.java        # on_publish / on_close
│   │   └─ PlayTokenController.java
│   ├─ model/
│   │   ├─ Room.java
│   │   └─ Stream.java
│   ├─ repository/
│   │   ├─ RoomRepository.java
│   │   └─ StreamRepository.java
│   ├─ service/
│   │   ├─ StreamService.java
│   │   └─ TokenService.java
│   └─ websocket/
│       ├─ WebSocketConfig.java
│       └─ ChatHandler.java
├─ src/main/resources/application.yml
└─ Dockerfile

4. 关键源码(可直接复制)

4.1 pom.xml(关键依赖)

代码语言:javascript
复制
<project ...>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>java-live</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
  </dependencies>
</project>

4.2 application.yml

代码语言:javascript
复制
server:
  port: 8080

spring:
  datasource:
    url: jdbc:mysql://mysql:3306/live?useSSL=false&serverTimezone=UTC
    username: root
    password: rootpwd
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true

app:
  token-secret: somesecretkey
  play-token-expire: 300

4.3 实体示例 Room.java

代码语言:javascript
复制
package com.example.live.model;
import lombok.*;
import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name="rooms")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Room {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long ownerId;
    private String title;
    @Column(unique=true)
    private String streamKey;
    private Boolean isLive = false;
    private String hlsUrl;
    private LocalDateTime createdAt = LocalDateTime.now();
}

4.4 Stream.java

代码语言:javascript
复制
package com.example.live.model;
import lombok.*;
import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name="streams")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Stream {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long roomId;
    private LocalDateTime startAt;
    private LocalDateTime stopAt;
    private String recordUrl;
    private LocalDateTime createdAt = LocalDateTime.now();
}

4.5 仓库接口(Spring Data JPA)

RoomRepository.java

代码语言:javascript
复制
package com.example.live.repository;
import com.example.live.model.Room;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.Optional;

public interface RoomRepository extends JpaRepository<Room, Long> {
    Optional<Room> findByStreamKey(String streamKey);
}

StreamRepository.java

代码语言:javascript
复制
package com.example.live.repository;
import com.example.live.model.Stream;
import org.springframework.data.jpa.repository.JpaRepository;

public interface StreamRepository extends JpaRepository<Stream, Long> {}

4.6 TokenService(生成/校验 play token)

代码语言:javascript
复制
package com.example.live.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;

@Service
public class TokenService {

    @Value("${app.token-secret}")
    private String secret;

    @Value("${app.play-token-expire}")
    private int expire;

    public String generatePlayToken(Long roomId) throws Exception {
        long exp = System.currentTimeMillis()/1000 + expire;
        String payload = roomId + "|" + exp;
        String sig = hmacSha256(payload, secret);
        String token = Base64.getUrlEncoder().withoutPadding().encodeToString((payload + "|" + sig).getBytes());
        return token;
    }

    public Long verifyPlayToken(String token) throws Exception {
        String raw = new String(Base64.getUrlDecoder().decode(token));
        String[] parts = raw.split("\\|");
        if(parts.length != 3) return null;
        Long roomId = Long.valueOf(parts[0]);
        long exp = Long.parseLong(parts[1]);
        String sig = parts[2];
        if(System.currentTimeMillis()/1000 > exp) return null;
        String expected = hmacSha256(parts[0]+"|"+parts[1], secret);
        if(!expected.equals(sig)) return null;
        return roomId;
    }

    private String hmacSha256(String data, String key) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key.getBytes(), "HmacSHA256"));
        byte[] bytes = mac.doFinal(data.getBytes());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }
}

4.7 HookController:流媒体回调(on_publish / on_close)

代码语言:javascript
复制
package com.example.live.controller;
import com.example.live.model.Room;
import com.example.live.model.Stream;
import com.example.live.repository.RoomRepository;
import com.example.live.repository.StreamRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.Optional;

@RestController
@RequestMapping("/hook")
public class HookController {

    @Autowired private RoomRepository roomRepo;
    @Autowired private StreamRepository streamRepo;

    // nginx-rtmp 或 SRS on_publish 回调调用:POST name=streamKey ...
    @PostMapping("/on_publish")
    public ResponseEntity<String> onPublish(@RequestParam String name) {
        Optional<Room> opt = roomRepo.findByStreamKey(name);
        if(!opt.isPresent()) return ResponseEntity.status(403).body("Forbidden");
        Room room = opt.get();
        room.setIsLive(true);
        roomRepo.save(room);

        Stream s = new Stream();
        s.setRoomId(room.getId());
        s.setStartAt(LocalDateTime.now());
        streamRepo.save(s);

        return ResponseEntity.ok("OK");
    }

    @PostMapping("/on_close")
    public ResponseEntity<String> onClose(@RequestParam String name) {
        Optional<Room> opt = roomRepo.findByStreamKey(name);
        if(!opt.isPresent()) return ResponseEntity.status(403).body("Forbidden");
        Room room = opt.get();
        room.setIsLive(false);
        roomRepo.save(room);

        Stream last = streamRepo.findAll().stream()
                .filter(st -> st.getRoomId().equals(room.getId()))
                .reduce((first, second) -> second).orElse(null);
        if(last != null) {
            last.setStopAt(LocalDateTime.now());
            streamRepo.save(last);
            // dispatch async job to process recording (ffmpeg, upload)
        }

        return ResponseEntity.ok("OK");
    }
}

4.8 PlayTokenController

代码语言:javascript
复制
package com.example.live.controller;
import com.example.live.model.Room;
import com.example.live.repository.RoomRepository;
import com.example.live.service.TokenService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.Optional;

@RestController
@RequestMapping("/rooms")
public class PlayTokenController {

    @Autowired private RoomRepository roomRepo;
    @Autowired private TokenService tokenService;

    @GetMapping("/{id}/play-token")
    public ResponseEntity<?> getPlayToken(@PathVariable Long id) throws Exception {
        Optional<Room> opt = roomRepo.findById(id);
        if(!opt.isPresent()) return ResponseEntity.badRequest().body("room not found");
        String token = tokenService.generatePlayToken(id);
        return ResponseEntity.ok().body(new PlayTokenResponse(token, opt.get().getHlsUrl()));
    }

    static class PlayTokenResponse {
        public String token; public String hlsUrl;
        public PlayTokenResponse(String t, String h) { token=t; hlsUrl=h; }
    }
}

4.9 WebSocket 简单聊天(Spring WebSocket)

WebSocketConfig.java

代码语言:javascript
复制
package com.example.live.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ChatHandler(), "/ws/chat")
                .setAllowedOrigins("*");
    }
}

ChatHandler.java

代码语言:javascript
复制
package com.example.live.websocket;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.CopyOnWriteArraySet;

public class ChatHandler extends TextWebSocketHandler {
    private static CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.add(session);
    }

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 简单广播
        for(WebSocketSession s : sessions){
            if(s.isOpen()) s.sendMessage(message);
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessions.remove(session);
    }
}

生产环境建议用 Netty/Swoole 或基于 Redis Pub/Sub 的分布式 WebSocket 集群,ChatHandler 仅供本地测试。


5. Docker / 本地运行(快速起步)

5.1 Dockerfile(Java app)

代码语言:javascript
复制
FROM openjdk:17-jdk-slim
VOLUME /tmp
COPY target/java-live-0.0.1-SNAPSHOT.jar app.jar
ENTRYPOINT ["java","-jar","/app.jar"]

5.2 docker-compose.yml(包含 MySQL + SRS + app)

代码语言:javascript
复制
version: '3.7'
services:
  mysql:
    image: mysql:8
    environment:
      MYSQL_ROOT_PASSWORD: rootpwd
      MYSQL_DATABASE: live
    ports: ['3306:3306']
  srs:
    image: ossrs/srs:4
    ports:
      - "1935:1935"
      - "1985:1985"
      - "8080:8080"
    volumes:
      - ./srs/conf:/usr/local/srs/conf
  app:
    build: .
    depends_on: ['mysql','srs']
    ports: ['8080:8080']

5.3 SRS 配置(srs/conf/srs.conf 最简)

代码语言:javascript
复制
listen              1935;
max_connections     1000;

vhost __defaultVhost__ {
    http_remux {
        enabled         on;
        mount [vhost]/[app]/[stream].flv [vhost]/[app]/[stream].flv;
    }
    hls {
        enabled on;
        hls_path ./objs/nginx/html;
    }

    http_hooks {
        enabled on;
        on_publish http://app:8080/hook/on_publish;
        on_unpublish http://app:8080/hook/on_close;
    }
}

注意:on_publish / on_unpublish 的 URL 根据 Docker 服务名 app 配置。


6. 录制与转码(要点)

  • SRS/ZLMediaKit 可直接录制 TS 或 MP4;也可以使用 ffmpeg 对切片或原始流进行转码与合并。
  • on_close 回调里触发异步任务(消息队列)由专用转码节点执行 ffmpeg -i rtmp://... -c:v copy ... 等命令,再上传到 OSS/S3 并写 streams.record_url

示例转码命令(合并录制 TS):

代码语言:javascript
复制
ffmpeg -i "http://srs:8080/live/ROOM.flv" -c:v copy -c:a aac /data/records/ROOM.mp4

7. 安全 & 防盗链策略

  • 推流鉴权:stream_key(房间生成)+ on_publish 服务端校验,支持 IP 白名单或短期签名。
  • 播放鉴权:短期 HMAC token(上面 TokenService),CDN 层再加 Signed URL。
  • 接口安全:HTTPS、JWT 用户鉴权、接口权限控制。
  • 防刷/风控:礼物、点赞等操作限流(Redis)、行为分析与封禁。

8. 监控与运维建议

  • 流媒体(SRS)日志 + srs-exporter -> Prometheus -> Grafana(监控流数量、带宽、推流失败率)
  • 应用:Spring Actuator、日志集中(ELK / Loki)
  • 压力测试:用 ffmpeg + 多线程脚本模拟 N 个推流/拉流,测试带宽与服务器性能。

9. 发展建议(从原型到生产)

  1. 原型阶段:本骨架(Spring Boot + SRS)足够,先实现房间生命周期、推流鉴权、前端播放页(flv.js / hls.js)。
  2. 生产化:分离流媒体和业务服务器,HLS/FLV 交给 CDN,录制落 OSS,WebSocket 做独立集群。
  3. 高并发:使用分布式 Redis 缓存、DB 主从、消息队列(Rabbit/Kafka)做异步处理。
  4. 低延迟:需要 1s 级延迟则做 WebRTC(SRS/ZL 提供转码),设计 TURN/ICE 服务。
  5. 安全合规:内容审核、版权水印、用户实名(必要时)。

10. 示例:快速测试流程

  1. 克隆项目 -> mvn package 构建 jar。
  2. 启动 docker-compose up --build(会启动 MySQL、SRS、app)。
  3. 在后台创建房间(写 DB),获取 stream_key(或用 Admin 插入一条 rooms)。
  4. 用 OBS 推流:rtmp://{docker_host}:1935/live/{stream_key}
  5. SRS 会触发 on_publish 到后端,后端保存流记录并返回 200 => 推流开始。
  6. 在浏览器打开 http://{srs_host}:8080/live/{stream_key}.flv 或通过前端 flv.js 播放;也可访问 HLS m3u8。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 总体架构(简短)
  • 2. 关键数据表(MySQL)
  • 3. 项目结构(Spring Boot)
  • 4. 关键源码(可直接复制)
    • 4.1 pom.xml(关键依赖)
    • 4.2 application.yml
    • 4.3 实体示例 Room.java
    • 4.4 Stream.java
    • 4.5 仓库接口(Spring Data JPA)
    • 4.6 TokenService(生成/校验 play token)
    • 4.7 HookController:流媒体回调(on_publish / on_close)
    • 4.8 PlayTokenController
    • 4.9 WebSocket 简单聊天(Spring WebSocket)
  • 5. Docker / 本地运行(快速起步)
    • 5.1 Dockerfile(Java app)
    • 5.2 docker-compose.yml(包含 MySQL + SRS + app)
    • 5.3 SRS 配置(srs/conf/srs.conf 最简)
  • 6. 录制与转码(要点)
  • 7. 安全 & 防盗链策略
  • 8. 监控与运维建议
  • 9. 发展建议(从原型到生产)
  • 10. 示例:快速测试流程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档