X-I-A Easy Protobuf Convertor

Introduction

Quick protobuf serialization without any message definition. The main use case is BigQuery’s Storage Write API.

Requirements

To use this module, please use one of the following Python runtimes and platforms:

  • Python 3.9 or 3.10

  • Windows 64-bit, Linux, or macOS 11+

Quick start

Install the package:

pip install xia-easy-proto

Then create your first test, test.py:

from xia_easy_proto import EasyProto


if __name__ == '__main__':
    songs = {"composer": {'given_name': 'Johann', 'family_name': 'Pachelbel'},
             "title": 'Canon in D',
             "year": [1680, 1681]}
    song_class, song_payload = EasyProto.serialize(songs)
    print(song_class)  # The generated message class
    print(song_payload)  # The serialized message

All you need to do is pass a Python object to EasyProto.serialize(); the rest is done for you.

NO MORE precompilation / NO MORE message class pre-definition.

Data Format

Structure

The module is designed to hold JSON-style records, that is, a list of Python dictionaries. A nested value can be a dictionary or a list of dictionaries.

We apply the same rules as BigQuery tables, so any data exported by BigQuery is supported.

Note that, as in BigQuery, lists of lists are not supported.

Data Element

Only int, float, str, bool and bytes are supported as data elements. Values of any other type are ignored during parsing. See the FAQ for how to deal with other data elements such as datetime.

FAQ

1. Why develop this module?

The new BigQuery Storage Write API is hard to use from Python. The data model must be compiled at design time, which is far from a Pythonic approach.

2. How to improve performance

When transforming a huge amount of data (more than 1 GB in memory), provide a complete sample record to avoid a full scan of the content.

Consider a simple example: [{"Hello": 1}, {"World": 2}, {"Hello": 3}, {"World": 4}, …]. The parser cannot know that the records have only two columns, "Hello" and "World", until the full scan ends. You can avoid this by passing the sample_data parameter:

EasyProto.serialize(songs, sample_data=[{"Hello": 1, "World": 2}])

The CPU/RAM consumption will be dramatically reduced.
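If no complete record is at hand, one possible way to prepare a sample is to merge only the first few records instead of scanning everything. The sketch below is an illustration under assumptions: `build_sample` and `_merge` are hypothetical helpers, not part of xia_easy_proto, and the result is complete only if every column appears within the first `limit` records.

```python
from itertools import islice


def _merge(target, source):
    """Copy keys from `source` into `target`, keeping the first value seen."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(target.get(key, {}), dict):
            # Nested record: merge its keys recursively
            _merge(target.setdefault(key, {}), value)
        else:
            target.setdefault(key, value)


def build_sample(records, limit=1000):
    """Merge the first `limit` records into a single one-record sample list."""
    sample = {}
    for record in islice(records, limit):
        _merge(sample, record)
    return [sample]


records = [{"Hello": 1}, {"World": 2}, {"Hello": 3}, {"World": 4}]
sample = build_sample(records, limit=2)
print(sample)  # [{'Hello': 1, 'World': 2}]
```

The returned list could then be passed as the sample_data argument. Lists of dictionaries are kept as first seen and are not merged element by element, which keeps the sketch short.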

When the first serialization finishes, you get the message class as a return value. You can reuse it in later calls:

EasyProto.serialize(songs, message_class=song_class)

When you are sure that the data structure won’t change during the whole transfer, you can specify the label parameter, Song for example:

EasyProto.serialize(songs, label="Song")

In order of priority, the message class is resolved as follows:

  1. If label is defined and a compiled message class is found under this label, use the one found

  2. If message_class is defined, use the one provided

  3. If sample_data is given, compile the message class from sample_data

  4. Otherwise, compile the message class by a full scan of the payload
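The priority order above can be sketched in plain Python. This is an illustration only, not the module's actual implementation: `resolve_message_class`, `_CLASS_CACHE` and the `compile_fn` callback are hypothetical names, and storing the resolved class under the label for later calls is an assumption.

```python
_CLASS_CACHE = {}  # hypothetical cache: label -> compiled message class


def resolve_message_class(payload, label=None, message_class=None,
                          sample_data=None, compile_fn=None):
    """Pick a message class following the four priority rules."""
    # 1. A class already compiled under this label wins
    if label is not None and label in _CLASS_CACHE:
        return _CLASS_CACHE[label]
    if message_class is not None:
        # 2. An explicitly provided class comes next
        resolved = message_class
    elif sample_data is not None:
        # 3. Compile from the (small) sample instead of the payload
        resolved = compile_fn(sample_data)
    else:
        # 4. Last resort: compile from a full scan of the payload
        resolved = compile_fn(payload)
    if label is not None:
        # Assumption: remember the result so rule 1 applies next time
        _CLASS_CACHE[label] = resolved
    return resolved


def fake_compile(data):
    """Stand-in compiler: returns the sorted set of column names."""
    return tuple(sorted({key for record in data for key in record}))


first = resolve_message_class([{"Hello": 1}, {"World": 2}],
                              label="Demo", compile_fn=fake_compile)
second = resolve_message_class([{"Other": 3}],
                               label="Demo", compile_fn=fake_compile)
print(first == second)  # True: the second call reuses the cached class
```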

3. How to handle complex data types

Data types such as datetime are never stored as datetime in the database, so it is up to you to do the conversion. For the BigQuery use case, a datetime is saved as INTEGER with the value int(timestamp * 1000000). In any case, this module already does better than the classic streaming API because it supports the bytes type.
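A minimal sketch of that conversion, using only the standard library. The helper names `to_epoch_micros` and `convert_datetimes` are hypothetical, and treating naive datetimes as UTC is an assumption you may need to change. Integer arithmetic is used instead of int(timestamp * 1000000) to avoid floating-point rounding on the microsecond digit; the intended value is the same.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_micros(value):
    """Return microseconds since the Unix epoch as an int."""
    if value.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC
        value = value.replace(tzinfo=timezone.utc)
    return (value - EPOCH) // timedelta(microseconds=1)


def convert_datetimes(obj):
    """Recursively replace datetime values in dicts and lists by integers."""
    if isinstance(obj, datetime):
        return to_epoch_micros(obj)
    if isinstance(obj, dict):
        return {key: convert_datetimes(item) for key, item in obj.items()}
    if isinstance(obj, list):
        return [convert_datetimes(item) for item in obj]
    return obj


record = {"title": "Canon in D", "added_at": datetime(2020, 1, 1)}
print(convert_datetimes(record))
# {'title': 'Canon in D', 'added_at': 1577836800000000}
```

The converted record contains only supported data elements and can be passed to the serializer as usual.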

4. How to do data validation

We want to keep things as simple as possible. You should run your own data validation before passing in the Python data object. Again, compared with the classic JSON format, no functionality is lost.
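As an example of such a pre-check, the sketch below walks a record and reports the two situations described in the Data Format section: lists of lists, and values outside int, float, str, bool and bytes. `find_problems` is a hypothetical helper written for this illustration; it is not provided by the module.

```python
SUPPORTED_SCALARS = (int, float, str, bool, bytes)


def find_problems(value, path="$"):
    """Return a list of human-readable problems found in `value`."""
    problems = []
    if isinstance(value, dict):
        for key, item in value.items():
            problems += find_problems(item, f"{path}.{key}")
    elif isinstance(value, list):
        for index, item in enumerate(value):
            if isinstance(item, list):
                problems.append(f"{path}[{index}]: list of lists is not supported")
            else:
                problems += find_problems(item, f"{path}[{index}]")
    elif not isinstance(value, SUPPORTED_SCALARS):
        # Such values would be ignored by the parser, so report them early
        problems.append(f"{path}: unsupported type {type(value).__name__}")
    return problems


print(find_problems({"year": [[1680], [1681]]}))
# ['$.year[0]: list of lists is not supported', '$.year[1]: list of lists is not supported']
```

An empty result means the record only uses structures and data elements described above.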

5. Where to find the source code

Using this module will always be FREE.

This project will be open sourced when it becomes popular.

BigQuery Integration

Here is an example of writing the song data to BigQuery:

import asyncio
from google.protobuf.descriptor_pb2 import DescriptorProto
from google.cloud.bigquery_storage_v1.types.storage import AppendRowsRequest
from google.cloud.bigquery_storage_v1.types.protobuf import ProtoSchema, ProtoRows
from google.cloud.bigquery_storage_v1.services.big_query_write import BigQueryWriteAsyncClient
from xia_easy_proto import EasyProto


songs = {"composer": {'given_name': 'Johann', 'family_name': 'Pachelbel'},
         "title": 'Canon in D',
         "year": [1680, 1681]}
song_class, song_payload = EasyProto.serialize(songs)


async def main():
    # Replace the "xxx" placeholders with your project, dataset and table IDs
    stream_path = BigQueryWriteAsyncClient.write_stream_path("xxx", "xxx", "xxx", "_default")
    bq_write_client = BigQueryWriteAsyncClient()
    # Export the descriptor of the generated message class as the writer schema
    proto_descriptor = DescriptorProto()
    song_class().DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema = ProtoSchema(proto_descriptor=proto_descriptor)
    # Bundle the serialized rows with the schema
    proto_data = AppendRowsRequest.ProtoData(
        rows=ProtoRows(serialized_rows=song_payload),
        writer_schema=proto_schema
    )
    append_row_request = AppendRowsRequest(
        write_stream=stream_path,
        proto_rows=proto_data
    )
    # Send the request and print each response from the stream
    result = await bq_write_client.append_rows(iter([append_row_request]))
    async for item in result:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())

The BigQuery table schema should be:

[
    {
        "name": "composer",
        "type": "RECORD",
        "mode": "NULLABLE",
        "fields": [
            {
                "name": "given_name",
                "type": "STRING",
                "mode": "NULLABLE"
            },
            {
                "name": "family_name",
                "type": "STRING",
                "mode": "NULLABLE"
            }
        ]
    },
    {
        "name": "title",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "lyrics",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "year",
        "type": "INTEGER",
        "mode": "REPEATED"
    }
]
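To see how such a schema relates to the Python record, here is a rough sketch that derives BigQuery column definitions from one sample record. `infer_schema` and `infer_field` are hypothetical helpers for illustration, not part of the module. The type mapping (INTEGER, FLOAT, STRING, BOOLEAN, BYTES, RECORD) is an assumption based on the supported data elements, and every non-list column is assumed NULLABLE. A column absent from the sample record, such as lyrics, is not produced.

```python
# bool must be tested before int because bool is a subclass of int
SCALAR_TYPES = [(bool, "BOOLEAN"), (int, "INTEGER"), (float, "FLOAT"),
                (str, "STRING"), (bytes, "BYTES")]


def infer_field(name, value):
    """Build one column definition from a sample value."""
    mode = "NULLABLE"
    if isinstance(value, list):
        if not value:
            raise ValueError(f"cannot infer the type of empty list {name!r}")
        # A list becomes a REPEATED column typed after its first element
        mode, value = "REPEATED", value[0]
    if isinstance(value, dict):
        return {"name": name, "type": "RECORD", "mode": mode,
                "fields": infer_schema(value)}
    for python_type, bq_type in SCALAR_TYPES:
        if isinstance(value, python_type):
            return {"name": name, "type": bq_type, "mode": mode}
    raise TypeError(f"unsupported type for {name!r}: {type(value).__name__}")


def infer_schema(record):
    """Build the list of column definitions for a sample record."""
    return [infer_field(name, value) for name, value in record.items()]


songs = {"composer": {"given_name": "Johann", "family_name": "Pachelbel"},
         "title": "Canon in D",
         "year": [1680, 1681]}
for column in infer_schema(songs):
    print(column["name"], column["type"], column["mode"])
```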