Implementing a Custom Flink OSS Table Source
Overview: this article walks through implementing a Flink OSS Table Source that reads a target CSV file stored on Alibaba Cloud OSS.
Class design
The diagram in User-defined Sources & Sinks | Apache Flink describes the general architecture that takes a table connector from a pure declaration in the API to runtime code executed on the cluster. The official article explains it in detail and walks through a custom socket connector example, so it is not repeated here. Based on that architecture, implementing the OSS Table Source requires the following three classes:
- OssTableConnecterFactory
- OssDynamicTableSource
- OssSourceFunction
According to the documentation Java - 对象存储 OSS - 阿里云 (the OSS Java SDK guide), reading a file on OSS requires the endpoint, access-key-id, access-key-secret, bucket-name, and object-name parameters.
To keep the OSS Table Source extensible and able to parse multiple data types, the connector also supports a format option. Flink's interfaces allow custom formats, defined much like custom connectors, and a connector uses the format option to discover the matching Format, which then parses and converts the data. Flink already ships a number of built-in formats (Formats | Apache Flink), and you can define your own when they don't fit. This design works very well: a format, once defined, can be shared by multiple connectors; the format interfaces follow a standard contract that is easy to implement; and once a format is chosen, its own options can be configured on top, which covers many more parsing scenarios.
Putting these ideas together, the DDL for the OSS Table Source looks like this:
CREATE TABLE ossSource (
name STRING,
score INT
) WITH (
'connector' = 'oss',
'endpoint' = 'XXXXXXXXXXXX',
'access-key-id' = 'XXXXXXXXXXXX',
'access-key-secret' = 'XXXXXXXXXXXX',
'bucket-name' = 'XXXXXXXXXXXX',
'object-name' = 'XXXXXXXXXXXX',
'format' = 'csv',
'csv.field-delimiter' = '|'
);
File reading design
Files stored on OSS may be very large, so downloading a whole object can take a long time and use a lot of memory. The source therefore reads the object as a stream, following 流式下载 - 对象存储 OSS - 阿里云 (the OSS streaming download guide).
Since data reaches the sink as a stream, issuing one INSERT per record would mean very frequent I/O and poor performance. The fix does not belong in the OSS Table Source; the sink has to buffer records instead, exactly as the JDBC connector (JDBC | Apache Flink) does with sink.buffer-flush.max-rows and sink.buffer-flush.interval.
Dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<!-- 引入 CSV Format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.15.0</version>
</dependency>
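The source function also calls the OSS Java SDK directly, so the aliyun-sdk-oss dependency is needed as well; the version below is only an example, pick whichever fits your environment.
<!-- Alibaba Cloud OSS Java SDK (version shown is only an example) -->
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.15.1</version>
</dependency>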
Implementation
OssTableConnecterFactory
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* OssTableConnecterFactory
*
* @author wxb
* @date 2022-05-23
*/
public class OssTableConnecterFactory implements DynamicTableSourceFactory {
public static final ConfigOption<String> ENDPOINT = ConfigOptions.key("endpoint")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> ACCESS_KEY_ID = ConfigOptions.key("access-key-id")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> ACCESS_KEY_SECRET = ConfigOptions.key("access-key-secret")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> BUCKET_NAME = ConfigOptions.key("bucket-name")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> OBJECT_NAME = ConfigOptions.key("object-name")
.stringType()
.noDefaultValue();
@Override
public String factoryIdentifier() {
return "oss";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(ENDPOINT);
options.add(ACCESS_KEY_ID);
options.add(ACCESS_KEY_SECRET);
options.add(BUCKET_NAME);
options.add(OBJECT_NAME);
options.add(FactoryUtil.FORMAT);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// the helper wraps common utilities used when implementing custom connectors/formats
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover the Format matching the 'format' option
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// read the options
final ReadableConfig options = helper.getOptions();
final String endpoint = options.get(ENDPOINT);
final String accessKeyId = options.get(ACCESS_KEY_ID);
final String accessKeySecret = options.get(ACCESS_KEY_SECRET);
final String bucketName = options.get(BUCKET_NAME);
final String objectName = options.get(OBJECT_NAME);
// derive the physical row type produced by the source
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// build the OssDynamicTableSource
return new OssDynamicTableSource(endpoint, accessKeyId, accessKeySecret, bucketName, objectName, decodingFormat, producedDataType);
}
}
OssDynamicTableSource
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
/**
* OssDynamicTableSource
*
* @author weixubin
* @date 2022-05-24
*/
public class OssDynamicTableSource implements ScanTableSource {
private final String endpoint;
private final String accessKeyId;
private final String accessKeySecret;
private final String bucketName;
private final String objectName;
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
private final DataType producedDataType;
public OssDynamicTableSource(
String endpoint,
String accessKeyId,
String accessKeySecret,
String bucketName,
String objectName,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DataType producedDataType) {
this.endpoint = endpoint;
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.bucketName = bucketName;
this.objectName = objectName;
this.decodingFormat = decodingFormat;
this.producedDataType = producedDataType;
}
@Override
public ChangelogMode getChangelogMode() {
return decodingFormat.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create the runtime classes that will be shipped to the cluster
final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
runtimeProviderContext,
producedDataType);
final SourceFunction<RowData> sourceFunction = new OssSourceFunction(
endpoint,
accessKeyId,
accessKeySecret,
bucketName,
objectName,
deserializer);
// the second argument marks this source as bounded
return SourceFunctionProvider.of(sourceFunction, true);
}
@Override
public DynamicTableSource copy() {
return new OssDynamicTableSource(endpoint, accessKeyId, accessKeySecret, bucketName, objectName, decodingFormat, producedDataType);
}
@Override
public String asSummaryString() {
return "OSS Table Source";
}
}
OssSourceFunction
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.OSSObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* OssSourceFunction
*
* @author wxb
* @date 2022-05-23
*/
public class OssSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
private final String endpoint;
private final String accessKeyId;
private final String accessKeySecret;
private final String bucketName;
private final String objectName;
private final DeserializationSchema<RowData> deserializer;
private OSS currentOssClient;
public OssSourceFunction(String endpoint, String accessKeyId, String accessKeySecret, String bucketName, String objectName, DeserializationSchema<RowData> deserializer) {
this.endpoint = endpoint;
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.bucketName = bucketName;
this.objectName = objectName;
this.deserializer = deserializer;
}
@Override
public TypeInformation<RowData> getProducedType() {
return deserializer.getProducedType();
}
@Override
public void open(Configuration parameters) throws Exception {
deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(
getRuntimeContext()));
currentOssClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
}
@Override
public void run(SourceContext<RowData> ctx) throws Exception {
OSSObject ossObject = null;
BufferedReader reader = null;
try {
ossObject = currentOssClient.getObject(bucketName, objectName);
reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()));
while (true) {
// read the object content as a stream, line by line
String line = reader.readLine();
if (line == null) {
break;
} else {
// deserialize the line with the configured format
ctx.collect(deserializer.deserialize(line.getBytes()));
}
}
} catch (Throwable t) {
t.printStackTrace();
} finally {
if (reader != null) {
reader.close();
}
if (ossObject != null) {
ossObject.close();
}
}
}
@Override
public void cancel() {
try {
currentOssClient.shutdown();
} catch (Throwable t) {
// ignore
}
}
}
Note: the fully qualified name of OssTableConnecterFactory must be added to META-INF/services/org.apache.flink.table.factories.Factory so that Flink's SPI-based factory discovery can find the connector.
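For example, assuming the factory class lives in a package such as com.example.oss (the package name here is hypothetical, substitute your own), the service file consists of a single line:
com.example.oss.OssTableConnecterFactory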
Test
Here a simple Python script generates a CSV file with 1,000,000 rows, each row having a name column and an age column.
import csv
import random

def write_csv(s):
    with open('E:\\test\\flink_oss_test_bigdata.csv', 'a', newline='') as f:
        writer = csv.writer(f, dialect='excel')
        writer.writerows(s)
    print('write complete')

count = 1000000
rows = []
for i in range(count):
    # randomly pick 3 letters for the name
    name = ''.join(random.sample(['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f', 'e', 'd', 'c', 'b', 'a'], 3))
    # randomly pick an age
    age = random.randint(1, 100)
    rows.append((name, age))
write_csv(rows)
The SQL below reads the CSV file from OSS and writes it into MySQL, with the MySQL sink set to flush its buffer at 5000 rows:
CREATE TABLE sourceTable (
name STRING,
age INT
) WITH (
'connector' = 'oss',
'endpoint' = 'XXXXXXXXXXXXXXXXX',
'access-key-id' = 'XXXXXXXXXXXXXXXXX',
'access-key-secret' = 'XXXXXXXXXXXXXXXXX',
'bucket-name' = 'XXXXXXXXXXXXXXXXX',
'object-name' = 'XXXXXXXXXXXXXXXXX',
'format' = 'csv'
);
CREATE TABLE sinkTable (
name VARCHAR,
age INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://XXXXXXX:3306/test?useSSL=false&rewriteBatchedStatements=true',
'table-name' = 'csv_message',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'XXXXX',
'password' = 'XXXXX',
'sink.buffer-flush.interval' = '5s',
'sink.buffer-flush.max-rows' = '5000'
);
INSERT INTO sinkTable (name, age) SELECT name, age FROM sourceTable;
The job runs successfully.
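For reference, here is a minimal sketch of submitting these statements through the Table API. It assumes the connector jar, flink-csv, and the JDBC connector are already on the classpath; the class name OssToMysqlJob and the ddl variables are illustrative only, and the "..." placeholders stand for the full DDL shown above.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OssToMysqlJob {
    public static void main(String[] args) {
        // streaming-mode Table API entry point
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // sourceDdl / sinkDdl hold the two CREATE TABLE statements shown above,
        // filled in with the real endpoint, credentials, bucket and object names
        String sourceDdl = "CREATE TABLE sourceTable (...) WITH ('connector' = 'oss', ...)";
        String sinkDdl = "CREATE TABLE sinkTable (...) WITH ('connector' = 'jdbc', ...)";
        tableEnv.executeSql(sourceDdl);
        tableEnv.executeSql(sinkDdl);

        // submitting the INSERT statement launches the job
        tableEnv.executeSql("INSERT INTO sinkTable (name, age) SELECT name, age FROM sourceTable");
    }
}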
That covers the process and the reasoning behind this custom Flink OSS Table Source. The implementation is deliberately simple and leaves plenty of room for extension, but hopefully this small example helps you get started with custom Flink connectors.