(用AI做了啥)基于Cloudflare Workers部署问题反馈API
最近关于CloudFlare Workers应用还挺多,想着是否可以基于Workers部署一个问题反馈的API方便收集用户反馈呢?
首先该如何设计这个API呢?需要准备两个API接口,分别是提交用户反馈和获取用户反馈,前者嵌入到程序中,由程序提交,后者需要开发者通过调用API获取结果。
为了保证安全性,可以采用RSA加密算法,用户通过程序发送反馈时,通过公钥进行加密,开发者通过私钥解密数据,以尽可能确保数据安全性
另外,在收集用户反馈时,尽可能少的收集用户本机数据,提交表单使用随机ID提交,避免冲突
备注 以下代码和文档均由AI生成,如需部署到生产环境,需自行审核代码
1. 在 Cloudflare Dashboard 创建 KV¶
进入 Cloudflare Dashboard:
建议命名为:
KV namespace 名字可以随意,但后面绑定到 Worker 时,变量名必须叫 LOGS_KV,因为代码里用的是:
Cloudflare 文档说明,Worker 需要通过 binding 才能访问 KV namespace;binding 名称会作为 Worker 运行时里的变量名使用。
2. 创建 Worker¶
进入:
创建后进入这个 Worker,选择:
Dashboard 创建 Worker 是官方支持路径,Cloudflare 文档也说明可以在 Workers & Pages 页面创建应用并预览 workers.dev 子域名。
3. 给 Worker 绑定 KV¶
进入你的 Worker:
填写:
保存并部署。
注意:这里的 Variable name 必须是 LOGS_KV。如果你填成别的名字,例如 secure_logs,代码里的 env.LOGS_KV 会是 undefined,接口写日志时会报错。
4. 给 Worker 添加 API Token Secret¶
进入你的 Worker:
添加:
例如:
Cloudflare 文档说明,Secrets 是一种 binding,适合保存 API keys、auth tokens 等敏感信息,并且可以在 Worker 的 fetch(request, env, ctx) 里的 env 参数访问。
5. 在网页编辑器里粘贴这个完整 Worker 代码¶
Dashboard 里建议直接用 JavaScript,不要贴之前的 TypeScript 版本。把默认代码全部删掉,粘贴下面这份:
const ALG = "RSA-OAEP-SHA256-RSA3072";
const RSA_3072_CIPHERTEXT_BYTES = 384;
const MAX_JSON_BODY_BYTES = 4096;
const encoder = new TextEncoder();
function jsonResponse(data, status = 200) {
return new Response(JSON.stringify(data, null, 2), {
status,
headers: {
"content-type": "application/json; charset=utf-8",
"cache-control": "no-store",
"strict-transport-security": "max-age=31536000; includeSubDomains; preload"
}
});
}
function errorResponse(message, status = 400) {
return jsonResponse(
{
ok: false,
error: message
},
status
);
}
function timingSafeEqual(a, b) {
const aa = encoder.encode(a);
const bb = encoder.encode(b);
let diff = aa.length ^ bb.length;
const maxLen = Math.max(aa.length, bb.length);
for (let i = 0; i < maxLen; i++) {
diff |= (aa[i] || 0) ^ (bb[i] || 0);
}
return diff === 0;
}
function requireHttps(request) {
const url = new URL(request.url);
if (url.protocol !== "https:") {
return errorResponse("HTTPS is required.", 426);
}
return null;
}
function requireAuth(request, env) {
const given = request.headers.get("x-api-key") || "";
if (!env.API_TOKEN || !timingSafeEqual(given, env.API_TOKEN)) {
return errorResponse("Unauthorized.", 401);
}
return null;
}
function cleanString(value, maxLen) {
if (typeof value !== "string") return undefined;
const trimmed = value.trim();
if (!trimmed) return undefined;
return trimmed.slice(0, maxLen);
}
function cleanTags(value) {
if (!Array.isArray(value)) return undefined;
const tags = value
.filter((v) => typeof v === "string")
.map((v) => v.trim().slice(0, 64))
.filter(Boolean)
.slice(0, 20);
return tags.length > 0 ? tags : undefined;
}
function parseDateMs(value, fieldName) {
if (!value) return undefined;
const ms = Date.parse(value);
if (Number.isNaN(ms)) {
throw new Error(
`Invalid ${fieldName}. Use ISO-8601, for example 2026-05-23T00:00:00Z.`
);
}
return ms;
}
function parseLimit(value) {
if (!value) return undefined;
const parsed = Number.parseInt(value, 10);
if (!Number.isFinite(parsed) || parsed <= 0) {
throw new Error("Invalid limit. It must be a positive integer.");
}
return Math.min(parsed, 5000);
}
function base64DecodedLength(input) {
try {
const binary = atob(input);
return binary.length;
} catch {
return null;
}
}
function validateEnvelope(value) {
if (!value || typeof value !== "object") return null;
if (value.alg !== ALG) return null;
if (typeof value.ciphertext !== "string") return null;
const ciphertext = value.ciphertext.trim();
const decodedLen = base64DecodedLength(ciphertext);
if (decodedLen !== RSA_3072_CIPHERTEXT_BYTES) {
return null;
}
return {
alg: ALG,
ciphertext
};
}
async function readJsonBody(request) {
const contentType = request.headers.get("content-type") || "";
if (!contentType.toLowerCase().includes("application/json")) {
throw new Error("Content-Type must be application/json.");
}
const contentLengthRaw = request.headers.get("content-length");
if (contentLengthRaw) {
const contentLength = Number.parseInt(contentLengthRaw, 10);
if (Number.isFinite(contentLength) && contentLength > MAX_JSON_BODY_BYTES) {
throw new Error(`JSON body is too large. Max ${MAX_JSON_BODY_BYTES} bytes.`);
}
}
const data = await request.json();
if (!data || typeof data !== "object" || Array.isArray(data)) {
throw new Error("JSON body must be an object.");
}
return data;
}
async function handleCreateLog(request, env) {
let body;
try {
body = await readJsonBody(request);
} catch (e) {
return errorResponse(e instanceof Error ? e.message : "Invalid request body.", 400);
}
const encryptedLog = validateEnvelope(body.encrypted_log);
if (!encryptedLog) {
return errorResponse(
"Missing or invalid encrypted_log. Expected RSA-3072 OAEP-SHA256 ciphertext in base64.",
400
);
}
const now = new Date();
const id = crypto.randomUUID();
const entry = {
id,
key: "",
ts: now.toISOString(),
ts_ms: now.getTime(),
source: cleanString(body.source, 128),
level: cleanString(body.level, 32),
tags: cleanTags(body.tags),
client_ts: cleanString(body.client_ts, 64),
encrypted_log: encryptedLog
};
const key = `log:${entry.ts}:${id}`;
entry.key = key;
await env.LOGS_KV.put(key, JSON.stringify(entry));
return jsonResponse({
ok: true,
id,
ts: entry.ts
});
}
function matchesFilters(entry, filters) {
if (filters.fromMs !== undefined && entry.ts_ms < filters.fromMs) return false;
if (filters.toMs !== undefined && entry.ts_ms > filters.toMs) return false;
if (filters.source && entry.source !== filters.source) return false;
if (filters.level && entry.level !== filters.level) return false;
if (filters.tag) {
if (!entry.tags || !entry.tags.includes(filters.tag)) return false;
}
return true;
}
async function handleListLogs(request, env) {
const url = new URL(request.url);
let fromMs;
let toMs;
let limit;
try {
fromMs = parseDateMs(url.searchParams.get("from"), "from");
toMs = parseDateMs(url.searchParams.get("to"), "to");
limit = parseLimit(url.searchParams.get("limit"));
} catch (e) {
return errorResponse(e instanceof Error ? e.message : "Invalid query parameter.", 400);
}
if (fromMs !== undefined && toMs !== undefined && fromMs > toMs) {
return errorResponse("from must be earlier than or equal to to.", 400);
}
const source = cleanString(url.searchParams.get("source"), 128);
const level = cleanString(url.searchParams.get("level"), 32);
const tag = cleanString(url.searchParams.get("tag"), 64);
const orderRaw = url.searchParams.get("order") || "asc";
const order = orderRaw === "desc" ? "desc" : "asc";
const logs = [];
let scanned = 0;
let cursor = undefined;
do {
const page = await env.LOGS_KV.list({
prefix: "log:",
cursor,
limit: 1000
});
scanned += page.keys.length;
const pageValues = await Promise.all(
page.keys.map(async (item) => {
try {
return await env.LOGS_KV.get(item.name, "json");
} catch {
return null;
}
})
);
for (const entry of pageValues) {
if (!entry) continue;
if (
matchesFilters(entry, {
fromMs,
toMs,
source,
level,
tag
})
) {
logs.push(entry);
if (limit !== undefined && logs.length >= limit) {
break;
}
}
}
if (limit !== undefined && logs.length >= limit) {
break;
}
cursor = page.list_complete ? undefined : page.cursor;
} while (cursor);
logs.sort((a, b) => {
if (order === "desc") return b.ts_ms - a.ts_ms;
return a.ts_ms - b.ts_ms;
});
return jsonResponse({
ok: true,
count: logs.length,
scanned,
filters: {
from: fromMs !== undefined ? new Date(fromMs).toISOString() : null,
to: toMs !== undefined ? new Date(toMs).toISOString() : null,
source: source || null,
level: level || null,
tag: tag || null,
limit: limit || null,
order
},
logs
});
}
export default {
async fetch(request, env) {
const httpsError = requireHttps(request);
if (httpsError) return httpsError;
const authError = requireAuth(request, env);
if (authError) return authError;
if (!env.LOGS_KV) {
return errorResponse(
"LOGS_KV binding is missing. Add a KV namespace binding named LOGS_KV.",
500
);
}
const url = new URL(request.url);
if (url.pathname === "/api/log" && request.method === "POST") {
return handleCreateLog(request, env);
}
if (url.pathname === "/api/logs" && request.method === "GET") {
return handleListLogs(request, env);
}
return errorResponse("Not found.", 404);
}
};
6. Python 客户端¶
requirements.txt¶
requests 官方文档支持 HTTPS proxy,也支持 verify=False 跳过证书校验;但文档明确指出 verify=False 会接受任意 TLS 证书并带来中间人攻击风险,所以这里只应作为开发调试开关。
RSA 公私钥生成脚本¶
generate_keys.py¶
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
from pathlib import Path
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
RSA_KEY_SIZE = 3072
PUBLIC_EXPONENT = 65537
def generate_keys(out_dir: Path) -> None:
out_dir.mkdir(parents=True, exist_ok=True)
private_key = rsa.generate_private_key(
public_exponent=PUBLIC_EXPONENT,
key_size=RSA_KEY_SIZE,
)
public_key = private_key.public_key()
private_path = out_dir / "private_key.pem"
public_path = out_dir / "public_key.pem"
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
private_path.write_bytes(private_pem)
public_path.write_bytes(public_pem)
try:
os.chmod(private_path, 0o600)
except OSError:
pass
print(f"Generated RSA-{RSA_KEY_SIZE} key pair")
print(f"Private key: {private_path}")
print(f"Public key : {public_path}")
def main() -> None:
parser = argparse.ArgumentParser(
description="Generate RSA-3072 key pair for secure log encryption."
)
parser.add_argument(
"--out-dir",
default="keys",
help="Output directory. Default: keys",
)
args = parser.parse_args()
generate_keys(Path(args.out_dir))
if __name__ == "__main__":
main()
运行:
Python 发送与接收客户端¶
log_client.py¶
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import requests
import urllib3
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
ALG = "RSA-OAEP-SHA256-RSA3072"
RSA_KEY_SIZE = 3072
def b64e(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def b64d(data: str) -> bytes:
return base64.b64decode(data.encode("ascii"), validate=True)
def require_https(api_url: str) -> str:
parsed = urlparse(api_url)
if parsed.scheme != "https":
raise ValueError("api-url must start with https://")
if not parsed.netloc:
raise ValueError("api-url is invalid")
return api_url.rstrip("/")
def build_proxies(proxy: str | None) -> dict[str, str] | None:
if not proxy:
return None
return {
"http": proxy,
"https": proxy,
}
def load_public_key(path: str | Path) -> rsa.RSAPublicKey:
with open(path, "rb") as f:
key = serialization.load_pem_public_key(f.read())
if not isinstance(key, rsa.RSAPublicKey):
raise TypeError("Public key file is not an RSA public key.")
if key.key_size != RSA_KEY_SIZE:
raise ValueError(f"Public key must be RSA-{RSA_KEY_SIZE}, got RSA-{key.key_size}.")
return key
def load_private_key(path: str | Path) -> rsa.RSAPrivateKey:
with open(path, "rb") as f:
key = serialization.load_pem_private_key(
f.read(),
password=None,
)
if not isinstance(key, rsa.RSAPrivateKey):
raise TypeError("Private key file is not an RSA private key.")
if key.key_size != RSA_KEY_SIZE:
raise ValueError(f"Private key must be RSA-{RSA_KEY_SIZE}, got RSA-{key.key_size}.")
return key
def max_oaep_sha256_plaintext_bytes(key_size_bits: int) -> int:
key_bytes = key_size_bits // 8
sha256_len = hashes.SHA256().digest_size
return key_bytes - 2 * sha256_len - 2
def rsa_oaep_sha256_padding() -> padding.OAEP:
return padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
)
def encrypt_log_text(plaintext: str, public_key_path: str | Path) -> dict[str, str]:
public_key = load_public_key(public_key_path)
plaintext_bytes = plaintext.encode("utf-8")
max_len = max_oaep_sha256_plaintext_bytes(public_key.key_size)
if len(plaintext_bytes) > max_len:
raise ValueError(
"Log message is too long for direct RSA-OAEP-SHA256 with RSA-3072: "
f"{len(plaintext_bytes)} bytes > {max_len} bytes. "
"Shorten the log message or switch to hybrid encryption."
)
ciphertext = public_key.encrypt(
plaintext_bytes,
rsa_oaep_sha256_padding(),
)
return {
"alg": ALG,
"ciphertext": b64e(ciphertext),
}
def decrypt_log_envelope(envelope: dict[str, Any], private_key_path: str | Path) -> str:
if envelope.get("alg") != ALG:
raise ValueError(f"Unsupported encryption alg: {envelope.get('alg')}")
private_key = load_private_key(private_key_path)
ciphertext = b64d(str(envelope["ciphertext"]))
expected_len = private_key.key_size // 8
if len(ciphertext) != expected_len:
raise ValueError(
f"Invalid ciphertext length: {len(ciphertext)} bytes, expected {expected_len} bytes."
)
plaintext = private_key.decrypt(
ciphertext,
rsa_oaep_sha256_padding(),
)
return plaintext.decode("utf-8")
def make_session(insecure: bool) -> requests.Session:
session = requests.Session()
if insecure:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
return session
def read_message_arg(message: str | None, message_file: str | None) -> str:
if message and message_file:
raise ValueError("Use either --message or --message-file, not both.")
if message_file:
return Path(message_file).read_text(encoding="utf-8")
if message:
return message
raise ValueError("Missing log text. Use --message or --message-file.")
def send_log(args: argparse.Namespace) -> None:
api_url = require_https(args.api_url)
message = read_message_arg(args.message, args.message_file)
encrypted_log = encrypt_log_text(message, args.public_key)
payload: dict[str, Any] = {
"encrypted_log": encrypted_log,
"client_ts": datetime.now(timezone.utc).isoformat(),
}
if args.source:
payload["source"] = args.source
if args.level:
payload["level"] = args.level
if args.tag:
payload["tags"] = args.tag
session = make_session(args.insecure)
response = session.post(
f"{api_url}/api/log",
headers={
"x-api-key": args.token,
"content-type": "application/json",
},
json=payload,
timeout=args.timeout,
proxies=build_proxies(args.proxy),
verify=not args.insecure,
)
response.raise_for_status()
print(json.dumps(response.json(), ensure_ascii=False, indent=2))
def receive_logs(args: argparse.Namespace) -> None:
api_url = require_https(args.api_url)
params: dict[str, str] = {}
if args.from_date:
params["from"] = args.from_date
if args.to_date:
params["to"] = args.to_date
if args.limit is not None:
params["limit"] = str(args.limit)
if args.source:
params["source"] = args.source
if args.level:
params["level"] = args.level
if args.tag:
params["tag"] = args.tag
if args.order:
params["order"] = args.order
session = make_session(args.insecure)
response = session.get(
f"{api_url}/api/logs",
headers={
"x-api-key": args.token,
},
params=params,
timeout=args.timeout,
proxies=build_proxies(args.proxy),
verify=not args.insecure,
)
response.raise_for_status()
data = response.json()
if args.private_key:
for item in data.get("logs", []):
try:
item["plaintext"] = decrypt_log_envelope(
item["encrypted_log"],
args.private_key,
)
except Exception as exc:
item["decrypt_error"] = str(exc)
print(json.dumps(data, ensure_ascii=False, indent=2))
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Client for Cloudflare Workers encrypted log API using RSA-3072 OAEP-SHA256."
)
subparsers = parser.add_subparsers(dest="command", required=True)
common = argparse.ArgumentParser(add_help=False)
common.add_argument(
"--api-url",
required=True,
help="Worker base URL, must start with https://, e.g. https://xxx.workers.dev",
)
common.add_argument(
"--token",
required=True,
help="API token. Must match Worker secret API_TOKEN.",
)
common.add_argument(
"--proxy",
help="HTTP/HTTPS proxy, e.g. http://127.0.0.1:8080",
)
common.add_argument(
"--insecure",
action="store_true",
help="Skip TLS certificate verification for development only.",
)
common.add_argument(
"--timeout",
type=float,
default=20.0,
help="Request timeout in seconds. Default: 20",
)
send_parser = subparsers.add_parser(
"send",
parents=[common],
help="Encrypt and send one log.",
)
send_parser.add_argument(
"--public-key",
required=True,
help="RSA-3072 public key PEM path.",
)
send_parser.add_argument(
"--message",
help="Short log message text.",
)
send_parser.add_argument(
"--message-file",
help="Path to a UTF-8 text file containing a short log message.",
)
send_parser.add_argument(
"--source",
help="Optional log source, e.g. local-test.",
)
send_parser.add_argument(
"--level",
default="info",
help="Optional log level. Default: info.",
)
send_parser.add_argument(
"--tag",
action="append",
help="Optional repeated tag, e.g. --tag debug --tag worker.",
)
send_parser.set_defaults(func=send_log)
recv_parser = subparsers.add_parser(
"recv",
parents=[common],
help="Receive logs. Optionally decrypt locally with private key.",
)
recv_parser.add_argument(
"--from",
dest="from_date",
help="Start time, ISO-8601, e.g. 2026-05-23T00:00:00Z.",
)
recv_parser.add_argument(
"--to",
dest="to_date",
help="End time, ISO-8601, e.g. 2026-05-23T23:59:59Z.",
)
recv_parser.add_argument(
"--limit",
type=int,
help="Optional maximum number of returned logs.",
)
recv_parser.add_argument(
"--source",
help="Optional exact source filter.",
)
recv_parser.add_argument(
"--level",
help="Optional exact level filter.",
)
recv_parser.add_argument(
"--tag",
help="Optional tag filter.",
)
recv_parser.add_argument(
"--order",
choices=["asc", "desc"],
default="asc",
help="Sort order by server timestamp. Default: asc.",
)
recv_parser.add_argument(
"--private-key",
help="RSA-3072 private key PEM path. If provided, decrypt logs locally.",
)
recv_parser.set_defaults(func=receive_logs)
return parser
def main() -> None:
parser = build_parser()
args = parser.parse_args()
try:
args.func(args)
except requests.HTTPError as exc:
status = exc.response.status_code if exc.response is not None else "unknown"
body = exc.response.text if exc.response is not None else ""
print(f"HTTP error: {status}", file=sys.stderr)
if body:
print(body, file=sys.stderr)
sys.exit(1)
except Exception as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
调试命令示例¶
安装依赖:
生成密钥:
发送日志:
python log_client.py send \
--api-url "https://你的-worker.你的子域.workers.dev" \
--token "你在 API_TOKEN 里设置的值" \
--public-key keys/public_key.pem \
--source local-test \
--level info \
--tag debug \
--message "hello rsa-3072 encrypted log"
通过代理并跳过证书校验:
python log_client.py send \
--api-url "https://secure-log-api.YOUR_SUBDOMAIN.workers.dev" \
--token "YOUR_API_TOKEN" \
--public-key keys/public_key.pem \
--proxy "http://127.0.0.1:8080" \
--insecure \
--message "proxy + insecure tls test"
返回所有日志并本地解密:
python log_client.py recv \
--api-url "https://你的-worker.你的子域.workers.dev" \
--token "你在 API_TOKEN 里设置的值" \
--private-key keys/private_key.pem
按日期范围返回日志: