

PythonとGoogle APIで、より良いソフトウェアを作るためのヒント
公式の Google API Python Client は、登場からかなりの年月が経っています。もともとPython 2向けに書かれ、後にPython 3へ対応させたもので、内部では古参のurllib2を利用しており、現在はメンテナンスモード(要するに「新機能の追加なし」)と位置付けられています。
新しいプロダクト別の ライブラリ も開発されていますが、対応しているのはまだGoogle APIのごく一部、しかも大半がGCP関連に偏っています。便利ではあるものの、Google DriveやGoogle Directory APIといったサービスはカバーされておらず、当面のあいだは前述の低レベルクライアントに頼らざるを得ないのが実情です。
Google API Python Clientに欠けている機能のひとつが、スレッドセーフであることです
アプリが「スレッド一本」で持ちこたえているとき
ここ数年、私が書いてきたPythonコードはほとんどが非同期処理でした。FastAPI のようなプロジェクトは、技術と思想の両面でこのアプローチを牽引しており、ひとつのWebフレームワークで 同期 と 非同期 両方のWebサービスを手軽に書けるようにしてくれます。非同期処理にはある種の中毒性があり、別スレッドで2行のコードのあいだに何が起こり得るかを意識せずに済むのが魅力です。asyncio では、コンテキストスイッチのタイミングを決めるのは自分自身だからです。
そんな私が同期処理の世界に戻ったとき、スレッドセーフのことをすっかり忘れていました。気づいたのは、Cloud Runで動かしていたアプリケーションがクラッシュ(しかも Segmentation Fault)しはじめ、SSLErrorや、わずか25msで発生するTimeoutError、IncompleteReadなどの例外がランダムに飛んでくるようになったときでした。
原因がスレッドセーフの問題だと分かってから、どう直すかを考えました。コードはG Suite APIを使っており、起動時に初期化してアプリ内のWebリクエスト処理で使い回す G Suite Manager というサービスクラス(名詞の王国、ええ、自覚はあります)を持っていました。
スレッドセーフを保つために
公式ドキュメントには、ライブラリをスレッドセーフに使うための 指針 が示されています。要点は、Google API呼び出し時にhttplib2のトランスポートオブジェクトを手動で管理し、各スレッドが固有のものを使うようにする、というものです。
しかし、各リクエストが専用スレッドで処理され、そのスレッドがリクエスト終了後も生き残るとは限らないWebサーバーの文脈で、この指針をどう適用すべきかは明確ではありません。リクエストごとに新しい httplib2.Http() オブジェクトを作るのは無駄です。処理するリクエストごと にGoogle APIバックエンドへの新しいTCP接続を張ることになりますし、httplib2がETagを使ってGoogleサーバーからのレスポンスペイロードを削減してくれる仕組みも活かせなくなります。
最終的に思いついたのが、独自のスレッドセーフな httplib2.Http() オブジェクトプールを用意し、Google API Python Clientのリクエストをすべてこのプール経由でプロキシするというアプローチです。これをAPIConnectorと呼んでいます。
実装は次のとおりです:
from google_auth_httplib2 import AuthorizedHttpfrom googleapiclient.http import HttpRequestimport google.authimport httplib2@dataclassclass APIConnector: factory: Callable[[], AuthorizedHttp] pool: List[AuthorizedHttp] = field(default_factory=[]) @classmethod def new( cls, credentials: google.auth.Credentials, initial_size: int = 5, timeout_seconds: int = 10, ) -> APIConnector: factory = lambda: AuthorizedHttp( credentials, http=httplib2.Http(timeout=timeout_seconds) ) pool: List[AuthorizedHttp] = [] for i in range(initial_size): pool.append(factory()) return cls(factory, pool=pool) def execute(self, req: HttpRequest) -> Any: http: Optional[AuthorizedHttp] = None try: http = self._provision_http() return req.execute(http=http) finally: if http: self.pool.append(http) def _provision_http(self) -> AuthorizedHttp: try: return self.pool.pop() except IndexError: logger.info("Pool exhausted. Creating new transport") return self.factory()スレッドセーフを得るための代償としては、十分に小さなコード量ではないでしょうか。注目すべきは、このプールマネージャーが ロックを一切使わずにスレッドセーフ である点です。Pythonの アトミック な操作はGILのおかげでスレッドセーフだからです。
使い方は次のとおりです:
@dataclassclass GSuiteUserManager: api: APIConnector users: googleapiclient.discovery.Resource domain: str @classmethod def new(cls, domain, credentials) -> GSuiteUsersManager: api = APIConnector.new(Credentials) service = googleapiclient.discovery.build( "admin", "directory_v1", credentials=credentials, cache_discovery=False, ) users = service.users() return cls(api=api, users=users, domain=domain) def list(self) -> dict: return self.api.execute( self.users.list(domain=self.domain) ) def get(self, email: str) -> dict: ...上記の例のように、関連するAPIメソッドをすべてManager経由でプロキシするのは、一見すると定型コードの山に思えるかもしれません。ただ実際には、こうしたManagerはAPIレスポンスをそのまま返すだけにとどまらない役割を担うことになります。最低限でも、辞書をそのまま返すのではなく、Google APIのレスポンスを モデル化 して意味の伝わるPythonオブジェクト(User、Group など)に落とし込むべきです。そうしないと、上流のコードはあっという間に読めなくなってしまいます。
さらに磨きをかける
ここまで興味を持って読んでいただけたなら、ボーナスコンテンツのセクションへ進みましょう。
タイムアウト
Google API Python Clientには、タイムアウトを指定するための公式な手段がありません。とはいえAPIConnectorはどのみち内部に踏み込んでいるので、その機会を利用してタイムアウトを制御しています。鋭い読者の方ならすでにお気づきかもしれません。
クリーンアップ
上記のAPIConnectorをそのまま使うと、プログラム終了時に次のような警告が出てしまうはずです。
sys:1: ResourceWarning: unclosed <ssl.SSLSocket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.1', 54988), raddr=('142.250.66.173', 443)>原因は、プール内にクローズされていない httplib2.Http() オブジェクトが残っているためです。APIConnectorに次のメソッドを追加して対応しましょう:
def close(self) -> None: for ahttp in self.pool: ahttp.http.close()def __del__(self) -> None: self.close()これで警告は消え、きれいにシャットダウンできるようになりました。
また、プールが縮小しない点に気づいたかもしれません。確かにその通りですが、私のユースケースであるCloud Runではインスタンス自体が短命なので、シンプルさを優先したトレードオフとしては妥当だと考えています。
キャッシュ
Httplib2は キャッシュに対応 しており、リソースを再取得する際に保存済みのETagを If-None-Match ヘッダーとともに利用できます。これによりGoogleサーバーからのデータ再取得は省けますが、ネットワークのラウンドトリップ自体は依然として発生します。
Httplib2のキャッシュはファイルベースで、こちらもスレッドセーフではありません。とはいえ、同じ上流APIに対するコネクションプールとして使う以上、複数の Http() オブジェクト間でキャッシュを共有したいところです。
そこで再び、Pythonオブジェクトに対するアトミック操作はスレッドセーフであるという性質を活かして、httplib2向けにロックレスなメモリキャッシュを手軽に作れます:
class MemCache: data: dict[Hashable, Any] = field(default_factory=dict) def get(self, key: Hashable) -> Any: if hit := self.data.get(key, None): logger.debug("Cache hit", key=key) return hit def set(self, key: Hashable, data: Any) -> None: self.data[key] = data def delete(self, key): try: del self.data[1] except KeyError: pass(クラスのインターフェースは httplib2.FileCache オブジェクトから拝借しています)
続いて、キャッシュを使うようにクラスを更新します:
@dataclassclass APIConnector: ... @classmethod def new( ... cache: Optional[MemCache] = None, ) -> APIConnector: factory = lambda: AuthorizedHttp( credentials, http=httplib2.Http( timeout=timeout_seconds, cache=cache, ), ) ...class GSuiteUserManager: ... @classmethod def new( cls, domain, credentials, use_cache: bool = True ) -> GSuiteUsersManager: cache = MemCache() if use_cache else None api = APIConnector.new(credentials, cache=cache) ...Discovery Cache
最後に、GSuiteUserManagerでサービスを構築する際に cache_discover=False を渡しているのはなぜか、と疑問に思われたかもしれません。理由は、この機能がかなり壊れていてトレースバックのノイズを撒き散らすからです。詳しくは こちら と こちら で解説されています。
公式ライブラリには粗削りな部分が残っているものの、本記事がPythonとGoogle APIでより良いソフトウェアを作る一助になれば幸いです。
APIConnectorとMemcacheの全コードは こちら で公開しています。