從 DynamoDB 動態取得 Service Token
概念
把 token 存在 DynamoDB,讓服務在 runtime 動態讀取,而 不是寫死在環境變數裡。
為什麼需要 Service Token
Worker 送資料到外部 API 時,需要一個 token 證明身份:
Worker 要呼叫外部 API
│
│ 「你是誰?給我看 token」
│
▼
ServiceTokenProvider.get_service_token()
│
│ 去 DynamoDB 查 config 設定
│
▼
拿到 token → 帶著 token 呼叫 API → 送出請求
為什麼不直接把 token 放環境變數?
| 方式 | 優點 | 缺點 |
|---|---|---|
| 環境變數 | 簡單 | 換 token 要重新部署 Pod |
| DynamoDB | 動態讀取 | 換 token 只要改 DB,不用重新部署 |
Token 可能會定期輪換(rotate),放 DynamoDB 就能不重啟服務的情況下換掉 token。
基礎實作
import boto3
from dataclasses import dataclass
@dataclass
class ServiceTokenProvider:
table_name: str
region: str = "us-east-1"
def __post_init__(self):
dynamodb = boto3.resource("dynamodb", region_name=self.region)
self._table = dynamodb.Table(self.table_name)
def get_service_token(self) -> str:
"""每次呼叫都從 DynamoDB 讀最新值,token rotate 後自動生效"""
response = self._table.get_item(Key={"id": "service_token"})
item = response.get("Item")
if not item:
raise ValueError("service_token not found in DynamoDB")
return item["token"]
問題:每次請求都打一次 DynamoDB
如果你的 inference worker 每秒處理 100 個請求,每個請求都呼叫 get_service_token(),就是每秒 100 次 DynamoDB read。Token 通常幾小時甚至幾天才換一次,這些讀取大部分都是浪費的。
加入 TTL 快取(正確做法)
Python 類比:就像用
functools.lru_cache快取昂貴的函式呼叫,但 lru_cache 沒有時間過期,需要自己實作 TTL。import functoolsimport time# lru_cache 沒有 TTL,這樣不行:@functools.lru_cache(maxsize=1)def get_token():return fetch_from_dynamodb() # 永遠不會重新讀取# 需要帶時間的快取
import boto3
import threading
import time
from dataclasses import dataclass, field
@dataclass
class ServiceTokenProvider:
table_name: str
region: str = "us-east-1"
cache_ttl_seconds: int = 300 # 5 分鐘 TTL
def __post_init__(self):
dynamodb = boto3.resource("dynamodb", region_name=self.region)
self._table = dynamodb.Table(self.table_name)
self._cached_token: str | None = None
self._cache_expires_at: float = 0.0
self._lock = threading.Lock() # 執行緒安全
def get_service_token(self) -> str:
"""
從快取讀取 token,過期才重新查 DynamoDB。
快取 TTL = 5 分鐘,token rotate 後最多 5 分鐘生效。
"""
# 先不加鎖快速檢查(大多數情況快取是有效的)
if self._cached_token and time.monotonic() < self._cache_expires_at:
return self._cached_token
# 快取失效,加鎖重新讀取
with self._lock:
# double-checked locking:拿到鎖後再確認一次,避免重複打 DynamoDB
if self._cached_token and time.monotonic() < self._cache_expires_at:
return self._cached_token
token = self._fetch_from_dynamodb()
self._cached_token = token
self._cache_expires_at = time.monotonic() + self.cache_ttl_seconds
return token
def _fetch_from_dynamodb(self) -> str:
"""直接從 DynamoDB 讀取最新 token"""
response = self._table.get_item(Key={"id": "service_token"})
item = response.get("Item")
if not item:
raise ValueError("service_token not found in DynamoDB")
return item["token"]
def invalidate_cache(self) -> None:
"""強制清除快取,下次呼叫會重新讀 DynamoDB"""
with self._lock:
self._cached_token = None
self._cache_expires_at = 0.0
執行緒安全說明
# 為什麼需要 threading.Lock()?
# 假設 10 個 inference worker thread 同時發現快取過期:
# 沒有 lock → 10 個 thread 同時打 DynamoDB → "thundering herd"
# 有 lock → 第一個 thread 拿鎖、讀 DynamoDB、更新快取
# 其他 9 個 thread 排隊等鎖
# 拿到鎖後發現快取已更新(double-check)→ 直接回傳快取
# 使用 threading.Lock 的代價極小(nanoseconds),
# 但避免了在快取過期瞬間的 N 倍 DynamoDB 請求
Token 過期中途處理(請求途中 Token 失效)
Token rotation 有時是強制性的(立即失效),而不是等 TTL 到期。如果 token 在 inference 請求途中被換掉,外部 API 會回傳 401。
Python 類比:就像
requests的 retry 邏輯,遇到 401 就重新取得 token 再試一次。import requestsfrom requests.auth import AuthBaseclass DynamicTokenAuth(AuthBase):def __call__(self, r):r.headers["Authorization"] = f"Bearer {get_fresh_token()}"return r
import httpx
from botocore.exceptions import ClientError
class InferenceAPIClient:
def __init__(self, base_url: str, token_provider: ServiceTokenProvider):
self._base_url = base_url
self._token_provider = token_provider
def call_with_token_retry(self, endpoint: str, payload: dict) -> dict:
"""
呼叫外部 API,遇到 401(token 過期)時自動重試一次。
只重試一次:避免 token 本身有問題導致無限循環。
"""
for attempt in range(2): # 最多嘗試 2 次
token = self._token_provider.get_service_token()
try:
response = httpx.post(
f"{self._base_url}/{endpoint}",
headers={"Authorization": f"Bearer {token}"},
json=payload,
timeout=30.0,
)
if response.status_code == 401 and attempt == 0:
# Token 可能已被 rotate,清快取後重試
print(f"[WARNING] 401 Unauthorized,清除 token 快取後重試")
self._token_provider.invalidate_cache()
continue # 重試
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 401 and attempt == 0:
self._token_provider.invalidate_cache()
continue
raise
raise RuntimeError("Token 重試後仍然 401,請確認 DynamoDB 中的 token 是否有效")
Token Rotation 流程
1. 外部 API 發新 token
↓
2. 直接更新 DynamoDB:
table.update_item(Key={"id": "service_token"}, ...)
↓
3. 各 worker 的快取還是舊 token(最多 5 分鐘)
↓
4a. 快取 TTL 到期後,下一次呼叫自動拿新 token
4b. 或外部 API 回 401 → 立即清快取 → 下一次拿新 token
↓
5. 不需要重新部署、不需要重啟 Pod ✓
# 對比:如果用環境變數
# 換 token 的流程會是:
# 1. 更新 K8s Secret
# 2. kubectl rollout restart deployment/my-worker ← 要重啟!
# 3. Pod 重新啟動,讀取新的環境變數
# → 有 downtime 風險,而且要走一遍部署流程
完整使用範例
import os
import httpx
env = os.environ.get("ENV", "dev")
# 全局 singleton,整個程序共用一個快取
_token_provider = ServiceTokenProvider(
table_name=f"my-service-{env}-config",
cache_ttl_seconds=300, # 5 分鐘快取
)
_api_client = InferenceAPIClient(
base_url="https://external-api.com",
token_provider=_token_provider,
)
def run_inference(payload: dict) -> dict:
"""Inference worker 的主要入口"""
return _api_client.call_with_token_retry("infer", payload)
# 預期行為:
# - 每 5 分鐘才打一次 DynamoDB,其餘 9999 次用快取
# - token 被強制輪換時,最多多一次 401 後自動恢復
# - 多執行緒安全,100 個 worker thread 共用同一個快取