前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot第44集:Kafka集群和Lua脚本

springboot第44集:Kafka集群和Lua脚本

作者头像
达达前端
发布2023-10-31 08:26:48
2020
发布2023-10-31 08:26:48
举报
文章被收录于专栏:达达前端
  1. servers:Kafka服务器的地址。这是Kafka集群的地址,生产者将使用它来发送消息。

  1. retries:在消息发送失败时,生产者将尝试重新发送消息的次数。这个属性指定了重试次数。
  2. batchSize:指定了生产者在发送消息之前累积的消息大小(以字节为单位)。一次性发送多个消息可以提高性能。
  3. linger:指定了生产者在发送消息之前等待的时间(以毫秒为单位)。这可以帮助在一起发送多个消息以减少网络开销。
  4. bufferMemory:指定了生产者用于缓冲等待发送的消息的内存大小(以字节为单位)。

这是一个自定义注解 @Log,用于在方法上进行注解。以下是对该注解的逐行详细说明:

代码语言:javascript
复制
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {
    /** 标题 */
    public String title() default "";

    /** 功能 */
    public BusinessType businessType() default BusinessType.OTHER;
}

这个自定义注解的作用如下:

  • @Target({ElementType.PARAMETER, ElementType.METHOD}): 指定了可以在方法和方法参数上使用这个注解。
  • @Retention(RetentionPolicy.RUNTIME): 指定了这个注解在运行时保留,这样它可以在运行时被反射读取。
  • @Documented: 表示这个注解应该被包含在JavaDoc中。

注解元素:

  • public String title() default "": 这是一个注解元素,用于指定日志的标题。默认值为空字符串,可以通过注解时指定。
  • public BusinessType businessType() default BusinessType.OTHER: 这是一个注解元素,用于指定日志的业务类型。默认值为 BusinessType.OTHER,表示其他业务类型。可以通过注解时指定不同的业务类型。

这个自定义注解可以用于方法上,以帮助描述方法的功能和业务类型,通常用于日志记录或其他相关的操作。在使用时,可以为方法添加 @Log 注解,并指定标题和业务类型。例如:

代码语言:javascript
复制
@Log(title = "用户登录", businessType = BusinessType.LOGIN)
public void userLogin() {
    // 方法实现
}

在这个示例中,方法 userLogin 被标记为登录操作,并指定了相关的标题。这有助于记录日志和监控应用程序的操作。

Spring Framework 通过 @Autowired 注解来自动装配这些依赖,从而减少了手动配置和依赖管理的工作。

Redis Lua脚本,通常用于在Redis中执行原子操作。这个脚本的主要作用是递增存储在Redis中的一个键的值,并在第一次递增时设置过期时间。以下是脚本的主要步骤:

  1. current变量:首先,脚本声明了一个本地变量current,用于存储键的当前值。
  2. redis.call("incr", KEYS[1]):这一行执行了一个Redis原子操作,即将键KEYS[1]的值递增1,并返回递增后的值,该值被存储在current变量中。
  3. if tonumber(current) == 1 then:接下来,脚本检查current的值是否等于1。这是为了判断是否是第一次递增。tonumber(current)用于将current的值转换为整数,以便进行比较。
  4. 如果current的值等于1,表示第一次递增,那么脚本会执行以下操作:
  • redis.call("expire", KEYS[1], KEYS[2]):设置键KEYS[1]的过期时间为KEYS[2],即设置键在一定时间后过期。

最后,脚本返回current的值,这是递增后的值。

总的来说,这个脚本用于实现一个计数器,每次调用时递增,但仅在第一次递增时设置过期时间。这种模式常用于执行某些操作的限制或计时器功能。

这个Lua脚本的主要作用是在Redis中实现一个计数器,同时在第一次递增时设置键的过期时间。通常,这种功能可以用于限制用户在一段时间内执行某个操作的次数,或者用于实现一个短期有效的计数功能。

示例1:限制用户发送短信的次数

假设你想要限制用户在一分钟内只能发送一次短信,你可以使用这个Lua脚本。脚本会递增计数,如果用户在一分钟内已经发送了一次短信,那么后续请求将返回0,表示不允许再发送。

代码语言:javascript
复制
local current
current = redis.call("incr", KEYS[1])
if tonumber(current) == 1 then
    redis.call("expire", KEYS[1], KEYS[2])
end
return current

示例2:计时器

你可以使用这个脚本来实现一个简单的计时器,以记录某个事件的发生次数。例如,你可以记录一小时内某个事件发生的次数。

代码语言:javascript
复制
local current
current = redis.call("incr", KEYS[1])
if tonumber(current) == 1 then
    redis.call("expire", KEYS[1], 3600)  -- 设置过期时间为1小时
end
return current

在这两个示例中,脚本首先尝试递增计数,然后在第一次递增时设置过期时间。如果计数超过一定限制,后续请求将返回已达到限制的标志,不再允许递增。这可以用于实现许多不同类型的计数和限制功能。

  1. 使用 registry.addMapping("/**") 配置跨域设置,表示针对所有的路径都应用相同的 CORS 配置。
  2. .allowCredentials(false) 指定不允许发送凭证信息,例如 cookies。这表示客户端的请求不包含敏感凭证。
  3. .allowedOrigins("*") 允许来自任何源(包括不同域名或 IP 地址)的请求。使用 "*" 表示允许来自任何源的请求。
  4. .allowedHeaders("*") 允许所有的请求头,这意味着客户端可以发送任意请求头到服务器。
  5. .allowedMethods("GET", "PUT", "POST", "DELETE") 指定允许的 HTTP 方法,这里配置了常见的 GET、PUT、POST 和 DELETE 方法。
  6. .exposedHeaders("*") 暴露所有响应头给客户端,允许客户端访问所有响应头信息。

消息队列是为了解决消息间通信繁忙而诞生的,体现了解耦和异步的实现 为了解决消息间通信繁忙的问题,我们可以理解为引入了一个中间件(消息队列),发送方在发送信息的时候,不是直接发送到接收方,而是发送信息到中间件,接收方通过中间件获取自己想要的信息。 在这个过程中,我们可以把发送方理解为生产者,接收方认为是消费者。 生产者发布信息,消费者订阅信息(通过中间件) 引出一个问题,消费者如何拿到自己想要的数据,这个问题的解决方法就是主题(topic),生产者将不同主题的信息发布到中间件(kafka)中,消费者通过订阅不同的主题来消费自己想要的数据 在主题下面会有分区,分区可以实现分布在不同的服务器上,生产者将数据存储在主题下的不同分区里面 两种模式:1.生产者指定分区 2.分区器(一个算法)通过消息的键(一个标记)来安排数据的存储空间

我们现在可以知道一条消息可能带有以下几个数据:1.主题 2.分区 3.键 4.值(想传送的数据)

那消费者如何读取数据呢?引出偏移量 偏移量(offset):第几个 一个分区里面,每个消息的偏移量都是唯一的 消费者只能顺序读取

这样的话我们就实现了一个borker,borker里面包含主题,分区、 borker为传来的消息根据分区设置偏移量,将其存储在磁盘上,borker也为消费者提供服务,提供响应。 多个Borker集群就是kafka集群,这样就提供了消息的安全性。在这个集群中可能还会有一个集群负责控制器的角色。

bec3eee0402275009ddc9fa09f795867.png
bec3eee0402275009ddc9fa09f795867.png

image.png

310897678f65fb9f1b8fb276ea047386.png
310897678f65fb9f1b8fb276ea047386.png

image.png

ad55ee03e920dea2b3df0254fdc69e1f.png
ad55ee03e920dea2b3df0254fdc69e1f.png

image.png

a9760ccfb53724154ff885b94eba0f36.png
a9760ccfb53724154ff885b94eba0f36.png

image.png

bfe71f4d0006e51356ee7039a239f5bf.png
bfe71f4d0006e51356ee7039a239f5bf.png

image.png

888599616b25f24b0a988abe20abe4de.png
888599616b25f24b0a988abe20abe4de.png

image.png

37459d5c1757dfb033be9b5ea51fb4a2.png
37459d5c1757dfb033be9b5ea51fb4a2.png

image.png

ae7a99fd4cdaded52ec30fb14dce84fb.png
ae7a99fd4cdaded52ec30fb14dce84fb.png

image.png

1b23eac09cf9c76d0e8ecd711f4c34e3.png
1b23eac09cf9c76d0e8ecd711f4c34e3.png

image.png

02341edf17cd8a3adc7086acb680983d.png
02341edf17cd8a3adc7086acb680983d.png

image.png

850b52741d3f8e2c6a2c219f6ec18a9f.png
850b52741d3f8e2c6a2c219f6ec18a9f.png

image.png

0f5ea2a7b69d696b76f5e8e0010ca275.png
0f5ea2a7b69d696b76f5e8e0010ca275.png

image.png

3ddccd69bb59481b91969046a5c99a31.png
3ddccd69bb59481b91969046a5c99a31.png

image.png

第一步:vi ~/.zshrc 第二步:按 i 进行输入 添加:source ~/.bash_profile 第三步:按esc 输入: :wq 进行保存退出

MQTT(消息队列遥测传输)是一种网络协议(长连接,意思就是除了客户端可以主动向服务器通信外,服务器也可以主动向客户端发起),也是基于TCP/IP的,适用于算力低下的硬件设备使用,基于发布\订阅范式的消息协议

275de7c6dba6ad413a1d45d6e4c580db.png
275de7c6dba6ad413a1d45d6e4c580db.png

image.png

d29956f0495102cd8ec2ff42ce221c6c.png
d29956f0495102cd8ec2ff42ce221c6c.png

image.png

Arduino IDE(集成开发环境)是一个用于开发Arduino(一种开源硬件平台)的集成开发环境。Arduino 是一种基于开源硬件和软件的电子原型平台,旨在帮助电子爱好者、学生和专业开发人员快速、轻松地创建各种交互式电子项目。Arduino IDE 用于编写、上传和运行Arduino代码,并与Arduino板上的硬件进行交互。

Arduino IDE 提供了以下主要功能:

  1. 代码编写:您可以使用Arduino IDE编写Arduino的程序代码,这些代码通常使用C/C++语言编写。
  2. 代码编辑:IDE包含代码编辑器,具有语法高亮显示、代码自动完成和调试功能,使编写代码更加容易。
  3. 上传代码:一旦您编写了Arduino代码,可以将它们上传到Arduino开发板上,以便实际运行代码并控制硬件。
  4. 硬件交互:Arduino IDE允许您与Arduino开发板上的各种传感器、执行器和其他外围设备进行交互,以创建各种物联网、嵌入式系统和电子艺术项目。
  5. 库管理:Arduino社区为常见的硬件和传感器提供了大量库,这些库可通过IDE进行管理和导入,以便轻松使用这些硬件组件。
  6. 调试和监控:Arduino IDE提供了一些基本的调试和监控工具,帮助您检查代码的执行和硬件的状态。
  7. 项目管理:IDE允许您管理多个项目,以便组织和跟踪不同的Arduino应用程序。
7844f9ba69c8f2167e7290c34d36f464.png
7844f9ba69c8f2167e7290c34d36f464.png

image.png

如果你是 ECS 云主机,点击实例>点击你的服务器名>安全组>配置规则>手动添加

添加这么一条即可:

340cbe3205f48137772fedc19d25f771.png
340cbe3205f48137772fedc19d25f771.png

image.png

如果你是轻量服务器,点击安全>防火墙>添加规则 即可,跟ECS设置大差不差。

3是准备给前端页面用的 ,2是给后端用的,1是我个人自留的超级用户,wemos是设备用的,即上面设备连接时输入的用户名密码。

2f356ceb3fff595849fd6f6502efa13c.png
2f356ceb3fff595849fd6f6502efa13c.png

image.png

bead396119c3106cc456090977d9fcb6.png
bead396119c3106cc456090977d9fcb6.png

image.png

1883(mqtt默认端口)开启的,当然,和开启18083的方法一样。

  • 1804 websockets 默认端口
  • 3306 mysql默认端口
  • 1803

当你开启完成后,再次尝试使用mqttx连接broker,会发现可以连接了

4cd132b3f2762af4c3ba7ebdcb752d95.png
4cd132b3f2762af4c3ba7ebdcb752d95.png

image.png

左侧添加订阅,右侧的聊天框里会出现该topic的消息

4253d1b6d01a82e6aed3c9a2e775c30d.png
4253d1b6d01a82e6aed3c9a2e775c30d.png

image.png

在loop中每一秒向 home/status/ 发送一条设备在线的提示

设备、服务器、emqx控制台已经跑通了。

前端不必多说,我们使用echarts承载展示数据,由于体量较小,我们不使用任何框架,直接使用jq和echarts实现

代码语言:javascript
复制
<script src="https://cdn.bootcdn.net/ajax/libs/mqtt/4.1.0/mqtt.min.js"></script>
代码语言:javascript
复制
http {
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    ##
    # SSL Settings
    ##
    server {
    listen 80;
    server_name jshub.cn;
    #将请求转成https
    rewrite ^(.*)$ https://$host$1 permanent;
    }
    server {
        listen 443 ssl;
                server_name xxx.cn;
                location / {
                    root /larryzhu/web/release/toolbox;
                    index index.html index.htm;
                    try_files $uri $uri/ /index.html;
                }
     location /mqtt {
           proxy_pass http://localhost:8083;
           proxy_http_version 1.1;
           proxy_set_header Upgrade $http_upgrade;
           proxy_set_header Connection "upgrade";
         }
        # SSL 协议版本
        ssl_protocols TLSv1.2;
        # 证书
        ssl_certificate /larryzhu/web/keys/9263126_xxx.cn.pem;
        # 私钥
        ssl_certificate_key /larryzhu/web/keys/9263126_xxx.cn.key;
        # ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
        # ssl_ciphers AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256;

        # 与False Start没关系,默认此项开启,此处减少抓包的干扰而关闭
        # ssl_session_tickets off;

        # return 200 "https ok \n";
  }

如果数据量超过43200(每两秒插入一条,这是一天的量)条,调用存储过程删除最早的一条数据

693f5b29178b9f4fa9791c92bfb951c5.png
693f5b29178b9f4fa9791c92bfb951c5.png

image.png

a432efa34ed2d24d15fa3b71b0e42614.png
a432efa34ed2d24d15fa3b71b0e42614.png

image.png

打开EMQ云服务器的控制台“**http://127.0.0.1:18083[1]”,初次登录的默认用户名是“admin”,密码是“public”**

./emqx start

EMQX云服务器是自带客户端调试功能面板的,在工具 —> WebSocket中即可进行简单的消息发布及订阅测试,MQTT协议是基于发布/订阅模式的轻量级传输协议

MQTT协议消息的订阅是需要包括消息的主题和服务质量的,消息的主题即“发送该消息的一个标签”,服务质量包括0、1、2,具体见下表:

Qos值

Bit2

Bit1

描述

0

0

0

最多分发一次

1

0

1

至少分发一次

2

1

0

只分发一次

加群联系作者vx:xiaoda0423

仓库地址:https://github.com/webVueBlog/JavaGuideInterview

参考资料

[1]

http://127.0.0.1:18083: https://link.juejin.cn/?target=http%3A%2F%2F127.0.0.1%3A18083

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考资料
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档