Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

Google API Python Client im Produktivbetrieb richtig einsetzen

By Zaar HaiDec 7, 20206 min read

Diese Seite ist auch in English, Español, Français, Italiano, 日本語 und Português verfügbar.

1 24dwwcvdshngex319wgtha

google api python client

Wie Sie mit Python und den Google APIs bessere Software bauen

Der offizielle Google API Python Client hat einige Jahre auf dem Buckel. Ursprünglich für Python 2 geschrieben und später auf Python 3 portiert, setzt er auf das altehrwürdige urllib2 und gilt inzwischen offiziell als Maintenance-Mode-Projekt (sprich: keine neuen Features mehr).

Zwar entstehen neue, produktspezifische Bibliotheken, doch diese decken bislang nur einen kleinen Teil der Google APIs ab – überwiegend rund um GCP. Ich finde sie sehr nützlich, aber Dienste wie die Google Drive API oder die Google Directory API sind dort nicht enthalten. Auf absehbare Zeit bleiben wir also auf den oben erwähnten Low-Level-Client angewiesen.

Eines der Features, das dem Google API Python Client fehlt, ist Thread-Safety.

Wenn Ihre App am seidenen Faden hängt

In den letzten Jahren habe ich überwiegend asynchronen Python-Code geschrieben. Projekte wie FastAPI sind hier Vorreiter – sowohl technologisch als auch konzeptionell – und machen es leicht, synchrone wie asynchrone Webservices in einem einzigen Web-Framework umzusetzen. Der async-Ansatz hat etwas Suchterzeugendes: Sie müssen sich nicht mehr ständig fragen, was zwischen zwei Codezeilen in einem anderen Thread passieren könnte – mit asyncio bestimmen Sie selbst, wo der Kontextwechsel stattfindet.

Beim Wiedereinstieg in synchronen Code hatte ich Thread-Safety dann kurzerhand vergessen – zumindest, bis meine Cloud Run-Anwendung anfing abzustürzen (Stichwort Segmentation Fault) und nach gerade einmal 25 ms wahllos Exceptions wie SSLError, TimeoutError, IncompleteRead und so weiter warf.

Als klar war, dass es sich um ein Thread-Safety-Problem handelte, habe ich überlegt, wie sich das beheben lässt. Mein Code nutzt die G Suite APIs, und ich habe eine G Suite Manager-Service-Klasse (ja, ich weiß: kingdom of nouns), die ich beim Start initialisiere und während der Verarbeitung von Web-Requests in meiner App verwende.

Auf der sicheren Thread-Seite bleiben

Die offizielle Dokumentation gibt einige Hinweise dazu, wie sich die Bibliothek thread-sicher verwenden lässt – im Kern: httplib2-Transport-Objekte beim Aufruf der Google APIs manuell verwalten und dafür sorgen, dass jeder Thread ein eigenes nutzt.

Offen bleibt allerdings, wie man diesen Rat in einem Webserver umsetzt, in dem jeder Request in einem dedizierten Thread bearbeitet wird, der nach Abschluss weiterleben kann oder eben nicht. Bei jedem Request ein neues httplib2.Http()-Objekt zu erzeugen, wäre Verschwendung – schließlich würden wir für jeden bearbeiteten Request eine neue TCP-Verbindung zum Google-API-Backend aufbauen und obendrein die Möglichkeit verlieren, über httplib2 mit ETags die Antwort-Payloads der Google-Server zu reduzieren.

Am Ende kam mir die Idee, einen eigenen thread-sicheren Pool von httplib2.Http()-Objekten zu bauen und sämtliche Aufrufe des Google API Python Client darüber laufen zu lassen – ich nenne ihn APIConnector.

So sieht das aus:

from google_auth_httplib2 import AuthorizedHttp
from googleapiclient.http import HttpRequest
import google.auth
import httplib2
@dataclass
class APIConnector:
factory: Callable[[], AuthorizedHttp]
pool: List[AuthorizedHttp] = field(default_factory=[])
@classmethod
def new(
cls,
credentials: google.auth.Credentials,
initial_size: int = 5,
timeout_seconds: int = 10,
) -> APIConnector:
factory = lambda: AuthorizedHttp(
credentials,
http=httplib2.Http(timeout=timeout_seconds)
)
pool: List[AuthorizedHttp] = []
for i in range(initial_size):
pool.append(factory())
return cls(factory, pool=pool)
def execute(self, req: HttpRequest) -> Any:
http: Optional[AuthorizedHttp] = None
try:
http = self._provision_http()
return req.execute(http=http)
finally:
if http:
self.pool.append(http)
def _provision_http(self) -> AuthorizedHttp:
try:
return self.pool.pop()
except IndexError:
logger.info("Pool exhausted. Creating new transport")
return self.factory()

Erstaunlich wenig Code für saubere Thread-Safety, oder? Wichtig dabei: Unser Pool-Manager ist thread-sicher, ganz ohne Locksatomare Operationen sind in Python thread-sicher, weil das GIL diese Aufgabe für uns übernimmt.

Und so verwenden Sie das Ganze:

@dataclass
class GSuiteUserManager:
api: APIConnector
users: googleapiclient.discovery.Resource
domain: str
@classmethod
def new(cls, domain, credentials) -> GSuiteUsersManager:
api = APIConnector.new(Credentials)
service = googleapiclient.discovery.build(
"admin",
"directory_v1",
credentials=credentials,
cache_discovery=False,
)
users = service.users()
return cls(api=api, users=users, domain=domain)
def list(self) -> dict:
return self.api.execute(
self.users.list(domain=self.domain)
)
def get(self, email: str) -> dict:
...

Sämtliche relevanten API-Methoden wie im Beispiel über Manager zu proxen, mag nach viel Boilerplate aussehen. In der Praxis leisten solche Manager aber meist deutlich mehr, als nur API-Antworten unverändert weiterzureichen. Mindestens sollten sie Google-API-Antworten in saubere, beschreibende Python-Objekte (z. B. User, Group usw.) modellieren, statt einfach Dictionaries zurückzugeben, die den darauf aufbauenden Code im Handumdrehen unleserlich machen.

Noch einen Schritt weiter

Wenn Ihnen das bis hierher gefällt, geht es jetzt mit den Bonus-Abschnitten weiter.

Timeouts

Der Google API Python Client bietet von Haus aus keine Möglichkeit, Timeouts zu setzen. Da unser APIConnector ohnehin tief in seine Interna eingreift, haben wir die Gelegenheit gleich mitgenommen, um Timeouts zu steuern – aufmerksamen Leser:innen ist das oben vermutlich schon aufgefallen.

Cleanups

Wer den APIConnector so wie oben einsetzt, wird beim Beenden des Programms vermutlich Warnungen wie diese sehen:

sys:1: ResourceWarning: unclosed <ssl.SSLSocket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.1', 54988), raddr=('142.250.66.173', 443)>

Der Grund: Im Pool liegen noch nicht geschlossene httplib2.Http()-Objekte. Beheben lässt sich das, indem wir folgende Methoden im APIConnector ergänzen:

def close(self) -> None:
for ahttp in self.pool:
ahttp.http.close()
def __del__(self) -> None:
self.close()

Damit verschwinden die Warnungen, und wir bekommen ein sauberes Shutdown.

Sie werden außerdem feststellen, dass der Pool nie schrumpft. Das stimmt – aber da mein Anwendungsfall Cloud Run ist und die Instanzen ohnehin kurzlebig sind, finde ich das einen vertretbaren Trade-off zugunsten der Einfachheit.

Cache

Httplib2 unterstützt Caching, um gespeicherte ETags beim erneuten Abrufen einer Ressource per If-None-Match-Header zu nutzen. Das spart das erneute Herunterladen der Daten von den Google-Servern, ein Netzwerk-Roundtrip fällt aber dennoch an.

Das Caching von httplib2 basiert auf Dateien und ist ebenfalls nicht thread-sicher. Wir wollen aber einen gemeinsamen Cache für die verschiedenen Http()-Objekte, da wir sie als Connection-Pool für ein und dieselbe Upstream-API nutzen.

Auch hier nutzen wir wieder, dass atomare Operationen auf Python-Objekten thread-sicher sind, und bauen kurzerhand einen lockfreien In-Memory-Cache für httplib2:

@dataclass

class MemCache:
data: dict[Hashable, Any] = field(default_factory=dict)
def get(self, key: Hashable) -> Any:
if hit := self.data.get(key, None):
logger.debug("Cache hit", key=key)
return hit
def set(self, key: Hashable, data: Any) -> None:
self.data[key] = data
def delete(self, key):
try:
del self.data[1]
except KeyError:
pass

(Das Klasseninterface ist vom httplib2.FileCache-Objekt übernommen.)

Und nun passen wir unsere Klassen so an, dass sie den Cache verwenden:

@dataclass
class APIConnector:
...
@classmethod
def new(
...
cache: Optional[MemCache] = None,
) -> APIConnector:
factory = lambda: AuthorizedHttp(
credentials,
http=httplib2.Http(
timeout=timeout_seconds,
cache=cache,
),
)
...
class GSuiteUserManager:
...
@classmethod
def new(
cls, domain, credentials, use_cache: bool = True
) -> GSuiteUsersManager:
cache = MemCache() if use_cache else None
api = APIConnector.new(credentials, cache=cache)
...

Discovery Cache

Vielleicht fragen Sie sich, warum ich beim Aufbau des Service im GSuiteUserManager cache_discover=False übergeben habe – die Antwort: Diese Funktion ist ziemlich kaputt und produziert Traceback-Rauschen, wie ausführlich hier und hier beschrieben.


Ich hoffe, dieser Artikel hilft Ihnen dabei, mit Python und den Google APIs bessere Software zu bauen – auch wenn die offizielle Bibliothek ein paar Ecken und Kanten hat.

Den vollständigen Code für APIConnector und MemCache finden Sie hier.