专栏首页搜云库技术团队SpringBoot项目:RedisTemplate实现轻量级消息队列(含代码)

SpringBoot项目:RedisTemplate实现轻量级消息队列(含代码)

背景: 公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发

一、本文涉及知识点

1、excel文件读写--阿里easyexcel sdk 2、文件上传、下载--腾讯云对象存储 3、远程服务调用--restTemplate 4、生产者、消费者--redisTemplate leftPush和rightPop操作 5、异步处理数据--Executors线程池 6、读取网络文件流--HttpClient 7、自定义注解实现用户身份认证--JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口

当然, Java实现咯

涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习。整编:微信公众号,搜云库技术团队,ID:souyunku

二、项目目录结构

项目结构

说明: 数据库DAO层放到另一个模块了, 不是本文重点

三、主要maven依赖

1、easyexcel

<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>easyexcel</artifactId>
    <version>${easyexcel-latestVersion}</version>
</dependency>

2、JWT

<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt</artifactId>
    <version>0.7.0</version>
</dependency>

3、redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

4、腾讯cos

<dependency>
    <groupId>com.qcloud</groupId>
    <artifactId>cos_api</artifactId>
    <version>5.4.5</version>
</dependency>

四、流程

1、用户上传文件 2、将文件存储到腾讯cos 3、将上传后的文件id及上传记录保存到数据库 4、redis生产一条导入消息, 即保存文件id到redis 5、请求结束, 返回"处理中"状态 6、redis消费消息 7、读取cos文件, 异步处理数据 8、将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成" 9、客户端轮询查询处理状态, 并可以下载错误文件 10、结束

五、实现效果

1、上传文件

上传文件

2、数据库导入记录

数据库导入记录

3、导入的数据

导入的数据4、下载错误文件

下载错误文件

5、错误数据提示

错误数据提示

5、查询导入记录

查询导入记录

六、代码实现

1、导入excel控制层

@LoginRequired
@RequestMapping(value = "doImport", method = RequestMethod.POST)
public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    PLUser user = getUser(request);
    return orderImportService.doImport(file, user.getId());
}

2、service层

@Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("文件不能为空");
        }

        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("当前仅支持xlsx格式的excel");
        }

        // 存储文件
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("文件上传失败, 请稍后重试");
        }

        // 保存记录到数据库
        saveRecordToDB(userId, fileId, filename);

        // 生产一条订单导入消息
        redisProducer.produce(RedisKey.orderImportKey, fileId);

        return JsonResponse.ok("导入成功, 处理中...");
    }

    /**
     * 校验文件格式
     * @param fileName
     * @return
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }

        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
        if (".xlsx".equals(suffix)) {
            return true;
        }

        return false;
    }

   /**
     * 将文件存储到腾讯OSS
     * @param file
     * @return
     */
    private String saveToOss(MultipartFile file) {
        InputStream ins = null;
        try {
            ins = file.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }

        String fileId;
        try {
            String originalFilename = file.getOriginalFilename();
            File f = new File(originalFilename);
            inputStreamToFile(ins, f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }

        return fileId;
    }

3、redis生产者

@Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public JsonResponse produce(String key, String msg) {
        Map<String, String> map = Maps.newHashMap();
        map.put("fileId", msg);
        redisTemplate.opsForList().leftPush(key, map);
        return JsonResponse.ok();
    }

}

4、redis消费者

@Service
public class RedisConsumer {

    @Autowired
    public RedisTemplate redisTemplate;

    @Value("${txOssFileUrl}")
    private String txOssFileUrl;

    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;

    @PostConstruct
    public void init() {
        processOrderImport();
    }

    /**
     * 处理订单导入
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            while (true) {
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                if (null == object) {
                    continue;
                }
                String msg = JSON.toJSONString(object);
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }

}

5、处理任务线程类

public class OrderImportTask implements Runnable {
    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
        this.msg = msg;
        this.txOssFileUrl = txOssFileUrl;
        this.txOssUploadUrl = txOssUploadUrl;
    }
}

/**
 * 注入bean
 */
private void autowireBean() {
    this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
    this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
    this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
}

@Override
public void run() {
    // 注入bean
    autowireBean();

    JSONObject jsonObject = JSON.parseObject(msg);
    String fileId = jsonObject.getString("fileId");

    MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
    param.add("id", fileId);

    ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
    String fileUrl = (String) responseResult.getData();
    if (StringUtils.isBlank(fileUrl)) {
        return;
    }

    InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
    List<Object> list = ExcelUtil.read(inputStream);
    process(list, fileId);
}

/**
 * 将文件上传至oss
 * @param file
 * @return
 */
private String saveToOss(File file) {
    String fileId;
    try {
        FileSystemResource resource = new FileSystemResource(file);
        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("file", resource);

        ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
        fileId = (String) responseResult.getData();
    } catch (Exception e) {
        fileId = null;
    }
    return fileId;
}

说明: 处理数据的业务逻辑代码就不用贴了

6、上传文件到cos

@RequestMapping("/txOssUpload")
@ResponseBody
public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
    if (null == file || file.isEmpty()) {
        return ResponseResult.fail("文件不能为空");
    }

    String originalFilename = file.getOriginalFilename();
    originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题
    String contentType = getContentType(originalFilename);
    String key;

    InputStream ins = null;
    File f = null;

    try {
        ins = file.getInputStream();
        f = new File(originalFilename);
        inputStreamToFile(ins, f);
        key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
    } catch (Exception e) {
        return ResponseResult.fail(e.getMessage());
    } finally {
        if (null != ins) {
            try {
                ins.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (f.exists()) {// 删除临时文件
            f.delete();
        }
    }

    return ResponseResult.ok(key);
}

public static void inputStreamToFile(InputStream ins,File file) {
    try {
        OutputStream os = new FileOutputStream(file);
        int bytesRead = 0;
        byte[] buffer = new byte[8192];
        while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
            os.write(buffer, 0, bytesRead);
        }
        os.close();
        ins.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    key = Uuid.getUuid() + "-" + key;
    OSSUtil.txOssUpload(inputStream, key, contentType);
    try {
        if (null != inputStream) {
            inputStream.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return key;
}

public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    try{
        int length = inputStream.available();
        objectMetadata.setContentLength(length);
    }catch (Exception e){
        logger.info(e.getMessage());
    }
    objectMetadata.setContentType(contentType);
    cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
}

7、下载文件

/**
 * 腾讯云文件下载
 * @param response
 * @param id
 * @return
 */
@RequestMapping("/txOssDownload")
public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
}

public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    FileOutputStream fos = null;
    response.reset();
    OutputStream os = null;
    try {
        response.setContentType(contentType + "; charset=utf-8");
        if(!contentType.equals(PlConstans.FileContentType.image)){
            try {
                response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
            } catch (UnsupportedEncodingException e) {
                response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                logger.error("encoding file name failed", e);
            }
        }

        os = response.getOutputStream();

        byte[] b = new byte[1024 * 1024];
        int len;
        while ((len = fileStream.read(b)) > 0) {
            os.write(b, 0, len);
            os.flush();
            try {
                if(fos != null) {
                    fos.write(b, 0, len);
                    fos.flush();
                }
            } catch (Exception e) {
                logger.error(e.getMessage());
            }
        }
    } catch (IOException e) {
        IOUtils.closeQuietly(fos);
        fos = null;
    } finally {
        IOUtils.closeQuietly(os);
        IOUtils.closeQuietly(fileStream);
        if(fos != null) {
            IOUtils.closeQuietly(fos);
        }
    }
}

8、读取网络文件流

/**
 * 读取网络文件流
 * @param url
 * @return
 */
public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
        return null;
    }

    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
        HttpResponse response = httpClient.execute(methodGet);
        if (response.getStatusLine().getStatusCode() == 200) {
            HttpEntity entity = response.getEntity();
            return entity.getContent();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

9、ExcelUtil

/**
     * 读excel
     * @param inputStream 文件输入流
     * @return list集合
     */
    public static List<Object> read(InputStream inputStream) {
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }

    /**
     * 写excel
     * @param data list数据
     * @param clazz
     * @param saveFilePath 文件保存路径
     * @throws IOException
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        OutputStream out = new FileOutputStream(tempFile);
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
        out.close();
    }

说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

七、其他

1、@LoginRequired注解

/**
 * 在需要登录验证的Controller的方法上使用此注解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2、MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }

    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {
        return JsonResponse.fail(se.getMsg());
    }

    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();
        return JsonResponse.fail(e.getMessage());
    }

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

    private static final String CURRENT_USER = "user";

    @Autowired
    private UserService userService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // 如果不是映射到方法直接通过
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();

        // 判断接口是否有@LoginRequired注解, 有则需要登录
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
        if (methodAnnotation != null) {
            // 验证token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);
            if (null == plUser) {
                throw new RuntimeException("用户不存在,请重新登录");
            }
            request.setAttribute(CURRENT_USER, plUser);
            return true;
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}

4、JwtUtil

public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
public static final String SECRET = "pl_token_secret";
public static final String HEADER = "token";
public static final String USER_ID = "userId";

/**
 * 根据userId生成token
 * @param userId
 * @return
 */
public static String generateToken(String userId) {
    HashMap<String, Object> map = new HashMap<>();
    map.put(USER_ID, userId);
    String jwt = Jwts.builder()
            .setClaims(map)
            .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
            .signWith(SignatureAlgorithm.HS512, SECRET)
            .compact();
    return jwt;
}

/**
 * 验证token
 * @param request
 * @return 验证通过返回userId
 */
public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token != null) {
        try {
            Map<String, Object> body = Jwts.parser()
                    .setSigningKey(SECRET)
                    .parseClaimsJws(token)
                    .getBody();

            for (Map.Entry entry : body.entrySet()) {
                Object key = entry.getKey();
                Object value = entry.getValue();
                if (key.toString().equals(USER_ID)) {
                    return Integer.valueOf(value.toString());// userId
                }
            }
            return null;
        } catch (Exception e) {
            logger.error(e.getMessage());
            throw new TokenValidationException("unauthorized");
        }
    } else {
        throw new TokenValidationException("missing token");
    }
}

结语: OK, 搞定,睡了, 好困

作者:wangzaiplus

www.jianshu.com/p/0c684076367e

本文分享自微信公众号 - 搜云库技术团队(souyunku)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-01-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SpringBoot整合Activiti工作流(附源码)

    Activiti是一个开源的工作流引擎,它实现了BPMN 2.0规范,可以发布设计好的流程定义,并通过api进行流程调度。Activiti 作为一个遵从 Apa...

    搜云库技术团队
  • 老板看了我的代码,直呼“666”,要涨工资?

    背景:如何更规范化编写Java 代码的重要性想必毋需多言,其中最重要的几点当属提高代码性能、使代码远离Bug、令代码更优雅。

    搜云库技术团队
  • SpringBoot 注解原理,自动装配原理,图文并茂,万字长文!

    点进@SpringBootApplication来看,发现@SpringBootApplication是一个组合注解。

    搜云库技术团队
  • 手把手fastDFS实战2-文件读写

    启动服务以后,可通过 http://localhost:8080/file/ 来访问首页

    技术路漫漫
  • RabbitMQ 快速入门实战

    本文基于docker来安装RabbitMQ,通过pull当前最新版本rabbitmq:3.8.5-management即可,之后通过如下的命令即可运行:

    技术路漫漫
  • Spring Cloud Zuul实现动态路由

    前言 Zuul 是Netflix 提供的一个开源组件,致力于在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。也有很多公司使用它来作为网关的重要组成部分...

    程序猿DD
  • Java基于百度API的图片文字识别

    http://ai.baidu.com/docs#/OCR-API/e1bd77f3

    Java团长
  • Java基于百度API的图片文字识别(支持中文,英文和中英文混合)

    具体文档:http://ai.baidu.com/docs#/OCR-API/e1bd77f3

    好好学java
  • 【SpringBoot 基础系列】实现一个自定义配置加载器(应用篇)

    Spring 中提供了@Value注解,用来绑定配置,可以实现从配置文件中,读取对应的配置并赋值给成员变量;某些时候,我们的配置可能并不是在配置文件中,如存在 ...

    一灰灰blog
  • 【Java多线程】写入同一文件,自定义线程池与线程回收利用2 顶

    起初为了方便快捷,只为实现功能,写了很多垃圾的代码. 造成性能不高,可读性,可维护性不强。

    linapex

扫码关注云+社区

领取腾讯云代金券