Yes, we can use plain curl directly, but usually we want a nice library to connect to an API.
We're going to build a simple, non-production one. We will use Python as an example, but feel free to translate to any language you want.
Are you looking for an async version? Go to Create an auto-updating async API connector after reading this background.
You don't care about the internals and just want a library to get started? Check out the Python Unicat SDK.
The end-result will allow us to use the library like this:
import keyring
secret_api_key = keyring.get_password("unicat-my-project", "my-key-name")
ccapi = UnicatApi("https://unicat.app", "<project gid>")
success, result = ccapi.connect(secret_api_key)
success, result = ccapi.call("/records/get", {"record": "f2e64fe0-9ffa-4d9a-8750-d561d6542453"})
record = ccapi.data["records"][result["record"]]
print(record["fields"]["en"]["artnr"], record["fields"]["en"]["price"])
The way we use success, result, and fetch the data matches up with the way the Unicat API works - if you haven't read that yet, have a quick look at The basics for sending and retrieving data. We cache all incoming data from the API calls and simply pass on the status through success and the result-info or error-info through result.
Note: the secret API key was previously stored in the keyring, see https://pypi.org/project/keyring/
Let's make sure we can connect to the project first. We assume we have a project gid handy, as well as a valid API key for that project.
import requests
import json
import os


class UnicatApi:
    """Connect to a Unicat project and have fun.

    Base usage:

        ccapi = UnicatApi(CONFIG["url"], CONFIG["project_gid"])
        success, result = ccapi.connect(CONFIG["secret_api_key"])
        success, result = ccapi.call("/records/root", {"language": "en"})
        root_record = ccapi.data["records"][result["root"]]

    IMPORTANT:
    keep the API key secret, don't store it in code. See
    https://pypi.org/project/keyring/ for a safe way to use passwords and API keys.
    """

    def __init__(self, base_url, project_gid):
        self._base_url = base_url
        self._project_gid = project_gid
        self._globalapi_url = base_url + "/api"
        self._api_url = base_url + "/api/p/" + project_gid
        self._auth_header = None
        self.data = {
            "cc.users": {},
            "cc.projects": {},
            "cc.projects_members": {},
            "cc.languages": {},
            "records": {},
            "definitions": {},
            "classes": {},
            "fields": {},
            "layouts": {},
            "assets": {},
            "queries": {},
        }

    def connect(self, secret_api_key):
        return self._call("post", "/auth", {"api_key": secret_api_key})
That's the groundwork done, but it doesn't work yet because we haven't implemented _call. We can see, though, that connect issues a POST request to the API at /auth, specifying the API key as a parameter.
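Under the hood, that first call is nothing more than a plain requests.post; spelled out for illustration only, with the same <project gid> placeholder as above:

# what connect() effectively does, using requests directly
response = requests.post(
    "https://unicat.app/api/p/<project gid>/auth",
    json={"api_key": secret_api_key},
)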
def update_jwt(self, JWT):
    if not JWT:
        self._auth_header = None
    else:
        self._auth_header = {"Authorization": f"Bearer {JWT}"}

def _call(self, method, endpoint, data=None):
    if endpoint.startswith("//"):
        url = self._globalapi_url + endpoint[1:]
    else:
        url = self._api_url + endpoint
    requestmethod = getattr(requests, method)
    if method == "post" and data is None:
        data = {}
    response = requestmethod(url, headers=self._auth_header, json=data)
    if "Authorization" in response.headers:
        self.update_jwt(response.headers["Authorization"])
    if "WWW-Authenticate" in response.headers:
        self.update_jwt(None)
    jsontext = response.text
    return self._json_response(jsontext)
First we have a little trick using // -- most of our calls will be on a project API endpoint, but sometimes you need to call the global API. For example, you use /languages/add to add a localization to the current project, but the list of possible localizations is set globally and can be found using //languages/get.
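For example, using the call wrapper we add a bit further down, both routes go through the same connector (a quick sketch):

# project API: the URL becomes <base_url>/api/p/<project gid>/records/root
success, result = ccapi.call("/records/root", {"language": "en"})

# global API: the leading // routes to <base_url>/api/languages/get instead
success, result = ccapi.call("//languages/get")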
We then use getattr on the requests library to acquire the GET or POST method we need.
For the /auth call, our self._auth_header is still empty, but we should get an Authorization header in the response that contains our JWT - we store that in self._auth_header so we send it along on subsequent requests. When a later response carries a new Authorization header, we update the auth header too - the JWT could have been refreshed. If authentication fails (signalled by a WWW-Authenticate header), we set the auth header to None and the client code should re-connect.
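In client code that could look like this minimal sketch; the retry-once-after-reconnect strategy is our own choice here, not something the connector enforces:

success, result = ccapi.call("/records/root", {"language": "en"})
if not success and ccapi._auth_header is None:
    # the JWT was rejected or has expired: authenticate again and retry once
    success, result = ccapi.connect(secret_api_key)
    if success:
        success, result = ccapi.call("/records/root", {"language": "en"})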
Finally, we get our response body as JSON text, so we parse it using self._json_response:
def _json_response(self, jsontext):
    try:
        jsondata = json.loads(jsontext)
    except:
        print("ERROR response", jsontext, flush=True)
        import sys
        sys.exit(1)
    if "files" in jsondata["data"]:
        jsondata["result"]["data/files"] = jsondata["data"]["files"]
        del jsondata["data"]["files"]
    if jsondata["success"]:
        for key, values in jsondata["data"].items():
            if key == "cc.projects_members":  # a list, not a dict
                self.data[key].update(
                    {self.project_member_key(value): value for value in values}
                )
            else:
                self.data[key].update(values)
        return True, jsondata["result"]
    else:
        return False, jsondata["result"]

def project_member_key(self, project_member):
    return f"{project_member['project_gid']}/{project_member['user_gid']}"
We update the data cache on success, and return True plus the result. If there was a failure, we don't update the data cache, and return False plus the error information.
The cc.projects_members data is returned as a list, not as a dictionary, so we construct a unique key to store it locally.
Note: the /auth call also conveniently returns a lot of initialization data, such as project and member info, and the lists of definitions, classes, fields, and layouts. See /auth in the API reference for details.
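In practice that means the cache is already filled right after a successful connect; a small sketch:

success, result = ccapi.connect(secret_api_key)
if success:
    # the /auth response has already populated these stores
    print(len(ccapi.data["definitions"]), "definitions")
    print(len(ccapi.data["classes"]), "classes")
    print(len(ccapi.data["fields"]), "fields")
    print(len(ccapi.data["layouts"]), "layouts")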
Any API call can return (base64-encoded) files, even failed calls. Therefore, we move those base64-encoded files from the data property to the result property that is returned, under the key data/files.
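From the client's point of view, returned files therefore never end up in ccapi.data, only in the returned result; a minimal check (the endpoint here is just a placeholder):

success, result = ccapi.call("/some/endpoint")  # placeholder, not a real endpoint
files = result.get("data/files")  # only present when the call returned files
if files:
    print("this call returned base64-encoded files")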
We now have a working connection. Time to allow arbitrary API calls:
def call(self, endpoint, data=None, method="post"):
    return self._call(method, endpoint, data=data)
There you go! A working implementation of a library. You'll probably want to add some checks and a more intelligent cache for a production version of this library, but we got the basics working pretty quickly.
If the internet connection is flaky, some requests may fail. We want to implement a retry mechanism to make our API connector more robust.
For that, we don't use requests directly for the get and post methods, but a requests.Session, so we can control the retry parameters. We add this at the start of the UnicatApi constructor:
def __init__(self, base_url, project_gid, asset_folder=None):
    self._requests_session = requests.Session()
    retries = requests.adapters.Retry(
        total=7, backoff_factor=0.1, allowed_methods=None
    )
    self._requests_session.mount(
        base_url, requests.adapters.HTTPAdapter(max_retries=retries)
    )
    self._base_url = base_url
    …
Then we change the _call and _call_* methods to use self._requests_session instead of requests directly. For Retry, we set allowed_methods to None because the default doesn't retry POST requests, and those make up the majority of our requests.
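As a sketch, the reworked _call only differs in where it gets the HTTP method from; everything else stays the same:

def _call(self, method, endpoint, data=None):
    if endpoint.startswith("//"):
        url = self._globalapi_url + endpoint[1:]
    else:
        url = self._api_url + endpoint
    # take get/post from the session so the mounted Retry adapter is used
    requestmethod = getattr(self._requests_session, method)
    if method == "post" and data is None:
        data = {}
    response = requestmethod(url, headers=self._auth_header, json=data)
    if "Authorization" in response.headers:
        self.update_jwt(response.headers["Authorization"])
    if "WWW-Authenticate" in response.headers:
        self.update_jwt(None)
    return self._json_response(response.text)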
We can use most of the API this way, but there's also the matter of uploading assets. For bonus points, we're implementing this so we can use it as follows:
success, result = ccapi.call("/assets/root")
root_guid = result["root"]

file_path = "~/documentation/upload.svg"
success, result = ccapi.upload(
    "/assets/upload", {"parent": root_guid}, file_path
)
asset = ccapi.data["assets"][result["asset"]]
The documentation for /assets/upload is a bit intimidating, but this is hidden from the user by our ccapi library. We'll have to implement it ourselves though:
def _call_upload(self, endpoint, data, filepath):
    filesize = os.path.getsize(filepath)
    if filesize > 10_000_000:
        print()
        print(filepath, filesize)
        print()
        return self._response_error(413, "Request Entity Too Large", info=None)
    if not data:
        data = {}
    url = self._api_url + endpoint
    files = {"upload_file": (os.path.basename(filepath), open(filepath, "rb"))}
    response = requests.post(url, headers=self._auth_header, data=data, files=files)
    if "Authorization" in response.headers:
        self.update_jwt(response.headers["Authorization"])
    if "WWW-Authenticate" in response.headers:
        self.update_jwt(None)
    jsontext = response.text
    return self._json_response(jsontext)
Instead of passing JSON data, we use the data and files parameters to create a multipart/form-data request. We restrict uploads to 10 MB max, and return an error response immediately when a file is larger (via a _response_error helper, not shown here). Other than that, it's mostly the same as _call. The size check uses os.path, which is already available through the os import at the top.
Last thing to do is to make a public method for our upload implementation:
def upload(self, endpoint, data=None, localfilepath=None):
    return self._call_upload(endpoint, data=data, filepath=localfilepath)
Now, where can I spend these bonus points?
Uploaded assets are only useful if we can retrieve them later. They become extra useful if we can also retrieve resized or cropped versions, or a preview image for a pdf document.
This is where the /dam endpoint is used, so we need to add it by modifying the constructor:
def __init__(self, base_url, project_gid, asset_folder=None):
    self._base_url = base_url
    self._project_gid = project_gid
    self._asset_folder = asset_folder  # added asset folder
    if self._asset_folder:
        os.makedirs(self._asset_folder, exist_ok=True)
    self._api_url = base_url + "/api/p/" + project_gid
    self._dam_url = base_url + "/dam/p/" + project_gid  # added /dam endpoint
    self._auth_header = None
    self.data = { … }  # the cache dictionaries, unchanged from before
We want to use it like this:
local_pathname = ccapi.download(asset)

# or

options = {
    "fill": "400,300",
    "type": "jpg",
    "dpr": 2,
    "optimize": True,
}
local_pathname = ccapi.download_transformed(asset, options)
If we don't want to store the downloads locally and are just interested in the public URLs, we go more low-level.
success, result = ccapi.publish(asset)
download_url = result["public_url"]

# or

options = {
    "fill": "400,300",
    "type": "jpg",
    "dpr": 2,
    "optimize": True,
}
success, result = ccapi.transform(asset, options)
download_url = result["public_url"]
The /dam endpoint is always a GET call, and supports the publish and transform methods. Both of these expect a storage pathname, which is different from the pathname in the assets tree. The current implementation uses a gid-based filename with versioning. As an example, we have an svg file in /vector/example.svg, with gid 8af797af-7e52-43c9-9d6e-a66bd3028930 and version 31. The storage pathname will be 8af797af-7e52-43c9-9d6e-a66bd3028930~31.svg (the full gid, ~version, and the extension).
The transform call supports additional options that are added to the URL as /name=value pairs. See the documentation for dam/transform for more information.
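As an illustration (derived from the URL building in _call_dam below, not copied from the API docs), the options from the earlier example turn into these path segments:

options = {"fill": "400,300", "type": "jpg", "dpr": 2, "optimize": True}
print("/".join(f"{key}={value}" for key, value in options.items()))
# fill=400,300/type=jpg/dpr=2/optimize=True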
We do need to present the JWT for authentication and authorization, but calls to the /dam endpoint will never auto-refresh the JWT.
Given these specifications, we build an internal implementation:
def _call_dam(self, endpoint, asset, options=None):
    gid = asset["gid"]
    version = asset["version"]
    _, ext = os.path.splitext(asset["name"])
    url = self._dam_url + endpoint + f"/{gid}~{version}{ext}"
    if options:
        url += "/" + "/".join(
            f"{str(key)}={str(value)}" for key, value in options.items()
        )
    response = requests.get(url, headers=self._auth_header)
    if "WWW-Authenticate" in response.headers:
        self.update_jwt(None)
    jsontext = response.text
    return self._json_response(jsontext)
and we expose it as two separate methods on the API:
def publish(self, asset):
    return self._call_dam("/publish", asset)

def transform(self, asset, options=None):
    if options and "key" not in options:
        options["key"] = hash_data(options)
    return self._call_dam("/transform", asset, options=options)
If not explicitly set, we add a key to the transform options so we get a unique filename based on the given options. The hash_data function looks like this:
def hash_data(data):
    return hash_text(json.dumps(data))

def hash_text(text):  # unicode ok
    hash = 0
    for letter in text:
        hash = (31 * hash + ord(letter)) & 0xFFFFFFFF
    hash = ((hash + 0x80000000) & 0xFFFFFFFF) - 0x80000000
    return ("00000000" + hex(hash & 0xFFFFFFFF)[2:])[-8:]
The result["public_url"] from the publish
and transform
calls can be used to fetch the asset or transformed version of it.
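A minimal sketch of fetching it ourselves, outside the connector, using requests directly (storing the file under the asset's own name is just a choice for this example):

success, result = ccapi.publish(asset)
if success:
    response = requests.get(result["public_url"])
    with open(asset["name"], "wb") as f:
        f.write(response.content)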
This completes the low-level implementation, but we also want the download and download_transformed methods to store the files locally. The high-level implementation calls the low-level implementation:
def download(self, asset):
    if not self._asset_folder:
        return False
    success, result = self.publish(asset)
    if not success:
        return None
    return self._call_media(result["public_url"], asset["pathname"])

def download_transformed(self, asset, options=None):
    if not self._asset_folder:
        return False
    success, result = self.transform(asset, options)
    if not success:
        return None
    path = os.path.dirname(asset["pathname"])
    name = os.path.basename(result["public_url"])
    pathname = path + "/" + name
    return self._call_media(result["public_url"], pathname)
The actual downloading and storing locally is handed off to the _call_media method:
def _call_media(self, public_url, pathname):
    if not self._asset_folder:
        return False
    if not pathname.startswith("/"):
        pathname = "/" + pathname
    abs_pathname = self._asset_folder + pathname
    if os.path.isfile(abs_pathname):
        return pathname
    with requests.get(public_url, stream=True) as response:
        if response.status_code != 200:
            return False
        abs_path = os.path.dirname(abs_pathname)
        os.makedirs(abs_path, exist_ok=True)
        with open(abs_pathname, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    return pathname
This checks if the file already exists locally, and if not, requests it from the server and stores it.
If you use the API and this connector to delete a record, it is still cached in the data store; you'll have to remove it yourself. Now, in a multi-user environment, somebody else may have changed a record, added an asset, or made other changes, while your process is running.
NOTE: every delete (definitions, fields, etc.) will invalidate the cache, so you must make sure to sync the data store afterwards. You can do a few deletes and sync once, no need to call sync after every delete.
You can check for changes using the /sync/get endpoint. Read through the Sync API documentation; the specifics for /sync/get are at the bottom.
Let's add this to the API connector.
We want to use it like this:
ccapi.sync()
This is a two-step process internally: getting the list of sync-updates, then fetching the relevant data and updating the internal store.
def sync(self):
    data = {
        "cc_cursor": self.cc_cursor,
        "project": self._project_gid,
        "cursor": self.project_cursor,
    }
    success, result = self._call_sync("post", "/get", data=data)
    if not success:
        return None
    self.handle_sync_data(result["sync"])
We need to set up a _call_sync method for the first step.
def __init__(self, base_url, project_gid, asset_folder=None):
    …
    self._sync_url = base_url + "/sync"
    …

def _call_sync(self, method, endpoint, data=None):
    url = self._sync_url + endpoint
    requestmethod = getattr(requests, method)
    if method == "post" and data is None:
        data = {}
    response = requestmethod(url, headers=self._auth_header, json=data)
    if "Authorization" in response.headers:
        self.update_jwt(response.headers["Authorization"])
    if "WWW-Authenticate" in response.headers:
        self.update_jwt(None)
    jsontext = response.text
    return self._json_response(jsontext)
The second step is handling the returned sync data. The sync data comes in as a list; we handle each item in turn. Deletions can be applied immediately by updating the data stores. An insert isn't always of interest to us: a new asset that we're not using right now doesn't need to be fetched. But a change in a definition (or other 'base' data) could have implications for records we are using, so we need to fetch those from the server. When we do, the stores are automatically updated by _json_response internally. Updates are only processed when they concern data currently in the stores. After each item is handled we update the local cursor. We should also watch out for backup/restore sync actions. Additionally, syncing can lag behind in some cases, so we check each sync cursor against our current cursors first. Knowing that, let's see how that looks in code:
def handle_sync_data(self, syncdatalist):
    # result contains a list of cursor/action/data_type/data_key
    # handle each one, updating our cursors as we go
    for item in syncdatalist:
        # skip lagging syncs (older than our latest cursors)
        if item["data_type"] != "jobs":
            if item["type"] == "cc":
                if self.cc_cursor >= item["cursor"]:
                    continue
            else:
                if self.project_cursor >= item["cursor"]:
                    continue
        if item["data_type"] == "cc.version":
            if item["data_key"] != self.cc_version:
                # alert! version-change mid-program!
                print("Server version changed!")
                self.cc_version = item["data_key"]
        elif item["data_type"] == "jobs":
            job = item["data"]
            if job["job"] == "backup_project" and job["status"] == "queued":
                print("Server database backup started")
            elif job["job"] == "backup_project" and job["status"] == "done":
                print("Server database backup done")
            elif job["job"] == "restore_project" and job["status"] == "queued":
                print("Server database restore started")
            elif job["job"] == "restore_project" and job["status"] == "done":
                print("Server database restore done")
                self.init()
        elif item["action"] == "DELETE":
            # use pop, not del, auto-handles items that aren't in our store
            self.data[item["data_type"]].pop(item["data_key"], None)
        elif item["action"] == "INSERT":
            # we're only interested in inserts that affect our data
            # so project-members for our project should fetch the new
            # membership, but also the new members
            # we're also interested in any base-data for definitions,
            # classes, fields, layouts, and queries
            # NOTE: fetching data auto-updates our local data-store
            if item["data_type"] == "cc.projects_members":
                project_gid, user_gid = item["data_key"].split("/")
                if project_gid == self._project_gid:
                    success = self._fetch_syncdataitem(item)
            elif item["data_type"] in (
                "definitions",
                "classes",
                "fields",
                "layouts",
                "queries",
            ):
                success = self._fetch_syncdataitem(item)
        elif item["action"] == "UPDATE":
            # we're only interested in data we already have locally
            if item["data_key"] in self.data[item["data_type"]]:
                success = self._fetch_syncdataitem(item)
        # always update local cursors
        if item["type"] == "cc":
            self.cc_cursor = item["cursor"]
        else:
            self.project_cursor = item["cursor"]
def _fetch_syncdataitem(self, syncdataitem):
    type_ = syncdataitem["data_type"]
    key = syncdataitem["data_key"]
    if type_ == "cc.projects_members":
        project, member = key.split("/")
        success, result = self.call(  # note the // - global api
            "//members/get", {"project": project, "member": member}
        )
        return success
    map_calls = {
        "cc.users": ["//users/get", "user"],  # note the // - global api
        "cc.projects": ["//projects/get", "project"],  # note the // - global api
        "assets": ["/assets/get", "asset"],
        "classes": ["/classes/get", "class"],
        "definitions": ["/definitions/get", "definition"],
        "fields": ["/fields/get", "field"],
        "layouts": ["/layouts/get", "layout"],
        "queries": ["/queries/get", "query"],
        "records": ["/records/get", "record"],
    }
    map_call = map_calls[type_]
    success, result = self.call(map_call[0], {map_call[1]: key})
    return success
Note that we could be more efficient by bundling the fetch calls per data type instead of performing them one at a time.
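A rough sketch of that idea: first collect the keys that need fetching per data type, then do the actual fetching per group. How the grouped fetch itself would look depends on which bulk endpoints the API offers, so only the grouping step is shown here, as a hypothetical helper:

from collections import defaultdict

def _collect_sync_fetches(self, syncdatalist):
    # group data_keys per data_type so they could be fetched in batches
    to_fetch = defaultdict(set)
    for item in syncdatalist:
        if item["action"] in ("INSERT", "UPDATE"):
            to_fetch[item["data_type"]].add(item["data_key"])
    return to_fetch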
Sure you can: just click the link to download the API connector. Please note that this isn't production-ready code, so use it as an example or base implementation only. It depends on one external library, requests.
Do you want a more high-level, official client library? You can pip install unicat. Check out the Python Unicat SDK.