Skip to content

[Bug] [sink elasticsearch] the savemode of sink-es conficts with es automatically creating indexes based on templates #7430

@jw-itq

Description

@jw-itq

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Elasticsearch can automatically create indexes based on templates, such as creating indexes by year based on time, which eliminates the need for savemode. Therefore, the savemode of sink-es should be supported.

SeaTunnel Version

dev

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  checkpoint.timeout = 600000
  read_limit.rows_per_second=2000
}

source {
  SqlServer-CDC {
    result_table_name = "**"
    username = "sa"
    password = "****"
    database-names = ["**"]
    table-names = ["**.dbo.**"]
    base-url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=**"
  }

}

transform {

 Sql {
  query = """
    SELECT
      'test_1' AS source,
      FORMATDATETIME ( createTime, 'yyyy' ) AS index_key,
      CONCAT ( 's_', CAST ( id AS string ) ) AS id
    FROM
      test_content
  """
 }

}


sink {
  Elasticsearch {
        hosts = ["127.0.0.1:9200"]
        index = "test_${index_key}"
        username="elastic"
        password="****"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
        primary_keys=["id"]
    }
}

Running Command

seatunnel.sh -c config/test_es.conf

Error Exception

2024-08-19 13:34:32,351 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:383)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Illegal character in path at index 17: /test_${index_key}
        at org.elasticsearch.client.RestClient.buildUri(RestClient.java:569)
        at org.elasticsearch.client.RestClient$InternalRequest.<init>(RestClient.java:699)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:234)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.createIndex(EsRestClient.java:438)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.createIndex(EsRestClient.java:428)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog.createTable(ElasticSearchCatalog.java:185)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:186)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:112)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
        at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
        at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:375)
        ... 20 more
Caused by: java.net.URISyntaxException: Illegal character in path at index 17: /test_${index_key}
        at java.net.URI$Parser.fail(URI.java:2845)
        at java.net.URI$Parser.checkChars(URI.java:3018)
        at java.net.URI$Parser.parseHierarchical(URI.java:3102)
        at java.net.URI$Parser.parse(URI.java:3060)
        at java.net.URI.<init>(URI.java:588)
        at org.apache.http.client.utils.URIBuilder.<init>(URIBuilder.java:82)
        at org.elasticsearch.client.RestClient.buildUri(RestClient.java:563)
        ... 31 more

        at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
        at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
        at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
        at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
        at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
        at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
        at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
        at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
        at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
        at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:383)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

Zeta or Flink or Spark Version

No response

Java or Scala Version

jdk1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions