root/CodeSnippets/ping.py

Revision 1520, 6.2 KB (checked in by JensDiemer, 2 years ago)

update some SVN-keywords

  • Property svn:eol-style set to LF
  • Property svn:keywords set to Author Rev LastChangedDate
Line 
1#!/usr/bin/env python
2
3"""
4    A pure python ping implementation using raw socket.
5
6
7    Note that ICMP messages can only be sent from processes running as root.
8
9
10    Derived from ping.c distributed in Linux's netkit. That code is
11    copyright (c) 1989 by The Regents of the University of California.
12    That code is in turn derived from code written by Mike Muuss of the
13    US Army Ballistic Research Laboratory in December, 1983 and
14    placed in the public domain. They have my thanks.
15
16    Bugs are naturally mine. I'd be glad to hear about them. There are
17    certainly word-size dependencies here.
18
19    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
20    Distributable under the terms of the GNU General Public License
21    version 2. Provided with no warranties of any sort.
22
23    Original Version from Matthew Dixon Cowles:
24      -> ftp://ftp.visi.com/users/mdc/ping.py
25
26    Rewrite by Jens Diemer:
27      -> http://www.python-forum.de/post-69122.html#69122
28
29
30    Revision history
31    ~~~~~~~~~~~~~~~~
32
33    May 30, 2007
34    little rewrite by Jens Diemer:
35     -  change socket asterisk import to a normal import
36     -  replace time.time() with time.clock()
37     -  delete "return None" (or change to "return" only)
38     -  in checksum() rename "str" to "source_string"
39
40    November 22, 1997
41    Initial hack. Doesn't do much, but rather than try to guess
42    what features I (or others) will want in the future, I've only
43    put in what I need now.
44
45    December 16, 1997
46    For some reason, the checksum bytes are in the wrong order when
47    this is run under Solaris 2.X for SPARC but it works right under
48    Linux x86. Since I don't know just what's wrong, I'll swap the
49    bytes always and then do an htons().
50
51    December 4, 2000
52    Changed the struct.pack() calls to pack the checksum and ID as
53    unsigned. My thanks to Jerome Poincheval for the fix.
54
55
56    Last commit info:
57    ~~~~~~~~~~~~~~~~~
58    $LastChangedDate: $
59    $Rev: $
60    $Author: $
61"""
62
63
import errno
import os
import select
import socket
import struct
import sys
import time
import timeit
65
66# From /usr/include/linux/icmp.h; your mileage may vary.
67ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.
68
69
def checksum(source_string):
    """
    Return the 16 bit ones'-complement checksum of >source_string<.

    >source_string< may be a byte string (str on Python 2, bytes or
    bytearray on Python 3) or a text string, whose characters are then
    taken by their ordinal value.

    The data is summed as 16 bit words, the first byte of every pair being
    the low-order byte. A trailing odd byte is added as the low-order byte
    of a last word. The carries are folded back into the low 16 bits, the
    sum is complemented and finally the two bytes of the result are
    swapped. The returned integer, written high byte first, therefore
    gives the checksum bytes in the order they appear in the packet.

    Unlike the original implementation the running sum is not truncated
    to 32 bits, so no carries are lost on very large inputs.
    """
    if isinstance(source_string, (bytes, bytearray)):
        # Indexing a bytearray yields integers on Python 2 and on Python 3.
        octets = bytearray(source_string)
    else:
        octets = [ord(char) for char in source_string]

    # Floor division keeps this an integer on Python 3 as well.
    paired_length = (len(octets) // 2) * 2

    total = 0
    for index in range(0, paired_length, 2):
        total += octets[index + 1] * 256 + octets[index]

    if paired_length < len(octets):
        # Odd number of bytes: the last one has no partner.
        total += octets[-1]

    # End-around carry: fold everything above bit 15 back in.
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)

    answer = ~total & 0xffff

    # The words were summed low byte first, so swap the result bytes.
    return (answer >> 8) | ((answer << 8) & 0xff00)
97
98
# ICMP type of an echo reply, the answer to an ICMP_ECHO_REQUEST.
ICMP_ECHO_REPLY = 0

# Layout used by send_one_ping() and receive_one_ping() alike:
# type (8), code (8), checksum (16), id (16), sequence (16)
_ICMP_HEADER_FORMAT = "bbHHh"
_ICMP_HEADER_SIZE = struct.calcsize(_ICMP_HEADER_FORMAT)

# The payload starts with the send time, packed as a native double.
_TIMESTAMP_FORMAT = "d"
_TIMESTAMP_SIZE = struct.calcsize(_TIMESTAMP_FORMAT)


def _delay_from_reply(packet, ID, time_received):
    """
    Return the round trip time if >packet< is the echo reply for >ID<.

    >packet< is expected to start with the IPv4 header, followed by the
    ICMP header and the payload written by send_one_ping(). None is
    returned for everything else: packets that are too short, ICMP
    messages that are not echo replies and replies carrying another id.
    """
    if not packet:
        return None

    # The low nibble of the first byte is the IPv4 header length in
    # 32 bit words; it is larger than 20 bytes when IP options are set.
    ip_header_size = (struct.unpack("B", packet[0:1])[0] & 0x0F) * 4
    if ip_header_size < 20:
        return None

    payload_start = ip_header_size + _ICMP_HEADER_SIZE
    payload_end = payload_start + _TIMESTAMP_SIZE
    if len(packet) < payload_end:
        return None

    icmp_type, _code, _checksum, packet_id, _sequence = struct.unpack(
        _ICMP_HEADER_FORMAT, packet[ip_header_size:payload_start]
    )
    # Checking the type keeps a looped back copy of our own echo request
    # (same id) from being mistaken for the answer.
    if icmp_type != ICMP_ECHO_REPLY or packet_id != ID:
        return None

    time_sent = struct.unpack(
        _TIMESTAMP_FORMAT, packet[payload_start:payload_end]
    )[0]
    return time_received - time_sent


def receive_one_ping(my_socket, ID, timeout):
    """
    Receive the ping from the socket.

    Waits up to >timeout< seconds for an echo reply with the id >ID< on
    >my_socket<. Returns the delay in seconds, or None if no matching
    reply arrived in time. Packets that do not match are skipped and the
    time spent waiting for them is subtracted from the remaining timeout.

    Time is measured with timeit.default_timer(), a wall clock, because
    time.clock() counts CPU time on Unix and does not advance while the
    process sleeps in select().
    """
    time_left = timeout
    while True:
        select_started = timeit.default_timer()
        readable = select.select([my_socket], [], [], time_left)[0]
        time_received = timeit.default_timer()
        if not readable: # Timeout
            return None

        packet, _addr = my_socket.recvfrom(1024)
        delay = _delay_from_reply(packet, ID, time_received)
        if delay is not None:
            return delay

        time_left -= time_received - select_started
        if time_left <= 0:
            return None


def send_one_ping(my_socket, dest_addr, ID):
    """
    Send one ping to the given >dest_addr<.

    >dest_addr< is resolved with socket.gethostbyname(), so a host name
    that cannot be resolved raises socket.gaierror. The echo request
    carries the id >ID<, the sequence number 1 and 192 bytes of payload
    that start with the send time read back by receive_one_ping().
    """
    dest_addr = socket.gethostbyname(dest_addr)

    # The padding is a byte string so that it can be joined with the
    # packed timestamp on Python 3, too.
    padding = (192 - _TIMESTAMP_SIZE) * b"Q"
    data = struct.pack(_TIMESTAMP_FORMAT, timeit.default_timer()) + padding

    # Make a dummy header with a 0 checksum and calculate the checksum on
    # the data and the dummy header.
    dummy_header = struct.pack(
        _ICMP_HEADER_FORMAT, ICMP_ECHO_REQUEST, 0, 0, ID, 1
    )
    my_checksum = checksum(dummy_header + data)

    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack(
        _ICMP_HEADER_FORMAT,
        ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
    )
    # ICMP has no ports; the 1 only fills the port slot of the address
    # tuple and is kept from the original code.
    my_socket.sendto(header + data, (dest_addr, 1))
153
def do_one(dest_addr, timeout):
    """
    Returns either the delay (in seconds) or None on timeout.

    Opens a raw ICMP socket, sends one echo request to >dest_addr< and
    waits up to >timeout< seconds for the answer. The socket is closed
    again in every case, also when sending or receiving raises.

    Raises socket.error if the socket cannot be created. For a missing
    permission (errno EPERM) the message is extended with a hint that
    root privileges are needed; the errno is kept on the new exception.
    Errors from send_one_ping() and receive_one_ping() are passed on.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        if len(err.args) >= 2 and err.args[0] == errno.EPERM:
            # Operation not permitted
            msg = err.args[1] + (
                " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
            raise socket.error(errno.EPERM, msg)
        raise # raise the original error

    try:
        # The ICMP id field is 16 bit wide, so only the low bits are used.
        my_ID = os.getpid() & 0xFFFF

        send_one_ping(my_socket, dest_addr, my_ID)
        return receive_one_ping(my_socket, my_ID, timeout)
    finally:
        my_socket.close()
178
179
def verbose_ping(dest_addr, timeout = 2, count = 4):
    """
    Send >count< ping to >dest_addr< with the given >timeout< and display
    the result.

    One line is written to stdout per ping, followed by an empty line at
    the end. If the host name cannot be resolved (socket.gaierror) the
    error is reported and the remaining pings are skipped. Other errors
    raised by do_one() are not caught here.
    """
    # sys.stdout.write() gives the same output on Python 2 and Python 3.
    write = sys.stdout.write
    for _ in range(count):
        write("ping %s... " % dest_addr)
        # Show the progress text before blocking in do_one().
        sys.stdout.flush()
        try:
            delay = do_one(dest_addr, timeout)
        except socket.gaierror as e:
            write("failed. (socket error: '%s')\n" % e.args[1])
            break

        if delay is None:
            write("failed. (timeout within %ssec.)\n" % timeout)
        else:
            # do_one() returns seconds, the output is in milliseconds.
            write("get ping in %0.4fms\n" % (delay * 1000))
    write("\n")
199
200
if __name__ == '__main__':
    # Demo run: ping a few sample hosts one after the other, among them
    # a name that is not expected to resolve and a private address.
    for demo_host in (
        "heise.de",
        "google.com",
        "a-test-url-taht-is-not-available.com",
        "192.168.1.1",
    ):
        verbose_ping(demo_host)
Note: See TracBrowser for help on using the browser.