Have you read Create an API connector? We're basically creating an async version based on that, then adding the auto-updating part. You might want to read that background documentation first, as we're skipping a few steps here.
You don't care about the internals and just want a library to get started? Check out the Python Unicat SDK. That one isn't async, though.
We're building the async version of our non-production API connector. The end result will allow us to use the library like this:
import keyring

from ccasyncapi import UnicatApi

secret_api_key = keyring.get_password("unicat-my-project", "my-key-name")
ccapi = UnicatApi("https://unicat.app", "<project gid>")
success, result = await ccapi.connect(secret_api_key)
success, result = await ccapi.call(
    "/records/get", {"record": "f2e64fe0-9ffa-4d9a-8750-d561d6542453"}
)
record = ccapi.data["records"][result["record"]]
print(record["fields"]["en"]["artnr"], record["fields"]["en"]["price"])
The first step in creating the async version is sprinkling the regular version with async and await keywords in all the right places. For example, see the changes to the connect method, before and after.
# sync api
def connect(self, secret_api_key):
    self._reset_data()
    data = {"api_key": secret_api_key}
    return self._call("post", "/auth", data)

# async api
async def connect(self, secret_api_key):
    self._reset_data()
    data = {"api_key": secret_api_key}
    return await self._call("post", "/auth", data)
Once that's done, we have to update the _call method, because we can no longer use the requests library; instead, we use aiohttp. We also add some robustness, using the certifi library for SSL certificate checking.
import os
import ssl

import aiohttp
import certifi

_sslcontext = ssl.create_default_context(cafile=certifi.where())

…

async def _call(self, method, endpoint, data=None):
    if endpoint.startswith("//"):
        url = self._globalapi_url + endpoint[1:]
    else:
        url = self._api_url + endpoint
    aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
    async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
        if method == "post" and data is None:
            data = {}
        sessionmethod = getattr(session, method)
        async with sessionmethod(
            url, headers=self._auth_header, json=data
        ) as response:
            if "Authorization" in response.headers:
                self.update_jwt(response.headers["Authorization"])
            if "WWW-Authenticate" in response.headers:
                self.update_jwt(None)
            jsontext = await response.text()
            return self._json_response(jsontext)
Pretty similar to the synchronous version (for retry mechanisms, you can have a look at the tenacity module).
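As a minimal sketch, assuming we only want to retry transient network failures, a tenacity decorator could wrap _call like this (the attempt count and back-off values are illustrative, not part of the original connector):

import aiohttp
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# illustrative policy: retry up to 3 times on transient aiohttp
# network errors, with exponential back-off between attempts
@retry(
    retry=retry_if_exception_type(aiohttp.ClientError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, max=10),
)
async def _call(self, method, endpoint, data=None):
    …  # body unchanged from the version above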
We do the same to the other _call… methods: _call_upload, _call_dam, and _call_media.
async def _call_upload(self, endpoint, data, filepath):
    if not data:
        data = {}
    url = self._api_url + endpoint
    formdata = aiohttp.FormData()
    for key, value in data.items():
        formdata.add_field(key, value)
    aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
    async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
        filesize = os.path.getsize(filepath)
        if filesize > 10_000_000:  # refuse uploads larger than 10 MB
            print("upload too large:", filepath, filesize)
            return self._response_error(413, "Request Entity Too Large", info=None)
        with open(filepath, "rb") as upload_file:
            formdata.add_field(
                "upload_file", upload_file, filename=os.path.basename(filepath)
            )
            async with session.post(
                url, headers=self._auth_header, data=formdata
            ) as response:
                if "Authorization" in response.headers:
                    self.update_jwt(response.headers["Authorization"])
                if "WWW-Authenticate" in response.headers:
                    self.update_jwt(None)
                jsontext = await response.text()
                return self._json_response(jsontext)
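As a usage sketch (the endpoint and form fields below are hypothetical - check the API documentation for the real upload endpoints), uploading a file could look like this:

# hypothetical endpoint and form fields, for illustration only
success, result = await ccapi._call_upload(
    "/assets/upload", {"folder": "<folder gid>"}, "/path/to/datasheet.pdf"
)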
async def _call_dam(self, endpoint, asset, options=None):
    gid = asset["gid"]
    version = asset["version"]
    _, ext = os.path.splitext(asset["name"])
    url = self._dam_url + endpoint + f"/{gid}~{version}{ext}"
    if options:
        url += "/" + "/".join(
            f"{str(key)}={str(value)}" for key, value in options.items()
        )
    aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
    async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
        async with session.get(url, headers=self._auth_header) as response:
            if "WWW-Authenticate" in response.headers:
                self.update_jwt(None)
            jsontext = await response.text()
            return self._json_response(jsontext)
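To see what the options do (the endpoint and option names below are hypothetical), here is the kind of URL _call_dam builds from an asset and an options dict:

# hypothetical asset, endpoint, and options, for illustration only
asset = {"gid": "abc-123", "version": 2, "name": "photo.jpg"}
options = {"resize": "fill", "width": 600}
# await ccapi._call_dam("/transform", asset, options) would request:
#   <dam_url>/transform/abc-123~2.jpg/resize=fill/width=600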
async def _call_media(self, public_url, pathname):
    if not self._asset_folder:
        return False
    if not pathname.startswith("/"):
        pathname = "/" + pathname
    abs_pathname = self._asset_folder + pathname
    if os.path.isfile(abs_pathname):
        return pathname
    aiohttpconnector = aiohttp.TCPConnector(ssl=_sslcontext)
    async with aiohttp.ClientSession(connector=aiohttpconnector) as session:
        async with session.get(public_url) as response:
            if response.status != 200:
                return False
            abs_path = os.path.dirname(abs_pathname)
            os.makedirs(abs_path, exist_ok=True)
            with open(abs_pathname, "wb") as f:
                # stream the download; iter_chunked takes the chunk
                # size as a positional argument
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)
    return pathname
That's the Async API connector done. But we're missing the intriguing part - auto-updating! You may have noticed we didn't port the sync method to our async version. That is because we will replace the manual syncing with auto-updating.
More bonus points! We're adding auto-updating by using the Sync API. This means we need to add the python-socketio library.
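If you want to code along, all external dependencies used in this article can be installed in one go:

pip install aiohttp certifi python-socketio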
Let's set up a test program.
import asyncio

from ccasyncapi import UnicatApi

# API_CONNECTION is a settings dict holding the url, project_gid,
# and secret_api_key for our project
async def main():
    ccapi = UnicatApi(API_CONNECTION["url"], API_CONNECTION["project_gid"])
    success, result = await ccapi.connect(API_CONNECTION["secret_api_key"])
    success, result = await ccapi.call(
        "/records/get", {"record": "f2e64fe0-9ffa-4d9a-8750-d561d6542453"}
    )
    record_gid = result["record"]
    artnr, price = None, None
    while True:
        record = ccapi.data["records"][record_gid]
        en_fields = record["fields"]["en"]
        if artnr != en_fields["artnr"] or price != en_fields["price"]:
            artnr, price = en_fields["artnr"], en_fields["price"]
            print(artnr, price)
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())
The idea is that we have fetched a record, and we're modifying it in some other process. With auto-updating, those changes should find their way back into the ccapi data cache. We then check every second and print when something changed.
Note that this polling loop is for demonstration purposes only. But imagine you are showing an article table in your UI and your price data changes (through an ERP update process running on a different server), and this is instantly reflected in your UI - that's a neat trick.
To get auto-updating working, we'll look at the UnicatApi again. The Sync API uses socket.io, so we must support that:
…

import socketio

class UnicatApiNamespace(socketio.AsyncClientNamespace):
    def set_ccapi(self, ccapi):
        self.ccapi = ccapi

    async def on_connect(self):
        print("SYNC CLIENT connected")
        await self.emit(
            "initsync",
            {
                "cc.cursor": self.ccapi.cc_cursor,
                "project": self.ccapi.project_gid,
                "cursor": self.ccapi.project_cursor,
            },
            callback=self.handle_sync_data,
        )

    async def on_disconnect(self):
        print("SYNC CLIENT disconnected")

    async def on_authorization(self, data):
        print("SYNC CLIENT authorization received")
        self.ccapi.update_jwt(data["JWT"])

    async def on_sync(self, syncdata):
        print("SYNC CLIENT sync received")
        await self.handle_sync_data(syncdata)

    async def handle_sync_data(self, syncdata=None):
        # let ccapi handle the actual syncdata if there is any
        if syncdata is None:
            print("-- sync error - auth?")
            return
        if not syncdata:
            print("-- nothing to sync")
            return
        await self.ccapi.handle_sync_data(syncdata)

…
We created a socketio namespace that works with the Sync API. When a connection is established, we emit initsync and process the results. Then we wait for sync events from the server and process that data too.
The handling of sync data is handed back to the API connector. Before we look at that, we must connect to the Sync API whenever we start a connection using the API.
The constructor is the place to set up the socketio client and namespace:
def __init__(self, base_url, project_gid, asset_folder=None):
    …
    self.sio = socketio.AsyncClient()
    self.sio_ns = UnicatApiNamespace("/")
    self.sio_ns.set_ccapi(self)
    self.sio.register_namespace(self.sio_ns)
We use this client in the connect method, which now looks like this:
async def connect(self, secret_api_key):
    self._reset_data()
    data = {"api_key": secret_api_key}
    result = await self._call("post", "/auth", data)
    await self.sio.connect(
        self._base_url,
        socketio_path="/sync/io",
        auth=self._get_sync_auth,
    )
    return result
The socketio connect call requires auth; we provide a _get_sync_auth function that is called on each reconnect, so the required JWT stays fresh. The JWT is stored as an Authorization header, so we must parse it:
def _get_sync_auth(self):
    return {"JWT": self._auth_header["Authorization"][len("Bearer ") :]}
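For example, with a made-up token value, the stored header and the resulting auth payload look like this:

# hypothetical token value, for illustration only
ccapi._auth_header = {"Authorization": "Bearer abc.def.ghi"}
ccapi._get_sync_auth()  # returns {"JWT": "abc.def.ghi"}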
When data comes in, it comes as a list; we handle each item in turn. Deletions can be applied immediately by updating the data stores. An insert isn't always of interest to us: a new asset we aren't using right now? No need to update the storage. But a change in a definition (or other 'base' data) could have implications for records we are using, so we need to fetch those from the server. When we do, the stores are automatically updated internally by _json_response. Updates are only processed when they concern data currently in the stores. After each item is handled, we update the local cursor.
We should also watch out for backup/restore sync actions. Additionally, syncing can lag behind in some cases, so we check the sync cursor against our current cursors first. Knowing all that, let's see how it looks in code:
async def handle_sync_data(self, syncdatalist):
    # result contains a list of cursor/action/data_type/data_key
    # handle each one, updating our cursors as we go
    for item in syncdatalist:
        # skip lagging syncs (older than our latest cursors)
        if item["data_type"] != "jobs":
            if item["type"] == "cc":
                if self.cc_cursor >= item["cursor"]:
                    continue
            else:
                if self.project_cursor >= item["cursor"]:
                    continue
        if item["data_type"] == "cc.version":
            if item["data_key"] != self.cc_version:
                # alert! version-change mid-program!
                print("Server version changed!")
                self.cc_version = item["data_key"]
        elif item["data_type"] == "jobs":
            job = item["data"]
            if job["job"] == "backup_project" and job["status"] == "queued":
                print("Server database backup started")
            elif job["job"] == "backup_project" and job["status"] == "done":
                print("Server database backup done")
            elif job["job"] == "restore_project" and job["status"] == "queued":
                print("Server database restore started")
            elif job["job"] == "restore_project" and job["status"] == "done":
                print("Server database restore done")
                self.init()
        elif item["action"] == "DELETE":
            # we add the fetch here to avoid race conditions that may
            # happen on quick modify/commit sequences
            should_fail = await self._fetch_syncdataitem(item)
            # use pop, not del - auto-handles items that aren't in our store
            self.data[item["data_type"]].pop(item["data_key"], None)
        elif item["action"] == "INSERT":
            # we're only interested in inserts that affect our data
            # so project-members for our project should fetch the new
            # membership, but also the new members
            # we're also interested in any base-data for definitions,
            # classes, fields, layouts, and queries
            # NOTE: fetching data auto-updates our local data-store
            if item["data_type"] == "cc.projects_members":
                project_gid, user_gid = item["data_key"].split("/")
                if project_gid == self._project_gid:
                    success = await self._fetch_syncdataitem(item)
            elif item["data_type"] in (
                "definitions",
                "classes",
                "fields",
                "layouts",
                "queries",
            ):
                success = await self._fetch_syncdataitem(item)
        elif item["action"] == "UPDATE":
            # we're only interested in data we already have locally
            if item["data_key"] in self.data[item["data_type"]]:
                success = await self._fetch_syncdataitem(item)
        # always update local cursors
        if item["type"] == "cc":
            self.cc_cursor = item["cursor"]
        else:
            self.project_cursor = item["cursor"]
We issue an alert whenever the server version changes while our program is running.
Not shown yet is the _fetch_syncdataitem method. The API provides …/get endpoints that can be used, but we need a mapping to call the correct one based on the sync data item:
async def _fetch_syncdataitem(self, syncdataitem):
    type_ = syncdataitem["data_type"]
    key = syncdataitem["data_key"]
    if type_ == "cc.projects_members":
        project, member = key.split("/")
        success, result = await self.call(  # note the // - global api
            "//members/get", {"project": project, "member": member}
        )
        return success
    map_calls = {
        "cc.users": ["//users/get", "user"],  # note the // - global api
        "cc.projects": ["//projects/get", "project"],  # note the // - global api
        "assets": ["/assets/get", "asset"],
        "classes": ["/classes/get", "class"],
        "definitions": ["/definitions/get", "definition"],
        "fields": ["/fields/get", "field"],
        "layouts": ["/layouts/get", "layout"],
        "queries": ["/queries/get", "query"],
        "records": ["/records/get", "record"],
    }
    map_call = map_calls[type_]
    success, result = await self.call(map_call[0], {map_call[1]: key})
    return success
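To illustrate (with made-up values), an UPDATE item for a record we already have locally would be fetched like this:

# a made-up UPDATE item as it could arrive from the Sync API
item = {
    "type": "project",
    "cursor": 1234,
    "action": "UPDATE",
    "data_type": "records",
    "data_key": "f2e64fe0-9ffa-4d9a-8750-d561d6542453",
}
success = await ccapi._fetch_syncdataitem(item)  # issues /records/get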
And that's it! An Auto-updating Async API connector (AAAPI-connector?). And bonus points!
Sure you can, just click the link to download the Auto-updating Async API connector. Please note that this isn't production-ready code, so use it as an example or base implementation only. It depends on three external libraries: aiohttp, python-socketio, and certifi.
Do you want a more high-level, official client library? You can pip install unicat. Check out the Python Unicat SDK. It isn't async though, so if you came here specifically for an async version, use the code above.