When to use webhooks
Webhooks are better than polling when:
- You have many concurrent submissions
- You want real-time notifications
- Your API server resources are limited
- You prefer an event-driven architecture
Polling is fine for:
- Occasional submissions
- Simple scripts and CLIs
- Quick prototypes and testing
Setting up webhooks
Step 1: Prepare your endpoint
Create an HTTPS endpoint that can receive POST requests. The delivered envelope uses Standard Webhooks, so verify the signature with the standardwebhooks library and your subscription secret. The verified event exposes type (the event type) and data (a snake_case object).
from fastapi import FastAPI, Request
from standardwebhooks.webhooks import Webhook
app = FastAPI()
wh = Webhook("whsec_AbCdEf...") # Your subscription secret
@app.post("/webhooks/integrations")
async def handle_webhook(request: Request):
try:
# Verify the webhook signature
payload = await request.body()
event = wh.verify(payload, dict(request.headers))
except Exception as e:
print(f"Invalid signature: {e}")
return {"error": "Unauthorized"}, 401
# Process the event
print(f"Type: {event['type']}")
print(f"Data: {event['data']}")
# Respond immediately
return {"status": "received"}
Step 2: Create a subscription
Use POST /api/integrations/webhooks/subscriptions. The request fields are endpointUrl (HTTPS required), eventTypes, and an optional description.
curl -X POST \
-H "X-API-Key: your-api-key-here" \
-H "Content-Type: application/json" \
-d '{
"endpointUrl": "https://your-domain.com/webhooks/integrations",
"eventTypes": ["asset.processing.completed", "asset.processing.failed"],
"description": "Production webhook endpoint"
}' \
https://mm-midmarket-integrations-api-preview.azurewebsites.net/api/integrations/webhooks/subscriptions
Response (201):
{
"id": "550e8400-...",
"workspaceId": "660e8400-...",
"endpointUrl": "https://your-domain.com/webhooks/integrations",
"eventTypes": ["asset.processing.completed", "asset.processing.failed"],
"description": "Production webhook endpoint",
"enabled": true,
"secret": "whsec_AbCdEf...",
"createdAt": "2026-06-03T12:00:00Z",
"updatedAt": "2026-06-03T12:00:00Z"
}
The secret is returned only on create. Save it immediately — it is never shown again. The field is secret (not signing_secret), the enabled flag is enabled (not active), and the event list is eventTypes (not events).
A non-HTTPS endpointUrl or hitting your workspace subscription limit returns 422.
Step 3: Test your endpoint
Create a submission and watch your endpoint receive events:
import requests
API_BASE = "https://mm-midmarket-integrations-api-preview.azurewebsites.net"
response = requests.post(
f"{API_BASE}/api/integrations/submissions",
headers={"X-API-Key": "your-api-key-here"},
json={
"assets": [{"blobPath": "<workspace-id>/test.mp4"}],
"sidekickIds": [],
},
)
print(f"Submission created: {response.json()['submissionId']}")
print("Watch your webhook endpoint for incoming events...")
Webhook events
There are exactly two event types. The delivered envelope has id, type, timestamp, and a snake_case data object. The payload does not embed results — fetch detail by ID after receiving an event.
asset.processing.completed
Fires when an asset finishes processing successfully.
{
"id": "evt_uuid",
"type": "asset.processing.completed",
"timestamp": "2026-06-03T12:05:00Z",
"data": {
"asset_id": "a1...",
"submission_id": "s1...",
"workspace_id": "w1...",
"status": "completed",
"filename": "campaign-video.mp4",
"issue_count": 3
}
}
asset.processing.failed
Fires when an asset fails to process. The data object adds error_message.
{
"id": "evt_uuid",
"type": "asset.processing.failed",
"timestamp": "2026-06-03T12:10:00Z",
"data": {
"asset_id": "a1...",
"submission_id": "s1...",
"workspace_id": "w1...",
"status": "failed",
"error_message": "...",
"filename": "bad.mp4"
}
}
Handling webhooks
Basic handler pattern
Read event["type"] and the snake_case data fields. Because the payload carries no results, fetch issues or topics by asset ID when you need detail.
import requests
API_BASE = "https://mm-midmarket-integrations-api-preview.azurewebsites.net"
API_KEY = "your-api-key-here"
def handle_webhook(event: dict):
"""Process a verified webhook event."""
event_type = event["type"]
data = event["data"]
submission_id = data["submission_id"]
asset_id = data["asset_id"]
if event_type == "asset.processing.completed":
# Fetch the enriched issue list by ID — the payload has no results
issues = requests.get(
f"{API_BASE}/api/integrations/submissions/{submission_id}"
f"/assets/{asset_id}/issues",
headers={"X-API-Key": API_KEY},
).json()
save_to_database(asset_id=asset_id, issues=issues["issues"])
notify_user(asset_id, "processing_complete")
elif event_type == "asset.processing.failed":
save_error(
asset_id=asset_id,
error_message=data.get("error_message"),
)
notify_user(asset_id, "processing_failed")
If you prefer clustered topics instead of a flat issue list, fetch .../assets/{asset_id}/topics the same way.
Queue-based processing
For high-volume scenarios, queue webhooks for async processing and respond immediately:
import json
from celery import shared_task
@app.post("/webhooks/integrations")
async def handle_webhook(request: Request):
# Verify signature
try:
event = verify_webhook(request)
except Exception:
return {"error": "Unauthorized"}, 401
# Queue for async processing
process_webhook.delay(json.dumps(event))
# Respond immediately
return {"status": "received"}
@shared_task
def process_webhook(event_json: str):
"""Process the webhook asynchronously."""
event = json.loads(event_json)
if event["type"] == "asset.processing.completed":
fetch_and_store_results(event["data"])
elif event["type"] == "asset.processing.failed":
handle_error(event["data"])
Webhook reliability
Signature verification
Always verify webhook signatures with the subscription secret:
import os
from standardwebhooks.webhooks import Webhook, WebhookVerificationError
async def verify_webhook(request) -> dict:
wh = Webhook(os.getenv("WEBHOOK_SECRET"))
try:
return wh.verify(
payload=await request.body(),
headers=dict(request.headers),
)
except WebhookVerificationError as e:
raise ValueError(f"Invalid webhook: {e}")
Idempotency
Deliveries may arrive more than once. The delivery provider retries failed deliveries, so make your handler idempotent by deduplicating on the event id:
def process_webhook_idempotent(event: dict):
"""Process each event at most once."""
event_id = event["id"]
if already_processed(event_id):
return {"status": "already_processed"}
process_event(event)
mark_as_processed(event_id)
return {"status": "processed"}
Store processed event IDs in your database:
CREATE TABLE webhook_events (
id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(255),
submission_id VARCHAR(255),
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Failed deliveries are retried automatically by the delivery provider. Check the deliveries endpoint (GET .../subscriptions/{id}/deliveries) or the control plane for attempt history rather than assuming a fixed retry schedule.
Testing webhooks locally
Using ngrok
Expose your local server publicly:
This gives you a URL like https://abc123.ngrok.io. Use it as the endpointUrl in your subscription.
Test endpoint
Use POST /health/webhook-test to validate that your signature verification is set up correctly, without affecting production data.
curl -X POST https://mm-midmarket-integrations-api-preview.azurewebsites.net/health/webhook-test \
-H "Content-Type: application/json" \
-d '{ ... }'
Managing subscriptions
| Method | Path | Purpose |
|---|
| GET | /api/integrations/webhooks/subscriptions | List subscriptions (paginated) |
| GET | /api/integrations/webhooks/subscriptions/{id} | Fetch one subscription |
| PATCH | /api/integrations/webhooks/subscriptions/{id} | Enable or disable ({ "enabled": false }) |
| DELETE | /api/integrations/webhooks/subscriptions/{id} | Delete (returns 204) |
| GET | /api/integrations/webhooks/subscriptions/{id}/deliveries | View delivery attempts |
The list endpoint returns a paginated envelope: { "items": [...], "page": 1, "pageSize": 10, "total": 3, "totalPages": 1 }. None of these responses include the secret — it is returned only on create.
Complete example
An end-to-end handler with verification, idempotency, and background processing:
import os
from fastapi import FastAPI, Request, BackgroundTasks
from standardwebhooks.webhooks import Webhook
app = FastAPI()
wh = Webhook(os.getenv("WEBHOOK_SECRET"))
# Track processed events for idempotency
processed_events = set()
@app.post("/webhooks/integrations")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
"""Handle incoming webhook events."""
# 1. Verify signature
try:
payload = await request.body()
event = wh.verify(payload, dict(request.headers))
except Exception as e:
print(f"Invalid signature: {e}")
return {"error": "Unauthorized"}, 401
# 2. Deduplicate on event id
event_id = event["id"]
if event_id in processed_events:
print(f"Event {event_id} already processed")
return {"status": "already_processed"}
# 3. Queue for background processing
background_tasks.add_task(process_event, event)
# 4. Return immediately
return {"status": "received"}
async def process_event(event: dict):
"""Process the webhook event in the background."""
event_type = event["type"]
data = event["data"]
try:
if event_type == "asset.processing.completed":
fetch_and_store_results(data["submission_id"], data["asset_id"])
notify_user(data["asset_id"], "complete")
elif event_type == "asset.processing.failed":
save_error(data["asset_id"], data.get("error_message"))
notify_user(data["asset_id"], "failed")
processed_events.add(event["id"])
except Exception as e:
print(f"Error processing event: {e}")
save_failed_webhook(event, str(e))
Best practices
- Verify signatures on every webhook with the subscription
secret.
- Respond quickly and process asynchronously in the background.
- Deduplicate on the event
id to handle repeated deliveries.
- Fetch issues or topics by asset ID — the payload carries no inline results.
- Log all webhook events for debugging.
- Monitor delivery status via the deliveries endpoint or the control plane.