吉沃运营专员 发表于 2022-5-27 15:40:16

Passive DNS 捕获和监视工具包 —— Dnsmonster

项目地址:https://github.com/mosajjal/dnsmonster
基于 Golang 构建的 Passive DNS 监控框架。dnsmonster 为 DNS 流量实现数据包嗅探器。它可以接受来自 pcap 文件、实时接口或 dnstap 套接字的流量,并且可以用于索引和存储每秒数十万个 DNS 查询,因为它已被证明能够在商品上每秒索引 20 万多个 DNS 查询电脑。它的目标是可扩展、简单且易于使用,并且能帮助安全团队了解有关企业 DNS 流量的详细信息。dnsmonster 不希望跟踪 DNS 对话,而是旨在在 DNS 数据包进入后立即对其进行索引。它也不旨在破坏最终用户的隐私,具有屏蔽第 3 层 IP (IPv4 和 IPv6),使团队能够对聚合数据执行趋势分析,而无需将查询追溯到个人。

版本 1.x 之前的代码被认为是 beta,可能会发生重大更改。请访问每个标签的发行说明,以查看每个发行版之间的中断方案列表,以及如何减少潜在的数据丢失。

特性

[*]能够使用 Linux 的 afpacket 和零拷贝数据包捕获。
[*]支持 BPF
[*]能够屏蔽 IP 地址以增强隐私
[*]具有预处理采样率的能力
[*]能够拥有 "跳过" fqdns 列表以避免将某些域/后缀/前缀写入存储
[*]能够拥有 "允许" 域列表,用于记录对某些域的访问
[*]跳过并允许域文件/url 的热重载
[*]每个输出流具有可配置逻辑的模块化输出。
[*]使用 ClickHouse 的 TTL 属性的自动数据保留策略
[*]用于 ClickHouse 输出的内置 Grafana 仪表板
[*]能够作为单个静态链接的二进制文件交付
[*]能够使用环境变量、命令行选项或配置文件进行配置
[*]能够使用 ClickHouse 的 SAMPLE 功能对输出进行采样
[*]能够使用 prometheus 和 statstd 发送指标
[*]得益于 ClickHouse 的内置 LZ4 存储,实现了高压缩比
[*]支持基于 TCP 的 DNS、分段 DNS (udp/tcp) 和 IPv6
[*]支持 dnstrap over Unix socket 或 TCP
[*]与 Splunk 和 Microsoft Sentinel 的内置 SIEM 集成

安装
Linux

开始使用 dnsmonster 的最佳方法是从 release 部分下载二进制文件。该二进制文件是针对 musl 静态构建的,因此对于许多发行版来说它是开箱即用的。对于 afpacket 支持,必须使用内核 3.x+。任何现代 Linux 发行版 (CentOS/RHEL 7+、Ubuntu 14.0.4.2+、Debian 7+) 都附带 3.x+ 版本,因此它应该可以开箱即用。如果你的发行版无法使用预编译版本,请提交包含详细信息的问题,并使用手动构建部分手动构建 dnsmonster。

容器

由于 dnsmonster 使用原始数据包捕获功能,Docker/Podman 守护进程必须将能力授予容器

sudo docker run --rm -it --net=host --cap-add NET_RAW --cap-add NET_ADMIN --name dnsmonster ghcr.io/mosajjal/dnsmonster:latest --devName lo --stdoutOutputType=1
手动构建

使用 libpcap:确保已安装 libpcap-devel 和 linux-headers 软件包。软件包的名称可能因发行版而异。在此之后,只需克隆存储库并运行 go build 。

git clone https://github.com/mosajjal/dnsmonster --depth 1 /tmp/dnsmonster
cd /tmp/dnsmonster
go get
go build -o dnsmonster .
没有 libpcap:dnsmonster 只使用 libpcap 中的一个函数,将 tcpdump 样式的过滤器转换为 BPF 字节码。如果可以忍受没有 BPF 支持,可以在没有 libpcap 的情况下构建 dnsmonster。请注意,对于任何其他平台,数据包捕获都会退回到 libpcap,因此它会成为硬依赖 (*BSD、Windows、Darwin)

git clone https://github.com/mosajjal/dnsmonster --depth 1 /tmp/dnsmonster
cd /tmp/dnsmonster
go get
go build -o dnsmonster -tags nolibpcap .
上述构建也适用于 ARMv7 (RPi4) 和 AArch64 中

静态构建

如果你有 libpcap.a 的副本,则可以将其静态链接到 dnsmonster 并完全静态地构建它。在下面的代码中,请将 /root/libpcap-1.9.1/libpcap.a 更改为你的副本的位置。

git clone https://github.com/mosajjal/dnsmonster --depth 1 /tmp/dnsmonster
cd /tmp/dnsmonster/
go get
go build --ldflags "-L /root/libpcap-1.9.1/libpcap.a -linkmode external -extldflags \"-I/usr/include/libnl3 -lnl-genl-3 -lnl-3 -static\"" -a -o dnsmonster
有关如何创建静态链接二进制文件的更多信息,请查看此 Dockerfile。

Windows

在 Windows 上构建与 Linux 非常相似。 只要确保有 npcap。 克隆存储库 (--history 1 有效),然后运行 go get 和 go build。

如前所述,二进制文件的 Windows 版本取决于要安装的 npcap。安装后,二进制文件应该可以开箱即用。它已经在 Windows 10 环境中进行了测试,并且没有问题。要查找接口名称以提供 --devName 参数并开始嗅探,需要执行以下操作:


[*]以管理员身份打开 cmd.exe 并运行以下命令:getmac.exe,将看到一个包含接口 MAC 地址的表和一个传输名称列,其中包含如下内容:\Device\Tcpip_{16000000-0000-0000-0000- 145C4638064C}
[*]在 cmd.exe 中运行 dnsmonster.exe -> dnsmonster.exe --devName \Device\NPF_{16000000-0000-0000-0000-145C4638064C}

请注意,必须将 \Tcpip 从 getmac.exe 更改为 \NPF,然后将其传递给 dnsmonster.exe。

FreeBSD and MacOS

与 Linux 和 Windows 非常相似,请确保已安装 git、libpcap 和 go,然后按照相同的说明进行操作:


[*]git clone https://github.com/mosajjal/dnsmonster --depth 1 /tmp/dnsmonster
[*]cd /tmp/dnsmonster
[*]go get
[*]go build -o dnsmonster

体系架构
使用 Docker 进行多合一安装



在示例图中,捕获了 DNS 服务器流量的出口/入口,然后在到达 DNSMonster 服务器之前添加了一个可选的数据包聚合层。从 DNS 服务器传出的出站数据对于在 DNS 队列上执行缓存和性能分析非常有用。如果无法使用聚合器,可以让两个 TAP 直接连接到 DNSMonster,并让两个 DNSMonster 代理查看流量。

运行 ./autobuild.sh 创建多个容器:


[*]dnsmonster 的多个实例来查看任何接口上的流量。接口列表将作为 autobuild.sh 的一部分提示
[*]clickhouse 的一个实例,用于收集 dnsmonster 的输出并将所有日志/数据保存到数据和日志目录。两者都将作为 autobuild.sh 的一部分提示
[*]grafana 的一个实例,它使用预先构建的仪表板查看 clickhouse 数据

企业部署



配置
DNSMonster 可以使用 3 种不同的方法进行配置。命令行选项、环境变量和配置文件。优先顺序:


[*]命令行选项 (区分大小写,camelCase)
[*]环境变量 (总是大写)
[*]配置文件 (区分大小写,PascalCase)
[*]默认值 (无配置)

命令行选项

请注意,命令行参数目前是区分大小写和驼峰式的。这是 dnsmonster 使用的底层标志解析器库的已知限制。

#
# Device used to capture
--devName=

# Pcap filename to run
--pcapFile=

# dnstap socket path. Example: unix:///tmp/dnstap.sock, tcp://127.0.0.1:8080
--dnstapSocket=

# Port selected to filter packets
--port=53

# Capture Sampling by a:b. eg sampleRatio of 1:100 will process 1 percent of the incoming packets
--sampleRatio=1:1

# Cleans up packet hash table used for deduplication
--dedupCleanupInterval=1m0s

# Set the dnstap socket permission, only applicable when unix:// is used
--dnstapPermission=755

# Number of routines used to handle received packets
--packetHandlerCount=2

# Size of the tcp assembler
--tcpAssemblyChannelSize=10000

# Size of the tcp result channel
--tcpResultChannelSize=10000

# Number of routines used to handle tcp packets
--tcpHandlerCount=1

# Size of the channel to send packets to be defragged
--defraggerChannelSize=10000

# Size of the channel where the defragged packets are returned
--defraggerChannelReturnSize=10000

# Size of the packet handler channel
--packetChannelSize=1000

# Afpacket Buffersize in MB
--afpacketBuffersizeMb=64

# BPF filter applied to the packet stream. If port is selected, the packets will not be defragged.
--filter=((ip and (ip == 6 or ip == 17)) or (ip6 and (ip6 == 17 or ip6 == 6 or ip6 == 44)))

# Use AFPacket for live captures. Supported on Linux 3.0+ only
--useAfpacket

# The PCAP capture does not contain ethernet frames
--noEthernetframe

# Deduplicate incoming packets, Only supported with --devName and --pcapFile. Experimental
--dedup

# Do not put the interface in promiscuous mode
--noPromiscuous

#
# Address of the clickhouse database to save the results
--clickhouseAddress=localhost:9000

# Username to connect to the clickhouse database
--clickhouseUsername=

# Password to connect to the clickhouse database
--clickhousePassword=

# Database to connect to the clickhouse database
--clickhouseDatabase=default

# Interval between sending results to ClickHouse
--clickhouseDelay=1s

# Debug Clickhouse connection
--clickhouseDebug

# Compress Clickhouse connection
--clickhouseCompress

# Use TLS for Clickhouse connection
--clickhouseSecure

# Save full packet query and response in JSON format.
--clickhouseSaveFullQuery

# What should be written to clickhouse. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--clickhouseOutputType=0

# Minimum capacity of the cache array used to send data to clickhouse. Set close to the queries per second received to prevent allocations
--clickhouseBatchSize=100000

# Number of Clickhouse output Workers
--clickhouseWorkers=1

# Channel Size for each Clickhouse Worker
--clickhouseWorkerChannelSize=100000

#
# What should be written to elastic. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--elasticOutputType=0

# elastic endpoint address, example: http://127.0.0.1:9200. Used if elasticOutputType is not none
--elasticOutputEndpoint=

# elastic index
--elasticOutputIndex=default

# Send data to Elastic in batch sizes
--elasticBatchSize=1000

# Interval between sending results to Elastic if Batch size is not filled
--elasticBatchDelay=1s

#
# What should be written to file. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--fileOutputType=0

# Path to output file. Used if fileOutputType is not none
--fileOutputPath=

# Output format for file. options:json,csv, csv_no_header, gotemplate. note that the csv splits the datetime format into multiple fields
--fileOutputFormat=json

# Go Template to format the output as needed
--fileOutputGoTemplate={{.}}

#
# What should be written to influx. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--influxOutputType=0

# influx Server address, example: http://localhost:8086. Used if influxOutputType is not none
--influxOutputServer=

# Influx Server Auth Token
--influxOutputToken=dnsmonster

# Influx Server Bucket
--influxOutputBucket=dnsmonster

# Influx Server Org
--influxOutputOrg=dnsmonster

# Minimum capacity of the cache array used to send data to Influx
--influxOutputWorkers=8

# Minimum capacity of the cache array used to send data to Influx
--influxBatchSize=1000

#
# What should be written to kafka. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--kafkaOutputType=0

# kafka broker address(es), example: 127.0.0.1:9092. Used if kafkaOutputType is not none
--kafkaOutputBroker=

# Kafka topic for logging
--kafkaOutputTopic=dnsmonster

# Minimum capacity of the cache array used to send data to Kafka
--kafkaBatchSize=1000

# Kafka connection timeout in seconds
--kafkaTimeout=3

# Interval between sending results to Kafka if Batch size is not filled
--kafkaBatchDelay=1s

# Compress Kafka connection
--kafkaCompress

# Use TLS for kafka connection
--kafkaSecure

# Path of CA certificate that signs Kafka broker certificate
--kafkaCACertificatePath=

# Path of TLS certificate to present to broker
--kafkaTLSCertificatePath=

# Path of TLS certificate key
--kafkaTLSKeyPath=

#
# What should be written to Microsoft Sentinel. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--sentinelOutputType=0

# Sentinel Shared Key, either the primary or secondary, can be found in Agents Management page under Log Analytics workspace
--sentinelOutputSharedKey=

# Sentinel Customer Id. can be found in Agents Management page under Log Analytics workspace
--sentinelOutputCustomerId=

# Sentinel Output LogType
--sentinelOutputLogType=dnsmonster

# Sentinel Output Proxy in URI format
--sentinelOutputProxy=

# Sentinel Batch Size
--sentinelBatchSize=100

# Interval between sending results to Sentinel if Batch size is not filled
--sentinelBatchDelay=1s

#
# What should be written to HEC. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--splunkOutputType=0

# splunk endpoint address, example: http://127.0.0.1:8088. Used if splunkOutputType is not none, can be specified multiple times for load balanace and HA
--splunkOutputEndpoint=

# Splunk HEC Token
--splunkOutputToken=00000000-0000-0000-0000-000000000000

# Splunk Output Index
--splunkOutputIndex=temp

# Splunk Output Proxy in URI format
--splunkOutputProxy=

# Splunk Output Source
--splunkOutputSource=dnsmonster

# Splunk Output Sourcetype
--splunkOutputSourceType=json

# Send data to HEC in batch sizes
--splunkBatchSize=1000

# Interval between sending results to HEC if Batch size is not filled
--splunkBatchDelay=1s

#
# What should be written to stdout. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--stdoutOutputType=0

# Output format for stdout. options:json,csv, csv_no_header, gotemplate. note that the csv splits the datetime format into multiple fields
--stdoutOutputFormat=json

# Go Template to format the output as needed
--stdoutOutputGoTemplate={{.}}

# Number of workers
--stdoutOutputWorkerCount=8

#
# What should be written to Syslog server. options:
#        0: Disable Output
#        1: Enable Output without any filters
#        2: Enable Output and apply skipdomains logic
#        3: Enable Output and apply allowdomains logic
#        4: Enable Output and apply both skip and allow domains logic
--syslogOutputType=0

# Syslog endpoint address, example: udp://127.0.0.1:514, tcp://127.0.0.1:514. Used if syslogOutputType is not none
--syslogOutputEndpoint=udp://127.0.0.1:514

#
# Garbage Collection interval for tcp assembly and ip defragmentation
--gcTime=10s

# Duration to calculate interface stats
--captureStatsDelay=1s

# Mask IPv4s by bits. 32 means all the bits of IP is saved in DB
--maskSize4=32

# Mask IPv6s by bits. 32 means all the bits of IP is saved in DB
--maskSize6=128

# Name of the server used to index the metrics.
--serverName=default

# Set debug Log format
--logFormat=text

# Set debug Log level, 0:PANIC, 1:ERROR, 2:WARN, 3:INFO, 4:DEBUG
--logLevel=3

# Size of the result processor channel size
--resultChannelSize=100000

# write cpu profile to file
--cpuprofile=

# write memory profile to file
--memprofile=

# GOMAXPROCS variable
--gomaxprocs=-1

# Limit of packets logged to clickhouse every iteration. Default 0 (disabled)
--packetLimit=0

# Skip outputing domains matching items in the CSV file path. Can accept a URL (http:// or https://) or path
--skipDomainsFile=

# Hot-Reload skipDomainsFile interval
--skipDomainsRefreshInterval=1m0s

# Allow Domains logic input file. Can accept a URL (http:// or https://) or path
--allowDomainsFile=

# Hot-Reload allowDomainsFile file interval
--allowDomainsRefreshInterval=1m0s

# Skip TLS verification when making HTTPS connections
--skipTLSVerification

#
# Metric Endpoint Service
--metricEndpointType=

# Statsd endpoint. Example: 127.0.0.1:8125
--metricStatsdAgent=

# Prometheus Registry endpoint. Example: http://0.0.0.0:2112/metric
--metricPrometheusEndpoint=

# Interval between sending results to Metric Endpoint
--metricFlushInterval=10s
环境变量

所有标志也可以通过环境变量设置。请记住,每个参数的名称都是大写的,所有变量的前缀都是 "DNSMONSTER"。

例子:

$ export DNSMONSTER_PORT=53
$ export DNSMONSTER_DEVNAME=lo
$ sudo -E dnsmonster
什么是保留策略

ClickHouse 表的默认保留策略设置为 30 天。你可以通过使用 ./autobuild.sh 构建容器来更改。由于 ClickHouse 没有内部时间戳,因此 TTL 将查看 pcap 文件中传入数据包的日期。因此,在导入旧的 pcap 文件时,ClickHouse 可能会在写入数据时自动开始删除数据,你不会在 Grafana 中看到任何实际数据。要解决此问题,可以将 TTL 更改为比 PCAP 文件中最早的数据包早一天。

注意:要随时更改 TTL,需要使用 clickhouse 客户端直接连接到 Clickhouse 服务器并运行以下 SQL 语句 (本示例将其从 30 天更改为 90 天):

ALTER TABLE DNS_LOG MODIFY TTL DnsDate + INTERVAL 90 DAY;`
注意:上述命令仅更改原始 DNS 日志数据的 TTL,这是你的大部分容量消耗。要确保为每个聚合表调整 TTL,可以运行以下命令:

ALTER TABLE DNS_LOG MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_DOMAIN_COUNT` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_DOMAIN_UNIQUE` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_PROTOCOL` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_GENERAL_AGGREGATIONS` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_EDNS` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_OPCODE` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_TYPE` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_CLASS` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_RESPONSECODE` MODIFY TTL DnsDate + INTERVAL 90 DAY;
ALTER TABLE `.inner.DNS_SRCIP_MASK` MODIFY TTL DnsDate + INTERVAL 90 DAY;
更新:在最新版本的 clickhouse 中,.inner 表的名称与相应的聚合视图不同。要修改 TTL,必须使用 SHOW TABLES 找到 UUID 格式的表名,并使用这些 UUID 重复 ALTER 命令。

采样和跳过
预处理抽样

dnsmonster 支持使用一个简单的参数对数据包进行预处理采样:sampleRatio。此参数接受 "比率" 值,例如 1:2,1:2 表示每到达 2 个数据包,只处理其中一个 (50% 采样)。请注意,此采样发生在 bpf 过滤器之后而不是之前。如果在跟上 DNS 流量时遇到问题,可以将其设置为 2:10,这意味着 20% 通过 bpf 过滤器的数据包将由 dnsmonster 处理。

跳过域

dnsmonster 支持后处理域跳过列表,以避免将重复数据写入数据库。域跳过列表是一个 csv 格式的文件,只有两列:一个字符串和该特定字符串的逻辑。dnsmonster 支持三种逻辑:前缀、后缀和 fqdn。前缀和后缀意味着只有以提到的字符串开头/结尾的域将被跳过以写入数据库。请注意,由于该过程是针对 DNS 问题完成的,因此字符串很可能有一个尾随。这也需要包含在跳过列表行中 (请查看 skipdomains.csv.sample 以获得更好的视图)。还可以进行完整的 FQDN 匹配,以避免将高噪声 FQDN 写入数据库。

允许域

dnsmonster 具有 allowdomains 的概念,如果 DNS 流量中存在某些 FQDN、前缀或后缀,它有助于构建检测。鉴于 dnsmonster 支持多个输出流,每个输出流具有不同的逻辑,因此可以在 ClickHouse 中收集所有 DNS 流量,但仅在标准输出或同一 dnsmonster 实例中的文件中收集允许列表域。

SAMPLE in clickhouse SELECT queries

默认情况下,tables.sql (DNS_LOG) 文件创建的主表能够根据需要对结果进行抽样,因为每个 DNS 问题都有一个与之关联的唯一 UUID。有关 Clickhouse 中 SAMPLE 查询的更多信息,请查看此 文档。

支持的输入

[*]通过 libpcap/ncap 实时捕获 (支持以太网和原始 IP)
[*]通过 afpacket 实时捕获 (支持以太网和原始 IP)
[*]Dnstap 套接字 (监听模式)
[*]Pcap 文件 (以太网帧)

注意:如果你的 pcap 文件被 Linux 的元接口之一 (例如 tcpdump -i any) 捕获,则 dnsmonster 将无法从中读取以太网帧,因为它不存在。可以使用 tcprewrite 之类的工具将 pcap 文件转换为以太网。

支持的输出

[*]Clickhouse
[*]Kafka
[*]Elasticsearch
[*]Splunk HEC
[*]Stdout
[*]File
[*]Syslog (Linux Only)
[*]Microsoft Sentinel
[*]InfluxDB

路线图

[*]SELECT 查询的下采样功能
[*]添加对 afpacket 的支持
[*]配置文件选项
[*]从索引中排除 FQDN
[*]FQDN 白名单仅记录某些域
[*]dnstap 支持
[*]Kafka 输出支持
[*]能够从 HTTP(S) 端点加载 allowDomains 和 skipDomains
[*]Elasticsearch 输出支持
[*]Splunk HEC 输出支持
[*]系统日志输出支持
[*]Grafana 仪表板性能改进
[*]移除 libpcap 依赖并移至 pcapgo 进行数据包处理
[*]准备好用于机器学习和异常检测的数据
[*]重复数据删除支持 (WIP)
[*]Clickhouse 的可选 SSL
[*]statsd 和 Prometheus 支持
[*]Splunk 仪表板
[*]Kibana 仪表板
[*]Clickhouse 版本控制和迁移工具
[*]测试和基准

页: [1]
查看完整版本: Passive DNS 捕获和监视工具包 —— Dnsmonster