Skip to content

Commit 91b2f2a

Browse files
committed
Non resumable read for legacy postgres
1 parent 6e9b4d8 commit 91b2f2a

File tree

1 file changed

+38
-13
lines changed

1 file changed

+38
-13
lines changed

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactory.kt

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import io.airbyte.cdk.read.JdbcStreamState
2525
import io.airbyte.cdk.read.Stream
2626
import io.airbyte.cdk.read.StreamFeedBootstrap
2727
import io.airbyte.cdk.util.Jsons
28-
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcPartitionFactory.FilenodeChangeType.*
28+
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcPartitionFactory.FilenodeChangeType.FILENODE_CHANGED
29+
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcPartitionFactory.FilenodeChangeType.FILENODE_NEW_STREAM
30+
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcPartitionFactory.FilenodeChangeType.FILENODE_NOT_FOUND
31+
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcPartitionFactory.FilenodeChangeType.FILENODE_NO_CHANGE
32+
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcPartitionFactory.FilenodeChangeType.NO_FILENODE
2933
import io.airbyte.integrations.source.postgres.config.PostgresSourceConfiguration
3034
import io.airbyte.integrations.source.postgres.ctid.Ctid
3135
import io.airbyte.integrations.source.postgres.operations.PostgresSourceSelectQueryGenerator
@@ -44,7 +48,8 @@ class PostgresSourceJdbcPartitionFactory(
4448
val handler: CatalogValidationFailureHandler,
4549
) :
4650
JdbcPartitionFactory<
47-
DefaultJdbcSharedState, PostgresSourceJdbcStreamState, PostgresSourceJdbcPartition> {
51+
DefaultJdbcSharedState, PostgresSourceJdbcStreamState, PostgresSourceJdbcPartition,
52+
> {
4853
private val streamStates = ConcurrentHashMap<StreamIdentifier, PostgresSourceJdbcStreamState>()
4954

5055
override fun streamState(
@@ -66,7 +71,7 @@ class PostgresSourceJdbcPartitionFactory(
6671
streamState,
6772
lowerBound = null,
6873
upperBound = null,
69-
filenode
74+
filenode,
7075
)
7176
} ?: PostgresSourceJdbcUnsplittableSnapshotPartition(selectQueryGenerator, streamState)
7277
}
@@ -80,15 +85,19 @@ class PostgresSourceJdbcPartitionFactory(
8085
upperBound = null,
8186
cursorChosenFromCatalog,
8287
cursorUpperBound = null,
83-
filenode
88+
filenode,
8489
)
8590
} ?: PostgresSourceJdbcUnsplittableSnapshotWithCursorPartition(
8691
selectQueryGenerator,
8792
streamState,
88-
cursorChosenFromCatalog
93+
cursorChosenFromCatalog,
8994
)
9095
}
9196

97+
val TidRangeScanCapableDBServer: Boolean by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
98+
isTidRangeScanCapableDBServer(JdbcConnectionFactory(config))
99+
}
100+
92101
override fun create(streamFeedBootstrap: StreamFeedBootstrap): PostgresSourceJdbcPartition? {
93102
val stream: Stream = streamFeedBootstrap.feed
94103
val streamState: PostgresSourceJdbcStreamState = streamState(streamFeedBootstrap)
@@ -100,7 +109,10 @@ class PostgresSourceJdbcPartitionFactory(
100109
return null
101110
}
102111

103-
val filenode: Filenode? = getStreamFilenode(streamState)
112+
val filenode: Filenode? = when (TidRangeScanCapableDBServer){
113+
true -> getStreamFilenode(streamState)
114+
false -> null
115+
}
104116
val fileNodeChange: FilenodeChangeType =
105117
ensureNoStreamFilenodeChanged(streamState, filenode)
106118

@@ -145,17 +157,18 @@ class PostgresSourceJdbcPartitionFactory(
145157
streamState,
146158
lowerBound =
147159
listOf(
148-
Jsons.textNode(streamState.maybeCtid!!.toString())
160+
Jsons.textNode(streamState.maybeCtid!!.toString()),
149161
),
150162
upperBound = null,
151-
filenode
163+
filenode,
152164
)
153165
}
154166
}
155167
} else {
156168
val (cursor: DataField, cursorCheckpoint: JsonNode) = cursorPair
157-
if (!isCursorBasedIncremental || fileNodeChange !in listOf(FILENODE_NO_CHANGE,
158-
NO_FILENODE
169+
if (!isCursorBasedIncremental || fileNodeChange !in listOf(
170+
FILENODE_NO_CHANGE,
171+
NO_FILENODE,
159172
)) {
160173
handler.accept(ResetStream(stream.id))
161174
streamState.reset()
@@ -231,6 +244,18 @@ class PostgresSourceJdbcPartitionFactory(
231244
return null
232245
}
233246
}
247+
248+
const val POSTGRESQL_VERSION_TID_RANGE_SCAN_CAPABLE: Int = 14
249+
fun isTidRangeScanCapableDBServer(jdbcConnectionFactory: JdbcConnectionFactory): Boolean {
250+
jdbcConnectionFactory.get().use { connection ->
251+
try {
252+
return connection.metaData.databaseMajorVersion >= POSTGRESQL_VERSION_TID_RANGE_SCAN_CAPABLE
253+
} catch (e: Exception) {
254+
log.error(e) { "Failed to get database major version" }
255+
return true
256+
}
257+
}
258+
}
234259
}
235260

236261
private fun ensureNoStreamFilenodeChanged(
@@ -241,7 +266,7 @@ class PostgresSourceJdbcPartitionFactory(
241266
// No filenode - a view?
242267
streamState.maybeFilenode == null && filenode == null -> NO_FILENODE
243268
// New stream - filenode assigned
244-
streamState.maybeFilenode == null -> FILENODE_NEW_STREAM
269+
streamState.maybeFilenode == null -> FILENODE_NEW_STREAM
245270
// Existing stream - filenode disappeared
246271
filenode == null -> FILENODE_NOT_FOUND
247272
// Existing stream - filenode changed. Must start over reading from ctid (0,0)
@@ -318,7 +343,7 @@ class PostgresSourceJdbcPartitionFactory(
318343
streamState,
319344
lowerBound?.let { listOf(Jsons.textNode(it.toString())) },
320345
upperBound?.let { listOf(Jsons.textNode(it.toString())) },
321-
splitPointValues.first().filenode
346+
splitPointValues.first().filenode,
322347
)
323348
}
324349
}
@@ -349,7 +374,7 @@ class PostgresSourceJdbcPartitionFactory(
349374
upperBound?.let { listOf(Jsons.textNode(it.toString())) },
350375
cursor,
351376
cursorUpperBound,
352-
splitPointValues.first().filenode
377+
splitPointValues.first().filenode,
353378
)
354379
}
355380
}

0 commit comments

Comments
 (0)