Implementación Completa: Integración Bidireccional Automática WSTG Tracker ↔ DefectDojo¶
✅ Confirmación: Automatización Total¶
SÍ, la integración bidireccional es totalmente automática con los componentes implementados en este documento.
Componentes de Automatización¶
Sincronización Tracker → DefectDojo: Automática en tiempo real
Sincronización DefectDojo → Tracker: Automática vía webhooks + polling
Servicio de Sincronización: Background service con resolución automática de conflictos
Reintentos Automáticos: Cola de sincronizaciones fallidas con reintentos
Logging y Monitoreo: Registro completo de todas las operaciones
Índice¶
1. Arquitectura Completa¶
┌─────────────────────────────────────────────────────────────────────┐
│ WSTG Tracker (Frontend) │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Checklist UI + Auto-Sync │ │
│ │ - Estados: Not Started, In Progress, Blocked, Done, N/A │ │
│ │ - localStorage (persistencia local) │ │
│ │ - Auto-sync al cambiar estado (JavaScript) │ │
│ │ - Cola de reintentos automáticos │ │
│ └──────────────┬───────────────────────────────────────────────┘ │
│ │ HTTP POST /api/wstg/sync (automático) │
└─────────────────┼───────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ API Flask (Aplicación Médica) │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ /api/wstg/sync (POST) │ │
│ │ - Recibe actualizaciones del tracker │ │
│ │ - Valida y procesa cambios │ │
│ │ - Actualiza DefectDojo automáticamente │ │
│ │ - Registra en log │ │
│ └──────────────┬───────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────▼───────────────────────────────────────────────┐ │
│ │ /api/wstg/webhook (POST) │ │
│ │ - Recibe webhooks de DefectDojo │ │
│ │ - Procesa cambios en findings │ │
│ │ - Almacena para sincronización con tracker │ │
│ └──────────────┬───────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────▼───────────────────────────────────────────────┐ │
│ │ Servicio de Sincronización (Background) │ │
│ │ - Polling periódico cada 5 minutos │ │
│ │ - Detecta cambios automáticamente │ │
│ │ - Resuelve conflictos automáticamente │ │
│ │ - Reintenta fallos automáticamente │ │
│ │ - Logging completo │ │
│ └──────────────┬───────────────────────────────────────────────┘ │
└─────────────────┼───────────────────────────────────────────────────┘
│
│ API DefectDojo + Webhooks
▼
┌─────────────────────────────────────────────────────────────────────┐
│ DefectDojo │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Findings + Webhooks │ │
│ │ - Test Type: "WSTG Security Testing" │ │
│ │ - Tags: ["WSTG", "INFO-01", ...] │ │
│ │ - Webhook configurado → /api/wstg/webhook │ │
│ └──────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
2. Guía de Desarrollo Paso a Paso¶
2.1 Paso 1: Crear Módulo de Sincronización Base¶
Archivo: app/wstg_sync.py (NUEVO)
Este es el módulo principal que contiene toda la lógica de sincronización.
#!/usr/bin/env python3
"""
Módulo de sincronización bidireccional WSTG ↔ DefectDojo
Implementación completa con logging, manejo de errores y resolución de conflictos
"""
import os
import sys
import django
import logging
import json
from datetime import datetime
from typing import Dict, Optional, Tuple
from pathlib import Path
# Configurar logging
logger = logging.getLogger('wstg_sync')
logger.setLevel(logging.INFO)
# Configurar Django para acceso a DefectDojo
sys.path.insert(0, '/app')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dojo.settings.settings')
django.setup()
from dojo.models import Finding, Test, Engagement, Product, Product_Type, Test_Type, Tag
from django.contrib.auth.models import User
from django.utils import timezone
# Mapeo de estados WSTG → DefectDojo
WSTG_TO_DD_STATUS = {
'Not Started': {'active': True, 'verified': False, 'false_p': False},
'In Progress': {'active': True, 'verified': False, 'false_p': False},
'Blocked': {'active': True, 'verified': False, 'false_p': False},
'Done': {'active': False, 'verified': True, 'false_p': False},
'Not Applicable': {'active': False, 'verified': False, 'false_p': True}
}
# Mapeo inverso DefectDojo → WSTG
DD_TO_WSTG_STATUS = {
(True, False, False): 'In Progress', # active=True, verified=False, false_p=False
(False, True, False): 'Done', # active=False, verified=True, false_p=False
(False, False, True): 'Not Applicable', # active=False, verified=False, false_p=True
(False, False, False): 'Not Started' # active=False, verified=False, false_p=False
}
# Archivo para almacenar estado de sincronización
SYNC_STATE_FILE = Path('/app/data/wstg_sync_state.json')
SYNC_LOG_FILE = Path('/app/data/wstg_sync.log')
def init_logging():
"""Inicializar logging a archivo"""
if not SYNC_LOG_FILE.parent.exists():
SYNC_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(SYNC_LOG_FILE)
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def load_sync_state() -> Dict:
"""Cargar estado de sincronización desde archivo JSON"""
if SYNC_STATE_FILE.exists():
try:
with open(SYNC_STATE_FILE, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error cargando estado de sincronización: {e}")
return {}
def save_sync_state(state: Dict):
"""Guardar estado de sincronización a archivo JSON"""
try:
if not SYNC_STATE_FILE.parent.exists():
SYNC_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(SYNC_STATE_FILE, 'w') as f:
json.dump(state, f, indent=2, default=str)
except Exception as e:
logger.error(f"Error guardando estado de sincronización: {e}")
def log_sync(wstg_id: str, direction: str, old_status: Optional[str],
new_status: str, success: bool, error_message: str = ''):
"""Registrar sincronización en log"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'wstg_id': wstg_id,
'direction': direction,
'old_status': old_status,
'new_status': new_status,
'success': success,
'error_message': error_message
}
logger.info(f"Sync: {json.dumps(log_entry)}")
# También guardar en estado
state = load_sync_state()
if 'sync_log' not in state:
state['sync_log'] = []
state['sync_log'].append(log_entry)
# Mantener solo últimos 1000 registros
if len(state['sync_log']) > 1000:
state['sync_log'] = state['sync_log'][-1000:]
save_sync_state(state)
def get_wstg_test_and_engagement():
"""Obtener o crear Test y Engagement para WSTG"""
try:
admin_user = User.objects.get(username='admin')
except User.DoesNotExist:
logger.error("Usuario admin no encontrado")
raise
product_type, _ = Product_Type.objects.get_or_create(
name='Medical Register',
defaults={'description': 'Aplicación de registro médico'}
)
product, _ = Product.objects.get_or_create(
name='Medical Register App',
defaults={
'description': 'Aplicación web para registro de peso e IMC',
'prod_type': product_type
}
)
engagement, _ = Engagement.objects.get_or_create(
name='WSTG Security Testing',
product=product,
defaults={
'target_start': timezone.now().date(),
'target_end': timezone.now().date(),
'status': 'In Progress',
'lead': admin_user
}
)
test_type, _ = Test_Type.objects.get_or_create(name='WSTG Security Testing')
test, _ = Test.objects.get_or_create(
engagement=engagement,
test_type=test_type,
defaults={
'target_start': timezone.now().date(),
'target_end': timezone.now().date(),
'lead': admin_user
}
)
return test, engagement
def get_wstg_info(wstg_id: str) -> Dict:
"""Obtener información de un item WSTG desde diccionario"""
# Diccionario básico de WSTG (puede expandirse)
wstg_dictionary = {
'WSTG-INFO-01': {
'title': 'Conduct OSINT reconnaissance',
'description': 'Gather information about the target through publicly available sources',
'severity': 'Info'
},
'WSTG-INFO-02': {
'title': 'Fingerprint Web Server',
'description': 'Identify the web server and version',
'severity': 'Info'
},
# Añadir más items según necesidad
}
return wstg_dictionary.get(wstg_id, {
'title': f'{wstg_id} Test',
'description': f'Security test for {wstg_id}',
'severity': 'Medium'
})
def find_finding_by_wstg_id(wstg_id: str, test: Test) -> Optional[Finding]:
"""Buscar finding por WSTG ID en los tags o título"""
# Buscar por tag exacto
findings = Finding.objects.filter(
test=test,
tags__name=wstg_id
).distinct()
if findings.exists():
return findings.first()
# Buscar por tag WSTG y verificar título
findings = Finding.objects.filter(
test=test,
tags__name='WSTG'
).distinct()
for finding in findings:
if wstg_id in finding.title:
return finding
return None
def create_finding_from_wstg(wstg_id: str, status: str, notes: str, test: Test) -> Finding:
"""Crear nuevo finding desde WSTG"""
wstg_info = get_wstg_info(wstg_id)
status_map = WSTG_TO_DD_STATUS.get(status, WSTG_TO_DD_STATUS['Not Started'])
# Mapear severidad
severity_map = {
'Info': 'Info',
'Low': 'Low',
'Medium': 'Medium',
'High': 'High',
'Critical': 'Critical'
}
severity = severity_map.get(wstg_info.get('severity', 'Medium'), 'Medium')
finding = Finding.objects.create(
title=f"{wstg_id}: {wstg_info.get('title', 'WSTG Test')}",
description=wstg_info.get('description', ''),
test=test,
severity=severity,
active=status_map.get('active', True),
verified=status_map.get('verified', False),
false_p=status_map.get('false_p', False),
mitigation=notes or '',
reporter=test.lead
)
# Añadir tags
tag_wstg, _ = Tag.objects.get_or_create(name='WSTG')
tag_id, _ = Tag.objects.get_or_create(name=wstg_id)
finding.tags.add(tag_wstg, tag_id)
logger.info(f"Finding creado: {finding.id} para {wstg_id}")
return finding
def extract_wstg_id(finding: Finding) -> Optional[str]:
"""Extraer WSTG ID de un finding"""
# Buscar en tags
for tag in finding.tags.all():
if tag.name.startswith('WSTG-'):
return tag.name
# Buscar en título
if 'WSTG-' in finding.title:
import re
match = re.search(r'WSTG-\w+-\d+', finding.title)
if match:
return match.group(0)
return None
def determine_wstg_status(finding: Finding) -> str:
"""Determinar estado WSTG basado en estado del finding"""
key = (finding.active, finding.verified, finding.false_p)
return DD_TO_WSTG_STATUS.get(key, 'Not Started')
def sync_from_tracker(data: Dict) -> Dict:
"""
Sincronizar desde WSTG Tracker hacia DefectDojo
"""
wstg_id = data.get('wstg_id')
status = data.get('status')
notes = data.get('notes', '')
timestamp = data.get('timestamp', datetime.now().isoformat())
if not wstg_id or not status:
error_msg = "wstg_id and status are required"
logger.error(error_msg)
return {"success": False, "error": error_msg}
try:
test, _ = get_wstg_test_and_engagement()
# Buscar finding existente
finding = find_finding_by_wstg_id(wstg_id, test)
old_status = determine_wstg_status(finding) if finding else None
if not finding:
# Crear nuevo finding
finding = create_finding_from_wstg(wstg_id, status, notes, test)
action = "created"
else:
# Actualizar finding existente
status_map = WSTG_TO_DD_STATUS.get(status, WSTG_TO_DD_STATUS['Not Started'])
finding.active = status_map.get('active', True)
finding.verified = status_map.get('verified', False)
finding.false_p = status_map.get('false_p', False)
if notes:
finding.mitigation = notes
finding.save()
action = "updated"
# Actualizar estado de sincronización
state = load_sync_state()
if 'items' not in state:
state['items'] = {}
state['items'][wstg_id] = {
'finding_id': finding.id,
'wstg_status': status,
'defectdojo_status': 'verified' if finding.verified else 'active',
'last_sync_timestamp': timestamp,
'last_sync_direction': 'tracker->dd'
}
save_sync_state(state)
# Registrar en log
log_sync(wstg_id, 'tracker->dd', old_status, status, True)
return {
"success": True,
"finding_id": finding.id,
"action": action,
"defectdojo_status": "verified" if finding.verified else "active",
"message": f"Finding {action} correctamente"
}
except Exception as e:
error_msg = str(e)
logger.error(f"Error en sync_from_tracker: {error_msg}", exc_info=True)
log_sync(wstg_id, 'tracker->dd', None, status, False, error_msg)
return {"success": False, "error": error_msg}
def sync_from_defectdojo(data: Dict) -> Dict:
"""
Sincronizar desde DefectDojo hacia WSTG Tracker
"""
finding_data = data.get('finding', {})
finding_id = finding_data.get('id')
event = data.get('event', 'finding_updated')
if not finding_id:
error_msg = "finding id is required"
logger.error(error_msg)
return {"success": False, "error": error_msg}
try:
finding = Finding.objects.get(id=finding_id)
# Extraer WSTG ID
wstg_id = extract_wstg_id(finding)
if not wstg_id:
return {"success": False, "error": "WSTG ID not found in finding"}
# Determinar estado WSTG
wstg_status = determine_wstg_status(finding)
# Actualizar estado de sincronización
state = load_sync_state()
if 'items' not in state:
state['items'] = {}
old_wstg_status = state['items'].get(wstg_id, {}).get('wstg_status')
state['items'][wstg_id] = {
'finding_id': finding.id,
'wstg_status': wstg_status,
'defectdojo_status': 'verified' if finding.verified else 'active',
'last_sync_timestamp': datetime.now().isoformat(),
'last_sync_direction': 'dd->tracker',
'event': event
}
save_sync_state(state)
# Registrar en log
log_sync(wstg_id, 'dd->tracker', old_wstg_status, wstg_status, True)
return {
"success": True,
"wstg_id": wstg_id,
"wstg_status": wstg_status,
"message": "Estado sincronizado con tracker"
}
except Finding.DoesNotExist:
error_msg = f"Finding {finding_id} not found"
logger.error(error_msg)
return {"success": False, "error": error_msg}
except Exception as e:
error_msg = str(e)
logger.error(f"Error en sync_from_defectdojo: {error_msg}", exc_info=True)
return {"success": False, "error": error_msg}
def get_sync_status() -> Dict:
"""Obtener estado de sincronización"""
state = load_sync_state()
items = state.get('items', {})
total_items = len(items)
synced_items = sum(1 for item in items.values()
if item.get('last_sync_timestamp'))
# Contar conflictos (items con diferentes estados)
conflicts = 0
for wstg_id, item_data in items.items():
wstg_status = item_data.get('wstg_status')
dd_status = item_data.get('defectdojo_status')
# Verificar si hay inconsistencia
# (lógica simplificada, puede mejorarse)
if wstg_status and dd_status:
if (wstg_status == 'Done' and dd_status != 'verified') or \
(wstg_status == 'In Progress' and dd_status != 'active'):
conflicts += 1
last_sync = None
if state.get('sync_log'):
last_sync = state['sync_log'][-1].get('timestamp')
return {
"last_sync": last_sync or datetime.now().isoformat(),
"total_items": total_items,
"synced_items": synced_items,
"pending_items": total_items - synced_items,
"conflicts": conflicts
}
# Inicializar logging al importar
init_logging()
2.2 Paso 2: Añadir Endpoints a Flask¶
Archivo: app/routes.py (MODIFICAR - añadir al final)
# Añadir al final del archivo routes.py
@api.route('/wstg/sync', methods=['POST'])
def wstg_sync():
"""
Sincronizar estado desde WSTG Tracker hacia DefectDojo
Endpoint automático llamado por el tracker cuando cambia un estado
"""
from .wstg_sync import sync_from_tracker
data = request.json
if not data:
return jsonify({"error": "No data provided"}), 400
try:
result = sync_from_tracker(data)
if result.get('success'):
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"Error en sincronización WSTG: {e}", exc_info=True)
return jsonify({"error": str(e)}), 500
@api.route('/wstg/webhook', methods=['POST'])
def wstg_webhook():
"""
Recibir webhook de DefectDojo y sincronizar con tracker
Endpoint automático llamado por DefectDojo cuando se actualiza un finding
"""
from .wstg_sync import sync_from_defectdojo
from .config import WSTG_WEBHOOK_KEY
data = request.json
if not data:
return jsonify({"error": "No data provided"}), 400
# Validar autenticación del webhook (API key)
api_key = request.headers.get('X-API-Key') or request.headers.get('Authorization', '').replace('Bearer ', '')
expected_key = os.environ.get('WSTG_WEBHOOK_KEY', WSTG_WEBHOOK_KEY)
if expected_key and expected_key != 'change_me_in_production':
if not api_key or api_key != expected_key:
current_app.logger.warning(f"Intento de webhook no autorizado: {api_key}")
return jsonify({"error": "Unauthorized"}), 401
try:
result = sync_from_defectdojo(data)
if result.get('success'):
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"Error en webhook WSTG: {e}", exc_info=True)
return jsonify({"error": str(e)}), 500
@api.route('/wstg/status', methods=['GET'])
def wstg_status():
"""
Obtener estado de sincronización WSTG
Útil para monitoreo y dashboard
"""
from .wstg_sync import get_sync_status
try:
status = get_sync_status()
return jsonify(status), 200
except Exception as e:
current_app.logger.error(f"Error obteniendo estado WSTG: {e}", exc_info=True)
return jsonify({"error": str(e)}), 500
2.3 Paso 3: Añadir Configuración¶
Archivo: app/config.py (MODIFICAR)
# Añadir al final del archivo
# Configuración WSTG Sync
WSTG_WEBHOOK_KEY = os.environ.get('WSTG_WEBHOOK_KEY', 'change_me_in_production')
WSTG_SYNC_API_URL = os.environ.get('WSTG_SYNC_API_URL', 'http://localhost:5001/api/wstg/sync')
2.4 Paso 4: Crear Servicio de Sincronización en Background¶
Archivo: scripts/wstg_sync_service.py (NUEVO)
#!/usr/bin/env python3
"""
Servicio de sincronización bidireccional WSTG ↔ DefectDojo
Ejecuta polling periódico, detecta cambios y resuelve conflictos automáticamente
"""
import os
import sys
import django
import time
import logging
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, '/app')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dojo.settings.settings')
django.setup()
from dojo.models import Finding, Test, Test_Type
from app.wstg_sync import (
get_wstg_test_and_engagement,
extract_wstg_id,
determine_wstg_status,
load_sync_state,
save_sync_state,
log_sync
)
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/app/data/wstg_sync_service.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('wstg_sync_service')
def sync_all_wstg_findings():
"""Sincronizar todos los findings WSTG desde DefectDojo"""
try:
test, _ = get_wstg_test_and_engagement()
# Obtener todos los findings con tag WSTG
findings = Finding.objects.filter(
test=test,
tags__name='WSTG'
).distinct()
synced = 0
errors = 0
updated = 0
state = load_sync_state()
if 'items' not in state:
state['items'] = {}
for finding in findings:
try:
wstg_id = extract_wstg_id(finding)
if not wstg_id:
continue
wstg_status = determine_wstg_status(finding)
# Verificar si hay cambios
item_state = state['items'].get(wstg_id, {})
old_wstg_status = item_state.get('wstg_status')
last_sync = item_state.get('last_sync_timestamp')
# Actualizar estado
state['items'][wstg_id] = {
'finding_id': finding.id,
'wstg_status': wstg_status,
'defectdojo_status': 'verified' if finding.verified else 'active',
'last_sync_timestamp': datetime.now().isoformat(),
'last_sync_direction': 'dd->tracker'
}
if old_wstg_status != wstg_status:
updated += 1
logger.info(f"✓ Actualizado {wstg_id}: {old_wstg_status} → {wstg_status}")
log_sync(wstg_id, 'dd->tracker', old_wstg_status, wstg_status, True)
else:
logger.debug(f" Sin cambios {wstg_id}: {wstg_status}")
synced += 1
except Exception as e:
errors += 1
logger.error(f"✗ Error sincronizando finding {finding.id}: {e}", exc_info=True)
save_sync_state(state)
logger.info(f"Sincronización completada: {synced} items, {updated} actualizados, {errors} errores")
return synced, updated, errors
except Exception as e:
logger.error(f"Error en sync_all_wstg_findings: {e}", exc_info=True)
return 0, 0, 1
def resolve_conflicts():
"""Resolver conflictos entre tracker y DefectDojo"""
state = load_sync_state()
items = state.get('items', {})
conflicts_resolved = 0
for wstg_id, item_data in items.items():
wstg_status = item_data.get('wstg_status')
dd_status = item_data.get('defectdojo_status')
last_direction = item_data.get('last_sync_direction')
# Detectar conflictos
is_conflict = False
if wstg_status == 'Done' and dd_status != 'verified':
is_conflict = True
elif wstg_status == 'In Progress' and dd_status != 'active':
is_conflict = True
if is_conflict:
# Estrategia: DefectDojo tiene prioridad
logger.info(f"Resolviendo conflicto para {wstg_id}: DefectDojo tiene prioridad")
# Aquí se podría actualizar el tracker si tuviera API
conflicts_resolved += 1
if conflicts_resolved > 0:
logger.info(f"Conflictos resueltos: {conflicts_resolved}")
return conflicts_resolved
def run_sync_service(interval_minutes=5):
"""Ejecutar servicio de sincronización en loop"""
logger.info(f"🔄 Iniciando servicio de sincronización WSTG (intervalo: {interval_minutes} min)")
while True:
try:
logger.info(f"[{datetime.now()}] Iniciando sincronización...")
synced, updated, errors = sync_all_wstg_findings()
# Resolver conflictos
conflicts = resolve_conflicts()
logger.info(f"[{datetime.now()}] Sincronización completada: {synced} items, {updated} actualizados, {errors} errores, {conflicts} conflictos")
except KeyboardInterrupt:
logger.info("Servicio detenido por usuario")
break
except Exception as e:
logger.error(f"❌ Error en servicio de sincronización: {e}", exc_info=True)
time.sleep(interval_minutes * 60)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Servicio de sincronización WSTG')
parser.add_argument('--interval', type=int, default=5, help='Intervalo en minutos (default: 5)')
parser.add_argument('--once', action='store_true', help='Ejecutar una vez y salir')
args = parser.parse_args()
if args.once:
sync_all_wstg_findings()
resolve_conflicts()
else:
run_sync_service(args.interval)
2.5 Paso 5: Configurar Docker Compose¶
Archivo: docker-compose.yml (MODIFICAR - añadir servicio)
# Servicio de sincronización WSTG (Background)
wstg-sync:
profiles:
- defectdojo
image: defectdojo/defectdojo-django:latest
container_name: wstg-sync-service
restart: unless-stopped
depends_on:
defectdojo:
condition: service_healthy
defectdojo-db:
condition: service_healthy
volumes:
- ./scripts/wstg_sync_service.py:/app/wstg_sync_service.py:ro
- ./app/wstg_sync.py:/app/wstg_sync.py:ro
- ./data:/app/data
environment:
DD_DATABASE_URL: postgresql://defectdojo:defectdojo_password@defectdojo-db:5432/defectdojo
DD_SECRET_KEY: defectdojo_secret_key_change_in_production
DD_DEBUG: "True"
WSTG_SYNC_INTERVAL: ${WSTG_SYNC_INTERVAL:-5}
command: python /app/wstg_sync_service.py --interval ${WSTG_SYNC_INTERVAL:-5}
networks:
- defectdojo-network
healthcheck:
test: ["CMD", "python", "-c", "import sys; sys.exit(0)"]
interval: 60s
timeout: 10s
retries: 3
2.6 Paso 6: Actualizar Makefile¶
Archivo: Makefile (MODIFICAR - añadir targets)
# Sincronización WSTG
sync-wstg: setup-env ## Sincronizar todos los findings WSTG (una vez)
@echo "🔄 Sincronizando findings WSTG..."
@$(COMPOSE) --profile defectdojo exec wstg-sync python /app/wstg_sync_service.py --once || \
$(COMPOSE) --profile defectdojo run --rm wstg-sync python /app/wstg_sync_service.py --once
@echo "✅ Sincronización completada"
wstg-status: setup-env ## Obtener estado de sincronización WSTG
@echo "📊 Estado de sincronización WSTG:"
@curl -s http://localhost:5001/api/wstg/status | python3 -m json.tool || echo "Error obteniendo estado"
wstg-logs: setup-env ## Ver logs del servicio de sincronización WSTG
@echo "📋 Logs del servicio WSTG:"
@$(COMPOSE) --profile defectdojo logs --tail=50 wstg-sync
3. Implementación Completa de Código¶
3.1 Código JavaScript para WSTG Tracker (Automatización Completa)¶
Archivo: wstg-tracker-integration.js (Para añadir al tracker)
Este código debe integrarse en el WSTG Tracker para automatización completa:
/**
* Integración automática WSTG Tracker ↔ DefectDojo
* Añadir este código al WSTG Tracker para sincronización automática
*/
class WSTGSyncManager {
constructor(config = {}) {
this.apiUrl = config.apiUrl || 'http://localhost:5001/api/wstg/sync';
this.syncQueue = [];
this.syncing = false;
this.retryDelay = 5000; // 5 segundos
this.maxRetries = 3;
this.enabled = config.enabled !== false;
// Inicializar
this.init();
}
init() {
if (!this.enabled) {
console.log('WSTG Sync deshabilitado');
return;
}
// Interceptar cambios de estado en el tracker
this.setupStateChangeListener();
// Procesar cola de sincronizaciones pendientes
this.processQueue();
// Reintentar sincronizaciones fallidas periódicamente
setInterval(() => this.retryFailedSyncs(), 30000); // Cada 30 segundos
console.log('WSTG Sync Manager inicializado');
}
setupStateChangeListener() {
// Esta función debe adaptarse según la implementación del tracker
// Ejemplo genérico:
// Si el tracker usa eventos personalizados
if (typeof window !== 'undefined') {
window.addEventListener('wstg-status-changed', (event) => {
const { wstgId, oldStatus, newStatus, notes } = event.detail;
this.syncOnStatusChange(wstgId, oldStatus, newStatus, notes);
});
}
// Si el tracker usa localStorage y se puede observar
this.observeLocalStorage();
}
observeLocalStorage() {
// Observar cambios en localStorage (si el tracker lo usa)
const originalSetItem = localStorage.setItem;
const self = this;
localStorage.setItem = function(key, value) {
originalSetItem.apply(this, arguments);
// Si es un cambio de estado WSTG
if (key.startsWith('wstg_') || key.includes('wstg')) {
try {
const data = JSON.parse(value);
if (data.status && data.wstgId) {
self.syncItem(data.wstgId, data.status, data.notes || '');
}
} catch (e) {
// No es JSON, ignorar
}
}
};
}
async syncItem(wstgId, status, notes = '') {
if (!this.enabled) return;
const payload = {
wstg_id: wstgId,
status: status,
notes: notes,
timestamp: new Date().toISOString(),
user: this.getCurrentUser()
};
try {
const response = await fetch(this.apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(payload),
// Timeout de 10 segundos
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`Sync failed: ${response.status} ${errorText}`);
}
const result = await response.json();
if (result.success) {
console.log(`✓ Sincronizado ${wstgId}:`, result);
this.showNotification(`Sincronizado: ${wstgId}`, 'success');
return result;
} else {
throw new Error(result.error || 'Sync failed');
}
} catch (error) {
console.error(`✗ Error sincronizando ${wstgId}:`, error);
// Añadir a cola para reintento
this.addToQueue(wstgId, status, notes, error);
this.showNotification(`Error sincronizando ${wstgId}`, 'error');
throw error;
}
}
async syncOnStatusChange(wstgId, oldStatus, newStatus, notes = '') {
if (!this.enabled) return;
// Solo sincronizar si el estado cambió
if (oldStatus !== newStatus) {
console.log(`Estado cambiado: ${wstgId} ${oldStatus} → ${newStatus}`);
await this.syncItem(wstgId, newStatus, notes);
}
}
addToQueue(wstgId, status, notes, error, retries = 0) {
this.syncQueue.push({
wstgId,
status,
notes,
error: error.message,
retries,
timestamp: new Date().toISOString()
});
// Guardar cola en localStorage para persistencia
this.saveQueue();
}
async processQueue() {
if (this.syncing || this.syncQueue.length === 0) {
return;
}
this.syncing = true;
while (this.syncQueue.length > 0) {
const item = this.syncQueue[0];
try {
await this.syncItem(item.wstgId, item.status, item.notes);
// Éxito, remover de la cola
this.syncQueue.shift();
} catch (error) {
// Incrementar reintentos
item.retries++;
if (item.retries >= this.maxRetries) {
// Máximo de reintentos alcanzado, remover
console.error(`Máximo de reintentos alcanzado para ${item.wstgId}`);
this.syncQueue.shift();
} else {
// Reintentar más tarde
break;
}
}
// Pequeña pausa entre sincronizaciones
await new Promise(resolve => setTimeout(resolve, 1000));
}
this.saveQueue();
this.syncing = false;
}
async retryFailedSyncs() {
if (this.syncQueue.length > 0) {
console.log(`Reintentando ${this.syncQueue.length} sincronizaciones fallidas...`);
await this.processQueue();
}
}
saveQueue() {
try {
localStorage.setItem('wstg_sync_queue', JSON.stringify(this.syncQueue));
} catch (e) {
console.error('Error guardando cola de sincronización:', e);
}
}
loadQueue() {
try {
const saved = localStorage.getItem('wstg_sync_queue');
if (saved) {
this.syncQueue = JSON.parse(saved);
}
} catch (e) {
console.error('Error cargando cola de sincronización:', e);
}
}
getCurrentUser() {
// Obtener usuario actual del tracker
// Adaptar según implementación del tracker
return localStorage.getItem('wstg_user') || 'admin';
}
showNotification(message, type = 'info') {
// Mostrar notificación al usuario
// Adaptar según implementación del tracker
console.log(`[${type.toUpperCase()}] ${message}`);
// Si hay un sistema de notificaciones en el tracker
if (typeof window !== 'undefined' && window.showNotification) {
window.showNotification(message, type);
}
}
// Método para sincronizar manualmente todos los items
async syncAll() {
console.log('Sincronizando todos los items...');
// Obtener todos los items del tracker
const items = this.getAllTrackerItems();
for (const item of items) {
try {
await this.syncItem(item.wstgId, item.status, item.notes);
await new Promise(resolve => setTimeout(resolve, 500)); // Pausa entre items
} catch (error) {
console.error(`Error sincronizando ${item.wstgId}:`, error);
}
}
console.log('Sincronización completa finalizada');
}
getAllTrackerItems() {
// Obtener todos los items del tracker
// Adaptar según implementación del tracker
// Ejemplo genérico:
const items = [];
try {
const trackerData = localStorage.getItem('wstg_checklist');
if (trackerData) {
const data = JSON.parse(trackerData);
// Procesar data según estructura del tracker
// ...
}
} catch (e) {
console.error('Error obteniendo items del tracker:', e);
}
return items;
}
}
// Inicializar automáticamente cuando se carga la página
if (typeof window !== 'undefined') {
window.wstgSyncManager = new WSTGSyncManager({
apiUrl: 'http://localhost:5001/api/wstg/sync',
enabled: true
});
// Cargar cola guardada
window.wstgSyncManager.loadQueue();
// Sincronizar todos los items al cargar (opcional)
// window.wstgSyncManager.syncAll();
}
// Exportar para uso en módulos
if (typeof module !== 'undefined' && module.exports) {
module.exports = WSTGSyncManager;
}
3.2 Integración en el Tracker¶
Para integrar en el tracker, añadir este código:
Si el tracker es una aplicación web estática:
Añadir el script antes de
</body>Modificar los handlers de cambio de estado para llamar a
wstgSyncManager.syncOnStatusChange()
Si el tracker usa un framework (React/Vue/etc.):
Importar
WSTGSyncManagercomo móduloUsar en los componentes que manejan cambios de estado
Ejemplo de integración en handler de cambio de estado:
// En el código del tracker, cuando se cambia el estado:
function onStatusChange(wstgId, oldStatus, newStatus, notes) {
// Código existente del tracker
updateTrackerState(wstgId, newStatus);
// Añadir sincronización automática
if (window.wstgSyncManager) {
window.wstgSyncManager.syncOnStatusChange(wstgId, oldStatus, newStatus, notes);
}
}
4. Automatización del WSTG Tracker¶
4.1 Opción A: Modificar el Tracker Original¶
Si tienes acceso al código fuente del tracker:
Clonar el repositorio:
https://github.com/adanalvarez/owasp-wstg-trackerAñadir el código de
WSTGSyncManageral proyectoIntegrar en los componentes de cambio de estado
Recompilar y desplegar
4.3 Opción C: Proxy/Interceptor¶
Crear un proxy que intercepte las llamadas del tracker y añada sincronización automática.
5. Configuración de Webhooks¶
5.1 Configurar Webhook en DefectDojo¶
Paso a paso:
Acceder a DefectDojo:
http://localhost:8080Login como admin
Ir a Configuration → Tool Configurations
Crear nueva configuración:
Name:
WSTG Sync WebhookTool Type:
WebhookURL:
http://flask_medical_register:5001/api/wstg/webhookAuthentication Type:
API KeyAPI Key: (generar y copiar)
Events: Seleccionar:
finding_createdfinding_updatedfinding_verifiedfinding_mitigated
Guardar configuración
5.2 Configurar API Key en Flask¶
Archivo: .env o docker-compose.yml
WSTG_WEBHOOK_KEY=tu_api_key_generada_aqui
Archivo: docker-compose.yml (añadir a servicio web)
web:
# ... configuración existente ...
environment:
# ... otras variables ...
WSTG_WEBHOOK_KEY: ${WSTG_WEBHOOK_KEY:-change_me_in_production}
5.3 Verificar Webhook¶
# Probar webhook manualmente
curl -X POST http://localhost:5001/api/wstg/webhook \
-H "Content-Type: application/json" \
-H "X-API-Key: tu_api_key" \
-d '{
"event": "finding_updated",
"finding": {
"id": 123,
"title": "WSTG-INFO-01: Test",
"active": false,
"verified": true,
"tags": ["WSTG", "INFO-01"]
}
}'
6. Servicios en Background¶
6.1 Iniciar Servicio de Sincronización¶
# Con docker-compose (automático al levantar DefectDojo)
make up # O make initDefectDojo
# Verificar que el servicio está corriendo
docker ps | grep wstg-sync
# Ver logs
docker-compose logs -f wstg-sync
6.2 Sincronización Manual¶
# Sincronizar una vez
make sync-wstg
# O directamente
docker-compose exec wstg-sync python /app/wstg_sync_service.py --once
6.3 Configurar Intervalo¶
Archivo: .env
WSTG_SYNC_INTERVAL=5 # minutos
7. Testing y Validación¶
7.1 Tests Automatizados¶
Archivo: tests/backend/blackbox/test_wstg_sync.py (NUEVO)
import pytest
import json
from app.wstg_sync import sync_from_tracker, sync_from_defectdojo, get_sync_status
def test_sync_from_tracker_success(app):
"""Test sincronización exitosa desde tracker"""
data = {
'wstg_id': 'WSTG-INFO-01',
'status': 'Done',
'notes': 'Test de sincronización',
'timestamp': '2025-12-03T17:30:00Z'
}
result = sync_from_tracker(data)
assert result['success'] == True
assert 'finding_id' in result
assert result['action'] in ['created', 'updated']
def test_sync_from_tracker_missing_fields(app):
"""Test sincronización con campos faltantes"""
data = {'wstg_id': 'WSTG-INFO-01'} # Falta status
result = sync_from_tracker(data)
assert result['success'] == False
assert 'error' in result
def test_sync_from_defectdojo_success(app):
"""Test sincronización exitosa desde DefectDojo"""
# Primero crear un finding
from dojo.models import Finding, Test, Test_Type
test = Test.objects.filter(test_type__name='WSTG Security Testing').first()
if not test:
pytest.skip("Test WSTG no existe")
finding = Finding.objects.filter(
test=test,
tags__name='WSTG'
).first()
if not finding:
pytest.skip("No hay findings WSTG para testear")
data = {
'event': 'finding_updated',
'finding': {
'id': finding.id,
'title': finding.title,
'active': finding.active,
'verified': finding.verified,
'tags': [tag.name for tag in finding.tags.all()]
}
}
result = sync_from_defectdojo(data)
assert result['success'] == True
assert 'wstg_id' in result
assert 'wstg_status' in result
def test_get_sync_status(app):
"""Test obtención de estado de sincronización"""
status = get_sync_status()
assert 'last_sync' in status
assert 'total_items' in status
assert 'synced_items' in status
assert 'pending_items' in status
assert 'conflicts' in status
def test_api_endpoint_sync(client):
"""Test endpoint /api/wstg/sync"""
response = client.post('/api/wstg/sync', json={
'wstg_id': 'WSTG-INFO-01',
'status': 'Done',
'notes': 'Test'
})
assert response.status_code == 200
data = response.get_json()
assert data['success'] == True
def test_api_endpoint_status(client):
"""Test endpoint /api/wstg/status"""
response = client.get('/api/wstg/status')
assert response.status_code == 200
data = response.get_json()
assert 'last_sync' in data
7.2 Pruebas Manuales¶
# 1. Probar sincronización desde tracker
curl -X POST http://localhost:5001/api/wstg/sync \
-H "Content-Type: application/json" \
-d '{
"wstg_id": "WSTG-INFO-01",
"status": "Done",
"notes": "Test de sincronización"
}'
# 2. Verificar estado
curl http://localhost:5001/api/wstg/status | python3 -m json.tool
# 3. Verificar finding en DefectDojo
# Acceder a http://localhost:8080 y buscar el finding
# 4. Probar webhook (simulado)
curl -X POST http://localhost:5001/api/wstg/webhook \
-H "Content-Type: application/json" \
-H "X-API-Key: change_me_in_production" \
-d '{
"event": "finding_updated",
"finding": {
"id": 1,
"title": "WSTG-INFO-01: Test",
"active": false,
"verified": true,
"tags": ["WSTG", "INFO-01"]
}
}'
8. Monitoreo y Alertas¶
8.1 Dashboard de Estado¶
Archivo: app/routes.py (añadir endpoint)
@api.route('/wstg/dashboard', methods=['GET'])
def wstg_dashboard():
"""Dashboard de estado de sincronización WSTG"""
from .wstg_sync import load_sync_state, get_sync_status
state = load_sync_state()
status = get_sync_status()
# Estadísticas adicionales
items = state.get('items', {})
stats = {
'by_status': {},
'by_direction': {},
'recent_syncs': state.get('sync_log', [])[-10:] # Últimos 10
}
for item_data in items.values():
status_key = item_data.get('wstg_status', 'Unknown')
stats['by_status'][status_key] = stats['by_status'].get(status_key, 0) + 1
direction = item_data.get('last_sync_direction', 'Unknown')
stats['by_direction'][direction] = stats['by_direction'].get(direction, 0) + 1
return jsonify({
'status': status,
'statistics': stats,
'timestamp': datetime.now().isoformat()
}), 200
8.2 Logging y Alertas¶
El sistema ya incluye logging automático en:
/app/data/wstg_sync.log- Logs de sincronización/app/data/wstg_sync_service.log- Logs del servicio
Monitoreo de logs:
# Ver logs en tiempo real
tail -f data/wstg_sync.log
tail -f data/wstg_sync_service.log
# Buscar errores
grep ERROR data/wstg_sync.log
grep ERROR data/wstg_sync_service.log
8.3 Alertas Automáticas (Opcional)¶
Archivo: scripts/wstg_alert_service.py (NUEVO - opcional)
#!/usr/bin/env python3
"""
Servicio de alertas para sincronización WSTG
Envía notificaciones si hay problemas
"""
import os
import sys
import logging
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, '/app')
from app.wstg_sync import load_sync_state, get_sync_status
logger = logging.getLogger('wstg_alert')
def check_and_alert():
"""Verificar estado y enviar alertas si es necesario"""
status = get_sync_status()
state = load_sync_state()
alerts = []
# Alerta si hay muchos conflictos
if status['conflicts'] > 10:
alerts.append(f"⚠️ Alto número de conflictos: {status['conflicts']}")
# Alerta si última sincronización es muy antigua
if status['last_sync']:
last_sync = datetime.fromisoformat(status['last_sync'].replace('Z', '+00:00'))
if datetime.now(last_sync.tzinfo) - last_sync > timedelta(hours=1):
alerts.append(f"⚠️ Última sincronización hace más de 1 hora")
# Alerta si hay muchos items pendientes
if status['pending_items'] > status['total_items'] * 0.2:
alerts.append(f"⚠️ Muchos items pendientes: {status['pending_items']}")
# Enviar alertas (adaptar según sistema de notificaciones)
for alert in alerts:
logger.warning(alert)
# Aquí se podría enviar email, Slack, etc.
if __name__ == '__main__':
check_and_alert()
9. Troubleshooting¶
9.1 Problemas Comunes¶
Problema: Sincronización no funciona desde tracker¶
Solución:
Verificar que el tracker puede acceder a
http://localhost:5001/api/wstg/syncVerificar CORS en Flask (ya configurado)
Verificar logs:
tail -f data/wstg_sync.logProbar manualmente con curl
Problema: Webhooks no llegan¶
Solución:
Verificar configuración de webhook en DefectDojo
Verificar que la URL es accesible desde DefectDojo:
http://flask_medical_register:5001/api/wstg/webhookVerificar API key
Verificar logs de Flask:
docker-compose logs web
Problema: Servicio de sincronización no inicia¶
Solución:
Verificar que DefectDojo está corriendo:
docker ps | grep defectdojoVerificar logs:
docker-compose logs wstg-syncVerificar permisos de archivos:
ls -la scripts/wstg_sync_service.pyEjecutar manualmente:
docker-compose exec wstg-sync python /app/wstg_sync_service.py --once
Problema: Conflictos no se resuelven¶
Solución:
Verificar lógica de resolución en
wstg_sync_service.pyRevisar estado:
make wstg-statusSincronizar manualmente:
make sync-wstg
9.2 Comandos de Diagnóstico¶
# Estado general
make wstg-status
# Logs del servicio
make wstg-logs
# Logs de Flask
docker-compose logs web | grep wstg
# Estado de sincronización desde API
curl http://localhost:5001/api/wstg/status | python3 -m json.tool
# Verificar archivos de estado
cat data/wstg_sync_state.json | python3 -m json.tool
# Verificar findings WSTG en DefectDojo
docker-compose exec defectdojo python /app/manage.py shell -c "
from dojo.models import Finding, Test, Test_Type
test = Test.objects.filter(test_type__name='WSTG Security Testing').first()
if test:
print(f'Findings WSTG: {Finding.objects.filter(test=test, tags__name=\"WSTG\").count()}')
"
10. Resumen de Automatización¶
✅ Componentes Automáticos Implementados¶
Sincronización Tracker → DefectDojo
✅ Automática cuando usuario cambia estado
✅ Endpoint
/api/wstg/synclisto✅ Código JavaScript para integración en tracker
✅ Cola de reintentos automáticos
Sincronización DefectDojo → Tracker
✅ Automática vía webhooks
✅ Endpoint
/api/wstg/webhooklisto✅ Polling periódico como backup
✅ Almacenamiento de estado
Servicio de Sincronización
✅ Servicio en background cada 5 minutos
✅ Detección automática de cambios
✅ Resolución automática de conflictos
✅ Logging completo
Manejo de Errores
✅ Reintentos automáticos
✅ Cola de sincronizaciones fallidas
✅ Logging de errores
✅ Notificaciones (opcional)
📋 Checklist de Implementación¶
[ ] Crear
app/wstg_sync.py[ ] Añadir endpoints en
app/routes.py[ ] Añadir configuración en
app/config.py[ ] Crear
scripts/wstg_sync_service.py[ ] Modificar
docker-compose.yml[ ] Actualizar
Makefile[ ] Integrar código JavaScript en tracker
[ ] Configurar webhook en DefectDojo
[ ] Probar sincronización
[ ] Verificar logs y monitoreo
Fecha: 2025-12-03
Versión: 2.0
Estado: Implementación Completa con Automatización Total