populate buisness understanding from beta application form

This commit is contained in:
Swifty
2026-02-16 12:31:19 +01:00
parent 223df9d3da
commit 92d2da442d
5 changed files with 700 additions and 0 deletions

View File

@@ -190,5 +190,8 @@ ZEROBOUNCE_API_KEY=
POSTHOG_API_KEY=
POSTHOG_HOST=https://eu.i.posthog.com
# Tally Form Integration (pre-populate business understanding on signup)
TALLY_API_KEY=
# Other Services
AUTOMOD_API_KEY=

View File

@@ -126,6 +126,9 @@ v1_router = APIRouter()
########################################################
_tally_background_tasks: set[asyncio.Task] = set()
@v1_router.post(
"/auth/user",
summary="Get or create user",
@@ -134,6 +137,19 @@ v1_router = APIRouter()
)
async def get_or_create_user_route(user_data: dict = Security(get_jwt_payload)):
user = await get_or_create_user(user_data)
# Fire-and-forget: populate business understanding from Tally form
try:
from backend.data.tally import populate_understanding_from_tally
task = asyncio.create_task(
populate_understanding_from_tally(user.id, user.email)
)
_tally_background_tasks.add(task)
task.add_done_callback(_tally_background_tasks.discard)
except Exception:
pass # Never block user creation
return user.model_dump()

View File

@@ -0,0 +1,371 @@
"""Tally form integration: cache submissions, match by email, extract business understanding."""
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from openai import AsyncOpenAI
from backend.data.redis_client import get_redis_async
from backend.data.understanding import (
BusinessUnderstandingInput,
get_business_understanding,
upsert_business_understanding,
)
from backend.util.request import Requests
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
TALLY_API_BASE = "https://api.tally.so"
TALLY_FORM_ID = "npGe0q"
# Redis key templates
_EMAIL_INDEX_KEY = "tally:form:{form_id}:email_index"
_QUESTIONS_KEY = "tally:form:{form_id}:questions"
_LAST_FETCH_KEY = "tally:form:{form_id}:last_fetch"
# TTLs
_INDEX_TTL = 3600 # 1 hour
_LAST_FETCH_TTL = 7200 # 2 hours
# Pagination
_PAGE_LIMIT = 500
async def _fetch_tally_page(
form_id: str,
page: int,
limit: int = _PAGE_LIMIT,
start_date: Optional[str] = None,
) -> dict:
"""Fetch a single page of submissions from the Tally API."""
settings = Settings()
api_key = settings.secrets.tally_api_key
url = f"{TALLY_API_BASE}/forms/{form_id}/submissions?page={page}&limit={limit}"
if start_date:
url += f"&startDate={start_date}"
client = Requests(
trusted_origins=[TALLY_API_BASE],
raise_for_status=True,
extra_headers={
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
},
)
response = await client.get(url)
return response.json()
async def _fetch_all_submissions(
form_id: str,
start_date: Optional[str] = None,
) -> tuple[list[dict], list[dict]]:
"""Paginate through all Tally submissions. Returns (questions, submissions)."""
questions: list[dict] = []
all_submissions: list[dict] = []
page = 1
while True:
data = await _fetch_tally_page(form_id, page, start_date=start_date)
if page == 1:
questions = data.get("questions", [])
submissions = data.get("submissions", [])
all_submissions.extend(submissions)
total_pages = data.get("totalNumberOfPages", 1)
if page >= total_pages:
break
page += 1
return questions, all_submissions
def _build_email_index(
submissions: list[dict], questions: list[dict]
) -> dict[str, dict]:
"""Build an {email -> submission_data} index from submissions.
Scans question titles for email/contact fields to find the email answer.
"""
# Find question IDs that are likely email fields
email_question_ids: list[str] = []
for q in questions:
label = (q.get("label") or q.get("title") or q.get("name") or "").lower()
q_type = (q.get("type") or "").lower()
if q_type in ("input_email", "email"):
email_question_ids.append(q["id"])
elif any(kw in label for kw in ("email", "e-mail", "contact")):
email_question_ids.append(q["id"])
index: dict[str, dict] = {}
for sub in submissions:
email = _extract_email_from_submission(sub, email_question_ids)
if email:
index[email.lower()] = {
"responses": sub.get("responses", sub.get("fields", [])),
"submitted_at": sub.get("submittedAt", sub.get("createdAt", "")),
"questions": sub.get("questions", []),
}
return index
def _extract_email_from_submission(
submission: dict, email_question_ids: list[str]
) -> Optional[str]:
"""Extract email address from a submission's responses."""
# Try respondent email first (Tally often includes this)
respondent_email = submission.get("respondentEmail")
if respondent_email:
return respondent_email
# Search through responses/fields for matching question IDs
responses = submission.get("responses", submission.get("fields", []))
if isinstance(responses, list):
for resp in responses:
q_id = resp.get("questionId") or resp.get("key") or resp.get("id")
if q_id in email_question_ids:
value = resp.get("value") or resp.get("answer")
if isinstance(value, str) and "@" in value:
return value
elif isinstance(responses, dict):
for q_id in email_question_ids:
value = responses.get(q_id)
if isinstance(value, str) and "@" in value:
return value
return None
async def _get_cached_index(
form_id: str,
) -> tuple[Optional[dict], Optional[list]]:
"""Check Redis for cached email index and questions. Returns (index, questions) or (None, None)."""
redis = await get_redis_async()
index_key = _EMAIL_INDEX_KEY.format(form_id=form_id)
questions_key = _QUESTIONS_KEY.format(form_id=form_id)
raw_index = await redis.get(index_key)
raw_questions = await redis.get(questions_key)
if raw_index and raw_questions:
return json.loads(raw_index), json.loads(raw_questions)
return None, None
async def _refresh_cache(form_id: str) -> tuple[dict, list]:
"""Refresh the Tally submission cache. Uses incremental fetch when possible.
Returns (email_index, questions).
"""
redis = await get_redis_async()
last_fetch_key = _LAST_FETCH_KEY.format(form_id=form_id)
index_key = _EMAIL_INDEX_KEY.format(form_id=form_id)
questions_key = _QUESTIONS_KEY.format(form_id=form_id)
last_fetch = await redis.get(last_fetch_key)
if last_fetch:
# Incremental fetch: only get new submissions since last fetch
logger.info(f"Tally incremental fetch since {last_fetch}")
questions, new_submissions = await _fetch_all_submissions(
form_id, start_date=last_fetch
)
# Try to load existing index to merge
raw_existing = await redis.get(index_key)
existing_index: dict[str, dict] = {}
if raw_existing:
existing_index = json.loads(raw_existing)
if not questions:
raw_q = await redis.get(questions_key)
if raw_q:
questions = json.loads(raw_q)
new_index = _build_email_index(new_submissions, questions)
existing_index.update(new_index)
email_index = existing_index
else:
# Full initial fetch
logger.info("Tally full initial fetch")
questions, submissions = await _fetch_all_submissions(form_id)
email_index = _build_email_index(submissions, questions)
# Store in Redis
now = datetime.now(timezone.utc).isoformat()
await redis.setex(index_key, _INDEX_TTL, json.dumps(email_index))
await redis.setex(questions_key, _INDEX_TTL, json.dumps(questions))
await redis.setex(last_fetch_key, _LAST_FETCH_TTL, now)
logger.info(f"Tally cache refreshed: {len(email_index)} emails indexed")
return email_index, questions
async def find_submission_by_email(
form_id: str, email: str
) -> Optional[tuple[dict, list]]:
"""Look up a Tally submission by email. Uses cache when available.
Returns (submission_data, questions) or None.
"""
email_lower = email.lower()
# Try cache first
email_index, questions = await _get_cached_index(form_id)
if email_index is not None and questions is not None:
sub = email_index.get(email_lower)
if sub is not None:
return sub, questions
return None
# Cache miss - refresh
email_index, questions = await _refresh_cache(form_id)
sub = email_index.get(email_lower)
if sub is not None:
return sub, questions
return None
def format_submission_for_llm(submission: dict, questions: list[dict]) -> str:
"""Format a submission as readable Q&A text for LLM consumption."""
# Build question ID -> title lookup
q_titles: dict[str, str] = {}
for q in questions:
q_id = q.get("id", "")
title = q.get("label") or q.get("title") or q.get("name") or f"Question {q_id}"
q_titles[q_id] = title
lines: list[str] = []
responses = submission.get("responses", [])
if isinstance(responses, list):
for resp in responses:
q_id = resp.get("questionId") or resp.get("key") or resp.get("id") or ""
title = q_titles.get(q_id, f"Question {q_id}")
value = resp.get("value") or resp.get("answer") or ""
lines.append(f"Q: {title}\nA: {_format_answer(value)}")
elif isinstance(responses, dict):
for q_id, value in responses.items():
title = q_titles.get(q_id, f"Question {q_id}")
lines.append(f"Q: {title}\nA: {_format_answer(value)}")
return "\n\n".join(lines)
def _format_answer(value: object) -> str:
"""Format an answer value for display."""
if value is None:
return "(no answer)"
if isinstance(value, list):
return ", ".join(str(v) for v in value)
if isinstance(value, dict):
parts = [f"{k}: {v}" for k, v in value.items() if v]
return "; ".join(parts) if parts else "(no answer)"
return str(value)
_EXTRACTION_PROMPT = """\
You are a business analyst. Given the following form submission data, extract structured business understanding information.
Return a JSON object with ONLY the fields that can be confidently extracted. Use null for fields that cannot be determined.
Fields:
- user_name (string): the person's name
- job_title (string): their job title
- business_name (string): company/business name
- industry (string): industry or sector
- business_size (string): company size e.g. "1-10", "11-50", "51-200"
- user_role (string): their role context e.g. "decision maker", "implementer"
- key_workflows (list of strings): key business workflows
- daily_activities (list of strings): daily activities performed
- pain_points (list of strings): current pain points
- bottlenecks (list of strings): process bottlenecks
- manual_tasks (list of strings): manual/repetitive tasks
- automation_goals (list of strings): desired automation goals
- current_software (list of strings): software/tools currently used
- existing_automation (list of strings): existing automations
- additional_notes (string): any additional context
Form data:
{submission_text}
Return ONLY valid JSON."""
async def extract_business_understanding(
formatted_text: str,
) -> BusinessUnderstandingInput:
"""Use an LLM to extract structured business understanding from form text."""
settings = Settings()
api_key = settings.secrets.open_router_api_key
client = AsyncOpenAI(api_key=api_key, base_url="https://openrouter.ai/api/v1")
response = await client.chat.completions.create(
model="openai/gpt-4o-mini",
messages=[
{
"role": "user",
"content": _EXTRACTION_PROMPT.format(submission_text=formatted_text),
}
],
response_format={"type": "json_object"},
temperature=0.0,
)
raw = response.choices[0].message.content or "{}"
data = json.loads(raw)
# Filter out null values before constructing
cleaned = {k: v for k, v in data.items() if v is not None}
return BusinessUnderstandingInput(**cleaned)
async def populate_understanding_from_tally(user_id: str, email: str) -> None:
"""Main orchestrator: check Tally for a matching submission and populate understanding.
Fire-and-forget safe — all exceptions are caught and logged.
"""
try:
# Check if understanding already exists (idempotency)
existing = await get_business_understanding(user_id)
if existing is not None:
logger.debug(
f"Tally: user {user_id} already has business understanding, skipping"
)
return
# Check API key is configured
settings = Settings()
if not settings.secrets.tally_api_key:
logger.debug("Tally: no API key configured, skipping")
return
# Look up submission by email
result = await find_submission_by_email(TALLY_FORM_ID, email)
if result is None:
logger.debug(f"Tally: no submission found for {email}")
return
submission, questions = result
logger.info(f"Tally: found submission for {email}, extracting understanding")
# Format and extract
formatted = format_submission_for_llm(submission, questions)
if not formatted.strip():
logger.warning("Tally: formatted submission was empty, skipping")
return
understanding_input = await extract_business_understanding(formatted)
# Upsert into database
await upsert_business_understanding(user_id, understanding_input)
logger.info(f"Tally: successfully populated understanding for user {user_id}")
except Exception:
logger.exception(f"Tally: error populating understanding for user {user_id}")

View File

@@ -0,0 +1,305 @@
"""Tests for backend.data.tally module."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.data.tally import (
_build_email_index,
_format_answer,
find_submission_by_email,
format_submission_for_llm,
populate_understanding_from_tally,
)
# ── Fixtures ──────────────────────────────────────────────────────────────────
SAMPLE_QUESTIONS = [
{"id": "q1", "label": "What is your name?", "type": "INPUT_TEXT"},
{"id": "q2", "label": "Email address", "type": "INPUT_EMAIL"},
{"id": "q3", "label": "Company name", "type": "INPUT_TEXT"},
{"id": "q4", "label": "Industry", "type": "INPUT_TEXT"},
]
SAMPLE_SUBMISSIONS = [
{
"respondentEmail": None,
"responses": [
{"questionId": "q1", "value": "Alice Smith"},
{"questionId": "q2", "value": "alice@example.com"},
{"questionId": "q3", "value": "Acme Corp"},
{"questionId": "q4", "value": "Technology"},
],
"submittedAt": "2025-01-15T10:00:00Z",
},
{
"respondentEmail": "bob@example.com",
"responses": [
{"questionId": "q1", "value": "Bob Jones"},
{"questionId": "q2", "value": "bob@example.com"},
{"questionId": "q3", "value": "Bob's Burgers"},
{"questionId": "q4", "value": "Food"},
],
"submittedAt": "2025-01-16T10:00:00Z",
},
]
# ── _build_email_index ────────────────────────────────────────────────────────
def test_build_email_index():
index = _build_email_index(SAMPLE_SUBMISSIONS, SAMPLE_QUESTIONS)
assert "alice@example.com" in index
assert "bob@example.com" in index
assert len(index) == 2
def test_build_email_index_case_insensitive():
submissions = [
{
"respondentEmail": None,
"responses": [
{"questionId": "q2", "value": "Alice@Example.COM"},
],
"submittedAt": "2025-01-15T10:00:00Z",
},
]
index = _build_email_index(submissions, SAMPLE_QUESTIONS)
assert "alice@example.com" in index
assert "Alice@Example.COM" not in index
def test_build_email_index_empty():
index = _build_email_index([], SAMPLE_QUESTIONS)
assert index == {}
def test_build_email_index_no_email_field():
questions = [{"id": "q1", "label": "Name", "type": "INPUT_TEXT"}]
submissions = [
{
"responses": [{"questionId": "q1", "value": "Alice"}],
"submittedAt": "2025-01-15T10:00:00Z",
}
]
index = _build_email_index(submissions, questions)
assert index == {}
def test_build_email_index_respondent_email():
"""respondentEmail takes precedence over field scanning."""
submissions = [
{
"respondentEmail": "direct@example.com",
"responses": [
{"questionId": "q2", "value": "field@example.com"},
],
"submittedAt": "2025-01-15T10:00:00Z",
}
]
index = _build_email_index(submissions, SAMPLE_QUESTIONS)
assert "direct@example.com" in index
assert "field@example.com" not in index
# ── format_submission_for_llm ─────────────────────────────────────────────────
def test_format_submission_for_llm():
submission = {
"responses": [
{"questionId": "q1", "value": "Alice Smith"},
{"questionId": "q3", "value": "Acme Corp"},
],
}
result = format_submission_for_llm(submission, SAMPLE_QUESTIONS)
assert "Q: What is your name?" in result
assert "A: Alice Smith" in result
assert "Q: Company name" in result
assert "A: Acme Corp" in result
def test_format_submission_for_llm_dict_responses():
submission = {
"responses": {
"q1": "Alice Smith",
"q3": "Acme Corp",
},
}
result = format_submission_for_llm(submission, SAMPLE_QUESTIONS)
assert "A: Alice Smith" in result
assert "A: Acme Corp" in result
def test_format_answer_types():
assert _format_answer(None) == "(no answer)"
assert _format_answer("hello") == "hello"
assert _format_answer(["a", "b"]) == "a, b"
assert _format_answer({"key": "val"}) == "key: val"
assert _format_answer(42) == "42"
# ── find_submission_by_email ──────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_find_submission_by_email_cache_hit():
cached_index = {
"alice@example.com": {"responses": [], "submitted_at": "2025-01-15"},
}
cached_questions = SAMPLE_QUESTIONS
with patch(
"backend.data.tally._get_cached_index",
new_callable=AsyncMock,
return_value=(cached_index, cached_questions),
) as mock_cache:
result = await find_submission_by_email("form123", "alice@example.com")
mock_cache.assert_awaited_once_with("form123")
assert result is not None
sub, questions = result
assert sub["submitted_at"] == "2025-01-15"
@pytest.mark.asyncio
async def test_find_submission_by_email_cache_miss():
refreshed_index = {
"alice@example.com": {"responses": [], "submitted_at": "2025-01-15"},
}
with (
patch(
"backend.data.tally._get_cached_index",
new_callable=AsyncMock,
return_value=(None, None),
),
patch(
"backend.data.tally._refresh_cache",
new_callable=AsyncMock,
return_value=(refreshed_index, SAMPLE_QUESTIONS),
) as mock_refresh,
):
result = await find_submission_by_email("form123", "alice@example.com")
mock_refresh.assert_awaited_once_with("form123")
assert result is not None
@pytest.mark.asyncio
async def test_find_submission_by_email_no_match():
cached_index = {
"alice@example.com": {"responses": [], "submitted_at": "2025-01-15"},
}
with patch(
"backend.data.tally._get_cached_index",
new_callable=AsyncMock,
return_value=(cached_index, SAMPLE_QUESTIONS),
):
result = await find_submission_by_email("form123", "unknown@example.com")
assert result is None
# ── populate_understanding_from_tally ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_populate_understanding_skips_existing():
"""If user already has understanding, skip entirely."""
mock_understanding = MagicMock()
with (
patch(
"backend.data.tally.get_business_understanding",
new_callable=AsyncMock,
return_value=mock_understanding,
) as mock_get,
patch(
"backend.data.tally.find_submission_by_email",
new_callable=AsyncMock,
) as mock_find,
):
await populate_understanding_from_tally("user-1", "test@example.com")
mock_get.assert_awaited_once_with("user-1")
mock_find.assert_not_awaited()
@pytest.mark.asyncio
async def test_populate_understanding_skips_no_api_key():
"""If no Tally API key, skip gracefully."""
mock_settings = MagicMock()
mock_settings.secrets.tally_api_key = ""
with (
patch(
"backend.data.tally.get_business_understanding",
new_callable=AsyncMock,
return_value=None,
),
patch("backend.data.tally.Settings", return_value=mock_settings),
patch(
"backend.data.tally.find_submission_by_email",
new_callable=AsyncMock,
) as mock_find,
):
await populate_understanding_from_tally("user-1", "test@example.com")
mock_find.assert_not_awaited()
@pytest.mark.asyncio
async def test_populate_understanding_handles_errors():
"""Must never raise, even on unexpected errors."""
with patch(
"backend.data.tally.get_business_understanding",
new_callable=AsyncMock,
side_effect=RuntimeError("DB down"),
):
# Should not raise
await populate_understanding_from_tally("user-1", "test@example.com")
@pytest.mark.asyncio
async def test_populate_understanding_full_flow():
"""Happy path: no existing understanding, finds submission, extracts, upserts."""
mock_settings = MagicMock()
mock_settings.secrets.tally_api_key = "test-key"
submission = {
"responses": [
{"questionId": "q1", "value": "Alice"},
{"questionId": "q3", "value": "Acme"},
],
}
mock_input = MagicMock()
with (
patch(
"backend.data.tally.get_business_understanding",
new_callable=AsyncMock,
return_value=None,
),
patch("backend.data.tally.Settings", return_value=mock_settings),
patch(
"backend.data.tally.find_submission_by_email",
new_callable=AsyncMock,
return_value=(submission, SAMPLE_QUESTIONS),
),
patch(
"backend.data.tally.extract_business_understanding",
new_callable=AsyncMock,
return_value=mock_input,
) as mock_extract,
patch(
"backend.data.tally.upsert_business_understanding",
new_callable=AsyncMock,
) as mock_upsert,
):
await populate_understanding_from_tally("user-1", "alice@example.com")
mock_extract.assert_awaited_once()
mock_upsert.assert_awaited_once_with("user-1", mock_input)

View File

@@ -684,6 +684,11 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
screenshotone_api_key: str = Field(default="", description="ScreenshotOne API Key")
tally_api_key: str = Field(
default="",
description="Tally API key for form submission lookup on signup",
)
apollo_api_key: str = Field(default="", description="Apollo API Key")
smartlead_api_key: str = Field(default="", description="SmartLead API Key")
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")