Aller au contenu

B3.6 - Conformité et Veille Sécurité

RGPD, Normes et Identité Numérique - BTS SIO

🎯 Objectifs du cours

À l'issue de ce cours, vous serez capable de : - Appliquer les exigences du RGPD dans un contexte informatique - Comprendre les principales normes de sécurité (ISO 27001, ANSSI) - Mettre en place une stratégie de veille sécurité - Préserver et gérer l'identité numérique d'une organisation


🛡️ RGPD - Règlement Général sur la Protection des Données

RGPD (GDPR)
Règlement européen entré en vigueur le 25 mai 2018, qui encadre le traitement des données personnelles sur le territoire de l'Union européenne. Il vise à redonner aux citoyens le contrôle de leurs données personnelles.

Principes fondamentaux du RGPD

🎯
Finalité
Les données doivent être collectées pour des finalités déterminées, explicites et légitimes. Pas de traitement ultérieur incompatible.
📏
Proportionnalité
Les données collectées doivent être adéquates, pertinentes et limitées à ce qui est nécessaire au regard des finalités.
Conservation limitée
Les données ne peuvent être conservées que pendant la durée nécessaire aux finalités pour lesquelles elles sont traitées.
🔒
Sécurité
Mise en place de mesures techniques et organisationnelles appropriées pour assurer la sécurité des données.
Exactitude
Les données doivent être exactes et, si nécessaire, tenues à jour. Toute mesure raisonnable doit être prise pour effacer ou rectifier.
👁️
Transparence
Information claire et compréhensible des personnes concernées sur le traitement de leurs données personnelles.

Droits des personnes concernées

Les 8 droits fondamentaux
  1. Droit à l'information : Être informé de la collecte et du traitement
  2. Droit d'accès : Obtenir une copie des données personnelles
  3. Droit de rectification : Corriger les données inexactes
  4. Droit à l'effacement : "Droit à l'oubli" sous certaines conditions
  5. Droit à la limitation : Restreindre le traitement
  6. Droit à la portabilité : Récupérer ses données dans un format structuré
  7. Droit d'opposition : S'opposer au traitement pour des raisons légitimes
  8. Droits relatifs à la prise de décision automatisée : Ne pas subir de décision basée uniquement sur un traitement automatisé

Obligations des responsables de traitement

Obligation Description Mise en œuvre technique
Registre des traitements Documenter tous les traitements de données Base de données des traitements, cartographie des flux
Privacy by Design Intégrer la protection dès la conception Chiffrement, pseudonymisation, minimisation
Analyse d'impact (PIA) Évaluer les risques pour les droits et libertés Méthodologie CNIL, outils d'évaluation
Notification de violation Signaler les incidents dans les 72h Procédures d'incident, outils de notification
Désignation d'un DPO Délégué à la protection des données Formation, outils de gestion de la conformité

Sanctions et amendes

Sanctions administratives
  • Niveau 1 : Jusqu'à 10 millions d'euros ou 2% du CA annuel mondial
  • Niveau 2 : Jusqu'à 20 millions d'euros ou 4% du CA annuel mondial
  • Autres sanctions : Avertissement, mise en demeure, limitation temporaire ou définitive du traitement, suspension des flux de données

Mise en conformité technique

Exemple d'implémentation RGPD en Python
#!/usr/bin/env python3
"""
Système de gestion de la conformité RGPD
Exemple d'implémentation des droits des utilisateurs
"""

import hashlib
import json
import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class ConsentType(Enum):
    EXPLICIT = "explicit"
    IMPLICIT = "implicit"
    WITHDRAWN = "withdrawn"

class LegalBasis(Enum):
    CONSENT = "consent"
    CONTRACT = "contract"
    LEGAL_OBLIGATION = "legal_obligation"
    VITAL_INTERESTS = "vital_interests"
    PUBLIC_TASK = "public_task"
    LEGITIMATE_INTERESTS = "legitimate_interests"

@dataclass
class PersonalData:
    """Représentation d'une donnée personnelle"""
    field_name: str
    value: str
    category: str  # identité, contact, professionnel, etc.
    sensitivity: str  # normal, sensible, très_sensible
    source: str
    collection_date: datetime.datetime
    retention_period: int  # en jours
    legal_basis: LegalBasis

    def is_expired(self) -> bool:
        """Vérifier si la donnée a dépassé sa durée de conservation"""
        expiry_date = self.collection_date + datetime.timedelta(days=self.retention_period)
        return datetime.datetime.now() > expiry_date

    def anonymize(self) -> str:
        """Anonymiser la donnée"""
        return hashlib.sha256(self.value.encode()).hexdigest()[:8]

@dataclass
class DataSubject:
    """Personne concernée par le traitement"""
    subject_id: str
    email: str
    consents: Dict[str, ConsentType]
    personal_data: List[PersonalData]
    created_at: datetime.datetime
    last_updated: datetime.datetime

    def grant_consent(self, purpose: str, consent_type: ConsentType = ConsentType.EXPLICIT):
        """Accorder un consentement"""
        self.consents[purpose] = consent_type
        self.last_updated = datetime.datetime.now()

    def withdraw_consent(self, purpose: str):
        """Retirer un consentement"""
        self.consents[purpose] = ConsentType.WITHDRAWN
        self.last_updated = datetime.datetime.now()

    def has_valid_consent(self, purpose: str) -> bool:
        """Vérifier si le consentement est valide"""
        return self.consents.get(purpose) == ConsentType.EXPLICIT

class GDPRCompliance:
    """Système de gestion de la conformité RGPD"""

    def __init__(self):
        self.data_subjects: Dict[str, DataSubject] = {}
        self.processing_activities: List[Dict] = []
        self.data_breaches: List[Dict] = []

    def register_data_subject(self, subject_id: str, email: str) -> DataSubject:
        """Enregistrer une nouvelle personne concernée"""
        subject = DataSubject(
            subject_id=subject_id,
            email=email,
            consents={},
            personal_data=[],
            created_at=datetime.datetime.now(),
            last_updated=datetime.datetime.now()
        )
        self.data_subjects[subject_id] = subject
        return subject

    def collect_personal_data(self, subject_id: str, data: PersonalData) -> bool:
        """Collecter une donnée personnelle"""
        if subject_id not in self.data_subjects:
            return False

        subject = self.data_subjects[subject_id]

        # Vérifier la base légale
        if data.legal_basis == LegalBasis.CONSENT:
            if not subject.has_valid_consent("data_collection"):
                return False

        subject.personal_data.append(data)
        subject.last_updated = datetime.datetime.now()
        return True

    def exercise_right_of_access(self, subject_id: str) -> Optional[Dict]:
        """Droit d'accès - Fournir toutes les données de la personne"""
        if subject_id not in self.data_subjects:
            return None

        subject = self.data_subjects[subject_id]

        return {
            "subject_id": subject.subject_id,
            "email": subject.email,
            "consents": subject.consents,
            "personal_data": [asdict(data) for data in subject.personal_data],
            "created_at": subject.created_at.isoformat(),
            "last_updated": subject.last_updated.isoformat()
        }

    def exercise_right_of_rectification(self, subject_id: str, field_name: str, new_value: str) -> bool:
        """Droit de rectification - Corriger une donnée"""
        if subject_id not in self.data_subjects:
            return False

        subject = self.data_subjects[subject_id]

        for data in subject.personal_data:
            if data.field_name == field_name:
                data.value = new_value
                subject.last_updated = datetime.datetime.now()
                return True

        return False

    def exercise_right_of_erasure(self, subject_id: str, field_name: Optional[str] = None) -> bool:
        """Droit à l'effacement - Supprimer des données"""
        if subject_id not in self.data_subjects:
            return False

        subject = self.data_subjects[subject_id]

        if field_name:
            # Supprimer une donnée spécifique
            subject.personal_data = [
                data for data in subject.personal_data 
                if data.field_name != field_name
            ]
        else:
            # Supprimer toutes les données
            del self.data_subjects[subject_id]

        return True

    def exercise_right_of_portability(self, subject_id: str) -> Optional[str]:
        """Droit à la portabilité - Export des données en JSON"""
        data = self.exercise_right_of_access(subject_id)
        if data:
            return json.dumps(data, indent=2, ensure_ascii=False)
        return None

    def data_retention_cleanup(self):
        """Nettoyage automatique des données expirées"""
        cleaned_count = 0

        for subject in self.data_subjects.values():
            original_count = len(subject.personal_data)
            subject.personal_data = [
                data for data in subject.personal_data 
                if not data.is_expired()
            ]
            cleaned_count += original_count - len(subject.personal_data)

        return cleaned_count

    def report_data_breach(self, description: str, affected_subjects: List[str], 
                          severity: str, measures_taken: str):
        """Signaler une violation de données"""
        breach = {
            "id": hashlib.md5(f"{datetime.datetime.now()}{description}".encode()).hexdigest(),
            "description": description,
            "affected_subjects": affected_subjects,
            "severity": severity,
            "measures_taken": measures_taken,
            "reported_at": datetime.datetime.now().isoformat(),
            "notified_to_authority": False,
            "subjects_notified": False
        }

        self.data_breaches.append(breach)

        # Notification automatique si plus de 72h
        if severity in ["high", "critical"]:
            breach["requires_immediate_notification"] = True

        return breach["id"]

    def generate_compliance_report(self) -> Dict:
        """Générer un rapport de conformité"""
        total_subjects = len(self.data_subjects)
        total_data_points = sum(len(s.personal_data) for s in self.data_subjects.values())

        consent_stats = {}
        for subject in self.data_subjects.values():
            for purpose, consent in subject.consents.items():
                if purpose not in consent_stats:
                    consent_stats[purpose] = {"explicit": 0, "withdrawn": 0}
                if consent == ConsentType.EXPLICIT:
                    consent_stats[purpose]["explicit"] += 1
                elif consent == ConsentType.WITHDRAWN:
                    consent_stats[purpose]["withdrawn"] += 1

        return {
            "report_date": datetime.datetime.now().isoformat(),
            "total_data_subjects": total_subjects,
            "total_data_points": total_data_points,
            "consent_statistics": consent_stats,
            "data_breaches_count": len(self.data_breaches),
            "compliance_score": self._calculate_compliance_score()
        }

    def _calculate_compliance_score(self) -> float:
        """Calculer un score de conformité basique"""
        if not self.data_subjects:
            return 100.0

        total_score = 0
        total_subjects = len(self.data_subjects)

        for subject in self.data_subjects.values():
            subject_score = 0

            # Points pour les consentements explicites
            explicit_consents = sum(1 for c in subject.consents.values() 
                                  if c == ConsentType.EXPLICIT)
            if explicit_consents > 0:
                subject_score += 30

            # Points pour la fraîcheur des données
            days_since_update = (datetime.datetime.now() - subject.last_updated).days
            if days_since_update < 30:
                subject_score += 20

            # Points pour la limitation des données
            if len(subject.personal_data) < 10:  # Principe de minimisation
                subject_score += 25

            # Points pour l'absence de données expirées
            expired_data = sum(1 for data in subject.personal_data if data.is_expired())
            if expired_data == 0:
                subject_score += 25

            total_score += subject_score

        return min(100.0, total_score / total_subjects)

# Exemple d'utilisation
if __name__ == "__main__":
    # Initialisation du système RGPD
    gdpr_system = GDPRCompliance()

    # Enregistrement d'un utilisateur
    user = gdpr_system.register_data_subject("user123", "user@example.com")
    user.grant_consent("data_collection", ConsentType.EXPLICIT)
    user.grant_consent("marketing", ConsentType.EXPLICIT)

    # Collecte de données personnelles
    personal_data = PersonalData(
        field_name="nom",
        value="Dupont",
        category="identité",
        sensitivity="normal",
        source="formulaire_inscription",
        collection_date=datetime.datetime.now(),
        retention_period=365,  # 1 an
        legal_basis=LegalBasis.CONSENT
    )

    gdpr_system.collect_personal_data("user123", personal_data)

    # Exercice des droits
    print("=== Droit d'accès ===")
    access_data = gdpr_system.exercise_right_of_access("user123")
    print(json.dumps(access_data, indent=2, ensure_ascii=False))

    print("\n=== Droit de portabilité ===")
    portable_data = gdpr_system.exercise_right_of_portability("user123")
    print(portable_data)

    print("\n=== Rapport de conformité ===")
    compliance_report = gdpr_system.generate_compliance_report()
    print(json.dumps(compliance_report, indent=2, ensure_ascii=False))

📋 Normes et Standards de Sécurité

Normes de sécurité
Référentiels internationaux qui définissent les bonnes pratiques et exigences pour la mise en place d'un système de management de la sécurité de l'information efficace.

Principales normes de sécurité

🏆 ISO 27001

Système de Management de la Sécurité de l'Information (SMSI)

  • Approche par les risques
  • Amélioration continue (PDCA)
  • 114 mesures de sécurité
  • Certification possible

🇫🇷 ANSSI

Agence Nationale de la Sécurité des Systèmes d'Information

  • Guide d'hygiène informatique
  • Référentiel général de sécurité (RGS)
  • Critères communs (CC)
  • Certification CSPN

🏭 IEC 62443

Sécurité des systèmes industriels

  • Systèmes de contrôle industriel
  • Zones et conduits de sécurité
  • Niveaux de sécurité (SL)
  • Cybersécurité OT

💳 PCI DSS

Payment Card Industry Data Security Standard

  • Protection des données de cartes
  • 12 exigences principales
  • Tests de pénétration obligatoires
  • Audit annuel

🏥 HDS

Hébergement de Données de Santé

  • Données de santé à caractère personnel
  • Certification obligatoire
  • Traçabilité et chiffrement
  • Procédures d'urgence

🌐 NIST

National Institute of Standards and Technology

  • Cybersecurity Framework
  • 5 fonctions principales
  • Identify, Protect, Detect, Respond, Recover
  • Approche par maturité

ISO 27001 - Détail des domaines

Domaine Objectifs Mesures clés
A.5 - Politiques de sécurité Orientation et soutien de la direction Politique de sécurité, revue régulière
A.6 - Organisation Cadre de gouvernance de la sécurité Rôles et responsabilités, accords de confidentialité
A.7 - Sécurité RH Personnel de confiance et sensibilisé Vérification des antécédents, formation
A.8 - Gestion des actifs Protection appropriée des actifs Inventaire, classification, manipulation
A.9 - Contrôle d'accès Accès autorisé uniquement Politique d'accès, gestion des privilèges
A.10 - Cryptographie Usage approprié de la cryptographie Politique cryptographique, gestion des clés
A.11 - Sécurité physique Protection des zones et équipements Périmètres sécurisés, protection contre les menaces
A.12 - Sécurité d'exploitation Fonctionnement sécurisé des systèmes Procédures d'exploitation, gestion des vulnérabilités
A.13 - Sécurité des communications Protection des informations en transit Gestion du réseau, transferts sécurisés
A.14 - Développement sécurisé Sécurité intégrée dans le cycle de vie Politique de développement, tests de sécurité
A.15 - Relations fournisseurs Protection dans les relations tierces Politique fournisseurs, accords de service
A.16 - Gestion des incidents Réponse cohérente et efficace Procédures d'incident, amélioration continue
A.17 - Continuité d'activité Maintien des opérations critiques Plan de continuité, tests réguliers
A.18 - Conformité Respect des exigences légales Identification des exigences, audits

Checklist de mise en conformité ISO 27001

✅ Phase 1 : Préparation
  • Définir le périmètre du SMSI
  • Identifier les parties intéressées
  • Établir la politique de sécurité
  • Nommer un responsable SMSI
✅ Phase 2 : Analyse des risques
  • Inventaire des actifs informationnels
  • Identification des menaces et vulnérabilités
  • Évaluation des risques (probabilité × impact)
  • Définition du niveau de risque acceptable
✅ Phase 3 : Traitement des risques
  • Sélection des mesures de sécurité
  • Déclaration d'applicabilité (SoA)
  • Plan de traitement des risques
  • Validation par la direction
✅ Phase 4 : Mise en œuvre
  • Déploiement des mesures techniques
  • Formation et sensibilisation
  • Documentation des procédures
  • Tests et validation
✅ Phase 5 : Surveillance
  • Indicateurs de performance (KPI)
  • Audits internes réguliers
  • Revue de direction
  • Amélioration continue

🔍 Veille Sécurité

Veille sécurité
Processus continu de collecte, d'analyse et de diffusion d'informations sur les menaces, vulnérabilités et évolutions technologiques en matière de cybersécurité.

Sources de veille sécurité

🏛️ Sources institutionnelles
  • ANSSI : Bulletins d'alerte, guides de bonnes pratiques
  • CERT-FR : Alertes de sécurité, avis de vulnérabilités
  • CISA : Cybersecurity advisories (États-Unis)
  • ENISA : Rapports sur les menaces (Europe)
🔍 Bases de vulnérabilités
  • CVE (Common Vulnerabilities and Exposures) : Base internationale
  • NVD (National Vulnerability Database) : NIST
  • VulnDB : Base commerciale détaillée
  • Exploit-DB : Exploits publics
🌐 Communautés et forums
  • Reddit : r/netsec, r/cybersecurity
  • Twitter : Comptes d'experts en sécurité
  • Discord/Slack : Communautés spécialisées
  • Conférences : BlackHat, DefCon, SSTIC
🏢 Sources commerciales
  • Threat Intelligence : FireEye, CrowdStrike
  • Rapports sectoriels : Verizon DBIR, IBM X-Force
  • Éditeurs de sécurité : Bulletins produits
  • Cabinets de conseil : Analyses prospectives

Automatisation de la veille

Système automatisé de veille sécurité
#!/usr/bin/env python3
"""
Système automatisé de veille sécurité
Collecte et analyse des informations de sécurité
"""

import requests
import feedparser
import json
import re
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import sqlite3
from dataclasses import dataclass
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart

@dataclass
class SecurityAlert:
    """Alerte de sécurité"""
    id: str
    title: str
    description: str
    severity: str
    source: str
    published_date: datetime
    cve_ids: List[str]
    affected_products: List[str]
    url: str
    tags: List[str]

class SecurityWatchdog:
    """Système de veille sécurité automatisé"""

    def __init__(self, db_path: str = "security_watch.db"):
        self.db_path = db_path
        self.init_database()

        # Sources RSS/Atom
        self.rss_feeds = {
            "CERT-FR": "https://www.cert.ssi.gouv.fr/feed/",
            "US-CERT": "https://www.cisa.gov/cybersecurity-advisories/all.xml",
            "NVD": "https://nvd.nist.gov/feeds/xml/cve/misc/nvd-rss.xml",
            "SANS": "https://isc.sans.edu/rssfeed.xml",
            "Krebs": "https://krebsonsecurity.com/feed/",
            "Schneier": "https://www.schneier.com/feed/"
        }

        # APIs de threat intelligence
        self.apis = {
            "cve_details": "https://cve.circl.lu/api/",
            "vulndb": "https://vulndb.cyberriskanalytics.com/api/v1/",
            "mitre": "https://cve.mitre.org/cgi-bin/cvekey.cgi"
        }

        # Mots-clés de surveillance
        self.keywords = [
            "zero-day", "ransomware", "apt", "malware",
            "data breach", "vulnerability", "exploit",
            "phishing", "ddos", "botnet"
        ]

        # Produits surveillés
        self.monitored_products = [
            "windows", "linux", "apache", "nginx",
            "mysql", "postgresql", "php", "python",
            "java", "javascript", "wordpress", "drupal"
        ]

    def init_database(self):
        """Initialiser la base de données SQLite"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                description TEXT,
                severity TEXT,
                source TEXT,
                published_date TEXT,
                cve_ids TEXT,
                affected_products TEXT,
                url TEXT,
                tags TEXT,
                processed BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS keywords_tracking (
                keyword TEXT,
                alert_id TEXT,
                matches INTEGER,
                FOREIGN KEY (alert_id) REFERENCES alerts (id)
            )
        ''')

        conn.commit()
        conn.close()

    def fetch_rss_feeds(self) -> List[SecurityAlert]:
        """Récupérer les flux RSS/Atom"""
        alerts = []

        for source_name, feed_url in self.rss_feeds.items():
            try:
                print(f"[+] Récupération du flux {source_name}...")
                feed = feedparser.parse(feed_url)

                for entry in feed.entries:
                    # Extraire les CVE IDs
                    cve_pattern = r'CVE-\d{4}-\d{4,7}'
                    cve_ids = re.findall(cve_pattern, entry.title + " " + entry.get('summary', ''))

                    # Détecter les produits affectés
                    affected_products = []
                    content = (entry.title + " " + entry.get('summary', '')).lower()
                    for product in self.monitored_products:
                        if product in content:
                            affected_products.append(product)

                    # Calculer la sévérité basée sur les mots-clés
                    severity = self._calculate_severity(content, cve_ids)

                    # Extraire les tags
                    tags = self._extract_tags(content)

                    alert = SecurityAlert(
                        id=f"{source_name}_{hash(entry.link)}",
                        title=entry.title,
                        description=entry.get('summary', ''),
                        severity=severity,
                        source=source_name,
                        published_date=datetime(*entry.published_parsed[:6]) if hasattr(entry, 'published_parsed') else datetime.now(),
                        cve_ids=cve_ids,
                        affected_products=affected_products,
                        url=entry.link,
                        tags=tags
                    )

                    alerts.append(alert)

            except Exception as e:
                print(f"[-] Erreur lors de la récupération de {source_name}: {e}")

        return alerts

    def fetch_cve_details(self, cve_id: str) -> Optional[Dict]:
        """Récupérer les détails d'une CVE"""
        try:
            url = f"https://cve.circl.lu/api/cve/{cve_id}"
            response = requests.get(url, timeout=10)

            if response.status_code == 200:
                return response.json()

        except Exception as e:
            print(f"[-] Erreur lors de la récupération de {cve_id}: {e}")

        return None

    def _calculate_severity(self, content: str, cve_ids: List[str]) -> str:
        """Calculer la sévérité d'une alerte"""
        high_severity_keywords = [
            "critical", "zero-day", "remote code execution",
            "privilege escalation", "ransomware", "apt"
        ]

        medium_severity_keywords = [
            "vulnerability", "exploit", "malware", "phishing"
        ]

        # Vérifier les mots-clés haute sévérité
        for keyword in high_severity_keywords:
            if keyword in content:
                return "HIGH"

        # Vérifier les CVE avec score CVSS élevé
        for cve_id in cve_ids:
            cve_details = self.fetch_cve_details(cve_id)
            if cve_details and cve_details.get('cvss', 0) >= 7.0:
                return "HIGH"

        # Vérifier les mots-clés moyenne sévérité
        for keyword in medium_severity_keywords:
            if keyword in content:
                return "MEDIUM"

        return "LOW"

    def _extract_tags(self, content: str) -> List[str]:
        """Extraire les tags d'une alerte"""
        tags = []

        tag_patterns = {
            "malware": r"\b(malware|trojan|virus|worm|rootkit)\b",
            "ransomware": r"\b(ransomware|crypto|locker)\b",
            "apt": r"\b(apt|advanced persistent threat)\b",
            "ddos": r"\b(ddos|denial of service)\b",
            "phishing": r"\b(phishing|spear phishing|whaling)\b",
            "vulnerability": r"\b(vulnerability|exploit|cve)\b",
            "data_breach": r"\b(data breach|leak|exposure)\b"
        }

        for tag, pattern in tag_patterns.items():
            if re.search(pattern, content, re.IGNORECASE):
                tags.append(tag)

        return tags

    def store_alerts(self, alerts: List[SecurityAlert]):
        """Stocker les alertes en base de données"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for alert in alerts:
            try:
                cursor.execute('''
                    INSERT OR REPLACE INTO alerts 
                    (id, title, description, severity, source, published_date, 
                     cve_ids, affected_products, url, tags)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    alert.id,
                    alert.title,
                    alert.description,
                    alert.severity,
                    alert.source,
                    alert.published_date.isoformat(),
                    json.dumps(alert.cve_ids),
                    json.dumps(alert.affected_products),
                    alert.url,
                    json.dumps(alert.tags)
                ))

                # Tracking des mots-clés
                for keyword in self.keywords:
                    content = (alert.title + " " + alert.description).lower()
                    matches = len(re.findall(keyword, content, re.IGNORECASE))
                    if matches > 0:
                        cursor.execute('''
                            INSERT OR REPLACE INTO keywords_tracking 
                            (keyword, alert_id, matches)
                            VALUES (?, ?, ?)
                        ''', (keyword, alert.id, matches))

            except Exception as e:
                print(f"[-] Erreur lors du stockage de l'alerte {alert.id}: {e}")

        conn.commit()
        conn.close()

    def get_recent_alerts(self, hours: int = 24, severity: str = None) -> List[Dict]:
        """Récupérer les alertes récentes"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        since_date = (datetime.now() - timedelta(hours=hours)).isoformat()

        query = '''
            SELECT * FROM alerts 
            WHERE published_date >= ?
        '''
        params = [since_date]

        if severity:
            query += ' AND severity = ?'
            params.append(severity)

        query += ' ORDER BY published_date DESC'

        cursor.execute(query, params)
        results = cursor.fetchall()

        conn.close()

        # Convertir en dictionnaires
        columns = [desc[0] for desc in cursor.description]
        alerts = []
        for row in results:
            alert_dict = dict(zip(columns, row))
            # Désérialiser les champs JSON
            alert_dict['cve_ids'] = json.loads(alert_dict['cve_ids'])
            alert_dict['affected_products'] = json.loads(alert_dict['affected_products'])
            alert_dict['tags'] = json.loads(alert_dict['tags'])
            alerts.append(alert_dict)

        return alerts

    def generate_daily_report(self) -> str:
        """Générer un rapport quotidien"""
        high_alerts = self.get_recent_alerts(24, "HIGH")
        medium_alerts = self.get_recent_alerts(24, "MEDIUM")

        report = f"""
RAPPORT QUOTIDIEN DE VEILLE SÉCURITÉ
=====================================
Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

ALERTES HAUTE SÉVÉRITÉ ({len(high_alerts)})
{'-' * 40}
"""

        for alert in high_alerts[:10]:  # Top 10
            report += f"""
{alert['title']}
  Source: {alert['source']}
  CVE: {', '.join(alert['cve_ids']) if alert['cve_ids'] else 'N/A'}
  Produits: {', '.join(alert['affected_products']) if alert['affected_products'] else 'N/A'}
  URL: {alert['url']}
"""

        report += f"""

ALERTES MOYENNE SÉVÉRITÉ ({len(medium_alerts)})
{'-' * 40}
"""

        for alert in medium_alerts[:5]:  # Top 5
            report += f"""
{alert['title']}
  Source: {alert['source']}
  URL: {alert['url']}
"""

        # Statistiques
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT keyword, SUM(matches) as total_matches
            FROM keywords_tracking kt
            JOIN alerts a ON kt.alert_id = a.id
            WHERE a.published_date >= ?
            GROUP BY keyword
            ORDER BY total_matches DESC
            LIMIT 10
        ''', [(datetime.now() - timedelta(hours=24)).isoformat()])

        keyword_stats = cursor.fetchall()
        conn.close()

        report += f"""

MOTS-CLÉS LES PLUS FRÉQUENTS
{'-' * 30}
"""

        for keyword, count in keyword_stats:
            report += f"• {keyword}: {count} mentions\n"

        return report

    def send_email_alert(self, subject: str, content: str, recipients: List[str]):
        """Envoyer une alerte par email"""
        # Configuration SMTP (à adapter)
        smtp_server = "smtp.example.com"
        smtp_port = 587
        smtp_user = "security-watch@example.com"
        smtp_password = "password"

        try:
            msg = MimeMultipart()
            msg['From'] = smtp_user
            msg['To'] = ', '.join(recipients)
            msg['Subject'] = subject

            msg.attach(MimeText(content, 'plain', 'utf-8'))

            server = smtplib.SMTP(smtp_server, smtp_port)
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.send_message(msg)
            server.quit()

            print(f"[+] Email envoyé à {', '.join(recipients)}")

        except Exception as e:
            print(f"[-] Erreur lors de l'envoi de l'email: {e}")

    def run_watch_cycle(self):
        """Exécuter un cycle de veille complet"""
        print(f"[+] Démarrage du cycle de veille - {datetime.now()}")

        # Récupérer les nouvelles alertes
        alerts = self.fetch_rss_feeds()
        print(f"[+] {len(alerts)} alertes récupérées")

        # Stocker en base
        self.store_alerts(alerts)

        # Vérifier les alertes critiques
        critical_alerts = [a for a in alerts if a.severity == "HIGH"]

        if critical_alerts:
            print(f"[!] {len(critical_alerts)} alertes critiques détectées")

            # Envoyer une notification immédiate
            alert_summary = "\n".join([f"• {a.title} ({a.source})" for a in critical_alerts[:5]])
            self.send_email_alert(
                f"ALERTE SÉCURITÉ CRITIQUE - {len(critical_alerts)} nouvelles menaces",
                f"Nouvelles alertes critiques détectées:\n\n{alert_summary}",
                ["admin@example.com", "security@example.com"]
            )

        print(f"[+] Cycle de veille terminé")

# Exemple d'utilisation
if __name__ == "__main__":
    # Initialisation du système de veille
    watchdog = SecurityWatchdog()

    # Exécution d'un cycle de veille
    watchdog.run_watch_cycle()

    # Génération du rapport quotidien
    daily_report = watchdog.generate_daily_report()
    print(daily_report)

    # Programmation automatique (exemple avec cron)
    # 0 */6 * * * /usr/bin/python3 /path/to/security_watch.py

🆔 Identité Numérique et Réputation

Identité numérique
Ensemble des informations et traces numériques qu'une organisation laisse sur Internet, volontairement ou involontairement, qui constituent sa représentation virtuelle et influencent sa réputation.

Composantes de l'identité numérique

🌐 Présence officielle
  • Sites web : Site corporate, portails clients, intranets
  • Réseaux sociaux : Comptes officiels, pages entreprise
  • Domaines : Noms de domaine principaux et variantes
  • Certificats : SSL/TLS, signatures numériques
👥 Présence des collaborateurs
  • Profils professionnels : LinkedIn, réseaux métier
  • Publications : Articles, conférences, brevets
  • Réseaux personnels : Traces involontaires
  • Fuites d'information : Données exposées
🔍 Traces techniques
  • Infrastructure : Adresses IP, serveurs, services
  • Métadonnées : Documents, images, emails
  • Logs publics : Certificats CT, DNS, WHOIS
  • Vulnérabilités : Expositions de sécurité
📰 Réputation externe
  • Médias : Articles de presse, communiqués
  • Avis clients : Plateformes d'évaluation
  • Forums : Discussions, retours d'expérience
  • Incidents : Breaches, pannes, controverses

Menaces sur l'identité numérique

Menace Description Impact Mesures de protection
Typosquatting Enregistrement de domaines similaires Phishing, détournement de trafic Surveillance des domaines, enregistrement préventif
Usurpation d'identité Création de faux comptes/sites Atteinte à la réputation, fraude Monitoring des réseaux sociaux, signalement
Défacement Modification malveillante du site Image dégradée, perte de confiance Sécurisation web, sauvegdes, monitoring
Fuite de données Exposition d'informations sensibles Sanctions RGPD, perte de clients Chiffrement, contrôles d'accès, DLP
Campagne de désinformation Diffusion de fausses informations Réputation ternie, boycott Veille réputation, communication de crise
Social engineering Manipulation des collaborateurs Accès non autorisé, vol d'informations Formation, sensibilisation, procédures

Stratégie de protection de l'identité numérique

🔍 Audit de l'identité numérique
  • Cartographie : Inventaire de tous les actifs numériques
  • Recherche OSINT : Informations publiquement disponibles
  • Analyse des risques : Vulnérabilités et expositions
  • Benchmark concurrentiel : Comparaison sectorielle
🛡️ Mesures de protection
  • Enregistrement défensif : Domaines variantes et typos
  • Monitoring continu : Surveillance automatisée
  • Authentification forte : 2FA sur tous les comptes
  • Charte d'usage : Guidelines pour les collaborateurs
📱 Gestion des réseaux sociaux
  • Politique social media : Règles de publication
  • Comptes officiels : Certification et sécurisation
  • Modération : Gestion des commentaires et interactions
  • Crise communication : Procédures de réponse rapide
⚡ Gestion de crise
  • Plan de communication : Messages pré-rédigés
  • Cellule de crise : Équipe dédiée et formée
  • Canaux de communication : Médias, réseaux, site web
  • Suivi post-crise : Analyse et amélioration

Outils de monitoring de l'identité numérique

Script de surveillance de l'identité numérique
#!/usr/bin/env python3
"""
Système de surveillance de l'identité numérique
Monitoring automatisé de la présence en ligne
"""

import requests
import dns.resolver
import whois
import ssl
import socket
from urllib.parse import urlparse
import re
from datetime import datetime, timedelta
import json
from typing import List, Dict, Optional
import hashlib

class DigitalIdentityMonitor:
    """Surveillance de l'identité numérique"""

    def __init__(self, organization_name: str, primary_domain: str):
        self.organization_name = organization_name
        self.primary_domain = primary_domain
        self.monitored_domains = []
        self.social_accounts = {}
        self.monitoring_results = {}

    def add_domain_variants(self):
        """Générer les variantes de domaine à surveiller"""
        base_domain = self.primary_domain.split('.')[0]
        tlds = ['.com', '.fr', '.org', '.net', '.eu', '.info']

        # Variantes communes
        variants = [
            base_domain,
            base_domain + 's',
            base_domain.replace('-', ''),
            base_domain.replace('_', ''),
            'www' + base_domain,
            base_domain + 'official'
        ]

        # Typosquatting commun
        typos = []
        for i, char in enumerate(base_domain):
            # Caractères adjacents sur clavier QWERTY
            adjacent_chars = {
                'a': 'qwsz', 'b': 'vghn', 'c': 'xdfv', 'd': 'erfcxs',
                'e': 'wrdsf', 'f': 'rtgvcd', 'g': 'tyhbvf', 'h': 'yujnbg',
                'i': 'ujklo', 'j': 'ikmnhu', 'k': 'olmji', 'l': 'pko',
                'm': 'njk', 'n': 'bhjm', 'o': 'iklp', 'p': 'ol',
                'q': 'wa', 'r': 'etdf', 's': 'awedxz', 't': 'ryfg',
                'u': 'yihj', 'v': 'cfgb', 'w': 'qase', 'x': 'zsdc',
                'y': 'tugh', 'z': 'asx'
            }

            if char in adjacent_chars:
                for adj_char in adjacent_chars[char]:
                    typo = base_domain[:i] + adj_char + base_domain[i+1:]
                    typos.append(typo)

        # Combiner variantes et TLDs
        for variant in variants + typos:
            for tld in tlds:
                self.monitored_domains.append(variant + tld)

    def check_domain_registration(self, domain: str) -> Dict:
        """Vérifier l'enregistrement d'un domaine"""
        result = {
            'domain': domain,
            'registered': False,
            'registrar': None,
            'creation_date': None,
            'expiration_date': None,
            'nameservers': [],
            'suspicious': False
        }

        try:
            # Vérification WHOIS
            domain_info = whois.whois(domain)

            if domain_info.domain_name:
                result['registered'] = True
                result['registrar'] = domain_info.registrar
                result['creation_date'] = domain_info.creation_date
                result['expiration_date'] = domain_info.expiration_date
                result['nameservers'] = domain_info.name_servers or []

                # Détection de domaines suspects
                if domain != self.primary_domain:
                    # Domaine récemment enregistré
                    if isinstance(domain_info.creation_date, datetime):
                        days_old = (datetime.now() - domain_info.creation_date).days
                        if days_old < 30:
                            result['suspicious'] = True

                    # Registrar différent
                    primary_whois = whois.whois(self.primary_domain)
                    if (primary_whois.registrar and 
                        domain_info.registrar != primary_whois.registrar):
                        result['suspicious'] = True

        except Exception as e:
            print(f"Erreur WHOIS pour {domain}: {e}")

        return result

    def check_ssl_certificate(self, domain: str) -> Dict:
        """Vérifier le certificat SSL d'un domaine"""
        result = {
            'domain': domain,
            'has_ssl': False,
            'issuer': None,
            'subject': None,
            'expiration_date': None,
            'san_domains': [],
            'suspicious': False
        }

        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()

                    result['has_ssl'] = True
                    result['issuer'] = dict(x[0] for x in cert['issuer'])
                    result['subject'] = dict(x[0] for x in cert['subject'])

                    # Date d'expiration
                    exp_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
                    result['expiration_date'] = exp_date

                    # SAN (Subject Alternative Names)
                    if 'subjectAltName' in cert:
                        result['san_domains'] = [name[1] for name in cert['subjectAltName']]

                    # Détection de certificats suspects
                    if domain != self.primary_domain:
                        # Certificat auto-signé ou émetteur suspect
                        issuer_cn = result['issuer'].get('commonName', '').lower()
                        if 'let\'s encrypt' in issuer_cn or 'self-signed' in issuer_cn:
                            result['suspicious'] = True

        except Exception as e:
            print(f"Erreur SSL pour {domain}: {e}")

        return result

    def check_website_content(self, domain: str) -> Dict:
        """Analyser le contenu d'un site web"""
        result = {
            'domain': domain,
            'accessible': False,
            'title': None,
            'description': None,
            'keywords': [],
            'organization_mentioned': False,
            'suspicious_content': False,
            'phishing_indicators': []
        }

        try:
            response = requests.get(f"https://{domain}", timeout=10, 
                                  allow_redirects=True, verify=False)

            if response.status_code == 200:
                result['accessible'] = True
                content = response.text.lower()

                # Extraction des métadonnées
                title_match = re.search(r'<title>(.*?)</title>', content, re.IGNORECASE)
                if title_match:
                    result['title'] = title_match.group(1).strip()

                desc_match = re.search(r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']*)["\']', 
                                     content, re.IGNORECASE)
                if desc_match:
                    result['description'] = desc_match.group(1).strip()

                # Vérification de la mention de l'organisation
                org_variations = [
                    self.organization_name.lower(),
                    self.organization_name.lower().replace(' ', ''),
                    self.organization_name.lower().replace('-', ''),
                ]

                for variation in org_variations:
                    if variation in content:
                        result['organization_mentioned'] = True
                        break

                # Détection d'indicateurs de phishing
                phishing_patterns = [
                    r'urgent.*action.*required',
                    r'verify.*account.*immediately',
                    r'suspended.*account',
                    r'click.*here.*now',
                    r'limited.*time.*offer',
                    r'congratulations.*winner'
                ]

                for pattern in phishing_patterns:
                    if re.search(pattern, content, re.IGNORECASE):
                        result['phishing_indicators'].append(pattern)
                        result['suspicious_content'] = True

        except Exception as e:
            print(f"Erreur d'accès web pour {domain}: {e}")

        return result

    def search_social_media_mentions(self, platform: str, query: str) -> List[Dict]:
        """Rechercher des mentions sur les réseaux sociaux"""
        # Note: Nécessite des APIs spécifiques pour chaque plateforme
        # Ceci est un exemple conceptuel

        mentions = []

        # Simulation de recherche (à remplacer par de vraies APIs)
        if platform == "twitter":
            # API Twitter v2
            pass
        elif platform == "facebook":
            # Graph API Facebook
            pass
        elif platform == "linkedin":
            # LinkedIn API
            pass

        return mentions

    def generate_monitoring_report(self) -> Dict:
        """Générer un rapport de surveillance"""
        report = {
            'organization': self.organization_name,
            'primary_domain': self.primary_domain,
            'scan_date': datetime.now().isoformat(),
            'summary': {
                'total_domains_checked': len(self.monitored_domains),
                'suspicious_domains': 0,
                'ssl_issues': 0,
                'phishing_attempts': 0
            },
            'findings': {
                'domain_registrations': [],
                'ssl_certificates': [],
                'website_content': [],
                'social_mentions': []
            },
            'recommendations': []
        }

        # Analyser tous les domaines surveillés
        for domain in self.monitored_domains:
            # Vérification d'enregistrement
            domain_info = self.check_domain_registration(domain)
            if domain_info['registered']:
                report['findings']['domain_registrations'].append(domain_info)

                if domain_info['suspicious']:
                    report['summary']['suspicious_domains'] += 1

                # Vérification SSL
                ssl_info = self.check_ssl_certificate(domain)
                report['findings']['ssl_certificates'].append(ssl_info)

                if ssl_info['suspicious']:
                    report['summary']['ssl_issues'] += 1

                # Analyse du contenu
                content_info = self.check_website_content(domain)
                report['findings']['website_content'].append(content_info)

                if content_info['suspicious_content']:
                    report['summary']['phishing_attempts'] += 1

        # Générer des recommandations
        if report['summary']['suspicious_domains'] > 0:
            report['recommendations'].append(
                "Surveiller de près les domaines suspects identifiés"
            )

        if report['summary']['ssl_issues'] > 0:
            report['recommendations'].append(
                "Vérifier les certificats SSL suspects"
            )

        if report['summary']['phishing_attempts'] > 0:
            report['recommendations'].append(
                "Signaler les tentatives de phishing aux autorités"
            )

        return report

    def run_full_scan(self) -> Dict:
        """Exécuter une analyse complète"""
        print(f"[+] Démarrage de l'analyse pour {self.organization_name}")

        # Générer les variantes de domaine
        self.add_domain_variants()
        print(f"[+] {len(self.monitored_domains)} domaines à surveiller")

        # Générer le rapport
        report = self.generate_monitoring_report()

        print(f"[+] Analyse terminée:")
        print(f"    - Domaines suspects: {report['summary']['suspicious_domains']}")
        print(f"    - Problèmes SSL: {report['summary']['ssl_issues']}")
        print(f"    - Tentatives de phishing: {report['summary']['phishing_attempts']}")

        return report

# Exemple d'utilisation
if __name__ == "__main__":
    # Configuration pour une organisation
    monitor = DigitalIdentityMonitor("MonEntreprise", "monentreprise.com")

    # Ajout de comptes sociaux à surveiller
    monitor.social_accounts = {
        "twitter": "@monentreprise",
        "linkedin": "company/monentreprise",
        "facebook": "monentreprise"
    }

    # Exécution de l'analyse
    report = monitor.run_full_scan()

    # Sauvegarde du rapport
    with open(f"identity_report_{datetime.now().strftime('%Y%m%d')}.json", 'w') as f:
        json.dump(report, f, indent=2, default=str, ensure_ascii=False)

    print("\n[+] Rapport sauvegardé")

🎯 Compétences BTS SIO - Bloc B3

Correspondance avec le référentiel U7 :
  • B3.1-B3.2 : Protéger les données à caractère personnel (RGPD)
  • B3.3-B3.4 : Préserver l'identité numérique de l'organisation
  • B3.5 : Sécuriser les équipements et les usages des utilisateurs
  • B3.6 : Garantir la disponibilité, l'intégrité et la confidentialité face aux cyberattaques

📚 Ressources et références

Documentation officielle

  • CNIL : Guide RGPD du développeur
  • ANSSI : Guide d'hygiène informatique
  • ISO : Norme ISO 27001:2022
  • NIST : Cybersecurity Framework

Outils pratiques

  • RGPD : PIA CNIL, registre des traitements
  • Veille : CERT-FR, CVE Details, NVD
  • Monitoring : Google Alerts, Mention, Brand24
  • Conformité : GRC tools, audit frameworks

💡 Points clés à retenir
  • La conformité RGPD est une obligation légale avec des sanctions importantes
  • Les normes de sécurité fournissent un cadre structuré pour la protection
  • La veille sécurité doit être automatisée et continue
  • L'identité numérique nécessite une surveillance proactive
  • La gestion de crise doit être préparée et testée régulièrement