Unicat API How-To's


Create an API connector

Yes, we can use plain curl directly, but usually we want a nice library to connect to an API.

We're going to build a simple, non-production one. We will use Python as an example, but feel free to translate to any language you want.

Are you looking for an async version? Go to Create an auto-updating async API connector after reading this background.

You don't care about the internals and just want a library to get started? Check out the Python Unicat SDK.

The end-result will allow us to use the library like this:

import keyring
secret_api_key = keyring.get_password("unicat-my-project", "my-key-name")

ccapi = UnicatApi("https://unicat.app", "<project gid>")
success, result = ccapi.connect(secret_api_key)

success, result = ccapi.call("/records/get", {"record": "f2e64fe0-9ffa-4d9a-8750-d561d6542453"})
record = ccapi.data["records"][result["record"]]

print(record["fields"]["en"]["artnr"], record["fields"]["en"]["price"])

The way we use success and result, and the way we fetch the data, matches the way the Unicat API works. If you haven't read it yet, have a quick look at The basics for sending and retrieving data. We cache all incoming data from the API calls and simply pass on the status through success and the result-info or error-info through result.
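The usage example above never checks success before using result. As a minimal sketch of that check, assuming only the call/data interface shown above, a small wrapper could look like this (fetch_record is a hypothetical helper name, not part of the connector):

```python
def fetch_record(ccapi, record_gid):
    """Return the cached record for record_gid, or raise if the call failed."""
    success, result = ccapi.call("/records/get", {"record": record_gid})
    if not success:
        # On failure, result carries the error info instead of a record gid.
        raise RuntimeError(f"/records/get failed: {result!r}")
    # On success, the record itself lives in the connector's data cache.
    return ccapi.data["records"][result["record"]]
```

Raising an exception is just one option; returning None or logging the error info would work equally well for a script.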

Note: the secret API key was previously stored in the keyring, see https://pypi.org/project/keyring/

Show and tell

Let's make sure we can connect to the project first. We assume we have a project gid handy, as well as a valid api key for that project.

import requests
import json
import os


class UnicatApi:
    """Connect to a Unicat project and have fun.

    Base usage:

        ccapi = UnicatApi(CONFIG["url"], CONFIG["project_gid"])
        success, result = ccapi.connect(CONFIG["secret_api_key"])

        success, result = ccapi.call("/records/root", {"language": "en"})
        root_record = ccapi.data["records"][result["root"]]

    IMPORTANT:
        keep the API key secret, don't store it in code. See
        https://pypi.org/project/keyring/ for a safe way to use passwords and API keys.
    """

    def __init__(self, base_url, project_gid):
        self._base_url = base_url
        self._project_gid = project_gid
        self._globalapi_url = base_url + "/api"
        self._api_url = base_url + "/api/p/" + project_gid
        self._auth_header = None
        self.data = {
            "cc.users": {},
            "cc.projects": {},
            "cc.projects_members": {},
            "cc.languages": {},
            "records": {},
            "definitions": {},
            "classes": {},
            "fields": {},
            "layouts": {},
            "assets": {},
            "queries": {},
        }

    def connect(self, secret_api_key):
        return self._call("post", "/auth", {"api_key": secret_api_key})

That's the groundwork done, but it doesn't work yet because we haven't implemented _call.

But we can see that connect issues a POST request to the api for /auth, specifying the api key as a parameter.

    def update_jwt(self, JWT):
        if not JWT:
            self._auth_header = None
        else:
            self._auth_header = {"Authorization": f"Bearer {JWT}"}

    def _call(self, method, endpoint, data=None):
        if endpoint.startswith("//"):
            # "//..." targets the global API; drop one of the two slashes
            url = self._globalapi_url + endpoint[1:]
        else:
            url = self._api_url + endpoint
        requestmethod = getattr(requests, method)
        if method == "post" and data is None:
            data = {}
        response = requestmethod(url, headers=self._auth_header, json=data)
        if "Authorization" in response.headers:
            self.update_jwt(response.headers["Authorization"])
        if "WWW-Authenticate" in response.headers:
            self.update_jwt(None)
        jsontext = response.text
        return self._json_response(jsontext)

First we have a little trick using // -- most of our calls will be on a project API endpoint, but sometimes you need to call the global API. For example, you use /languages/add to add a localization to the current project, but the list of possible localizations is set globally and can be found using //languages/get.
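To make the // convention concrete, here is a small standalone sketch of the same URL mapping. The function name resolve_url is hypothetical; the URL layout is the one set up in the constructor above.

```python
def resolve_url(base_url, project_gid, endpoint):
    """Map an endpoint to a full URL, treating a leading '//' as 'global API'."""
    if endpoint.startswith("//"):
        # Strip one of the two slashes and skip the project prefix.
        return base_url + "/api" + endpoint[1:]
    return base_url + "/api/p/" + project_gid + endpoint


# Project endpoint versus global endpoint:
print(resolve_url("https://unicat.app", "<project gid>", "/languages/add"))
print(resolve_url("https://unicat.app", "<project gid>", "//languages/get"))
```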

Next, we look up the function in the requests library that matches the HTTP method we need (requests.get or requests.post).

For the /auth call, our self._auth_header is still empty, but we should get an Authorization header in the response that contains our JWT - we store that in the self._auth_header so we send it along on subsequent requests. When we get a new Authorization header on subsequent requests, we update the auth_header too - the JWT could be refreshed. If authentication fails, we set auth_header to None and we should re-connect from the client code.
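The header bookkeeping described above can be sketched as a pure function. This is only an illustration of the rules, not the connector's code: next_auth_header is a hypothetical name, and it takes a plain dict where the connector reads response.headers (which, unlike a plain dict, is case-insensitive in requests).

```python
def next_auth_header(current_header, response_headers):
    """Return the auth header to send along with the next request."""
    header = current_header
    if "Authorization" in response_headers:
        # New or refreshed JWT: replace whatever we had before.
        jwt = response_headers["Authorization"]
        header = {"Authorization": f"Bearer {jwt}"} if jwt else None
    if "WWW-Authenticate" in response_headers:
        # Authentication failed: forget the JWT so the client reconnects.
        header = None
    return header
```

A response without either header leaves the current JWT in place, which is the common case for most calls.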

Finally, we get our response body in JSON, so we parse it using self._json_response:

    def _json_response(self, jsontext):
        try:
            jsondata = json.loads(jsontext)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            print("ERROR response", jsontext, flush=True)
            import sys

            sys.exit(1)

        if "files" in jsondata["data"]:
            jsondata["result"]["data/files"] = jsondata["data"]["files"]
            del jsondata["data"]["files"]

        if jsondata["success"]:
            for key, values in jsondata["data"].items():
                if key == "cc.projects_members":  # a list, not a dict
                    self.data[key].update(
                        {self.project_member_key(value): value for value in values}
                    )
                else:
                    self.data[key].update(values)
            return True, jsondata["result"]
        else:
            return False, jsondata["result"]

    def project_member_key(self, project_member):
        return f"{project_member['project_gid']}/{project_member['user_gid']}"

We update the data cache on success, and return True plus the result. If there was a failure, we don't update the data cache, and return False plus the error information.

The cc.projects_members data is returned as a list, not as a dictionary, so we construct a unique key to store it locally.

Note: the /auth call also conveniently returns a lot of initialization data, such as project and member info, and the lists of definitions, classes, fields, and layouts. See /auth in the API reference for details.

Any API call can return (base64-encoded) files, even failed calls. Therefore, we move those base64-encoded files from the data property to the result property that is returned, with the key data/files.
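As an illustration of this bookkeeping, the sketch below runs the same steps on a made-up, already-parsed response. The payload contents (including the shape of the files entry) are assumptions for demonstration only, split_response is a hypothetical name, and the special handling of cc.projects_members is left out for brevity.

```python
def split_response(cache, jsondata):
    """Mimic the bookkeeping of _json_response on an already-parsed body."""
    data, result = jsondata["data"], jsondata["result"]
    if "files" in data:
        # Files travel with the result, even when the call failed.
        result["data/files"] = data.pop("files")
    if jsondata["success"]:
        # Only successful calls are merged into the cache.
        for key, values in data.items():
            cache.setdefault(key, {}).update(values)
    return jsondata["success"], result


cache = {"records": {}}
sample = {  # made-up payload, for illustration only
    "success": True,
    "result": {"record": "r1"},
    "data": {"records": {"r1": {"gid": "r1"}}, "files": {"a.txt": "aGVsbG8="}},
}
success, result = split_response(cache, sample)
print(success, result["record"], sorted(cache), list(result["data/files"]))
```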

We now have a working connection. Time to allow arbitrary api calls:

    def call(self, endpoint, data=None, method="post"):
        return self._call(method, endpoint, data=data)

There you go! A working implementation of a library. You'll probably want to add some checks and a more intelligent cache for a production version of this library, but we got the basics working pretty quickly.

A more robust version

If the internet connection is flaky, some requests may fail. We want to implement a retry mechanism to make our API connector more robust.

For that, we don't use requests directly for the get and post methods, but we use a requests.Session so we can control the retry parameters. We add this at the start of the UnicatApi constructor:

    def __init__(self, base_url, project_gid, asset_folder=None):
        self._requests_session = requests.Session()
        retries = requests.adapters.Retry(
            total=7, backoff_factor=0.1, allowed_methods=None
        )
        self._requests_session.mount(
            base_url, requests.adapters.HTTPAdapter(max_retries=retries)
        )
        self._base_url = base_url
        

Then we change the _call and _call_* methods to use self._requests_session instead of requests directly.

For Retry, we set allowed_methods to None because the default doesn't retry for POST requests, and those are the majority of our requests.
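A minimal standalone sketch of that change, using the same Retry settings as above. The helper name make_session is hypothetical; in the connector this setup lives in the constructor and the lookup happens inside _call.

```python
import requests
from requests.adapters import HTTPAdapter, Retry


def make_session(base_url):
    """Build a Session that retries requests to base_url, POST included."""
    session = requests.Session()
    retries = Retry(total=7, backoff_factor=0.1, allowed_methods=None)
    # The adapter only applies to URLs that start with base_url.
    session.mount(base_url, HTTPAdapter(max_retries=retries))
    return session


session = make_session("https://unicat.app")
# Inside _call, the method lookup then targets the session, not the module:
requestmethod = getattr(session, "post")
```

The returned method is called with the same url, headers, and json arguments as before.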

Bonus points

We can use most of the API this way, but there's also the matter of uploading assets. For bonus points, we're implementing this so we can use it as follows:

    success, result = ccapi.call("/assets/root")
    root_guid = result["root"]

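    # os.path.getsize and open don't expand "~", so we do it here (needs import os)
    file_path = os.path.expanduser("~/documentation/upload.svg")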
    success, result = ccapi.upload(
        "/assets/upload", {"parent": root_guid}, file_path
    )
    asset = ccapi.data["assets"][result["asset"]]

The documentation for /assets/upload is a bit intimidating, but this is hidden from the user by our ccapi library. We'll have to implement it ourselves though:

    def _call_upload(self, endpoint, data, filepath):
        filesize = os.path.getsize(filepath)
        if filesize > 10_000_000:
            print()
            print(filepath, filesize)
            print()
            return self._response_error(413, "Request Entity Too Large", info=None)
        if not data:
            data = {}
        url = self._api_url + endpoint
        with open(filepath, "rb") as upload_file:  # closed again after the request
            files = {"upload_file": (os.path.basename(filepath), upload_file)}
            response = requests.post(
                url, headers=self._auth_header, data=data, files=files
            )
        if "Authorization" in response.headers:
            self.update_jwt(response.headers["Authorization"])
        if "WWW-Authenticate" in response.headers:
            self.update_jwt(None)
        jsontext = response.text
        return self._json_response(jsontext)

Instead of sending JSON, we use the data and files parameters to create a multipart/form-data request. We restrict uploads to 10 MB and immediately return an error response for larger files; the _response_error helper that builds that response is not shown in this how-to. Other than that, it's mostly the same as _call. The size check relies on os.path, which the import os at the top already provides.
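Since _response_error is called but not defined in the snippets here, the following is a hedged guess at what such a helper might look like, written as a standalone function. The keys in the result dictionary (code, message, info) are an assumption and are not taken from the API reference; all that matters for the connector is that it returns the same (success, result) pair as _json_response.

```python
def response_error(code, message, info=None):
    """Build a failed (success, result) pair without contacting the server.

    NOTE: the result keys used here are an assumption for illustration.
    """
    return False, {"code": code, "message": message, "info": info}


success, result = response_error(413, "Request Entity Too Large")
```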

Last thing to do is to make a public method for our upload implementation:

    def upload(self, endpoint, data=None, localfilepath=None):
        return self._call_upload(endpoint, data=data, filepath=localfilepath)

Now, where can I spend these bonus points?

Extend the API connector for asset downloads and previews

Uploaded assets are only useful if we can retrieve them later. They become extra useful if we can also retrieve resized or cropped versions, or a preview image for a pdf document.

This is where the /dam endpoint is used, so we need to add it by modifying the constructor:

    def __init__(self, base_url, project_gid, asset_folder=None):
        self._base_url = base_url
        self._project_gid = project_gid
        self._asset_folder = asset_folder  # added asset folder
        if self._asset_folder:
            os.makedirs(self._asset_folder, exist_ok=True)
        self._api_url = base_url + "/api/p/" + project_gid
        self._dam_url = base_url + "/dam/p/" + project_gid  # added /dam endpoint
        self._auth_header = None
        self.data = {}

We want to use it like this:

local_pathname = ccapi.download(asset)

# or

options = {
    "fill": "400,300",
    "type": "jpg",
    "dpr": 2,
    "optimize": True,
}
local_pathname = ccapi.download_transformed(asset, options)

If we don't want to store the downloads locally and are just interested in the public urls, we go more low-level.

success, result = ccapi.publish(asset)
download_url = result["public_url"]

# or

options = {
    "fill": "400,300",
    "type": "jpg",
    "dpr": 2,
    "optimize": True,
}
success, result = ccapi.transform(asset, options)
download_url = result["public_url"]

The /dam endpoint is always a GET call, and supports publish and transform methods. Both of these expect a storage pathname, which is different from the pathname in the assets tree. The current implementation uses a gid-based filename with versioning. As an example, we have an svg file in /vector/example.svg, with gid 8af797af-7e52-43c9-9d6e-a66bd3028930 and version 31. The pathname will be 8af797af-7e52-43c9-9d6e-a66bd3028930~31.svg (the full gid, ~version, and the extension).

The transform call supports additional options that are appended to the url as /name=value pairs. See the documentation for dam/transform for more information.

We do need to present the JWT for authentication and authorization, but calls to the /dam endpoint will never auto-refresh the JWT.
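To make the URL layout described above concrete, here is a standalone sketch that only builds the URL, using the example asset mentioned earlier. The function name dam_url is hypothetical. Values are formatted with Python's default string conversion, so a boolean becomes True or False; whether the server expects a different spelling is not covered here.

```python
import os.path


def dam_url(base_url, project_gid, endpoint, asset, options=None):
    """Build a /dam URL: storage pathname first, then /name=value options."""
    gid, version = asset["gid"], asset["version"]
    _, ext = os.path.splitext(asset["name"])
    url = f"{base_url}/dam/p/{project_gid}{endpoint}/{gid}~{version}{ext}"
    if options:
        url += "/" + "/".join(f"{key}={value}" for key, value in options.items())
    return url


asset = {
    "gid": "8af797af-7e52-43c9-9d6e-a66bd3028930",
    "version": 31,
    "name": "example.svg",
}
print(dam_url("https://unicat.app", "<project gid>", "/publish", asset))
print(
    dam_url(
        "https://unicat.app",
        "<project gid>",
        "/transform",
        asset,
        {"fill": "400,300", "dpr": 2},
    )
)
```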

Given these specifications, we build an internal implementation:

    def _call_dam(self, endpoint, asset, options=None):
        gid = asset["gid"]
        version = asset["version"]
        _, ext = os.path.splitext(asset["name"])
        url = self._dam_url + endpoint + f"/{gid}~{version}{ext}"
        if options:
            url += "/" + "/".join(
                f"{str(key)}={str(value)}" for key, value in options.items()
            )
        response = requests.get(url, headers=self._auth_header)
        if "WWW-Authenticate" in response.headers:
            self.update_jwt(None)
        jsontext = response.text
        return self._json_response(jsontext)

and we expose it as two separate methods on the API:

    def publish(self, asset):
        return self._call_dam("/publish", asset)

    def transform(self, asset, options=None):
        if options and "key" not in options:
            options["key"] = hash_data(options)
        return self._call_dam("/transform", asset, options=options)

If not explicitly set, we add a key to the transform options so we get a unique filename based on the given options. The hash_data function looks like this:

def hash_data(data):
    return hash_text(json.dumps(data))

def hash_text(text):  # unicode ok
    hash = 0
    for letter in text:
        hash = (31 * hash + ord(letter)) & 0xFFFFFFFF
    hash = ((hash + 0x80000000) & 0xFFFFFFFF) - 0x80000000
    return ("00000000" + hex(hash)[2:])[-8:]
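One thing to keep in mind: json.dumps serializes a dictionary in insertion order, so the same options given in a different order produce different text and will generally get a different key. If that matters, a possible variant is to sort the keys first. The sketch below is an optional alternative, not part of the connector; hash_data_sorted is a hypothetical name, and hash_text is repeated only to keep the example runnable on its own.

```python
import json


def hash_text(text):
    """Same 32-bit rolling hash as above, repeated to keep this runnable."""
    value = 0
    for letter in text:
        value = (31 * value + ord(letter)) & 0xFFFFFFFF
    value = ((value + 0x80000000) & 0xFFFFFFFF) - 0x80000000
    return ("00000000" + hex(value)[2:])[-8:]


def hash_data_sorted(data):
    """Hypothetical variant: sort keys so option order does not matter."""
    return hash_text(json.dumps(data, sort_keys=True))


a = hash_data_sorted({"fill": "400,300", "type": "jpg"})
b = hash_data_sorted({"type": "jpg", "fill": "400,300"})
print(a == b, len(a))
```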

The result["public_url"] from the publish and transform calls can be used to fetch the asset or transformed version of it.

This completes the low-level implementation, but we also want the download and download_transformed to store the files locally. The high-level implementation calls the low-level implementation:

    def download(self, asset):
        if not self._asset_folder:
            return False
        success, result = self.publish(asset)
        if not success:
            return None
        return self._call_media(result["public_url"], asset["pathname"])

    def download_transformed(self, asset, options=None):
        if not self._asset_folder:
            return False
        success, result = self.transform(asset, options)
        if not success:
            return None
        path = os.path.dirname(asset["pathname"])
        name = os.path.basename(result["public_url"])
        pathname = path + "/" + name
        return self._call_media(result["public_url"], pathname)

The actual downloading and storing locally is handed off to the _call_media method:

    def _call_media(self, public_url, pathname):
        if not self._asset_folder:
            return False
        if not pathname.startswith("/"):
            pathname = "/" + pathname
        abs_pathname = self._asset_folder + pathname
        if os.path.isfile(abs_pathname):
            return pathname
        with requests.get(public_url, stream=True) as response:
            if response.status_code != 200:
                return False
            abs_path = os.path.dirname(abs_pathname)
            os.makedirs(abs_path, exist_ok=True)
            with open(abs_pathname, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return pathname

This checks if the file already exists locally, and if not, requests it from the server and stores it.
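The local file location follows from the asset folder and the asset's pathname. As a rough sketch of that mapping for both download variants (local_target is a hypothetical name, and the public URL in the usage lines is made up):

```python
import os.path


def local_target(asset_folder, asset_pathname, public_url=None):
    """Return (pathname, abs_pathname) the way the download methods build them."""
    pathname = asset_pathname
    if public_url is not None:
        # Transformed files keep the asset's folder but take the URL's filename.
        pathname = os.path.dirname(asset_pathname) + "/" + os.path.basename(public_url)
    if not pathname.startswith("/"):
        pathname = "/" + pathname
    return pathname, asset_folder + pathname


# Plain download, then a transformed download with a made-up public URL:
print(local_target("/tmp/assets", "/vector/example.svg"))
print(local_target("/tmp/assets", "/vector/example.svg", "https://example.com/files/abc~31.jpg"))
```

Because the transformed filename comes from the public URL, different transform options end up in different local files next to the original.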

Syncing data

If you use the API and this connector to delete a record, it is still cached in the data store; you'll have to remove it yourself. Now, in a multi-user environment, somebody else may have changed a record, added an asset, or made other changes, while your process is running.

NOTE: every delete (definitions, fields, etc.) will invalidate the cache, so you must make sure to sync the data store afterwards. You can do a few deletes and sync once, no need to call sync after every delete.

You can check for changes using the /sync/get endpoint. Read through the Sync API documentation; the specifics for /sync/get are at the bottom.

Let's add this to the API connector.

We want to use it like this:

ccapi.sync()

This is a two-step process internally: first we get the list of sync-updates, then we fetch the relevant data and update the internal store.

    def sync(self):
        data = {
            "cc_cursor": self.cc_cursor,
            "project": self._project_gid,
            "cursor": self.project_cursor,
        }
        success, result = self._call_sync("post", "/get", data=data)
        if not success:
            return None
        self.handle_sync_data(result["sync"])

We need to set up a _call_sync method for the first step.

    def __init__(self, base_url, project_gid, asset_folder=None):
        
        self._sync_url = base_url + "/sync"
        

    def _call_sync(self, method, endpoint, data=None):
        url = self._sync_url + endpoint
        requestmethod = getattr(requests, method)
        if method == "post" and data is None:
            data = {}
        response = requestmethod(url, headers=self._auth_header, json=data)
        if "Authorization" in response.headers:
            self.update_jwt(response.headers["Authorization"])
        if "WWW-Authenticate" in response.headers:
            self.update_jwt(None)
        jsontext = response.text
        return self._json_response(jsontext)

The second step is handling the returned sync data. The sync data comes in as a list, and we handle each item in turn.

Deletions can be applied immediately by removing the item from the data stores. An insert isn't always of interest to us: a new asset that we're not using right now doesn't need to be stored. A change in a definition (or other 'base' data), however, could have implications for records we are using, so we fetch those from the server. When we do, the stores are automatically updated by _json_response internally. Updates are only processed when they concern data that is currently in the stores.

After each item is handled, we update the local cursor. We should also watch out for backup/restore sync actions. Additionally, syncing can lag behind in some cases, so we first check each item's cursor against our current cursors. Knowing that, let's see how that looks in code:

    def handle_sync_data(self, syncdatalist):
        # result contains a list of cursor/action/data_type/data_key
        # handle each one, updating our cursors as we go
        for item in syncdatalist:
            # skip lagging syncs (older than our latest cursors)
            if item["data_type"] != "jobs":
                if item["type"] == "cc":
                    if self.cc_cursor >= item["cursor"]:
                        continue
                else:
                    if self.project_cursor >= item["cursor"]:
                        continue

            if item["data_type"] == "cc.version":
                if item["data_key"] != self.cc_version:
                    # alert! version-change mid-program!
                    print("Server version changed!")
                self.cc_version = item["data_key"]
            elif item["data_type"] == "jobs":
                job = item["data"]
                if job["job"] == "backup_project" and job["status"] == "queued":
                    print("Server database backup started")
                elif job["job"] == "backup_project" and job["status"] == "done":
                    print("Server database backup done")
                elif job["job"] == "restore_project" and job["status"] == "queued":
                    print("Server database restore started")
                elif job["job"] == "restore_project" and job["status"] == "done":
                    print("Server database restore done")
                    self.init()
            elif item["action"] == "DELETE":
                # use pop, not del, auto-handles items that aren't in our store
                self.data[item["data_type"]].pop(item["data_key"], None)
            elif item["action"] == "INSERT":
                # we're only interested in inserts that affect our data
                # so project-members for our project should fetch the new
                # membership, but also the new members
                # we're also interested in any base-data for definitions,
                # classes, fields, layouts, and queries
                # NOTE: fetching data auto-updates our local data-store
                if item["data_type"] == "cc.projects_members":
                    project_gid, user_gid = item["data_key"].split("/")
                    if project_gid == self._project_gid:
                        success = self._fetch_syncdataitem(item)
                elif item["data_type"] in (
                    "definitions",
                    "classes",
                    "fields",
                    "layouts",
                    "queries",
                ):
                    success = self._fetch_syncdataitem(item)
            elif item["action"] == "UPDATE":
                # we're only interested in data we already have locally
                if item["data_key"] in self.data[item["data_type"]]:
                    success = self._fetch_syncdataitem(item)
            # always update local cursors
            if item["type"] == "cc":
                self.cc_cursor = item["cursor"]
            else:
                self.project_cursor = item["cursor"]

    def _fetch_syncdataitem(self, syncdataitem):
        type_ = syncdataitem["data_type"]
        key = syncdataitem["data_key"]
        if type_ == "cc.projects_members":
            project, member = key.split("/")
            success, result = self.call(  # note the // - global api
                "//members/get", {"project": project, "member": member}
            )
            return success
        map_calls = {
            "cc.users": ["//users/get", "user"],  # note the // - global api
            "cc.projects": ["//projects/get", "project"],  # note the // - global api
            "assets": ["/assets/get", "asset"],
            "classes": ["/classes/get", "class"],
            "definitions": ["/definitions/get", "definition"],
            "fields": ["/fields/get", "field"],
            "layouts": ["/layouts/get", "layout"],
            "queries": ["/queries/get", "query"],
            "records": ["/records/get", "record"],
        }
        map_call = map_calls[type_]
        success, result = self.call(map_call[0], {map_call[1]: key})
        return success
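The cursor handling is the part that is easiest to get wrong, so here it is in isolation. This sketch takes the cursors as arguments, since the snippets in this how-to don't show where cc_cursor and project_cursor are initialized. The name filter_sync_items and the item values are made up; only the "cc" type check comes from the code above.

```python
def filter_sync_items(items, cc_cursor, project_cursor):
    """Drop lagging items and advance the cursors, as handle_sync_data does."""
    fresh = []
    for item in items:
        is_cc = item["type"] == "cc"
        current = cc_cursor if is_cc else project_cursor
        # Job items are never skipped; everything else must be newer.
        if item["data_type"] != "jobs" and current >= item["cursor"]:
            continue
        fresh.append(item)
        if is_cc:
            cc_cursor = item["cursor"]
        else:
            project_cursor = item["cursor"]
    return fresh, cc_cursor, project_cursor


items = [  # made-up sync items, for illustration only
    {"type": "cc", "cursor": 5, "data_type": "cc.users"},
    {"type": "project", "cursor": 12, "data_type": "records"},
    {"type": "project", "cursor": 13, "data_type": "records"},
]
fresh, cc_cursor, project_cursor = filter_sync_items(items, 5, 12)
print(len(fresh), cc_cursor, project_cursor)
```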

Note that we could be more efficient by bundling the fetch calls per data type instead of performing them one at a time.
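A first step towards such bundling is to collect the keys per data type before fetching anything. The sketch below only does the grouping (group_keys_by_type is a hypothetical name); whether the API offers endpoints that accept several keys in one call is not covered here, so check the API reference before building on this.

```python
from collections import defaultdict


def group_keys_by_type(items):
    """Collect the data keys to fetch per data type, without duplicates."""
    grouped = defaultdict(list)
    for item in items:
        keys = grouped[item["data_type"]]
        if item["data_key"] not in keys:
            keys.append(item["data_key"])
    return dict(grouped)


pending = [  # made-up sync items, for illustration only
    {"data_type": "records", "data_key": "r1"},
    {"data_type": "fields", "data_key": "f1"},
    {"data_type": "records", "data_key": "r2"},
    {"data_type": "records", "data_key": "r1"},
]
print(group_keys_by_type(pending))
```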

Can I just download the API connector?

Sure you can, just click on the link to download the API connector. Please note that this isn't production-ready code, so use it as an example or base implementation only. It depends on one external library, requests.

Do you want a more high-level, official client library? You can pip install unicat. Check out the Python Unicat SDK.