-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Feature][connector-v2]Add Kafka Protobuf Data Parsing Support #7361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][connector-v2]Add Kafka Protobuf Data Parsing Support #7361
Conversation
|
@Hisoka-X @hailin0 This is an example used, and I will continue to improve the relevant documentation and unit testing. Help me see where we need to optimize , thk. |
+1 for this configure way. cc @hailin0 |
|
Hi @hailin0 , Is this method OK? If so, I will add the documentation and e2e code |
| } | ||
|
|
||
| private static File createTempDirectory() throws IOException { | ||
| File tmpDir = File.createTempFile("tmp_protobuf_", "_proto"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put all protobuf file into /tmp/seatunnel/tmp_protobuf_?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put all protobuf file into
/tmp/seatunnel/tmp_protobuf_?
Protobuf needs to convert configuration content into descriptor files using Protoc.runProtoc conversion, which requires temporary directories and files. When executing the code for the first time, protobuf configuration content will be parsed into descriptor files, which will be used to parse data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this does not conflict when placed in the specified temporary path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this does not conflict when placed in the specified temporary path?
File.createTempFile("tmp_protobuf_", "_proto").getPath()
This method generates directories or files completely randomly, and its structure is as follows:
/var/folders/ys/223gp9td7dqf4nmxl9kw25lc0000gn/T/tmp_protobuf_2079053607848123529_proto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. I got it. Thanks for explain.
| File tmpDir = createTempDirectory(); | ||
| File protoFile = createProtoFile(tmpDir, protoContent); | ||
| String targetDescPath = compileProtoToDescriptor(tmpDir, protoFile); | ||
|
|
||
| try (FileInputStream fis = new FileInputStream(targetDescPath)) { | ||
| DescriptorProtos.FileDescriptorSet descriptorSet = | ||
| DescriptorProtos.FileDescriptorSet.parseFrom(fis); | ||
| Descriptors.FileDescriptor[] descriptorsArray = buildFileDescriptors(descriptorSet); | ||
| return descriptorsArray[0].findMessageTypeByName(messageName); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we delete the tmp file in finally block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we delete the tmp file in finally block?
tmpDir.deleteOnExit();
It will be automatically deleted when the program exits, without the need to call the deletion method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in cluster mode, the program may never exit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in cluster mode, the program may never exit.
But in cluster mode, the program may never exit.
You're right, I will fix it. It should indeed be deleted immediately after use
...el-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/CompileDescriptor.java
Show resolved
Hide resolved
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add e2e test case with read/writer PROTOBUF format.
| } | ||
|
|
||
| private static File createTempDirectory() throws IOException { | ||
| File tmpDir = File.createTempFile("tmp_protobuf_", "_proto"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. I got it. Thanks for explain.
@Hisoka-X Add Complete |
| new HashMap<String, String>() { | ||
| { | ||
| put(PROTOBUF_MESSAGE_NAME.key(), readonlyConfig.get(PROTOBUF_MESSAGE_NAME)); | ||
| put(PROTOBUF_SCHEMA.key(), readonlyConfig.get(PROTOBUF_SCHEMA)); | ||
| } | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If user does not use protobuf, the value would be null? So what's use for?
| Console{ | ||
| source_table_name = "kafka_table" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use assert to check data.
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @zhangshenghang !
wuchunfu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks
…e#7361) * [feature]add protobuf format * [featrue]fix style * [featrue]fix style * [feature]add protobuf * [feature]add protobuf * [feature]add protobuf * [feature]add protobuf * [feature]add license * [feature]fix some problem * [feature]fix some problem * [feature]fix some problem * [feature]fix some problem * [feature]fix some problem * [feature]merge * [improve]delete useless dependencies from known-dependencies.txt * [feature]add unit test * [feature]add e2e and uint test * [feature]add license and doc * [feature]fix some problem --------- Co-authored-by: hailin0 <[email protected]>
…e#7361) * [feature]add protobuf format * [featrue]fix style * [featrue]fix style * [feature]add protobuf * [feature]add protobuf * [feature]add protobuf * [feature]add protobuf * [feature]add license * [feature]fix some problem * [feature]fix some problem * [feature]fix some problem * [feature]fix some problem * [feature]fix some problem * [feature]merge * [improve]delete useless dependencies from known-dependencies.txt * [feature]add unit test * [feature]add e2e and uint test * [feature]add license and doc * [feature]fix some problem --------- Co-authored-by: hailin0 <[email protected]>
|
I am working on a streaming job using this feature, where I need to parse a Kafka topic containing a protobuf message with nested objects in an array. Below is the relevant portion of my configuration: hocon } transform { sink { I've tried the above configuration, but I'm not sure if this is the correct approach. Any guidance or examples of similar configurations would be greatly appreciated! |
|
@chessplay https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/source/Kafka.md |
|
@Hisoka-X I have tried struct-query ,but it does not seems to work。The following is key configuration and log,I would like to ask if this feature does not support handling nested object arrays in JSON? |
|
@chessplay Could you create an issue for this? |
|
ok,i have created an issue for this #7585 |



Purpose of this pull request
handle #7059
How was this patch tested?
Check list
New License Guide
release-note.