Submodules

airbyte_cdk.sources.streams.http.exceptions module

exception airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException(request: requests.models.PreparedRequest, response: requests.models.Response, error_message: str = '')

Bases: requests.exceptions.HTTPError

exception airbyte_cdk.sources.streams.http.exceptions.DefaultBackoffException(request: requests.models.PreparedRequest, response: requests.models.Response, error_message: str = '')

Bases: airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException

exception airbyte_cdk.sources.streams.http.exceptions.RequestBodyException

Bases: Exception

Raised when there are issues configuring a request body.

exception airbyte_cdk.sources.streams.http.exceptions.UserDefinedBackoffException(backoff: Union[int, float], request: requests.models.PreparedRequest, response: requests.models.Response, error_message: str = '')

Bases: airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException

An exception that exposes how long to back off (the backoff argument, in seconds).

airbyte_cdk.sources.streams.http.http module

class airbyte_cdk.sources.streams.http.http.HttpStream(authenticator: Optional[Union[requests.auth.AuthBase, airbyte_cdk.sources.streams.http.auth.core.HttpAuthenticator]] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None)

Bases: airbyte_cdk.sources.streams.core.Stream, abc.ABC

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.

property authenticator: airbyte_cdk.sources.streams.http.auth.core.HttpAuthenticator
property availability_strategy: Optional[airbyte_cdk.sources.streams.availability_strategy.AvailabilityStrategy]

The AvailabilityStrategy used to check whether this stream is available.

backoff_time(response: requests.models.Response) Optional[float]

Override this method to determine the backoff time dynamically, e.g. by reading the X-Retry-After header.

This method is called only if should_retry() returns True for the input request.

Parameters

response – The HTTP response received for the request

Returns

How long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers to the default backoff behavior (e.g. an exponential algorithm).
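A minimal sketch of the parsing a backoff_time override could delegate to. It assumes the API sends the standard Retry-After header (pass header_name to use X-Retry-After or another API-specific header instead), either as delta-seconds or as an HTTP date. The helper retry_after_seconds is hypothetical and takes a plain headers mapping so it runs without the CDK.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def retry_after_seconds(
    headers: Mapping[str, str],
    header_name: str = "Retry-After",
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Hypothetical helper: seconds to wait, or None to use the default backoff."""
    value = headers.get(header_name)
    if value is None:
        return None  # no hint from the server: defer to the default backoff
    value = value.strip()
    try:
        return max(float(value), 0.0)  # delta-seconds form, e.g. "120" or "1.5"
    except ValueError:
        pass
    try:
        retry_at = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None  # unparseable header: fall back to the default behavior
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max((retry_at - now).total_seconds(), 0.0)
```

Inside a stream, an override could return retry_after_seconds(response.headers). Because requests exposes headers as a case-insensitive mapping, the header name's case does not matter there; with a plain dict it does.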

property cache_filename: str

Override if needed. Returns the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache is kept in memory only.

clear_cache() None

Clears cached requests for the current session. Can be called at any time.

deduplicate_query_params(url: str, params: Optional[Mapping[str, Any]]) Mapping[str, Any]

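Removes query parameters from the params mapping if they are already encoded in the URL.

Parameters
  • url – The request URL, which may already contain query parameters

  • params – The query parameters to check against the URL

Returns

The params mapping without the parameters already present in the URL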
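The deduplication idea can be sketched with the standard library's URL parsing. The function name dedupe_params is hypothetical and this is not the CDK's implementation; in particular, this sketch keeps a parameter whose value differs from the one in the URL, and how the CDK handles such conflicts is not stated here.

```python
from typing import Any, Dict, Mapping, Optional
from urllib.parse import parse_qs, urlparse


def dedupe_params(url: str, params: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
    """Hypothetical stand-in: drop params whose key and value already appear in the URL."""
    if not params:
        return {}
    # parse_qs maps each key to a list of values, e.g. {"page": ["2"]}
    in_url = {key: values[0] for key, values in parse_qs(urlparse(url).query).items()}
    return {
        key: value
        for key, value in params.items()
        # compare as strings because the URL only carries text
        if not (key in in_url and str(value) == in_url[key])
    }
```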

error_message(response: requests.models.Response) str

Override this method to specify a custom error message, which can incorporate the HTTP response received.

Parameters

response – The incoming HTTP response from the partner API

Returns

The custom error message

get_error_display_message(exception: BaseException) Optional[str]

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation only handles HTTPError exceptions, by passing their response to self.parse_response_error_message(). Override this method as needed to handle additional exception types.

Parameters

exception – The exception that was raised

Returns

A user-friendly message that indicates the cause of the error

property http_method: str

Override if needed. The default is GET. See request_body_data/request_body_json if using POST/PUT/PATCH.

property max_retries: Optional[int]

Override if needed. Specifies the maximum number of retries for the backoff policy. Return None for no limit.

property max_time: Optional[int]

Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.

must_deduplicate_query_params() bool
abstract next_page_token(response: requests.models.Response) Optional[Mapping[str, Any]]

Override this method to define a pagination strategy.

The value returned from this method is passed to most other methods in this class. Use it to form a request, e.g. to set headers or query params.

Returns

The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
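A minimal cursor-pagination sketch. It assumes a hypothetical API whose JSON body looks like {"data": [...], "next_cursor": "..."}; the function name and the "next_cursor"/"cursor" keys are illustrative, not part of the CDK.

```python
from typing import Any, Mapping, Optional


def next_page_token_from_body(body: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
    """Hypothetical cursor pagination over a parsed JSON body."""
    cursor = body.get("next_cursor")
    if not cursor:
        return None  # None tells the stream there are no more pages
    # this mapping is what later request-building methods receive as next_page_token
    return {"cursor": cursor}
```

An override could return next_page_token_from_body(response.json()).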

page_size: Optional[int] = None
abstract parse_response(response: requests.models.Response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) Iterable[Mapping[str, Any]]

Parses the raw response object into an iterable of records. This method is abstract, so every stream must implement it.

Parameters
  • response

  • stream_state

  • stream_slice

  • next_page_token

Returns

An iterable containing the parsed response
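A sketch of typical parsing logic, assuming a hypothetical API that returns records in a top-level "data" array. It works on the raw body text so it runs without requests; an override would usually do the same with response.json().

```python
import json
from typing import Any, Iterable, Mapping


def parse_records(raw_body: str) -> Iterable[Mapping[str, Any]]:
    """Hypothetical parse: records live in a top-level "data" array."""
    body = json.loads(raw_body)
    # yield lazily so large pages are not copied into a second list
    yield from body.get("data", [])
```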

classmethod parse_response_error_message(response: requests.models.Response) Optional[str]

Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

Parameters

response

Returns

A user-friendly message that indicates the cause of the error
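A sketch of the "common API patterns" approach for an override. The list of keys checked here is an assumption for illustration, not the CDK's actual set, and extract_error_message is a hypothetical name.

```python
import json
from typing import Any, Optional


def extract_error_message(raw_body: str) -> Optional[str]:
    """Hypothetical: look for an error message under a few common JSON keys."""
    try:
        body: Any = json.loads(raw_body)
    except json.JSONDecodeError:
        return None  # not JSON: no user-friendly message can be extracted
    if isinstance(body, dict):
        for key in ("message", "error_message", "error", "detail"):
            value = body.get(key)
            if isinstance(value, str) and value:
                return value
            # nested shapes such as {"error": {"message": "..."}}
            if isinstance(value, dict):
                nested = value.get("message")
                if isinstance(nested, str) and nested:
                    return nested
    return None
```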

abstract path(*, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) str

Returns the URL path for the API endpoint. For example, to call https://myapi.com/v1/some_entity, this should return “some_entity”.

property raise_on_http_errors: bool

Override if needed. If set to False, responses with HTTP error status codes do not raise an exception.

read_records(sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) Iterable[Union[Mapping[str, Any], airbyte_protocol.models.airbyte_protocol.AirbyteMessage]]

This method should be overridden by subclasses to read records based on the inputs.

request_body_data(stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) Optional[Union[Mapping[str, Any], str]]

Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {“key1”: “value1”, “key2”: “value2”} => “key1=value1&key2=value2”.

Only one of ‘request_body_data’ and ‘request_body_json’ can be overridden.
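The dict-to-form conversion described above can be reproduced with the standard library's urllib.parse.urlencode. This only illustrates the encoding; it assumes the dict is form-encoded the same way by requests, which sends the request.

```python
from urllib.parse import urlencode

# A dict like one returned from request_body_data, form-encoded.
form = {"key1": "value1", "key2": "value2"}
encoded = urlencode(form)  # "key1=value1&key2=value2"

# Reserved characters are percent-encoded and spaces become "+".
encoded_special = urlencode({"q": "a b&c"})  # "q=a+b%26c"
```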

request_body_json(stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) Optional[Mapping[str, Any]]

Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

Only one of ‘request_body_data’ and ‘request_body_json’ can be overridden.

request_headers(stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) Mapping[str, Any]

Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
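The stated precedence (authentication headers win on overlap) behaves like merging two mappings with the auth mapping last. This is a sketch of the semantics with a hypothetical merge_headers helper, not the CDK's code; note that a plain dict treats "Authorization" and "authorization" as different keys, unlike the case-insensitive headers requests uses.

```python
from typing import Dict, Mapping


def merge_headers(stream_headers: Mapping[str, str], auth_headers: Mapping[str, str]) -> Dict[str, str]:
    """Later entries win, so auth headers overwrite overlapping stream headers."""
    return {**stream_headers, **auth_headers}
```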

request_kwargs(stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) Mapping[str, Any]

Override to return a mapping of keyword arguments to be used when sending the HTTP request. Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method. Note that these options do not conflict with request-level options such as headers, request params, etc.

request_params(stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) MutableMapping[str, Any]

Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might define query parameters for paging when next_page_token is not None.
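A sketch of a request_params body for a cursor-paginated API. The parameter names "limit" and "cursor", the token shape {"cursor": "..."}, and the helper build_params are hypothetical.

```python
from typing import Any, Mapping, MutableMapping, Optional


def build_params(
    next_page_token: Optional[Mapping[str, Any]], page_size: int = 100
) -> MutableMapping[str, Any]:
    """Hypothetical request_params logic: page size always, cursor only after page one."""
    params: MutableMapping[str, Any] = {"limit": page_size}
    if next_page_token:
        # the first request has no token; later requests carry the cursor
        params.update(next_page_token)
    return params
```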

request_session() requests.sessions.Session

Session factory based on the use_cache property and call rate limits (the api_budget parameter).

Returns

An instance of a requests-based session

property retry_factor: float

Override if needed. Specifies factor for backoff policy.
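To see how retry_factor, max_retries, and max_time interact, this sketch computes a nominal schedule under an assumed exponential policy (delay = retry_factor × 2^attempt). The real handler may add random jitter and measures max_time as elapsed time, so treat the numbers as approximate; nominal_delays is a hypothetical helper.

```python
from typing import List, Optional


def nominal_delays(
    retry_factor: float, max_retries: Optional[int], max_time: Optional[float]
) -> List[float]:
    """Assumed exponential schedule, without jitter: factor * 2**attempt."""
    if max_retries is None and (max_time is None or retry_factor <= 0):
        raise ValueError("the schedule would never end")
    delays: List[float] = []
    attempt = 0
    while max_retries is None or attempt < max_retries:
        delay = retry_factor * 2**attempt
        if max_time is not None and sum(delays) + delay > max_time:
            break  # the total waiting budget would be exceeded
        delays.append(delay)
        attempt += 1
    return delays
```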

should_retry(response: requests.models.Response) bool

Override to set different conditions for backoff based on the response from the server.

By default, back off on the following HTTP response statuses:
  • 429 (Too Many Requests) indicating rate limiting

  • 500s to handle transient server errors

Unexpected but transient exceptions (connection timeouts, DNS resolution failures, etc.) are retried by default.
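The default status conditions listed above, written as a standalone predicate over a status code (the name default_should_retry is hypothetical). An override would apply similar logic to response.status_code, adding or removing statuses for a particular API.

```python
def default_should_retry(status_code: int) -> bool:
    """Retry on rate limiting (429) and on 5xx server errors."""
    return status_code == 429 or 500 <= status_code < 600
```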

source_defined_cursor = True
abstract property url_base: str

URL base for the API endpoint. For example, to call https://myapi.com/v1/some_entity, this should return “https://myapi.com/v1/”.
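Assuming url_base and path are combined with urllib.parse.urljoin (an assumption about the CDK's internals), this shows why url_base should end with a slash and path should not start with one.

```python
from urllib.parse import urljoin

# Trailing slash on the base: the path is appended under /v1/.
with_slash = urljoin("https://myapi.com/v1/", "some_entity")
# No trailing slash: urljoin replaces the last segment, silently dropping "v1".
without_slash = urljoin("https://myapi.com/v1", "some_entity")
# Leading slash on the path: it is resolved from the host root.
leading_slash = urljoin("https://myapi.com/v1/", "/some_entity")
```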

property use_cache: bool

Override if needed. If True, all records will be cached. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

class airbyte_cdk.sources.streams.http.http.HttpSubStream(parent: airbyte_cdk.sources.streams.http.http.HttpStream, **kwargs: Any)

Bases: airbyte_cdk.sources.streams.http.http.HttpStream, abc.ABC

stream_slices(sync_mode: airbyte_protocol.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) Iterable[Optional[Mapping[str, Any]]]

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode

  • cursor_field

  • stream_state

Returns

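A sketch of the parent/child pattern HttpSubStream is built for: one slice per parent record, which the child stream then uses to build its path. The "parent" slice key, the "posts/{id}/comments" path, and both function names are assumptions for illustration; check HttpSubStream.stream_slices in your CDK version for the actual slice shape.

```python
from typing import Any, Iterable, Mapping


def slices_from_parent(parent_records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
    """Hypothetical: one slice per parent record, stored under a "parent" key."""
    for record in parent_records:
        yield {"parent": record}


def child_path(stream_slice: Mapping[str, Any]) -> str:
    """Hypothetical path() for a child stream, e.g. the comments of one parent post."""
    return f"posts/{stream_slice['parent']['id']}/comments"
```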
airbyte_cdk.sources.streams.http.rate_limiting module

airbyte_cdk.sources.streams.http.rate_limiting.default_backoff_handler(max_tries: Optional[int], factor: float, max_time: Optional[int] = None, **kwargs: Any) Callable[[Callable[[requests.models.PreparedRequest, Mapping[str, Any]], requests.models.Response]], Callable[[requests.models.PreparedRequest, Mapping[str, Any]], requests.models.Response]]
airbyte_cdk.sources.streams.http.rate_limiting.user_defined_backoff_handler(max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any) Callable[[Callable[[requests.models.PreparedRequest, Mapping[str, Any]], requests.models.Response]], Callable[[requests.models.PreparedRequest, Mapping[str, Any]], requests.models.Response]]

Module contents

class airbyte_cdk.sources.streams.http.HttpStream(authenticator: Optional[Union[requests.auth.AuthBase, airbyte_cdk.sources.streams.http.auth.core.HttpAuthenticator]] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None)

Bases: airbyte_cdk.sources.streams.core.Stream, abc.ABC

Re-exported from the airbyte_cdk.sources.streams.http.http module; see HttpStream above for the full member documentation.

class airbyte_cdk.sources.streams.http.HttpSubStream(parent: airbyte_cdk.sources.streams.http.http.HttpStream, **kwargs: Any)

Bases: airbyte_cdk.sources.streams.http.http.HttpStream, abc.ABC

Re-exported from the airbyte_cdk.sources.streams.http.http module; see HttpSubStream above for the stream_slices documentation.

exception airbyte_cdk.sources.streams.http.UserDefinedBackoffException(backoff: Union[int, float], request: requests.models.PreparedRequest, response: requests.models.Response, error_message: str = '')

Bases: airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException

An exception that exposes how long to back off (the backoff argument, in seconds).