这一次我们说一下如何在Flink中使用SQL相关的编程
因为SQL是一个长期有效的,支持范围广的开发语言,所以Flink将SQL融入到了Flink中,对于其的使用,我们将在下面进行讲述
首先是Maven的相关依赖
<dependency>
<groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>de.unkrig.jdisasm</groupId> <artifactId>jdisasm</artifactId> <version>1.0.6</version> </dependency> |
拥有了基本的依赖之后,我们可以看下基本的编程模板
首先创建一个环境
分为了流处理和批处理
首先是流处理
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); |
然后是批处理
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv); |
我们还可以给环境设置一些基本的元数据,诸如catalog,数据库
FsEnv.useCatalog(“custom_catalog”)
FsEnv.useDatabase(“custom_database”)
然后是创建一张表
这里可以直接从datasouce中获取,创建一个虚拟视图,进行查询
FsEnv. createTemporaryTable(“test”,datasouce,”id”)
或者直接创建一个Table对象
Table table = FsEnv.fromDataStream(datasouce);
或者将外部的数据源注册一下
FsEnv. connect()
无论哪种方式,都是推荐将表的元数据填入一下
诸如,withFormat withSchema
其中Schema的格式如下
new Schema()
.field(“id”, DataTypes.STRING()) .field(“ts”, DataTypes.BIGINT()) .field(“vc”, DataTypes.BIGINT()) |
对于拥有表之后,我们可以对其进行查询
查询则分为两种,分为了Table和Sql
对于table的方式,则是通过一步步的操作,将table作为一个对象进行查询的
Table.select(“id,name”).where(“id=’1001’”)
而sql则是实现了标准的sql语法
Table.sqlQuery
(“select * from table where id = ‘1001’”)
首先是创建一个TableEnviroment的对象
TableEnviroment tableEnv =
然后创建一个数据来源表
TableEnv.connect(…).createTemporaryTable(“table1”)
然后注册一个外部的表
TableEnv.connect(…).createTemporaryTable(“outputTable”)
通过Table API的查询创建一个Table 对象
Table tapiResult = tableEnv.from(“table1”).select(…);
通过SQL查询的查询创建一个Table 对象
Table sqlResult = tableEnv.sqlQuery(“SELECT … FROM table1 … “);
将结果写入TableSink
tapiResult.insertInto(“outputTable”);
执行
tableEnv.execute(“java_job”);
对于表的格式化输出,可以考虑写到TableSink,TableSink是一个支持多种文件输出格式,诸如CSV Parquet等,多种外部存储系统 JDBC HBase Cassandra Elasticsearch,以及多种消息队列Kafka Rabbit MQ等
我们拿一个简单的File输出作为案例
首先是创建一个输出表,并指定Schema
final Schema schema = new Schema()
.field(“a”, DataTypes.INT()) .field(“b”, DataTypes.STRING()) .field(“c”, DataTypes.LONG()); |
tableEnv.connect(new FileSystem(“/path/to/file”))
.withFormat(new Csv().fieldDelimiter(‘|’).deriveSchema()) .withSchema(schema) .createTemporaryTable(“CsvSinkTable”); |
然后输出出去
Table.executeInsert(“outputTable”)