
Conversation

@zhangshenghang
Member

Purpose of this pull request

Handles #7059.

How was this patch tested?

Check list

@github-actions github-actions bot added the dependencies, connectors-v2, e2e, format, and kafka labels Aug 10, 2024
@zhangshenghang
Member Author

@Hisoka-X @hailin0
By configuring the protobuf data structure, we can successfully receive and send the related messages.

This is an example configuration; I will continue to improve the related documentation and unit tests.

Please take a look and let me know where we need to optimize, thanks.

# Defining the runtime environment
env {
    parallelism = 1
    job.mode = "STREAMING"
    flink.execution.checkpointing.interval = 5000
    flink.execution.restart.strategy = failure-rate
    flink.execution.restart.failureInterval = 60000
    flink.execution.restart.failureRate = 100
    flink.execution.restart.delayInterval = 10000
    # execution.restart.strategy = fixed-delay
    # execution.restart.attempts = 11
    # execution.restart.delayBetweenAttempts = 10000
}
source {
  Kafka {
    schema = {
      fields {
        name = string
        id = int
        email = string
        Address {
          city = string
          state = string
          street = string
        }
        attributes = "map<string,float>"
      }
    }
    topic = "test"
    format = protobuf
    protobuf_message_name = Person
    protobuf_schema = """
              syntax = "proto3";

              package wiki.hadoop.protobuf;

              option java_outer_classname = "Test2";

              message Person {
                string name = 1;
                int32 id = 2;
                string email = 3;

                message Address {
                  string street = 1;
                  string city = 2;
                  string state = 3;
                  string zip = 4;
                }

                Address address = 4;

                map<string, float> attributes = 5;

                repeated string phone_numbers = 6;
              }
    """
    consumer.group = "test-202301011"
    bootstrap.servers = "172.16.24.194:9092"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "latest"
      enable.auto.commit = "false"
    }
    result_table_name = "kafka_table"
  }
}
transform {
  Sql {
    source_table_name = "kafka_table"
    result_table_name = "kafka_table2"
    query = "select name, Address.state, Address.city, attributes.test1 from kafka_table"
  }
}


sink {
  Console {
    source_table_name = "kafka_table2"
  }
}
# Defining the runtime environment
env {
    parallelism = 1
    job.mode = "STREAMING"
    flink.execution.checkpointing.interval = 5000
    flink.execution.restart.strategy = failure-rate
    flink.execution.restart.failureInterval = 60000
    flink.execution.restart.failureRate = 100
    flink.execution.restart.delayInterval = 10000
}
source {
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = string
        id = int
        email = string
        Address {
          city = string
          state = string
          street = string
        }
        attributes = "map<string,float>"
        phone_numbers = "array<string>"
      }
    }
  }
}

sink {
  Console{}
  kafka {
      topic = "test_topic"
      bootstrap.servers = "172.16.24.194:9092"
      format = protobuf
      kafka.request.timeout.ms = 60000
#       semantics = EXACTLY_ONCE
      kafka.config = {
        acks = "all"
        request.timeout.ms = 60000
        buffer.memory = 33554432
      }
      protobuf_message_name = Person
      protobuf_schema = """
              syntax = "proto3";

              package wiki.hadoop.protobuf;

              option java_outer_classname = "Test2";

              message Person {
                string name = 1;
                int32 id = 2;
                string email = 3;

                message Address {
                  string street = 1;
                  string city = 2;
                  string state = 3;
                  string zip = 4;
                }

                Address address = 4;

                map<string, float> attributes = 5;

                repeated string phone_numbers = 6;
              }
              """
  }
}

@Hisoka-X
Member

protobuf_schema = """
              syntax = "proto3";

              package wiki.hadoop.protobuf;

              option java_outer_classname = "Test2";

              message Person {
                string name = 1;
                int32 id = 2;
                string email = 3;

                message Address {
                  string street = 1;
                  string city = 2;
                  string state = 3;
                  string zip = 4;
                }

                Address address = 4;

                map<string, float> attributes = 5;

                repeated string phone_numbers = 6;
              }
              """

+1 for this configuration approach. cc @hailin0

@Hisoka-X Hisoka-X linked an issue Aug 10, 2024 that may be closed by this pull request
@zhangshenghang
Member Author

Hi @hailin0, is this approach OK? If so, I will add the documentation and e2e code.

    }

    private static File createTempDirectory() throws IOException {
        File tmpDir = File.createTempFile("tmp_protobuf_", "_proto");
Member

Put all protobuf files into /tmp/seatunnel/tmp_protobuf_?

Member Author

Put all protobuf files into /tmp/seatunnel/tmp_protobuf_?

Protobuf needs to convert the configuration content into descriptor files via Protoc.runProtoc, which requires temporary directories and files. When the code runs for the first time, the protobuf configuration content is compiled into descriptor files, which are then used to parse the data.
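
Roughly, the flow looks like this (a simplified, illustrative sketch using protoc-jar and the protobuf Java API; the class, file, and helper names here are not the exact ones in this PR):

import com.github.os72.protocjar.Protoc;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;

import java.io.File;
import java.io.FileInputStream;
import java.nio.file.Files;

public class ProtoDescriptorSketch {

    // Compile the configured proto content and return the Descriptor for messageName.
    public static Descriptors.Descriptor compile(String protoContent, String messageName)
            throws Exception {
        // Temporary working directory for the generated .proto and descriptor files
        File tmpDir = Files.createTempDirectory("tmp_protobuf_").toFile();
        File protoFile = new File(tmpDir, "schema.proto");
        Files.write(protoFile.toPath(), protoContent.getBytes());

        File descFile = new File(tmpDir, "schema.desc");
        // Equivalent to: protoc --proto_path=<tmpDir> --descriptor_set_out=<descFile> schema.proto
        Protoc.runProtoc(
                new String[] {
                    "--proto_path=" + tmpDir.getAbsolutePath(),
                    "--descriptor_set_out=" + descFile.getAbsolutePath(),
                    protoFile.getAbsolutePath()
                });

        try (FileInputStream fis = new FileInputStream(descFile)) {
            DescriptorProtos.FileDescriptorSet descriptorSet =
                    DescriptorProtos.FileDescriptorSet.parseFrom(fis);
            // The configured schema has no imports here, so it can be built without dependencies
            Descriptors.FileDescriptor fileDescriptor =
                    Descriptors.FileDescriptor.buildFrom(
                            descriptorSet.getFile(0), new Descriptors.FileDescriptor[0]);
            // Look up the message type named in protobuf_message_name
            return fileDescriptor.findMessageTypeByName(messageName);
        }
    }
}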

Member

I think this would not conflict if placed in a specified temporary path?

Member Author

I think this would not conflict if placed in a specified temporary path?

File.createTempFile("tmp_protobuf_", "_proto").getPath()
This method generates a completely random directory or file name; the resulting path looks like this:
/var/folders/ys/223gp9td7dqf4nmxl9kw25lc0000gn/T/tmp_protobuf_2079053607848123529_proto

Member

OK, I got it. Thanks for the explanation.

Comment on lines 42 to 51
        File tmpDir = createTempDirectory();
        File protoFile = createProtoFile(tmpDir, protoContent);
        String targetDescPath = compileProtoToDescriptor(tmpDir, protoFile);

        try (FileInputStream fis = new FileInputStream(targetDescPath)) {
            DescriptorProtos.FileDescriptorSet descriptorSet =
                    DescriptorProtos.FileDescriptorSet.parseFrom(fis);
            Descriptors.FileDescriptor[] descriptorsArray = buildFileDescriptors(descriptorSet);
            return descriptorsArray[0].findMessageTypeByName(messageName);
        }
Member

Can we delete the tmp file in a finally block?

Member Author

Can we delete the tmp file in a finally block?

tmpDir.deleteOnExit();

It will be automatically deleted when the program exits, without the need to call the deletion method.

Member

But in cluster mode, the program may never exit.

Member Author

But in cluster mode, the program may never exit.

You're right, I will fix it. It should indeed be deleted immediately after use.
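
Something along these lines (a minimal sketch of the immediate-cleanup idea; FileUtils is from commons-io, and the method name is illustrative):

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.nio.file.Files;

public class TempDirCleanupSketch {

    // Build the descriptor, then always remove the temp directory, even if parsing fails.
    static void parseWithCleanup(String protoContent) throws Exception {
        File tmpDir = Files.createTempDirectory("tmp_protobuf_").toFile();
        try {
            // ... write the .proto file, run protoc, load the descriptor set ...
        } finally {
            // Delete immediately instead of relying on deleteOnExit(),
            // which never fires if the cluster process keeps running.
            FileUtils.deleteDirectory(tmpDir);
        }
    }
}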

Member

@Hisoka-X Hisoka-X left a comment

Please add an e2e test case with read/write PROTOBUF format.

@zhangshenghang
Member Author

Please add an e2e test case with read/write PROTOBUF format.

@Hisoka-X Added, done.

Comment on lines 222 to 227
        new HashMap<String, String>() {
            {
                put(PROTOBUF_MESSAGE_NAME.key(), readonlyConfig.get(PROTOBUF_MESSAGE_NAME));
                put(PROTOBUF_SCHEMA.key(), readonlyConfig.get(PROTOBUF_SCHEMA));
            }
        },
Member

If the user does not use protobuf, the value would be null? So what is it used for?

Comment on lines 69 to 71
  Console {
    source_table_name = "kafka_table"
  }
Member

Please use assert to check data.

Member

@Hisoka-X Hisoka-X left a comment

LGTM. Thanks @zhangshenghang!

Member

@wuchunfu wuchunfu left a comment

LGTM, thanks

@wuchunfu wuchunfu merged commit 51c8e1a into apache:dev Aug 28, 2024
hawk9821 pushed a commit to hawk9821/seatunnel that referenced this pull request Aug 29, 2024
…e#7361)

* [feature]add protobuf format

* [featrue]fix style

* [featrue]fix style

* [feature]add protobuf

* [feature]add protobuf

* [feature]add protobuf

* [feature]add protobuf

* [feature]add license

* [feature]fix some problem

* [feature]fix some problem

* [feature]fix some problem

* [feature]fix some problem

* [feature]fix some problem

* [feature]merge

* [improve]delete useless dependencies from known-dependencies.txt

* [feature]add unit test

* [feature]add e2e and uint test

* [feature]add license and doc

* [feature]fix some problem

---------

Co-authored-by: hailin0 <[email protected]>
@chessplay
Contributor

I am working on a streaming job using this feature, where I need to parse a Kafka topic containing a protobuf message with nested objects in an array. Below is the relevant portion of my configuration:

source {
  Kafka {
    schema = {
      fields {
        sn = string
        tenantId = int
        groupId = int
        utcCode = int
        updateTime = bigint
        totalUsers = int
        array {
          CurrentUserInfo {
            userId = int
            mac = string
            userIp = string
            rssi = int
            uplinkRate = float
            upRate = bigint
            timeDelay = int
            score = int
          }
        }
      }
    }
    topic = "pb-wirelessUser"
    format = protobuf
    protobuf_message_name = CurrentUserContent
    protobuf_schema = """
      syntax = "proto3";
      package com.ruijie.cloud.macc.log.proto.sta;

      message CurrentUserContent {
        string sn = 1;
        uint32 tenantId = 2;
        uint32 groupId = 4;
        int32 utcCode = 8;
        int64 updateTime = 9;
        uint32 totalUsers = 11;
        repeated CurrentUserInfo userList = 20;
      }

      message CurrentUserInfo {
        int32 userId = 1;
        string mac = 2;
        string userIp = 3;
        int32 rssi = 7;
        float uplinkRate = 8;
        int64 upRate = 11;
        int32 timeDelay = 17;
        int32 score = 25;
      }
    """
    consumer.group = "seatunnel-test"
    bootstrap.servers = "192.168.105.102:9092"
    result_table_name = "kafka_table"
  }
}

transform {
  Sql {
    source_table_name = "kafka_table"
    result_table_name = "kafka_table2"
    query = "select CurrentUserInfo from kafka_table"
  }
}

sink {
  Console {
    source_table_name = "kafka_table2"
  }
}
The protobuf message has a repeated CurrentUserInfo field (userList) in the CurrentUserContent message. My question is: how should I configure SeaTunnel to correctly parse and extract the nested CurrentUserInfo array?

I've tried the above configuration, but I'm not sure if this is the correct approach. Any guidance or examples of similar configurations would be greatly appreciated!

@Hisoka-X
Member

Hisoka-X commented Sep 2, 2024

@zhangshenghang
Member Author

@chessplay
Contributor

chessplay commented Sep 4, 2024

@Hisoka-X I have tried struct-query, but it does not seem to work. Below are the key configuration and log output; I would like to ask whether this feature supports handling nested object arrays in JSON?
[screenshots of the configuration and error log]

@Hisoka-X
Member

Hisoka-X commented Sep 4, 2024

@chessplay Could you create an issue for this?

@chessplay
Contributor

OK, I have created an issue for this: #7585

@zhangshenghang zhangshenghang deleted the feature-add-kafka-data-parsing branch September 6, 2024 02:32

Successfully merging this pull request may close these issues.

[Feature][Kafka-connector] Proposal to Add Kafka Protobuf Data Parsing Support
