首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >针对cassandra使用hadoop mapreduce的示例代码

针对cassandra使用hadoop mapreduce的示例代码
EN

Stack Overflow用户
提问于 2014-02-24 01:44:11
回答 2查看 4.3K关注 0票数 2

我一直在尝试运行Cassandra自带的MapReduce示例代码,但运行时出现了错误。源代码如下:

代码语言:java
运行
复制
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;

import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.nio.charset.CharacterCodingException;

/**
 * This counts the occurrences of words in ColumnFamily
 *   cql3_worldcount ( user_id text,
 *                   category_id text,
 *                   sub_category_id text,
 *                   title  text,
 *                   body  text,
 *                   PRIMARY KEY (user_id, category_id, sub_category_id))
 *
 * For each word, we output the total number of occurrences across all body texts.
 *
 * When outputting to Cassandra, we write the word counts to column family
 *  output_words ( row_id1 text,
 *                 row_id2 text,
 *                 word text,
 *                 count_num text,
 *                 PRIMARY KEY ((row_id1, row_id2), word))
 * as a {word, count} to columns: word, count_num with a row key of "word sum"
 */
public class WordCount extends Configured implements Tool
{
    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

    static final String KEYSPACE = "cql3_worldcount";
    static final String COLUMN_FAMILY = "inputs";

    static final String OUTPUT_REDUCER_VAR = "output_reducer";
    static final String OUTPUT_COLUMN_FAMILY = "output_words";

    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";

    private static final String PRIMARY_KEY = "row_key";

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(0);
    }

    public static class TokenizerMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ByteBuffer sourceColumn;

        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException
        {
        }

        public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
        {
            for (Entry<String, ByteBuffer> column : columns.entrySet())
            {
                if (!"body".equalsIgnoreCase(column.getKey()))
                    continue;

                String value = ByteBufferUtil.string(column.getValue());

                logger.debug("read {}:{}={} from {}",
                             new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()});

                StringTokenizer itr = new StringTokenizer(value);
                while (itr.hasMoreTokens())
                {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        private String toString(Map<String, ByteBuffer> keys)
        {
            String result = "";
            try
            {
                for (ByteBuffer key : keys.values())
                    result = result + ByteBufferUtil.string(key) + ":";
            }
            catch (CharacterCodingException e)
            {
                logger.error("Failed to print keys", e);
            }
            return result;
        }
    }

    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        private Map<String, ByteBuffer> keys;
        private ByteBuffer key;
        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
        throws IOException, InterruptedException
        {
            keys = new LinkedHashMap<String, ByteBuffer>();
            String[] partitionKeys = context.getConfiguration().get(PRIMARY_KEY).split(",");
            keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0]));
            keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1]));
        }

        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(keys, getBindVariables(word, sum));
        }

        private List<ByteBuffer> getBindVariables(Text word, int sum)
        {
            List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
            keys.put("word", ByteBufferUtil.bytes(word.toString()));
            variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));         
            return variables;
        }
    }

    public int run(String[] args) throws Exception
    {
        String outputReducerType = "filesystem";
        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
        {
            String[] s = args[0].split("=");
            if (s != null && s.length == 2)
                outputReducerType = s[1];
        }
        logger.info("output reducer type: " + outputReducerType);

        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);

        if (outputReducerType.equalsIgnoreCase("filesystem"))
        {
            job.setCombinerClass(ReducerToFilesystem.class);
            job.setReducerClass(ReducerToFilesystem.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
        }
        else
        {
            job.setReducerClass(ReducerToCassandra.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Map.class);
            job.setOutputValueClass(List.class);

            job.setOutputFormatClass(CqlOutputFormat.class);

            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
            job.getConfiguration().set(PRIMARY_KEY, "word,sum");
            String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
                           " SET count_num = ? ";
            CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        }

        job.setInputFormatClass(CqlPagingInputFormat.class);

        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

        CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
        //this is the user defined filter clauses, you can comment it out if you want count all titles
        CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
        job.waitForCompletion(true);
        return 0;
    }
}

它编译得很好,但我得到了以下错误:

代码语言:javascript
运行
复制
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
        at WordCount.run(WordCount.java:230)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at WordCount.main(WordCount.java:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:160)
Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 8 more

我使用的是Hadoop 1.2.1和Cassandra 2.0.4。如果有人能帮我解决这个错误,或者提供可用的示例代码、指导如何让Hadoop与Cassandra协同工作,我将不胜感激。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-02-24 06:38:05

为了解决这个问题,将cassandra文件复制到hadoop目录。

票数 1
EN

Stack Overflow用户

发布于 2014-07-10 22:55:40

请使用以下路径

在 /<hadoop路径>/conf/hadoop-env.sh 文件中添加:export HADOOP_CLASSPATH=/<cassandra路径>/lib/*:$HADOOP_CLASSPATH

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21977434

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档