TDW Spark API 的妙用

用Spark做大数据计算时,免不了需要存储计算结果以遍二次计算或展示。TDW中提供了Spark里可以调用的API,使得可以方便的创建和删除表、分区,以及数据入库。需要注意的是,这些API调用操作有一定的失败概率。因此,为了保证程序的高可用性,在调用相关API时,建议加上失败重试操作。

创建TDW表

val tdwUtil = new TDWUtil("db_name")

// 判断表是否已创建
if (tdwUtil.tableExist("tb_name")) {
  // 删除表
  tdwUtil.dropTable("tb_name")
}

// 表的字段信息
val cols = Seq(Array("column_name", "column_type", "column_comment"))
val tbDesc = new TableDesc()
  .setTblName("tb_name")
  .setCols(cols)
  .setComment("table comment")
  .setPartType("list")
  .setPartField(cols.head(0))  // 一般以第一个字段作为分区字段
  .setCompress(true)

// 创建表
tdwUtil.createTable(tbDesc)

创建分区

val tdwUtil = new TDWUtil("db_name")

// 判断分区是否已创建
if (tdwUtil.partitionExist("tb_name", "par_name")) {
  // 删除分区
  tdwUtil.dropPartition ("tb_name", "par_name")
}

// 创建分区
tdwUtil.createListPartition("tb_name", "par_name", "par_value1,par_value2")

RDD入库到TDW

RDD可以直接调用TDWProvider.saveToTable来入库,但比较麻烦的是RDD类型必须是RDD[Array[String]]。因此,如果RDD里某些字段不是String类型的话,需要先转为String类型,再调用此方法。

另一种更优雅的方式是,根据表结构将RDD转为DataFrame,然后直接入库,实现代码如下:

val tdwSqlProvider = new TDWSQLProvider(spark, "db_name")

// 获取表结构
val schema = tdwSqlProvider.table("tb_name").schema
// 将RDD转为DataFrame
val df = spark.createDataFrame(rdd, schema).coalesce(1)

// 入库
tdwSqlProvider.saveToTable(df, "tb_name", "par_name", overwrite = false)  // overwrite为false时,保留原分区数据

coalesce(n)将DataFrame重新分区,可以减少入库的文件数,提高取数据时的效率。

Tips

默认情况下,TDWUtil获取某个表的信息之后,会将其(比如表结构、分区)缓存下来,再次拉取会给出缓存中的信息。因此,如果需要周期性获取TDW表的信息,可以加上参数解决:spark.tdw.meta.forceRequest =true

results matching ""

    No results matching ""