Real-Time Fraud Screening

Real-time fraud screening evaluates transactions, account actions, and communications as they happen, making split-second decisions to allow, challenge, or block suspicious activity. This guide covers the architecture, data sources, decision engines, and optimization techniques for building low-latency fraud screening systems that incorporate phone intelligence.

Key Takeaways

  • Real-time screening should complete in under 200ms (P95) for a seamless user experience
  • Use tiered architecture: fast rules first, ML models for complex cases
  • Phone intelligence provides immediate risk signals without user friction
  • Combine synchronous blocking with async enrichment for best performance

Why Real-Time Fraud Screening Matters

In the battle against fraud, speed is critical. Real-time fraud screening evaluates risk during the user interaction, before damage can occur. Batch processing and manual reviews catch fraud too late - after the account is created, the money is stolen, or the damage is done.

The Cost of Delayed Detection

  • Account takeover - By the time batch analysis flags a compromised account, funds may be drained
  • Synthetic identity fraud - Fake accounts are used immediately after creation
  • Chargeback losses - Fraudulent purchases completed before review
  • Data exfiltration - Sensitive data accessed before access is revoked

Real-Time vs. Near-Real-Time vs. Batch

Approach                  Latency        Use Case
Real-time (synchronous)   < 200ms        Block/allow decisions during transaction
Near-real-time (async)    1-30 seconds   Post-action analysis, session monitoring
Batch processing          Hours/days     Pattern analysis, model training, reporting

Real-Time Screening Architecture

Building a sub-200ms fraud screening system requires careful architecture. The key is tiered decision making - fast, cheap checks first, expensive checks only when needed.

Tiered Decision Architecture

┌─────────────────────────────────────────────────────────────┐
│                      User Action                             │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│  TIER 1: Blocklists & Allow Lists (~5ms)                    │
│  - Known fraud identifiers                                   │
│  - Trusted users                                             │
│  - Compliance blocks (sanctions, etc.)                       │
└─────────────────────────────────────────────────────────────┘
                              │
                    (if not decisive)
                              ▼
┌─────────────────────────────────────────────────────────────┐
│  TIER 2: Rules Engine (~10-20ms)                            │
│  - Velocity checks                                           │
│  - Business rules                                            │
│  - Geographic rules                                          │
│  - Amount thresholds                                         │
└─────────────────────────────────────────────────────────────┘
                              │
                    (if not decisive)
                              ▼
┌─────────────────────────────────────────────────────────────┐
│  TIER 3: Phone Intelligence (~50-100ms)                     │
│  - Spam score                                                │
│  - Line type                                                 │
│  - Carrier reputation                                        │
│  - Number age                                                │
└─────────────────────────────────────────────────────────────┘
                              │
                    (if not decisive)
                              ▼
┌─────────────────────────────────────────────────────────────┐
│  TIER 4: ML Model Scoring (~20-50ms)                        │
│  - Feature assembly                                          │
│  - Model inference                                           │
│  - Score interpretation                                      │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    Final Decision                            │
│  ALLOW / CHALLENGE / REVIEW / BLOCK                         │
└─────────────────────────────────────────────────────────────┘
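
As a rough check on the latency budget, the per-tier estimates in the diagram can be summed. The sketch below takes the upper bound of each range and assumes the tiers run strictly one after another with no extra network or serialization overhead; the dictionary keys are illustrative labels, not names from any real system.

```python
# Upper-bound latency estimates (ms) taken from the tier diagram above.
TIER_BUDGET_MS = {
    "blocklists": 5,
    "rules_engine": 20,
    "phone_intelligence": 100,
    "ml_scoring": 50,
}
TARGET_MS = 200


def worst_case_latency(budget):
    """Sum per-tier upper bounds, assuming tiers run one after another."""
    return sum(budget.values())


total = worst_case_latency(TIER_BUDGET_MS)
headroom = TARGET_MS - total
print(f"worst case: {total}ms, headroom: {headroom}ms")
# → worst case: 175ms, headroom: 25ms
```

Under these assumptions an event that passes through every tier leaves only about 25ms of slack, which is why early exits and caching matter.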

Implementation Example

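import asyncio
import time


class RealTimeFraudScreener:
    """Tiered real-time fraud screening system."""

    def __init__(self, config):
        self.blocklist = BlocklistService(config.blocklist_db)
        self.rules_engine = RulesEngine(config.rules)
        self.phone_api = PhoneIntelligenceAPI(config.phone_api_key)
        self.ml_model = MLFraudModel(config.model_path)
        # Async cache used by _enrich_phone; CacheService and config.cache
        # are placeholder names, like the other services here.
        self.cache = CacheService(config.cache)
        self.logger = FraudLogger()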

    async def screen(self, event):
        """Screen an event for fraud in real-time."""
        start_time = time.time()
        decision = None

        # Tier 1: Blocklists (fastest)
        decision = await self._check_blocklists(event)
        if decision:
            return self._finalize(decision, 'blocklist', start_time)

        # Tier 2: Rules Engine
        decision = await self._check_rules(event)
        if decision and decision.is_decisive:
            return self._finalize(decision, 'rules', start_time)

        # Tier 3: Phone Intelligence (parallel with other enrichment)
        enrichment_tasks = [
            self._enrich_phone(event.phone),
            self._enrich_device(event.device_id),
            self._enrich_ip(event.ip_address)
        ]
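        # return_exceptions=True keeps one failed lookup from aborting the screen;
        # failed enrichments are passed downstream as None.
        enrichments = await asyncio.gather(*enrichment_tasks, return_exceptions=True)
        phone_data, device_data, ip_data = (
            None if isinstance(r, Exception) else r for r in enrichments
        )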

        # Check phone intelligence for quick decisions
        decision = self._evaluate_phone_risk(phone_data)
        if decision and decision.is_decisive:
            return self._finalize(decision, 'phone_intel', start_time)

        # Tier 4: ML Model (most comprehensive)
        features = self._assemble_features(event, phone_data, device_data, ip_data)
        ml_score = self.ml_model.predict(features)
        decision = self._interpret_ml_score(ml_score, event)

        return self._finalize(decision, 'ml_model', start_time)

    async def _check_blocklists(self, event):
        """Sub-5ms blocklist checks."""
        checks = await asyncio.gather(
            self.blocklist.check_phone(event.phone),
            self.blocklist.check_email(event.email),
            self.blocklist.check_device(event.device_id),
            self.blocklist.check_ip(event.ip_address)
        )

        for result in checks:
            if result.blocked:
                return Decision(
                    action='block',
                    reason=result.reason,
                    confidence='high',
                    is_decisive=True
                )
        return None

    async def _enrich_phone(self, phone):
        """Get phone intelligence with caching."""
        cache_key = f"phone:{phone}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self.phone_api.lookup(phone, {
            'lrn': True,
            'spam': True,
            'cnam': True
        })

        # Cache for 2 hours
        await self.cache.set(cache_key, data, ttl=7200)
        return data

    def _evaluate_phone_risk(self, phone_data):
        """Quick phone-based risk assessment."""
        if not phone_data:
            return None

        spam_score = phone_data.get('spam', {}).get('score', 0)
        is_robocaller = phone_data.get('spam', {}).get('is_robocaller', False)
        line_type = phone_data.get('lrn', {}).get('line_type')

        # Immediate blocks
        if spam_score >= 90 or is_robocaller:
            return Decision(
                action='block',
                reason='high_spam_score',
                confidence='high',
                is_decisive=True
            )

        # Strong challenge signals
        if spam_score >= 70:
            return Decision(
                action='challenge',
                reason='elevated_spam_score',
                confidence='medium',
                is_decisive=True
            )

        # Continue to ML for borderline cases
        return None

Phone Intelligence in Real-Time Screening

Phone numbers provide uniquely valuable signals for fraud screening because they're difficult to change rapidly and carry reputation history.

Key Phone Signals for Screening

Signal              Fraud Indication                   Typical Weight
Spam Score          Direct fraud/spam history          High
Line Type = VoIP    Easy to obtain and discard         Medium
Recent Activation   New numbers often used for fraud   Medium-High
Recent Port         Possible SIM swap                  Medium-High
No CNAM             Unestablished identity             Low-Medium
High-Risk Carrier   Carrier with fraud history         Low-Medium

Phone-Based Screening Rules

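from datetime import datetime


def parse_date(value):
    """Parse a date string; assumes the API returns naive ISO 8601 dates."""
    return datetime.fromisoformat(value)


def phone_screening_rules(phone_data, context):
    """Apply phone-based fraud screening rules."""

    rules_triggered = []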

    # Rule 1: Known robocaller - immediate block
    if phone_data.get('spam', {}).get('is_robocaller'):
        return {
            'action': 'block',
            'rule': 'known_robocaller',
            'confidence': 'high'
        }

    # Rule 2: Very high spam score
    spam_score = phone_data.get('spam', {}).get('score', 0)
    if spam_score >= 85:
        return {
            'action': 'block',
            'rule': 'spam_score_critical',
            'confidence': 'high'
        }

    # Rule 3: High spam + VoIP combination
    line_type = phone_data.get('lrn', {}).get('line_type')
    if spam_score >= 50 and line_type == 'voip':
        rules_triggered.append({
            'rule': 'spam_voip_combo',
            'weight': 30
        })

    # Rule 4: New number (less than 7 days)
    activation_date = phone_data.get('lrn', {}).get('activation_date')
    if activation_date:
        days_old = (datetime.now() - parse_date(activation_date)).days
        if days_old < 7:
            rules_triggered.append({
                'rule': 'very_new_number',
                'weight': 25
            })
        elif days_old < 30:
            rules_triggered.append({
                'rule': 'new_number',
                'weight': 15
            })

    # Rule 5: Recently ported (possible SIM swap)
    if phone_data.get('lrn', {}).get('ported'):
        port_date = phone_data.get('lrn', {}).get('port_date')
        if port_date:
            days_since_port = (datetime.now() - parse_date(port_date)).days
            if days_since_port < 3:
                rules_triggered.append({
                    'rule': 'very_recent_port',
                    'weight': 35
                })
            elif days_since_port < 14:
                rules_triggered.append({
                    'rule': 'recent_port',
                    'weight': 20
                })

    # Rule 6: High-value transaction with risky phone
    if context.get('transaction_amount', 0) > 500:
        if line_type == 'voip' or spam_score > 30:
            rules_triggered.append({
                'rule': 'high_value_risky_phone',
                'weight': 20
            })

    # Calculate total risk from rules
    total_weight = sum(r['weight'] for r in rules_triggered)

    if total_weight >= 60:
        return {'action': 'challenge', 'rules': rules_triggered}
    elif total_weight >= 40:
        return {'action': 'review', 'rules': rules_triggered}

    return {'action': 'allow', 'rules': rules_triggered}

Real-Time Velocity Checking

Velocity checks detect abnormal activity rates that indicate fraud or abuse.

Common Velocity Dimensions

  • By phone number - Accounts/actions per phone per time window
  • By IP address - Actions from single IP
  • By device - Actions from single device fingerprint
  • By user - Actions by single account
  • Cross-dimensional - Same phone + different accounts
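
The cross-dimensional case (one phone number attached to several different accounts) needs a distinct count, not a simple counter. The following is a minimal in-memory sketch of that idea; the class name, parameters, and thresholds are hypothetical, and a production system would keep this state in a shared store.

```python
import time
from collections import defaultdict


class CrossDimensionTracker:
    """Counts distinct accounts seen per phone number within a time window.

    In-memory illustration only; state is lost on restart and not shared.
    """

    def __init__(self, window_seconds, max_accounts, clock=time.time):
        self.window_seconds = window_seconds
        self.max_accounts = max_accounts
        self.clock = clock
        # phone -> {account_id: last_seen_timestamp}
        self._seen = defaultdict(dict)

    def record(self, phone, account_id):
        """Record a phone/account pairing and report whether the limit is exceeded."""
        now = self.clock()
        accounts = self._seen[phone]
        accounts[account_id] = now
        # Drop pairings that have aged out of the window.
        cutoff = now - self.window_seconds
        for stale in [a for a, ts in accounts.items() if ts < cutoff]:
            del accounts[stale]
        return {
            "distinct_accounts": len(accounts),
            "exceeded": len(accounts) > self.max_accounts,
        }


tracker = CrossDimensionTracker(window_seconds=86400, max_accounts=2)
for account in ("acct-1", "acct-2", "acct-3"):
    result = tracker.record("+15551234567", account)
print(result)
# → {'distinct_accounts': 3, 'exceeded': True}
```

Repeated activity from the same account does not raise the count, so a legitimate user logging in many times is not penalized by this check.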

Implementation with Redis

class VelocityChecker:
    """Real-time velocity checking with Redis."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def check_velocity(self, dimension, value, action, limits):
        """
        Check if action exceeds velocity limits.

        Args:
            dimension: 'phone', 'ip', 'device', 'user'
            value: The dimension value (phone number, IP, etc.)
            action: The action type ('registration', 'login', 'transaction')
            limits: Dict of {window_seconds: max_count}

        Returns:
            Dict with exceeded limits and recommendation
        """
        key_prefix = f"velocity:{dimension}:{action}:{value}"

        exceeded = []
        for window_seconds, max_count in limits.items():
            key = f"{key_prefix}:{window_seconds}"

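            # Increment counter (the current attempt is included in the count)
            current = await self.redis.incr(key)

            # Set expiry on first increment. INCR and EXPIRE are separate
            # calls, so a failure between them would leave the key without a TTL.
            if current == 1:
                await self.redis.expire(key, window_seconds)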

            if current > max_count:
                exceeded.append({
                    'window': window_seconds,
                    'limit': max_count,
                    'current': current
                })

        if exceeded:
            return {
                'exceeded': True,
                'violations': exceeded,
                'recommendation': self._get_recommendation(exceeded)
            }

        return {'exceeded': False}

    def _get_recommendation(self, violations):
        # Response strength scales with the worst overage ratio (current / limit)
        max_overage = max(v['current'] / v['limit'] for v in violations)

        if max_overage >= 3:
            return 'block'
        elif max_overage >= 2:
            return 'challenge'
        else:
            return 'flag'


# Usage (run inside an async function; `redis` is an async Redis client)
velocity = VelocityChecker(redis)

# Define limits for phone-based registration
registration_limits = {
    300: 1,    # 1 registration per 5 minutes
    3600: 3,   # 3 per hour
    86400: 5,  # 5 per day
}

result = await velocity.check_velocity(
    'phone',
    '+15551234567',
    'registration',
    registration_limits
)

if result['exceeded']:
    # Handle velocity violation
    pass
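
Counters that expire a fixed time after the first event, as in the Redis example above, behave like fixed windows: a burst that straddles an expiry can briefly exceed the intended rate. Where that matters, a sliding window is one alternative. The sketch below is an in-memory illustration with hypothetical names, not a drop-in replacement for the Redis version.

```python
import time
from collections import defaultdict, deque


class SlidingWindowCounter:
    """In-memory sliding-window event counter keyed by an arbitrary string."""

    def __init__(self, window_seconds, max_count, clock=time.time):
        self.window_seconds = window_seconds
        self.max_count = max_count
        self.clock = clock
        self._events = defaultdict(deque)  # key -> timestamps, oldest first

    def hit(self, key):
        """Record one event and return True if the key is now over the limit."""
        now = self.clock()
        events = self._events[key]
        cutoff = now - self.window_seconds
        # Discard events that have fallen out of the trailing window.
        while events and events[0] <= cutoff:
            events.popleft()
        events.append(now)
        return len(events) > self.max_count


counter = SlidingWindowCounter(window_seconds=300, max_count=2)
hits = [counter.hit("phone:+15551234567") for _ in range(3)]
print(hits)
# → [False, False, True]
```

The trade-off is memory: this approach stores one timestamp per event, while the counter approach stores a single integer per window.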

Challenge-Response Mechanisms

When fraud signals are inconclusive, challenge the user rather than blocking outright.

Challenge Escalation Ladder

Risk Level    Challenge Type       User Friction
Low-Medium    SMS OTP              Low
Medium        Email verification   Low
Medium-High   Voice call OTP       Medium
High          Document upload      High
Very High     Video verification   Very High

class ChallengeManager:
    """Manage fraud challenge-response flow."""

    def issue_challenge(self, risk_score, context):
        """Issue appropriate challenge based on risk."""

        # Select challenge type
        if risk_score < 40:
            challenge = self._create_sms_challenge(context)
        elif risk_score < 55:
            challenge = self._create_email_challenge(context)
        elif risk_score < 70:
            challenge = self._create_voice_challenge(context)
        elif risk_score < 85:
            challenge = self._create_document_challenge(context)
        else:
            # Very high risk - manual review required
            return self._queue_for_review(context)

        # Store challenge state
        self._store_challenge(challenge)

        return challenge

    def _create_sms_challenge(self, context):
        """Create SMS OTP challenge."""
        otp = generate_otp(6)

        return {
            'type': 'sms_otp',
            'challenge_id': generate_id(),
            'phone': context.phone,
            'otp_hash': hash_otp(otp),
            'expires_at': datetime.now() + timedelta(minutes=10),
            'attempts': 0,  # read by verify_challenge
            'max_attempts': 3,
            'message': f"Your verification code is: {otp}"
        }

    def verify_challenge(self, challenge_id, response):
        """Verify challenge response."""
        challenge = self._get_challenge(challenge_id)

        if not challenge:
            return {'success': False, 'reason': 'invalid_challenge'}

        if datetime.now() > challenge['expires_at']:
            return {'success': False, 'reason': 'expired'}

        if challenge['attempts'] >= challenge['max_attempts']:
            return {'success': False, 'reason': 'max_attempts'}

        # Verify based on challenge type
        if challenge['type'] == 'sms_otp':
            if hash_otp(response) == challenge['otp_hash']:
                self._mark_challenge_complete(challenge_id)
                return {'success': True}
            else:
                self._increment_attempts(challenge_id)
                return {'success': False, 'reason': 'incorrect'}

        return {'success': False, 'reason': 'unknown_type'}
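
The challenge code above relies on generate_otp and hash_otp helpers that are not shown. One possible shape for them, using only the Python standard library, is sketched here. The key handling and the otp_matches helper are assumptions for illustration; the original helpers may work differently.

```python
import hashlib
import hmac
import secrets

# Hypothetical server-side key; in practice it would be loaded from a secrets manager.
OTP_HASH_KEY = secrets.token_bytes(32)


def generate_otp(length=6):
    """Return a numeric one-time code of the given length from a CSPRNG."""
    return "".join(str(secrets.randbelow(10)) for _ in range(length))


def hash_otp(otp, key=OTP_HASH_KEY):
    """Keyed hash of the code, so stored hashes are harder to guess offline without the key."""
    return hmac.new(key, otp.encode("utf-8"), hashlib.sha256).hexdigest()


def otp_matches(candidate, stored_hash, key=OTP_HASH_KEY):
    """Compare hashes in constant time to avoid timing side channels."""
    return hmac.compare_digest(hash_otp(candidate, key), stored_hash)


otp = generate_otp(6)
stored = hash_otp(otp)
print(otp_matches(otp, stored))
# → True
```

A plain unkeyed hash of a 6-digit code can be reversed by trying all one million possibilities, which is the reason this sketch uses a keyed hash.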

Performance Optimization

Meeting sub-200ms latency requirements requires aggressive optimization.

Optimization Strategies

1. Parallel Data Fetching

async def parallel_enrichment(event):
    """Fetch all enrichment data in parallel."""
    tasks = [
        fetch_phone_data(event.phone),
        fetch_device_data(event.device_id),
        fetch_ip_data(event.ip_address),
        fetch_user_history(event.user_id),
        fetch_transaction_history(event.user_id)
    ]

    # All complete in ~max(individual times), not sum
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return {
        'phone': results[0] if not isinstance(results[0], Exception) else None,
        'device': results[1] if not isinstance(results[1], Exception) else None,
        'ip': results[2] if not isinstance(results[2], Exception) else None,
        'user_history': results[3] if not isinstance(results[3], Exception) else None,
        'tx_history': results[4] if not isinstance(results[4], Exception) else None
    }

2. Aggressive Caching

class CachedEnrichment:
    """Multi-tier caching for enrichment data."""

    def __init__(self, local_cache, redis, api):
        self.local = local_cache  # In-memory, ~0.1ms
        self.redis = redis         # Redis, ~1-2ms
        self.api = api             # External API, ~50-100ms

    async def get_phone_data(self, phone):
        # L1: Local cache
        local_key = f"phone:{phone}"
        if local_key in self.local:
            return self.local[local_key]

        # L2: Redis cache
        redis_data = await self.redis.get(local_key)
        if redis_data:
            data = json.loads(redis_data)
            self.local[local_key] = data  # Promote to L1
            return data

        # L3: API call
        data = await self.api.lookup(phone)

        # Store in both caches
        self.local[local_key] = data
        await self.redis.set(local_key, json.dumps(data), ex=7200)

        return data
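
The L1 cache in the example above is treated as a plain mapping, which never expires entries and can grow without bound. A small TTL cache is one way to address that. The class below is a hypothetical sketch; it exposes get/set methods rather than dict-style access, so the surrounding code would need minor changes to use it.

```python
import time


class TTLCache:
    """Small in-process cache with per-entry expiry and a size cap."""

    def __init__(self, ttl_seconds, max_entries=10_000, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.max_entries = max_entries
        self.clock = clock
        self._data = {}  # key -> (expires_at, value)

    def get(self, key, default=None):
        entry = self._data.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self.clock() >= expires_at:
            # Expired: drop it so the caller falls through to the next tier.
            del self._data[key]
            return default
        return value

    def set(self, key, value):
        if key not in self._data and len(self._data) >= self.max_entries:
            # Evict the oldest inserted entry (dicts preserve insertion order).
            del self._data[next(iter(self._data))]
        self._data[key] = (self.clock() + self.ttl_seconds, value)


cache = TTLCache(ttl_seconds=60)
cache.set("phone:+15551234567", {"spam": {"score": 12}})
print(cache.get("phone:+15551234567"))
# → {'spam': {'score': 12}}
```

A short local TTL (seconds to minutes) limits how long a process can serve stale risk data after the shared cache has been updated.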

3. Precomputed Features

# Precompute features in near-real-time, use in real-time
class FeatureStore:
    """Pre-computed feature storage."""

    def __init__(self, redis_client):
        self.redis = redis_client  # async Redis client

    async def get_user_features(self, user_id):
        """Get pre-computed user features."""
        key = f"features:user:{user_id}"
        return await self.redis.hgetall(key)

    async def update_user_features(self, user_id, event):
        """Update features after event (near-real-time)."""
        key = f"features:user:{user_id}"

        # Increment counters
        pipe = self.redis.pipeline()
        pipe.hincrby(key, 'total_transactions', 1)
        pipe.hincrbyfloat(key, 'total_amount', event.amount)

        # Record the time of the most recent activity
        pipe.hset(key, 'last_activity', datetime.now().isoformat())

        await pipe.execute()

Monitoring and Alerting

Real-time systems require real-time monitoring.

Key Metrics to Track

  • Latency percentiles - P50, P95, P99 screening latency
  • Decision distribution - Allow/challenge/block rates
  • False positive rate - Legitimate users blocked/challenged
  • False negative rate - Fraud that passed screening
  • Cache hit rates - Per-tier cache effectiveness
  • Error rates - API failures, timeouts

class ScreeningMetrics:
    """Track real-time screening metrics."""

    def __init__(self, statsd):
        self.stats = statsd

    def record_screening(self, result, start_time):
        latency_ms = (time.time() - start_time) * 1000

        # Latency histogram
        self.stats.timing('fraud.screening.latency', latency_ms)

        # Decision counter
        self.stats.incr(f"fraud.screening.decision.{result.action}")

        # Tier that made decision
        self.stats.incr(f"fraud.screening.tier.{result.decision_tier}")

        # Alert on slow screenings
        if latency_ms > 200:
            self.stats.incr('fraud.screening.slow')

    def record_outcome(self, screening_id, was_fraud):
        """Record actual outcome for model feedback."""
        # This powers false positive/negative tracking
        self.stats.incr(f"fraud.outcomes.{'fraud' if was_fraud else 'legit'}")
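
Once confirmed outcomes are joined back to screening decisions, the false positive and false negative rates listed above can be computed directly. The sketch below assumes outcomes are available as (action, was_fraud) pairs and counts both blocks and challenges of legitimate users as false positives, as in the metric definition above; the function name and input shape are illustrative.

```python
def screening_error_rates(outcomes):
    """Compute false positive and false negative rates.

    outcomes: iterable of (action, was_fraud) pairs, where action is the
    screening decision and was_fraud is the confirmed label.
    """
    legit_total = legit_stopped = 0
    fraud_total = fraud_allowed = 0
    for action, was_fraud in outcomes:
        if was_fraud:
            fraud_total += 1
            if action == "allow":
                fraud_allowed += 1
        else:
            legit_total += 1
            if action in ("block", "challenge"):
                legit_stopped += 1
    return {
        "false_positive_rate": legit_stopped / legit_total if legit_total else 0.0,
        "false_negative_rate": fraud_allowed / fraud_total if fraud_total else 0.0,
    }


sample = [
    ("allow", False), ("allow", False), ("challenge", False), ("allow", False),
    ("block", True), ("allow", True),
]
rates = screening_error_rates(sample)
print(rates)
# → {'false_positive_rate': 0.25, 'false_negative_rate': 0.5}
```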

Best Practices

  1. Design for latency - Every millisecond matters; optimize ruthlessly
  2. Fail open intelligently - If systems fail, have a safe degradation path
  3. Use tiered decisions - Fast checks first, expensive checks only when needed
  4. Cache aggressively - Phone data changes slowly; cache liberally
  5. Parallel everything - Fetch enrichment data concurrently
  6. Challenge, don't block - Let users prove legitimacy when possible
  7. Monitor continuously - Track latency, decisions, and outcomes
  8. Test under load - Ensure performance holds during traffic spikes

Frequently Asked Questions

What latency is acceptable for real-time fraud screening?

For synchronous screening during user actions, aim for P95 latency under 200ms. Users notice delays above 300ms. For checkout flows, up to 500ms may be acceptable because users expect security checks. Phone intelligence APIs typically respond in 50-100ms, leaving headroom for rules and ML scoring. If you can't meet latency targets, consider async post-action screening with the ability to cancel or reverse the action.

Should I fail open or fail closed when fraud APIs time out?

This depends on your risk tolerance and action type. For high-value transactions, failing closed (blocking) may be appropriate. For registrations and logins, failing open prevents legitimate user frustration. Best practice: fail open with enhanced monitoring - allow the action but flag for near-real-time review, increased session monitoring, or delayed challenge. Never let API failures completely disable fraud protection.
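
The policy described above can be expressed as a timeout wrapper around each external lookup. This is a minimal sketch using asyncio.wait_for; the action type names, the 150ms default, and the result dictionary shape are assumptions chosen for illustration.

```python
import asyncio

# Assumed policy table: action types that fail closed when a lookup fails.
FAIL_CLOSED_ACTIONS = {"high_value_transaction"}


async def lookup_with_fallback(lookup, action_type, timeout_seconds=0.15):
    """Await a lookup coroutine; on timeout or error, apply the failure policy."""
    try:
        data = await asyncio.wait_for(lookup, timeout=timeout_seconds)
        return {"data": data, "degraded": False, "action": None}
    except Exception:  # asyncio.TimeoutError or a lookup failure
        if action_type in FAIL_CLOSED_ACTIONS:
            return {"data": None, "degraded": True, "action": "block"}
        # Fail open, but mark the event for near-real-time review.
        return {"data": None, "degraded": True, "action": "allow_and_flag"}


async def slow_lookup():
    """Stand-in for an external API call that exceeds the budget."""
    await asyncio.sleep(1)
    return {"spam": {"score": 5}}


async def main():
    login = await lookup_with_fallback(slow_lookup(), "login", timeout_seconds=0.01)
    payment = await lookup_with_fallback(
        slow_lookup(), "high_value_transaction", timeout_seconds=0.01
    )
    return login, payment


login_result, payment_result = asyncio.run(main())
print(login_result["action"], payment_result["action"])
# → allow_and_flag block
```

The "degraded" flag lets downstream monitoring count how often decisions were made without full data, so API outages do not silently weaken screening.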

How do I balance fraud prevention with user experience?

The key is graduated friction - apply challenges proportional to risk. Low-risk users should sail through with no friction. Medium-risk users get quick SMS verification. Only high-risk users face significant challenges. Measure your false positive rate and tune thresholds to minimize friction for legitimate users. Also, gather feedback when users successfully complete challenges - those users were legitimate despite high scores, which helps tune your model.

How often should I retrain fraud models?

Fraud patterns evolve continuously, so models degrade over time. Most organizations retrain monthly, with weekly updates in high-fraud environments. However, real-time rules can be updated instantly when new attack patterns emerge. The best approach is continuous monitoring of model performance metrics; when precision/recall degrades beyond thresholds, trigger retraining. Some teams maintain champion/challenger models, continuously training new versions.
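
A threshold-based retraining trigger of the kind described above could look like the following. The precision and recall floors are placeholder values, not recommendations, and the confusion counts are assumed to come from confirmed outcomes over a recent evaluation window.

```python
def precision_recall(true_positives, false_positives, false_negatives):
    """Standard precision and recall from confusion counts."""
    predicted = true_positives + false_positives
    actual = true_positives + false_negatives
    precision = true_positives / predicted if predicted else 0.0
    recall = true_positives / actual if actual else 0.0
    return precision, recall


def should_retrain(counts, min_precision=0.90, min_recall=0.80):
    """Return True when either metric falls below its (assumed) floor."""
    precision, recall = precision_recall(**counts)
    return precision < min_precision or recall < min_recall


healthy = {"true_positives": 95, "false_positives": 5, "false_negatives": 10}
degraded = {"true_positives": 70, "false_positives": 10, "false_negatives": 30}
print(should_retrain(healthy), should_retrain(degraded))
# → False True
```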
