Why Real-Time Fraud Screening Matters
In the battle against fraud, speed is critical. Real-time fraud screening evaluates risk during the user interaction, before damage can occur. Batch processing and manual reviews catch fraud too late - after the account has been created, the funds have been stolen, or the damage is already done.
The Cost of Delayed Detection
- Account takeover - By the time batch analysis flags a compromised account, funds may be drained
- Synthetic identity fraud - Fake accounts used immediately after creation
- Chargeback losses - Fraudulent purchases completed before review
- Data exfiltration - Sensitive data accessed before access is revoked
Real-Time vs. Near-Real-Time vs. Batch
| Approach | Latency | Use Case |
|---|---|---|
| Real-time (synchronous) | < 200ms | Block/allow decisions during transaction |
| Near-real-time (async) | 1-30 seconds | Post-action analysis, session monitoring |
| Batch processing | Hours/days | Pattern analysis, model training, reporting |
Real-Time Screening Architecture
Building a sub-200ms fraud screening system requires careful architecture. The key is tiered decision making - fast, cheap checks first, expensive checks only when needed.
Tiered Decision Architecture
┌─────────────────────────────────────────────┐
│                 User Action                 │
└─────────────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────┐
│ TIER 1: Blocklists & Allow Lists (~5ms)     │
│ - Known fraud identifiers                   │
│ - Trusted users                             │
│ - Compliance blocks (sanctions, etc.)       │
└─────────────────────────────────────────────┘
                       │
               (if not decisive)
                       ▼
┌─────────────────────────────────────────────┐
│ TIER 2: Rules Engine (~10-20ms)             │
│ - Velocity checks                           │
│ - Business rules                            │
│ - Geographic rules                          │
│ - Amount thresholds                         │
└─────────────────────────────────────────────┘
                       │
               (if not decisive)
                       ▼
┌─────────────────────────────────────────────┐
│ TIER 3: Phone Intelligence (~50-100ms)      │
│ - Spam score                                │
│ - Line type                                 │
│ - Carrier reputation                        │
│ - Number age                                │
└─────────────────────────────────────────────┘
                       │
               (if not decisive)
                       ▼
┌─────────────────────────────────────────────┐
│ TIER 4: ML Model Scoring (~20-50ms)         │
│ - Feature assembly                          │
│ - Model inference                           │
│ - Score interpretation                      │
└─────────────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────┐
│               Final Decision                │
│     ALLOW / CHALLENGE / REVIEW / BLOCK      │
└─────────────────────────────────────────────┘
Implementation Example
import asyncio
import time

class RealTimeFraudScreener:
    """Tiered real-time fraud screening system."""

    def __init__(self, config):
        self.blocklist = BlocklistService(config.blocklist_db)
        self.rules_engine = RulesEngine(config.rules)
        self.phone_api = PhoneIntelligenceAPI(config.phone_api_key)
        self.ml_model = MLFraudModel(config.model_path)
        self.logger = FraudLogger()
        # Assumed async cache client (Redis or similar) backing the _enrich_phone lookups below
        self.cache = CacheClient(config.cache)

    async def screen(self, event):
        """Screen an event for fraud in real-time."""
        start_time = time.time()

        # Tier 1: Blocklists (fastest)
        decision = await self._check_blocklists(event)
        if decision:
            return self._finalize(decision, 'blocklist', start_time)

        # Tier 2: Rules Engine
        decision = await self._check_rules(event)
        if decision and decision.is_decisive:
            return self._finalize(decision, 'rules', start_time)

        # Tier 3: Phone Intelligence (fetched in parallel with other enrichment)
        enrichment_tasks = [
            self._enrich_phone(event.phone),
            self._enrich_device(event.device_id),
            self._enrich_ip(event.ip_address)
        ]
        enrichments = await asyncio.gather(*enrichment_tasks)
        phone_data, device_data, ip_data = enrichments

        # Check phone intelligence for quick decisions
        decision = self._evaluate_phone_risk(phone_data)
        if decision and decision.is_decisive:
            return self._finalize(decision, 'phone_intel', start_time)

        # Tier 4: ML Model (most comprehensive)
        features = self._assemble_features(event, phone_data, device_data, ip_data)
        ml_score = self.ml_model.predict(features)
        decision = self._interpret_ml_score(ml_score, event)
        return self._finalize(decision, 'ml_model', start_time)

    async def _check_blocklists(self, event):
        """Sub-5ms blocklist checks."""
        checks = await asyncio.gather(
            self.blocklist.check_phone(event.phone),
            self.blocklist.check_email(event.email),
            self.blocklist.check_device(event.device_id),
            self.blocklist.check_ip(event.ip_address)
        )
        for result in checks:
            if result.blocked:
                return Decision(
                    action='block',
                    reason=result.reason,
                    confidence='high',
                    is_decisive=True
                )
        return None

    async def _enrich_phone(self, phone):
        """Get phone intelligence with caching."""
        cache_key = f"phone:{phone}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        data = await self.phone_api.lookup(phone, {
            'lrn': True,
            'spam': True,
            'cnam': True
        })
        # Cache for 2 hours
        await self.cache.set(cache_key, data, ttl=7200)
        return data

    def _evaluate_phone_risk(self, phone_data):
        """Quick phone-based risk assessment."""
        if not phone_data:
            return None
        spam_score = phone_data.get('spam', {}).get('score', 0)
        is_robocaller = phone_data.get('spam', {}).get('is_robocaller', False)
        line_type = phone_data.get('lrn', {}).get('line_type')

        # Immediate blocks
        if spam_score >= 90 or is_robocaller:
            return Decision(
                action='block',
                reason='high_spam_score',
                confidence='high',
                is_decisive=True
            )

        # Strong challenge signals
        if spam_score >= 70:
            return Decision(
                action='challenge',
                reason='elevated_spam_score',
                confidence='medium',
                is_decisive=True
            )

        # Continue to ML for borderline cases
        return None
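The excerpt above leaves the Decision object and the _finalize helper undefined. One way they could look is sketched below; this is an illustrative assumption, not the article's actual implementation, and the FraudLogger.log_decision method is likewise assumed.

from dataclasses import dataclass
import time

@dataclass
class Decision:
    """Outcome of a screening tier: allow / challenge / review / block."""
    action: str                     # 'allow', 'challenge', 'review', or 'block'
    reason: str = 'none'
    confidence: str = 'low'
    is_decisive: bool = False
    decision_tier: str = 'unknown'  # set by _finalize
    latency_ms: float = 0.0

    # Inside RealTimeFraudScreener:
    def _finalize(self, decision, tier, start_time):
        """Stamp the decision with the tier that produced it and the total latency, then log it."""
        decision.decision_tier = tier
        decision.latency_ms = (time.time() - start_time) * 1000
        self.logger.log_decision(decision)  # assumed FraudLogger method
        return decision

Recording the deciding tier and latency on every decision is what later feeds the per-tier and latency metrics shown in the monitoring section.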
Phone Intelligence in Real-Time Screening
Phone numbers provide uniquely valuable signals for fraud screening because they're difficult to change rapidly and carry reputation history.
Key Phone Signals for Screening
| Signal | Fraud Indication | Typical Weight |
|---|---|---|
| Spam Score | Direct fraud/spam history | High |
| Line Type = VoIP | Easy to obtain and discard anonymously | Medium |
| Recent Activation | Newly activated numbers are frequently used for fraud | Medium-High |
| Recent Port | Possible SIM swap | Medium-High |
| No CNAM | Unestablished identity | Low-Medium |
| High-Risk Carrier | Carrier with fraud history | Low-Medium |
Phone-Based Screening Rules
from datetime import datetime

def phone_screening_rules(phone_data, context):
    """Apply phone-based fraud screening rules."""
    rules_triggered = []

    # Rule 1: Known robocaller - immediate block
    if phone_data.get('spam', {}).get('is_robocaller'):
        return {
            'action': 'block',
            'rule': 'known_robocaller',
            'confidence': 'high'
        }

    # Rule 2: Very high spam score
    spam_score = phone_data.get('spam', {}).get('score', 0)
    if spam_score >= 85:
        return {
            'action': 'block',
            'rule': 'spam_score_critical',
            'confidence': 'high'
        }

    # Rule 3: High spam + VoIP combination
    line_type = phone_data.get('lrn', {}).get('line_type')
    if spam_score >= 50 and line_type == 'voip':
        rules_triggered.append({
            'rule': 'spam_voip_combo',
            'weight': 30
        })

    # Rule 4: New number (less than 7 days old)
    # parse_date is assumed to be a helper that converts the API's date string to a datetime
    activation_date = phone_data.get('lrn', {}).get('activation_date')
    if activation_date:
        days_old = (datetime.now() - parse_date(activation_date)).days
        if days_old < 7:
            rules_triggered.append({
                'rule': 'very_new_number',
                'weight': 25
            })
        elif days_old < 30:
            rules_triggered.append({
                'rule': 'new_number',
                'weight': 15
            })

    # Rule 5: Recently ported (possible SIM swap)
    if phone_data.get('lrn', {}).get('ported'):
        port_date = phone_data.get('lrn', {}).get('port_date')
        if port_date:
            days_since_port = (datetime.now() - parse_date(port_date)).days
            if days_since_port < 3:
                rules_triggered.append({
                    'rule': 'very_recent_port',
                    'weight': 35
                })
            elif days_since_port < 14:
                rules_triggered.append({
                    'rule': 'recent_port',
                    'weight': 20
                })

    # Rule 6: High-value transaction with risky phone
    if context.get('transaction_amount', 0) > 500:
        if line_type == 'voip' or spam_score > 30:
            rules_triggered.append({
                'rule': 'high_value_risky_phone',
                'weight': 20
            })

    # Sum the weights of the accumulated (non-decisive) rules
    total_weight = sum(r['weight'] for r in rules_triggered)
    if total_weight >= 60:
        return {'action': 'challenge', 'rules': rules_triggered}
    elif total_weight >= 40:
        return {'action': 'review', 'rules': rules_triggered}
    return {'action': 'allow', 'rules': rules_triggered}
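For illustration, here is how the rule function might be called during screening. The lookup payload shape follows the examples above, but the specific values are hypothetical:

# Hypothetical lookup result for a risky VoIP number (activation well over 30 days ago)
phone_data = {
    'spam': {'score': 55, 'is_robocaller': False},
    'lrn': {'line_type': 'voip', 'activation_date': '2023-01-15', 'ported': False}
}
context = {'transaction_amount': 750}

result = phone_screening_rules(phone_data, context)
# spam_voip_combo (30) + high_value_risky_phone (20) = 50 -> review
print(result['action'], [r['rule'] for r in result['rules']])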
Real-Time Velocity Checking
Velocity checks detect abnormal activity rates that indicate fraud or abuse.
Common Velocity Dimensions
- By phone number - Accounts/actions per phone per time window
- By IP address - Actions from single IP
- By device - Actions from single device fingerprint
- By user - Actions by single account
- Cross-dimensional - Same phone + different accounts (see the sketch after the Redis example below)
Implementation with Redis
class VelocityChecker:
    """Real-time velocity checking with Redis."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def check_velocity(self, dimension, value, action, limits):
        """
        Check if an action exceeds velocity limits.

        Args:
            dimension: 'phone', 'ip', 'device', 'user'
            value: The dimension value (phone number, IP, etc.)
            action: The action type ('registration', 'login', 'transaction')
            limits: Dict of {window_seconds: max_count}

        Returns:
            Dict with exceeded limits and recommendation
        """
        key_prefix = f"velocity:{dimension}:{action}:{value}"
        exceeded = []

        for window_seconds, max_count in limits.items():
            key = f"{key_prefix}:{window_seconds}"

            # Increment counter
            current = await self.redis.incr(key)

            # Set expiry on first increment
            if current == 1:
                await self.redis.expire(key, window_seconds)

            if current > max_count:
                exceeded.append({
                    'window': window_seconds,
                    'limit': max_count,
                    'current': current
                })

        if exceeded:
            return {
                'exceeded': True,
                'violations': exceeded,
                'recommendation': self._get_recommendation(exceeded)
            }
        return {'exceeded': False}

    def _get_recommendation(self, violations):
        # More violations or higher overage = stronger response
        max_overage = max(v['current'] / v['limit'] for v in violations)
        if max_overage >= 3:
            return 'block'
        elif max_overage >= 2:
            return 'challenge'
        else:
            return 'flag'


# Usage (run inside an async handler; redis is a connected async client)
velocity = VelocityChecker(redis)

# Define limits for phone-based registration
registration_limits = {
    300: 1,      # 1 registration per 5 minutes
    3600: 3,     # 3 per hour
    86400: 5,    # 5 per day
}

result = await velocity.check_velocity(
    'phone',
    '+15551234567',
    'registration',
    registration_limits
)

if result['exceeded']:
    # Handle velocity violation (block, challenge, or flag per the recommendation)
    pass
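The counters above track rates within a single dimension. Cross-dimensional velocity, such as how many distinct accounts have used one phone number, needs a set rather than a counter. Here is a minimal sketch using a Redis set, assuming the same async Redis client as above; the key name and limits are illustrative:

async def distinct_accounts_for_phone(redis, phone, user_id,
                                      window_seconds=86400, max_accounts=3):
    """Track how many distinct accounts a phone number has touched within a window."""
    key = f"velocity:phone_accounts:{phone}"

    # Add this account to the set of accounts seen for the phone
    added = await redis.sadd(key, user_id)
    if added:
        # Refresh the window whenever a new account appears
        await redis.expire(key, window_seconds)

    distinct = await redis.scard(key)
    return {
        'exceeded': distinct > max_accounts,
        'distinct_accounts': distinct,
        'limit': max_accounts
    }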
Challenge-Response Mechanisms
When fraud signals are inconclusive, challenge the user rather than blocking outright.
Challenge Escalation Ladder
| Risk Level | Challenge Type | User Friction |
|---|---|---|
| Low-Medium | SMS OTP | Low |
| Medium | Email verification | Low |
| Medium-High | Voice call OTP | Medium |
| High | Document upload | High |
| Very High | Video verification | Very High |
from datetime import datetime, timedelta

class ChallengeManager:
    """Manage fraud challenge-response flow."""

    def issue_challenge(self, risk_score, context):
        """Issue appropriate challenge based on risk."""
        # Select challenge type
        if risk_score < 40:
            challenge = self._create_sms_challenge(context)
        elif risk_score < 55:
            challenge = self._create_email_challenge(context)
        elif risk_score < 70:
            challenge = self._create_voice_challenge(context)
        elif risk_score < 85:
            challenge = self._create_document_challenge(context)
        else:
            # Very high risk - manual review required
            return self._queue_for_review(context)

        # Store challenge state
        self._store_challenge(challenge)
        return challenge

    def _create_sms_challenge(self, context):
        """Create SMS OTP challenge."""
        otp = generate_otp(6)
        return {
            'type': 'sms_otp',
            'challenge_id': generate_id(),
            'phone': context.phone,
            'otp_hash': hash_otp(otp),
            'expires_at': datetime.now() + timedelta(minutes=10),
            'attempts': 0,
            'max_attempts': 3,
            'message': f"Your verification code is: {otp}"
        }

    def verify_challenge(self, challenge_id, response):
        """Verify challenge response."""
        challenge = self._get_challenge(challenge_id)
        if not challenge:
            return {'success': False, 'reason': 'invalid_challenge'}
        if datetime.now() > challenge['expires_at']:
            return {'success': False, 'reason': 'expired'}
        if challenge['attempts'] >= challenge['max_attempts']:
            return {'success': False, 'reason': 'max_attempts'}

        # Verify based on challenge type
        if challenge['type'] == 'sms_otp':
            if hash_otp(response) == challenge['otp_hash']:
                self._mark_challenge_complete(challenge_id)
                return {'success': True}
            else:
                self._increment_attempts(challenge_id)
                return {'success': False, 'reason': 'incorrect'}
        return {'success': False, 'reason': 'unknown_type'}
Performance Optimization
Meeting a sub-200ms latency budget requires aggressive optimization.
Optimization Strategies
1. Parallel Data Fetching
async def parallel_enrichment(event):
    """Fetch all enrichment data in parallel."""
    tasks = [
        fetch_phone_data(event.phone),
        fetch_device_data(event.device_id),
        fetch_ip_data(event.ip_address),
        fetch_user_history(event.user_id),
        fetch_transaction_history(event.user_id)
    ]

    # All complete in ~max(individual times), not the sum
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return {
        'phone': results[0] if not isinstance(results[0], Exception) else None,
        'device': results[1] if not isinstance(results[1], Exception) else None,
        'ip': results[2] if not isinstance(results[2], Exception) else None,
        'user_history': results[3] if not isinstance(results[3], Exception) else None,
        'tx_history': results[4] if not isinstance(results[4], Exception) else None
    }
2. Aggressive Caching
import json

class CachedEnrichment:
    """Multi-tier caching for enrichment data."""

    def __init__(self, local_cache, redis, api):
        self.local = local_cache   # In-memory, ~0.1ms
        self.redis = redis         # Redis, ~1-2ms
        self.api = api             # External API, ~50-100ms

    async def get_phone_data(self, phone):
        # L1: Local in-process cache
        local_key = f"phone:{phone}"
        if local_key in self.local:
            return self.local[local_key]

        # L2: Redis cache
        redis_data = await self.redis.get(local_key)
        if redis_data:
            data = json.loads(redis_data)
            self.local[local_key] = data  # Promote to L1
            return data

        # L3: External API call
        data = await self.api.lookup(phone)

        # Store in both caches (2-hour TTL in Redis)
        self.local[local_key] = data
        await self.redis.set(local_key, json.dumps(data), ex=7200)
        return data
3. Precomputed Features
# Precompute features in near-real-time, use in real-time
class FeatureStore:
    """Pre-computed feature storage."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def get_user_features(self, user_id):
        """Get pre-computed user features."""
        key = f"features:user:{user_id}"
        return await self.redis.hgetall(key)

    async def update_user_features(self, user_id, event):
        """Update features after an event (near-real-time)."""
        key = f"features:user:{user_id}"

        # Increment counters
        pipe = self.redis.pipeline()
        pipe.hincrby(key, 'total_transactions', 1)
        pipe.hincrbyfloat(key, 'total_amount', event.amount)

        # Update last-activity timestamp (rolling aggregates would be refreshed here too)
        pipe.hset(key, 'last_activity', datetime.now().isoformat())
        await pipe.execute()
Monitoring and Alerting
Real-time systems require real-time monitoring.
Key Metrics to Track
- Latency percentiles - P50, P95, P99 screening latency
- Decision distribution - Allow/challenge/block rates
- False positive rate - Legitimate users blocked/challenged
- False negative rate - Fraud that passed screening
- Cache hit rates - Per-tier cache effectiveness
- Error rates - API failures, timeouts
class ScreeningMetrics:
    """Track real-time screening metrics."""

    def __init__(self, statsd):
        self.stats = statsd

    def record_screening(self, result, start_time):
        latency_ms = (time.time() - start_time) * 1000

        # Latency histogram
        self.stats.timing('fraud.screening.latency', latency_ms)

        # Decision counter
        self.stats.incr(f"fraud.screening.decision.{result.action}")

        # Tier that made the decision
        self.stats.incr(f"fraud.screening.tier.{result.decision_tier}")

        # Alert on slow screenings
        if latency_ms > 200:
            self.stats.incr('fraud.screening.slow')

    def record_outcome(self, screening_id, was_fraud):
        """Record actual outcome for model feedback."""
        # This powers false positive/negative tracking
        self.stats.incr(f"fraud.outcomes.{'fraud' if was_fraud else 'legit'}")
Best Practices
- Design for latency - Every millisecond matters; optimize ruthlessly
- Fail open intelligently - If systems fail, have a safe degradation path
- Use tiered decisions - Fast checks first, expensive checks only when needed
- Cache aggressively - Phone data changes slowly; cache liberally
- Parallel everything - Fetch enrichment data concurrently
- Challenge, don't block - Let users prove legitimacy when possible
- Monitor continuously - Track latency, decisions, and outcomes
- Test under load - Ensure performance holds during traffic spikes
Frequently Asked Questions
What latency is acceptable for real-time fraud screening?
For synchronous screening during user actions, aim for P95 latency under 200ms. Users notice delays above 300ms. For checkout flows, up to 500ms may be acceptable due to user expectation of security checks. Phone intelligence APIs typically respond in 50-100ms, leaving headroom for rules and ML scoring. If you can't meet latency targets, consider async post-action screening with ability to cancel/reverse.
Should I fail open or fail closed when fraud APIs timeout?
This depends on your risk tolerance and action type. For high-value transactions, failing closed (blocking) may be appropriate. For registrations and logins, failing open prevents legitimate user frustration. Best practice: fail open with enhanced monitoring - allow the action but flag for near-real-time review, increased session monitoring, or delayed challenge. Never let API failures completely disable fraud protection.
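As a rough illustration of that pattern, here is a minimal sketch of wrapping the screener with a latency budget and failing open with a flag when it times out or errors. The screener interface follows the examples above; queue_for_async_review is an assumed hook into your near-real-time review pipeline:

import asyncio

async def screen_with_fail_open(screener, event, budget_ms=200):
    """Run screening within a latency budget; on timeout or error, allow but flag for review."""
    try:
        return await asyncio.wait_for(screener.screen(event), timeout=budget_ms / 1000)
    except Exception:
        # Fail open: allow the action, but escalate monitoring instead of silently
        # dropping fraud protection (covers timeouts and downstream API failures)
        await queue_for_async_review(event)  # assumed near-real-time review queue
        return {'action': 'allow', 'reason': 'screening_unavailable', 'flag_for_review': True}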
How do I balance fraud prevention with user experience?
The key is graduated friction - apply challenges proportional to risk. Low-risk users should sail through with no friction. Medium-risk users get quick SMS verification. Only high-risk users face significant challenges. Measure your false positive rate and tune thresholds to minimize friction for legitimate users. Also, gather feedback when users successfully complete challenges - those users were legitimate despite high scores, which helps tune your model.
How often should I retrain fraud models?
Fraud patterns evolve continuously, so models degrade over time. Most organizations retrain monthly, with weekly updates in high-fraud environments. However, real-time rules can be updated instantly when new attack patterns emerge. The best approach is continuous monitoring of model performance metrics; when precision/recall degrades beyond thresholds, trigger retraining. Some teams maintain champion/challenger models, continuously training new versions.
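To make the champion/challenger idea concrete, here is one minimal sketch, with the model and logging interfaces assumed: the champion's score drives the live decision, while the challenger is scored in shadow mode on a sample of traffic and only logged, so its precision and recall can be compared offline before promotion.

import random

def score_with_shadow(champion, challenger, features, shadow_rate=0.1):
    """Champion score drives the decision; challenger is shadow-scored on sampled traffic."""
    champion_score = champion.predict(features)

    # Shadow-score a fraction of traffic with the candidate model; never act on its output
    if random.random() < shadow_rate:
        challenger_score = challenger.predict(features)
        log_shadow_scores(champion_score, challenger_score)  # assumed offline comparison log

    return champion_score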