Subpackages

Submodules

airbyte_cdk.sources.streams.core module

class airbyte_cdk.sources.streams.core.IncrementalMixin

Bases: abc.ABC

Mixin to make stream incremental.

    class IncrementalStream(Stream, IncrementalMixin):
        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            self._state[self.cursor_field] = value[self.cursor_field]

abstract property state: MutableMapping[str, Any]

State getter. It should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor value:

    {self.cursor_field: "cursor_value"}

State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.
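To illustrate the "small but serializable" requirement without depending on the CDK, here is a minimal stand-alone sketch. The `CursorState` class and the `updated_at` cursor field are hypothetical stand-ins for illustration, not part of airbyte_cdk.

```python
import json
from typing import Any, MutableMapping


class CursorState:
    """Hypothetical stand-in for a stream that mixes in IncrementalMixin."""

    cursor_field = "updated_at"  # assumed cursor field name

    def __init__(self) -> None:
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        # Keep only the cursor value so the state stays small.
        self._state[self.cursor_field] = value[self.cursor_field]


stream = CursorState()
stream.state = {"updated_at": "2023-01-01T00:00:00Z", "ignored": "extra"}

# The state must survive a round trip through a string.
serialized = json.dumps(stream.state)
```

Because the setter copies only the cursor value, unrelated keys in the incoming mapping never reach the persisted state.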

class airbyte_cdk.sources.streams.core.Stream

Bases: abc.ABC

Base abstract class for an Airbyte Stream. Makes no assumption of the Stream’s underlying transport protocol.

as_airbyte_stream() airbyte_protocol.models.airbyte_protocol.AirbyteStream
property availability_strategy: Optional[AvailabilityStrategy]

Returns

The AvailabilityStrategy used to check whether this stream is available.

check_availability(logger: logging.Logger, source: Optional[Source] = None) Tuple[bool, Optional[str]]

Checks whether this stream is available.

Parameters
  • logger – source logger

  • source – (optional) source

Returns

A tuple of (boolean, str). If boolean is true, then this stream is available, and no str is required. Otherwise, this stream is unavailable for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible.
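The shape of the return value can be sketched as follows. This is only an illustration of the (boolean, str) contract; the helper name and the use of HTTP status codes are assumptions, since the Stream class itself makes no assumption about the transport.

```python
from typing import Optional, Tuple


def check_availability_sketch(status_code: int) -> Tuple[bool, Optional[str]]:
    """Hypothetical helper: map an HTTP status code to (available, reason)."""
    if status_code == 200:
        # Available: no explanation is required.
        return True, None
    if status_code == 403:
        # Unavailable: say what went wrong and how to resolve it.
        return False, "Access denied. Grant the API key read access to this resource."
    return False, f"Unexpected status code {status_code}."
```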

property cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
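A cursor given as a plain string and a cursor given as a path can be handled by the same lookup. The sketch below uses a hypothetical helper, `get_cursor_value`, which is not a CDK function, to show how each form would be resolved against a record.

```python
from typing import Any, List, Mapping, Union


def get_cursor_value(record: Mapping[str, Any], cursor_field: Union[str, List[str]]) -> Any:
    """Hypothetical helper: read the cursor value from a record."""
    # Treat a plain string as a path with a single element.
    path = [cursor_field] if isinstance(cursor_field, str) else cursor_field
    value: Any = record
    for key in path:
        value = value[key]
    return value


top_level = get_cursor_value({"created_at": 5}, "created_at")
nested = get_cursor_value({"meta": {"updated_at": 7}}, ["meta", "updated_at"])
```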

get_error_display_message(exception: BaseException) Optional[str]

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

Parameters

exception – The exception that was raised

Returns

A user-friendly message that indicates the cause of the error
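One way an override might look is a simple mapping from exception types to messages. The function below is a stand-alone sketch; the exception types chosen and the message wording are assumptions for illustration only.

```python
from typing import Optional


def display_message_for(exception: BaseException) -> Optional[str]:
    """Hypothetical override body: translate known errors for the user."""
    if isinstance(exception, PermissionError):
        return "The configured credentials cannot read this stream."
    if isinstance(exception, TimeoutError):
        return "The source took too long to respond. Try again later."
    # Unknown errors get no user-facing message, like the default behaviour.
    return None
```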

get_json_schema() Mapping[str, Any]
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream’s “name” property. Override as needed.

get_updated_state(current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) MutableMapping[str, Any]

Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.

For example, if the state object is based on a created_at timestamp, the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20}, then this method would return {'created_at': 20} to indicate that state should be updated to this object.

Parameters
  • current_stream_state – The stream’s current state object

  • latest_record – The latest record extracted from the stream

Returns

An updated state object
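The created_at example can be written as a small stand-alone function. This is a sketch, not the CDK's implementation: the hard-coded `created_at` cursor and the use of `max` to guard against out-of-order records are assumptions.

```python
from typing import Any, Mapping, MutableMapping


def get_updated_state(
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
) -> MutableMapping[str, Any]:
    cursor_field = "created_at"  # assumed cursor field
    latest = latest_record[cursor_field]
    # With an empty state, fall back to the record's own cursor value.
    current = current_stream_state.get(cursor_field, latest)
    # max() keeps the state from moving backwards on out-of-order records.
    return {cursor_field: max(current, latest)}


new_state = get_updated_state({"created_at": 10}, {"name": "octavia", "created_at": 20})
```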

log_stream_sync_configuration() None

Logs the configuration of this stream.

property logger: logging.Logger
property name: str

Returns

The stream name. By default this is the implementing class name, but it can be overridden as needed.

property namespace: Optional[str]

Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.

Returns

A string containing the name of the namespace.

abstract property primary_key: Optional[Union[str, List[str], List[List[str]]]]

Returns

A string for a single primary key, a list of strings for a composite primary key, or a list of lists of strings for a composite primary key consisting of nested fields. If the stream has no primary key, return None.
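The three accepted shapes can be normalized into a single list-of-paths form. The helper below, `wrap_primary_key`, is a hypothetical illustration of that idea and should not be read as the CDK's own code.

```python
from typing import List, Optional, Union

PrimaryKey = Optional[Union[str, List[str], List[List[str]]]]


def wrap_primary_key(keys: PrimaryKey) -> Optional[List[List[str]]]:
    """Hypothetical helper: normalize every form to a list of field paths."""
    if keys is None:
        return None  # the stream has no primary key
    if isinstance(keys, str):
        return [[keys]]  # single primary key
    wrapped: List[List[str]] = []
    for component in keys:
        # A string is a top-level field; a list is a path to a nested field.
        wrapped.append([component] if isinstance(component, str) else list(component))
    return wrapped
```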

read_full_refresh(cursor_field: Optional[List[str]], logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]
read_incremental(cursor_field: Optional[List[str]], logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, stream_state: MutableMapping[str, Any], state_manager, per_stream_state_enabled: bool, internal_config: airbyte_cdk.sources.utils.schema_helpers.InternalConfig) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]
abstract read_records(sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]

This method should be overridden by subclasses to read records based on the inputs.

property source_defined_cursor: bool

Return False if the cursor can be configured by the user.

property state_checkpoint_interval: Optional[int]

Decides how often to checkpoint state (i.e. emit a STATE message). For example, if this returns a value of 100, then state is persisted after reading 100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.

Checkpointing a stream avoids re-reading records if a sync fails or is cancelled.

Return None if state should not be checkpointed, e.g. because records from the underlying data source are not returned in ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
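The effect of the interval can be sketched with a small generator. This is a simplified illustration, not the CDK's read loop: the function name, the ("RECORD", ...) and ("STATE", ...) tuples, and the final state emitted after the last record are all assumptions.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping, Optional, Tuple


def read_with_checkpoints(
    records: Iterable[Mapping[str, Any]],
    cursor_field: str,
    interval: Optional[int],
) -> Iterator[Tuple[str, Mapping[str, Any]]]:
    """Hypothetical read loop: emit state every `interval` records."""
    state: Dict[str, Any] = {}
    for count, record in enumerate(records, start=1):
        # Assumes records arrive in ascending cursor order when interval is set.
        state = {cursor_field: record[cursor_field]}
        yield ("RECORD", record)
        if interval and count % interval == 0:
            yield ("STATE", dict(state))
    # State is always saved once the full stream has been read.
    yield ("STATE", dict(state))


records = [{"id": i} for i in range(1, 6)]
with_interval = [m[1] for m in read_with_checkpoints(records, "id", 2) if m[0] == "STATE"]
without_interval = [m[1] for m in read_with_checkpoints(records, "id", None) if m[0] == "STATE"]
```

With an interval of 2 and five records, state is emitted after the second and fourth records and once more at the end. With an interval of None, only the final state is emitted.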

stream_slices(*, sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) Iterable[Optional[Mapping[str, Any]]]

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode

  • cursor_field

  • stream_state

Returns
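As a rough illustration of what a slice can look like, the sketch below yields one mapping per day in a date range. The function name and the `date` key are hypothetical; a real override would derive its range from sync_mode, cursor_field, and stream_state.

```python
from datetime import date, timedelta
from typing import Any, Iterator, Mapping


def daily_slices(start: date, end: date) -> Iterator[Mapping[str, Any]]:
    """Hypothetical slicer: one slice per calendar day, inclusive of both ends."""
    current = start
    while current <= end:
        yield {"date": current.isoformat()}
        current += timedelta(days=1)


slices = list(daily_slices(date(2023, 1, 1), date(2023, 1, 3)))
```

Each yielded mapping would then be passed to read_records as its stream_slice argument.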

property supports_incremental: bool

Returns

True if this stream supports incrementally reading data.

transformer: airbyte_cdk.sources.utils.transform.TypeTransformer = <airbyte_cdk.sources.utils.transform.TypeTransformer object>
airbyte_cdk.sources.streams.core.package_name_from_class(cls: object) str

Find the package name given a class name

Module contents

class airbyte_cdk.sources.streams.IncrementalMixin

Bases: abc.ABC

Mixin to make stream incremental.

    class IncrementalStream(Stream, IncrementalMixin):
        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            self._state[self.cursor_field] = value[self.cursor_field]

abstract property state: MutableMapping[str, Any]

State getter. It should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor value:

    {self.cursor_field: "cursor_value"}

State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.

class airbyte_cdk.sources.streams.Stream

Bases: abc.ABC

Base abstract class for an Airbyte Stream. Makes no assumption of the Stream’s underlying transport protocol.

as_airbyte_stream() airbyte_protocol.models.airbyte_protocol.AirbyteStream
property availability_strategy: Optional[AvailabilityStrategy]

Returns

The AvailabilityStrategy used to check whether this stream is available.

check_availability(logger: logging.Logger, source: Optional[Source] = None) Tuple[bool, Optional[str]]

Checks whether this stream is available.

Parameters
  • logger – source logger

  • source – (optional) source

Returns

A tuple of (boolean, str). If boolean is true, then this stream is available, and no str is required. Otherwise, this stream is unavailable for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible.

property cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

get_error_display_message(exception: BaseException) Optional[str]

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

Parameters

exception – The exception that was raised

Returns

A user-friendly message that indicates the cause of the error

get_json_schema() Mapping[str, Any]
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream’s “name” property. Override as needed.

get_updated_state(current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) MutableMapping[str, Any]

Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.

For example, if the state object is based on a created_at timestamp, the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20}, then this method would return {'created_at': 20} to indicate that state should be updated to this object.

Parameters
  • current_stream_state – The stream’s current state object

  • latest_record – The latest record extracted from the stream

Returns

An updated state object

log_stream_sync_configuration() None

Logs the configuration of this stream.

property logger: logging.Logger
property name: str

Returns

The stream name. By default this is the implementing class name, but it can be overridden as needed.

property namespace: Optional[str]

Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.

Returns

A string containing the name of the namespace.

abstract property primary_key: Optional[Union[str, List[str], List[List[str]]]]

Returns

A string for a single primary key, a list of strings for a composite primary key, or a list of lists of strings for a composite primary key consisting of nested fields. If the stream has no primary key, return None.

read_full_refresh(cursor_field: Optional[List[str]], logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]
read_incremental(cursor_field: Optional[List[str]], logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, stream_state: MutableMapping[str, Any], state_manager, per_stream_state_enabled: bool, internal_config: airbyte_cdk.sources.utils.schema_helpers.InternalConfig) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]
abstract read_records(sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]

This method should be overridden by subclasses to read records based on the inputs.

property source_defined_cursor: bool

Return False if the cursor can be configured by the user.

property state_checkpoint_interval: Optional[int]

Decides how often to checkpoint state (i.e. emit a STATE message). For example, if this returns a value of 100, then state is persisted after reading 100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.

Checkpointing a stream avoids re-reading records if a sync fails or is cancelled.

Return None if state should not be checkpointed, e.g. because records from the underlying data source are not returned in ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.

stream_slices(*, sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) Iterable[Optional[Mapping[str, Any]]]

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode

  • cursor_field

  • stream_state

Returns

property supports_incremental: bool

Returns

True if this stream supports incrementally reading data.

transformer: airbyte_cdk.sources.utils.transform.TypeTransformer = <airbyte_cdk.sources.utils.transform.TypeTransformer object>