Subpackages

Submodules

airbyte_cdk.sources.declarative.create_partial module

airbyte_cdk.sources.declarative.create_partial.create(func, /, *args, **keywords)

Create a partial on steroids. Returns a partial object which, when called, behaves like func called with the supplied arguments. Parameters are interpolated before the object is created. The interpolation receives kwargs and config as parameters that can be referenced in interpolated values. If any of the parameters are themselves create functions, they are created as well. kwargs are propagated to the recursive calls.

Parameters
  • func – Function

  • args

  • keywords

Returns

partially created object
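
For orientation, the sketch below uses only the standard library's functools.partial to show the baseline behaviour of a partial object: arguments bound up front, the rest supplied at call time. It does not reproduce the interpolation or recursive creation that create adds, and greet is a hypothetical function chosen for illustration.

```python
from functools import partial


def greet(greeting, name, punctuation="!"):
    # Hypothetical function used only to demonstrate argument binding.
    return f"{greeting}, {name}{punctuation}"


# Bind the first positional argument and one keyword argument up front.
hello = partial(greet, "Hello", punctuation=".")

# Remaining arguments are supplied when the partial object is called.
result = hello("octavia")

# Keywords given at call time override the ones bound earlier.
overridden = hello("octavia", punctuation="?")
```
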

airbyte_cdk.sources.declarative.declarative_source module

class airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource

Bases: airbyte_cdk.sources.abstract_source.AbstractSource

Base class for declarative sources. Concrete sources need to define the connection_checker to use.

check_connection(logger, config) → Tuple[bool, any]
Parameters
  • logger – The source logger

  • config – The user-provided configuration as specified by the source’s spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.

Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the “error” object should describe what went wrong. The error object will be cast to string to display the problem to the user.
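
The return contract described above can be sketched as a standalone function. This is a minimal illustration, not the library's implementation: check_connection_sketch and the "api_key" config field are hypothetical, and a real check would normally contact the data source.

```python
from typing import Any, Mapping, Optional, Tuple


def check_connection_sketch(config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
    # Hypothetical rule: a missing or empty "api_key" counts as a failed check.
    if not config.get("api_key"):
        return False, "Missing required config field: api_key"
    # On success the error slot carries nothing.
    return True, None


ok, error = check_connection_sketch({"api_key": "secret"})
failed, reason = check_connection_sketch({})

# The error object is cast to a string before being shown to the user.
message = str(reason)
```
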

abstract property connection_checker: airbyte_cdk.sources.declarative.checks.connection_checker.ConnectionChecker

Returns the ConnectionChecker to use for the check operation

airbyte_cdk.sources.declarative.declarative_stream module

class airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream(retriever: airbyte_cdk.sources.declarative.retrievers.retriever.Retriever, config: typing.Mapping[str, typing.Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: typing.Optional[typing.Union[str, typing.List[str], typing.List[typing.List[str]]]] = <property object>, schema_loader: typing.Optional[airbyte_cdk.sources.declarative.schema.schema_loader.SchemaLoader] = None, stream_cursor_field: typing.Optional[typing.Union[airbyte_cdk.sources.declarative.interpolation.interpolated_string.InterpolatedString, str]] = None)

Bases: airbyte_cdk.sources.streams.core.Stream

DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever.

name

stream name

Type

str

primary_key

the primary key of the stream

Type

Optional[Union[str, List[str], List[List[str]]]]

schema_loader

The schema loader

Type

SchemaLoader

retriever

The retriever

Type

Retriever

config

The user-provided configuration as specified by the source’s spec

Type

Config

stream_cursor_field

The cursor field

Type

Optional[Union[InterpolatedString, str]]

stream. Transformations are applied in the order in which they are defined.
config: Mapping[str, Any]
property cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

get_json_schema() → Mapping[str, Any]
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream’s “name” property. Override as needed.

get_updated_state(current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) → MutableMapping[str, Any]

Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object and return an updated state object.

For example: if the state object is based on created_at timestamp, and the current state is {‘created_at’: 10}, and the latest_record is {‘name’: ‘octavia’, ‘created_at’: 20 } then this method would return {‘created_at’: 20} to indicate state should be updated to this object.

Parameters
  • current_stream_state – The stream’s current state object

  • latest_record – The latest record extracted from the stream

Returns

An updated state object
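
The created_at example above can be sketched as a standalone function. This illustrates the contract only and is not necessarily how DeclarativeStream implements it; get_updated_state_sketch and its cursor_field parameter are hypothetical.

```python
from typing import Any, Mapping, MutableMapping


def get_updated_state_sketch(
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
    cursor_field: str = "created_at",
) -> MutableMapping[str, Any]:
    # Copy so the caller's state object is not mutated in place.
    updated = dict(current_stream_state)
    latest = latest_record.get(cursor_field)
    if latest is None:
        # The record carries no cursor value, so the state is unchanged.
        return updated
    current = updated.get(cursor_field)
    # Advance the cursor only when the record is newer than the state.
    if current is None or latest > current:
        updated[cursor_field] = latest
    return updated


state = get_updated_state_sketch(
    {"created_at": 10}, {"name": "octavia", "created_at": 20}
)
unchanged = get_updated_state_sketch(
    {"created_at": 10}, {"name": "octavia", "created_at": 5}
)
```
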

property name: str

Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

parameters: dataclasses.InitVar[Mapping[str, Any]]
property primary_key: Optional[Union[str, List[str], List[List[str]]]]

Returns

A string for a single primary key, a list of strings for a composite primary key, or a list of lists of strings for a composite primary key consisting of nested fields. If the stream has no primary keys, return None.
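
The accepted primary key shapes can be illustrated with a small helper that normalizes each of them to a list of field paths. normalize_primary_key is a hypothetical helper written for this example and is not part of the library.

```python
from typing import List, Optional, Union

PrimaryKey = Optional[Union[str, List[str], List[List[str]]]]


def normalize_primary_key(primary_key: PrimaryKey) -> List[List[str]]:
    # No primary key: nothing to identify records by.
    if primary_key is None:
        return []
    # Single primary key given as a plain string.
    if isinstance(primary_key, str):
        return [[primary_key]]
    # Composite key: each part is either a top-level field name
    # or a path (list of strings) into a nested field.
    return [[part] if isinstance(part, str) else list(part) for part in primary_key]


single = normalize_primary_key("id")
composite = normalize_primary_key(["id", "updated_at"])
nested = normalize_primary_key([["data", "id"], ["meta", "version"]])
```
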

read_records(sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) → Iterable[Mapping[str, Any]]
Parameters
  • stream_state – We knowingly avoid using stream_state as we want cursors to manage their own state.

retriever: airbyte_cdk.sources.declarative.retrievers.retriever.Retriever
schema_loader: Optional[airbyte_cdk.sources.declarative.schema.schema_loader.SchemaLoader] = None
property state: MutableMapping[str, Any]
property state_checkpoint_interval: Optional[int]

We explicitly disable checkpointing here. There are a couple of reasons for that, and not all are documented here, but:

  • In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the cursor value once at the end of every slice.

  • Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs, where the important state is the one at the beginning of the slice.

stream_cursor_field: Optional[Union[airbyte_cdk.sources.declarative.interpolation.interpolated_string.InterpolatedString, str]] = None
stream_slices(*, sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) → Iterable[Optional[Mapping[str, Any]]]

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode

  • cursor_field

  • stream_state – we knowingly avoid using stream_state as we want cursors to manage their own state

Returns

airbyte_cdk.sources.declarative.exceptions module

exception airbyte_cdk.sources.declarative.exceptions.ReadException

Bases: Exception

Raised when there is an error reading data from an API source.

airbyte_cdk.sources.declarative.types module

class airbyte_cdk.sources.declarative.types.Record(data: Mapping[str, Any], associated_slice: Optional[Mapping[str, Any]])

Bases: Mapping[str, Any]

property associated_slice: Optional[Mapping[str, Any]]
property data: Mapping[str, Any]
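
As a rough illustration of a Mapping-based record that also carries its slice, the sketch below defines a hypothetical RecordSketch class. It assumes that key lookups, iteration, and length delegate to data; the reference above only states the base class and the two properties, so that delegation is an assumption.

```python
from collections.abc import Mapping as MappingABC
from typing import Any, Iterator, Mapping, Optional


class RecordSketch(MappingABC):
    """Hypothetical stand-in: a read-only mapping over data plus a slice."""

    def __init__(
        self, data: Mapping[str, Any], associated_slice: Optional[Mapping[str, Any]]
    ) -> None:
        self._data = data
        self._associated_slice = associated_slice

    @property
    def data(self) -> Mapping[str, Any]:
        return self._data

    @property
    def associated_slice(self) -> Optional[Mapping[str, Any]]:
        return self._associated_slice

    # Assumption: mapping operations delegate to the underlying data.
    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)


record = RecordSketch({"id": 1, "name": "octavia"}, {"start": "2023-01-01"})
as_dict = dict(record)
```
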

airbyte_cdk.sources.declarative.yaml_declarative_source module

class airbyte_cdk.sources.declarative.yaml_declarative_source.YamlDeclarativeSource(path_to_yaml, debug: bool = False)

Bases: airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource

Declarative source defined by a YAML file.

Module contents