Python解析Pcap包类源码学习

摘要:
可用于PCAP包分析的软件,如Kolai和Wireshark,是非常有用的分析软件。在寻找Pcap分析的编程代码时,发现许多领导者都编写了Python脚本来帮助分析Pcap,而且还存在以接口形式提取和显示Pcap信息的框架。本文对使用Python中的Scapy库提取协议五元组信息进行了学习总结,这在实战中没有使用,因为发现PCAP数据包读取、解包和数据包检查的速度太慢。0x2.参考库Python解析pcap包的常用库包括Scapy、dpkt、Pyshark等。参考源代码和工具如下https://github.com/thepacketgeek/cloud-pcaphttps://github.com/madpowah/ForensicPCAPhttps://github.com/le4f/pcap-肛门yzerhttps://github.com/HatBoy/Pcap-Analyzerhttps://github.com/caesar0301/awesome-pcaptoolshttps://asecuritysite.com/forensics/pcaphttps://github.com/DanMcInerney/net-Creds提供了一个用于解析Pcap包的网站https://packettotal.com/ 、 https://www.capanalysis.net/ca/ 、 https://asecuritysite.com/forensics/pcap?

0x1、前言

​ 在现场取证遇到分析流量包的情况会比较少,虽然流量类设备原理是把数据都抓出来进行解析,很大一定程度上已经把人可以做的事情交给了机器自动完成。

​ 可用于PCAP包分析的软件比如科来,Wireshark都是很好用的分析软件,找Pcap解析的编程类代码时发现已经有很多大佬写过Python脚本辅助解析Pcap,也有提取将Pcap信息以界面形式展示出来框架。

​ 本文对利用Python里的Scapy库提取协议五元组信息进行学习性总结,没有用于实战,因为实践过程中发现PCAP读包解包查包速度太慢了。

0x2、参考库

Python解析pcap包的常见库有Scapy、dpkt、Pyshark等。可以参考的源码,工具如下

https://github.com/thepacketgeek/cloud-pcap
https://github.com/madpowah/ForensicPCAP
https://github.com/le4f/pcap-analyzer
https://github.com/HatBoy/Pcap-Analyzer
https://github.com/caesar0301/awesome-pcaptools
https://asecuritysite.com/forensics/pcap
https://github.com/DanMcInerney/net-creds

提供解析Pcap包服务的网站https://packettotal.com/、https://www.capanalysis.net/ca/、https://asecuritysite.com/forensics/pcap?infile=smtp.pcap&infile=smtp.pcap、https://app.any.run

0x3、邮件协议

提取发件人的邮箱,就要熟悉SMTP的几个端口进行识别。

25  端口为SMTP(Simple Mail Transfer Protocol,简单邮件传输协议)服务所开放的,是用于发送邮件。
110端口是为POP3(邮件协议3)服务开放的,POP2、POP3都是主要用于接收邮件的,目前POP3使用的比较多,许多服务器都同时支持POP2和POP3。
143端口主要是用于“Internet Message AccessProtocol”v2(Internet消息访问协议,简称IMAP),和POP3一样,是用于电子邮件的接收的协议。
465 端口是SSL/TLS通讯协议的 内容一开始就被保护起来了 是看不到原文的。
465 端口(SMTPS):465端口是为SMTPS(SMTP-over-SSL)协议服务开放的,这是SMTP协议基于SSL安全协议之上的一种变种协议,它继承了SSL安全协议的非对称加密的高度安全可靠性,可防止邮件泄露。
587 端口是STARTTLS协议的 属于TLS通讯协议 只是他是在STARTTLS命令执行后才对之后的原文进行保护的。

SMTP常用命令语法

在Wireshark中SMTP协议数据是在建立TCP三次握手后会出现的,常用的命令如下。

SMTP命令不区分大小写,但参数区分大小写,有关这方面的详细说明请参考RFC821。
HELO <domain> <CRLF>。向服务器标识用户身份发送者能欺骗,说谎,但一般情况下服务器都能检测到。
MAIL FROM: <reverse-path> <CRLF>。<reverse-path>为发送者地址,此命令用来初始化邮件传输,即用来对所有的状态和缓冲区进行初始化。
RCPT TO:<forward-path> <CRLF>。 <forward-path>用来标志邮件接收者的地址,常用在MAIL FROM后,可以有多个RCPT TO。
DATA <CRLF>。将之后的数据作为数据发送,以<CRLF>.<CRLF>标志数据的结尾。
REST <CRLF>。重置会话,当前传输被取消。
NOOP <CRLF>。要求服务器返回OK应答,一般用作测试。
QUIT <CRLF>。结束会话。
VRFY <string> <CRLF>。验证指定的邮箱是否存在,由于安全方面的原因,服务器大多禁止此命令。
EXPN <string> <CRLF>。验证给定的邮箱列表是否存在,由于安全方面的原因,服务器大多禁止此命令。
HELP <CRLF>。查询服务器支持什么命令。

代码

forensicPCAP 可以根据解析PCAP包,然后利用CMD模块循环交互界面,输入命令调出对应函数。核心原理为

基于Scapy找到源端口或目的端口为110、143端口的数据记录保存作为邮件数据。

关键代码代码如下:

self.pcap是构造函数self.pcap = rdpcap(namefile)提前读取,然后enumerate()遍历。指定目标端口和源端口是110、143的数据筛选出来。

	def do_mail(self, arg, opts=None):
		"""Print the number of mail's requests and store its
Usage :
- mail"""
		sys.stdout.write(bcolors.TXT + "## Searching mail's request ... ")
		sys.stdout.flush()
		con = []
		mailpkts = []
		for i,packet in enumerate(self.pcap):
		    # TCP的包
			if TCP in packet:
			    # 获取源端口或目的端口为110、143端口的数据记录
				if packet.getlayer('TCP').dport == 110 or packet.getlayer('TCP').sport == 110 or packet.getlayer('TCP').dport == 143 or packet.getlayer('TCP').sport == 143 :
					if packet.getlayer('TCP').flags == 2:
						con.append(i)
					mailpkts.append(packet)
		sys.stdout.write("OK.
")
		print "## Result : Mail's request : " + str(len(con))  
		sys.stdout.write(bcolors.TXT + "## Saving mails ... ")
		sys.stdout.flush()
		res = ""
		for packet in mailpkts:
				if packet.getlayer('TCP').flags == 24:
					res = res + packet.getlayer('Raw').load
					sys.stdout.write(".")
					sys.stdout.flush()
		sys.stdout.write("OK
")
		sys.stdout.flush()			
		self.cmd = "mail"			
		self.last = res

0x4、DNS解析

forensicPCAP 解析DNS部分代码。bcolors.TXT是设定了一个字体显示的颜色,res = packet.getlayer('DNS').qd.qname是获取DNS域名解析记录。

############# do_dns() ###########
    def do_dns(self, arg, opts=None):
        """Print all DNS requests in the PCAP file
Usage :
- dns"""
        sys.stdout.write(bcolors.TXT + "## Listing all DNS requests ...")
        sys.stdout.flush()
        dns = []
        dns.append([])
        
        # 枚举PCAP包的数据
        for i,packet in enumerate(self.pcap):
            if DNS in packet:
                # 获取DNS域名解析记录
                res = packet.getlayer('DNS').qd.qname
                if res[len(res) - 1] == '.':
                    res = res[:-1]
                # 保存域名
                dns.append([i, res])

        sys.stdout.write("OK.
")
        # 统计DNS数目
        print bcolors.TXT + "## Result : " + str(len(dns) - 1) + " DNS request(s)" + bcolors.ENDC
        self.last = dns
        self.cmd = "dns"

pcap-analyzer核心的代码是借鉴了forensicPCAP,获取解析为DNS的请求记录显示的时候会统计出次数最多的IP的前十名。

代码

def get_dns(file):
    dns = []
    # 打开PCAP文件
    pcap = rdpcap(UPLOAD_FOLDER+file)
    for packet in pcap:
        if DNS in packet:
                # 核心代码
                res = packet.getlayer('DNS').qd.qname
                if res[len(res) - 1] == '.':
                    res = res[:-1]
                dns.append(res)
    # 统计DNS协议,出现次数最多的IP前十名            
    dns = Counter(dns).most_common(10)

0x5、密码信息提取

net-creds 以Scapy为基础,解析PCAP中含有密码信息的一个脚本。

代码

主要代码由other_parser()函数实现,分割每个包中的HTTP等内容,然后搜索身份验证相关的关键字筛选出账户、密码。

def other_parser(src_ip_port, dst_ip_port, full_load, ack, seq, pkt, verbose):
    '''
    Pull out pertinent info from the parsed HTTP packet data
    '''
    user_passwd = None
    http_url_req = None
    method = None
    http_methods = ['GET ', 'POST ', 'CONNECT ', 'TRACE ', 'TRACK ', 'PUT ', 'DELETE ', 'HEAD ']
    http_line, header_lines, body = parse_http_load(full_load, http_methods)
    headers = headers_to_dict(header_lines)
    if 'host' in headers:
        host = headers['host']
    else:
        host = ''

    if http_line != None:
        method, path = parse_http_line(http_line, http_methods)
        http_url_req = get_http_url(method, host, path, headers)
        if http_url_req != None:
            if verbose == False:
                if len(http_url_req) > 98:
                    http_url_req = http_url_req[:99] + '...'
            printer(src_ip_port, None, http_url_req)

    # Print search terms
    searched = get_http_searches(http_url_req, body, host)
    if searched:
        printer(src_ip_port, dst_ip_port, searched)

    # Print user/pwds
    if body != '':
        user_passwd = get_login_pass(body)
        if user_passwd != None:
            try:
                http_user = user_passwd[0].decode('utf8')
                http_pass = user_passwd[1].decode('utf8')
                # Set a limit on how long they can be prevent false+
                if len(http_user) > 75 or len(http_pass) > 75:
                    return
                user_msg = 'HTTP username: %s' % http_user
                printer(src_ip_port, dst_ip_port, user_msg)
                pass_msg = 'HTTP password: %s' % http_pass
                printer(src_ip_port, dst_ip_port, pass_msg)
            except UnicodeDecodeError:
                pass

    # Print POST loads
    # ocsp is a common SSL post load that's never interesting
    if method == 'POST' and 'ocsp.' not in host:
        try:
            if verbose == False and len(body) > 99:
                # If it can't decode to utf8 we're probably not interested in it
                msg = 'POST load: %s...' % body[:99].encode('utf8')
            else:
                msg = 'POST load: %s' % body.encode('utf8')
            printer(src_ip_port, None, msg)
        except UnicodeDecodeError:
            pass

    # Kerberos over TCP
    decoded = Decode_Ip_Packet(str(pkt)[14:])
    kerb_hash = ParseMSKerbv5TCP(decoded['data'][20:])
    if kerb_hash:
        printer(src_ip_port, dst_ip_port, kerb_hash)

    # Non-NETNTLM NTLM hashes (MSSQL, DCE-RPC,SMBv1/2,LDAP, MSSQL)
    NTLMSSP2 = re.search(NTLMSSP2_re, full_load, re.DOTALL)
    NTLMSSP3 = re.search(NTLMSSP3_re, full_load, re.DOTALL)
    if NTLMSSP2:
        parse_ntlm_chal(NTLMSSP2.group(), ack)
    if NTLMSSP3:
        ntlm_resp_found = parse_ntlm_resp(NTLMSSP3.group(), seq)
        if ntlm_resp_found != None:
            printer(src_ip_port, dst_ip_port, ntlm_resp_found)

    # Look for authentication headers
    if len(headers) == 0:
        authenticate_header = None
        authorization_header = None
    for header in headers:
        authenticate_header = re.match(authenticate_re, header)
        authorization_header = re.match(authorization_re, header)
        if authenticate_header or authorization_header:
            break

    if authorization_header or authenticate_header:
        # NETNTLM
        netntlm_found = parse_netntlm(authenticate_header, authorization_header, headers, ack, seq)
        if netntlm_found != None:
            printer(src_ip_port, dst_ip_port, netntlm_found)

        # Basic Auth
        parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header)

关键字列表:

# Regexs
authenticate_re = '(www-|proxy-)?authenticate'
authorization_re = '(www-|proxy-)?authorization'
ftp_user_re = r'USER (.+)
'
ftp_pw_re = r'PASS (.+)
'
irc_user_re = r'NICK (.+?)((
)?
|s)'
irc_pw_re = r'NS IDENTIFY (.+)'
irc_pw_re2 = 'nickserv :identify (.+)'
mail_auth_re = '(d+ )?(auth|authenticate) (login|plain)'
mail_auth_re1 =  '(d+ )?login '
NTLMSSP2_re = 'NTLMSSPx00x02x00x00x00.+'
NTLMSSP3_re = 'NTLMSSPx00x03x00x00x00.+'
# Prone to false+ but prefer that to false-
http_search_re = '((search|query|&q|?q|search?p|searchterm|keywords|keyword|command|terms|keys|question|kwd|searchPhrase)=([^&][^&]*))'

0x6、提取数据需要关注的元素

  • 源IP、目的IP、源端口、目的端口、协议、数据包大小
关联出受害者,攻击者控制的跳板
  • 协议:HTTP、FTP、邮件协议
- 先查看HTTP、HTTPS类,然后查看数据包长度。
- 提取相关IP信息。
- 判断是否是木马远控、密码账户、可疑IP
  • 账户密码字段
- 渗透测试,嗅探回来的数据包分析含有账户密码信息
- 暴力破解IP

0x7、测试dpkt解析pcap

本想借助dpkt解析mail、dns、http来辅助分析pcap包进行分析,查阅资料学习却发现并不如使用scapy那么方便。

dpkt是一个python模块,可以对简单的数据包创建/解析,以及基本TCP / IP协议的解析,速度很快。

dpkt 手册

https://dpkt.readthedocs.io/en/latest/
dpkt 下载

https://pypi.org/project/dpkt/

看官方手册发现DPKT是读取每个pcap包里的内容,用isinstance判断是不是有IP的包,再判断是属于哪个协议,对应的协议已经封装好API如果发现可以匹配某个协议API就输出来相关值。

想要扩展这个源码还需要去学习一下协议相关的字段含义。

API调用:

https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq

在手册中找到了在Github中部分API的示例代码,具备参考价值。

https://github.com/jeffsilverm/dpkt_doc

手册例子

以下代码是手册中的例子,通过查询发现inet_pton无法直接使用,按照网络上的解决方法修改了一下。

打印数据包

使用DPKT读取pcap文件并打印出数据包的内容。打印出以太网帧和IP数据包中的字段。

python2测试代码:

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. 'x01x02x03x04x05x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_packets(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Print out the timestamp in UTC
        print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))

        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s
' % eth.data.__class__.__name__)
            continue

        # Now unpack the data within the Ethernet frame (the IP packet)
        # Pulling out src, dst, length, fragment info, TTL, and Protocol
        ip = eth.data

        # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
        do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
        more_fragments = bool(ip.off & dpkt.ip.IP_MF)
        fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

        # Print out the info
        print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)
' % 
              (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))



def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_packets(pcap)

if __name__ == '__main__':
    test()

输出:

('packet num count :', 4474)
('Timestamp: ', '2017-08-01 03:55:03.314832')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 211.90.25.31   (len=52 ttl=64 DF=1 MF=0 offset=0)

('packet num count :', 4475)
('Timestamp: ', '2017-08-01 03:55:03.485679')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 180.97.33.12   (len=114 ttl=64 DF=0 MF=0 offset=0)

('packet num count :', 4476)
('Timestamp: ', '2017-08-01 03:55:03.486141')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 119.75.222.122   (len=52 ttl=64 DF=1 MF=0 offset=0)

打印ICMP

检查ICMP数据包并显示ICMP内容。

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. 'x01x02x03x04x05x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_icmp(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s
' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Now check if this is an ICMP packet
        if isinstance(ip.data, dpkt.icmp.ICMP):
            icmp = ip.data

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print( 'Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print( 'IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' % 
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('ICMP: type:%d code:%d checksum:%d data: %s
' % (icmp.type, icmp.code, icmp.sum, repr(icmp.data)))



def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_icmp(pcap)



if __name__ == '__main__':
    test()

输出:

('packet num count :', 377)
('Timestamp: ', '2017-08-01 03:45:56.403640')
('Ethernet Frame: ', 'ec:88:8f:86:14:5c', '9c:5c:8e:76:bf:24', 2048)
IP: 202.118.168.73 -> 192.168.1.103   (len=56 ttl=253 DF=0 MF=0 offset=0)
ICMP: type:3 code:13 checksum:52074 data: Unreach(data=IP(len=28, id=2556, off=16384, ttl=61, p=6, sum=36831, src='http://t.zoukankan.com/xc0xa8x01g', dst='xcalx17q', opts='', data='nxb1x00Px85)=]'))

打印HTTP请求

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. 'x01x02x03x04x05x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_http_requests(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s
' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Check for TCP in the transport layer
        if isinstance(ip.data, dpkt.tcp.TCP):

            # Set the TCP data
            tcp = ip.data

            # Now see if we can parse the contents as a HTTP request
            try:
                request = dpkt.http.Request(tcp.data)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                continue

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' %
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('HTTP request: %s
' % repr(request))

            # Check for Header spanning acrossed TCP segments
            if not tcp.data.endswith(b'
'):
                print('
HEADER TRUNCATED! Reassemble TCP segments!
')




def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_http_requests(pcap)

if __name__ == '__main__':
    test()

输出:

Timestamp:  2004-05-13 10:17:08.222534
Ethernet Frame:  00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 65.208.228.223   (len=519 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/download.html', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'www.ethereal.com', 'referer': 'http://www.ethereal.com/development.html'}, version='1.1', data='', method='GET')

Timestamp:  2004-05-13 10:17:10.295515
Ethernet Frame:  00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 216.239.59.99   (len=761 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/pagead/ads?client=ca-pub-2309191948673629&random=1084443430285&lmt=1082467020&format=468x60_as&output=html&url=http%3A%2F%2Fwww.ethereal.com%2Fdownload.html&color_bg=FFFFFF&color_text=333333&color_link=000000&color_url=666633&color_border=666633', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'pagead2.googlesyndication.com', 'referer': 'http://www.ethereal.com/download.html'}, version='1.1', data='', method='GET')

...

打印出以太网IP

594 MB的pcap解析速度是127秒。

# coding=utf-8
import dpkt
import socket
import time
import ctypes
import os
import datetime

# 测试dpkt获取IP运行时间
# 使用dpkt获取时间戳、源IP、目的IP

class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def getip(pcap):

    Num = 0
    for timestamp, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)

        # 对没有IP段的包过滤掉
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue

        ip = eth.data
        ip_src = inet_to_str(ip.src)
        ip_dst = inet_to_str(ip.dst)
        # 打印时间戳,源->目标

        #print(ts + " " + ip_src + "-->" + ip_dst)
        Num= Num+1
        print ('{0}	time:{1}	src:{2}-->dst:{3} '.format(Num,timestamp,ip_src ,ip_dst))
        if eth.data.__class__.__name__ == 'IP':

            ip = '%d.%d.%d.%d' % tuple(map(ord, list(eth.data.dst)))

            if eth.data.data.__class__.__name__ == 'TCP':

                if eth.data.data.dport == 80:
                    print eth.data.data.data  # http 请求的数据

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    f = open('pcap222.pcap', 'rb')  # 要以rb方式打开,用r方式打开会报错
    pcap = dpkt.pcap.Reader(f)
    getip(pcap)
    endtime = datetime.datetime.now()
    print ('time : {0} seconds '.format((endtime - starttime).seconds))

输出:

1290064	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103 
1290065	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290066	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290067	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103 
1290068	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290069	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290070	time:1501562988.76	src:122.228.91.14-->dst:192.168.1.103 
1290071	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290072	time:1501562988.76	src:113.142.85.151-->dst:192.168.1.103 
1290073	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290074	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 
GET / HTTP/1.1
Accept: application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*
Accept-Language: zh-cn
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Mac_PowerPC; en) Opera 9.24
Referer: -
Connection: Keep-Alive
Host: win7.shangshai-qibao.cn

0x7、参考

SMTP协议分析 https://yq.aliyun.com/wenji/262429

scapy 解析pcap文件总结 https://www.cnblogs.com/14061216chen/p/8093441.html

python之字符串格式化(format) https://www.cnblogs.com/benric/p/4965224.html

免责声明:文章转载自《Python解析Pcap包类源码学习》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇AutoMapper小结Asp.net Core 系列之--4.事务、日志及错误处理下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

redis产生随机数据

  由于需要研究redis cluster集群监控,需要产生随机数据,顾写此set和list随机数据生成代码。 直接贴代码 # coding: utf-8 # author Elephanyu from abc import abstractmethod from random import randint, choice from redisclust...

【转】mysql字符串函数

对于针对字符串位置的操作,第一个位置被标记为1(即:第一个字母索引为1)。 ASCII(str) 返回字符串str的 最左面字符的ASCII代码值。如果str是空字符串, 返回0。如果str是NULL,返回NULL。 mysql> select ASCII('2'); -> 50mysql> select ASCII(2...

Spring Boot 对日期转换时间戳或字符串

一、在Spring Boot 项目application.properties文件中添加 # 日期类型转换成时间戳返回 spring.jackson.serialization.write-dates-as-timestamps=true # 日期类型转换成字符串返回 #spring.jackson.date-format=yyyy-mm-dd HH:m...

MySQL的日期格式

MySQL的五种时间和日期类型 YEAR表示年份:字节数为1,取值范围为“1901——2155”DATE表示日期:字节数为4,取值范围为“1000-01-01——9999-12-31”TIME表示时间:字节数为3,取值范围为“-838:59:59——838:59:59”DATETIME和TIMESTAMP表示日期和时间DATETIME:字节数为8,取值范围...

屌炸天,Oracle 发布了一个全栈虚拟机 GraalVM,支持 Python!

前阵子,Oracle 发布了一个黑科技 “GraalVM”,号称是一个全新的通用全栈虚拟机,并具有高性能、跨语言交互等逆天特性,真有这么神奇? GraalVM 简介 GraalVM 是一个跨语言的通用虚拟机,不仅支持了 Java、Scala、Groovy、Kotlin 等基于 JVM 的语言,以及 C、C++ 等基于 LLVM 的语言,还支持其他像 Jav...

python向多个邮箱发邮件--注意接收是垃圾邮件

群发邮件注意:三处标红的地方 # -*- coding: UTF-8 -*- import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header from e...