Submodules

airbyte_cdk.sources.singer.singer_helpers module

class airbyte_cdk.sources.singer.singer_helpers.Catalogs(singer_catalog: object, airbyte_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog)

Bases: object

airbyte_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
singer_catalog: object
class airbyte_cdk.sources.singer.singer_helpers.SingerHelper

Bases: object

static create_singer_catalog_with_selection(masked_airbyte_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, discovered_singer_catalog: object) -> str
static get_catalogs(logger, shell_command: str, sync_mode_overrides: Dict[str, airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo], primary_key_overrides: Dict[str, List[str]], excluded_streams: List) -> airbyte_cdk.sources.singer.singer_helpers.Catalogs
static read(logger, shell_command, is_message=<function SingerHelper.<lambda>>) -> Iterator[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]
static singer_catalog_to_airbyte_catalog(singer_catalog: Dict[str, Any], sync_mode_overrides: Dict[str, airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo], primary_key_overrides: Dict[str, List[str]]) -> airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
Parameters
  • singer_catalog

  • sync_mode_overrides – A dict from stream name to the sync modes it should use. Each stream in this dict must exist in the Singer catalog, but not every stream in the catalog needs to appear in this dict.

  • primary_key_overrides – A dict from stream name to the list of fields to be used as primary keys.

Returns

Airbyte Catalog
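
The override arguments described above are plain dicts keyed by stream name. The following is a minimal sketch of their shape, not the CDK's implementation. It uses a local stand-in for SyncModeInfo (so the snippet runs without airbyte_cdk installed) and plain strings in place of SyncMode values. The stream names, field names, and the helper unknown_override_streams are hypothetical; the helper only illustrates the documented constraint that every override key must name a stream in the Singer catalog.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class SyncModeInfo:
    """Local stand-in mirroring the three documented SyncModeInfo fields."""

    # The real class holds SyncMode enum values; strings are used here for brevity.
    supported_sync_modes: Optional[List[str]] = None
    source_defined_cursor: Optional[bool] = None
    default_cursor_field: Optional[List[str]] = None


def unknown_override_streams(singer_catalog: Dict[str, Any], overrides: Dict[str, Any]) -> List[str]:
    """Return the override keys that do not name a stream in the Singer catalog."""
    known = {stream.get("stream") for stream in singer_catalog.get("streams", [])}
    return sorted(name for name in overrides if name not in known)


# Hypothetical Singer catalog with two streams.
singer_catalog = {
    "streams": [
        {"stream": "users", "schema": {}},
        {"stream": "orders", "schema": {}},
    ]
}

# Only "orders" needs sync mode overrides; "users" is left as discovered.
sync_mode_overrides = {
    "orders": SyncModeInfo(
        supported_sync_modes=["full_refresh", "incremental"],
        default_cursor_field=["updated_at"],
    ),
}
primary_key_overrides = {"users": ["id"], "orders": ["order_id"]}

missing = unknown_override_streams(singer_catalog, sync_mode_overrides)
```

An empty missing list means every override refers to a stream the tap actually discovered.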

class airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo(supported_sync_modes: Optional[List[airbyte_protocol.models.airbyte_protocol.SyncMode]] = None, source_defined_cursor: Optional[bool] = None, default_cursor_field: Optional[List[str]] = None)

Bases: object

default_cursor_field: Optional[List[str]] = None
source_defined_cursor: Optional[bool] = None
supported_sync_modes: Optional[List[airbyte_protocol.models.airbyte_protocol.SyncMode]] = None
airbyte_cdk.sources.singer.singer_helpers.configured_for_incremental(configured_stream: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream)
airbyte_cdk.sources.singer.singer_helpers.get_stream_level_metadata(metadatas: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]
airbyte_cdk.sources.singer.singer_helpers.is_field_metadata(metadata)
airbyte_cdk.sources.singer.singer_helpers.override_sync_modes(airbyte_stream: airbyte_protocol.models.airbyte_protocol.AirbyteStream, overrides: airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo)
airbyte_cdk.sources.singer.singer_helpers.set_sync_modes_from_metadata(airbyte_stream: airbyte_protocol.models.airbyte_protocol.AirbyteStream, metadatas: List[Dict[str, Any]])
airbyte_cdk.sources.singer.singer_helpers.to_json(string)

airbyte_cdk.sources.singer.source module

class airbyte_cdk.sources.singer.source.BaseSingerSource

Bases: airbyte_cdk.sources.singer.source.SingerSource

property api_error: Type[Exception]

The exception class (or base class) that is raised if the tap is misconfigured or the service is unavailable.

check_config(logger: logging.Logger, config_path: str, config: Mapping[str, Any]) -> airbyte_protocol.models.airbyte_protocol.AirbyteConnectionStatus

Some Singer sources may perform the check using config_path or config to test whether the input configuration can be used to connect to the integration successfully.

discover(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteCatalog

Implements the parent class discover method.

discover_cmd(logger: logging.Logger, config_path: str) -> str

Returns the command used to run discovery in the Singer tap. For example, if the bash command used to invoke the Singer tap is tap-postgres and the config JSON lives in "/path/config.json", this method would return "tap-postgres --config /path/config.json".

force_full_refresh = False

read_cmd(logger: logging.Logger, config_path: str, catalog_path: str, state_path: Optional[str] = None) -> str

Returns the command used to read data from the Singer tap. For example, if the bash command used to invoke the Singer tap is tap-postgres, the config JSON lives in "/path/config.json", and the catalog is in "/path/catalog.json", this method would return "tap-postgres --config /path/config.json --catalog /path/catalog.json".
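
The discovery and read commands described above are ordinary shell command strings. The sketch below shows one way such strings could be assembled; it is not the CDK's own code. The function names build_discover_cmd and build_read_cmd are hypothetical, and the --discover and --state flags are assumptions based on the usual Singer tap command-line convention rather than on anything stated in this reference.

```python
import shlex
from typing import Optional


def build_discover_cmd(tap_cmd: str, config_path: str, discover_flag: str = "--discover") -> str:
    """Assemble a discovery command; discover_flag is an assumed Singer convention."""
    parts = [tap_cmd, "--config", shlex.quote(config_path)]
    if discover_flag:
        parts.append(discover_flag)
    return " ".join(parts)


def build_read_cmd(
    tap_cmd: str,
    config_path: str,
    catalog_path: str,
    state_path: Optional[str] = None,
) -> str:
    """Assemble a read command; the state argument is added only when a path is given."""
    parts = [
        tap_cmd,
        "--config", shlex.quote(config_path),
        "--catalog", shlex.quote(catalog_path),
    ]
    if state_path:
        # Assumed flag name, following the common Singer tap CLI.
        parts += ["--state", shlex.quote(state_path)]
    return " ".join(parts)


discover = build_discover_cmd("tap-postgres", "/path/config.json")
read = build_read_cmd("tap-postgres", "/path/config.json", "/path/catalog.json")
```

Paths are passed through shlex.quote so that a path containing spaces still forms a single shell argument.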

property tap_cmd: str

Tap command

property tap_name: str

Tap name

try_connect(logger: logging.Logger, config: Mapping[str, Any])

Tests the provided credentials and raises self.api_error if something goes wrong.
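
The relationship between api_error, try_connect, and check_config can be sketched as follows. This is an illustration under stated assumptions, not the CDK's implementation: ConnectionStatus is a local stand-in for AirbyteConnectionStatus, TapApiError and the api_key field are hypothetical, and the method signatures are simplified (no logger or config_path).

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Type


@dataclass
class ConnectionStatus:
    """Local stand-in for AirbyteConnectionStatus: a status plus an optional message."""

    status: str
    message: Optional[str] = None


class TapApiError(Exception):
    """Hypothetical exception type that a tap's client library might raise."""


class ExampleSource:
    # The exception class that try_connect is expected to raise on failure.
    api_error: Type[Exception] = TapApiError

    def try_connect(self, config: Mapping[str, Any]) -> None:
        # A real source would call the upstream API here; this sketch only
        # checks that a (hypothetical) credential is present.
        if not config.get("api_key"):
            raise self.api_error("missing api_key")

    def check_config(self, config: Mapping[str, Any]) -> ConnectionStatus:
        try:
            self.try_connect(config)
        except self.api_error as err:
            return ConnectionStatus("FAILED", f"Unable to connect: {err}")
        return ConnectionStatus("SUCCEEDED")


source = ExampleSource()
ok = source.check_config({"api_key": "secret"})
failed = source.check_config({})
```

Catching only self.api_error keeps unrelated bugs (for example a TypeError in the connector itself) from being reported as a configuration problem.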

class airbyte_cdk.sources.singer.source.ConfigContainer(config, config_path)

Bases: Dict[str, Any]

config_path: str
class airbyte_cdk.sources.singer.source.SingerSource

Bases: airbyte_cdk.sources.source.BaseSource[airbyte_cdk.sources.singer.source.ConfigContainer, str, str]

check(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteConnectionStatus

Tests whether the input configuration can be used to connect to the integration successfully.

check_config(logger: logging.Logger, config_path: str, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteConnectionStatus

Some Singer sources may perform the check using config_path or config to test whether the input configuration can be used to connect to the integration successfully.

configure(config: Mapping[str, Any], temp_dir: str) -> airbyte_cdk.sources.singer.source.ConfigContainer

Persists the raw config in a temporary directory so that the Source job can run. This can be overridden if extra temporary files need to be persisted in the temp dir.
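
A minimal sketch of what persisting a config and returning a ConfigContainer could look like is given below. It is an assumption-laden illustration, not the CDK source: the ConfigContainer here is a local stand-in built from the documented signature (a dict with a config_path attribute), the file name config.json is hypothetical, and configure is written as a free function rather than a method.

```python
import json
import os
import tempfile
from typing import Any, Dict, Mapping


class ConfigContainer(Dict[str, Any]):
    """Local stand-in: a dict of config values that also remembers its file path."""

    config_path: str

    def __init__(self, config: Mapping[str, Any], config_path: str):
        super().__init__(config)
        self.config_path = config_path


def configure(config: Mapping[str, Any], temp_dir: str) -> ConfigContainer:
    """Write the config as JSON into temp_dir and return it with its path."""
    config_path = os.path.join(temp_dir, "config.json")  # hypothetical file name
    with open(config_path, "w") as fh:
        json.dump(dict(config), fh)
    return ConfigContainer(config, config_path)


with tempfile.TemporaryDirectory() as tmp:
    container = configure({"host": "localhost"}, tmp)
    # The tap would be pointed at container.config_path; read it back to verify.
    with open(container.config_path) as fh:
        persisted = json.load(fh)
```

An override that needs extra temporary files (for example a key file) could write them into the same temp_dir before returning the container.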

discover(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteCatalog

Implements the parent class discover method.

discover_cmd(logger: logging.Logger, config_path: str) -> str

Returns the command used to run discovery in the Singer tap. For example, if the bash command used to invoke the Singer tap is tap-postgres and the config JSON lives in "/path/config.json", this method would return "tap-postgres --config /path/config.json".

get_excluded_streams() -> List[str]

This method provides the ability to exclude some streams from the catalog.

Returns

A list of excluded stream names
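
The effect of excluding streams can be pictured with the sketch below. How the CDK applies the exclusion internally is not described in this reference, so this is only an illustration: the helper drop_excluded_streams and the stream names are hypothetical, and the catalog layout (a "streams" list whose entries carry a "stream" name) follows the usual Singer catalog shape.

```python
from typing import Any, Dict, List


def drop_excluded_streams(singer_catalog: Dict[str, Any], excluded_streams: List[str]) -> Dict[str, Any]:
    """Return a copy of the catalog without the streams named in excluded_streams."""
    excluded = set(excluded_streams)
    kept = [
        stream
        for stream in singer_catalog.get("streams", [])
        if stream.get("stream") not in excluded
    ]
    # Build a new dict so the discovered catalog is left untouched.
    return {**singer_catalog, "streams": kept}


# Hypothetical discovered catalog and exclusion list.
discovered = {"streams": [{"stream": "users"}, {"stream": "audit_log"}, {"stream": "orders"}]}
filtered = drop_excluded_streams(discovered, ["audit_log"])
remaining = [stream["stream"] for stream in filtered["streams"]]
```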

get_primary_key_overrides() -> Dict[str, List[str]]

Similar to get_sync_mode_overrides but for primary keys.

Returns

A dict from stream name to the list of primary key fields for the stream.

get_sync_mode_overrides() -> Dict[str, airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo]

The Singer Spec outlines a way for taps to declare in their catalog that their streams support incremental sync (valid-replication-keys, forced-replication-method, and others). However, many taps which are incremental don’t actually declare that via the catalog, and just use their input state to perform an incremental sync without giving any hints to the user. An Airbyte Connector built on top of such a Singer Tap cannot automatically detect which streams are full refresh or incremental or what their cursors are. In those cases the developer needs to manually specify information about the sync modes.

This method provides a way of doing that: the dict of stream names to SyncModeInfo returned from this method will be used to override each stream’s sync mode information in the Airbyte Catalog output from the discover method. Only set fields provided in the SyncModeInfo are used. If a SyncModeInfo field is not set, it will not be overridden in the output catalog.

Returns

A dict from stream name to the sync modes that should be applied to this stream.
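
The rule that only the set fields of a SyncModeInfo override the discovered values can be sketched as follows. This is an illustration of the documented behaviour, not the CDK's code: SyncModeInfo and StreamInfo are local stand-ins (StreamInfo models only the sync-related fields of an Airbyte stream), sync modes are shown as plain strings, and apply_overrides and the stream and field names are hypothetical.

```python
from dataclasses import dataclass, fields, replace
from typing import List, Optional


@dataclass
class SyncModeInfo:
    """Local stand-in mirroring the three documented SyncModeInfo fields."""

    supported_sync_modes: Optional[List[str]] = None
    source_defined_cursor: Optional[bool] = None
    default_cursor_field: Optional[List[str]] = None


@dataclass
class StreamInfo:
    """Stand-in for the sync-related part of a discovered Airbyte stream."""

    name: str
    supported_sync_modes: Optional[List[str]] = None
    source_defined_cursor: Optional[bool] = None
    default_cursor_field: Optional[List[str]] = None


def apply_overrides(stream: StreamInfo, overrides: SyncModeInfo) -> StreamInfo:
    """Copy the stream, replacing only the fields that the override actually sets."""
    updates = {
        f.name: getattr(overrides, f.name)
        for f in fields(overrides)
        if getattr(overrides, f.name) is not None
    }
    return replace(stream, **updates)


discovered = StreamInfo("orders", supported_sync_modes=["full_refresh"], source_defined_cursor=False)
override = SyncModeInfo(
    supported_sync_modes=["full_refresh", "incremental"],
    default_cursor_field=["updated_at"],
)
result = apply_overrides(discovered, override)
```

Here source_defined_cursor keeps its discovered value because the override leaves that field unset.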

read(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer, catalog_path: str, state_path: Optional[str] = None) -> Iterable[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]

Implements the parent class read method.

read_catalog(catalog_path: str) -> str

Since Singer sources do not need the actual catalog object, this method is overridden to return only the path.

read_cmd(logger: logging.Logger, config_path: str, catalog_path: str, state_path: Optional[str] = None) -> str

Returns the command used to read data from the Singer tap. For example, if the bash command used to invoke the Singer tap is tap-postgres, the config JSON lives in "/path/config.json", and the catalog is in "/path/catalog.json", this method would return "tap-postgres --config /path/config.json --catalog /path/catalog.json".

read_state(state_path: str) -> str

Since Singer sources do not need the actual state object, this method is overridden to return only the path.

transform_config(config: Mapping[str, Any]) -> Mapping[str, Any]

A Singer source may need to adapt the config object to the specifics of the Singer tap.
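
As an illustration of adapting a config for a tap, the sketch below renames one key and adds a default. The key names api_token, access_token, and user_agent are hypothetical and stand for whatever a particular tap expects; the function is written as a free function rather than a method for brevity.

```python
from typing import Any, Dict, Mapping


def transform_config(config: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return a copy of the config reshaped for a (hypothetical) tap."""
    adapted: Dict[str, Any] = dict(config)  # copy so the caller's config is not mutated
    # Hypothetical rename: the connector spec says api_token, the tap wants access_token.
    if "api_token" in adapted:
        adapted["access_token"] = adapted.pop("api_token")
    # Hypothetical default that the tap requires but the connector does not ask for.
    adapted.setdefault("user_agent", "airbyte")
    return adapted


original = {"api_token": "secret", "start_date": "2021-01-01"}
adapted = transform_config(original)
```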

Module contents

class airbyte_cdk.sources.singer.ConfigContainer(config, config_path)

Bases: Dict[str, Any]

config_path: str
class airbyte_cdk.sources.singer.SingerHelper

Bases: object

static create_singer_catalog_with_selection(masked_airbyte_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, discovered_singer_catalog: object) -> str
static get_catalogs(logger, shell_command: str, sync_mode_overrides: Dict[str, airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo], primary_key_overrides: Dict[str, List[str]], excluded_streams: List) -> airbyte_cdk.sources.singer.singer_helpers.Catalogs
static read(logger, shell_command, is_message=<function SingerHelper.<lambda>>) -> Iterator[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]
static singer_catalog_to_airbyte_catalog(singer_catalog: Dict[str, Any], sync_mode_overrides: Dict[str, airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo], primary_key_overrides: Dict[str, List[str]]) -> airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
Parameters
  • singer_catalog

  • sync_mode_overrides – A dict from stream name to the sync modes it should use. Each stream in this dict must exist in the Singer catalog, but not every stream in the catalog needs to appear in this dict.

  • primary_key_overrides – A dict from stream name to the list of fields to be used as primary keys.

Returns

Airbyte Catalog

class airbyte_cdk.sources.singer.SingerSource

Bases: airbyte_cdk.sources.source.BaseSource[airbyte_cdk.sources.singer.source.ConfigContainer, str, str]

check(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteConnectionStatus

Tests whether the input configuration can be used to connect to the integration successfully.

check_config(logger: logging.Logger, config_path: str, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteConnectionStatus

Some Singer sources may perform the check using config_path or config to test whether the input configuration can be used to connect to the integration successfully.

configure(config: Mapping[str, Any], temp_dir: str) -> airbyte_cdk.sources.singer.source.ConfigContainer

Persists the raw config in a temporary directory so that the Source job can run. This can be overridden if extra temporary files need to be persisted in the temp dir.

discover(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer) -> airbyte_protocol.models.airbyte_protocol.AirbyteCatalog

Implements the parent class discover method.

discover_cmd(logger: logging.Logger, config_path: str) -> str

Returns the command used to run discovery in the Singer tap. For example, if the bash command used to invoke the Singer tap is tap-postgres and the config JSON lives in "/path/config.json", this method would return "tap-postgres --config /path/config.json".

get_excluded_streams() -> List[str]

This method provides the ability to exclude some streams from the catalog.

Returns

A list of excluded stream names

get_primary_key_overrides() -> Dict[str, List[str]]

Similar to get_sync_mode_overrides but for primary keys.

Returns

A dict from stream name to the list of primary key fields for the stream.

get_sync_mode_overrides() -> Dict[str, airbyte_cdk.sources.singer.singer_helpers.SyncModeInfo]

The Singer Spec outlines a way for taps to declare in their catalog that their streams support incremental sync (valid-replication-keys, forced-replication-method, and others). However, many taps which are incremental don’t actually declare that via the catalog, and just use their input state to perform an incremental sync without giving any hints to the user. An Airbyte Connector built on top of such a Singer Tap cannot automatically detect which streams are full refresh or incremental or what their cursors are. In those cases the developer needs to manually specify information about the sync modes.

This method provides a way of doing that: the dict of stream names to SyncModeInfo returned from this method will be used to override each stream’s sync mode information in the Airbyte Catalog output from the discover method. Only set fields provided in the SyncModeInfo are used. If a SyncModeInfo field is not set, it will not be overridden in the output catalog.

Returns

A dict from stream name to the sync modes that should be applied to this stream.

read(logger: logging.Logger, config: airbyte_cdk.sources.singer.source.ConfigContainer, catalog_path: str, state_path: Optional[str] = None) -> Iterable[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]

Implements the parent class read method.

read_catalog(catalog_path: str) -> str

Since Singer sources do not need the actual catalog object, this method is overridden to return only the path.

read_cmd(logger: logging.Logger, config_path: str, catalog_path: str, state_path: Optional[str] = None) -> str

Returns the command used to read data from the Singer tap. For example, if the bash command used to invoke the Singer tap is tap-postgres, the config JSON lives in "/path/config.json", and the catalog is in "/path/catalog.json", this method would return "tap-postgres --config /path/config.json --catalog /path/catalog.json".

read_state(state_path: str) -> str

Since Singer sources do not need the actual state object, this method is overridden to return only the path.

transform_config(config: Mapping[str, Any]) -> Mapping[str, Any]

A Singer source may need to adapt the config object to the specifics of the Singer tap.

class airbyte_cdk.sources.singer.SyncModeInfo(supported_sync_modes: Optional[List[airbyte_protocol.models.airbyte_protocol.SyncMode]] = None, source_defined_cursor: Optional[bool] = None, default_cursor_field: Optional[List[str]] = None)

Bases: object

default_cursor_field: Optional[List[str]] = None
source_defined_cursor: Optional[bool] = None
supported_sync_modes: Optional[List[airbyte_protocol.models.airbyte_protocol.SyncMode]] = None