这一次我们说一下如何在Flink中使用SQL相关的编程

因为SQL是一个长期有效的,支持范围广的开发语言,所以Flink将SQL融入到了Flink中,对于其的使用,我们将在下面进行讲述

首先是Maven的相关依赖

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-common</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-csv</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>de.unkrig.jdisasm</groupId>

<artifactId>jdisasm</artifactId>

<version>1.0.6</version>

</dependency>

拥有了基本的依赖之后,我们可以看下基本的编程模板

首先创建一个环境

分为了流处理和批处理

首先是流处理

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

然后是批处理

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

我们还可以给环境设置一些基本的元数据,诸如catalog,数据库

FsEnv.useCatalog(“custom_catalog”)

FsEnv.useDatabase(“custom_database”)

然后是创建一张表

这里可以直接从datasouce中获取,创建一个虚拟视图,进行查询

FsEnv. createTemporaryTable(“test”,datasouce,”id”)

或者直接创建一个Table对象

Table table = FsEnv.fromDataStream(datasouce);

或者将外部的数据源注册一下

FsEnv. connect()

无论哪种方式,都是推荐将表的元数据填入一下

诸如,withFormat withSchema

其中Schema的格式如下

new Schema()

.field(“id”, DataTypes.STRING())

.field(“ts”, DataTypes.BIGINT())

.field(“vc”, DataTypes.BIGINT())

对于拥有表之后,我们可以对其进行查询

查询则分为两种,分为了Table和Sql

对于table的方式,则是通过一步步的操作,将table作为一个对象进行查询的

Table.select(“id,name”).where(“id=’1001’”)

而sql则是实现了标准的sql语法

Table.sqlQuery

(“select * from table where id = ‘1001’”)

首先是创建一个TableEnviroment的对象

TableEnviroment tableEnv =

然后创建一个数据来源表

TableEnv.connect(…).createTemporaryTable(“table1”)

然后注册一个外部的表

TableEnv.connect(…).createTemporaryTable(“outputTable”)

通过Table API的查询创建一个Table 对象

Table tapiResult = tableEnv.from(“table1”).select(…);

通过SQL查询的查询创建一个Table 对象

Table sqlResult  = tableEnv.sqlQuery(“SELECT … FROM table1 … “);

将结果写入TableSink

tapiResult.insertInto(“outputTable”);

执行

tableEnv.execute(“java_job”);

对于表的格式化输出,可以考虑写到TableSink,TableSink是一个支持多种文件输出格式,诸如CSV Parquet等,多种外部存储系统 JDBC HBase Cassandra Elasticsearch,以及多种消息队列Kafka Rabbit MQ等

我们拿一个简单的File输出作为案例

首先是创建一个输出表,并指定Schema

final Schema schema = new Schema()

.field(“a”, DataTypes.INT())

.field(“b”, DataTypes.STRING())

.field(“c”, DataTypes.LONG());

tableEnv.connect(new FileSystem(“/path/to/file”))

.withFormat(new Csv().fieldDelimiter(‘|’).deriveSchema())

.withSchema(schema)

.createTemporaryTable(“CsvSinkTable”);

然后输出出去

Table.executeInsert(“outputTable”)

发表评论

邮箱地址不会被公开。 必填项已用*标注