agent-flow / src /backend /tests /unit /test_webhook.py
Tai Truong
fix readme
d202ada
raw
history blame
2.1 kB
import tempfile
from pathlib import Path
import pytest
@pytest.fixture(autouse=True)
def _check_openai_api_key_in_environment_variables():
pass
async def test_webhook_endpoint(client, added_webhook_test):
# The test is as follows:
# 1. The flow when run will get a "path" from the payload and save a file with the path as the name.
# We will create a temporary file path and send it to the webhook endpoint, then check if the file exists.
# 2. we will delete the file, then send an invalid payload to the webhook endpoint and check if the file exists.
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
# Create a temporary file
with tempfile.TemporaryDirectory() as tmp:
file_path = Path(tmp) / "test_file.txt"
payload = {"path": str(file_path)}
response = await client.post(endpoint, json=payload)
assert response.status_code == 202
assert file_path.exists()
assert not file_path.exists()
# Send an invalid payload
payload = {"invalid_key": "invalid_value"}
response = await client.post(endpoint, json=payload)
assert response.status_code == 202
assert not file_path.exists()
async def test_webhook_flow_on_run_endpoint(client, added_webhook_test, created_api_key):
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/run/{endpoint_name}?stream=false"
# Just test that "Random Payload" returns 202
# returns 202
payload = {
"output_type": "any",
}
response = await client.post(endpoint, headers={"x-api-key": created_api_key.api_key}, json=payload)
assert response.status_code == 200, response.json()
async def test_webhook_with_random_payload(client, added_webhook_test):
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
# Just test that "Random Payload" returns 202
# returns 202
response = await client.post(
endpoint,
json="Random Payload",
)
assert response.status_code == 202