Submodules

airbyte_cdk.sources.utils.casing module

airbyte_cdk.sources.utils.casing.camel_to_snake(s: str) str

airbyte_cdk.sources.utils.catalog_helpers module

class airbyte_cdk.sources.utils.catalog_helpers.CatalogHelper

Bases: object

static coerce_catalog_as_full_refresh(catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog) airbyte_protocol.models.airbyte_protocol.AirbyteCatalog

Updates the sync mode on all streams in this catalog to be full refresh

airbyte_cdk.sources.utils.schema_helpers module

class airbyte_cdk.sources.utils.schema_helpers.InternalConfig(*, _limit: int = None, _page_size: int = None)

Bases: pydantic.main.BaseModel

KEYWORDS: ClassVar[set[str]] = {'_limit', '_page_size'}
dict(*args: Any, **kwargs: Any) dict[str, Any]

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

is_limit_reached(records_counter: int) bool

Check if record count reached limit set by internal config. :param records_counter - number of records already red :return True if limit reached, False otherwise

limit: int
page_size: int
class airbyte_cdk.sources.utils.schema_helpers.JsonFileLoader(uri_base: str, shared: str)

Bases: object

Custom json file loader to resolve references to resources located in “shared” directory. We need this for compatability with existing schemas cause all of them have references pointing to shared_schema.json file instead of shared/shared_schema.json

class airbyte_cdk.sources.utils.schema_helpers.ResourceSchemaLoader(package_name: str)

Bases: object

JSONSchema loader from package resources

get_schema(name: str) dict[str, Any]

This method retrieves a JSON schema from the schemas/ folder.

The expected file structure is to have all top-level schemas (corresponding to streams) in the “schemas/” folder, with any shared $refs living inside the “schemas/shared/” folder. For example:

schemas/shared/<shared_definition>.json schemas/<name>.json # contains a $ref to shared_definition schemas/<name2>.json # contains a $ref to shared_definition

airbyte_cdk.sources.utils.schema_helpers.check_config_against_spec_or_exit(config: Mapping[str, Any], spec: airbyte_protocol.models.airbyte_protocol.ConnectorSpecification) None

Check config object against spec. In case of spec is invalid, throws an exception with validation error description.

:param config - config loaded from file specified over command line :param spec - spec object generated by connector

airbyte_cdk.sources.utils.schema_helpers.expand_refs(schema: Any) None

Iterate over schema and replace all occurrences of $ref with their definitions.

Parameters

schema – schema that will be patched

airbyte_cdk.sources.utils.schema_helpers.rename_key(schema: Any, old_key: str, new_key: str) None

Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.”

Parameters
  • schema – schema that will be patched

  • old_key – name of the key to replace

  • new_key – new name of the key

Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.

:param obj - jsonschema object with ref field resolved. :return JSON serializable object with references without external dependencies.

airbyte_cdk.sources.utils.schema_helpers.split_config(config: Mapping[str, Any]) Tuple[dict[str, Any], airbyte_cdk.sources.utils.schema_helpers.InternalConfig]

Break config map object into 2 instances: first is a dict with user defined configuration and second is internal config that contains private keys for acceptance test configuration.

:param

config - Dict object that has been loaded from config file.

:return tuple of user defined config dict with filtered out internal parameters and connector acceptance test internal config object.

airbyte_cdk.sources.utils.schema_models module

class airbyte_cdk.sources.utils.schema_models.AllOptional(name, bases, namespaces, **kwargs)

Bases: pydantic.main.ModelMetaclass

Metaclass for marking all Pydantic model fields as Optional Here is example of declaring model using this metaclass like: ‘’’

class MyModel(BaseModel, metaclass=AllOptional):

a: str b: str

‘’’ it is an equivalent of: ‘’’

class MyModel(BaseModel):

a: Optional[str] b: Optional[str]

‘’’ It would make code more clear and eliminate a lot of manual work.

class airbyte_cdk.sources.utils.schema_models.BaseSchemaModel(**extra_data: Any)

Bases: pydantic.main.BaseModel

Base class for all schema models. It has some extra schema postprocessing. Can be used in combination with AllOptional metaclass

class Config

Bases: object

extra = 'allow'
classmethod schema_extra(schema: Dict[str, Any], model: Type[pydantic.main.BaseModel]) None

Modify generated jsonschema, remove “title”, “description” and “required” fields.

Pydantic doesn’t treat Union[None, Any] type correctly when generate jsonschema, so we can’t set field as nullable (i.e. field that can have either null and non-null values), We generate this jsonschema value manually.

Parameters
  • schema – generated jsonschema

  • model

classmethod schema(*args, **kwargs) Dict[str, Any]

We’re overriding the schema classmethod to enable some post-processing

airbyte_cdk.sources.utils.transform module

class airbyte_cdk.sources.utils.transform.TransformConfig(value)

Bases: enum.Flag

TypeTransformer class config. Configs can be combined using bitwise or operator e.g.

` TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization `

CustomSchemaNormalization = 4
DefaultSchemaNormalization = 2
NoTransform = 1
class airbyte_cdk.sources.utils.transform.TypeTransformer(config: airbyte_cdk.sources.utils.transform.TransformConfig)

Bases: object

Class for transforming object before output.

static default_convert(original_item: Any, subschema: Dict[str, Any]) Any

Default transform function that is used when TransformConfig.DefaultSchemaNormalization flag set. :param original_item original value of field. :param subschema part of the jsonschema containing field type/format data. :return transformed field value.

get_error_message(e: jsonschema.exceptions.ValidationError) str
registerCustomTransform(normalization_callback: Callable[[Any, Dict[str, Any]], Any]) Callable

Register custom normalization callback. :param normalization_callback function to be used for value normalization. Takes original value and part type schema. Should return normalized value. See docs/connector-development/cdk-python/schemas.md for details. :return Same callbeck, this is usefull for using registerCustomTransform function as decorator.

transform(record: Dict[str, Any], schema: Mapping[str, Any])

Normalize and validate according to config. :param record: record instance for normalization/transformation. All modification are done by modifying existent object. :param schema: object’s jsonschema for normalization.

Module contents