Hive metastore: overall code analysis and walkthrough


  Following the brief analysis of the Hive metastore table structure in the previous post, this post works from the entity objects behind that data design and summarizes the overall code structure. Let's open the metadata directory and look at its layout.

  The whole hivemeta directory contains metastore (the client/server invocation logic), events (listener implementations for the checks, authorization and other steps in the table lifecycle), hooks (only the JDO-connection-related interfaces live here), parser (parsing of expression trees), spec (partition proxy classes), tools (JDO execute helpers), plus txn and model. Below we go through the metadata code piece by piece, with analysis and comments.

  We have not even expanded the packages and there are already this many classes. Scary? Let's stay calm and push on anyway. The place to start is the Hive class, because it is the entry point for all metadata calls. The lifecycle we will walk through is: creation and loading of the HiveMetaStoreClient client, creation and loading of the HiveMetaStore server, then createTable, dropTable, alterTable, createPartition, dropPartition and alterPartition. Of course, this is only a small part of the complete metadata layer.

1. HiveMetaStoreClient creation and loading

  Let's start reading through the Hive class, bit by bit:

  private HiveConf conf = null;
  private IMetaStoreClient metaStoreClient;
  private UserGroupInformation owner;

  // metastore calls timing information
  private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();

  private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
    @Override
    protected synchronized Hive initialValue() {
      return null;
    }

    @Override
    public synchronized void remove() {
      if (this.get() != null) {
        this.get().close();
      }
      super.remove();
    }
  };

  Declared here are the HiveConf object conf, the metaStoreClient, the owner UserGroupInformation, and the metaCallTimeMap, a map that records the running time of each metastore call. A thread-local hiveDB is also maintained; when the db is null, a new Hive object is created, as the following code shows:

  public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
    Hive db = hiveDB.get();
    if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
      if (db != null) {
        LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
          ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
      }
      closeCurrent();
      c.set("fs.scheme.class", "dfs");
      Hive newdb = new Hive(c);
      hiveDB.set(newdb);
      return newdb;
    }
    db.conf = c;
    return db;
  }

  We will soon find that when the Hive object is created, the functions have already been registered. What is a function? From the earlier table-structure analysis, it can be understood as the metadata store for all UDF jars and the like. The code:

  // register all permanent functions. need improvement
  static {
    try {
      reloadFunctions();
    } catch (Exception e) {
      LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
    }
  }

  public static void reloadFunctions() throws HiveException {
    // obtain the Hive object used for the calls below
    Hive db = Hive.get();
    // iterate over every database name
    for (String dbName : db.getAllDatabases()) {
      // list every function registered under this database
      for (String functionName : db.getFunctions(dbName, "*")) {
        Function function = db.getFunction(dbName, functionName);
        try {
          // register the queried function metadata into the Map<String, FunctionInfo>
          // held by the Registry class, so the execution engine does not have to
          // query the database again at call time
          FunctionRegistry.registerPermanentFunction(
              FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(),
              false, FunctionTask.toFunctionResource(function.getResourceUris()));
        } catch (Exception e) {
          LOG.warn("Failed to register persistent function " +
              functionName + ":" + function.getClassName() + ". Ignore and continue.");
        }
      }
    }
  }

  The getMSC() call creates the metastore client. The code:

  private IMetaStoreClient createMetaStoreClient() throws MetaException {

    // implement the HiveMetaHookLoader interface inline
    HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
        @Override
        public HiveMetaHook getHook(
          org.apache.hadoop.hive.metastore.api.Table tbl)
          throws MetaException {

          try {
            if (tbl == null) {
              return null;
            }
            // load the storage handler indicated by the table's key/value parameters,
            // e.g. an HBase or Redis handler backing an external table
            HiveStorageHandler storageHandler =
              HiveUtils.getStorageHandler(conf,
                tbl.getParameters().get(META_TABLE_STORAGE));
            if (storageHandler == null) {
              return null;
            }
            return storageHandler.getMetaHook();
          } catch (HiveException ex) {
            LOG.error(StringUtils.stringifyException(ex));
            throw new MetaException(
              "Failed to load storage handler:  " + ex.getMessage());
          }
        }
      };
    return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
        SessionHiveMetaStoreClient.class.getName());
  }
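  RetryingMetaStoreClient.getProxy returns a dynamic proxy that wraps the real client with retry logic and records per-method timings into metaCallTimeMap. As a rough sketch of that pattern (a generic java.lang.reflect.Proxy handler of my own, not the actual RetryingMetaStoreClient code):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Sketch of the retry-proxy idea: every interface call goes through invoke(),
// which retries on failure and measures the elapsed time per call.
class RetrySketch implements InvocationHandler {
  private final Object delegate;
  private final int retries;

  RetrySketch(Object delegate, int retries) {
    this.delegate = delegate;
    this.retries = retries;
  }

  @SuppressWarnings("unchecked")
  static <T> T wrap(Class<T> iface, T delegate, int retries) {
    return (T) Proxy.newProxyInstance(iface.getClassLoader(),
        new Class<?>[] { iface }, new RetrySketch(delegate, retries));
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Throwable last = null;
    for (int attempt = 0; attempt <= retries; attempt++) {
      long start = System.nanoTime();
      try {
        return method.invoke(delegate, args);
      } catch (InvocationTargetException e) {
        last = e.getTargetException(); // unwrap the real cause and retry
      } finally {
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // the real client accumulates this into a Map<String, Long> like metaCallTimeMap
      }
    }
    throw last;
  }
}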

2. HiveMetaStore server creation and loading

  When HiveMetaStoreClient is initialized, it establishes the connection to the HiveMetaStore server (or creates an embedded handler), as follows:

  public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
    throws MetaException {

    this.hookLoader = hookLoader;
    if (conf == null) {
      conf = new HiveConf(HiveMetaStoreClient.class);
    }
    this.conf = conf;
    filterHook = loadFilterHooks();
    // read hive.metastore.uris from hive-site.xml; if it is set, this is treated as
    // a remote connection, otherwise as an embedded (local) metastore
    String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
    localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
    if (localMetaStore) {
      // a local connection talks to HiveMetaStore directly, in-process
      client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
      isConnected = true;
      snapshotActiveConf();
      return;
    }

    // read the retry count and timeout from the configuration
    retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
    retryDelaySeconds = conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);

    // assemble the metastore URIs
    if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
      String metastoreUrisString[] = conf.getVar(
          HiveConf.ConfVars.METASTOREURIS).split(",");
      metastoreUris = new URI[metastoreUrisString.length];
      try {
        int i = 0;
        for (String s : metastoreUrisString) {
          URI tmpUri = new URI(s);
          if (tmpUri.getScheme() == null) {
            throw new IllegalArgumentException("URI: " + s
                + " does not have a scheme");
          }
          metastoreUris[i++] = tmpUri;

        }
      } catch (IllegalArgumentException e) {
        throw (e);
      } catch (Exception e) {
        MetaStoreUtils.logAndThrowMetaException(e);
      }
    } else {
      LOG.error("NOT getting uris from conf");
      throw new MetaException("MetaStoreURIs not found in conf file");
    }
    // call open() to establish the connection
    open();
  }

  As the code above shows, a remote connection requires hive.metastore.uris to be set in hive-site.xml. Looks familiar? If your client and server are not on the same machine, this is the setting that enables the remote connection.
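  For reference, a remote metastore is typically configured in hive-site.xml like this (the host names are placeholders; 9083 is the conventional metastore port):

<property>
  <name>hive.metastore.uris</name>
  <!-- comma-separated list; the client tries each URI in turn, as open() shows -->
  <value>thrift://metastore-host-1:9083,thrift://metastore-host-2:9083</value>
</property>

  With that in place, let's continue with the open() method that creates the connection: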

  private void open() throws MetaException {
    isConnected = false;
    TTransportException tte = null;
    // whether to use SASL
    boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
    // if true, the metastore Thrift interface will use TFramedTransport; when false
    // (the default) a standard TTransport is used
    boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
    // if true, the metastore Thrift interface will use TCompactProtocol; when false
    // (the default) TBinaryProtocol is used -- the difference is discussed later
    boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
    // read the socket timeout
    int clientSocketTimeout = (int) conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

    for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
      for (URI store : metastoreUris) {
        LOG.info("Trying to connect to metastore with URI " + store);
        try {
          transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
          if (useSasl) {
            // Wrap thrift connection with SASL for secure connection.
            try {
              // create the HadoopThriftAuthBridge client
              HadoopThriftAuthBridge.Client authBridge =
                ShimLoader.getHadoopThriftAuthBridge().createClient();
              // authentication handling:
              // check if we should use delegation tokens to authenticate
              // the call below gets hold of the tokens if they are set up by hadoop
              // this should happen on the map/reduce tasks if the client added the
              // tokens into hadoop's credential store in the front end during job
              // submission.
              String tokenSig = conf.get("hive.metastore.token.signature");
              // tokenSig could be null
              tokenStrForm = Utils.getTokenStrForm(tokenSig);
              if(tokenStrForm != null) {
                // authenticate using delegation tokens via the "DIGEST" mechanism
                transport = authBridge.createClientTransport(null, store.getHost(),
                    "DIGEST", tokenStrForm, transport,
                        MetaStoreUtils.getMetaStoreSaslProperties(conf));
              } else {
                String principalConfig =
                    conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
                transport = authBridge.createClientTransport(
                    principalConfig, store.getHost(), "KERBEROS", null,
                    transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
              }
            } catch (IOException ioe) {
              LOG.error("Couldn't create client transport", ioe);
              throw new MetaException(ioe.toString());
            }
          } else if (useFramedTransport) {
            transport = new TFramedTransport(transport);
          }
          final TProtocol protocol;
          // the difference between the two protocols will be covered in detail later
          // (the author has not dug into it yet either)
          if (useCompactProtocol) {
            protocol = new TCompactProtocol(transport);
          } else {
            protocol = new TBinaryProtocol(transport);
          }
          // create the ThriftHiveMetastore client
          client = new ThriftHiveMetastore.Client(protocol);
          try {
            transport.open();
            isConnected = true;
          } catch (TTransportException e) {
            tte = e;
            if (LOG.isDebugEnabled()) {
              LOG.warn("Failed to connect to the MetaStore Server...", e);
            } else {
              // Don't print full exception trace if DEBUG is not on.
              LOG.warn("Failed to connect to the MetaStore Server...");
            }
          }
          // load the user and group information
          if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
            // Call set_ugi, only in unsecure mode.
            try {
              UserGroupInformation ugi = Utils.getUGI();
              client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
            } catch (LoginException e) {
              LOG.warn("Failed to do login. set_ugi() is not successful, " +
                       "Continuing without it.", e);
            } catch (IOException e) {
              LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
                  "Continuing without it.", e);
            } catch (TException e) {
              LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
                  + "Continuing without it.", e);
            }
          }
        } catch (MetaException e) {
          LOG.error("Unable to connect to metastore with URI " + store
                    + " in attempt " + attempt, e);
        }
        if (isConnected) {
          break;
        }
      }
      // Wait before launching the next round of connection retries.
      if (!isConnected && retryDelaySeconds > 0) {
        try {
          LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
          Thread.sleep(retryDelaySeconds * 1000);
        } catch (InterruptedException ignore) {}
      }
    }

    if (!isConnected) {
      throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
    }

    snapshotActiveConf();

    LOG.info("Connected to metastore.");
  }

  This post leaves the protocol internals aside for now. From the code we can see that the HiveMetaStore server is reached through ThriftHiveMetastore: it is itself a class, but it defines the nested interfaces Iface and AsyncIface, which makes implementing and extending the service convenient.
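  Schematically, a Thrift-generated service class has roughly this shape (a simplified sketch of mine for orientation, not the generated source):

// Simplified shape of a Thrift-generated service class such as ThriftHiveMetastore.
public class ThriftServiceShape {
  // Synchronous service contract; on the server side HMSHandler implements this.
  public interface Iface {
    void create_table();
    void drop_table();
  }

  // Asynchronous variant, with callback-based counterparts of each method.
  public interface AsyncIface {
  }

  // Client-side stub: implements Iface by serializing each call over a TProtocol.
  public static class Client implements Iface {
    public void create_table() { /* send RPC, read result */ }
    public void drop_table()   { /* send RPC, read result */ }
  }
}

  Now let's look at how HMSHandler is initialized. In the local case, newRetryingHMSHandler is invoked directly, which constructs the HMSHandler: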

    public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
      super(name);
      hiveConf = conf;
      if (init) {
        init();
      }
    }

  Next, let's continue with its init method:

    public void init() throws MetaException {
      // class name of the persistence implementation: ObjectStore, the RawStore
      // implementation responsible for the JDO interaction with the database
      rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
      // load the init listeners from hive.metastore.init.hooks; you can implement
      // and plug in your own
      initListeners = MetaStoreUtils.getMetaStoreListeners(
          MetaStoreInitListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
      for (MetaStoreInitListener singleInitListener: initListeners) {
          MetaStoreInitContext context = new MetaStoreInitContext();
          singleInitListener.onInit(context);
      }
      // initialize the alter handler implementation
      String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
          HiveAlterHandler.class.getName());
      alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
          alterHandlerName), hiveConf);
      // initialize the warehouse
      wh = new Warehouse(hiveConf);
      // create the default database and roles, add the admin users, and record currentUrl
      synchronized (HMSHandler.class) {
        if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) {
          createDefaultDB();
          createDefaultRoles();
          addAdminUsers();
          currentUrl = MetaStoreInit.getConnectionURL(hiveConf);
        }
      }
      // initialize the metrics counters
      if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
        try {
          Metrics.init();
        } catch (Exception e) {
          // log exception, but ignore inability to start
          LOG.error("error in Metrics init: " + e.getClass().getName() + " "
              + e.getMessage(), e);
        }
      }
      // initialize the pre-event listeners and event listeners
      preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
          hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
      listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
      listeners.add(new SessionPropertiesListener(hiveConf));
      endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
          MetaStoreEndFunctionListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
      // regex validation of partition names, configurable via
      // hive.metastore.partition.name.whitelist.pattern
      String partitionValidationRegex =
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN);
      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
      } else {
        partitionValidationPattern = null;
      }

      long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS);
      if (cleanFreq > 0) {
        // In default config, there is no timer.
        Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
        cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
      }
    }

  So init sets up the RawStore implementation that talks to the database, the Warehouse that performs the physical filesystem operations, and the events and listeners. The metadata lifecycle methods for tables are then served through these interfaces.

3. createTable

  We start with the createTable method. The code:

  public void createTable(String tableName, List<String> columns, List<String> partCols,
                          Class<? extends InputFormat> fileInputFormat,
                          Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
                          Map<String, String> parameters) throws HiveException {
    if (columns == null) {
      throw new HiveException("columns not specified for table " + tableName);
    }

    Table tbl = newTable(tableName);
    // SD attributes: record the table's input and output format class names; at query
    // time the execution engine loads these classes by reflection
    tbl.setInputFormatClass(fileInputFormat.getName());
    tbl.setOutputFormatClass(fileOutputFormat.getName());

    // wrap each column name and type into a FieldSchema object and add it to the
    // column list of the SD object
    for (String col : columns) {
      FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default");
      tbl.getCols().add(field);
    }

    // if partition columns were specified at creation time (e.g. a dt column), record
    // them; they are eventually written to the partition-key table
    if (partCols != null) {
      for (String partCol : partCols) {
        FieldSchema part = new FieldSchema();
        part.setName(partCol);
        part.setType(STRING_TYPE_NAME); // default partition key
        tbl.getPartCols().add(part);
      }
    }
    // set the serialization (SerDe) library
    tbl.setSerializationLib(LazySimpleSerDe.class.getName());
    // set the bucketing information
    tbl.setNumBuckets(bucketCount);
    tbl.setBucketCols(bucketCols);
    // set any extra key/value parameters of the table
    if (parameters != null) {
      tbl.setParamters(parameters);
    }
    createTable(tbl);
  }
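  As a quick usage sketch (table and column names are hypothetical), a direct call to this overload might look like the following:

import java.util.Arrays;

import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.mapred.TextInputFormat;

public class CreateTableExample {
  public static void main(String[] args) throws HiveException {
    Hive db = Hive.get();
    db.createTable("demo_table",
        Arrays.asList("id", "name"),          // columns, default STRING type
        Arrays.asList("dt"),                  // partition column
        TextInputFormat.class,                // input format recorded in the SD
        HiveIgnoreKeyTextOutputFormat.class,  // output format recorded in the SD
        -1,                                   // no bucketing
        null,                                 // no bucket columns
        null);                                // no extra parameters
  }
}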

  As the code above shows, Hive constructs a Table object. Think of it as a model: it carries nearly all the attributes of the tables that hang off TBLS via the TABLE_ID foreign key (see the earlier post on the Hive metastore table structure). Once the object is assembled, the createTable(Table) overload is invoked, and the call chain continues as follows:

  public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
    try {
      // if no db name was given, fall back to the CurrentDatabase of the SessionState
      // (for safety)
      if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
        tbl.setDbName(SessionState.get().getCurrentDatabase());
      }
      // if no columns were set, derive the fields from the deserializer
      if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) {
        tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
            tbl.getDeserializer()));
      }
      // validate the table's input/output formats and column attributes, e.g. check
      // for illegal characters
      tbl.checkValidity();
      if (tbl.getParameters() != null) {
        tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
      }
      org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
      // authorization starts here; this involves the
      // hive.security.authorization.createtable.user.grants,
      // hive.security.authorization.createtable.group.grants and
      // hive.security.authorization.createtable.role.grants settings, i.e. Hive's own
      // notion of users, roles and groups

      PrincipalPrivilegeSet principalPrivs = new PrincipalPrivilegeSet();
      SessionState ss = SessionState.get();
      if (ss != null) {
        CreateTableAutomaticGrant grants = ss.getCreateTableGrants();
        if (grants != null) {
          principalPrivs.setUserPrivileges(grants.getUserGrants());
          principalPrivs.setGroupPrivileges(grants.getGroupGrants());
          principalPrivs.setRolePrivileges(grants.getRoleGrants());
          tTbl.setPrivileges(principalPrivs);
        }
      }
      // the client connects to the server to actually create the table
      getMSC().createTable(tTbl);
    } catch (AlreadyExistsException e) {
      if (!ifNotExists) {
        throw new HiveException(e);
      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }

  Next, let's look at the createTable method in HiveMetaStoreClient that gets called:

  public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
      InvalidObjectException, MetaException, NoSuchObjectException, TException {
    // fetch the HiveMetaHook object for this table, so storage-engine-specific setup
    // and validation can run before creation
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preCreateTable(tbl);
    }
    boolean success = false;
    try {
      // call into HiveMetaStore, where the server interacts with the database
      create_table_with_environment_context(tbl, envContext);
      if (hook != null) {
        hook.commitCreateTable(tbl);
      }
      success = true;
    } finally {
      // if creation failed, roll back
      if (!success && (hook != null)) {
        hook.rollbackCreateTable(tbl);
      }
    }
  }

  A brief note on what the hook does: HiveMetaHook is an interface whose methods include preCreateTable, rollbackCreateTable, preDropTable and so on. Its implementations perform the storage-type-specific preparation and validation before creation, and the rollback on failure. The code:

public interface HiveMetaHook {
  /**
   * Called before a new table definition is added to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void preCreateTable(Table table)
    throws MetaException;

  /**
   * Called after failure adding a new table definition to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void rollbackCreateTable(Table table)
    throws MetaException;

  ...

  public void preDropTable(Table table)
    throws MetaException;
  ...
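  As a sketch, a hook for some external storage system might look like this (a hypothetical implementation of mine; the method set matches the interface above plus its commit counterparts):

import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;

// Hypothetical hook for an external storage system: prepare and validate the
// external resource before the metastore records the table, and clean up if
// the metastore write fails.
public class ExampleMetaHook implements HiveMetaHook {
  @Override
  public void preCreateTable(Table table) throws MetaException {
    // e.g. verify that the external namespace exists, or create it
  }

  @Override
  public void rollbackCreateTable(Table table) throws MetaException {
    // undo whatever preCreateTable provisioned
  }

  @Override
  public void commitCreateTable(Table table) throws MetaException {
  }

  @Override
  public void preDropTable(Table table) throws MetaException {
  }

  @Override
  public void rollbackDropTable(Table table) throws MetaException {
  }

  @Override
  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
    // e.g. drop the external resource when deleteData is true
  }
}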

  Next, let's look at the createTable path on the HiveMetaStore server side:

    private void create_table_core(final RawStore ms, final Table tbl,
        final EnvironmentContext envContext)
        throws AlreadyExistsException, MetaException,
        InvalidObjectException, NoSuchObjectException {
      // regex validation of the table name, checking for illegal characters
      if (!MetaStoreUtils.validateName(tbl.getTableName())) {
        throw new InvalidObjectException(tbl.getTableName()
            + " is not a valid object name");
      }
      // validation of the column names, column types and partition key names
      String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
      if (validate != null) {
        throw new InvalidObjectException("Invalid column " + validate);
      }

      if (tbl.getPartitionKeys() != null) {
        validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
        if (validate != null) {
          throw new InvalidObjectException("Invalid partition column " + validate);
        }
      }
      SkewedInfo skew = tbl.getSd().getSkewedInfo();
      if (skew != null) {
        validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
        if (validate != null) {
          throw new InvalidObjectException("Invalid skew column " + validate);
        }
        validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
            skew.getSkewedColNames(), tbl.getSd().getCols());
        if (validate != null) {
          throw new InvalidObjectException("Invalid skew column " + validate);
        }
      }

      Path tblPath = null;
      boolean success = false, madeDir = false;
      try {
        // fire the pre-create events; the listeners already shipped with the metastore
        // include DummyPreListener, AuthorizationPreEventListener,
        // AlternateFailurePreListener and MetaDataExportListener -- what they are for
        // is explained later, together with the metastore's design patterns
        firePreEvent(new PreCreateTableEvent(tbl, this));

        // open a transaction
        ms.openTransaction();

        // if the database does not exist, throw
        Database db = ms.getDatabase(tbl.getDbName());
        if (db == null) {
          throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
        }

        // check whether the table already exists in this database
        if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) {
          throw new AlreadyExistsException("Table " + tbl.getTableName()
              + " already exists");
        }
        // if the table is not a view, assemble the full table path:
        // fs.getUri().getScheme() + fs.getUri().getAuthority() + path.toUri().getPath()

  The listeners here will be explained in detail later; for now let's keep drilling straight down to ms.createTable. ms is the RawStore interface object, which gathers the uniform method calls for the whole metadata lifecycle. Part of its code:

  public abstract Database getDatabase(String name)
      throws NoSuchObjectException;

  public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException;

  public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException, MetaException;

  public abstract List<String> getDatabases(String pattern) throws MetaException;

  public abstract List<String> getAllDatabases() throws MetaException;

  public abstract boolean createType(Type type);

  public abstract Type getType(String typeName);

  public abstract boolean dropType(String typeName);

  public abstract void createTable(Table tbl) throws InvalidObjectException,
      MetaException;

  public abstract boolean dropTable(String dbName, String tableName)
      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException;

  public abstract Table getTable(String dbName, String tableName)
      throws MetaException;
  ...

  So how is this implemented? The metastore first calls getMS() to obtain the RawStore implementation bound to the local thread:

    public RawStore getMS() throws MetaException {
      // fetch the RawStore already bound to this thread, if any
      RawStore ms = threadLocalMS.get();
      // otherwise create one and bind it to the thread
      if (ms == null) {
        ms = newRawStore();
        ms.verifySchema();
        threadLocalMS.set(ms);
        ms = threadLocalMS.get();
      }
      return ms;
    }

  Seeing this, don't you want to know what newRawStore actually does? Let's continue:

  public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
      int id) throws MetaException {
    // load the base class by reflection, then build a proxy around an instance of it
    Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
        rawStoreClassName);

    RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);

    // Look for interfaces on both the class and all base classes.
    return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
        getAllInterfaces(baseClass), handler);
  }

  The question is where rawStoreClassName comes from. It is loaded when HiveMetaStore initializes, from the METASTORE_RAW_STORE_IMPL configuration parameter in HiveConf, i.e. the name of the RawStore implementation class: ObjectStore.
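  The reflective loading itself is the standard pattern; a generic sketch (the class name below is just the default value mentioned above):

import java.lang.reflect.Constructor;

public class ReflectLoadSketch {
  // Config-driven instantiation: resolve the class named by the configuration
  // (METASTORE_RAW_STORE_IMPL defaults to ObjectStore) and construct an instance.
  public static Object load(String className) throws Exception {
    Class<?> clazz = Class.forName(className);
    Constructor<?> ctor = clazz.getDeclaredConstructor();
    return ctor.newInstance();
  }
}

  Now that the RawStore implementation is created, let's dig into ObjectStore's createTable: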

  @Override
  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
    boolean commited = false;
    try {
      // open a transaction
      openTransaction();
      // convertToMTable validates the db and table once more (why the validation is
      // repeated here is worth pondering) and maps the Thrift Table to the JDO MTable
      MTable mtbl = convertToMTable(tbl);
      // pm is the JDO PersistenceManager created when ObjectStore is initialized;
      // this is where the Table object is persisted -- see how JDO model objects map
      // to database tables for details
      pm.makePersistent(mtbl);
      // assemble and persist the user/role/group privilege objects
      PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
      List<Object> toPersistPrivObjs = new ArrayList<Object>();
      if (principalPrivs != null) {
        int now = (int)(System.currentTimeMillis()/1000);

        Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges();
        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER);

        Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP);

        Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE);
      }
      pm.makePersistentAll(toPersistPrivObjs);
      commited = commitTransaction();
    } finally {
      // roll back on failure
      if (!commited) {
        rollbackTransaction();
      }
    }
  }
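  Every ObjectStore method wraps its work in this same open/commit/rollback skeleton. A generic sketch of the pattern in plain javax.jdo terms (ObjectStore actually uses its own openTransaction()/commitTransaction() helpers, which support nesting):

import javax.jdo.PersistenceManager;
import javax.jdo.Transaction;

public class JdoTransactionSketch {
  // Run a unit of work (makePersistent / deletePersistentAll calls) inside a
  // JDO transaction, rolling back on any failure path.
  static void runInTransaction(PersistenceManager pm, Runnable work) {
    Transaction tx = pm.currentTransaction();
    boolean committed = false;
    try {
      tx.begin();
      work.run();
      tx.commit();
      committed = true;
    } finally {
      if (!committed && tx.isActive()) {
        tx.rollback();
      }
    }
  }
}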

4. dropTable

  Without further ado, the code from the Hive class:

  public void dropTable(String tableName, boolean ifPurge) throws HiveException {
    // Hive splits the name into a {dbName, tableName} array
    String[] names = Utilities.getDbTableName(tableName);
    dropTable(names[0], names[1], true, true, ifPurge);
  }

  Why is this needed? Because the SQL may be either drop table dbName.tableName or drop table tableName. Here the dbName and tableName are assembled together; if the statement is drop table tableName, the dbName of the current session is used. The code:

  public static String[] getDbTableName(String dbtable) throws SemanticException {
    // use the current database of the session as the default
    return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable);
  }

  public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException {
    if (dbtable == null) {
      return new String[2];
    }
    String[] names =  dbtable.split("\\.");
    switch (names.length) {
      case 2:
        return names;
      // a single component is combined with the default database
      case 1:
        return new String [] {defaultDb, dbtable};
      default:
        throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable);
    }
  }
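  Concretely, assuming the session's current database is default, the results look like this (illustrative values):

// getDbTableName("mydb.t1")  -> { "mydb", "t1" }
// getDbTableName("t1")       -> { "default", "t1" }   (session database filled in)
// getDbTableName("a.b.c")    -> SemanticException(INVALID_TABLE_NAME)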

  The call then goes through getMSC() to dropTable in HiveMetaStoreClient:

  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    Table tbl;
    try {
      // fetch the whole Table object by dbName and tableName, i.e. all the metadata
      // stored for this table
      tbl = getTable(dbname, name);
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
      return;
    }
    // use the table type to determine whether this is an index table; index tables
    // cannot be dropped directly
    if (isIndexTable(tbl)) {
      throw new UnsupportedOperationException("Cannot drop index tables");
    }
    // same getHook as in the create path: fetch the hook for this table's storage
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preDropTable(tbl);
    }
    boolean success = false;
    try {
      // call dropTable on the HiveMetaStore server
      drop_table_with_environment_context(dbname, name, deleteData, envContext);
      if (hook != null) {
        hook.commitDropTable(tbl, deleteData);
      }
      success=true;
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
    } finally {
      if (!success && (hook != null)) {
        hook.rollbackDropTable(tbl);
      }
    }
  }

  Now let's focus on what the HiveMetaStore server does. The code:

    private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
        final boolean deleteData, final EnvironmentContext envContext,
        final String indexName) throws NoSuchObjectException,
        MetaException, IOException, InvalidObjectException, InvalidInputException {
      boolean success = false;
      boolean isExternal = false;
      Path tblPath = null;
      List<Path> partPaths = null;
      Table tbl = null;
      boolean ifPurge = false;
      try {
        ms.openTransaction();
        // fetch the whole Table object
        tbl = get_table_core(dbname, name);
        if (tbl == null) {
          throw new NoSuchObjectException(name + " doesn't exist");
        }
        // if the SD record is missing, the table metadata is considered corrupted
        if (tbl.getSd() == null) {
          throw new MetaException("Table metadata is corrupted");
        }
        ifPurge = isMustPurge(envContext, tbl);

        firePreEvent(new PreDropTableEvent(tbl, deleteData, this));
        // an index table must have its index dropped first
        boolean isIndexTable = isIndexTable(tbl);
        if (indexName == null && isIndexTable) {
          throw new RuntimeException(
              "The table " + name + " is an index table. Please do drop index instead.");
        }
        // if this is not an index table, delete the index metadata hanging off it
        if (!isIndexTable) {
          try {
            List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
            while (indexes != null && indexes.size() > 0) {
              for (Index idx : indexes) {
                this.drop_index_by_name(dbname, name, idx.getIndexName(), true);
              }
              indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
            }
          } catch (TException e) {
            throw new MetaException(e.getMessage());
          }
        }
        // check whether the table is external
        isExternal = isExternal(tbl);
        if (tbl.getSd().getLocation() != null) {
          tblPath = new Path(tbl.getSd().getLocation());
          if (!wh.isWritable(tblPath.getParent())) {
            String target = indexName == null ? "Table" : "Index table";
            throw new MetaException(target + " metadata not deleted since " +
                tblPath.getParent() + " is not writable by " +
                hiveConf.getUser());
          }
        }

        checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge);
        // collect the location paths of all partitions; oddly, the Table object is not
        // passed in -- the method does getTable again internally, while also checking
        // write permission on the parent directories
        partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
            tbl.getPartitionKeys(), deleteData && !isExternal);
        // call into ObjectStore to delete the metadata
        if (!ms.dropTable(dbname, name)) {
          String tableName = dbname + "." + name;
          throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
              "Unable to drop index table " + tableName + " for index " + indexName);
        }
        success = ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        } else if (deleteData && !isExternal) {
          // delete the physical partition directories
          deletePartitionData(partPaths, ifPurge);
          // delete the table directory
          deleteTableData(tblPath, ifPurge);
          // ok even if the data is not deleted
        }

        // listener handling
        for (MetaStoreEventListener listener : listeners) {
          DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
          dropTableEvent.setEnvironmentContext(envContext);
          listener.onDropTable(dropTableEvent);
        }
      }
      return success;
    }

  Drilling further into ObjectStore.dropTable, we find that the whole Table object is fetched yet again by dbName and tableName before everything is deleted one by one. Perhaps the code was written by different people, or perhaps it is for safety; in any case, many Table objects that could be passed through the interface are re-fetched instead. Doesn't that add load on the database? The ObjectStore code:

  public boolean dropTable(String dbName, String tableName) throws MetaException,
    NoSuchObjectException, InvalidObjectException, InvalidInputException {
    boolean success = false;
    try {
      openTransaction();
      // fetch the Table object again
      MTable tbl = getMTable(dbName, tableName);
      pm.retrieve(tbl);
      if (tbl != null) {
        // the following queries and deletes all the grants
        List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
        if (tabGrants != null && tabGrants.size() > 0) {
          pm.deletePersistentAll(tabGrants);
        }

        List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName,
            tableName);
        if (tblColGrants != null && tblColGrants.size() > 0) {
          pm.deletePersistentAll(tblColGrants);
        }

        List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName);
        if (partGrants != null && partGrants.size() > 0) {
          pm.deletePersistentAll(partGrants);
        }

        List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName,
            tableName);
        if (partColGrants != null && partColGrants.size() > 0) {
          pm.deletePersistentAll(partColGrants);
        }
        // delete column statistics if present
        try {
          // delete the rows from the column statistics table
          deleteTableColumnStatistics(dbName, tableName, null);
        } catch (NoSuchObjectException e) {
          LOG.info("Found no table level column statistics associated with db " + dbName +
          " table " + tableName + " record to delete");
        }
        // clean up the storage descriptor side (shared column descriptor data)
        preDropStorageDescriptor(tbl.getSd());
        // delete everything hanging off the Table object itself
        pm.deletePersistentAll(tbl);
      }
      success = commitTransaction();
    } finally {
      if (!success) {
        rollbackTransaction();
      }
    }
    return success;
  }

5. alterTable

  Next up is alterTable, which involves the most logic, since it may also rewrite physical storage paths. Let's go through it step by step, again starting from the Hive class:

  public void alterTable(String tblName, Table newTbl, boolean cascade)
      throws InvalidOperationException, HiveException {
    String[] names = Utilities.getDbTableName(tblName);
    try {
      // remove DDL_TIME from the table's key/value parameters; altering the table
      // will change it
      if (newTbl.getParameters() != null) {
        newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
      }
      // validate dbName, tableName, columns, input/output classes and so on; a
      // HiveException is thrown if validation fails
      newTbl.checkValidity();
      // call alter_table
      getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade);
    } catch (MetaException e) {
      throw new HiveException("Unable to alter table. " + e.getMessage(), e);
    } catch (TException e) {
      throw new HiveException("Unable to alter table. " + e.getMessage(), e);
    }
  }

  HiveMetaStoreClient adds nothing special here, so let's look directly at what the HiveMetaStore server does:

    private void alter_table_core(final String dbname, final String name, final Table newTable,
        final EnvironmentContext envContext, final boolean cascade)
        throws InvalidOperationException, MetaException {
      startFunction("alter_table", ": db=" + dbname + " tbl=" + name
          + " newtbl=" + newTable.getTableName());

      // update DDL_TIME
      if (newTable.getParameters() == null ||
          newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
        newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
            .currentTimeMillis() / 1000));
      }
      boolean success = false;
      Exception ex = null;
      try {
        // fetch the existing Table object
        Table oldt = get_table_core(dbname, name);
        // fire the pre-alter event
        firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
        // delegate the actual alter work to the handler, detailed below
        alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade);
        success = true;

        // notify the listeners
        for (MetaStoreEventListener listener : listeners) {

          AlterTableEvent alterTableEvent =
              new AlterTableEvent(oldt, newTable, success, this);
          alterTableEvent.setEnvironmentContext(envContext);
          listener.onAlterTable(alterTableEvent);
        }
      } catch (NoSuchObjectException e) {
        // thrown when the table to be altered does not exist
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidOperationException) {
          throw (InvalidOperationException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("alter_table", success, ex, name);
      }
    }

  Now let's focus on what alterHandler actually does. A brief word on its initialization first: when HiveMetaStore starts up, it reads the hive.metastore.alter.impl parameter, which defaults to the class name of HiveAlterHandler. Let's look at its alterTable implementation. Brace yourself :)

  public void alterTable(RawStore msdb, Warehouse wh, String dbname,
      String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
    if (newt == null) {
      throw new InvalidOperationException("New table is invalid: " + newt);
    }
    // validate that the new table name is legal
    if (!MetaStoreUtils.validateName(newt.getTableName())) {
      throw new InvalidOperationException(newt.getTableName()
          + " is not a valid object name");
    }
    // validate the new column names and types
    String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
    if (validate != null) {
      throw new InvalidOperationException("Invalid column " + validate);
    }

    Path srcPath = null;
    FileSystem srcFs = null;
    Path destPath = null;
    FileSystem destFs = null;

    boolean success = false;
    boolean moveData = false;
    boolean rename = false;
    Table oldt = null;
    List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>();

    try {
      msdb.openTransaction();
      // lower-cased here directly -- another hint that the code had several authors
      name = name.toLowerCase();
      dbname = dbname.toLowerCase();

      // if the table is being renamed, make sure the new name does not already exist
      if (!newt.getTableName().equalsIgnoreCase(name)
          || !newt.getDbName().equalsIgnoreCase(dbname)) {
        if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) {
          throw new InvalidOperationException("new table " + newt.getDbName()
              + "." + newt.getTableName() + " already exists");
        }
        rename = true;
      }

      // fetch the old Table object
      oldt = msdb.getTable(dbname, name);
      if (oldt == null) {
        throw new InvalidOperationException("table " + newt.getDbName() + "."
            + newt.getTableName() + " doesn't exist");
      }
      // METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES controls whether incompatible
      // column type changes are rejected; the default used here is false
      if (HiveConf.getBoolVar(hiveConf,
            HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
            false)) {
        // Throws InvalidOperationException if the new column types are not
        // compatible with the current column types.
        MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange(
            oldt.getSd().getCols(), newt.getSd().getCols());
      }
      // cascade is passed down from Hive.alterTable, i.e. set by the calling engine;
      // it decides whether the partition metadata must be altered as well
      if (cascade) {
        // check whether the new columns match the old ones; a mismatch means columns
        // were added or removed
        if(MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
          // fetch every partition of this table by dbName and tableName
          List<Partition> parts = msdb.getPartitions(dbname, name, -1);
          for (Partition part : parts) {
            List<FieldSchema> oldCols = part.getSd().getCols();
            part.getSd().setCols(newt.getSd().getCols());
            String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
            // the existing column statistics are invalid for changed columns; drop them
            updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
            // update the partition metadata
            msdb.alterPartition(dbname, name, part.getValues(), part);
          }
        } else {
          LOG.warn("Alter table does not cascade changes to its partitions.");
        }
      }

      // check whether the partition keys changed, i.e. partition names such as dt or hour
      boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
          newt.getPartitionKeys());

      // for anything but a virtual view, changing the partition keys is an error
      if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
        if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size()
            || !partKeysPartiallyEqual) {
          throw new InvalidOperationException(
              "partition keys can not be changed.");
        }
      }

      // if this is a rename of a non-view, non-external table whose location has not
      // been changed explicitly (or the new location is empty), the user is asking
      // the metastore to move the data to the location matching the new name: an
      // alter table rename operation
      if (rename
          && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
          && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
            || StringUtils.isEmpty(newt.getSd().getLocation()))
          && !MetaStoreUtils.isExternalTable(oldt)) {
        // the source location
        srcPath = new Path(oldt.getSd().getLocation());
        srcFs = wh.getFs(srcPath);

        // that means user is asking metastore to move data to new location
        // corresponding to the new name
        // get new location
        Database db = msdb.getDatabase(newt.getDbName());
        Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
        destPath = new Path(databasePath, newt.getTableName());
        destFs = wh.getFs(destPath);
        // record the new table location for the update below
        newt.getSd().setLocation(destPath.toString());
        moveData = true;

        // moving across file systems (or onto an existing destination, checked below)
        // would override data, which is not allowed
        if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
          throw new InvalidOperationException("table new location " + destPath
              + " is on a different file system than the old location "
              + srcPath + ". This operation is not supported");
        }
        try {
          srcFs.exists(srcPath); // check that src exists and also checks
                                 // permissions necessary
          if (destFs.exists(destPath)) {
            throw new InvalidOperationException("New location for this table "
                + newt.getDbName() + "." + newt.getTableName()
                + " already exists : " + destPath);
          }
        } catch (IOException e) {
          throw new InvalidOperationException("Unable to access new location "
              + destPath + " for table " + newt.getDbName() + "."
              + newt.getTableName());
        }
        String oldTblLocPath = srcPath.toUri().getPath();
        String newTblLocPath = destPath.toUri().getPath();

        // fetch every partition of the old table
        List<Partition> parts = msdb.getPartitions(dbname, name, -1);
        for (Partition part : parts) {
          String oldPartLoc = part.getSd().getLocation();
          // rewrite each partition location from the old table path to the new one
          if (oldPartLoc.contains(oldTblLocPath)) {
            URI oldUri = new Path(oldPartLoc).toUri();
            String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath);
            Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath);
            altps.add(ObjectPair.create(part, part.getSd().getLocation()));
            part.getSd().setLocation(newPartLocPath.toString());
            String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
            try {
              //existing partition column stats is no longer valid, remove them
              msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null);
            } catch (InvalidInputException iie) {
              throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
            }
            msdb.alterPartition(dbname, name, part.getValues(), part);
          }
        }
      // otherwise just refresh the table statistics
      } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
        (newt.getPartitionKeysSize() == 0)) {
          Database db = msdb.getDatabase(newt.getDbName());
          // Update table stats. For partitioned table, we update stats in
          // alterPartition()
          MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
      }
      updateTableColumnStatsForAlterTable(msdb, oldt, newt);
      // now finally call alter table
      msdb.alterTable(dbname, name, newt);
      // commit the changes
      success = msdb.commitTransaction();
    } catch (InvalidObjectException e) {
      LOG.debug(e);
      throw new InvalidOperationException(
          "Unable to change partition or table."
              + " Check metastore logs for detailed stack." + e.getMessage());
    } catch (NoSuchObjectException e) {
      LOG.debug(e);
      throw new InvalidOperationException(
          "Unable to change partition or table. Database " + dbname + " does not exist"
              + " Check metastore logs for detailed stack." + e.getMessage());
    } finally {
      if (!success) {
        msdb.rollbackTransaction();
      }
      if (success && moveData) {
        // update the HDFS paths: rename the old path to the new one via the
        // FileSystem rename operation
        try {
          if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) {
            throw new IOException("Renaming " + srcPath + " to " + destPath + " failed");
          }
        } catch (IOException e) {
          LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
          boolean revertMetaDataTransaction = false;
          try {
            msdb.openTransaction();
            // note that alterTable is executed again here, to revert the metadata --
            // perhaps a JDO peculiarity, or perhaps for safety?
            msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt);
            for (ObjectPair<Partition, String> pair : altps) {
              Partition part = pair.getFirst();
              part.getSd().setLocation(pair.getSecond());
              msdb.alterPartition(newt.getDbName(), name, part.getValues(), part);
            }
            revertMetaDataTransaction = msdb.commitTransaction();
          } catch (Exception e1) {
            // we should log this for manual rollback by administrator
            LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1);
            LOG.error("Table " + Warehouse.getQualifiedName(newt) +
                " should be renamed to " + Warehouse.getQualifiedName(oldt));
            LOG.error("Table " + Warehouse.getQualifiedName(newt) +
                " should have path " + srcPath);
            for (ObjectPair<Partition, String> pair : altps) {
              LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) +
                  " should have path " + pair.getSecond());
            }
            if (!revertMetaDataTransaction) {
              msdb.rollbackTransaction();
            }
          }
          throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
            " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details.");
        }
      }
    }
    if (!success) {
      throw new MetaException("Committing the alter table transaction was not successful.");
    }
  }

6. createPartition

  Before partition data is written, the partition is first registered in the metadata and (for a managed table) its physical directory is created. The Hive class code is as follows:

  public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
    try {
      // build a new Partition object around the Table; the Partition constructor
      // initializes the partition information
      return new Partition(tbl, getMSC().add_partition(
          Partition.createMetaPartitionObject(tbl, partSpec, null)));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
      throw new HiveException(e);
    }
  }

  createMetaPartitionObject validates the incoming partition spec and wraps it into a Thrift Partition object:

  public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
      Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
    List<String> pvals = new ArrayList<String>();
    // walk the table's partition columns and check that each one has a value in partSpec
    for (FieldSchema field : tbl.getPartCols()) {
      String val = partSpec.get(field.getName());
      if (val == null || val.isEmpty()) {
        throw new HiveException("partition spec is invalid; field "
            + field.getName() + " does not exist or is empty");
      }
      pvals.add(val);
    }
    // set the relevant attributes: DbName, TableName, the partition values and the SD
    org.apache.hadoop.hive.metastore.api.Partition tpart =
        new org.apache.hadoop.hive.metastore.api.Partition();
    tpart.setDbName(tbl.getDbName());
    tpart.setTableName(tbl.getTableName());
    tpart.setValues(pvals);

    if (!tbl.isView()) {
      tpart.setSd(cloneSd(tbl));
      tpart.getSd().setLocation((location != null) ? location.toString() : null);
    }
    return tpart;
  }

  HiveMetaStoreClient then calls addPartition on the metastore service with a deep copy of this object; that part is straightforward, so let's look directly at what the server does:

    private Partition add_partition_core(final RawStore ms,
        final Partition part, final EnvironmentContext envContext)
        throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
      boolean success = false;
      Table tbl = null;
      try {
        ms.openTransaction();
        // fetch the whole Table object by DbName and TableName
        tbl = ms.getTable(part.getDbName(), part.getTableName());
        if (tbl == null) {
          throw new InvalidObjectException(
              "Unable to add partition because table or database do not exist");
        }
        // event handling
        firePreEvent(new PreAddPartitionEvent(tbl, part, this));
        // before creating the partition, verify it does not already exist in the metadata
        boolean shouldAdd = startAddPartition(ms, part, false);
        assert shouldAdd; // start would throw if it already existed here
        // create the partition directory
        boolean madeDir = createLocationForAddedPartition(tbl, part);
        try {
          // load some key/value information
          initializeAddedPartition(tbl, part, madeDir);
          // write the metadata
          success = ms.addPartition(part);
        } finally {
          if (!success && madeDir) {
            // on failure, remove the physical directory that was just created
            wh.deleteDir(new Path(part.getSd().getLocation()), true);
          }
        }
        // we proceed only if we'd actually succeeded anyway, otherwise,
        // we'd have thrown an exception
        success = success && ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        }
        fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
      }
      return part;
    }

  One design point is worth mentioning: as the earlier table-structure analysis showed, the partition name is not stored directly; instead the keys and values are stored separately in a key/value table. With that in mind, let's look at createLocationForAddedPartition:

    private boolean createLocationForAddedPartition(
        final Table tbl, final Partition part) throws MetaException {
      Path partLocation = null;
      String partLocationStr = null;
      // if the partition's SD is present, take its location as the starting point
      if (part.getSd() != null) {
        partLocationStr = part.getSd().getLocation();
      }
      // if it is empty, construct the partition location from scratch
      if (partLocationStr == null || partLocationStr.isEmpty()) {
        // set default location if not specified and this is
        // a physical table partition (not a view)
        if (tbl.getSd().getLocation() != null) {
          // append the partition name to the table location to form the complete
          // partition location
          partLocation = new Path(tbl.getSd().getLocation(), Warehouse
              .makePartName(tbl.getPartitionKeys(), part.getValues()));
        }
      } else {
        if (tbl.getSd().getLocation() == null) {
          throw new MetaException("Cannot specify location for a view partition");
        }
        partLocation = wh.getDnsPath(new Path(partLocationStr));
      }

      boolean result = false;
      // record the location in the SD
      if (partLocation != null) {
        part.getSd().setLocation(partLocation.toString());

        // Check to see if the directory already exists before calling
        // mkdirs() because if the file system is read-only, mkdirs will
        // throw an exception even if the directory already exists.
        if (!wh.isDir(partLocation)) {
          if (!wh.mkdirs(partLocation, true)) {
            throw new MetaException(partLocation
                + " is not a directory or unable to create one");
          }
          result = true;
        }
      }
      return result;
    }

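  To make the name reconstruction concrete, here is a small, self-contained sketch of what Warehouse.makePartName produces. It is simplified: the real method takes the table's FieldSchema list and escapes special characters in keys and values, and the partition keys/values below are hypothetical:

import java.util.Arrays;
import java.util.List;

public class PartNameDemo {
  // Simplified re-implementation of the idea behind Warehouse.makePartName:
  // zip partition keys and values into the "k1=v1/k2=v2" path suffix.
  static String makePartName(List<String> keys, List<String> vals) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < keys.size(); i++) {
      if (i > 0) sb.append('/');
      sb.append(keys.get(i)).append('=').append(vals.get(i));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    String name = makePartName(Arrays.asList("dt", "hour"), Arrays.asList("2018-02-24", "09"));
    System.out.println(name); // prints: dt=2018-02-24/hour=09
  }
}

  createLocationForAddedPartition appends this suffix to the table's location to obtain the final partition path, e.g. hdfs://nn/warehouse/db.db/t/dt=2018-02-24/hour=09.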
  Summary: add_partition_core wraps the whole flow in a metastore transaction. It loads and validates the table, fires the pre-add event, rejects a partition that already exists, derives and creates the partition directory via createLocationForAddedPartition, persists the metadata, and on any failure rolls the transaction back and deletes only the directory it created itself.

 7、dropPartition

  For dropping a partition we won't start from the Hive class again; let's go straight to what the HiveMetaStore server does:

    private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
      List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
      throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
      InvalidInputException {
      boolean success = false;
      Path partPath = null;
      Table tbl = null;
      Partition part = null;
      boolean isArchived = false;
      Path archiveParentDir = null;
      boolean mustPurge = false;

      try {
        ms.openTransaction();
        // load the full Partition by db name, table name and partition values
        part = ms.getPartition(db_name, tbl_name, part_vals);
        // load the full Table object
        tbl = get_table_core(db_name, tbl_name);
        firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
        mustPurge = isMustPurge(envContext, tbl);

        if (part == null) {
          throw new NoSuchObjectException("Partition doesn't exist. "
              + part_vals);
        }
        // (archived partitions are not examined in depth here)
        isArchived = MetaStoreUtils.isArchived(part);
        if (isArchived) {
          archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
          verifyIsWritablePath(archiveParentDir);
          checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge);
        }
        if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
          throw new MetaException("Unable to drop partition");
        }
        success = ms.commitTransaction();
        if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
          partPath = new Path(part.getSd().getLocation());
          verifyIsWritablePath(partPath);
          checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge);
        }
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
          if (tbl != null && !isExternal(tbl)) {
            if (mustPurge) {
              LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
            }
            else {
              LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
            }
            // delete the partition data
            // Archived partitions have har:/to_har_file as their location.
            // The original directory was saved in params
            if (isArchived) {
              assert (archiveParentDir != null);
              wh.deleteDir(archiveParentDir, true, mustPurge);
            } else {
              assert (partPath != null);
              wh.deleteDir(partPath, true, mustPurge);
              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge);
            }
            // ok even if the data is not deleted
          }
        }
        for (MetaStoreEventListener listener : listeners) {
          DropPartitionEvent dropPartitionEvent =
            new DropPartitionEvent(tbl, part, success, deleteData, this);
          dropPartitionEvent.setEnvironmentContext(envContext);
          listener.onDropPartition(dropPartitionEvent);
        }
      }
      return true;
    }

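  Note the call to deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge) near the end: after the partition directory itself is removed, up to one parent level per remaining partition key is deleted if it has become empty. A self-contained toy of that idea on the local filesystem (my own sketch of the semantics; the real method also checks writability and honors the purge flag on HDFS):

import java.io.IOException;
import java.nio.file.*;

public class ParentCleanup {
  // Walk up at most `depth` levels, deleting parents that are now empty.
  static void deleteParentRecursive(Path parent, int depth) throws IOException {
    if (depth <= 0 || parent == null) return;
    try (DirectoryStream<Path> ds = Files.newDirectoryStream(parent)) {
      if (ds.iterator().hasNext()) return; // stop at the first non-empty parent
    }
    Files.delete(parent);                  // empty: remove it and continue upward
    deleteParentRecursive(parent.getParent(), depth - 1);
  }

  public static void main(String[] args) throws IOException {
    Path leaf = Files.createDirectories(Paths.get("wh/t/dt=2018-02-24/hour=09"));
    Files.delete(leaf);                         // drop the partition directory itself
    deleteParentRecursive(leaf.getParent(), 1); // two part keys -> depth = 2 - 1 = 1
    System.out.println(Files.exists(Paths.get("wh/t/dt=2018-02-24"))); // false: cleaned up
  }
}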
8、alterPartition

  alterPartition involves both validation and changes to the underlying directories; let's look at it starting from rename_partition in HiveMetaStore:

    private void rename_partition(final String db_name, final String tbl_name,
        final List<String> part_vals, final Partition new_part,
        final EnvironmentContext envContext)
        throws InvalidOperationException, MetaException,
        TException {
      // start-of-function logging and bookkeeping
      startTableFunction("alter_partition", db_name, tbl_name);

      if (LOG.isInfoEnabled()) {
        LOG.info("New partition values:" + new_part.getValues());
        if (part_vals != null && part_vals.size() > 0) {
          LOG.info("Old Partition values:" + part_vals);
        }
      }

      Partition oldPart = null;
      Exception ex = null;
      try {
        firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
        // validate the characters of the new partition values
        if (part_vals != null && !part_vals.isEmpty()) {
          MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
              partitionValidationPattern);
        }
        // delegate to alterHandler.alterPartition for the physical rename and the metadata update
        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);

        // Only fetch the table if we actually have a listener
        Table table = null;
        for (MetaStoreEventListener listener : listeners) {
          if (table == null) {
            table = getMS().getTable(db_name, tbl_name);
          }
          AlterPartitionEvent alterPartitionEvent =
              new AlterPartitionEvent(oldPart, new_part, table, true, this);
          alterPartitionEvent.setEnvironmentContext(envContext);
          listener.onAlterPartition(alterPartitionEvent);
        }
      } catch (InvalidObjectException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (AlreadyExistsException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidOperationException) {
          throw (InvalidOperationException) e;
        } else if (e instanceof TException) {
          throw (TException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("alter_partition", oldPart != null, ex, tbl_name);
      }
      return;
    }

  Now for the part that deserves the closest look, alterHandler.alterPartition; brace yourself, this one is long:

  public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
      final String name, final List<String> part_vals, final Partition new_part)
      throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
      MetaException {
    boolean success = false;

    Path srcPath = null;
    Path destPath = null;
    FileSystem srcFs = null;
    FileSystem destFs = null;
    Partition oldPart = null;
    String oldPartLoc = null;
    String newPartLoc = null;

    // stamp the new partition with a DDL time if it doesn't have a valid one yet
    if (new_part.getParameters() == null ||
        new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
        Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
      new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
          .currentTimeMillis() / 1000));
    }
    // load the full Table object by db name and table name
    Table tbl = msdb.getTable(dbname, name);
    // if part_vals is null or empty, only the partition's other metadata changes and the
    // partition key/values are untouched, so msdb.alterPartition updates the metadata directly
    if (part_vals == null || part_vals.size() == 0) {
      try {
        oldPart = msdb.getPartition(dbname, name, new_part.getValues());
        if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
          MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
        }
        updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
        msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
      } catch (InvalidObjectException e) {
        throw new InvalidOperationException("alter is not possible");
      } catch (NoSuchObjectException e){
        //old partition does not exist
        throw new InvalidOperationException("alter is not possible");
      }
      return oldPart;
    }
    // rename partition
    try {
      msdb.openTransaction();
      try {
        // load the old Partition
        oldPart = msdb.getPartition(dbname, name, part_vals);
      } catch (NoSuchObjectException e) {
        // this means there is no existing partition
        throw new InvalidObjectException(
            "Unable to rename partition because old partition does not exist");
      }
      Partition check_part = null;
      try {
        // look up a partition with the new values
        check_part = msdb.getPartition(dbname, name, new_part.getValues());
      } catch(NoSuchObjectException e) {
        // this means there is no existing partition
        check_part = null;
      }
      // if the lookup succeeded, a partition with the new values already exists
      if (check_part != null) {
        throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." +
            new_part.getValues());
      }
      // sanity-check the table
      if (tbl == null) {
        throw new InvalidObjectException(
            "Unable to rename partition because table or database do not exist");
      }

      // for an external table's partition, no filesystem work is needed; just update the metadata
      if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
        new_part.getSd().setLocation(oldPart.getSd().getLocation());
        String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
        try {
          //existing partition column stats is no longer valid, remove
          msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
        } catch (NoSuchObjectException nsoe) {
          //ignore
        } catch (InvalidInputException iie) {
          throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
        }
        msdb.alterPartition(dbname, name, part_vals, new_part);
      } else {
        try {
          // compute the table's path ...
          destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
            Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
          // ... and derive the new partition's destination path from it
          destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
        } catch (NoSuchObjectException e) {
          LOG.debug(e);
          throw new InvalidOperationException(
            "Unable to change partition or table. Database " + dbname + " does not exist"
              + " Check metastore logs for detailed stack." + e.getMessage());
        }
        // a non-null destPath means the data location is changing
        if (destPath != null) {
          newPartLoc = destPath.toString();
          oldPartLoc = oldPart.getSd().getLocation();
          // the old partition path comes from the old storage descriptor
          srcPath = new Path(oldPartLoc);

          LOG.info("srcPath:" + oldPartLoc);
          LOG.info("descPath:" + newPartLoc);
          srcFs = wh.getFs(srcPath);
          destFs = wh.getFs(destPath);
          // source and destination must live on the same file system
          if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
            throw new InvalidOperationException("table new location " + destPath
              + " is on a different file system than the old location "
              + srcPath + ". This operation is not supported");
          }
          try {
            // verify the source path exists, and that the destination does not
            // already exist when the location actually changes
            srcFs.exists(srcPath); // check that src exists and also checks
            if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
              throw new InvalidOperationException("New location for this table "
                + tbl.getDbName() + "." + tbl.getTableName()
                + " already exists : " + destPath);
            }
          } catch (IOException e) {
            throw new InvalidOperationException("Unable to access new location "
              + destPath + " for partition " + tbl.getDbName() + "."
              + tbl.getTableName() + " " + new_part.getValues());
          }
          new_part.getSd().setLocation(newPartLoc);
          if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
            MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
          }
          // build the old partition name, drop its stale column stats,
          // then write the new partition metadata
          String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
          try {
            //existing partition column stats is no longer valid, remove
            msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
          } catch (NoSuchObjectException nsoe) {
            //ignore
          } catch (InvalidInputException iie) {
            throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
          }
          msdb.alterPartition(dbname, name, part_vals, new_part);
        }
      }

      success = msdb.commitTransaction();
    } finally {
      if (!success) {
        msdb.rollbackTransaction();
      }
      if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) {
        //rename the data directory
        try{
          if (srcFs.exists(srcPath)) {
            // the destination's parent may not exist yet -- e.g. the engine called alterTable
            // first and then alterPartition, so the new root hasn't been created; create it now
            Path destParentPath = destPath.getParent();
            if (!wh.mkdirs(destParentPath, true)) {
                throw new IOException("Unable to create path " + destParentPath);
            }
            // rename the source directory to the destination
            wh.renameDir(srcPath, destPath, true);
            LOG.info("rename done!");
          }
        } catch (IOException e) {
          boolean revertMetaDataTransaction = false;
          try {
            msdb.openTransaction();
            msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
            revertMetaDataTransaction = msdb.commitTransaction();
          } catch (Exception e1) {
            LOG.error("Reverting metadata operation failed during HDFS operation failure", e1);
            if (!revertMetaDataTransaction) {
              msdb.rollbackTransaction();
            }
          }
          throw new InvalidOperationException("Unable to access old location "
              + srcPath + " for partition " + tbl.getDbName() + "."
              + tbl.getTableName() + " " + part_vals);
        }
      }
    }
    return oldPart;
  }

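  The most interesting part of alterPartition is its ordering: the metadata change is committed first, the directory rename happens after the commit, and if the rename then fails the old metadata is written back. Distilled into a self-contained toy (all names below are hypothetical stand-ins, not Hive classes):

import java.util.HashMap;
import java.util.Map;

public class RenameWithCompensation {
  static final Map<String, String> metaStore = new HashMap<>(); // partName -> location

  static void renamePartition(String part, String oldLoc, String newLoc, boolean fsRenameFails) {
    metaStore.put(part, newLoc);                 // step 1: metadata change "committed"
    try {
      if (fsRenameFails) throw new RuntimeException("HDFS rename failed");
      System.out.println("renamed " + oldLoc + " -> " + newLoc);
    } catch (RuntimeException e) {
      metaStore.put(part, oldLoc);               // step 2: compensate, restore old metadata
      System.out.println("reverted metadata for " + part + ": " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    renamePartition("dt=2018-02-24", "/wh/t/dt=2018-02-24", "/wh/t/dt=2018-02-25", false);
    renamePartition("dt=2018-02-26", "/wh/t/dt=2018-02-26", "/wh/t/dt=2018-02-27", true);
  }
}

  This is compensation rather than a real transaction: between the metadata commit and the directory rename, readers can briefly see metadata pointing at a path that does not exist yet.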
  That's it for now; we'll keep digging into the rest in later posts.
