xia_engine_bigquery.engine.BigqueryBatchEngine

class xia_engine_bigquery.engine.BigqueryBatchEngine

Bases: BigqueryWriteEngine

BigQuery engine writer using a batch strategy

__init__()

Methods

__init__()

analyze(document_class, analytic_model)

Run the analytic model

backup(document_class[, location, ...])

Back up the data of a model.

batch(operations, originals)

Data Batch Modification

compile(document_class, analytic_request[, ...])

Compile the analysis request

connect([document_class])

Connect to the engine

create(document_class, db_content[, doc_id])

Create the document in BigQuery

create_collection(document_class)

Create the collection if needed

create_table(document_class[, is_log_table])

Create a table in BigQuery

db_to_display(document_class, db_content[, ...])

Convert data from database form to display form

delete(document_class, doc_id)

Delete a document by ID

drop(document_class)

Drop the given collection

fetch(document_class, *args)

Get documents one by one from a list of document IDs

get(document_class, doc_id)

Get Document

get_bq_table_id(document_class[, is_log_table])

Get BigQuery Table ID

get_connection([document_class])

Get the engine connection. Always reuses the existing connection when possible

get_decoder(field[, inner_field])

Get Decoder for a field

get_encoder(field[, inner_field])

Get Encoder for a field

get_project_id(document_class)

Get project id and dataset id for the requested model

get_table_info(table_id)

lock(document_class, doc_id[, timeout])

Lock entries for write

merge(document_class[, start, end, purge, ...])

Merge data from the log section into the main table

parse_search_option(key)

Refer to the search method for the specifications

parse_update_option(key)

Refer to the update method for the specifications

replicate(document_class, task_list)

Data replication on BigQuery

restore(document_class[, location, ...])

Restore data of a model

scan(_document_class[, _acl_queries, _limit])

Scan the document class and get the document id list

search(_document_class, *args[, ...])

This is a write-only engine; search is not supported

set(document_class, doc_id, db_content)

Overwrite the whole document

truncate(document_class)

Remove all data from the given collection

unlock(document_class, doc_id)

Release the write lock

update(_document_class, _doc_id, **kwargs)

Update a document

update_doc_id(document_class, db_content, ...)

Update document id to new value

Attributes

OPERATORS

ORDER_TYPES

TIME_PARTITION_CONFIG

UPDATE_TYPES

analyzer

backup_coder

backup_storer

decoders

default_dataset

Default dataset name

encoders

engine_connector_class

engine_db_shared

engine_default_connector_param

engine_foreign_key_check

engine_param

engine_scope_check

engine_unique_check

key_required

merge_sql_template

scan_and_fetch

scan_sql_template

store_embedded_as_table

support_unknown

classmethod analyze(document_class: Type[BaseDocument], analytic_model: dict)

Run the analytic model

Parameters
  • analytic_model – Analytic model

  • document_class (subclass of BaseDocument) – Document definition

classmethod backup(document_class: Type[BaseDocument], location: Optional[str] = None, data_encode: Optional[str] = None, data_format: Optional[str] = None, data_store: Optional[str] = None, **kwargs)

Back up the data of a model. The actual implementation must use kwargs to distribute the load

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • data_encode (str) – Backup data encoding

  • data_format (str) – Backup data format

  • data_store (str) – Backup data store location

  • location (str) – Data location to be used by the data store

  • **kwargs – parameter to be passed at engine level

classmethod batch(operations: list, originals: dict)

Data Batch Modification

The data will be updated at once or rolled back

Parameters
  • operations – List of operations to be performed. Each operation has the following keys:
    • op: Operation type: “S” = set, “I” = create, “D” = delete, “U” = update

    • cls: Document class

    • doc_id: Document ID

    • content: Document content in database form

  • originals – Dictionary used to roll back the batch, with the following keys:
    • class: Document class name

    • id: Document ID

    • content: Document content in database form

Returns

True and an empty message if the batch succeeds; otherwise False and an error message
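The all-or-nothing contract of batch can be illustrated with a minimal in-memory sketch. It is not the engine's implementation, which works against BigQuery. The dict store keyed by (class name, document ID), the helper name apply_batch, the use of class-name strings for cls, and the assumption that originals is keyed the same way and holds every pre-existing document touched by the batch are all hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory store: (class name, doc id) -> document content in DB form
Store = Dict[Tuple[str, str], dict]


def apply_batch(store: Store, operations: List[dict], originals: dict) -> Tuple[bool, str]:
    """Apply every operation or none of them (illustrative sketch only)."""
    try:
        for op in operations:
            key = (op["cls"], op["doc_id"])
            if op["op"] == "I":  # create: the document must not exist yet
                if key in store:
                    raise ValueError(f"document {key} already exists")
                store[key] = dict(op["content"])
            elif op["op"] == "S":  # set: overwrite the whole document
                store[key] = dict(op["content"])
            elif op["op"] == "U":  # update: merge the given fields
                if key not in store:
                    raise KeyError(f"document {key} not found")
                store[key].update(op["content"])
            elif op["op"] == "D":  # delete
                store.pop(key, None)
            else:
                raise ValueError(f"unknown operation type {op['op']!r}")
        return True, ""
    except Exception as exc:
        # Roll back every touched document using the (assumed) originals layout
        for op in operations:
            key = (op["cls"], op["doc_id"])
            original = originals.get(key)
            if original is None:
                store.pop(key, None)  # the document did not exist before the batch
            else:
                store[key] = dict(original["content"])
        return False, str(exc)
```

If any operation fails, every document the batch touched is put back to its original content, and documents created by the batch are removed.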

classmethod compile(document_class: Type[BaseDocument], analytic_request: dict, acl_condition=None)

Compile the analysis request

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • analytic_request – analytic request

  • acl_condition – User Access List transformed to where conditions

Returns

An analytic model ready to be executed, represented as a dict {Engine: Model}

classmethod connect(document_class: Optional[Type[BaseDocument]] = None)

Connect to the engine

Parameters

document_class (subclass of BaseDocument) – Document definition

Returns

Connection

classmethod create(document_class: Type[BaseDocument], db_content: dict, doc_id: Optional[str] = None)

Create the document in BigQuery

Parameters
  • document_class – Document class

  • db_content – database content

  • doc_id – provided document id

Notes

If the target table doesn’t exist, it is created automatically

classmethod create_collection(document_class: Type[BaseDocument])

Create the collection if needed

Parameters

document_class – Document class

classmethod create_table(document_class: Type[BaseDocument], is_log_table: bool = False)

Create a table in BigQuery

Parameters
  • document_class (BaseDocument) – Document class

  • is_log_table (bool) – Whether the table is a log table, which carries extra information

classmethod db_to_display(document_class: Type[BaseDocument], db_content: dict, lazy: bool = True, catalog: Optional[dict] = None, show_hidden: bool = False)

Convert data from database form to display form

Parameters
  • document_class – Document class

  • db_content – Database Content

  • lazy – Lazy Mode

  • catalog – Data Catalog

  • show_hidden – Show hidden member or not

Returns

document in display form

default_dataset = 'default'

Default dataset name

classmethod delete(document_class: Type[BaseDocument], doc_id: str)

Delete a document by ID

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • doc_id – Document ID

classmethod drop(document_class: Type[BaseDocument])

Drop the given collection

Parameters

document_class (subclass of BaseDocument) – Document definition

engine_connector

alias of Client

engine_writer

alias of BigQueryWriteClient

classmethod fetch(document_class: Type[BaseDocument], *args)

Get documents one by one from a list of document IDs

Returns

An iterator over (id, document dictionary) pairs

Comments:

When a document ID is empty, it is probably because the user only has partial read authorization

classmethod get(document_class: Type[BaseDocument], doc_id: str) → dict

Get Document

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • doc_id – Document ID

Returns

Document content as a Python dict

classmethod get_bq_table_id(document_class: Type[BaseDocument], is_log_table: bool = False)

Get BigQuery Table ID

Parameters
  • document_class (BaseDocument) – Document class

  • is_log_table (bool) – Whether the table is a log table, which carries extra information

Returns

Table ID as a string (project.dataset.table)
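A fully qualified table ID has the form project.dataset.table. The sketch below composes one; the function name make_table_id and the _log suffix for log tables are assumptions made for illustration, since the engine's actual log-table naming is not documented here. Falling back to "default" mirrors the documented default_dataset attribute.

```python
from typing import Optional

DEFAULT_DATASET = "default"  # mirrors the documented default_dataset attribute


def make_table_id(project_id: str, table_name: str, dataset_id: Optional[str] = None,
                  is_log_table: bool = False, log_suffix: str = "_log") -> str:
    """Compose a fully qualified BigQuery table ID (project.dataset.table).

    log_suffix is a hypothetical naming convention for log tables.
    """
    dataset = dataset_id or DEFAULT_DATASET
    table = table_name + log_suffix if is_log_table else table_name
    return f"{project_id}.{dataset}.{table}"
```

For example, make_table_id("my-project", "Person") yields "my-project.default.Person".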

classmethod get_connection(document_class: Optional[Type[BaseDocument]] = None)

Get the engine connection. Always reuses the existing connection when possible

Parameters

document_class (subclass of BaseDocument) – Document definition

Returns

Connection

classmethod get_decoder(field: type, inner_field: Optional[type] = None) → callable

Get Decoder for a field

Parameters
  • field (type) – Class of the field

  • inner_field (type) – Class of the inner field, for container fields such as ListField

Returns

Decoder function

classmethod get_encoder(field: type, inner_field: Optional[type] = None) → callable

Get Encoder for a field

Parameters
  • field (type) – Class of the field

  • inner_field (type) – Class of the inner field, for container fields such as ListField

Returns

Encoder function
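To show what inner_field is for, here is a hypothetical encoder registry in which a container field (ListField) applies the encoder of its inner field to every element. The field classes, the registry, and the timestamp-to-ISO-string encoding are stand-ins chosen for illustration, not the library's own types or encoders.

```python
from datetime import datetime, timezone
from typing import Callable, Dict, Optional


# Hypothetical stand-ins for the library's field classes
class StringField: ...
class TimestampField: ...
class ListField: ...


# Hypothetical registry: field class -> encoder (display form -> database form)
ENCODERS: Dict[type, Callable] = {
    StringField: str,
    TimestampField: lambda ts: datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(),
}


def identity(value):
    return value


def get_encoder(field: type, inner_field: Optional[type] = None) -> Callable:
    """Return an encoder; container fields encode each element with the inner field's encoder."""
    if field is ListField:
        inner = ENCODERS.get(inner_field, identity)
        return lambda values: [inner(v) for v in values]
    return ENCODERS.get(field, identity)
```

A decoder registry would follow the same shape with the conversions reversed.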

classmethod get_project_id(document_class: Type[BaseDocument])

Get project id and dataset id for the requested model

Parameters

document_class – Document class

Returns

Project ID and dataset ID as a tuple

classmethod lock(document_class: Type[BaseDocument], doc_id: str, timeout: Optional[int] = None)

Lock entries for write

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • doc_id (str) – Predefined document ID; None means the ID may be generated by the engine

  • timeout – Lock timeout

Returns

True and an empty message if the lock succeeds; otherwise False and an error message

Comments:

Locking depends on the engine implementation
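Because locking depends on the engine implementation, the documented contract (a success flag plus a message, with an optional timeout) can be sketched with an in-memory lock table. The class InMemoryLocker, its use of class-name strings, and the choice that a lock without a timeout never lapses on its own are hypothetical; a real engine would keep lock state somewhere shared between processes.

```python
import time
from typing import Dict, Optional, Tuple


class InMemoryLocker:
    """Hypothetical lock table keyed by (class name, doc id); not the engine's implementation."""

    def __init__(self) -> None:
        self._locks: Dict[Tuple[str, str], float] = {}  # key -> expiry (monotonic seconds)

    def lock(self, cls_name: str, doc_id: str, timeout: Optional[int] = None) -> Tuple[bool, str]:
        key = (cls_name, doc_id)
        now = time.monotonic()
        expiry = self._locks.get(key)
        if expiry is not None and expiry > now:
            return False, f"{cls_name}/{doc_id} is already locked"
        # Assumption: timeout=None means the lock never expires on its own
        self._locks[key] = now + timeout if timeout is not None else float("inf")
        return True, ""

    def unlock(self, cls_name: str, doc_id: str) -> Tuple[bool, str]:
        if self._locks.pop((cls_name, doc_id), None) is None:
            return False, f"{cls_name}/{doc_id} is not locked"
        return True, ""
```

An expired lock is treated as free, so a later lock call on the same document succeeds.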

classmethod merge(document_class: Type[BaseDocument], start: Optional[float] = None, end: Optional[float] = None, purge: bool = False, criteria: Optional[dict] = None)

Merge data from log section into main table

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • start (timestamp) – Starting time point

  • end (timestamp) – Ending time point

  • purge – If True, remove the processed entries from the log table after execution

  • criteria – Only merge entries matching the given criteria

Comments:

This method is designed to keep data highly consistent. All replicated data is kept in the log table and is merged into the main table only after it passes the consistency check.
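The replay step of merge can be sketched in memory. This sketch assumes log entries are dicts with id, op, time, and content keys (matching the task fields documented for replicate), applies them in time order within the optional [start, end] window, and drops them from the log when purge is set. It does not model the consistency check or the criteria filter, and the function name merge_log is hypothetical.

```python
from typing import Dict, List, Optional


def merge_log(main: Dict[str, dict], log: List[dict], start: Optional[float] = None,
              end: Optional[float] = None, purge: bool = False) -> List[dict]:
    """Replay log entries inside [start, end] onto the main table; return the remaining log."""

    def in_window(entry: dict) -> bool:
        return (start is None or entry["time"] >= start) and (end is None or entry["time"] <= end)

    # Apply entries oldest first so later operations win
    for entry in sorted((e for e in log if in_window(e)), key=lambda e: e["time"]):
        if entry["op"] == "D":
            main.pop(entry["id"], None)
        else:  # "I", "U" and "L" carry the full document in database form
            main[entry["id"]] = dict(entry["content"])
    if purge:
        return [e for e in log if not in_window(e)]
    return log
```

Replaying in time order means that, for each document, the latest entry in the window determines the merged state.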

classmethod parse_search_option(key: str)

Refer to the search method for the specifications

Parameters

key (str) –

Returns

key, operator, order

classmethod parse_update_option(key: str)

Refer to the update method for the specifications

Parameters

key (str) –

Returns

key, update

classmethod replicate(document_class: Type[BaseDocument], task_list: list)

Data replication on BigQuery

BigQuery is optimized for append-only workloads, so it is better to keep a separate log table alongside the main table.

Parameters
  • document_class – Python class of the document

  • task_list – List of dictionaries with the following keys:
    • id: Document ID

    • content: Document in database form

    • op: Operation type: “I” for insert, “D” for delete, “U” for update, “L” for load

Returns

List of dictionaries with the following keys:
  • id: Document ID

  • op: Operation type: “I” for insert, “D” for delete, “U” for update, “L” for load

  • time: Time when the data was replicated

  • status: HTTP status code

Return type

task_results
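The task and result formats of replicate can be illustrated by appending tasks to an in-memory log. The function replicate_to_log, the log-entry layout, and the status codes 200 (accepted) and 400 (unknown operation) are assumptions for illustration; the engine's actual status values and write path are not documented here.

```python
import time
from typing import List

VALID_OPS = {"I", "D", "U", "L"}  # insert, delete, update, load


def replicate_to_log(log: List[dict], task_list: List[dict]) -> List[dict]:
    """Append each task to an in-memory log table and report a per-task result."""
    task_results = []
    for task in task_list:
        now = time.time()
        if task.get("op") not in VALID_OPS:
            # Hypothetical status for a malformed task
            task_results.append({"id": task.get("id"), "op": task.get("op"), "time": now, "status": 400})
            continue
        log.append({"id": task["id"], "op": task["op"], "time": now, "content": task.get("content", {})})
        task_results.append({"id": task["id"], "op": task["op"], "time": now, "status": 200})
    return task_results
```

Appending rather than updating rows in place reflects the append-only design described above.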

classmethod restore(document_class: Type[BaseDocument], location: Optional[str] = None, data_encode: Optional[str] = None, data_format: Optional[str] = None, data_store: Optional[str] = None, **kwargs)

Restore data of a model

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • data_encode (str) – Backup data encoding

  • data_format (str) – Backup data format

  • data_store (str) – Backup data store location

  • location (str) – Data location to be used by the data store

  • **kwargs – parameter to be passed at engine level

classmethod scan(_document_class: Type[BaseDocument], _acl_queries: Optional[list] = None, _limit: int = 1000, **kwargs)

Scan the document class and get the document id list

Parameters
  • _document_class (subclass of BaseDocument) – Document definition

  • _acl_queries (list) – Extra queries calculated from user’s access control list

  • _limit (int) – Maximum number of scan results

  • **kwargs – Named arguments used as search strings

Notes on search strings:
  • key, str pair: single-value search

  • key, list pair: array_contains_any search

  • embedded search: a__b refers to component b of a, while a.b refers to a key literally named a.b

  • operators: the key ends with __op__. The following operators are supported:
    • __eq__: May be omitted because it is the default behavior

    • __lt__, __le__, __gt__, __ge__, __ne__: As the names suggest

    • __asc__, __desc__: The results are ordered by the field

Caution:
  • Complex queries might raise compatibility issues
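The keyword conventions above can be made concrete with a small parser that splits a keyword into a field path, a comparison operator, and an ordering, the same triple parse_search_option is documented to return. This is a sketch only: the name parse_search_key, the tuple-of-components path, and returning None as the operator for ordering keys are assumptions, and it does not build the SQL the engine actually runs.

```python
from typing import Optional, Tuple

COMPARE_SUFFIXES = ("__eq__", "__lt__", "__le__", "__gt__", "__ge__", "__ne__")
ORDER_SUFFIXES = ("__asc__", "__desc__")


def parse_search_key(key: str) -> Tuple[Tuple[str, ...], Optional[str], Optional[str]]:
    """Split a search keyword into (field path, operator, order).

    Hypothetical output convention: the path is a tuple of components,
    the operator defaults to "eq", and ordering keys carry no operator.
    """
    operator: Optional[str] = "eq"  # __eq__ is the default and may be omitted
    order: Optional[str] = None
    for suffix in COMPARE_SUFFIXES + ORDER_SUFFIXES:
        if key.endswith(suffix):
            key = key[: -len(suffix)]
            if suffix in ORDER_SUFFIXES:
                operator, order = None, suffix.strip("_")
            else:
                operator = suffix.strip("_")
            break
    # a__b is component b of a; a literal dot (a.b) stays part of a single key name
    return tuple(key.split("__")), operator, order
```

For example, parse_search_key("address__city__ne__") yields (("address", "city"), "ne", None), while "a.b__gt__" keeps "a.b" as one key.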

classmethod search(_document_class: Type[BaseDocument], *args, _acl_queries: Optional[list] = None, _limit: int = 50, **kwargs)

This is a write-only engine; search is not supported

classmethod set(document_class: Type[BaseDocument], doc_id: str, db_content: dict) → str

Overwrite the whole document

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • doc_id – Document ID

  • db_content – Content to be written to the engine

Returns

Document ID

classmethod truncate(document_class: Type[BaseDocument])

Remove all data from the given collection

Parameters

document_class (subclass of BaseDocument) – Document definition

classmethod unlock(document_class: Type[BaseDocument], doc_id: str)

Release the write lock

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • doc_id (str) – Predefined document ID; None means the ID may be generated by the engine

Returns

True and an empty message if the unlock succeeds; otherwise False and an error message

Comments:

Unlocking depends on the engine implementation

classmethod update(_document_class: Type[BaseDocument], _doc_id: str, **kwargs) → dict

Update a document

Parameters
  • _document_class (subclass of BaseDocument) – Document definition

  • _doc_id (str) – Document ID

  • **kwargs – Named keywords describing the update

Returns

Updated data

Notes on update strings:
  • embedded update: a__b refers to component b of a, while a.b refers to a key literally named a.b

  • operators: the key ends with __op__. The following operators are supported:
    • __append__: Append an item to array

    • __remove__: Remove an item

    • __delete__: Delete the field

classmethod update_doc_id(document_class: Type[BaseDocument], db_content: dict, old_id: str, new_id: str)

Update document id to new value

Parameters
  • document_class (subclass of BaseDocument) – Document definition

  • db_content – Content to be written to the new engine

  • old_id – old document id

  • new_id – new document id

Returns

new_document_id if the process is successful

Comments:

By default, the old ID is returned (not implemented). When the engine implements this method, the new document ID is returned