前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >手把手教学--从Pulsar到TDMQ

手把手教学--从Pulsar到TDMQ

原创
作者头像
沐榕樰
修改2020-12-15 18:51:36
1.8K0
修改2020-12-15 18:51:36
举报

导语:介于TDMQ还没有公网的访问功能,不可能买台CVM安装windows吧,VPN又只能支持协议类型: IKE/IPsec,意思是企业用户才能用,对于个人就只能再想办法了,但办法总比问题多。本地开发测试环境使用pulsar的单机版,生产使用TDMQ,这样怎么样,一起来看看怎么配置。

一、用CVM安装单机版的pulsar

1、安装JDK1.8

下载JDK1.8:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

a、将免安装的JDK拷贝到linux目录下

/etc/jdk1.8.0_271

b、更改环境变量

vim /etc/profile

代码语言:javascript
复制
export JAVA_HOME=/etc/jdk1.8.0_271
export PATH=.:$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

c、刷新配置

source /etc/profile

这样就可以了

2、安装pulsar

下载安装二进制版本pulsar:http://pulsar.apache.org/docs/zh-CN/next/standalone/

这里下载比较快:

解压之后启动:bin/pulsar-daemon start standalone

启动日志(简略版):

代码语言:javascript
复制
14:09:02.061 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Starting ZK server
14:09:02.467 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Server UP
14:09:02.467 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - ZooKeeper server up: true
14:09:02.467 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Instantiate ZK Client
14:09:02.586 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Starting Bookie(s)
14:09:02.980 [main] INFO  org.apache.bookkeeper.meta.MetadataDrivers - BookKeeper metadata driver manager initialized
14:09:02.984 [main] INFO  org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase - Initialize zookeeper metadata driver at metadata service uri zk+null:/
/127.0.0.1:2181/ledgers : zkServers = 127.0.0.1:2181, ledgersRootPath = /ledgers.
14:09:02.987 [main] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=10000 watcher=or
g.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@51751e5f
14:09:02.997 [main-EventThread] INFO  org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase - ZooKeeper client is connected now.
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - Started Db Ledger Storage
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage -  - Number of directories: 1
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage -  - Write cache size: 1024 MB
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage -  - Read Cache: 1024 MB
14:09:03.143 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - Creating single directory db ledger storage on data/standalone/b
ookkeeper0/current

注:如果使用命令在后台运行服务:pulsar-daemon start standalone;则可以通过以下命令终止服务:pulsar-daemon stop standalone

3、使用pulsar-client测试生产消费

Produce 消息:

向名称为 my-topic 的 topic 发送一条简单的消息 hello-pulsar:

bin/pulsar-client produce my-topic --messages "hello-pulsar"

Consume 消息:

在 first-subscription 订阅的my-topic消费消息:

bin/pulsar-client consume my-topic -s "first-subscription"

这样:单机版的pulsar是可以使用的。
这样:单机版的pulsar是可以使用的。

二、搭建本地开发环境

1、下载TDMQ的demo:

https://github.com/TencentCloud/tdmq-java-client

2、下载Pulsar的Java SDK 下载方式:

您 Java 工程的 全整pom.xml

代码语言:javascript
复制
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.tencent</groupId>
	<artifactId>tdmq-demo-cloud</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>tdmq-demo-cloud</name>
	<url>http://maven.apache.org</url>

	<properties>
		<pulsar.version>2.6.0</pulsar.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.pulsar</groupId>
			<artifactId>pulsar-client</artifactId>
			<version>${pulsar.version}</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<!--scope>test</scope-->
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<archive>
						<addMavenDescriptor>false</addMavenDescriptor>
						<manifest>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
						</manifest>
					</archive>
					<excludes>
						<exclude>**/assembly/</exclude>
					</excludes>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				  <version>2.3.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.2.1</version>
				<configuration>
					<descriptors>
						<descriptor>assembly.xml</descriptor>
					</descriptors>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

在 pom.xml 所在目录执行即可下载 Pulsar SDK。

代码语言:javascript
复制
mvn clean package

3、连接单机版的pulsar的测试代码:SimpleProducerAndCosnumer.java

代码语言:javascript
复制

package com.tencent.tdmq.demo.cloud;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import org.apache.pulsar.client.api.*;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

//import org.apache.pulsar.client.api.NetModel;

/**
 * 简单的生产和消息例子
 *
 */

public class SimpleProducerAndCosnumer {

	public static void invork() throws PulsarClientException {
		Map<String, String> authParams = new HashMap<>();
		//authParams.put("secretId", "AKID6VfaFeTq12nJadkMnOhMFveVR7XjfMnQ");
		//authParams.put("secretKey", "aS2MbkLafbiXO2RM1nTv51h8uW9XQb1t");
		//authParams.put("region", "ap-guangzhou");
		PulsarClient client = PulsarClient.builder()
				//.serviceUrl("pulsar://10.0.0.4:6000")// 填写腾讯云TDMQ接入点的地址
		        //.listenerName("custom:pulsar-geookmr4pz/vpc-paywuhyf/subnet-6jummj9y")
				//.authentication(AuthenticationFactory.token("eyJrZXlJZCI6In9va21yNHB6IiB1bHNhci1nZWwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiJkZGRkIn0.otXNHuw3FJhQ0l4msNQe_zAH2Bh7lB8kVrfoU4XRTqs"))
				.serviceUrl("pulsar://110.29.14.63:6650")// 填写测试环境pulsar的地址
				.build();
		// 创建消费者对象
		Consumer<byte[]> consumer = client.newConsumer()
				//.topic("persistent://pulsar-geookmr4pz/default/sfhuang")//命名规则:appid/namespace/topic
				.topic("my-topic")
				.subscriptionName("sub-sfhuang")
				.subscribe();
		// 创建生产者对象
		Producer<byte[]> producer = client.newProducer()
				//.topic("persistent://pulsar-geookmr4pz/default/sfhuang")//这个是腾讯云的topic
				.topic("my-topic")
				.create();

		for (int i = 0; i < 5; i++) {
			String value = "my-sync-message-" + i;
			System.out.println("");
			MessageId msgId = producer.newMessage().value(value.getBytes()).send();
			System.out.println("produce sync msg id:" + msgId + ", value:" + value);
		}
		producer.close();
		for (int i = 0; i < 5; i++) {
			Message<byte[]> msg = consumer.receive();
			String msgId = msg.getMessageId().toString();
			String value = new String(msg.getValue());
			System.out.println("receive msg " + msgId + ",value:" + value);
			consumer.acknowledge(msg);
		}
		// 关闭
		consumer.close();
		client.close();
	}

	public static void main(String[] args) throws JoranException, PulsarClientException {
		String logbackFile = "D:\\programming\\tdmq-java-client-master\\conf\\logback.xml";
		//String logbackFile = "/root/tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml";
		if (logbackFile != null) {
			LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
			JoranConfigurator configurator = new JoranConfigurator();
			configurator.setContext(lc);
			lc.reset();
			configurator.doConfigure(logbackFile);
		}
		invork();

	}

}

还有logback.xml文件也要修改:

本地运行结果是:

也是可以生产和消费搭建在CVM上的Pulsar
也是可以生产和消费搭建在CVM上的Pulsar

三、测试连接TDMQ并生产消费

1、创建TDMQ的topic

默认创建在default环境
默认创建在default环境

2、创建接入点

这里要选CVM所在的VPC子网,才通连通,地址和路由ID都会在代码中用到。
这里要选CVM所在的VPC子网,才通连通,地址和路由ID都会在代码中用到。

3、创建角色

秘钥是在这里,代码中也会用到
秘钥是在这里,代码中也会用到

4、对环境进行权限配置

有读写的权限:

5、创建订阅者

这里的订阅者需要在控制台创建,系统会生产两个topic,一个是重试队列,一个是死信队列
这里的订阅者需要在控制台创建,系统会生产两个topic,一个是重试队列,一个是死信队列

订阅者:sub-sfhuang

查看订阅者:sub-sfhuang
查看订阅者:sub-sfhuang

6、修改本地代码:

代码语言:javascript
复制
package com.tencent.tdmq.demo.cloud;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import org.apache.pulsar.client.api.*;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

//import org.apache.pulsar.client.api.NetModel;

/**
 * 简单的生产和消息例子
 *
 */

public class SimpleProducerAndCosnumer {

	public static void invork() throws PulsarClientException {
		Map<String, String> authParams = new HashMap<>();
		authParams.put("secretId", "AKID6VfaFeTq12nJadkMnOhMFveVR7XjfMnQ");
		authParams.put("secretKey", "aS2MbkLafbiXO2RM1nTv51h8uW9XQb1t");
		authParams.put("region", "ap-guangzhou");
		PulsarClient client = PulsarClient.builder()
				.serviceUrl("pulsar://10.0.0.4:6000")// 填写腾讯云TDMQ接入点的地址
		        .listenerName("custom:pulsar-geookmr4pz/vpc-paywuhyf/subnet-6jummj9y")//路由ID
				.authentication(AuthenticationFactory.token("eyJrZXlJZCI6In9va21yNHB6IiB1bHNhci1nZWwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiJkZGRkIn0.otXNHuw3FJhQ0l4msNQe_zAH2Bh7lB8kVrfoU4XRTqs"))//秘钥
				//.serviceUrl("pulsar://110.29.14.63:6650")// 填写测试环境pulsar的地址
				.build();
		// 创建消费者对象
		Consumer<byte[]> consumer = client.newConsumer()
				.topic("persistent://pulsar-geookmr4pz/default/sfhuang")//这个是腾讯云TDMQ的topic
				//.topic("my-topic")
				.subscriptionName("sub-sfhuang")//这个要在控制台创建
				.subscribe();
		// 创建生产者对象
		Producer<byte[]> producer = client.newProducer()
				.topic("persistent://pulsar-geookmr4pz/default/sfhuang")//这个是腾讯云的topic
				//.topic("my-topic")
				.create();

		for (int i = 0; i < 5; i++) {
			String value = "my-sync-message-" + i;
			System.out.println("");
			MessageId msgId = producer.newMessage().value(value.getBytes()).send();
			System.out.println("produce sync msg id:" + msgId + ", value:" + value);
		}
		producer.close();
		for (int i = 0; i < 5; i++) {
			Message<byte[]> msg = consumer.receive();
			String msgId = msg.getMessageId().toString();
			String value = new String(msg.getValue());
			System.out.println("receive msg " + msgId + ",value:" + value);
			consumer.acknowledge(msg);
		}
		// 关闭
		consumer.close();
		client.close();
	}

	public static void main(String[] args) throws JoranException, PulsarClientException {
		//String logbackFile = "D:\\programming\\tdmq-java-client-master\\conf\\logback.xml";
		String logbackFile = "/root/tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml";
		if (logbackFile != null) {
			LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
			JoranConfigurator configurator = new JoranConfigurator();
			configurator.setContext(lc);
			lc.reset();
			configurator.doConfigure(logbackFile);
		}
		invork();

	}

}

还有logback.xml文件也要修改:

7、在 pom.xml 所在目录执行即可打zip包。

代码语言:javascript
复制
mvn clean package

我本地所在的目录:

四、打包上传并测试

这里的生产环境也是要有JDK1.8,可以使用Pulsar单机版的机器测试

1、上传zip包

[root@VM-0-9-centos ~]# rz

Sent - tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip 5.25 MB/s Spend: 5 seconds

2、解压

[root@VM-0-9-centos ~]# unzip tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip

Archive: tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/bin/

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/bin/runserver.sh

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-client-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-client-api-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-transaction-common-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-java-3.5.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-java-util-3.5.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/guava-19.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/gson-2.7.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-shaded-2.1.0-incubating.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/jsr305-3.0.2.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/checker-qual-2.0.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/error_prone_annotations-2.1.3.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/j2objc-annotations-1.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/animal-sniffer-annotations-1.14.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/aircompressor-0.16.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/javax.ws.rs-api-2.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/bouncy-castle-bc-shaded-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/javax.activation-1.2.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/slf4j-api-1.7.25.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/validation-api-1.1.0.Final.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/jcip-annotations-1.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/logback-classic-1.2.3.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/logback-core-1.2.3.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/junit-3.8.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/tdmq-demo-cloud-0.0.1-SNAPSHOT.jar

[root@VM-0-9-centos ~]# cd tdmq-demo-cloud-0.0.1-SNAPSHOT/

[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# ll

total 12

drwxr-xr-x 2 root root 4096 Nov 19 11:34 bin

drwxr-xr-x 2 root root 4096 Dec 3 11:16 conf

drwxrwxrwx 2 root root 4096 Dec 3 12:49 lib

3、授予执行权限

[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# chmod 755 bin/runserver.sh

4、运行代码

[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# bin/runserver.sh -classpath lib/tdmq-demo-cloud-0.0.1-SNAPSHOT.jar com.tencent.tdmq.demo.cloud.SimpleProducerAndCosnumer

结果:成功了!!!

总结:使用线上的TDMQ省去了运维,扩展性也会更好,配置也不复杂,公测期间还免费,快快来体验一下吧。

后面会为大家分享TDMQ的其它使用。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、用CVM安装单机版的pulsar
    • 1、安装JDK1.8
      • 2、安装pulsar
        • 3、使用pulsar-client测试生产消费
        • 二、搭建本地开发环境
          • 1、下载TDMQ的demo:
            • 2、下载Pulsar的Java SDK 下载方式:
              • 3、连接单机版的pulsar的测试代码:SimpleProducerAndCosnumer.java
              • 三、测试连接TDMQ并生产消费
                • 5、创建订阅者
                  • 7、在 pom.xml 所在目录执行即可打zip包。
                  • 四、打包上传并测试
                    • 1、上传zip包
                      • 2、解压
                        • 3、授予执行权限
                          • 4、运行代码
                          相关产品与服务
                          消息队列 CMQ 版
                          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档