python ping scan

Solutions on MaxInterview for python ping scan by the best coders in the world

showing results for - "python ping scan"
Nicolas
23 Jan 2017
1import time       # let's time our script
2 
3import ipaddress  # https://docs.python.org/3/library/ipaddress.html
4                  # convert ip/mask to list of hosts
5 
6 
7import subprocess # https://docs.python.org/3/library/subprocess.html
8                  # to make a popup window quiet
9 
10from colorama import init  # colors https://en.wikipedia.org/wiki/ANSI_escape_code
11init()                     # https://pypi.org/project/colorama/
12 
13 
14import threading           # for threading functions, lock, queue
15from queue import Queue    # https://docs.python.org/3/library/queue.html
16 
# Serialises console output so that lines printed by concurrent worker
# threads are never interleaved.
print_lock = threading.Lock()

# Prompt the user for the network to sweep, in CIDR notation.  Surrounding
# whitespace is dropped so a stray space does not make the input unparseable.
net_addr = input("Enter Network (192.168.1.0/24): ").strip()

# Start timing only after the prompt, so typing time is not counted.
startTime = time.time()

# Build the network object.  strict=False accepts an address with host bits
# set (e.g. 192.168.1.10/24) and masks it down to the enclosing network
# instead of raising ValueError.
ip_net = ipaddress.ip_network(net_addr, strict=False)

# Materialise every usable host address of that network as a list.
all_hosts = list(ip_net.hosts())

# Windows-only: start child processes with their window hidden so that each
# ping does not flash a console window.
info = subprocess.STARTUPINFO()
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
info.wShowWindow = subprocess.SW_HIDE

# quick message/update
print('Sweeping Network with ICMP: ', net_addr)
40 
41# the actual ping definition and logic.
42# it's called from a pool, repeatedly threaded, not serial
def pingsweep(ip):
    """Ping one host of the sweep and print it if it answers.

    Args:
        ip: Index into the module-level ``all_hosts`` list (an index, not
            an address).

    A single echo request is sent with the Windows ``ping`` flags
    (``-n`` = count, ``-w`` = wait in milliseconds) and the host is
    classified from the text ``ping`` prints.  Only online hosts produce a
    line of output; unreachable and timed-out hosts are skipped silently.
    """
    host = str(all_hosts[ip])

    # NOTE: these flags are Windows-specific.  On Linux the count flag is
    # ``-c`` and the timeout flags differ (verify against ping(8)); the
    # matched strings below are also those of the Windows ping.
    raw = subprocess.Popen(
        ['ping', '-n', '1', '-w', '150', host],
        stdout=subprocess.PIPE,
        startupinfo=info,
    ).communicate()[0]

    # Decode once.  errors='replace' keeps output in a non-UTF-8 console
    # code page from raising inside a worker thread.
    text = raw.decode('utf-8', errors='replace')

    # Hold the lock for the whole report so output from different threads
    # does not interleave.
    with print_lock:

        # switch to the base colour (ANSI SGR 93) before reporting
        print('\033[93m', end='')

        # "Destination host unreachable" must be tested BEFORE "Reply":
        # Windows reports it as "Reply from <addr>: Destination host
        # unreachable.", which would otherwise be counted as online.
        if "Destination host unreachable" in text:
            pass
        elif "Reply" in text:
            # online hosts are printed in green (ANSI SGR 32)
            print(host, '\033[32m'+"is Online")
        elif "Request timed out" in text:
            pass
        else:
            # ping printed something that matches none of the known cases
            print("UNKNOWN", end='')
70 
71# defines a new ping using def pingsweep for each thread
72# holds task until thread completes
def threader():
    """Worker loop: take host indices from ``q`` and ping each one, forever.

    The loop never returns on its own.  ``q.task_done()` is called for
    every item taken from the queue, even when ``pingsweep`` raises, so a
    failing ping cannot leave ``q.join()`` blocked.  The error is printed
    and the worker carries on with the next item.
    """
    import traceback  # local import: only needed on the failure path

    while True:
        worker = q.get()
        try:
            pingsweep(worker)
        except Exception:
            # Report and keep the worker alive; a dead worker would stop
            # draining the queue.
            traceback.print_exc()
        finally:
            q.task_done()
78      
# Work queue shared by all worker threads.
q = Queue()

# Start a pool of 100 worker threads.  They are marked as daemon threads,
# so they do not keep the interpreter alive once the main thread finishes.
for _ in range(100):
    t = threading.Thread(target=threader, daemon=True)
    t.start()

# Queue one job per host.  Each job is the host's index in all_hosts,
# not the address itself.
for worker, _host in enumerate(all_hosts):
    q.put(worker)

# Block until every queued job has been marked done.
q.join()

# Final report: elapsed wall-clock time, rounded to two decimals.
elapsed = time.time() - startTime
runtime = float(f"{elapsed:0.2f}")
print("Run Time: ", runtime, "seconds")
99
Yannik
29 Apr 2016
1import socket
2import time
3import threading
4
5from queue import Queue
# Lock that serialises print() calls made from the worker threads.
print_lock = threading.Lock()

# Sockets created from here on use a 0.25 second default timeout, so a
# connection attempt to a silent port gives up quickly.
socket.setdefaulttimeout(0.25)

# Ask for the target and resolve it to an address once, up front.
target = input('Enter the host to be scanned: ')
t_IP = socket.gethostbyname(target)
print('Starting scan on host: ', t_IP)
12
def portscan(port):
    """Try a TCP connection to ``t_IP`` on *port* and report it if it is open.

    Args:
        port: TCP port number to probe.

    Prints ``<port> is open`` (under ``print_lock``) when the connection
    succeeds.  A port that refuses, times out or cannot be reached is
    skipped silently.  The socket is always closed before returning.
    """
    # The context manager closes the socket on every path.  The previous
    # code called close() on the return value of connect(), which is None,
    # so the socket was never closed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.connect((t_IP, port))
        except (OSError, OverflowError):
            # OSError covers refused / timed out / unreachable;
            # OverflowError is raised for a port outside 0-65535.
            return

    with print_lock:
        print(port, 'is open')
22
def threader():
    """Worker loop: take port numbers from ``q`` and scan each one, forever.

    The loop never returns on its own.  ``q.task_done()`` is called for
    every item taken from the queue, even when ``portscan`` raises, so a
    failing scan cannot leave ``q.join()`` blocked.  The error is printed
    and the worker carries on with the next item.
    """
    import traceback  # local import: only needed on the failure path

    while True:
        worker = q.get()
        try:
            portscan(worker)
        except Exception:
            # Report and keep the worker alive; a dead worker would stop
            # draining the queue.
            traceback.print_exc()
        finally:
            q.task_done()
28      
# Work queue of port numbers shared by all worker threads.
q = Queue()

# Start the clock.  This statement must sit at module level (column 0);
# indenting it is an IndentationError.
startTime = time.time()

# Start 100 worker threads.  They are daemon threads, so they do not keep
# the interpreter alive once the main thread finishes.
for x in range(100):
    t = threading.Thread(target=threader)
    t.daemon = True
    t.start()

# Queue ports 1 through 499 for scanning.
for worker in range(1, 500):
    q.put(worker)

# Block until every queued port has been marked done, then report the time.
q.join()
print('Time taken:', time.time() - startTime)