首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Cassandra:如何使用CQL插入新的宽行并具有良好的性能

Cassandra:如何使用CQL插入新的宽行并具有良好的性能
EN

Stack Overflow用户
提问于 2014-02-14 11:52:10
回答 3查看 9.9K关注 0票数 7

我在评估卡桑德拉。我正在使用datastax驱动程序和CQL。

我想用以下内部结构存储一些数据,其中每个更新的名称都不同。

代码语言:javascript
复制
+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------|-------+-------+

因此,时间应该是列键,名称应该是行键。用于创建此表的CQL语句是:

代码语言:javascript
复制
CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

为了便于查询,我希望模式是这样的。我还需要偶尔存储超过65000行的更新。因此,使用cassandra列表/set/map数据类型不是一个选项。

我必须能够每秒处理至少1000个宽行插入,其名称/值对的数量变化很大(~ 1000 )。

问题是:我已经编写了一个简单的基准测试,它可以执行每个10000对名称/值对的1000个宽行插入。在CQL和datastax驱动程序中,我的性能非常慢,而不使用CQL的版本(使用astyanax)在相同的测试集群上具有良好的性能。

我已经读过这个相关问题,在这个问题的公认答案中,建议您应该能够使用批处理准备语句创建一个新的宽行,这在cassandra 2中是可用的。

因此,我尝试使用这些方法,但性能仍然很慢(对于运行在localhost上的小型三节点集群,每秒两次插入)。我是不是遗漏了一些显而易见的东西,还是必须使用更低级别的节约API?--我在astyanax中用ColumnListMutation实现了相同的插入,每秒得到大约30个插入。

如果我必须使用较低级别的节俭API:

  • 它实际上是不建议使用的,还是因为它的级别较低而不方便使用?
  • 我是否能够用CQL查询用节俭api创建的表?

下面是scala中的一个自包含代码示例。它只需创建一个批处理语句,用于插入包含10000列的宽行,并重复插入性能。

我使用了BatchStatement的选项和一致性水平,但是没有什么能让我获得更好的性能。

我唯一的解释是,尽管批处理由准备好的语句组成,条目还是一个接一个地添加到行中。

代码语言:javascript
复制
package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

下面是astyanax代码(从astyanax示例修改而来),它做同样的事情的速度要快15倍。请注意,这也不使用异步调用,因此这是一个公平的比较。这就要求列族已经存在,因为我还没有弄清楚如何使用astyanax来创建它,而且示例中没有任何创建列家族的代码。

代码语言:javascript
复制
package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columsn
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

更新:我在cassandra-用户邮件列表中找到了这个线程。在执行大型宽行插入时,CQL似乎存在性能问题。有一个票证卡桑德拉-6737来跟踪这个问题。

Update2:我已经试用了附加到CASSANDRA-6737的补丁,我可以确认这个补丁完全解决了这个问题。感谢来自DataStax的西尔万·莱布雷斯内,感谢他这么快地修复了这个问题!

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2014-02-19 15:25:36

你不是唯一一个经历过这件事的人。不久前,我写了一篇博客文章,更多地关注CQL和CQL之间的转换,但也有指向看到同样事情的人的邮件列表问题的链接(宽行插入的性能问题是我最初调查的动机):http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

总之,CQL对于为Cassandra新手消除处理输入和理解数据模型的负担是很好的。DataStax驱动程序编写得很好,包含了许多有用的特性。

但是,对于宽行插入而言,Thrift要稍微快一些。Netflix的博客对这个特定的用例不太了解。此外,只要人们使用它(很多人都在使用),Thrift就不是遗留的了。这是一个ASF项目,因此不是由任何一个供应商运行的。

通常,对于任何基于Cassandra的应用程序,如果您找到了一种满足(或经常超过)工作负载的性能要求的方法,请坚持它。

票数 5
EN

Stack Overflow用户

发布于 2014-02-16 10:20:37

您的代码中有一个错误,我认为它解释了您所看到的许多性能问题:对于每一批您再次准备语句。准备语句并不是非常昂贵,但是这样做会增加很大的延迟。等待语句准备的时间是您不构建批处理的时间,而Cassandra没有花费处理该批的时间。准备好的语句只需要准备一次,并且应该重复使用。

我认为许多不好的性能可以解释延迟问题。瓶颈很可能是您的应用程序代码,而不是Cassandra。即使您只准备了一次该语句,您仍然会花费大部分时间,要么在应用程序中绑定CPU (构建一个大批),要么什么也不做(等待网络和Cassandra)。

您可以做两件事:第一,使用CQL驱动程序的异步API并在网络和Cassandra忙于刚才完成的批处理时构建下一批;第二,尝试运行多个线程来完成相同的任务。您必须试验的线程的确切数量,并将取决于您拥有的内核数量,以及如果您在同一台机器上运行一个或三个节点。

在同一台机器上运行三个节点集群会使集群比运行单个节点慢,而在不同机器上运行会使集群运行得更快。同样,在同一台机器上运行应用程序也没有什么帮助。如果要测试性能,请只运行一个节点或在单独的计算机上运行真正的群集。

批处理可以给您额外的性能,但并不总是如此。它们可能导致您在测试代码中看到的问题:缓冲区膨胀。一旦批处理变得太大,您的应用程序就会花费太多的时间构建它们,然后花费太多的时间将它们推到网络上,等待Cassandra处理它们的时间太长。您需要对批处理大小进行实验,看看哪些是最有效的(但是使用真正的集群,否则您将看不到网络的影响,当批处理变得更大时,网络将是一个很大的因素)。

如果您使用批处理,则使用压缩。压缩在大多数请求加载中没有什么不同(响应是另一回事),但是当您发送大量批时,它会产生很大的影响。

宽行在卡桑德拉中没有什么特别之处。除了一些例外,模式不会改变处理写入所需的时间。我运行的应用程序可以每秒执行数万次非批处理、混合、宽行和非宽行写入操作。集群并不大,每个集群只有三个或四个m1.xsize EC2节点。诀窍是永远不要在发送下一个请求之前等待请求返回(这并不意味着触发和忘记,只需以相同的异步方式处理响应)。延迟是一个性能杀手。

票数 8
EN

Stack Overflow用户

发布于 2014-02-14 15:48:29

有些事你可以试试..。在您的cassandra.yaml中(这是Cassandra1.2.x,也许在2.x中对params的称呼有所不同):

  • 禁用行缓存(row_cache_size_in_mb: 0)
  • 在内存中的行溢出到磁盘(min_memory_compaction_limit_in_mb)之前增加内存限制,只有当您看到某些日志输出显示溢出确实发生时,才会这样做。
  • 确保正确配置num_tokens / initial_token值,以便在节点之间分布行

其他你可以尝试的事情:

  • 将集群中的所有节点IP提供给客户端,而不仅仅是一个。
  • 为每个Cassandra节点提供更多RAM
  • 尝试运行您的测试多线程。
  • 如果您在Linux上运行Cassandra,请确保您有安装了JNA并正在使用

需要澄清的事项:

  • 您是否通过nodetool确认了这三个节点已经找到了对方?
  • 关于您的3个节点的负载分布,nodetool是怎么说的?
  • 虚拟集群的物理主机对CPU和I/O的使用有什么看法?也许它已经完全消失了?
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21778671

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档