Hudi Java Client总结|读取Hive写Hudi代码示例
admin
2024-01-17 16:48:10
0

前言

Hudi除了支持Spark、Fink写Hudi外,还支持Java客户端。本文总结Hudi Java Client如何使用,主要为代码示例,可以实现读取Hive表写Hudi表。当然也支持读取其他数据源,比如mysql,实现读取mysql的历史数据和增量数据写Hudi。

版本

Hudi 0.12.0

功能支持

支持insert/upsert/delete,暂不支持bulkInsert
目前仅支持COW表
支持完整的写Hudi操作,包括rollback、clean、archive等

代码

完整代码已上传GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client

其中HoodieJavaWriteClientExample是从Hudi源码里拷贝的,包含了insert/upsert/delte/的代码示例,JavaClientHive2Hudi是我自己的写的代码示例总结,实现了kerberos认证、读取Hive表Schema作为写hudi的Schema、读取Hive表数据写hudi表,并同步hudi元数据至hive元数据,实现自动创建Hive元数据,当然也支持读取其他数据源,比如mysql,实现历史和增量写。

相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置参数,更贴近实际使用,比如HoodieJavaWriteClientExample的payload为HoodieAvroPayload这只能作为示例使用,JavaClientHive2Hudi使用的为DefaultHoodieRecordPayload它支持预合并和历史值比较,关于这一点可以参考我之前写的文章:Hudi preCombinedField 总结(二)-源码分析,如果只需要预合并功能,可以使用OverwriteWithLatestAvroPayload,这俩分别是Spark SQL 和 Spark DF的默认值,当然都不需要的话,也支持HoodieAvroPayload,代码里是根据条件判断需要用哪个payloadClassName

String payloadClassName = shouldOrdering ? DefaultHoodieRecordPayload.class.getName() :shouldCombine ? OverwriteWithLatestAvroPayload.class.getName() : HoodieAvroPayload.class.getName();

然后利用反射构造payload,其实这里反射的逻辑就是Hudi Spark源码里的逻辑。

另一个它更贴近实际使用的原因就是我们项目上就是将Hudi Java Client封装成了一个NIFI processor,然后用NIFI调度,其性能和稳定性都能够满足项目需求,这里的核心逻辑和实际项目中的逻辑是差不多的。关于我们使用Java客户端的原因是由于历史原因造成的,因为我们之前还没有调度Spark、Flink的开发工具(之前用的NIFI),而开发一个新的开发工具的话是需要时间成本的,所以选择了Java客户端,我们现在已经将Apache DolphinScheduler作为自己的开发调度工具了,后面会主要使用Spark/Flink,所以现在总结一下Hudi Java Client的使用以及源码,避免遗忘,也希望对大家有所帮助。

初始化Hudi表

Java Client的代码更贴近源码

initTable主要是根据一些配置信息,生成.hoodie元数据路径,并生成hoodie.properties元数据文件,该文件里持久化保存了Hudi的一些配置信息

            if (!(fs.exists(path) && fs.exists(hoodiePath))) { //根据Hudi路径存不存在,判断Hudi表需不需要初始化if (Arrays.asList(INSERT_OPERATION, UPSERT_OPERATION).contains(writeOperationType)) {HoodieTableMetaClient.withPropertyBuilder().setTableType(TABLE_TYPE).setTableName(targetTable).setPayloadClassName(payloadClassName).setRecordKeyFields(recordKeyFields).setPreCombineField(preCombineField).setPartitionFields(partitionFields).setBootstrapIndexClass(NoOpBootstrapIndex.class.getName()).initTable(hadoopConf, tablePath);} else if (writeOperationType.equals(DELETE_OPERATION)) { //Delete操作,Hudi表必须已经存在throw new TableNotFoundException(tablePath);}}

hoodie.properties

#Properties saved on 2022-10-24T07:40:36.530Z
#Mon Oct 24 15:40:36 CST 2022
hoodie.table.name=test_hudi_target
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.checksum=1749434190

创建HoodieJavaWriteClient

首先要创建HoodieWriteConfig,主要是hudi的一些配置,比如表名、payload、索引、clean等一些参数,具体可以自己去了解。

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(writeSchema.toString()).withParallelism(2, 2).withDeleteParallelism(2).forTable(targetTable).withWritePayLoad(payloadClassName).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM)// .bloomIndexPruneByRanges(false) // 1000万总体时间提升1分钟.bloomFilterFPP(0.000001)   // 1000万总体时间提升3分钟.fromProperties(indexProperties).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileLimit).approxRecordSize(recordSizeEstimate).build()).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150, 200).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(maxFileSize).build()).build();writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)

startCommit

返回commitTime,首先会执行rollback,然后创建一个.commit.request,再将commitTime返回

String newCommitTime = writeClient.startCommit();

generateRecord

这里主要是构造写hudi需要的数据结构,包含HoodieKey和payLoad,其中delete操作只需要HoodieKey

    public static List> generateRecord(ResultSet rs,Schema writeSchema,String payloadClassName,boolean shouldCombine) throws IOException, SQLException {List> list = new ArrayList<>();while (rs.next()) {GenericRecord rec = new GenericData.Record(writeSchema);writeSchema.getFields().forEach(field -> {try {rec.put(field.name(), convertValueType(rs, field.name(), field.schema().getType()));} catch (SQLException e) {throw new RuntimeException(e);}});String partitionPath = partitionFields == null ? "" : getRecordPartitionPath(rs, writeSchema);System.out.println(partitionPath);String rowKey = recordKeyFields == null && writeOperationType.equals(INSERT_OPERATION) ? UUID.randomUUID().toString() : getRecordKey(rs, writeSchema);HoodieKey key = new HoodieKey(rowKey, partitionPath);if (shouldCombine) {Object orderingVal = HoodieAvroUtils.getNestedFieldVal(rec, preCombineField, false, false);list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec, (Comparable) orderingVal)));} else {list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec)));}}return list;}

写Hudi

最后执行写Hudi的操作,常用upsert/insert/delete,Java Client也是默认开启clean等操作的,具体的实现是在HoodieJavaCopyOnWriteTable中。目前还不支持bulkInsert等操作,后面如果我有能力的话,会尝试提交PR支持。

writeClient.upsert(records, newCommitTime);
writeClient.insert(records, newCommitTime);
writeClient.delete(records, newCommitTime);

同步Hive

最后是同步元数据至Hive,实现在hive中建表,这一步是可选的。这样可以利用Hive SQL和Spark SQL查询Hudi表

    /*** 利用HiveSyncTool同步Hive元数据* Spark写Hudi同步hive元数据的源码就是这样同步的** @param properties* @param hiveConf*/public static void syncHive(TypedProperties properties, HiveConf hiveConf) {HiveSyncTool hiveSyncTool = new HiveSyncTool(properties, hiveConf);hiveSyncTool.syncHoodieTable();}public static HiveConf getHiveConf(String hiveSitePath, String coreSitePath, String hdfsSitePath) {HiveConf configuration = new HiveConf();configuration.addResource(new Path(hiveSitePath));configuration.addResource(new Path(coreSitePath));configuration.addResource(new Path(hdfsSitePath));return configuration;}/*** 同步Hive元数据的一些属性配置* @param basePath* @return*/public static TypedProperties getHiveSyncProperties(String basePath) {TypedProperties properties = new TypedProperties();properties.put(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());properties.put(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key(), true);properties.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), dbName);properties.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), targetTable);properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);properties.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);if (partitionFields != null && !partitionFields.isEmpty()) {properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);}return properties;}

与0.9.0版本差异

之前是基于0.9.0版本开发的,本文代码示例基于0.12.0,核心代码是一样的,差异的地方有两处

1、0.9.0 clean、archive的参数都是在withCompactionConfig中,现在单独拎出来
2、0.9.0 HiveSyncTool的参数为HiveSyncConfig,现在为TypedProperties

总结

Hudi Java Client和Spark、Flink一样都可以实现完整的写Hudi的逻辑,但是目前功能支持还不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,毕竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起来比较方便,集成到NIFI的好处是,可以通过拖来拽配置参数的形式完成历史数据和增量数据写入Hudi。也可以自己实现多线程,提升性能,我们目前测试的性能是Insert可以达到10000条/s,而upsert因为需要读取索引,还有历史数据的更新,可能需要重写整个表,所以当历史数据比较大且更新占比比较高时,单线程的性能会非常差,但是我们基于源码改造,将布隆索引和写数据的部分改为多线程后,性能就会提升很多,当然这也取决于机器的性能,和CPU、内存有关。对于数据量不是很大的ZF数据,一般大表几十亿,性能还是可以满足要求的。

相关阅读

  • Hudi preCombinedField 总结
  • HUDI preCombinedField 总结(二)-源码分析
  • 超硬核!详解Apache Hudi灵活的Payload机制

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...