Skip to content

Commit 0d97cb7

Browse files
authored
Merge branch 'master' into 13758-SSH_control_host_port_setting_not_used_when_tunneling
2 parents 2fd6edf + 79a54a8 commit 0d97cb7

File tree

53 files changed

+873
-371
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+873
-371
lines changed

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.1.69
4+
- AbstractSource emits a state message when reading incremental even if there were no stream slices to process.
5+
36
## 0.1.68
47
- Replace parse-time string interpolation with run-time interpolation in YAML-based sources
58

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,10 @@ def _read_incremental(
226226
)
227227
logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices})
228228
total_records_counter = 0
229+
if not slices:
230+
# Safety net to ensure we always emit at least one state message even if there are no slices
231+
checkpoint = self._checkpoint_state(stream_instance, stream_instance.state, connector_state)
232+
yield checkpoint
229233
for _slice in slices:
230234
logger.debug("Processing stream slice", extra={"slice": _slice})
231235
records = stream_instance.read_records(

airbyte-cdk/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
setup(
1717
name="airbyte-cdk",
18-
version="0.1.68",
18+
version="0.1.69",
1919
description="A framework for writing Airbyte Connectors.",
2020
long_description=README,
2121
long_description_content_type="text/markdown",

airbyte-cdk/python/unit_tests/sources/test_abstract_source.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,13 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
9898
class MockStreamWithState(MockStream):
9999
cursor_field = "cursor"
100100

101+
def __init__(self, inputs_and_mocked_outputs: List[Tuple[Mapping[str, Any], Iterable[Mapping[str, Any]]]], name: str, state=None):
102+
super().__init__(inputs_and_mocked_outputs, name)
103+
self._state = state
104+
101105
@property
102106
def state(self):
103-
return {}
107+
return self._state
104108

105109
@state.setter
106110
def state(self, value):
@@ -452,6 +456,74 @@ def test_with_slices(self, mocker):
452456

453457
assert expected == messages
454458

459+
def test_no_slices(self, mocker):
460+
"""
461+
Tests that an incremental read returns at least one state messages even if no records were read:
462+
1. outputs a state message after reading the entire stream
463+
"""
464+
slices = []
465+
stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}]
466+
state = {"cursor": "value"}
467+
s1 = MockStreamWithState(
468+
[
469+
(
470+
{
471+
"sync_mode": SyncMode.incremental,
472+
"stream_slice": s,
473+
"stream_state": mocker.ANY,
474+
},
475+
stream_output,
476+
)
477+
for s in slices
478+
],
479+
name="s1",
480+
state=state,
481+
)
482+
s2 = MockStreamWithState(
483+
[
484+
(
485+
{
486+
"sync_mode": SyncMode.incremental,
487+
"stream_slice": s,
488+
"stream_state": mocker.ANY,
489+
},
490+
stream_output,
491+
)
492+
for s in slices
493+
],
494+
name="s2",
495+
state=state,
496+
)
497+
498+
mocker.patch.object(MockStreamWithState, "supports_incremental", return_value=True)
499+
mocker.patch.object(MockStreamWithState, "get_json_schema", return_value={})
500+
mocker.patch.object(MockStreamWithState, "stream_slices", return_value=slices)
501+
mocker.patch.object(
502+
MockStreamWithState,
503+
"state_checkpoint_interval",
504+
new_callable=mocker.PropertyMock,
505+
return_value=2,
506+
)
507+
508+
src = MockSource(streams=[s1, s2])
509+
catalog = ConfiguredAirbyteCatalog(
510+
streams=[
511+
_configured_stream(s1, SyncMode.incremental),
512+
_configured_stream(s2, SyncMode.incremental),
513+
]
514+
)
515+
516+
expected = [
517+
_state({"s1": state}),
518+
_state({"s1": state, "s2": state}),
519+
]
520+
521+
messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict))))
522+
523+
print(f"expected:\n{expected}")
524+
print(f"messages:\n{messages}")
525+
assert expected == messages
526+
455527
def test_with_slices_and_interval(self, mocker):
456528
"""
457529
Tests that an incremental read which uses slices and a checkpoint interval:

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@
5353
- name: Cassandra
5454
destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5
5555
dockerRepository: airbyte/destination-cassandra
56-
dockerImageTag: 0.1.2
56+
dockerImageTag: 0.1.3
5757
documentationUrl: https://docs.airbyte.io/integrations/destinations/cassandra
5858
icon: cassandra.svg
5959
releaseStage: alpha
6060
- name: Chargify (Keen)
6161
destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae
6262
dockerRepository: airbyte/destination-keen
63-
dockerImageTag: 0.2.3
63+
dockerImageTag: 0.2.4
6464
documentationUrl: https://docs.airbyte.io/integrations/destinations/keen
6565
icon: chargify.svg
6666
releaseStage: alpha
@@ -133,7 +133,7 @@
133133
- name: Kafka
134134
destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
135135
dockerRepository: airbyte/destination-kafka
136-
dockerImageTag: 0.1.9
136+
dockerImageTag: 0.1.10
137137
documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka
138138
icon: kafka.svg
139139
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@
693693
supported_destination_sync_modes:
694694
- "overwrite"
695695
- "append"
696-
- dockerImage: "airbyte/destination-cassandra:0.1.2"
696+
- dockerImage: "airbyte/destination-cassandra:0.1.3"
697697
spec:
698698
documentationUrl: "https://docs.airbyte.io/integrations/destinations/cassandra"
699699
connectionSpecification:
@@ -758,7 +758,7 @@
758758
supported_destination_sync_modes:
759759
- "overwrite"
760760
- "append"
761-
- dockerImage: "airbyte/destination-keen:0.2.3"
761+
- dockerImage: "airbyte/destination-keen:0.2.4"
762762
spec:
763763
documentationUrl: "https://docs.airbyte.io/integrations/destinations/keen"
764764
connectionSpecification:
@@ -2005,7 +2005,7 @@
20052005
supportsDBT: false
20062006
supported_destination_sync_modes:
20072007
- "append"
2008-
- dockerImage: "airbyte/destination-kafka:0.1.9"
2008+
- dockerImage: "airbyte/destination-kafka:0.1.10"
20092009
spec:
20102010
documentationUrl: "https://docs.airbyte.io/integrations/destinations/kafka"
20112011
connectionSpecification:

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@
564564
- name: Microsoft SQL Server (MSSQL)
565565
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
566566
dockerRepository: airbyte/source-mssql
567-
dockerImageTag: 0.4.12
567+
dockerImageTag: 0.4.13
568568
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
569569
icon: mssql.svg
570570
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5003,7 +5003,7 @@
50035003
supportsNormalization: false
50045004
supportsDBT: false
50055005
supported_destination_sync_modes: []
5006-
- dockerImage: "airbyte/source-mssql:0.4.12"
5006+
- dockerImage: "airbyte/source-mssql:0.4.13"
50075007
spec:
50085008
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
50095009
connectionSpecification:

airbyte-integrations/connectors/destination-cassandra/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-cassandra
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.2
19+
LABEL io.airbyte.version=0.1.3
2020
LABEL io.airbyte.name=airbyte/destination-cassandra

airbyte-integrations/connectors/destination-cassandra/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies {
2626
// https://mvnrepository.com/artifact/org.assertj/assertj-core
2727
testImplementation "org.assertj:assertj-core:${assertVersion}"
2828
testImplementation libs.connectors.testcontainers.cassandra
29+
testImplementation project(':airbyte-integrations:bases:standard-destination-test')
2930

3031

3132
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')

0 commit comments

Comments
 (0)