Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

Google API Python Client in applicazioni production-grade

By Zaar HaiDec 7, 20206 min read

Questa pagina è disponibile anche in English, Deutsch, Español, Français, 日本語 e Português.

1 24dwwcvdshngex319wgtha

google api python client

Come scrivere software migliore con Python e le API di Google

Il Google API Python Client ufficiale ha ormai qualche anno sulle spalle. È nato per Python 2 ed è stato poi adattato a Python 3. Si appoggia al venerabile urllib2 e oggi è considerato in maintenance mode (leggi: "niente nuove funzionalità").

Sono in sviluppo nuove librerie specifiche per singolo prodotto, che però coprono ancora solo una piccola parte delle API di Google, perlopiù concentrate su GCP. Le trovo molto utili, ma non includono servizi come Google Drive o la Google Directory API. Di conseguenza, per il futuro prevedibile siamo di fatto costretti a usare il client low-level originale citato sopra.

Una delle funzionalità che mancano al Google API Python Client è la Thread Safety

Quando la tua app è appesa a un thread

Negli ultimi anni ho scritto soprattutto codice Python asincrono. Progetti come FastAPI fanno da apripista, sia sul piano tecnologico sia su quello del mentoring, e rendono semplice scrivere servizi web sia sync sia async con un unico framework. L'approccio asincrono dà quasi dipendenza, perché ti libera dal pensiero di cosa possa succedere tra due righe di codice in un altro thread: con asyncio sei tu a decidere dove avvengono i context switch.

Tornato a lavorare in modalità sync, mi sono semplicemente scordato per un momento della thread-safety, almeno finché la mia applicazione su Cloud Run non ha cominciato a crashare (sì, proprio con Segmentation Fault) e a sollevare eccezioni casuali come SSLError, TimeoutError dopo appena 25 ms, IncompleteRead e così via.

Una volta capito che si trattava di un problema di thread-safety, mi sono messo a ragionare su come risolverlo. Il mio codice usa le API di G Suite e dispongo di una classe di servizio G Suite Manager (lo so, kingdom of nouns) che inizializzo all'avvio e uso poi durante la gestione delle richieste web nell'app.

Restare dal lato thread-safe

La documentazione ufficiale offre alcuni consigli su come usare la libreria in modo thread-safe: in particolare, gestire manualmente gli oggetti di trasporto httplib2 quando si chiamano le API di Google e assicurarsi che ogni thread ne usi uno proprio.

Non è però chiaro come applicare questo consiglio in un web server, dove ogni richiesta viene gestita in un thread dedicato che può rimanere vivo o meno una volta conclusa. Creare un nuovo oggetto httplib2.Http() a ogni richiesta è uno spreco, perché significa aprire una nuova connessione TCP verso il backend delle API di Google a ogni richiesta gestita e perdere la capacità di httplib2 di sfruttare gli ETag per ridurre il payload delle risposte dai server di Google.

Alla fine ho avuto l'idea di creare un pool thread-safe di oggetti httplib2.Http() e instradare tutte le richieste del Google API Python client attraverso questo pool: l'ho chiamato APIConnector.

Ecco com'è fatto:

from google_auth_httplib2 import AuthorizedHttp
from googleapiclient.http import HttpRequest
import google.auth
import httplib2
@dataclass
class APIConnector:
factory: Callable[[], AuthorizedHttp]
pool: List[AuthorizedHttp] = field(default_factory=[])
@classmethod
def new(
cls,
credentials: google.auth.Credentials,
initial_size: int = 5,
timeout_seconds: int = 10,
) -> APIConnector:
factory = lambda: AuthorizedHttp(
credentials,
http=httplib2.Http(timeout=timeout_seconds)
)
pool: List[AuthorizedHttp] = []
for i in range(initial_size):
pool.append(factory())
return cls(factory, pool=pool)
def execute(self, req: HttpRequest) -> Any:
http: Optional[AuthorizedHttp] = None
try:
http = self._provision_http()
return req.execute(http=http)
finally:
if http:
self.pool.append(http)
def _provision_http(self) -> AuthorizedHttp:
try:
return self.pool.pop()
except IndexError:
logger.info("Pool exhausted. Creating new transport")
return self.factory()

Davvero poco codice in cambio della thread safety, vero? Vale la pena sottolineare che il nostro pool manager è thread-safe senza alcun lock: le operazioni atomiche in Python sono thread-safe, perché ci pensa il GIL a coprirci le spalle.

Ed ecco come si usa quanto sopra:

@dataclass
class GSuiteUserManager:
api: APIConnector
users: googleapiclient.discovery.Resource
domain: str
@classmethod
def new(cls, domain, credentials) -> GSuiteUsersManager:
api = APIConnector.new(Credentials)
service = googleapiclient.discovery.build(
"admin",
"directory_v1",
credentials=credentials,
cache_discovery=False,
)
users = service.users()
return cls(api=api, users=users, domain=domain)
def list(self) -> dict:
return self.api.execute(
self.users.list(domain=self.domain)
)
def get(self, email: str) -> dict:
...

Far passare tutti i metodi API rilevanti attraverso dei Manager, come nell'esempio qui sopra, può sembrare molto codice di contorno, ma nella pratica questi manager fanno ben più che restituire le risposte API così come sono. Come minimo, dovrebbero modellare le risposte delle API di Google in oggetti Python descrittivi e ben definiti (ad es. User, Group, ecc.), invece di restituire semplici dizionari che, molto in fretta, rendono incomprensibile il codice a monte.

Come migliorarlo ulteriormente

Se finora la cosa ti incuriosisce, passiamo alle sezioni con i contenuti bonus.

Timeout

Il Google API Python client non offre un modo nativo per impostare i timeout, ma dato che il nostro APIConnector mette comunque le mani nei suoi meccanismi interni, abbiamo colto l'occasione per gestirli, come il lettore attento avrà già notato.

Cleanup

Usando l'APIConnector di cui sopra "così com'è", al termine del programma probabilmente ti ritroverai con avvisi come quello qui sotto.

sys:1: ResourceWarning: unclosed <ssl.SSLSocket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.1', 54988), raddr=('142.250.66.173', 443)>

Succede perché nel nostro pool restano oggetti httplib2.Http() non chiusi. Risolviamo aggiungendo questi metodi all'APIConnector:

def close(self) -> None:
for ahttp in self.pool:
ahttp.http.close()
def __del__(self) -> None:
self.close()

Ora gli avvisi sono spariti e otteniamo uno shutdown pulito.

Potresti anche notare che il pool non si riduce mai, ed è vero: ma dato che il mio caso d'uso è Cloud Run, dove le istanze sono comunque di breve durata, lo considero un buon compromesso a favore della semplicità.

Cache

Httplib2 supporta il caching per sfruttare gli ETag memorizzati con header If-None-Match quando si richiede di nuovo una risorsa. Questo può evitarti di scaricare un'altra volta i dati dai server di Google, anche se comporta comunque un round trip di rete.

Il caching di httplib2 è basato su file e non è thread-safe nemmeno lui. Vogliamo però una cache condivisa tra i diversi oggetti Http(), dato che li usiamo come pool di connessioni verso la stessa API a monte.

Quindi, ancora una volta, sfruttando il fatto che le operazioni atomiche sugli oggetti Python sono thread-safe, possiamo costruire facilmente una cache in memoria senza lock per httplib2:

@dataclass

class MemCache:
data: dict[Hashable, Any] = field(default_factory=dict)
def get(self, key: Hashable) -> Any:
if hit := self.data.get(key, None):
logger.debug("Cache hit", key=key)
return hit
def set(self, key: Hashable, data: Any) -> None:
self.data[key] = data
def delete(self, key):
try:
del self.data[1]
except KeyError:
pass

(L'interfaccia della classe è copiata dall'oggetto httplib2.FileCache)

E ora aggiorniamo le classi per usare la cache:

@dataclass
class APIConnector:
...
@classmethod
def new(
...
cache: Optional[MemCache] = None,
) -> APIConnector:
factory = lambda: AuthorizedHttp(
credentials,
http=httplib2.Http(
timeout=timeout_seconds,
cache=cache,
),
)
...
class GSuiteUserManager:
...
@classmethod
def new(
cls, domain, credentials, use_cache: bool = True
) -> GSuiteUsersManager:
cache = MemCache() if use_cache else None
api = APIConnector.new(credentials, cache=cache)
...

Discovery Cache

Infine, forse ti starai chiedendo perché ho passato cache_discover=False nella costruzione del service in GSuiteUserManager: la risposta è che questa funzionalità è piuttosto buggata e genera rumore nei traceback, come descritto in dettaglio qui e qui.


Spero che questo articolo ti aiuti a scrivere software migliore con Python e le API di Google, nonostante la libreria ufficiale abbia qualche spigolo da smussare.

Il codice completo di APIConnector e Memcache è disponibile qui.