开发人员最常询问的Webhook模式。
一个可靠的 webhook 处理程序按顺序执行四个操作:验证签名与原始主体(Node 中的 webhooks.verify,其他地方的文档 HMAC)相符,按事件 ID 去重,将工作排入后台队列,并返回 200。长时间运行的工作发生在工作者中,队列层具有重试和幂等性。由于没有失败事件,计划的清扫会超时等待付款的作业并对其进行对账。
在您开始之前。
- 一个工作代理配置文件和您仪表板上的签名密钥(设置 - Webhooks)。
- 一个具有原始主体访问权限的 Web 框架 - 使用
express.raw的 Express、FastAPI、Flask 等。自动解析 JSON 中间件会破坏签名验证。 - 一个作业队列:BullMQ(Node)或 Celery/arq(Python)。Webhook 快速返回 200,队列处理慢工作。
- 一个具有 upsert 原语的数据库(Postgres 可用;Redis SET NX 也适用于短期去重)。
- 一个公共 HTTPS 端点 - 在开发中,ngrok 或部署预览。发送者不会将其发送到私有 URL。
验证签名。
The signature is HMAC-SHA256 over {t}.{rawBody} with your webhook secret, hex-encoded, in the X-Blockchain0x-Signature header (t=<unix>,v1=<hex>), inside a 5-minute replay window. In Node, webhooks.verify from @blockchain0x/node does it and returns a discriminated union; in other languages compute the same HMAC and compare in constant time. Raw-body access matters: if the bytes you sign locally differ from the bytes that arrived, it fails.
import express from "express";
import { webhooks } from "@blockchain0x/node";
const app = express();
// Raw body so the HMAC matches the exact bytes on the wire.
app.use(express.raw({ type: "application/json" }));
app.post("/webhooks/payment", (req, res) => {
const result = webhooks.verify({
headers: req.headers,
rawBody: req.body, // Buffer, raw bytes
secret: process.env.BLOCKCHAIN0X_WEBHOOK_SECRET!,
});
// Discriminated union: branch on ok, no try/catch.
if (!result.ok) return res.status(400).json({ code: result.code });
// result.eventType / result.eventId are now set.
handleEvent(result);
res.status(200).send("ok");
});import hmac, hashlib, os, time
from flask import request
SECRET = os.environ["BLOCKCHAIN0X_WEBHOOK_SECRET"].encode()
# In Node, webhooks.verify does this. In Python, verify by hand against the
# documented algorithm: HMAC-SHA256 over "{t}.{rawBody}", 300s replay window.
def verify_signature(raw_body: bytes) -> bool:
sig = request.headers.get("X-Blockchain0x-Signature", "")
ts = request.headers.get("X-Blockchain0x-Timestamp", "")
parts = dict(p.split("=", 1) for p in sig.split(",") if "=" in p)
t, v1 = parts.get("t", ts), parts.get("v1", sig)
want = hmac.new(SECRET, t.encode() + b"." + raw_body, hashlib.sha256).hexdigest()
return hmac.compare_digest(want, v1) and abs(time.time() - int(t)) <= 300使处理程序具有幂等性。
Webhooks retry on any non-2xx response, and the same event will arrive multiple times under load even when nothing has gone wrong. Dedupe on the event's id using a database upsert. If the row already exists, skip; if it does not, insert and proceed. Postgres makes this a single statement.
// Pseudocode for a Postgres-backed dedupe table. Replace with your DB of choice.
async function processEventOnce(eventId: string, body: object) {
// INSERT ... ON CONFLICT DO NOTHING returns rowCount === 0 on duplicate.
const inserted = await db.query(
"INSERT INTO webhook_events(id) VALUES ($1) ON CONFLICT DO NOTHING",
[eventId],
);
if (inserted.rowCount === 0) return; // Already processed.
await handleEvent(body);
}async def process_event_once(event_id: str, body: dict):
# INSERT ... ON CONFLICT DO NOTHING returns 0 rows on duplicate.
inserted = await db.execute(
"INSERT INTO webhook_events (id) VALUES ($1) ON CONFLICT DO NOTHING",
event_id,
)
if inserted == "INSERT 0 0": # asyncpg-style status
return # Already processed.
await handle_event(body)快速入队并返回200。
Webhook端点应在一秒内响应。任何较慢的响应都会导致超时和重试。模式是:验证、排队、响应。队列在工作程序中运行实际交付,具有重试、指数回退和自身的幂等性。BullMQ和Celery都支持每个作业ID,这可以防止意外重新排队相同事件。
// Express handler: verify, enqueue, return 200 fast.
import { Queue } from "bullmq";
import { webhooks } from "@blockchain0x/node";
const paymentQueue = new Queue("payments");
app.post(
"/webhooks/payment",
express.raw({ type: "application/json" }),
async (req, res) => {
const result = webhooks.verify({
headers: req.headers,
rawBody: req.body,
secret: process.env.BLOCKCHAIN0X_WEBHOOK_SECRET!,
});
if (!result.ok) return res.status(400).json({ code: result.code });
await paymentQueue.add(result.eventType, { raw: req.body.toString() }, {
jobId: result.eventId, // Idempotency key.
removeOnComplete: true,
attempts: 5,
backoff: { type: "exponential", delay: 1000 },
});
res.status(200).send("ok");
},
);
// Worker file:
import { Worker } from "bullmq";
new Worker("payments", async (job) => {
await handleEvent(job.data);
});# Flask handler enqueues to Celery (or arq) and returns 200 quickly.
from celery import Celery
from flask import request
celery = Celery("payments", broker=os.environ["REDIS_URL"])
@celery.task(bind=True, max_retries=5)
def handle_payment_event(self, event_type, raw):
try:
process_event_once(event_type, raw)
except Exception as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.post("/webhooks/payment")
def webhook():
raw = request.get_data()
if not verify_signature(raw):
abort(401)
event_id = request.headers.get("X-Blockchain0x-Event-Id", "")
event_type = request.headers.get("X-Blockchain0x-Event-Type", "")
handle_payment_event.apply_async(args=[event_type, raw.decode()], task_id=event_id)
return "ok", 200arq在Python端遵循相同的形状 - 使用确定性作业ID注册任务,并让队列处理重试。关键约束是入队本身必须快速(与Redis的单次往返);绝不要在远程调用上阻塞Webhook。
处理从未到账的支付。
没有失败 webhook - 如果买方放弃,则不会收到事件,代理将卡在“等待付款”中。因此请自行检测:对等待时间过长的作业进行定期清理,对链进行对账,使用 transactions.get 以防它实际上已结算,然后释放持有的资源,将作业移动到终止未付款状态,并(如果适当)将结果呈现给用户。
async function sweepStaleAwaitingPayment() {
for (const job of await findJobsAwaitingPaymentOlderThan("1h")) {
// Reconcile against the chain before giving up.
const tx = job.txHash ? await client.transactions.get(job.txHash) : null;
if (tx) { markJobPaid(job.id); continue; } // It actually settled.
// 1. Release any held resources tied to the job.
releaseHeldResources(job.id);
// 2. Move it out of 'awaiting_payment' into a terminal 'unpaid' state.
markJobUnpaid(job.id);
// 3. (Optional) Notify the user, with a fresh payment link.
notifyUser(job.userId, { template: "agent_payment_unpaid", jobId: job.id });
}
}# Run on a schedule - there is no failure webhook to wait for.
def sweep_stale_awaiting_payment():
for job in find_jobs_awaiting_payment_older_than("1h"):
tx = client.transactions.get(job["tx_hash"]) if job.get("tx_hash") else None
if tx:
mark_job_paid(job["id"]) # It actually settled.
continue
release_held_resources(job["id"])
mark_job_unpaid(job["id"])
notify_user(job["user_id"], template="agent_payment_unpaid", job_id=job["id"])五个导致事件丢失或重复的错误。
在验证签名之前解析主体
HMAC必须计算在发送者签名的原始字节上。如果您的框架在处理程序运行之前自动解析JSON,您本地签名的字节将与发送者签名的字节不匹配(不同的空格、键顺序、编码),每个签名看起来都将无效。配置路由以接收原始主体(Express: express.raw,Flask: request.get_data),先验证,然后解析。
在Webhook处理程序内执行实际工作
Webhook具有激进的重试策略。如果您的处理程序需要30秒来交付工作,发送方的超时将触发,Webhook将被重新发送 - 现在您有两个交付正在进行同一笔支付。始终:验证、排队、返回2xx。实际工作在一个可以根据需要运行的后台工作程序中进行。
使用HTTP状态来传达业务逻辑
如果您的处理程序在用户不再存在于您的系统中时返回4xx,发送者将其视为'无效请求'并停止重试。如果它因相同条件返回5xx,发送者将永远重试,您的队列将填满。一旦您安全地持久化事件(或将其识别为重复),请返回200;使用队列逻辑,而不是HTTP状态,来表达业务决策。
基于有效负载哈希的幂等性,而不是事件ID
关于同一代理的两个不同事件(payment.received 和稍后的 payment.sent)具有不同的主体,并且确实需要单独处理。如果您的去重基于主体哈希,您可以丢弃其中一个。根据 X-Blockchain0x-Event-Id 去重(每次交付唯一),让事件类型驱动您的处理程序的操作。
期待一个单独的确认事件
已发布的事件是 payment.received、payment.sent、wallet.deployed 和 webhook.test - 没有单独的确认事件。当转账在区块中时,payment.received 被触发,这就是您进行大多数工作的信号。对于昂贵或不可逆的操作,轮询 transactions.get 并应用您自己的确认阈值;不要等待不存在的事件。
一旦webhook是防弹的。
Webhooks 是最难的部分。在上述四种模式到位后,剩下的工作主要是操作性的:一个测试环境来测试故障路径,支出控制以防止上游代理淹没您的处理程序,以及最终的安全审查。
完整参考见docs.blockchain0x.com。Webhook术语表:支付授权。产品表面:支付API。