The case for a crawler
For research purposes, of course, I need to analyze information about the topology of the Internet. I want to measure things such as the ratio between servers and clients. Why would I want to know such a thing? To answer questions like: is the world becoming a more or less decentralized place? What percentage of servers are serving HTTP content? How does security evolve on this or that TCP/IP protocol? All these questions have something in common: they need data to analyze. But how do we get the data in the first place? We could trust statistics provided by some giant of the web, or download a dataset containing a sample of this kind and apply statistical methods to it. So why bother building a crawler? Well, falsifiability is a building block of the scientific method. It basically states that we should be able to reproduce an experiment and therefore validate or refute a theory. Here our theories are merely questions like those stated above, but the validation process should let us reproduce every step, and that includes data acquisition. Sure, I could always assume the given data is true, but by doing that I would end up validating nothing at all, since I did not verify the data I used. So, since datasets found in the wild are nearly impossible to validate, a viable alternative is to acquire the data ourselves.
The body of a crawler
But how do we do it? A crawler is a pretty simple program. All it does is jump around the Internet address space and check whether some service is alive and responding. In a nutshell, it is just a program that tries to connect to a port and reports it if it succeeds. This process, albeit simple, has some tricky parts. First of all we need a way to find machines, which means a way to pick candidate addresses. This raises a secondary question: even if we have a valid IP address, how do we know whether there is a machine behind it? Well, that's the tricky part: we don't! So we need to define a sensible timeout after which we abort the connection and move on to another ip/port pair. This timeout should balance two things: not wasting time waiting for a response that will never come, and not giving up so quickly that a real connection cannot be established. Typically a connection is established in under 100 milliseconds, so a timeout of 150 to 200 ms is a good compromise. Another problem we need to solve is how to generate random IPs. First we need to acknowledge that two address schemes are in use: IP version 4, the older one, and IP version 6, the more recent one. For simplicity we will tackle only the generation of the first, which is by far the most common.
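import random

class IpUtils:
    @staticmethod
    def randomIPV4():
        # Build a dotted-quad string from four random octets
        blocks = []
        for i in range(0,4):
            # note: randint(1,255) never yields a 0 octet
            blocks.append(str(random.randint(1,255)))
        return ".".join(blocks)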
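The block of code above is the answer to our IP generation question. IPv4 addresses are merely strings that combine 4 blocks of 8-bit unsigned values, each therefore between 0 and 255. The algorithm is just a naive implementation of this idea: it draws each block from 1 to 255, so it never produces a 0, and it makes no attempt to skip private or reserved ranges.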
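The naive generator will happily produce private or loopback addresses that can never answer from the public Internet. As a minimal sketch, assuming Python 3 and its standard `ipaddress` module (the function name `random_public_ipv4` is my own and not part of the crawler), we could keep drawing until we hit a globally routable, non-multicast address:

```python
import ipaddress
import random


def random_public_ipv4(rng=random):
    """Draw random 32-bit values until one is a public unicast IPv4 address."""
    while True:
        candidate = ipaddress.IPv4Address(rng.getrandbits(32))
        # is_global rejects private, loopback, link-local and other reserved
        # ranges; multicast is filtered separately.
        if candidate.is_global and not candidate.is_multicast:
            return str(candidate)


print(random_public_ipv4())
```

The rejection loop wastes a few draws, but most of the 32-bit space is public, so it should rarely need more than one or two attempts.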
If you happen to know TCP, you surely remember that the port field in the header has 16 bits. This means that we need to try 2¹⁶ values to cover all the ports. With this approach we would need approximately 65000 connection attempts. Now, since we need something like 200 milliseconds of timeout per port, we would end up with something like 65000*0.2 = 13000 seconds, which is about 216 minutes per IP. This is too much. It would take ages until we got any useful information. But there is a good compromise we can make here. We know that well-known ports are documented and usually respected by default implementations and deployments. So instead of a brute-force approach we restrict the ports to a predefined subset. We can download a list of ports from Wikipedia or some RFC; in this case the ports analyzed were:
[1,5,7,9,11,13,17,18,19,20,21,22,23,25,37,39,42,43,49,50,53,63,67,68,69,70,71,72,73,73,79,80,
88,95,101,102,105,107,109,110,111,113,115,117,119,123,137,138,139,143,161,162,163,164,174,177,178,179,191,
194,1,201,202,204,206,209,210,213,220,245,347,363,369,370,372,389,427,434,435,443,444,445,464,468,487,488,
496,500,535,538,546,547,554,563,565,587,610,611,612,631,636,674,694,749,750,765,767,873,992,993,994,995]
If you take the trouble to count them, you'll verify that we reduced the search space from an astonishing 2¹⁶ to a mere 112 entries (110 distinct ports, since 1 and 73 appear twice). With the same basic arithmetic we would spend a maximum of 112*0.2 = 22 seconds per IP. This is a big improvement and seems like a good compromise; the trade-off is that we will lack information about other ports and protocols. Note that this is the worst case: if the machine is up, a connection to a closed port is usually refused much more quickly than the timeout, so far less time is spent per connection.
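The back-of-the-envelope numbers can be checked with a few lines of Python. This is only a sketch of the worst case, assuming every port runs into the full 0.2 s timeout and ignoring any extra delay between attempts; the helper name `worst_case_seconds` is made up for the illustration:

```python
def worst_case_seconds(n_ports, timeout_s=0.2):
    """Upper bound on scan time for one host: every port hits the timeout."""
    return n_ports * timeout_s


full_range = worst_case_seconds(2 ** 16)  # all 65536 ports
short_list = worst_case_seconds(112)      # the reduced list

print("all ports: %.0f s (about %.0f min)" % (full_range, full_range / 60))
print("112 ports: %.1f s" % short_list)
```

With the exact 65536 ports the bound is about 13107 seconds, roughly 218 minutes, slightly above the rounded figures used in the text, while the short list stays at 22.4 seconds.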
The core scan process is described below
import socket
import time

def scan(self):
    # Try every known port on self.serverip and collect the ones that accept a TCP connection
    ports = []
    try:
        for port in self.knownports:
            time.sleep(self.delay)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            # connect_ex returns 0 on success and an error code otherwise
            result = sock.connect_ex((self.serverip, port))
            if result == 0:
                print("Ip: {}, Port {}:      Open".format(self.serverip,port))
                ports.append(port)
            else:
                print("Ip: {}, Port {}:      Closed".format(self.serverip,port))
            sock.close()
    except socket.error:
        print("Couldn't connect to server")
        return []
    return ports
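To see the `connect_ex` check in isolation, without touching anyone else's machine, here is a small sketch that probes a throwaway listener on the loopback interface. The helper `is_open` is a hypothetical single-port version of the scan loop, not code from the crawler:

```python
import socket


def is_open(host, port, timeout=0.2):
    """Return True when a TCP connection to (host, port) succeeds in time."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an errno value otherwise
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()


# Throwaway listener on loopback; port 0 lets the OS pick a free port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

open_while_listening = is_open("127.0.0.1", port)
listener.close()
open_after_close = is_open("127.0.0.1", port)
print(open_while_listening, open_after_close)
```

On a typical machine the first probe should succeed and the second should be refused almost immediately, which is the fast failure case mentioned earlier for hosts that are up.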
The distributed approach
We saw previously that some basic arithmetic helped us a lot in devising a strategy to optimize the search. There is another optimization we can make: we can parallelize the process. Here we have a choice to make. We can write a multi-threaded program, or we can run more instances of the process. On a single machine the first is a more optimized version of the second. However, if we want to scale out we would end up with several processes, all of them multi-threaded. It is true that this double layer of concurrency would exploit the maximum amount of parallelism. There are, however, some caveats. First, a concurrent process is more complex. Second, when you run several processes that are each multi-threaded you end up with a degree of parallelism of m*n. This sounds like a good choice at first; the problem is that it is harder to reason about starvation and other concurrency problems when we have two concurrency models. So in this case we choose the less optimal process/fork approach. The good news about forking is that it scales across machines. We can create a bunch of processes on one machine and another bunch on another, and scaling is much simpler because we just need to spawn more processes. Another advantage is that if one process goes down the others stay alive, because they have completely separate address spaces. There is no need for complex try/catch logic to prevent the process and all its threads from going down. The downside of this approach is that it is much more memory intensive. Since different processes have different memory spaces, each one needs its own memory allocated, and therefore we cannot create as many processes as threads per machine. Here we choose simplicity.
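As a rough single-machine illustration of the process-per-worker idea, here is a sketch using Python 3's `multiprocessing` with the `fork` start method (POSIX only). The names `worker` and `run_workers` and the dummy payload are hypothetical stand-ins for the real scan loop, which would simply be more instances of the same script:

```python
import multiprocessing as mp
import os


def worker(worker_id, n_items, results):
    # Stand-in for the scan loop: real code would call the port scanner here.
    for i in range(n_items):
        results.put((worker_id, os.getpid(), i))


def run_workers(n_workers, n_items):
    ctx = mp.get_context("fork")  # assumption: a POSIX system where fork exists
    results = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(w, n_items, results))
             for w in range(n_workers)]
    for p in procs:
        p.start()
    # Drain the queue before joining so no worker blocks on a full pipe.
    collected = [results.get(timeout=10) for _ in range(n_workers * n_items)]
    for p in procs:
        p.join()
    return collected


collected = run_workers(3, 2)
print(len(collected), "results from", len({pid for _, pid, _ in collected}), "processes")
```

Each worker has its own address space, so a crash in one does not take the others down, at the price of more memory per worker.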
Zookeeper and Kafka
But some questions remain. For instance, how do all the processes combine and publish their results? How do we know what work has been done by a specific process? To answer these questions we need a way to give all these separate entities the notion of a cluster. ZooKeeper was created just for this. The main function of the crawler
import signal
import sys

def main():
  cl = CrawlerCluster("/balhau/crawler")
  # On Ctrl-C, report the known peers and leave the cluster cleanly
  def exit_handler(signum,frame):
    print("Exiting. Disconnecting from zookeeper")
    cl.printPeers()
    cl.closeZookeeper()
    sys.exit(0)
  signal.signal(signal.SIGINT,exit_handler)
  cl.connectZookeeper()
  cl.scanPorts()
  cl.closeZookeeper()
shows us that the first step it takes is to register itself in ZooKeeper
def connectZookeeper(self):
    print("Registering to Cluster")
    self.zk.start()
    self.zk.ensure_path(self.zk_root)
    self.children = self.zk.get_children(self.zk_root, watch=self.watch_peers)
    self.uid = str(uuid.uuid1())
    # ephemeral znode: removed automatically when this client disconnects
    self.zk.create(self.zk_root+"/"+self.uid,self.uid,ephemeral = True)
For this we generate a unique identifier (uuid1, which is derived from the host and the current time rather than being purely random) and register it as an ephemeral node. The ZooKeeper ephemeral znode is a handy concept. ZooKeeper tracks the session of the client, and when the client disconnects it automatically deletes the znode. So all the boilerplate code and state machine needed to manage the life cycle of registered peers is abstracted away by this simple yet powerful concept.

In the previous image we create a bunch of 35 process instances, each scanning the Internet independently. A final question remains: where does all that information go? Let's look at the code
def scanPorts(self):
    while True:
        time.sleep(1)
        #print("Peers : %s" % len(self.children))
        ip = IpUtils.randomIPV4()
        print("Scanning: %s" % ip)
        portscan = PortScanner(ip,ports=known_ports,timeout=0.2)
        ports = portscan.scan()
        # One JSON record per scanned IP, tagged with this crawler's id
        message={}
        message['ip']=ip
        message['ports']=ports
        message['crawlerid']=self.uid
        msgstr=json.dumps(message)
        # Publish the record to the portscan Kafka topic
        kp.send(props['kafka']['topics']['portscan'],msgstr,ip)
        print(ports)
Here portscan.scan() is the scan method described previously.
The answer is simple: we channel it through a Kafka topic. There are two nice things we get by using a log. One is that it decouples the acquisition process from the transformation phase. The other is scalability. We can deploy the process on hundreds of machines and end up with an aggregate of thousands of peers searching the web without much impact. The nice thing about decoupling is not just that it is a nice and fancy word. Decoupling lets me isolate responsibilities and start to think about and design the transformation phase without scale dependencies or the additional codebase complexity that would arise if both phases lived in the same program.
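To give an idea of what the transformation side could look like, here is a small sketch that takes records in the JSON shape built by scanPorts (`ip`, `ports`, `crawlerid`) and counts how many hosts expose each port. The function name `count_open_ports` and the sample records are invented for illustration, and reading the messages from the Kafka topic is left out:

```python
import json
from collections import Counter


def count_open_ports(raw_messages):
    """Count how many scanned hosts reported each port as open."""
    counts = Counter()
    for raw in raw_messages:
        record = json.loads(raw)
        counts.update(set(record["ports"]))  # set(): one count per host and port
    return counts


# Made-up sample records (documentation addresses), not real scan results
sample = [
    json.dumps({"ip": "192.0.2.10", "ports": [22, 80], "crawlerid": "a"}),
    json.dumps({"ip": "192.0.2.11", "ports": [80, 443], "crawlerid": "b"}),
    json.dumps({"ip": "192.0.2.12", "ports": [], "crawlerid": "a"}),
]

counts = count_open_ports(sample)
http_share = counts[80] / float(len(sample))
print(counts[80], http_share)
```

Because the records already sit in the log, this kind of analysis can be written, rerun and changed without touching the crawler processes.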
In just half a day, with a Raspberry Pi running 35 Python processes, we ended up with a total of more than 85000 nodes analyzed.

Source code here.
Somewhere in the future I will provide the Kubernetes code to run this in containers. Until then you can run it manually.
See you soon, Cheers!