我一直试图获得运行Cassandra时附带的MapReduce示例代码,但我得到了运行时错误。源代码:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.nio.charset.CharacterCodingException;
/**
 * This counts the occurrences of words in ColumnFamily
 *   cql3_worldcount ( user_id text,
 *                   category_id text,
 *                   sub_category_id text,
 *                   title  text,
 *                   body  text,
 *                   PRIMARY KEY (user_id, category_id, sub_category_id))
 *
 * For each word, we output the total number of occurrences across all body texts.
 *
 * When outputting to Cassandra, we write the word counts to column family
 *  output_words ( row_id1 text,
 *                 row_id2 text,
 *                 word text,
 *                 count_num text,
 *                 PRIMARY KEY ((row_id1, row_id2), word))
 * as a {word, count} to columns: word, count_num with a row key of "word sum"
 */
public class WordCount extends Configured implements Tool
{
    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
    static final String KEYSPACE = "cql3_worldcount";
    static final String COLUMN_FAMILY = "inputs";
    static final String OUTPUT_REDUCER_VAR = "output_reducer";
    static final String OUTPUT_COLUMN_FAMILY = "output_words";
    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
    private static final String PRIMARY_KEY = "row_key";
    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(0);
    }
    public static class TokenizerMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ByteBuffer sourceColumn;
        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException
        {
        }
        public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
        {
            for (Entry<String, ByteBuffer> column : columns.entrySet())
            {
                if (!"body".equalsIgnoreCase(column.getKey()))
                    continue;
                String value = ByteBufferUtil.string(column.getValue());
                logger.debug("read {}:{}={} from {}",
                             new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()});
                StringTokenizer itr = new StringTokenizer(value);
                while (itr.hasMoreTokens())
                {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
        private String toString(Map<String, ByteBuffer> keys)
        {
            String result = "";
            try
            {
                for (ByteBuffer key : keys.values())
                    result = result + ByteBufferUtil.string(key) + ":";
            }
            catch (CharacterCodingException e)
            {
                logger.error("Failed to print keys", e);
            }
            return result;
        }
    }
    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(key, new IntWritable(sum));
        }
    }
    public static class ReducerToCassandra extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        private Map<String, ByteBuffer> keys;
        private ByteBuffer key;
        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
        throws IOException, InterruptedException
        {
            keys = new LinkedHashMap<String, ByteBuffer>();
            String[] partitionKeys = context.getConfiguration().get(PRIMARY_KEY).split(",");
            keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0]));
            keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1]));
        }
        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(keys, getBindVariables(word, sum));
        }
        private List<ByteBuffer> getBindVariables(Text word, int sum)
        {
            List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
            keys.put("word", ByteBufferUtil.bytes(word.toString()));
            variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));         
            return variables;
        }
    }
    public int run(String[] args) throws Exception
    {
        String outputReducerType = "filesystem";
        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
        {
            String[] s = args[0].split("=");
            if (s != null && s.length == 2)
                outputReducerType = s[1];
        }
        logger.info("output reducer type: " + outputReducerType);
        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        if (outputReducerType.equalsIgnoreCase("filesystem"))
        {
            job.setCombinerClass(ReducerToFilesystem.class);
            job.setReducerClass(ReducerToFilesystem.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
        }
        else
        {
            job.setReducerClass(ReducerToCassandra.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Map.class);
            job.setOutputValueClass(List.class);
            job.setOutputFormatClass(CqlOutputFormat.class);
            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
            job.getConfiguration().set(PRIMARY_KEY, "word,sum");
            String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
                           " SET count_num = ? ";
            CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        }
        job.setInputFormatClass(CqlPagingInputFormat.class);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
        //this is the user defined filter clauses, you can comment it out if you want count all titles
        CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
        job.waitForCompletion(true);
        return 0;
    }
}它编译得很好,但我得到了以下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
        at WordCount.run(WordCount.java:230)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at WordCount.main(WordCount.java:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:160)
Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 8 more我使用hadoop 1.2.1和cassandra 2.0.4。如能帮助您处理此错误或示例代码或指导如何使hadoop与cassandra一起工作,将不胜感激。
发布于 2014-02-24 06:38:05
为了解决这个问题,将cassandra文件复制到hadoop目录。
发布于 2014-07-10 22:55:40
请使用以下路径
导出HADOOP_CLASSPATH=/< path到cassandra >/lib/*:$ hadoop _CLASSPATH在/< hadoop路径>/conf/hadoop-env.sh文件中。
https://stackoverflow.com/questions/21977434
复制相似问题