Hive事务设计


发布于 2025-02-16 / 3 阅读 / 0 评论 /
Hive事务设计和实现

1.Hive事务表

随着技术的发展,hive在事务上的支持正发生着一些变化。

1.1.Hive早期版本不支持事务

Hive 原本是不支持事务的,也就是不支持增删改(insert、delete、update)、回滚等操作。原因有两个:

(1)Hive的核心目标是:将已经存在的结构化数据文件映射成为表,然后提供基于表的SQL分析处理。也就是说Hive是面向分析的,并不是面向事务的。

(2)HDFS不支持随机修改文件。

1.2.Hive不支持事务带来的弊端

但是随着技术的发展,不支持事务在某些方面也会带来很大的弊端,比如以下三方面:

(1)流式传输数据

使用如Apache Flume、Apache Kafka之类的工具将数据流式传输到Hadoop集群中。虽然这些工具可以每秒数百行或更多行的速度写入数据,但是Hive只能每隔15分钟到一个小时添加一次分区。如果每分甚至每秒频繁添加分区会很快导致表中大量的分区,并将许多小文件留在目录中,这将给NameNode带来压力。

因此通常使用这些工具将数据流式传输到已有分区中,但这有可能会造成脏读(数据传输一半失败,回滚了)。

需要通过事务功能,允许用户获得一致的数据视图并避免过多的小文件产生。

(2)尺寸变化缓慢

星型模式数据仓库中,维度表随时间缓慢变化。例如,零售商将开设新商店,需要将其添加到商店表中,或者现有商店

可能会更改其平方英尺或某些其他跟踪的特征。这些更改导致需要插入单个记录或更新单条记录(取决于所选策略)。

(3)数据重述

有时发现收集的数据不正确,需要更正。

1.3.Hive事务表的局限

所以Hive0.14后开始支持事务,即创建事务表。但是事务表有很大的限制,体现在以下8个方面:

(1)尚不支持BEGIN,COMMIT和ROLLBACK,所有语言操作都是自动提交的;

(2)表必须是分桶表(Bucketed)才可以使用事务功能。

(3)表文件存储格式仅支持ORC(STORED AS ORC);

(4)表属性参数transactional必须设置为true;

(5)默认情况下事务配置为关闭。需要配置参数开启使用。

(6)外部表无法创建为事务表,因为Hive只能控制元数据,无法管理数据;

(7)必须将Hive事务管理器设置为org.apache.hadoop.hive.ql.lockmgr.DbTxnManager才能使用ACID表;

(8)事务表不支持LOAD DATA⋯语句。

2.Hive事务实现原理

由于HDFS中只支持追加数据,并不能像MySQL一样随机修改数据。所以,采用不改变原始数据,创建文件进行标记的方法来实现事务。

2.1.事务开始时创建文件夹

每次执行一次事务操作都会创建一个文件夹。

(1)insert语句会创建delta_xxx_xxx_xxx文件夹;

(2)delete语句会创建delete_delta_xxx_xxx_xxx文件夹;

(3)update语句是通过delete、insert两个语句完成的,即先删除,后添加。

举例来说,先插入两条数据,再更新一条

CREATE TABLE emp (id int, name string, salary int)
STORED AS ORC TBLPROPERTIES ('transactional' = 'true');

INSERT INTO emp VALUES
(1, 'xiaoming', 3600),
(2, 'zhangsan',   5700),
(3, 'lisi',  9200);

UPDATE people SET salary = 6000 WHERE id = 2;

那么在hdfs上可以看到如下delta目录

/usr/hive/warehouse/emp/delta_0000001_0000001_0000
/usr/hive/warehouse/emp/delete_delta_0000002_0000002_0000
/usr/hive/warehouse/emp/delta_0000002_0000002_0000

其中delta_0000001_0000001_0000是insert事务后生成的。

delete_delta_0000002_0000002_0000和delta_0000002_0000002_0000是update事务后生成的。

2.1.1.文件夹命名规则

对应的delta文件夹是如何命名的?

Hive会为每个写入语句(INSERT、DELETE等)创建一个写事务ID(Write ID),该ID是自增的。

而一个事务可能含有多条写入语句时,即会产生多个写事务ID。因为ID是自增的,所以,一个事务内的多个写事务ID是连续,故可用mixWID 至 maxWID表示此次事务的所有写事务ID,然后再用stmtID对写事务ID进行区分。

因此,命名规范为delta_minWID_maxWID_stmtID

2.1.2.事务中文件夹命名规则

正在执行中的事务,是以一个.hive-staging_hive_xxx的文件夹维护的,执行结束后就会改名为delta_xxx_xxx_xxx或delete_delta_xxx_xxx_xxx文件夹。

当访问Hive数据时,根据HDFS原始文件和相应的delta增量文件做合并,从而得到查询数据。

2.2.事务文件夹中的文件内容

每个事务的delta文件夹下,都有两个文件:

/usr/hive/warehouse/emp/delta_0000001_0000001_0000
/usr/hive/warehouse/emp/delta_0000001_0000001_0000/_orc_acid_version
/usr/hive/warehouse/emp/delta_0000001_0000001_0000/bucket_00000

orc_acid_version文件:文件里的内容是2,标识acid版本为2。和版本1的主要区别是UPDATE语句采用了split-update特性,即先删除、后插入。

bucket_00000文件:文件里是写入的数据内容。如果事务表没有分区和分桶,就只有一个这样的文件。

解析bucket_00000文件,内容如下所示:

{"operation": 0, "originalTransaction": 1, "bucket": 12345678, "currentTransaction":1, "rowId": 1,"currentTransaction": 1, "row": {"id": 1, "name": "xiaoming", "salary": 3600}}
{"operation": 0, "originalTransaction": 1, "bucket": 12345678, "currentTransaction":1, "rowId": 1,"currentTransaction": 1, "row": {"id": 2, "name": "zhangsan", "salary": 5700}}
{"operation": 0, "originalTransaction": 1, "bucket": 12345678, "currentTransaction":1, "rowId": 1,"currentTransaction": 1, "row": {"id": 3, "name": "lisi", "salary": 9400}}

每一条语句的组成为:

operation:0表示插入,1表示更新,2表示删除。由于版本2的update是通过delete和insert完成的,所以实际中,operation的值只为0或2。如果是delta文件夹下的,则为0;如果为delete_delta文件夹下,则为2

originalTransaction:该条记录的原始写事务ID

currentTransaction:当前的写事务ID。如果和originalTransaction相等,说明数据没有被修改过。

rowId:一个自增的唯一ID,在写事务和分桶的组合中唯一。

row:具体数据。对于DELETE语句,则为null,对于INSERT就是插入的数据,对于UPDATE就是更新后的数据。

2.3.合并器(Compactor)

随着写操作的积累,表中的 delta 和 delete 文件会越来越多。事务表的读取过程中需要合并所有文件,数量一多势必会影响效率。此外,小文件对 HDFS 这样的文件系统也是不够友好的。

合并器Compactor是一套在Hive Metastore内运行,支持ACID系统的后台进程。所有合并都是在后台完成的,不会阻止数据的并发读、写。

合并后,系统将等待所有旧文件的读操作完成后,删除旧文件。在 Minor 或 Major Compaction 执行之后,原来的文件不会被立刻删除。这是因为删除的动作是在另一个名为 Cleaner 的线程中执行的。因此,表中可能同时存在不同事务 ID 的文件组合,这在读取过程中需要做特殊处理。

Hive 引入了压缩(Compaction)的概念,分为 Minor 和 Major 两类。

2.3.1.minor compaction(小合并)

将一组delta增量文件重写为单个增量文件,数据会被排序和合并起来。Minor Compaction 不会删除任何数据。

minor默认触发条件为10个delta文件,由hive.compactor.delta.num.threshold参数控制。也可以手动触发,语法如下:

ALTER TABLE emp COMPACT 'minor';

合并后,表目录结构如下所示:

/usr/hive/warehouse/emp/delete_delta_0000001_0000002
/usr/hive/warehouse/emp/delta_0000001_0000002

2.3.2.major compaction(大合并)

将多个增量文件和基础文件重写为新的基础文件,默认触发条件为delta文件相应于基础文件占比,10%,相关参数为hive.compactor.delta.pct.threshold。

大合并后,表目录结构如下所示:

/usr/hive/warehouse/emp/base_0000002

2.4.读事务过程

ACID 事务表中会包含三类文件,分别是 base、delta、以及 delete。文件中的每一行数据都会以 row__id 作为标识并排序。

从 ACID 事务表中读取数据就是对这些文件进行合并,从而得到最新事务的结果。合并过程如下:

(1)对所有数据行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列

(2)获取下一条记录;

(3)如果当前记录的 row__id 和上条数据一样,则跳过;

(4)如果当前记录的操作类型为 DELETE,也跳过;

(5)如果没有跳过,记录将被输出给下游;

(6)从(2)开始循环,重复以上过程。

这一过程是在 OrcInputFormat 和 OrcRawRecordMerger 类中实现的,本质上是一个合并排序的算法。

3.事务和锁的相关配置参数

hive事务和锁相关的参数主要有以下几个:

(1)hive.txn.manager

配置为 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager 以提供类似 Hive-0.13 版本之前的传统的锁机制,此时不提供对 transactions 事务的支持;

配置为 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 以提供对事务表的支持;

(2)hive.lock.manager

配置为org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager,基于 zk构建锁,提供类似 Hive-0.13 版本之前的传统的锁机制,此时不提供对 transactions 事务的支持;

配置为 org.apache.hadoop.hive.ql.lockmgr.DbLockManager,基于 metastore db构建锁,提供了对事务表的支持;

(3)hive.support.concurrency:

需要配置为 TURE 才能支持并发读写和事务;

(4)hive.txn.strict.locking.mode

true,在 STRICT 模式下,非事务表获取的也是传统的 R/W读写锁,此时 INSERT 获取的时 X排他锁。

false,在非严格模式下,INSERT 非事务表时会获取S锁,即允许多个作业并发写同一个非事务表。

需要说明的是,当 hive.txn.manager 配置为 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 时,不管 hive.lock.manager的配置是org.apache.hadoop.hive.ql.lockmgr.DbLockManager还是 org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager,其底层实际使用的都是 org.apache.hadoop.hive.ql.lockmgr.DbLockManager