Submodules
airbyte_cdk.sources.utils.casing module
- airbyte_cdk.sources.utils.casing.camel_to_snake(s: str) str
airbyte_cdk.sources.utils.catalog_helpers module
- class airbyte_cdk.sources.utils.catalog_helpers.CatalogHelper
Bases:
object
- static coerce_catalog_as_full_refresh(catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog) airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
Updates the sync mode on all streams in this catalog to be full refresh
airbyte_cdk.sources.utils.schema_helpers module
- class airbyte_cdk.sources.utils.schema_helpers.InternalConfig(*, _limit: int = None, _page_size: int = None)
Bases:
pydantic.main.BaseModel
- KEYWORDS: ClassVar[set[str]] = {'_limit', '_page_size'}
- dict(*args: Any, **kwargs: Any) dict[str, Any]
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- is_limit_reached(records_counter: int) bool
Check if record count reached limit set by internal config. :param records_counter - number of records already red :return True if limit reached, False otherwise
- limit: int
- page_size: int
- class airbyte_cdk.sources.utils.schema_helpers.JsonFileLoader(uri_base: str, shared: str)
Bases:
object
Custom json file loader to resolve references to resources located in “shared” directory. We need this for compatability with existing schemas cause all of them have references pointing to shared_schema.json file instead of shared/shared_schema.json
- class airbyte_cdk.sources.utils.schema_helpers.ResourceSchemaLoader(package_name: str)
Bases:
object
JSONSchema loader from package resources
- get_schema(name: str) dict[str, Any]
This method retrieves a JSON schema from the schemas/ folder.
The expected file structure is to have all top-level schemas (corresponding to streams) in the “schemas/” folder, with any shared $refs living inside the “schemas/shared/” folder. For example:
schemas/shared/<shared_definition>.json schemas/<name>.json # contains a $ref to shared_definition schemas/<name2>.json # contains a $ref to shared_definition
- airbyte_cdk.sources.utils.schema_helpers.check_config_against_spec_or_exit(config: Mapping[str, Any], spec: airbyte_protocol.models.airbyte_protocol.ConnectorSpecification) None
Check config object against spec. In case of spec is invalid, throws an exception with validation error description.
:param config - config loaded from file specified over command line :param spec - spec object generated by connector
- airbyte_cdk.sources.utils.schema_helpers.expand_refs(schema: Any) None
Iterate over schema and replace all occurrences of $ref with their definitions.
- Parameters
schema – schema that will be patched
- airbyte_cdk.sources.utils.schema_helpers.rename_key(schema: Any, old_key: str, new_key: str) None
Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.”
- Parameters
schema – schema that will be patched
old_key – name of the key to replace
new_key – new name of the key
- airbyte_cdk.sources.utils.schema_helpers.resolve_ref_links(obj: Any) Any
Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.
:param obj - jsonschema object with ref field resolved. :return JSON serializable object with references without external dependencies.
- airbyte_cdk.sources.utils.schema_helpers.split_config(config: Mapping[str, Any]) Tuple[dict[str, Any], airbyte_cdk.sources.utils.schema_helpers.InternalConfig]
Break config map object into 2 instances: first is a dict with user defined configuration and second is internal config that contains private keys for acceptance test configuration.
- :param
config - Dict object that has been loaded from config file.
:return tuple of user defined config dict with filtered out internal parameters and connector acceptance test internal config object.
airbyte_cdk.sources.utils.schema_models module
- class airbyte_cdk.sources.utils.schema_models.AllOptional(name, bases, namespaces, **kwargs)
Bases:
pydantic.main.ModelMetaclass
Metaclass for marking all Pydantic model fields as Optional Here is example of declaring model using this metaclass like: ‘’’
- class MyModel(BaseModel, metaclass=AllOptional):
a: str b: str
‘’’ it is an equivalent of: ‘’’
- class MyModel(BaseModel):
a: Optional[str] b: Optional[str]
‘’’ It would make code more clear and eliminate a lot of manual work.
- class airbyte_cdk.sources.utils.schema_models.BaseSchemaModel(**extra_data: Any)
Bases:
pydantic.main.BaseModel
Base class for all schema models. It has some extra schema postprocessing. Can be used in combination with AllOptional metaclass
- class Config
Bases:
object
- extra = 'allow'
- classmethod schema_extra(schema: Dict[str, Any], model: Type[pydantic.main.BaseModel]) None
Modify generated jsonschema, remove “title”, “description” and “required” fields.
Pydantic doesn’t treat Union[None, Any] type correctly when generate jsonschema, so we can’t set field as nullable (i.e. field that can have either null and non-null values), We generate this jsonschema value manually.
- Parameters
schema – generated jsonschema
model –
- classmethod schema(*args, **kwargs) Dict[str, Any]
We’re overriding the schema classmethod to enable some post-processing
airbyte_cdk.sources.utils.transform module
- class airbyte_cdk.sources.utils.transform.TransformConfig(value)
Bases:
enum.Flag
- TypeTransformer class config. Configs can be combined using bitwise or operator e.g.
` TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization `
- CustomSchemaNormalization = 4
- DefaultSchemaNormalization = 2
- NoTransform = 1
- class airbyte_cdk.sources.utils.transform.TypeTransformer(config: airbyte_cdk.sources.utils.transform.TransformConfig)
Bases:
object
Class for transforming object before output.
- static default_convert(original_item: Any, subschema: Dict[str, Any]) Any
Default transform function that is used when TransformConfig.DefaultSchemaNormalization flag set. :param original_item original value of field. :param subschema part of the jsonschema containing field type/format data. :return transformed field value.
- get_error_message(e: jsonschema.exceptions.ValidationError) str
- registerCustomTransform(normalization_callback: Callable[[Any, Dict[str, Any]], Any]) Callable
Register custom normalization callback. :param normalization_callback function to be used for value normalization. Takes original value and part type schema. Should return normalized value. See docs/connector-development/cdk-python/schemas.md for details. :return Same callbeck, this is usefull for using registerCustomTransform function as decorator.
- transform(record: Dict[str, Any], schema: Mapping[str, Any])
Normalize and validate according to config. :param record: record instance for normalization/transformation. All modification are done by modifying existent object. :param schema: object’s jsonschema for normalization.