AlistoIR Blog Topic

FortiGate Sentbyte Aggregation in Wazuh: A Practical Workaround for Data Exfiltration Detection

June 24, 2026 · By Oliver Roca
This post explains a practical workaround for a key Wazuh limitation: rolling byte aggregation. Because Wazuh rules are not designed to calculate SUM(sentbyte) over a time window, the workflow uses an external Python correlation service to aggregate FortiGate outbound bytes, write a synthetic JSON event, and let Wazuh raise the final official data exfiltration alert. Wazuh stays the trusted detection source while gaining aggregation for large outbound transfer monitoring.

A step-by-step tutorial for ingesting raw FortiGate traffic logs, generating synthetic exfiltration alerts, and keeping Wazuh as the official detection source.

Why this tutorial matters

Wazuh parses FortiGate traffic logs well, but it is not designed to do rolling byte aggregation such as:

  • SUM(sentbyte) by srcip + dstip + dstport within 15 minutes

That means if you want to detect potential data exfiltration based on large outbound transfer volume, the clean design is:

FortiGate raw logs
  -> Wazuh collects normal traffic events
  -> local correlation service sums sentbyte
  -> correlation service writes a synthetic JSON log
  -> Wazuh reads the synthetic log
  -> Wazuh custom rule creates the final official alert
  -> AlistoIR or another SOAR receives the correlated alert
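
The aggregation step in the middle of that flow can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the full script from Step 9: the function name rolling_totals and the tuple layout are invented for the example, and the events are assumed to arrive sorted by time.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=15)


def rolling_totals(events):
    """Yield (key, total_bytes) after each event.

    `events` is an iterable of (timestamp, srcip, dstip, dstport, sentbyte)
    tuples, assumed to be sorted by timestamp. The total covers the
    trailing 15 minutes for the event's srcip + dstip + dstport key.
    """
    buckets = defaultdict(deque)
    for ts, srcip, dstip, dstport, sentbyte in events:
        key = (srcip, dstip, dstport)
        bucket = buckets[key]
        bucket.append((ts, sentbyte))
        # Drop entries that have fallen out of the trailing window.
        while bucket[0][0] < ts - WINDOW:
            bucket.popleft()
        yield key, sum(size for _, size in bucket)


if __name__ == "__main__":
    t0 = datetime(2026, 6, 14, 10, 0, 5)
    sample = [
        (t0, "10.10.5.25", "203.0.113.50", "443", 4 * 1024**3),
        (t0 + timedelta(minutes=5), "10.10.5.25", "203.0.113.50", "443", 4 * 1024**3),
        (t0 + timedelta(minutes=21), "10.10.5.25", "203.0.113.50", "443", 4 * 1024**3),
    ]
    for key, total in rolling_totals(sample):
        print(key, total)
```

A Wazuh rule can count how often something happens, but it cannot keep a running sum like this, which is why the sum lives in an external service.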

The most important rule is this:

Never write directly to /var/ossec/logs/alerts/alerts.json.

Your correlation service should write to its own log file, then let Wazuh generate the official alert.

What you will build

This tutorial creates two useful alert layers:

  1. A raw FortiGate investigation alert in Wazuh for the original traffic event.
  2. A final synthetic exfiltration alert in Wazuh when outbound bytes cross a threshold within a rolling 15-minute window.

Final architecture

FortiGate
  -> /var/log/fortigate.log
  -> Wazuh localfile
  -> built-in FortiGate decoder
  -> custom raw rule 110520

/var/log/fortigate.log
  -> byte-correlation.py
  -> /var/log/wazuh-correlation/data_exfil.json
  -> Wazuh localfile
  -> custom correlation rule 110510
  -> /var/ossec/logs/alerts/alerts.json
  -> AlistoIR or another downstream integration

Why this design is better

  • Wazuh stays the official detection source.
  • Analysts still get the raw FortiGate alert for investigation.
  • The correlation logic stays separate and is easier to maintain.
  • The final synthetic alert is still searchable, rule-based, and auditable inside Wazuh.

Tested lab context

This tutorial is based on a working lab flow with:

  • Wazuh 4.14.5 rc1
  • built-in FortiGate decoder and ruleset
  • custom raw FortiGate rule 110520
  • custom final correlation rule 110510

If your environment already uses those IDs, choose another unused range.
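
One way to check whether those IDs are free is to scan the rule files for id attributes. The sketch below is a hedged example: the helper names are hypothetical, and the two directories are assumed to be the usual rule locations on a default Wazuh manager install, which may differ on your system.

```python
import re
from pathlib import Path

RULE_ID_RE = re.compile(r'<rule\s+id="(\d+)"')

# Assumed default locations on a Wazuh manager; adjust if yours differ.
RULE_DIRS = (Path("/var/ossec/ruleset/rules"), Path("/var/ossec/etc/rules"))


def rule_ids_in_text(xml_text: str) -> set[int]:
    """Return every rule ID declared in one rule file's text."""
    return {int(match) for match in RULE_ID_RE.findall(xml_text)}


def ids_in_use(candidates: set[int], rule_dirs=RULE_DIRS) -> dict[int, list[str]]:
    """Map each candidate ID that is already declared to the files declaring it."""
    found: dict[int, list[str]] = {}
    for directory in rule_dirs:
        if not directory.is_dir():
            continue
        for path in sorted(directory.glob("*.xml")):
            text = path.read_text(encoding="utf-8", errors="replace")
            for rule_id in candidates & rule_ids_in_text(text):
                found.setdefault(rule_id, []).append(str(path))
    return found


if __name__ == "__main__":
    print(ids_in_use({110510, 110520}) or "IDs 110510 and 110520 are unused")
```

Run it before creating the custom rule files. Once those files exist, the same check will report them as the owners of both IDs.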

Prerequisites

You need:

  • a running Wazuh manager
  • FortiGate traffic logs sent through syslog
  • access to the Linux server that receives FortiGate logs
  • Python 3 on the server that will run the correlator
  • permission to edit ossec.conf and Wazuh custom rule files

Step 1: Decide where the FortiGate logs will land

You have two valid deployment patterns.

Option A: Manager-local pattern

FortiGate sends syslog directly to the Wazuh manager.

FortiGate -> Wazuh manager -> /var/log/fortigate.log

This is the simplest version.

Option B: Separate Linux collector pattern

FortiGate sends syslog to another Linux server, which has a Wazuh agent.

FortiGate
  -> Linux collector
  -> /var/log/fortigate.log
  -> byte-correlation.py
  -> /var/log/wazuh-correlation/data_exfil.json
  -> Wazuh agent
  -> Wazuh manager

If you use a separate collector, the correlator should usually run on that collector rather than on the manager.

Step 2: Configure FortiGate syslog landing

If you are using rsyslog, create a dedicated FortiGate landing file.

File:

sudo nano /etc/rsyslog.d/40-fortigate.conf

Example:

# FortiGate syslog landing file for the Wazuh manager or collector
#
# Adjust the port and transport if your FortiGate sends TCP or a different port.

module(load="imudp")
input(type="imudp" port="514")

if ($fromhost-ip == "FORTIGATE_IP_HERE") then {
    action(type="omfile" file="/var/log/fortigate.log")
    stop
}

Replace FORTIGATE_IP_HERE with the real FortiGate source IP.

Then create the file and restart rsyslog:

sudo touch /var/log/fortigate.log
sudo systemctl restart rsyslog
sudo tail -f /var/log/fortigate.log

If logs are arriving correctly, you should now see raw FortiGate traffic events in /var/log/fortigate.log.

Step 3: Validate the built-in FortiGate decoder

Before adding correlation, confirm Wazuh can parse a FortiGate traffic event correctly.

Run:

sudo /var/ossec/bin/wazuh-logtest

Paste this sample FortiGate traffic line:

date=2026-06-14 time=10:00:05 devname="FGT-HQ" devid="FGT60F1234567890" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="root" eventtime=1781421605 srcip=10.10.5.25 srcport=53111 srcintf="lan" srcintfrole="lan" dstip=203.0.113.50 dstport=443 dstintf="wan1" dstintfrole="wan" poluuid="11111111-1111-1111-1111-111111111111" sessionid=900001 proto=6 action="close" policyid=12 policytype="policy" service="HTTPS" dstcountry="Reserved" srccountry="Reserved" trandisp="snat" transip=198.51.100.20 transport=53111 appid=40568 app="HTTPS.BROWSER" appcat="Web.Client" apprisk="medium" applist="default" duration=120 sentbyte=4294967296 rcvdbyte=10485760 sentpkt=4200 rcvdpkt=3100 utmaction="allow" countapp=1 devtype="Workstation" osname="Windows" mastersrcmac="00:11:22:33:44:55" srcmac="00:11:22:33:44:55" srcserver=0 utmref=0-900001

You should see FortiGate fields like:

  • srcip
  • dstip
  • dstport
  • sessionid
  • sentbyte

And Wazuh should match the built-in FortiGate traffic rule chain.

Step 4: Create the raw FortiGate investigation rule

This rule is not your final exfiltration detection. Its purpose is to make the original FortiGate traffic event visible in alerts.json for investigation.

Create:

sudo nano /var/ossec/etc/rules/fortigate_raw_traffic_rules.xml

Add:

<group name="fortigate,custom,raw_traffic,">
  <rule id="110520" level="3">
    <if_sid>81618</if_sid>
    <description>FortiGate raw traffic event with outbound byte count recorded</description>
    <group>fortigate,network,raw_traffic,custom_rule,investigation_ready,</group>
  </rule>
</group>

Why rule 81618?

81618 is Wazuh’s built-in FortiGate traffic rule. Your custom rule uses it as the parent, so the raw FortiGate event becomes easier to investigate in alerts.json.

Step 5: Register the raw FortiGate log with Wazuh

If the raw log is on the Wazuh manager

Add this to the manager’s /var/ossec/etc/ossec.conf:

<localfile>
  <log_format>syslog</log_format>
  <location>/var/log/fortigate.log</location>
</localfile>

If the raw log is on a separate Linux collector with a Wazuh agent

Add the same block to the agent’s /var/ossec/etc/ossec.conf instead.

Step 6: Create the synthetic correlation output path

Create the directories and file:

sudo mkdir -p /opt/wazuh-correlation
sudo mkdir -p /var/log/wazuh-correlation
sudo mkdir -p /var/lib/wazuh-correlation
sudo touch /var/log/wazuh-correlation/data_exfil.json
sudo chown root:wazuh /var/log/wazuh-correlation/data_exfil.json

This file will receive the synthetic JSON alerts produced by your correlation script.

Step 7: Create the final correlation rule

Now create the Wazuh rule that will detect the synthetic JSON alert.

Create:

sudo nano /var/ossec/etc/rules/data_exfil_correlation_rules.xml

Add:

<group name="firewall,correlation,data_exfiltration,">
  <rule id="110510" level="12">
    <decoded_as>json</decoded_as>
    <field name="correlation_type">^data_exfiltration_bytes_threshold$</field>
    <description>Possible Data Exfiltration: Suspiciously large outbound data transfer detected within 15 minutes</description>
    <group>data_exfiltration,network,correlation,</group>
    <mitre>
      <id>T1041</id>
      <id>T1048</id>
      <id>T1020</id>
    </mitre>
  </rule>
</group>

This rule looks for:

correlation_type = data_exfiltration_bytes_threshold

and creates the final high-severity exfiltration alert.
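
To check a line from the synthetic log before it reaches Wazuh, the rule's condition can be approximated in Python. This is only a sketch: Wazuh does the real matching with its own JSON decoder and regex engine, and the function name is hypothetical.

```python
import json
import re

# Same anchored pattern as the <field name="correlation_type"> element in rule 110510.
CORRELATION_TYPE_RE = re.compile(r"^data_exfiltration_bytes_threshold$")


def would_match_rule_110510(line: str) -> bool:
    """Approximate the rule's condition for one line of the synthetic log.

    The line must decode as a single JSON object whose correlation_type
    field matches the rule's pattern.
    """
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return False
    if not isinstance(record, dict):
        return False
    value = record.get("correlation_type")
    return isinstance(value, str) and bool(CORRELATION_TYPE_RE.match(value))
```

A line that fails this check will not trigger rule 110510. A line that passes should still be confirmed with wazuh-logtest.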

Step 8: Register the synthetic JSON log with Wazuh

If the synthetic file is on the Wazuh manager

Add this to the manager’s /var/ossec/etc/ossec.conf:

<localfile>
  <log_format>json</log_format>
  <location>/var/log/wazuh-correlation/data_exfil.json</location>
</localfile>

If the synthetic file is on a separate Linux collector with a Wazuh agent

Add the same block to the agent’s ossec.conf.

Step 9: Create byte-correlation.py

Save this as:

/opt/wazuh-correlation/byte-correlation.py

This script:

  • reads FortiGate raw logs
  • keeps only type=traffic and subtype=forward
  • parses sentbyte
  • groups by srcip + dstip + dstport + proto
  • sums outbound bytes inside a rolling 15-minute window
  • writes a synthetic JSON alert once the threshold is reached

#!/usr/bin/env python3
"""Aggregate FortiGate sentbyte values into synthetic Wazuh JSON alerts."""

from __future__ import annotations

import argparse
import json
import os
import shlex
import sys
import time
from collections import defaultdict, deque
from dataclasses import dataclass
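from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Deque, Iterable

# datetime.UTC only exists on Python 3.11+; timezone.utc also works on older versions.
UTC = timezone.utc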


@dataclass
class Event:
    timestamp: datetime
    src_ip: str
    dst_ip: str
    dst_port: str
    protocol: str
    session_id: str
    sentbyte: int
    raw_fields: dict[str, str]


@dataclass
class ReadState:
    inode: int = 0
    offset: int = 0
    initialized: bool = False


class Correlator:
    def __init__(self, threshold_bytes: int, window_minutes: int) -> None:
        self.threshold_bytes = threshold_bytes
        self.window_minutes = window_minutes
        self.window = timedelta(minutes=window_minutes)
        self.grouped: dict[tuple[str, str, str, str], Deque[Event]] = defaultdict(deque)
        self.last_emitted_at: dict[tuple[str, str, str, str], datetime] = {}

    def process_event(self, event: Event) -> dict[str, object] | None:
        key = (event.src_ip, event.dst_ip, event.dst_port, event.protocol)
        bucket = self.grouped[key]
        bucket.append(event)

        cutoff = event.timestamp - self.window
        while bucket and bucket[0].timestamp < cutoff:
            bucket.popleft()

        total_bytes = sum(item.sentbyte for item in bucket)
        last_emitted = self.last_emitted_at.get(key)
        if total_bytes < self.threshold_bytes:
            return None
        if last_emitted and (event.timestamp - last_emitted) < self.window:
            return None

        alert = build_alert(bucket, self.window_minutes)
        self.last_emitted_at[key] = event.timestamp
        return alert


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Read FortiGate raw logs and emit synthetic JSON correlation alerts."
    )
    parser.add_argument("--input", required=True, help="Path to raw FortiGate log file")
    parser.add_argument("--output", required=True, help="Path to write synthetic JSONL alerts")
    parser.add_argument(
        "--mode",
        choices=("once", "follow"),
        default="once",
        help="Run once over the whole file or follow the input like a service",
    )
    parser.add_argument(
        "--threshold-bytes",
        type=int,
        default=12 * 1024 * 1024 * 1024,
        help="Minimum total sent bytes within the time window before emitting an alert",
    )
    parser.add_argument(
        "--window-minutes",
        type=int,
        default=15,
        help="Rolling aggregation window in minutes",
    )
    parser.add_argument(
        "--poll-interval",
        type=float,
        default=2.0,
        help="Seconds to wait between file polls in follow mode",
    )
    parser.add_argument(
        "--state-file",
        default="",
        help="JSON file that stores read offset for follow mode",
    )
    parser.add_argument(
        "--start-at-end-on-first-run",
        action="store_true",
        help="When no prior state exists, start from EOF instead of replaying the whole file",
    )
    parser.add_argument(
        "--max-loops",
        type=int,
        default=0,
        help="Test-only guard for follow mode. Zero means run forever",
    )
    return parser.parse_args()


def parse_kv_log(line: str) -> dict[str, str]:
    fields: dict[str, str] = {}
    for token in shlex.split(line.strip()):
        if "=" not in token:
            continue
        key, value = token.split("=", 1)
        fields[key] = value.strip('"')
    return fields


def parse_timestamp(fields: dict[str, str]) -> datetime:
    date_value = fields.get("date")
    time_value = fields.get("time")
    if not date_value or not time_value:
        raise ValueError("Missing date/time fields")

    parsed = datetime.strptime(f"{date_value} {time_value}", "%Y-%m-%d %H:%M:%S")
    return parsed.replace(tzinfo=UTC)


def normalize_protocol(value: str) -> str:
    mapping = {
        "6": "TCP",
        "17": "UDP",
        "1": "ICMP",
    }
    return mapping.get(value, value.upper() if value else "UNKNOWN")


def parse_event(line: str) -> Event | None:
    fields = parse_kv_log(line)
    if fields.get("type") != "traffic" or fields.get("subtype") != "forward":
        return None

    if "sentbyte" not in fields or "srcip" not in fields or "dstip" not in fields:
        return None

    timestamp = parse_timestamp(fields)
    return Event(
        timestamp=timestamp,
        src_ip=fields["srcip"],
        dst_ip=fields["dstip"],
        dst_port=fields.get("dstport", ""),
        protocol=normalize_protocol(fields.get("proto", "")),
        session_id=fields.get("sessionid", ""),
        sentbyte=int(fields["sentbyte"]),
        raw_fields=fields,
    )


def load_events(path: Path) -> list[Event]:
    events: list[Event] = []
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        try:
            event = parse_event(line)
        except Exception as exc:  # pragma: no cover
            print(f"Skipping unparsable line: {exc}", file=sys.stderr)
            continue
        if event is not None:
            events.append(event)
    return sorted(events, key=lambda item: item.timestamp)


def build_alert(window_events: Iterable[Event], window_minutes: int) -> dict[str, object]:
    batch = list(window_events)
    first = batch[0]
    last = batch[-1]
    total_bytes = sum(item.sentbyte for item in batch)

    return {
        "correlation_type": "data_exfiltration_bytes_threshold",
        "signature": "Possible Data Exfiltration - High Outbound Bytes",
        "severity": "high",
        "src_ip": first.src_ip,
        "dst_ip": first.dst_ip,
        "dst_port": first.dst_port,
        "protocol": first.protocol,
        "total_bytes_out": total_bytes,
        "total_gb_out": round(total_bytes / (1024 ** 3), 2),
        "event_count": len(batch),
        "time_window": f"{window_minutes}m",
        "first_seen": first.timestamp.isoformat().replace("+00:00", "Z"),
        "last_seen": last.timestamp.isoformat().replace("+00:00", "Z"),
        "mitre": "T1041,T1048,T1020",
        "recommendation": (
            "Validate destination IP, review endpoint activity, and block or isolate if unauthorized."
        ),
        "source_product": "FortiGate",
        "source_log_type": "traffic-forward",
        "aggregation_key": f"{first.src_ip}|{first.dst_ip}|{first.dst_port}|{first.protocol}",
        "session_ids": [item.session_id for item in batch if item.session_id],
    }


def correlate(events: list[Event], threshold_bytes: int, window_minutes: int) -> list[dict[str, object]]:
    correlator = Correlator(threshold_bytes=threshold_bytes, window_minutes=window_minutes)
    alerts: list[dict[str, object]] = []
    for event in events:
        alert = correlator.process_event(event)
        if alert is not None:
            alerts.append(alert)
    return alerts


def load_state(path: Path) -> ReadState:
    if not path.is_file():
        return ReadState()
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return ReadState()
    return ReadState(
        inode=int(payload.get("inode", 0)),
        offset=int(payload.get("offset", 0)),
        initialized=bool(payload.get("initialized", False)),
    )


def save_state(path: Path, state: ReadState) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "inode": state.inode,
        "offset": state.offset,
        "initialized": state.initialized,
    }
    path.write_text(json.dumps(payload, separators=(",", ":")), encoding="utf-8")


def open_from_state(
    input_path: Path,
    state: ReadState,
    start_at_end_on_first_run: bool,
) -> tuple[object, ReadState]:
    handle = input_path.open("r", encoding="utf-8", newline="")
    stat_result = os.fstat(handle.fileno())
    inode = stat_result.st_ino

    if not state.initialized:
        offset = stat_result.st_size if start_at_end_on_first_run else 0
        handle.seek(offset)
        new_state = ReadState(inode=inode, offset=offset, initialized=True)
        return handle, new_state

    if state.inode != inode or state.offset > stat_result.st_size:
        handle.seek(0)
        new_state = ReadState(inode=inode, offset=0, initialized=True)
        return handle, new_state

    handle.seek(state.offset)
    new_state = ReadState(inode=inode, offset=state.offset, initialized=True)
    return handle, new_state


def emit_alert(handle: object, alert: dict[str, object]) -> None:
    handle.write(json.dumps(alert, separators=(",", ":")) + "\n")
    handle.flush()


def run_once(args: argparse.Namespace) -> int:
    input_path = Path(args.input)
    output_path = Path(args.output)

    events = load_events(input_path)
    alerts = correlate(events, args.threshold_bytes, args.window_minutes)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8", newline="\n") as handle:
        for alert in alerts:
            emit_alert(handle, alert)

    print(f"Parsed events: {len(events)}")
    print(f"Generated synthetic alerts: {len(alerts)}")
    print(f"Output: {output_path}")
    return 0


def run_follow(args: argparse.Namespace) -> int:
    input_path = Path(args.input)
    output_path = Path(args.output)
    state_path = Path(args.state_file) if args.state_file else output_path.with_suffix(".state.json")

    correlator = Correlator(threshold_bytes=args.threshold_bytes, window_minutes=args.window_minutes)
    state = load_state(state_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    loop_count = 0
    with output_path.open("a", encoding="utf-8", newline="\n") as out_handle:
        file_handle, state = open_from_state(input_path, state, args.start_at_end_on_first_run)
        save_state(state_path, state)

        try:
            while True:
                line = file_handle.readline()
                if line:
                    state.offset = file_handle.tell()
                    save_state(state_path, state)

                    raw = line.strip()
                    if not raw or raw.startswith("#"):
                        continue

                    try:
                        event = parse_event(raw)
                    except Exception as exc:  # pragma: no cover
                        print(f"Skipping unparsable line: {exc}", file=sys.stderr)
                        continue
                    if event is None:
                        continue

                    alert = correlator.process_event(event)
                    if alert is not None:
                        emit_alert(out_handle, alert)
                        print(
                            "Generated synthetic alert for "
                            f"{alert['aggregation_key']} with {alert['total_bytes_out']} bytes"
                        )
                    continue

                time.sleep(args.poll_interval)
                loop_count += 1
                if args.max_loops and loop_count >= args.max_loops:
                    break

                current_stat = input_path.stat()
                if current_stat.st_ino != state.inode or current_stat.st_size < state.offset:
                    file_handle.close()
                    file_handle, state = open_from_state(input_path, state, False)
                    save_state(state_path, state)
        finally:
            file_handle.close()

    print(f"State: {state_path}")
    print(f"Output: {output_path}")
    return 0


def main() -> int:
    args = parse_args()
    if args.mode == "follow":
        return run_follow(args)
    return run_once(args)


if __name__ == "__main__":
    raise SystemExit(main())

Make it executable:

sudo chmod +x /opt/wazuh-correlation/byte-correlation.py
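
Before wiring the script into a service, it can help to run it once against a small, known input. The following sketch writes three FortiGate-style lines of 4 GiB each, five minutes apart, for the same source, destination, port, and protocol. The helper names, the /tmp path, and the abbreviated field set are assumptions for this example. Real FortiGate lines carry many more fields, but these are the ones parse_event reads.

```python
from datetime import datetime, timedelta
from pathlib import Path


def make_traffic_line(ts: datetime, sentbyte: int, sessionid: int) -> str:
    """Build an abbreviated FortiGate forward-traffic line for testing."""
    return (
        f"date={ts:%Y-%m-%d} time={ts:%H:%M:%S} "
        'type="traffic" subtype="forward" '
        "srcip=10.10.5.25 dstip=203.0.113.50 dstport=443 proto=6 "
        f"sessionid={sessionid} sentbyte={sentbyte}"
    )


def write_fixture(path: Path, count: int = 3, sentbyte: int = 4 * 1024**3) -> list[str]:
    """Write `count` lines spaced five minutes apart and return them."""
    start = datetime(2026, 6, 14, 10, 0, 5)
    lines = [
        make_traffic_line(start + timedelta(minutes=5 * i), sentbyte, 900001 + i)
        for i in range(count)
    ]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return lines


if __name__ == "__main__":
    for line in write_fixture(Path("/tmp/fortigate-fixture.log")):
        print(line)
```

Running byte-correlation.py with --mode once, --input pointing at that fixture, a scratch --output path, and --threshold-bytes 10737418240 should then report three parsed events and one synthetic alert, because the third event brings the 15-minute total to 12 GiB.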

Step 10: Create the service environment file

Create:

sudo nano /etc/default/fortigate-byte-correlation

Add:

INPUT_LOG=/var/log/fortigate.log
OUTPUT_LOG=/var/log/wazuh-correlation/data_exfil.json
STATE_FILE=/var/lib/wazuh-correlation/fortigate-byte-correlation.state.json
THRESHOLD_BYTES=10737418240
WINDOW_MINUTES=15
POLL_INTERVAL=2
START_AT_END_FLAG=--start-at-end-on-first-run

What does the threshold mean?

10737418240 bytes = 10 GiB

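You can tune this based on your environment. The value set here is passed as --threshold-bytes, so it overrides the script's built-in default of 12 GiB (12 * 1024 * 1024 * 1024 bytes).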
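
The arithmetic behind that number, and how the Step 3 sample relates to it, can be worked through as follows. The helper names are hypothetical and exist only for this illustration.

```python
GIB = 1024**3  # bytes in one gibibyte


def gib_to_bytes(gib: float) -> int:
    """Convert a GiB figure to the byte count expected by THRESHOLD_BYTES."""
    return int(gib * GIB)


def crosses_threshold(sentbytes: list[int], threshold_bytes: int) -> bool:
    """True when the summed sentbyte values reach the threshold."""
    return sum(sentbytes) >= threshold_bytes


if __name__ == "__main__":
    threshold = gib_to_bytes(10)
    print(threshold)  # 10737418240
    # Sessions of 4294967296 bytes (4 GiB) each, the sentbyte value in the Step 3 sample.
    print(crosses_threshold([4294967296] * 3, threshold))  # True: 12 GiB >= 10 GiB
    print(crosses_threshold([4294967296] * 2, threshold))  # False: 8 GiB < 10 GiB
```

Two such sessions inside the window stay below the threshold, and a third one crosses it.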

Step 11: Create the systemd service

Create:

sudo nano /etc/systemd/system/fortigate-byte-correlation.service

Add:

[Unit]
Description=FortiGate byte correlation for Wazuh synthetic exfil alerts
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=root
Group=wazuh
EnvironmentFile=/etc/default/fortigate-byte-correlation
ExecStart=/usr/bin/python3 /opt/wazuh-correlation/byte-correlation.py \
  --mode follow \
  --input ${INPUT_LOG} \
  --output ${OUTPUT_LOG} \
  --state-file ${STATE_FILE} \
  --threshold-bytes ${THRESHOLD_BYTES} \
  --window-minutes ${WINDOW_MINUTES} \
  --poll-interval ${POLL_INTERVAL} \
  ${START_AT_END_FLAG}
Restart=always
RestartSec=5
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=full
ReadWritePaths=/var/log/fortigate.log /var/log/wazuh-correlation /var/lib/wazuh-correlation

[Install]
WantedBy=multi-user.target

Enable and start it:

sudo systemctl daemon-reload
sudo systemctl enable --now fortigate-byte-correlation
sudo systemctl status fortigate-byte-correlation

Step 12: Restart Wazuh

After updating localfiles and rules, restart Wazuh:

sudo systemctl restart wazuh-manager
sudo systemctl status wazuh-manager

If you are using a separate Linux collector with a Wazuh agent:

  • restart the agent after changing the agent ossec.conf
  • restart the manager after changing manager-side rules

Step 13: Test the raw FortiGate alert

Use wazuh-logtest to validate the raw FortiGate event path.

Run:

sudo /var/ossec/bin/wazuh-logtest

Paste the raw FortiGate sample log from Step 3.

Expected raw result

Rule id: 110520
Level: 3
Description: FortiGate raw traffic event with outbound byte count recorded

Then monitor live alerts:

sudo tail -f /var/ossec/logs/alerts/alerts.json

You should now see the raw FortiGate alert in alerts.json.

Step 14: Test the final synthetic correlation alert

You can also validate the synthetic JSON alert with wazuh-logtest.

Paste this JSON:

{"correlation_type":"data_exfiltration_bytes_threshold","signature":"Possible Data Exfiltration - High Outbound Bytes","severity":"high","src_ip":"10.10.5.25","dst_ip":"203.0.113.50","dst_port":"443","protocol":"TCP","total_bytes_out":12884901888,"total_gb_out":12,"event_count":3,"time_window":"15m","first_seen":"2026-06-14T09:50:05Z","last_seen":"2026-06-14T10:00:05Z","mitre":"T1041,T1048,T1020","recommendation":"Validate destination IP, review endpoint activity, and block or isolate if unauthorized.","source_product":"FortiGate","source_log_type":"traffic-forward","aggregation_key":"10.10.5.25|203.0.113.50|443|TCP","session_ids":["900201","900001","900001"]}

Expected final correlation result

Rule id: 110510
Level: 12
Description: Possible Data Exfiltration: Suspiciously large outbound data transfer detected within 15 minutes

If the pipeline is working end to end, that synthetic event should also appear in:

/var/ossec/logs/alerts/alerts.json
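
When alerts.json is busy, tailing it by eye gets tedious. The sketch below filters it down to the two custom rules. It assumes the standard alerts.json layout of one JSON object per line with a nested rule object whose id is a string. The function name is hypothetical.

```python
import json
from typing import Iterable, Iterator

CUSTOM_RULE_IDS = {"110510", "110520"}


def filter_alerts(lines: Iterable[str], rule_ids: set[str] = CUSTOM_RULE_IDS) -> Iterator[dict]:
    """Yield decoded alerts whose rule.id is one of `rule_ids`."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            alert = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip a partially written line
        if not isinstance(alert, dict):
            continue
        rule = alert.get("rule")
        if isinstance(rule, dict) and str(rule.get("id")) in rule_ids:
            yield alert


if __name__ == "__main__":
    with open("/var/ossec/logs/alerts/alerts.json", encoding="utf-8") as handle:
        for alert in filter_alerts(handle):
            rule = alert["rule"]
            print(rule["id"], rule.get("level"), rule.get("description"))
```

Reading alerts.json normally requires root or membership in the wazuh group.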

Step 15: What your final state should look like

At this point, your environment should produce two useful alert layers:

  • Rule 110520: raw FortiGate traffic alert for investigation
  • Rule 110510: final high-confidence correlation alert for exfiltration triage

This is the main benefit of the design:

Raw FortiGate event stays available for analyst investigation
Correlation script handles sentbyte math
Wazuh still creates the final official alert
SOAR only needs the correlated alert if you want to keep triage noise low

Optional: Separate Linux collector with Wazuh agent

If FortiGate logs land on another Linux server, this is still a valid and often cleaner architecture.

Flow

FortiGate
  -> Linux collector
  -> /var/log/fortigate.log
  -> byte-correlation.py
  -> /var/log/wazuh-correlation/data_exfil.json
  -> Wazuh agent localfile entries
  -> Wazuh manager
  -> rules and alerts.json

Agent-side localfile config

Add both of these to the collector agent’s /var/ossec/etc/ossec.conf:

<localfile>
  <log_format>syslog</log_format>
  <location>/var/log/fortigate.log</location>
</localfile>

<localfile>
  <log_format>json</log_format>
  <location>/var/log/wazuh-correlation/data_exfil.json</location>
</localfile>

Important difference in this pattern

In this design:

  • the collector receives FortiGate syslog
  • the collector runs byte-correlation.py
  • the collector Wazuh agent forwards raw and synthetic logs
  • the manager keeps the custom rules and generates the final alerts

Troubleshooting

Raw log arrives, but no alert is created

  • test the raw FortiGate line in wazuh-logtest
  • confirm Wazuh matches the built-in FortiGate traffic rule first
  • check that your custom rule file is loaded under etc/rules

Synthetic JSON is written, but no final alert appears

  • confirm log_format is json
  • test one JSON line with wazuh-logtest
  • confirm the rule matches correlation_type=data_exfiltration_bytes_threshold

Custom rule file exists but Wazuh ignores it

  • restart wazuh-manager
  • confirm your custom rule directory is loaded
  • confirm the XML syntax is valid
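
A quick well-formedness check for a custom rule file can be done with Python's standard XML parser. This is a rough sketch with a hypothetical function name. It only shows that tags are balanced and properly nested, not that Wazuh accepts the rule, and it assumes the file has no XML declaration line.

```python
import xml.etree.ElementTree as ET
from typing import Optional


def check_rule_xml(xml_text: str) -> Optional[str]:
    """Return None if the text parses as XML, else the parser's error message.

    A Wazuh rule file may hold several top-level <group> elements, so the
    text is wrapped in a throwaway root element before parsing.
    """
    try:
        ET.fromstring(f"<root>{xml_text}</root>")
    except ET.ParseError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    path = "/var/ossec/etc/rules/data_exfil_correlation_rules.xml"
    with open(path, encoding="utf-8") as handle:
        print(check_rule_xml(handle.read()) or "well-formed")
```

The error message includes a line and column number, which usually points close to the unclosed or mismatched tag.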

Too many repeated alerts

  • raise the threshold
  • increase the time window
  • extend duplicate suppression logic in byte-correlation.py
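
As written, the script reuses the aggregation window as its re-alert interval. One possible extension is to give suppression its own cooldown, so a 15-minute window can be paired with, say, a 60-minute quiet period per key. The class below is a hypothetical sketch of that idea, not part of the original script.

```python
from datetime import datetime, timedelta


class AlertSuppressor:
    """Allow one alert per key per cooldown period (hypothetical helper)."""

    def __init__(self, cooldown: timedelta) -> None:
        self.cooldown = cooldown
        self._last_emitted: dict[tuple, datetime] = {}

    def should_emit(self, key: tuple, now: datetime) -> bool:
        """Return True and record `now` unless the key alerted too recently."""
        last = self._last_emitted.get(key)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_emitted[key] = now
        return True
```

In the script, a check like this would sit where process_event currently compares event.timestamp with last_emitted_at, using the event timestamp as now.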

Nothing is written to the synthetic file

  • check the service status with: sudo systemctl status fortigate-byte-correlation
  • confirm the environment file paths are correct
  • confirm the script can read /var/log/fortigate.log
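
The two path checks above can be scripted. The sketch below parses the environment file from Step 10 and tests the paths with os.access. The helper names are hypothetical, and the result reflects the user running the check, so run it as the same user as the service.

```python
import os
from pathlib import Path


def parse_env_file(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, ignoring blanks and # comments."""
    values: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip()
    return values


def check_paths(env: dict[str, str]) -> list[str]:
    """Return a list of human-readable problems; empty means none were found."""
    problems: list[str] = []
    input_log = env.get("INPUT_LOG", "")
    if not input_log or not os.access(input_log, os.R_OK):
        problems.append(f"cannot read INPUT_LOG: {input_log!r}")
    for name in ("OUTPUT_LOG", "STATE_FILE"):
        value = env.get(name, "")
        parent = Path(value).parent if value else None
        if parent is None or not os.access(parent, os.W_OK):
            problems.append(f"cannot write to directory of {name}: {value!r}")
    return problems


if __name__ == "__main__":
    env_text = Path("/etc/default/fortigate-byte-correlation").read_text(encoding="utf-8")
    for problem in check_paths(parse_env_file(env_text)) or ["no path problems found"]:
        print(problem)
```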

Separate collector deployment does not work

Remember:

  • manager-local localfile entries are not needed for logs already forwarded by the Wazuh agent
  • raw and synthetic log localfiles should be on the collector agent
  • custom rules stay on the manager

Conclusion

This correlation pattern gives you a practical way to detect large outbound FortiGate transfers without forcing Wazuh rules to do arithmetic they were not designed to do.

The final model is clean:

FortiGate provides the raw evidence
Python handles the byte aggregation
Wazuh creates the final official alert
