python构造icmp echo请求和实现网络探测器功能代码分享
admin
2023-07-31 02:01:00
0

python发送icmp echo requesy请求

复制代码 代码如下:
import socket
import struct

def checksum(source_string):
    sum = 0
    countTo = (len(source_string)/2)*2
    count = 0
    while count        thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
        sum = sum + thisVal
        sum = sum & 0xffffffff
        count = count + 2
    if countTo        sum = sum + ord(source_string[len(source_string) – 1])
        sum = sum & 0xffffffff
    sum = (sum >> 16)  +  (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer

def ping(ip):
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
    packet = struct.pack(
            \”!BBHHH\”, 8, 0, 0, 0, 0
    )
    chksum=checksum(packet)
    packet = struct.pack(
            \”!BBHHH\”, 8, 0, chksum, 0, 0
    )
    s.sendto(packet, (ip, 1))

if __name__==\’__main__\’:
    ping(\’192.168.41.56\’)

扫描探测网络功能(网络探测器)

复制代码 代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

\’\’\’
探测网络主机存活。
\’\’\’

import os
import struct
import array
import time
import socket
import IPy
import threading

class SendPingThr(threading.Thread):
    \’\’\’
    发送ICMP请求报文的线程。

    参数:
        ipPool      — 可迭代的IP地址池
        icmpPacket  — 构造的icmp报文
        icmpSocket  — icmp套字接
        timeout     — 设置发送超时
    \’\’\’
    def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
        threading.Thread.__init__(self)
        self.Sock = icmpSocket
        self.ipPool = ipPool
        self.packet = icmpPacket
        self.timeout = timeout
        self.Sock.settimeout( timeout + 3 )

    def run(self):
        time.sleep(0.01)  #等待接收线程启动
        for ip in self.ipPool:
            try:
                self.Sock.sendto(self.packet, (ip, 0))
            except socket.timeout:
                break
        time.sleep(self.timeout)

class Nscan:
    \’\’\’
    参数:
        timeout    — Socket超时,默认3秒
        IPv6       — 是否是IPv6,默认为False
    \’\’\’
    def __init__(self, timeout=3, IPv6=False):
        self.timeout = timeout
        self.IPv6 = IPv6

        self.__data = struct.pack(\’d\’, time.time())   #用于ICMP报文的负荷字节(8bit)
        self.__id = os.getpid()   #构造ICMP报文的ID字段,无实际意义

    @property   #属性装饰器
    def __icmpSocket(self):
        \’\’\’创建ICMP Socket\’\’\’
        if not self.IPv6:
            Sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname(\”icmp\”))
        else:
            Sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.getprotobyname(\”ipv6-icmp\”))
        return Sock

    def __inCksum(self, packet):
        \’\’\’ICMP 报文效验和计算方法\’\’\’
        if len(packet) & 1:
            packet = packet + \’\\0\’
        words = array.array(\’h\’, packet)
        sum = 0
        for word in words:
            sum += (word & 0xffff)
        sum = (sum >> 16) + (sum & 0xffff)
        sum = sum + (sum >> 16)

        return (~sum) & 0xffff

    @property
    def __icmpPacket(self):
        \’\’\’构造 ICMP 报文\’\’\’
        if not self.IPv6:
            header = struct.pack(\’bbHHh\’, 8, 0, 0, self.__id, 0) # TYPE、CODE、CHKSUM、ID、SEQ
        else:
            header = struct.pack(\’BbHHh\’, 128, 0, 0, self.__id, 0)

        packet = header + self.__data     # packet without checksum
        chkSum = self.__inCksum(packet) # make checksum

        if not self.IPv6:
            header = struct.pack(\’bbHHh\’, 8, 0, chkSum, self.__id, 0)
        else:
            header = struct.pack(\’BbHHh\’, 128, 0, chkSum, self.__id, 0)

        return header + self.__data   # packet *with* checksum

    def isUnIP(self, IP):
        \’\’\’判断IP是否是一个合法的单播地址\’\’\’
        IP = [int(x) for x in IP.split(\’.\’) if x.isdigit()]
        if len(IP) == 4:
            if (0 < IP[0] < 223 and IP[0] != 127 and IP[1] < 256 and IP[2] < 256 and 0 < IP[3] < 255):
                return True
        return False

    def makeIpPool(self, startIP, lastIP):
        \’\’\’生产 IP 地址池\’\’\’
        IPver = 6 if self.IPv6 else 4
        intIP = lambda ip: IPy.IP(ip).int()
        ipPool = {IPy.intToIp(ip, IPver) for ip in range(intIP(startIP), intIP(lastIP)+1)}

        return {ip for ip in ipPool if self.isUnIP(ip)}

    def mPing(self, ipPool):
        \’\’\’利用ICMP报文探测网络主机存活

        参数:
            ipPool  — 可迭代的IP地址池
        \’\’\’
        Sock = self.__icmpSocket
        Sock.settimeout(self.timeout)
        packet = self.__icmpPacket
        recvFroms = set()   #接收线程的来源IP地址容器

        sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
        sendThr.start()

        while True:
            try:
                recvFroms.add(Sock.recvfrom(1024)[1][0])
            except Exception:
                pass
            finally:
                if not sendThr.isAlive():
                    break
        return recvFroms & ipPool

if __name__==\’__main__\’:
    s = Nscan()
    ipPool = s.makeIpPool(\’192.168.0.1\’, \’192.168.0.254\’)
    print( s.mPing(ipPool) )

相关内容

热门资讯

Mobi、epub格式电子书如... 在wps里全局设置里有一个文件关联,打开,勾选电子书文件选项就可以了。
定时清理删除C:\Progra... C:\Program Files (x86)下面很多scoped_dir开头的文件夹 写个批处理 定...
scoped_dir32_70... 一台虚拟机C盘总是莫名奇妙的空间用完,导致很多软件没法再运行。经过仔细检查发现是C:\Program...
500 行 Python 代码... 语法分析器描述了一个句子的语法结构,用来帮助其他的应用进行推理。自然语言引入了很多意外的歧义,以我们...
小程序支付时提示:appid和... [Q]小程序支付时提示:appid和mch_id不匹配 [A]小程序和微信支付没有进行关联,访问“小...
pycparser 是一个用... `pycparser` 是一个用 Python 编写的 C 语言解析器。它可以用来解析 C 代码并构...
微信小程序使用slider实现... 众所周知哈,微信小程序里面的音频播放是没有进度条的,但最近有个项目呢,客户要求音频要有进度条控制,所...
65536是2的几次方 计算2... 65536是2的16次方:65536=2⁶ 65536是256的2次方:65536=256 6553...
Apache Doris 2.... 亲爱的社区小伙伴们,我们很高兴地向大家宣布,Apache Doris 2.0.0 版本已于...
项目管理和工程管理的区别 项目管理 项目管理,顾名思义就是专注于开发和完成项目的管理,以实现目标并满足成功标准和项目要求。 工...