[Bug] [Doris] SeaTunnel Spark job execute failed when sink.enable-2pc=true #8990

@Larborator

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

The problem looks similar to #6854, but I could not find a solution there.

The version is 2.3.5, with #6688 merged.

When I set sink.enable-2pc=false, the job runs fine.

But when I set sink.enable-2pc=true, the job fails with the following exception:

25/03/17 16:27:46 INFO InternalParquetRecordReader: Assembled and processed 13847816 records from 13 columns in 77705 ms: 178.2101 rec/ms, 2316.7312 cell/ms
25/03/17 16:27:46 INFO InternalParquetRecordReader: time spent so far 4% reading (3902 ms) and 95% processing (77705 ms)
25/03/17 16:27:46 INFO InternalParquetRecordReader: at row 13847816. reading next block
25/03/17 16:27:46 INFO InternalParquetRecordReader: block read in memory in 455 ms. row count = 4088733
25/03/17 16:28:06 INFO RetryExec: I/O exception (java.net.SocketException) caught when processing request to {}->http://dataarch-bjy-d5-154.idchb1az2.hb1.kwaidc.com:8040: Broken pipe (Write failed)
25/03/17 16:28:09 ERROR DorisSinkWriter: stream load finished unexpectedly, interrupt worker thread! org.apache.http.client.ClientProtocolException
25/03/17 16:28:09 ERROR Utils: Aborting task
java.lang.RuntimeException: java.lang.InterruptedException
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52)
	at org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:66)
	at org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:41)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1422)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:95)
	at org.apache.spark.scheduler.Task.run(Task.scala:124)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93)
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55)
	... 19 more
25/03/17 16:28:09 ERROR DataWritingSparkTask: Aborting commit for partition 4 (task 4, attempt 0stage 0.0)
25/03/17 16:28:09 INFO DorisStreamLoad: abort for labelSuffix s_distribute_shelf_zsm_2da1507d273144e384e7bdbe1c35aad6_4. start chkId 1.
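
Reading the stack trace, the writer thread is blocked in RecordBuffer.write -> ArrayBlockingQueue.take when it gets interrupted, which suggests the stream-load HTTP thread died first (the broken pipe) and stopped recycling buffers. Below is a minimal, hypothetical sketch of that kind of two-queue handoff, purely to illustrate the failure mode; the class and method names are illustrative, not the actual SeaTunnel source:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferHandoffSketch {
    // Free buffers the writer may fill; recycled by the HTTP thread.
    private final BlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(3);
    // Filled buffers waiting to be sent via HTTP stream load.
    private final BlockingQueue<byte[]> filledBuffers = new ArrayBlockingQueue<>(3);

    public BufferHandoffSketch() {
        for (int i = 0; i < 3; i++) {
            freeBuffers.add(new byte[4096]); // pre-populated buffer pool
        }
    }

    // Called per record by the sink writer (analogous to RecordBuffer.write).
    public void write(byte[] record) throws InterruptedException {
        // Blocks forever once the HTTP thread stops recycling buffers;
        // an interrupt here surfaces as java.lang.InterruptedException.
        byte[] buf = freeBuffers.take();
        System.arraycopy(record, 0, buf, 0, Math.min(record.length, buf.length));
        filledBuffers.put(buf);
    }

    // Runs on the stream-load HTTP thread; a failed request ends this loop.
    public void streamLoadLoop() {
        try {
            while (true) {
                byte[] buf = filledBuffers.take();
                sendOverHttp(buf);    // a broken pipe throws here...
                freeBuffers.put(buf); // ...so this recycle never happens
            }
        } catch (Exception e) {
            // The real connector logs "stream load finished unexpectedly"
            // and interrupts the writer thread at this point.
        }
    }

    private void sendOverHttp(byte[] buf) { /* stand-in for the HTTP call */ }
}

In this shape, the InterruptedException is only the secondary symptom; the root cause to chase is why the stream-load request hits a broken pipe when 2PC is enabled.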

SeaTunnel Version

2.3.5, with #6688 merged

SeaTunnel Config

env {
  parallelism = ${TASK_NUM}
  job.mode = "BATCH"
  spark.app.name = "hive2bleem-template"
}

source {

  Hive {
    table_name = ${HIVE_TBL}
    metastore_uri = ${HIVE_URL}
    read_partitions = ${PARTS}
    read_columns = ${COLS}
  }
}

sink {

  Doris {
    table.identifier = ${BLEEM_TBL}
    fenodes = ${BLEEM_FE}
    username = "root"
    password = ""
    sink.label-prefix = "s"
    sink.enable-2pc = ${enable-2pc}
    doris.batch.size = 200000
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
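
For context on what sink.enable-2pc=true changes: per the Doris stream-load docs, the load is issued with the two_phase_commit header, and the transaction is then committed or aborted in a second call. A rough sketch of that handshake (endpoints, port, and header names taken from the Doris docs, not from this report; auth redirect handling omitted):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class TwoPhaseCommitSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String auth = "Basic " + Base64.getEncoder().encodeToString("root:".getBytes());

        // Phase 1: load with two_phase_commit enabled; Doris returns a TxnId.
        HttpRequest load = HttpRequest.newBuilder()
                .uri(URI.create("http://fe-host:8030/api/db/table/_stream_load"))
                .header("Authorization", auth)
                .header("Expect", "100-continue")
                .header("label", "s_mylabel_1")
                .header("two_phase_commit", "true")
                .header("format", "json")
                .header("read_json_by_line", "true")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"c1\":1}\n"))
                .build();
        HttpResponse<String> resp = client.send(load, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.body()); // contains "TxnId" on success

        // Phase 2: commit (or abort) the transaction by its id.
        HttpRequest commit = HttpRequest.newBuilder()
                .uri(URI.create("http://fe-host:8030/api/db/_stream_load_2pc"))
                .header("Authorization", auth)
                .header("txn_id", "12345")          // TxnId parsed from the load response
                .header("txn_operation", "commit")  // or "abort"
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        System.out.println(client.send(commit, HttpResponse.BodyHandlers.ofString()).body());
    }
}

If the broken pipe only appears with 2PC enabled, comparing the request shape of the two modes against the BE logs may help narrow it down.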

Running Command

Submitted as a Spark connector-v2 job.
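
For reference, a job like this would typically be submitted with the connector-v2 Spark launcher, injecting the config variables via -i (the script name, flags, and variable values below are assumptions based on the SeaTunnel 2.3.x docs, not taken from this report):

./bin/start-seatunnel-spark-connector-v2.sh \
    --config hive2doris.conf \
    -i TASK_NUM=4 -i HIVE_TBL=db.tbl -i BLEEM_TBL=db.tbl -i BLEEM_FE=fe-host:8030 \
    -i enable-2pc=true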

Error Exception

Same stack trace as shown in "What happened" above.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
