RowBatch 是对一批行(TupleRow)的封装, 每一行由多个元组(Tuple)组成, 最大行数在构建时就已固定; RowBatch 是算子之间交互的最小单位
从下面的代码可以看到 Impala 中每一列(即 Tuple 中的每一个槽位 Slot)的数据类型, 以及各类型对应的大小计算方式,
如 BOOLEAN, INT 等类型的大小是固定的; VARCHAR, STRING 类型在 Debug 时看到的大小是 8 字节, 应该是一个指针(注意: 下面的 GetByteSize 对这两种类型返回 0, GetSlotSize 返回 12)
/// Enumerates the primitive column types. INVALID_TYPE is pinned to 0 and every
/// later enumerator takes the next consecutive value, so the declaration order
/// fixes the numeric values. The sizes quoted below are the values returned by
/// ColumnType::GetByteSize() / GetSlotSize() for that type.
enum PrimitiveType {
INVALID_TYPE = 0,
TYPE_NULL, // byte size 1
TYPE_BOOLEAN, // byte size 1
TYPE_TINYINT, // byte size 1
TYPE_SMALLINT, // byte size 2
TYPE_INT, // byte size 4
TYPE_BIGINT, // byte size 8
TYPE_FLOAT, // byte size 4
TYPE_DOUBLE, // byte size 8
TYPE_TIMESTAMP, // byte size reported as 16
TYPE_STRING, // variable length: byte size 0, slot size 12
TYPE_DATE, // byte size 4
TYPE_DATETIME, // Not implemented
TYPE_BINARY, // Not used, see AuxColumnType::StringSubtype
TYPE_DECIMAL, // byte size derived from the precision
TYPE_CHAR, // byte size is the declared length (len)
TYPE_VARCHAR, // variable length: byte size 0, slot size 12
TYPE_FIXED_UDA_INTERMEDIATE, // byte size is the declared length (len)
TYPE_STRUCT, // sizes are the sum over the child types
TYPE_ARRAY, // variable length: byte size 0, slot size 12
TYPE_MAP // variable length: byte size 0, slot size 12
};
/// Maps a PrimitiveType enumerator to its upper-case display name.
/// A value that matches none of the enumerators yields the empty string.
string TypeToString(PrimitiveType t) {
  // Start from the fallback and overwrite it when an enumerator matches.
  const char* name = "";
  switch (t) {
    case INVALID_TYPE:                name = "INVALID"; break;
    case TYPE_NULL:                   name = "NULL"; break;
    case TYPE_BOOLEAN:                name = "BOOLEAN"; break;
    case TYPE_TINYINT:                name = "TINYINT"; break;
    case TYPE_SMALLINT:               name = "SMALLINT"; break;
    case TYPE_INT:                    name = "INT"; break;
    case TYPE_BIGINT:                 name = "BIGINT"; break;
    case TYPE_FLOAT:                  name = "FLOAT"; break;
    case TYPE_DOUBLE:                 name = "DOUBLE"; break;
    case TYPE_DATE:                   name = "DATE"; break;
    case TYPE_DATETIME:               name = "DATETIME"; break;
    case TYPE_TIMESTAMP:              name = "TIMESTAMP"; break;
    case TYPE_STRING:                 name = "STRING"; break;
    case TYPE_VARCHAR:                name = "VARCHAR"; break;
    case TYPE_BINARY:                 name = "BINARY"; break;
    case TYPE_DECIMAL:                name = "DECIMAL"; break;
    case TYPE_CHAR:                   name = "CHAR"; break;
    case TYPE_FIXED_UDA_INTERMEDIATE: name = "FIXED_UDA_INTERMEDIATE"; break;
    case TYPE_STRUCT:                 name = "STRUCT"; break;
    case TYPE_ARRAY:                  name = "ARRAY"; break;
    case TYPE_MAP:                    name = "MAP"; break;
  }
  return name;
}
/// Returns the byte size of this type. Returns 0 for variable length types.
/// Thin forwarder: passes *this to the static GetByteSize(const ColumnType&)
/// helper, which holds the per-type size table.
inline int GetByteSize() const { return GetByteSize(*this); }
/// Returns the size of a slot for this type.
/// Thin forwarder: passes *this to the static GetSlotSize(const ColumnType&)
/// helper, which also handles struct types by recursing into their children.
inline int GetSlotSize() const { return GetSlotSize(*this); }
/// Helper function for GetSlotSize() so that struct size could be calculated
/// recursively.
///
/// Returns the number of bytes a slot of type 'col_type' occupies:
///  - STRUCT: the sum of the slot sizes of all children (recursive);
///  - STRING / VARCHAR and ARRAY / MAP: a fixed 12 bytes;
///  - CHAR / FIXED_UDA_INTERMEDIATE: the declared length 'col_type.len';
///  - every other type: whatever GetByteSize(col_type) reports.
static inline int GetSlotSize(const ColumnType& col_type) {
  switch (col_type.type) {
    case TYPE_STRUCT: {
      int struct_size = 0;
      // Bind each child by const reference: iterating by value would copy a
      // whole ColumnType (including its own children) on every step.
      for (const ColumnType& child_type : col_type.children) {
        struct_size += GetSlotSize(child_type);
      }
      return struct_size;
    }
    case TYPE_STRING:
    case TYPE_VARCHAR:
      // NOTE(review): presumably the size of the in-tuple string value
      // (pointer + length) rather than of the character data -- confirm.
      return 12;
    case TYPE_CHAR:
    case TYPE_FIXED_UDA_INTERMEDIATE:
      return col_type.len;
    case TYPE_ARRAY:
    case TYPE_MAP:
      // NOTE(review): presumably the size of the in-tuple collection value
      // rather than of the collection's items -- confirm.
      return 12;
    default:
      return GetByteSize(col_type);
  }
}
/// Helper function for GetByteSize()
///
/// Returns the byte size of 'col_type':
///  - STRUCT: the sum of the byte sizes of all children (recursive);
///  - ARRAY / MAP / STRING / VARCHAR: 0, because they are variable length;
///  - CHAR / FIXED_UDA_INTERMEDIATE: the declared length 'col_type.len';
///  - fixed-width scalar types: 1, 2, 4, 8 or 16 as listed below;
///  - DECIMAL: GetDecimalByteSize(col_type.precision).
/// INVALID_TYPE and any unlisted type trip a DCHECK and then fall through to
/// the trailing 'return 0'.
static inline int GetByteSize(const ColumnType& col_type) {
  switch (col_type.type) {
    case TYPE_STRUCT: {
      int struct_size = 0;
      // Bind each child by const reference: iterating by value would copy a
      // whole ColumnType (including its own children) on every step.
      for (const ColumnType& child_type : col_type.children) {
        struct_size += GetByteSize(child_type);
      }
      return struct_size;
    }
    // Variable-length types have no fixed byte size and report 0.
    case TYPE_ARRAY:
    case TYPE_MAP:
    case TYPE_STRING:
    case TYPE_VARCHAR:
      return 0;
    case TYPE_CHAR:
    case TYPE_FIXED_UDA_INTERMEDIATE:
      return col_type.len;
    case TYPE_NULL:
    case TYPE_BOOLEAN:
    case TYPE_TINYINT:
      return 1;
    case TYPE_SMALLINT:
      return 2;
    case TYPE_INT:
    case TYPE_DATE:
    case TYPE_FLOAT:
      return 4;
    case TYPE_BIGINT:
    case TYPE_DOUBLE:
      return 8;
    case TYPE_TIMESTAMP:
      // This is the size of the slot, the actual size of the data is 12.
      return 16;
    case TYPE_DECIMAL:
      return GetDecimalByteSize(col_type.precision);
    case INVALID_TYPE:
    default:
      DCHECK(false) << "NYI: " << col_type.type;
  }
  // Reached only from the DCHECK branch above.
  return 0;
}
Tuple, TupleRow, RowBatch
一个 Tuple 对应若干个 Slot(槽位): 它在初始化时就是一段连续的字节序列, 用来存放固定大小的槽位, 并且已经包含每个可为空槽位的空值指示位
Tuple/Slot 分别用 TupleDescriptor/SlotDescriptor 来描述(这里的 TupleDescriptor/SlotDescriptor 描述的是单个 RowBatch, 或者说一张表的属性)
Tuple 和 Slot 在设计上采用相同的表达方式: 都是用指针来描述一段连续的存储
class Tuple {
public:
/// Builds a tuple whose storage comes from 'pool': requests 'size' bytes via
/// pool->Allocate(size), runs Init(size) on that memory and returns it viewed
/// as a Tuple. A 'size' of 0 returns a null pointer without touching the pool.
static Tuple* Create(int size, MemPool* pool) {
  if (size != 0) {
    Tuple* tuple = reinterpret_cast<Tuple*>(pool->Allocate(size));
    tuple->Init(size);
    return tuple;
  }
  return nullptr;
}
/// Returns a mutable pointer to the slot located 'offset' bytes past the start
/// of this tuple object. 'offset' must not be -1; that value is checked with a
/// DCHECK.
/// (The excerpt defined this overload twice with an identical signature, which
/// is a redefinition error; only one definition is kept.)
void* GetSlot(int offset) {
  DCHECK(offset != -1);  // -1 offset indicates non-materialized slot
  return reinterpret_cast<char*>(this) + offset;
}
/// Read-only counterpart of GetSlot(): returns a const pointer to the slot
/// located 'offset' bytes past the start of this tuple object. 'offset' must
/// not be -1; that value is checked with a DCHECK.
const void* GetSlot(int offset) const {
  DCHECK(offset != -1);  // -1 offset indicates non-materialized slot
  const char* tuple_start = reinterpret_cast<const char*>(this);
  return tuple_start + offset;
}
bool* GetBoolSlot(int offset) {
return static_cast<bool*>(GetSlot(offset));
}
/// Returns the slot at byte 'offset' of this tuple, typed as an int32_t
/// pointer. The address itself comes from GetSlot(offset).
int32_t* GetIntSlot(int offset) {
  void* slot = GetSlot(offset);
  return static_cast<int32_t*>(slot);
}
/// Returns the slot at byte 'offset' of this tuple, typed as an int64_t
/// pointer. The address itself comes from GetSlot(offset).
int64_t* GetBigIntSlot(int offset) {
  void* slot = GetSlot(offset);
  return static_cast<int64_t*>(slot);
}
/// Describes one slot of a tuple. This excerpt lists only the data members and
/// stops before the end of the class.
class SlotDescriptor {
public:
/// Identifier of this slot.
const SlotId id_;
/// Column type stored in this slot.
const ColumnType type_;
/// NOTE(review): presumably the descriptor of the tuple that contains this
/// slot -- confirm.
const TupleDescriptor* parent_;
/// Non-NULL only for complex type slots
const TupleDescriptor* children_tuple_descriptor_;
// TODO for 2.3: rename to materialized_path_
const SchemaPath col_path_;
/// NOTE(review): presumably the byte offset of this slot inside its tuple,
/// i.e. the 'offset' argument expected by Tuple::GetSlot() -- confirm.
const int tuple_offset_;
/// NOTE(review): presumably locates this slot's null indicator inside the
/// tuple -- confirm.
const NullIndicatorOffset null_indicator_offset_;
/// the idx of the slot in the tuple descriptor (0-based).
/// this is provided by the FE
const int slot_idx_;
/// the byte size of this slot.
const int slot_size_;
/// NOTE(review): presumably marks whether, and which kind of, virtual column
/// this slot represents -- confirm.
const TVirtualColumnType::type virtual_column_type_;
举个例子: 每张表都有固定id, 在impala/catalogd中都可以通过tuple id 获取到对应表的tuple_desc
/// Describes one tuple: its id, the table descriptor it is associated with,
/// its size information and the descriptors of its slots.
/// DescriptorTbl is a friend and can therefore access the protected members.
class TupleDescriptor {
 public:
  /// Returns all slot descriptors of this tuple, by const reference.
  const std::vector<SlotDescriptor*>& slots() const { return slots_; }
  /// Returns the associated table descriptor; the member defaults to nullptr.
  const TableDescriptor* table_desc() const { return table_desc_; }
  /// Returns the id of this tuple descriptor.
  TupleId id() const { return id_; }

 protected:
  friend class DescriptorTbl;

  const TupleId id_;
  TableDescriptor* table_desc_ = nullptr;
  /// NOTE(review): presumably the total size of one tuple in bytes -- confirm.
  const int byte_size_;
  /// NOTE(review): presumably the number of bytes holding null indicators and
  /// the offset of those bytes inside the tuple -- confirm.
  const int num_null_bytes_;
  const int null_bytes_offset_;

  /// Contains all slots. Slots are in the same order as the expressions that materialize
  /// them. See Tuple::MaterializeExprs().
  std::vector<SlotDescriptor*> slots_;

  /// Contains only materialized string slots.
  std::vector<SlotDescriptor*> string_slots_;

  /// Contains only materialized map and array slots.
  std::vector<SlotDescriptor*> collection_slots_;
};  // A class definition must end with ';' -- the excerpt omitted it.
RowBatch
class RowBatch {
public:
/// Checks (DCHECK) that 'n' more rows still fit within capacity_ and returns
/// the current value of num_rows_. num_rows_ itself is not modified here; it
/// only advances in CommitRows().
int ALWAYS_INLINE AddRows(int n) {
  const auto current_row_count = num_rows_;
  DCHECK_LE(current_row_count + n, capacity_);
  return current_row_count;
}
/// Single-row convenience form: returns AddRows(1).
int ALWAYS_INLINE AddRow() { return AddRows(1); }
/// Advances num_rows_ by 'n'. Debug checks require 'n' to be non-negative and
/// the resulting row count to stay within capacity_.
void ALWAYS_INLINE CommitRows(int n) {
  DCHECK_GE(n, 0);
  const auto new_row_count = num_rows_ + n;
  DCHECK_LE(new_row_count, capacity_);
  num_rows_ = new_row_count;
}
/// Single-row convenience form: calls CommitRows(1).
void ALWAYS_INLINE CommitLastRow() { CommitRows(1); }
/// Iterator over the rows of a RowBatch. This excerpt shows only the parent()
/// accessor and the iterator's state; the traversal methods are not shown.
class Iterator {
public:
/// Returns the row batch which this iterator is iterating through.
RowBatch* parent() { return parent_; }
private:
/// Number of tuples per row.
const int num_tuples_per_row_;
/// Pointer to the current row.
Tuple** row_; // pointer-to-pointer: can point at each individual row
/// Pointer to the row after the last row for read iterators.
Tuple** const row_batch_end_;
/// The row batch being iterated on.
RowBatch* const parent_;
};
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。