Unicat API How-To's

Up - Home


Create an auto-updating async API connector

Have you read Create an API connector? We're basically creating an async version based on that, then adding the auto-updating part. You might want to see that background documentation to follow along, as we're skipping a few steps here.

You don't care about the internals and just want a library to get started? Check out the Python Unicat SDK. That one isn't async, though.

We're building the async verion of our non-production API connector. The end-result will allow us to use the library like this:

import keyring
secret_api_key = keyring.get_password("unicat-my-project", "my-key-name")

ccapi = UnicatApi("https://unicat.app", "<project gid>")
success, result = await ccapi.connect(secret_api_key)

succes, result = await ccapi.call("/records/get", {"record": "f2e64fe0-9ffa-4d9a-8750-d561d6542453"})
record = ccapi.data["records"][result["record"]]

print(record["fields"]["en"]["artnr"], record["fields"]["en"]["price"])

Show and tell

The first step in creating the async version is sprinkling the regular version with async and await keywords in all the right places.

For example, see the changes for the connect method, before and after.

    # sync api
    def connect(self, secret_api_key):
        self._reset_data()
        data = {"api_key": secret_api_key}
        return self._call("post", "/auth", data)

    # async api
    async def connect(self, secret_api_key):
        self._reset_data()
        data = {"api_key": secret_api_key}
        return await self._call("post", "/auth", data)

Once that's done, we have to update the _call method, because we can no longer use the requests library; instead, we use aiohttp. We also add some robustness, using the certifi library for SSL certificate checking.

import aiohttp
import ssl
import certifi

_sslcontext = ssl.create_default_context(cafile=certifi.where())



    async def _call(self, method, endpoint, data=None):
        if endpoint.startswith("//"):
            url = self._globalapi_url + endpoint[1:]
        else:
            url = self._api_url + endpoint
        aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
        async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
            if method == "post" and data is None:
                data = {}
            sessionmethod = getattr(session, method)
            async with sessionmethod(
                url, headers=self._auth_header, json=data
            ) as response:
                if "Authorization" in response.headers:
                    self.update_jwt(response.headers["Authorization"])
                if "WWW-Authenticate" in response.headers:
                    self.update_jwt(None)
                jsontext = await response.text()
                return self._json_response(jsontext)

Pretty similar to the synchronous version (for retry mechanisms, you can have a look at the tenacity module).

We do the same to the other _call… methods, _call_upload, call_dam, and _call_media.

    async def _call_upload(self, endpoint, data, filepath):
        if not data:
            data = {}
        url = self._api_url + endpoint
        formdata = aiohttp.FormData()
        for key, value in data.items():
            formdata.add_field(key, value)
        aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
        async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
            filesize = os.path.getsize(filepath)
            if filesize > 10_000_000:
                print()
                print(filepath, filesize)
                print()
                return self._response_error(413, "Request Entity Too Large", info=None)
            with open(filepath, "rb") as upload_file:
                formdata.add_field(
                    "upload_file", upload_file, filename=os.path.basename(filepath)
                )
                async with session.post(
                    url, headers=self._auth_header, data=formdata
                ) as response:
                    if "Authorization" in response.headers:
                        self.update_jwt(response.headers["Authorization"])
                    if "WWW-Authenticate" in response.headers:
                        self.update_jwt(None)
                    jsontext = await response.text()
                    return self._json_response(jsontext)

    async def _call_dam(self, endpoint, asset, options=None):
        gid = asset["gid"]
        version = asset["version"]
        _, ext = os.path.splitext(asset["name"])
        url = self._dam_url + endpoint + f"/{gid}~{version}{ext}"
        if options:
            url += "/" + "/".join(
                f"{str(key)}={str(value)}" for key, value in options.items()
            )
        aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
        async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
            async with session.get(url, headers=self._auth_header) as response:
                if "WWW-Authenticate" in response.headers:
                    self.update_jwt(None)
                jsontext = await response.text()
                return self._json_response(jsontext)

    async def _call_media(self, public_url, pathname):
        if not self._asset_folder:
            return False
        if not pathname.startswith("/"):
            pathname = "/" + pathname
        abs_pathname = self._asset_folder + pathname
        if os.path.isfile(abs_pathname):
            return pathname
        aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
        async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
            async with session.get(public_url) as response:
                if response.status != 200:
                    return False
                abs_path = os.path.dirname(abs_pathname)
                os.makedirs(abs_path, exist_ok=True)
                with open(abs_pathname, "wb") as f:
                    async for chunk in response.content.iter_chunked(chunk_size=8192):
                        f.write(chunk)
        return pathname

That's the Async API connector done. But we're missing the intriguing part - auto-updating! You may have noticed we didn't port the sync method to our async version. That is because we will replace the manual syncing with auto-updating.

Bonus points

More bonus points! We're adding auto-updating by using the Sync API. This means we need to add the python-socketio library.

Let's set up a test program.

import asyncio
from ccasyncapi import UnicatApi

async def main():
    ccapi = UnicatApi(API_CONNECTION["url"], API_CONNECTION["project_gid"])
    success, result = await ccapi.connect(API_CONNECTION["secret_api_key"])
    succes, result = await ccapi.call("/records/get", {"record": "f2e64fe0-9ffa-4d9a-8750-d561d6542453"})
    record_gid = result["record"]
    artnr, price = None, None
    while True:
        record = ccapi.data["records"][record_gid]
        en_fields = record["fields"]["en"]
        if artnr != en_fields["artnr"] or price != en_fields["price"]:
            artnr, price = en_fields["artnr"], en_fields["price"]
            print(artnr, price)
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

The idea is that we have fetched a record, and we're modifying that in some other process. With auto-updating, those changes should find their way back into the ccapi data cache. Then, we check every 1 second and print if there are changes.

Note that this loop and the checking is only for demonstration purposes, but imagine you are showing an articletable in your UI and your price data changes (by an ERP-update process running on a different server) and this is instantly reflected in your UI - that's a neat trick.

To get auto-update working, we'll look at the UnicatApi again. The Sync API uses socket.io so we must support that:


import socketio

class UnicatApiNamespace(socketio.AsyncClientNamespace):
    def set_ccapi(self, ccapi):
        self.ccapi = ccapi

    async def on_connect(self):
        print("SYNC CLIENT connected")
        await self.emit(
            "initsync",
            {
                "cc.cursor": self.ccapi.cc_cursor,
                "project": self.ccapi.project_gid,
                "cursor": self.ccapi.project_cursor,
            },
            callback=self.handle_sync_data,
        )

    async def on_disconnect(self):
        print("SYNC CLIENT disconnected")

    async def on_authorization(self, data):
        print("SYNC CLIENT authorization received")
        self.ccapi.update_jwt(data["JWT"])

    async def on_sync(self, syncdata):
        print("SYNC CLIENT sync received")
        await self.handle_sync_data(syncdata)

    async def handle_sync_data(self, syncdata=None):
        # let ccapi handle the actual syncdata if there is any
        if syncdata is None:
            print("-- sync error - auth?")
            return
        if not syncdata:
            print("-- nothing to sync")
            return
        await self.ccapi.handle_sync_data(syncdata)

We created a socketio namespace that works with the Sync API. When a connection is established, the initsync is called and the results are processed. Then we wait for sync events from the server and process that data too.

The handling of sync data is handed back to the API connector. Before we look at that, we must connect to the Sync API whenever we start a connection using the API.

The constructor is the place to set up the socketio client and namespace:

    def __init__(self, base_url, project_gid, asset_folder=None):
        
        self.sio = socketio.AsyncClient()
        self.sio_ns = UnicatApiNamespace("/")
        self.sio_ns.set_ccapi(self)
        self.sio.register_namespace(self.sio_ns)

We use this client in the connect method, which now looks like this:

    async def connect(self, secret_api_key):
        self._reset_data()
        data = {"api_key": secret_api_key}
        result = await self._call("post", "/auth", data)
        await self.sio.connect(
            self._base_url,
            socketio_path="/sync/io",
            auth=self._get_sync_auth,
        )
        return result

The socketio connect call requires auth; we provide a _get_sync_auth function that is called on each reconnect, so the required JWT stays fresh. The JWT is stored as an Authorization header, so we must parse it:

    def _get_sync_auth(self):
        return {"JWT": self._auth_header["Authorization"][len("Bearer ") :]}

When data comes in, it comes as a list; we handle each item in turn. Deletions can be done immediately by updating the data stores. An insert isn't always of interest to us; a new asset that I'm not using right now? No need to update the storage. But, a change in a definition (and other 'base' data) could have implications for records I'm using, so I need to fetch those from the server. When I do, the stores are automatically updated by _json_response internally. Updates are only processed when it is about data currently in the stores. After each item is handled we update the local cursor. We should also watch out for backup/restore sync actions. Additionally, syncing can lag behind in some cases, so we check the sync cursor against our current cursors first. Knowing that, let's see how that looks in code:

    async def handle_sync_data(self, syncdatalist):
        # result contains a list of cursor/action/data_type/data_key
        # handle each one, updating our cursors as we go
        for item in syncdatalist:
            # skip lagging syncs (older than our latest cursors)
            if item["data_type"] != "jobs":
                if item["type"] == "cc":
                    if self.cc_cursor >= item["cursor"]:
                        continue
                else:
                    if self.project_cursor >= item["cursor"]:
                        continue

            if item["data_type"] == "cc.version":
                if item["data_key"] != self.cc_version:
                    # alert! version-change mid-program!
                    print("Server version changed!")
                self.cc_version = item["data_key"]
            elif item["data_type"] == "jobs":
                job = item["data"]
                if job["job"] == "backup_project" and job["status"] == "queued":
                    print("Server database backup started")
                elif job["job"] == "backup_project" and job["status"] == "done":
                    print("Server database backup done")
                elif job["job"] == "restore_project" and job["status"] == "queued":
                    print("Server database restore started")
                elif job["job"] == "restore_project" and job["status"] == "done":
                    print("Server database restored done")
                    self.init()
            elif item["action"] == "DELETE":
                # we add the fetch here to avoid race conditions that may
                # happen on quick modify/commit sequences
                should_fail = await self._fetch_syncdataitem(item)
                # use pop, not del, auto-handles items that aren't in our store
                self.data[item["data_type"]].pop(item["data_key"], None)
            elif item["action"] == "INSERT":
                # we're only interested in inserts that affect our data
                # so project-members for our project should fetch the new
                # membership, but also the new members
                # we're also interested in any base-data for definitions,
                # classes, fields, layouts, and queries
                # NOTE: fetching data auto-updates our local data-store
                if item["data_type"] == "cc.projects_members":
                    project_gid, user_gid = item["data_key"].split("/")
                    if project_gid == self._project_gid:
                        success = await self._fetch_syncdataitem(item)
                elif item["data_type"] in (
                    "definitions",
                    "classes",
                    "fields",
                    "layouts",
                    "queries",
                ):
                    success = await self._fetch_syncdataitem(item)
            elif item["action"] == "UPDATE":
                # we're only interested in data we already have locally
                if item["data_key"] in self.data[item["data_type"]]:
                    success = await self._fetch_syncdataitem(item)
            # always update local cursors
            if item["type"] == "cc":
                self.cc_cursor = item["cursor"]
            else:
                self.project_cursor = item["cursor"]

We issue an alert whenever the server-version changes while our program is running.

Not shown yet is the _fetch_syncdataitem method. The API provides …/get endpoints that can be used, but we need a mapping to call the correct one based on the sync data item:

    async def _fetch_syncdataitem(self, syncdataitem):
        type_ = syncdataitem["data_type"]
        key = syncdataitem["data_key"]
        if type_ == "cc.projects_members":
            project, member = key.split("/")
            success, result = await self.call(  # note the // - global api
                "//members/get", {"project": project, "member": member}
            )
            return success
        map_calls = {
            "cc.users": ["//users/get", "user"],  # note the // - global api
            "cc.projects": ["//projects/get", "project"],  # note the // - global api
            "assets": ["/assets/get", "asset"],
            "classes": ["/classes/get", "class"],
            "definitions": ["/definitions/get", "definition"],
            "fields": ["/fields/get", "field"],
            "layouts": ["/layouts/get", "layout"],
            "queries": ["/queries/get", "query"],
            "records": ["/records/get", "record"],
        }
        map_call = map_calls[type_]
        success, result = await self.call(map_call[0], {map_call[1]: key})
        return success

And that's it! An Auto-updating Async API connector (AAAPI-connector?). And bonus points!

Can I just download the Auto-updating Async API connector?

Sure you can, just click on the link to download the Auto-updating Async API connector. Please note that this isn't production-ready code, so use it as an example or base implementation only. It depends on three external libraries, aiohttp, socketio, and certifi.

Do you want a more high-level, official client library? You can pip install unicat. Check out the Python Unicat SDK. It isn't async though, so if you came here specifically for an async version, use the code above.