
Implementing a Custom Flink OSS Table Source


Overview: This article discusses how to implement a Flink OSS Table Source that reads a target CSV file from Alibaba Cloud OSS.

Class design

[Figure 1: General architecture of table connectors, from the declaration in the API to the runtime code executed on the cluster]

The figure above shows the general architecture of table connectors, from the pure declaration in the API to the runtime code that executes on the cluster. The official documentation describes this in detail in User-defined Sources & Sinks | Apache Flink and walks through a custom socket connector example, so it will not be repeated here. Based on this architecture, implementing an OSS Table Source requires the following three classes:

  • OssTableConnecterFactory
  • OssDynamicTableSource
  • OssSourceFunction
SQL design
  1. According to the documentation Java - Object Storage Service (OSS) - Alibaba Cloud, reading a file from OSS requires the parameters endpoint, access-key-id, access-key-secret, bucket-name, and object-name.
  2. To make the OSS Table Source extensible and able to parse multiple kinds of data, the connector supports a format option. Flink's interfaces allow custom formats to be defined in much the same way as custom connectors, and a connector can use the format option to locate the matching Format implementation for parsing and converting data. Flink already ships with several built-in formats (Formats | Apache Flink), and a custom Format can be written when they do not meet your needs. This design works well: a well-defined Format can be shared by multiple connectors, the Format interfaces are standardized and easy to implement, and once a format is specified, additional format-specific options can be provided to cover more parsing scenarios.

Putting these ideas together, the OSS Table Source SQL looks like this:

CREATE TABLE socketSource (
  name STRING,
  score INT
) WITH (
  'connector' = 'oss',
  'endpoint' = 'XXXXXXXXXXXX',
  'access-key-id' = 'XXXXXXXXXXXX',
  'access-key-secret' = 'XXXXXXXXXXXX',
  'bucket-name' = 'XXXXXXXXXXXX',
  'object-name' = 'XXXXXXXXXXXX',
  'format' = 'csv',
  'csv.field-delimiter' = '|'
);

File reading design

Since a file stored on OSS may be very large, downloading it in one go could take a long time and consume a lot of memory. The source therefore reads the object as a stream, following Streaming download - Object Storage Service (OSS) - Alibaba Cloud.
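For reference, the streaming-read pattern from the OSS Java SDK looks roughly like the sketch below; the endpoint, credentials, bucket, and object names are placeholders, and the same SDK calls are used later inside OssSourceFunction.

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.OSSObject;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class OssStreamReadSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with your real endpoint, credentials, bucket and object
        OSS ossClient = new OSSClientBuilder().build("<endpoint>", "<access-key-id>", "<access-key-secret>");
        OSSObject ossObject = ossClient.getObject("<bucket-name>", "<object-name>");
        // getObjectContent() returns an InputStream, so the object is consumed line by line
        // instead of being downloaded into memory as a whole
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            ossObject.close();
            ossClient.shutdown();
        }
    }
}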

Note that records arrive at the sink as a stream; executing one INSERT per record would cause frequent I/O and poor performance. The fix does not belong to the OSS Table Source but to the sink, which needs to buffer data, for example via the sink.buffer-flush.max-rows and sink.buffer-flush.interval options of the JDBC connector (JDBC | Apache Flink).

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.15.0</version>
    <scope>provided</scope>
</dependency>
<!-- CSV Format -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.15.0</version>
</dependency>

Code implementation

OssTableConnecterFactory

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * OssTableConnecterFactory
 *
 * @author wxb
 * @date 2022-05-23
 */
public class OssTableConnecterFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> ENDPOINT = ConfigOptions.key("endpoint")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> ACCESS_KEY_ID = ConfigOptions.key("access-key-id")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> ACCESS_KEY_SECRET = ConfigOptions.key("access-key-secret")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> BUCKET_NAME = ConfigOptions.key("bucket-name")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> OBJECT_NAME = ConfigOptions.key("object-name")
            .stringType()
            .noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "oss";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ENDPOINT);
        options.add(ACCESS_KEY_ID);
        options.add(ACCESS_KEY_SECRET);
        options.add(BUCKET_NAME);
        options.add(OBJECT_NAME);
        options.add(FactoryUtil.FORMAT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // The helper provides common utilities for implementing custom connectors/formats
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // Discover the Format matching the 'format' option
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);

        // Validate the configured options
        helper.validate();

        // Read the options
        final ReadableConfig options = helper.getOptions();
        final String endpoint = options.get(ENDPOINT);
        final String accessKeyId = options.get(ACCESS_KEY_ID);
        final String accessKeySecret = options.get(ACCESS_KEY_SECRET);
        final String bucketName = options.get(BUCKET_NAME);
        final String objectName = options.get(OBJECT_NAME);

        // Derive the physical row data type from the resolved schema
        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // Build the OssDynamicTableSource
        return new OssDynamicTableSource(endpoint, accessKeyId, accessKeySecret,
                bucketName, objectName, decodingFormat, producedDataType);
    }
}

OssDynamicTableSource

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * OssDynamicTableSource
 *
 * @author weixubin
 * @date 2022-05-24
 */
public class OssDynamicTableSource implements ScanTableSource {

    private final String endpoint;
    private final String accessKeyId;
    private final String accessKeySecret;
    private final String bucketName;
    private final String objectName;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    public OssDynamicTableSource(
            String endpoint,
            String accessKeyId,
            String accessKeySecret,
            String bucketName,
            String objectName,
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.endpoint = endpoint;
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.bucketName = bucketName;
        this.objectName = objectName;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // Create the runtime implementation that is shipped to the cluster
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                runtimeProviderContext,
                producedDataType);

        final SourceFunction<RowData> sourceFunction = new OssSourceFunction(
                endpoint, accessKeyId, accessKeySecret, bucketName, objectName, deserializer);

        return SourceFunctionProvider.of(sourceFunction, true);
    }

    @Override
    public DynamicTableSource copy() {
        return new OssDynamicTableSource(endpoint, accessKeyId, accessKeySecret,
                bucketName, objectName, decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "OSS Table Source";
    }
}

OssSourceFunction

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.OSSObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * OssSourceFunction
 *
 * @author wxb
 * @date 2022-05-23
 */
public class OssSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

    private final String endpoint;
    private final String accessKeyId;
    private final String accessKeySecret;
    private final String bucketName;
    private final String objectName;
    private final DeserializationSchema<RowData> deserializer;

    private OSS currentOssClient;

    public OssSourceFunction(String endpoint, String accessKeyId, String accessKeySecret,
                             String bucketName, String objectName,
                             DeserializationSchema<RowData> deserializer) {
        this.endpoint = endpoint;
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.bucketName = bucketName;
        this.objectName = objectName;
        this.deserializer = deserializer;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(
                getRuntimeContext()));
        currentOssClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        OSSObject ossObject = null;
        BufferedReader reader = null;
        try {
            ossObject = currentOssClient.getObject(bucketName, objectName);
            reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()));
            while (true) {
                // Read the object content as a stream, line by line
                String line = reader.readLine();
                if (line == null) {
                    break;
                } else {
                    // Deserialize each line with the configured format
                    ctx.collect(deserializer.deserialize(line.getBytes()));
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            if (reader != null) {
                reader.close();
            }
            if (ossObject != null) {
                ossObject.close();
            }
        }
    }

    @Override
    public void cancel() {
        try {
            currentOssClient.shutdown();
        } catch (Throwable t) {
            // ignore
        }
    }
}

Note: OssTableConnecterFactory must be registered in META-INF/services/org.apache.flink.table.factories.Factory so that Flink's factory discovery can find it.
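As a minimal sketch, the service file simply lists the fully qualified class name of the factory, one entry per line. The package name com.example.oss below is a placeholder and should be replaced with the actual package of your class:

# META-INF/services/org.apache.flink.table.factories.Factory
com.example.oss.OssTableConnecterFactory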

Testing

Here we use a simple Python script to generate a CSV file with 1,000,000 rows, each containing a name column and an age column.

import csv
import random


def write_csv(s):
    with open('E:\\test\\flink_oss_test_bigdata.csv', 'a', newline='') as f:
        xieru = csv.writer(f, dialect='excel')
        xieru.writerows(s)
        print('Write complete')


count = 1000000
list = []
for i in range(count):
    # Randomly pick 3 letters for the name
    name = ''.join(random.sample(['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm',
                                  'l', 'k', 'j', 'i', 'h', 'g', 'f', 'e', 'd', 'c', 'b', 'a'], 3))
    # Randomly generate an age
    age = random.randint(1, 100)
    tup = (name, age)
    list.append(tup)
write_csv(list)

The SQL below reads the CSV file from OSS and writes it into MySQL, with the MySQL sink configured to flush its buffer every 5,000 rows:

CREATE TABLE sourceTable (
  name STRING,
  age INT
) WITH (
  'connector' = 'oss',
  'endpoint' = 'XXXXXXXXXXXXXXXXX',
  'access-key-id' = 'XXXXXXXXXXXXXXXXX',
  'access-key-secret' = 'XXXXXXXXXXXXXXXXX',
  'bucket-name' = 'XXXXXXXXXXXXXXXXX',
  'object-name' = 'XXXXXXXXXXXXXXXXX',
  'format' = 'csv'
);

CREATE TABLE sinkTable (
  name VARCHAR,
  age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://XXXXXXX:3306/test?useSSL=false&rewriteBatchedStatements=true',
  'table-name' = 'csv_message',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'XXXXX',
  'password' = 'XXXXX',
  'sink.buffer-flush.interval' = '5s',
  'sink.buffer-flush.max-rows' = '5000'
);

INSERT INTO sinkTable (name, age)
SELECT name, age FROM sourceTable;
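For reference, a minimal sketch of submitting these statements from a Java program with the Table API could look like the following; the class name is illustrative and the SQL strings are abbreviated stand-ins for the full DDL shown above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OssToMysqlJob {
    public static void main(String[] args) {
        // Streaming Table API environment
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register the source and sink tables (use the full DDL shown above)
        tEnv.executeSql("CREATE TABLE sourceTable (...) WITH ('connector' = 'oss', ...)");
        tEnv.executeSql("CREATE TABLE sinkTable (...) WITH ('connector' = 'jdbc', ...)");

        // Submit the INSERT job
        tEnv.executeSql("INSERT INTO sinkTable SELECT name, age FROM sourceTable");
    }
}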

The job runs successfully.

[Figure 2: The job running successfully]

Final thoughts

That covers the process and reasoning behind this custom Flink OSS Table Source. The implementation is fairly simple and leaves plenty of room for extension. Hopefully this small example helps you understand how custom Flink connectors work.

Thanks for reading. If you enjoyed this article, feel free to follow and share it (please credit the source when reposting); this account will keep sharing IT and technology content. If you have other thoughts, opinions, or suggestions about the article, you are welcome to raise them so we can discuss and improve together.
