前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从Mysql到本地文件与Kafka队列

从Mysql到本地文件与Kafka队列

作者头像
conanma
修改2022-04-07 20:14:43
1.2K0
修改2022-04-07 20:14:43
举报
文章被收录于专栏:正则正则

准备工作:

1)修改application.properties文件中Mysql数据库的相关配置

2)启动主程序,添加一条记录 {"empId":"002","empName":"keven"}

3)查一下结果:(刚才多添加了一条同样的记录)

  • 4)再将application.properties中spring.datasource.initialization-mode=always这行注释掉,否则每次重启时它都会重建数据库,又要重新添加记录

从上图可以看出:本程序提供了两个功能,从接收浏览器Get/Post两个方法(端点),分别路由到“插入/查询所有记录”两个路径,执行对应功能。

以下做一点扩展:

  • 发送到本地文件

1. 在EmployeeServiceImpl类中添加如下路由:

代码语言:javascript
复制
//write,Mysql--->File
        from("direct:write").to("sql:select * from employee").process(new Processor() {
            public void process(Exchange xchg) throws Exception {
                ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                List<Employee> employees = new ArrayList<Employee>();
                System.out.println(dataList);
                StringBuilder sb=new StringBuilder();
                for (Map<String, String> data : dataList) {
                    sb.append("empId:"+data.get("empId")+",");
                    sb.append("empName:"+data.get("empName"));
                }
                xchg.getIn().setBody(sb.toString());
            }
        }).to("file:data/outbox");

2. 到控制类EmployeeController中加一条

代码语言:javascript
复制
//write
    @RequestMapping(value = "/write", method = RequestMethod.GET)
    public boolean write() {
        producerTemplate.requestBody("direct:write", null, List.class);
        return true;
    }

这样,当页面中接收到write的请求时,程序会先查找记录,再把结果输出到程序的data/outbox目录下

3)重启一下,访问http://localhost:8080/write

再到程序目录下检查一下

可以看到,已经输出到指定目录了

  • 发送到kafka队列 1)准备工作 在poem.xml文件中添加kafka依赖
代码语言:javascript
复制
              <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>2.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>

到服务实体类EmployeeServiceImpl中添加kafka定义(也可放到属性文件中去)

代码语言:javascript
复制
    String topicName = "topic=camel-topic";
    String kafkaServer = "kafka:CDH-04:9092";
    String zooKeeperHost = "zookeeperHost=CDH-05&zookeeperPort=2181";
    String serializerClass = "serializerClass=kafka.serializer.StringEncoder";

    String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
            .append(zooKeeperHost).append("&").append(serializerClass).toString();

2)修改代码 添加到kafka的路由

代码语言:javascript
复制
//Kafka,Mysql--->Kafka
        from("direct:kafka").to("sql:select * from employee").process(new Processor() {
            public void process(Exchange xchg) throws Exception {
                ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                List<Employee> employees = new ArrayList<Employee>();
                System.out.println(dataList);
                for (Map<String, String> data : dataList) {
                    Employee employee = new Employee();
                    employee.setEmpId(data.get("empId"));
                    employee.setEmpName(data.get("empName"));
                    employees.add(employee);
                }
                xchg.getIn().setBody(employees.toString());
            }
        }).to(toKafka).process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                System.out.println("it is :"+toKafka);
            }
        });

添加触发控制(EmployeeController类)

代码语言:javascript
复制
//kafka
    @RequestMapping(value = "/kafka", method = RequestMethod.GET)
    public boolean kafka() {
        producerTemplate.requestBody("direct:kafka", null, List.class);
        return true;
    }

3)访问一下 http://localhost:8080/kafka

4)查看一下队列

可以看到,已经发送到队列了

本文系外文翻译,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系外文翻译前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CKafka 版
消息队列 CKafka 版(TDMQ for CKafka)是一个分布式、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API 2.4、2.8、3.2 版本。CKafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。CKafka 具有高可用、数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合、流式数据集成等场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档