TDW Spark API 的妙用
用Spark做大数据计算时,免不了需要存储计算结果以遍二次计算或展示。TDW中提供了Spark里可以调用的API,使得可以方便的创建和删除表、分区,以及数据入库。需要注意的是,这些API调用操作有一定的失败概率。因此,为了保证程序的高可用性,在调用相关API时,建议加上失败重试操作。
创建TDW表
val tdwUtil = new TDWUtil("db_name")
// 判断表是否已创建
if (tdwUtil.tableExist("tb_name")) {
// 删除表
tdwUtil.dropTable("tb_name")
}
// 表的字段信息
val cols = Seq(Array("column_name", "column_type", "column_comment"))
val tbDesc = new TableDesc()
.setTblName("tb_name")
.setCols(cols)
.setComment("table comment")
.setPartType("list")
.setPartField(cols.head(0)) // 一般以第一个字段作为分区字段
.setCompress(true)
// 创建表
tdwUtil.createTable(tbDesc)
创建分区
val tdwUtil = new TDWUtil("db_name")
// 判断分区是否已创建
if (tdwUtil.partitionExist("tb_name", "par_name")) {
// 删除分区
tdwUtil.dropPartition ("tb_name", "par_name")
}
// 创建分区
tdwUtil.createListPartition("tb_name", "par_name", "par_value1,par_value2")
RDD入库到TDW
RDD可以直接调用TDWProvider.saveToTable来入库,但比较麻烦的是RDD类型必须是RDD[Array[String]]。因此,如果RDD里某些字段不是String类型的话,需要先转为String类型,再调用此方法。
另一种更优雅的方式是,根据表结构将RDD转为DataFrame,然后直接入库,实现代码如下:
val tdwSqlProvider = new TDWSQLProvider(spark, "db_name")
// 获取表结构
val schema = tdwSqlProvider.table("tb_name").schema
// 将RDD转为DataFrame
val df = spark.createDataFrame(rdd, schema).coalesce(1)
// 入库
tdwSqlProvider.saveToTable(df, "tb_name", "par_name", overwrite = false) // overwrite为false时,保留原分区数据
coalesce(n)将DataFrame重新分区,可以减少入库的文件数,提高取数据时的效率。
Tips
默认情况下,TDWUtil获取某个表的信息之后,会将其(比如表结构、分区)缓存下来,再次拉取会给出缓存中的信息。因此,如果需要周期性获取TDW表的信息,可以加上参数解决:spark.tdw.meta.forceRequest =true