Port Scan Detection using ElasticSearch and Kibana

NEK :  Netflow + ElasticSearch + Kibana
One of the most fundamentals of security monitoring is to be aware of port scans which can be part of reconnaissance activity. Netflow is very critical in network situational awareness (NetSA), and utilizing Elastic Search and Kibana we can create ourselves a nice looking dashboard that makes it very easy to spot scanning activities. ES+K not only a good solution for having a better dashboard, but also it overcomes limitations of typical IDS systems (snort,suricata,bro etc) associated with cost of having to track scanner activity for long periods. We can easily cook a realtime alerting system that can easily track 24-48 hour or even longer windows for abnormalities and scan activites.

In this blog post, you'll be able to create a dashboard that can be used for NetSA purposes in your environment.

Importing Netflow data into ES

I have a Netflow data file captured on June 10th, 2014 for a high volume web site. It contained data for all connections coming from all around the world to their public IP address. Time window in this Netflow file was 2014-06-10 00:00:00 - 2014-06-10 23:59:58. For the purpose of this blog post, first step in importing this file was to split flow records with only Syn packets into a separate dump file so that we can focus on typical matching pattern for common vertical/horizontal port scan (and also block scans). Using similar technique we can also catch SYN floods howeever this is a topic for another post.

In order to split flow records with only 'S', we can use following :

 # nfdump -r 2014-06-10.dump "flags S and not flags AFURP" -zw 2014-06-10-S.dump  

I should note that, due to nature of a 'flow', it may contain multiple TCP flags. So during a lifetime of a flow tuple (srcip,srcport,dstip,dstport,protocol) both S and A flags can be set and in this case above command will filter them out as we're not interested in flows that hit to an open (ACKed) ports. Out of 38,435,805 flows recorded in our 2014-06-10.dump file, there are only 275,485 flows that matches our criteria, so it makes it a lot easier to both import and analyze. Having said this, we'll still import full Netflow file along with SYN only flows into 2 separate types in ES as we want to observe if any traffic connection gets established between scanner IPs and alert on them.

When we're importing flows for S flows only dump file, we'll also get 'Flags' field so to get it converted to CSV :

 # nfdump -N -r 2014-06-10-S.dump -o "fmt:%ts, %sa, %sp, %da, %dp, %byt, %pkt, %out, %pr, %flg" -q > s.csv  

For our sample data set, this will create about 32MB file. File output for this file will contain lines like (masked to protect real IPs):

 2014-06-10 00:00:00.422,   AA.90.XX.2,  2299,   XX.20.YY.8,  445,   192,    4,   0, 6  , ....S.  
2014-06-10 00:00:03.456, ZZ.YY.103.20, 3241, XX.ZZ.112.25, 445, 20736, 432, 0, 6 , ....S.
2014-06-10 00:00:03.550, ZZ.XX.VV.132, 45486, XX.55.YY.12, 25, 352, 8, 0, 6 , ....S.
2014-06-10 00:00:03.695, XX.20.ZZ.114, 41313, XX.20.ZZ.AA, 15, 9040, 226, 0, 6 , ....S.
2014-06-10 00:00:05.560, AA.46.XX.64, 25003, BB.XX.19.AA, 80, 104, 2, 0, 6 , ....S.

For all flows, lets also get them converted to CSV for easy import into ES :

 # nfdump -N -r 2014-06-10.dump -o "fmt:%ts, %sa, %sp, %da, %dp, %byt, %pkt, %out, %pr" -q | gzip -1c > 2014-06-10.csv.gz  

I typically pipe export operation to a gzip utility as it will make it easier to move files around and faster to import using FIFO devices. Before getting them inserted into ES, lets create types for each. I'll first create a type for ALL flows we have using type definition used in previous blog post "NetFlow Analysis using ElasticSearch & Kibana"

1:  {  
2: "external": {
3: "_all" : {"enabled" : false},
4: "_source" : {"enabled" : false},
5: "properties": {
6: "ts": {"type": "date", "format" : "YYYY-MM-dd HH:mm:ss"},
7: "sa": {"type": "string", "index": "not_analyzed"},
8: "sp": {"type": "integer", "index": "not_analyzed"},
9: "da": {"type": "string", "index": "not_analyzed"},
10: "dp": {"type": "integer", "index": "not_analyzed"},
11: "byte": {"type": "long"},
12: "pkt": {"type": "long"},
13: "out": {"type": "integer", "index": "not_analyzed"},
14: "pr": {"type": "string", "index" : "not_analyzed"}
15: }
16: }
17: }

And for SYN flows we'll use following type definition. Please note, this type has 3 additional fields, one for TCP flags, 2 for GeoIP country codes we'll be storing with each IP.
1:  {  
2: "syn": {
3: "_all" : {"enabled" : false},
4: "_source" : {"enabled" : true, "compress" : true },
5: "properties": {
6: "ts": {"type": "date", "format" : "YYYY-MM-dd HH:mm:ss"},
7: "sa": {"type": "string", "index": "not_analyzed"},
8: "sp": {"type": "integer", "index": "not_analyzed"},
9: "sc": {"type": "string", "index": "not_analyzed"},
10: "da": {"type": "string", "index": "not_analyzed"},
11: "dp": {"type": "integer", "index": "not_analyzed"},
12: "dc": {"type": "string", "index": "not_analyzed"},
13: "byte": {"type": "long"},
14: "pkt": {"type": "long"},
15: "out": {"type": "integer", "index": "not_analyzed"},
16: "pr": {"type": "string", "index" : "not_analyzed"},
17: "flg": {"type": "string", "index" : "analyzed"}
18: }
19: }
20: }

As I'll be matching both source and destination IP addresses to ISO 2 letter country codes, I'll need latest GeoIP database from Maxmind via http://dev.maxmind.com/geoip/legacy/geolite/. Download 'GeoLite Country' binary database from the URL provided and extract into a directory where you'll also execute 'bulk-import-flgs.py' script available at https://github.com/bulutsal/networkanalysis. Also you'll need PYGeoIP API from https://pypi.python.org/pypi/pygeoip/ and you can simply install it using :

# pip install pygeoip  

After satisfying all dependencies you can go ahead with importing your S packets first. When I execute bulk import script with flags, it will produce JSON records like following :

1:  {  
2: "pkt":4,
3: "dc":"TR",
4: "da":"XX.20.ZZ.YY",
5: "byte":192,
6: "dp":445,
7: "out":0,
8: "pr":"TCP",
9: "sp":1250,
10: "ts":"2014-06-10 00:38:02",
11: "sc":"CH",
12: "sa":"",
13: "flg":[
14: "S"
15: ]
16: }

After importing S only flows into ES, go ahead with importing your primary netflow data using FIFO as below :

Open a screen shell first
 # mkfifo external.csv  
# gunzip -dc 2014-06-10.csv.gz > external.csv

Create another window using CTRL-A-C and run your bulk-import.py script, dont forget to modify it for your index and type name. This may take a while. After all is uploaded, go ahead and import DevOps-PortScan.kibanadashboard (downloadable from https://github.com/bulutsal/networkanalysis)

Playing and understanding DevOps-PortScan Dashboard

Once you load the dashboard into Kibana3, you'll see something like this :

This first screen will show everything we got in the index. We want to use ES's and Kibana's capabilities to drill down to all port scan activity so we'll filter out connections to port 80 and 443 as this flow data is from a high traffic web site and we're not interested in connections to those ports. Also lets focus on only TCP protocol as other scan methods are outside scope of this blog post. As we drill down, we start to see some scan activity in 'SYN ATTEMPTS' widget on the right hand side. This widget shows ratio of S only flows to all flows, so between 03:00 and 08:00 there was clearly increase in S flows. This is not typical and is a clear indication of increased port scan activity during night ours.

If we like we can focus on this window and drill down from there, but I would love to focus on IPs listed on the left in 'SCAN IP' window and demonstrate different types of scans performed by each IP. Lets click on and see what how IP appeared in our dashboard. This IP is registered in Switzerland and performed block scan, both touching a lot of IPs in our network and trying various ports which is clearly visible in the dashboard.

Once we click second IP,, in our list, we can see that it is from China (surprising eh?). This IP demonstrated different behaviour and performed horizontal scan mostly focusing on shorter list of ports like 53,25,22,3389 and 21. It also appears that it stopped scanning during between 09:00 and 14:00 and started scanning again.

3rd IP in our list is more interesting, this IP is from US and tried only 3 IP addresses in our network and scanned almost every single port and he did this in short bursts as opposed to continuous scans.

We can click each IP in the SCAN IP list and see how they've scanned our network, but also utilizing Kibana's map feature, we can click to a country and drill down by Country.

In my next blog post, I'll be showing how to detect SYN floods and observing DDOS via Netflow data.

18 Eylül 2014

Posted In: ElasticSearch, Gezegen, NetFlow

NetFlow Analysis using ElasticSearch & Kibana

Kibana dashboard showing various NetFlow metrics 
I've heard a lot about ElasticSearch lately, was trying to create some time to get a lab set up for the new trio on the block : ELK. For those who hasn't heard about the term ELK, it is an acronym for ElasticSearch + Logstash and Kibana. ELK stack is continuing tradition LAMP stack created a while ago by tightly integrating to each other, albeit on a completely different dimension, and becoming new invaluable tool for DevOps people.
Over the course of years, I've developed a lot of tools/guis, both small and big, to make metrics and data meaningful for my pleasure/business/troubleshooting purposes. But none was as much fun and as quick as I had with ElasticSearch and Kibana.

One of the most important advices I can give to anyone who is building, maintaining or operating an IT infrastructure is having situational awareness on every single angle possible. This means collecting a lot of metrics from all systems, including IPfix/NetFlows from network. Main focus of this tutorial is to show how ES and Kibana can be a valuable tool in assessing issues at network layer using Netflow on a real life scenario: On February 4th, 2014 a network issue caused 1 hour disruption to the services provied to a customer, an RCA requested by management. All logs gathered in one place, with very few reliable explanation as to what really happened. We've started looking at the issue deeper, this time utilizing NetFlows captured at various devices and using ES & Kibana to do analytics & drill down. ES & Kibana helped a lot to better understand and grasp what happened during the disruption and nailing down root cause.

I won't be going into details of setting up ElasticSearch and Kibana, as there are a lot of blog posts on the net on how to perform those steps. You'll see steps to get NFdump data ready, importing into ElasticSearch using ElasticSearch's Python & Bulk API, preparing Kibana for analytics, and discovering what NetFlow records show about the issue I'm after.
In addition to explaining steps to setup similar NFdump/ES/Kibana environment for yourself, I've also provided some analysis/benchmark on storage requirement of ES and explained different approaches on how to reduce foot print of ES while increasing performance.

First, some background information.

At our customer sites, we deploy various collectors and probes to collect and store network traffic metadata, namely NetFlow, for forensic/security and troubleshooting purposes. Probes are deployed at critical network edges to record and analyze activity passing through by using SPAN/RSPAN/ERSPAN, emmitting NetFlow metadata to collectors which save and store NetFlows on hard drive for later use. NetFlows can be costly to generate and store, especially if you're trying to capture traffic on outside/untrust/public interfaces, which face traffic/flows, both in and out, from all around the globe. Probing outside interfaces means recording traffic from spoofed IPs, ICMP pings, BGP announcements and every other bits that travels on Layer 3. For this blog post, I've used 3 flows collected at the same device, one from outside/public interface on the firewal, one for traffic filtered by firewall corresponding to the traffic passing through outside interface, and one for internal VLAN relevant to the issue I'm analysing for. All flows corresponding to the same 24 hour period, differed in size and characteristics widely:

  1. FNF1 : Corresponding to an internal VLAN traffic, have 4.6 million flows, 228MB in size
  2. FNF2 : Corresponding to the internal/trust interface on the firewall, having 8.6 million flows, 426MB in size.
  3. FNF3 : Corresponding to the public facing interface on the firewall, having 33.7 million flows, 1.671MB in size.

Netflow captures following fields that can be used to analyze various aspects of the network :
  • Timestamp
  • Duration
  • Protocol
  • Source IP
  • Source port
  • Destination IP
  • Destination port
  • Bytes
  • Packets
  • TCP Flags
  • Interface

There are a lot of other information that can be captured and shown, a sample 5 line output from FNF1 dump file is shown below :
1:  # nfdump -r fnf1.dump -oextended -c5  
2: Date flow start Duration Proto Src IP Addr:Port Dst IP Addr:Port Flags Tos Packets Bytes pps bps Bpp Flows
3: 2014-01-01 00:04:23.980 25.000 TCP -> .AP... 0 296 237484 11 75994 802 1
4: 2014-01-01 00:04:23.980 34.000 TCP -> .AP... 0 375 24206 11 5695 64 1
5: 2014-01-01 00:04:23.980 25.000 TCP -> .AP... 0 319 173587 12 55547 544 1
6: 2014-01-01 00:04:23.980 21.000 TCP -> .AP... 0 35 9504 1 3620 271 1
7: 2014-01-01 00:04:23.980 11.000 TCP -> .AP..F 0 39 6800 3 4945 174 1

Step 1) Prepare NFdump files

NFdump rotates dump files every 5 minutes by default. Instead of dealing with multiple NFdump files, I've converted all dump files in each collector's directory to a single file, while sorting by timestamp using following command :

# nfdump -mR ./nfdump -zw fnf1.dump

For the sake of performance and analytics I'm after, I'm only interested in timestamp, source IP, source port, destination IP, destination port, bytes, packets, interface and protocol. To do this, I've used following nfdump command to dump data into CSV file for later use :

# nfdump -Nqr fnf1.dump -o "fmt:%ts, %sa, %sp, %da, %dp, %byt, %pkt, %out, %pr" > fnf1.csv

Step 2) Prepare ElasticSearch Mapping

After having all 3 files dumped into respective CSV files, I've crafted a Python script utilizing ElasticSearch's official Python API to index each flow record in ElasticSearch. ElasticSearch provides schemaless storage and indexing, however just throwing Netflow data without providing a mapping (a schema or DDL some sort) is not smart for storage perspective. Before importing CSV into ElasticSearch, I've experimented with different schemas, one of which using IP mapping for source and destination IP addresses, however it didn't work well for some reason. I've switched storing IP information in string field using following schema definition :
1:  {  
2: "fnf1x": {
3: "_all" : {"enabled" : false},
4: "_source" : {"enabled" : false},
5: "properties": {
6: "ts": {"type": "date", "format" : "YYYY-MM-dd HH:mm:ss"},
7: "sa": {"type": "string", "index": "not_analyzed"},
8: "sp": {"type": "integer", "index": "not_analyzed"},
9: "da": {"type": "string", "index": "not_analyzed"},
10: "dp": {"type": "integer", "index": "not_analyzed"},
11: "byte": {"type": "long"},
12: "pkt": {"type": "long"},
13: "out": {"type": "integer", "index": "not_analyzed"},
14: "pr": {"type": "string", "index" : "not_analyzed"}
15: }
16: }
17: }

For those who are not familiar with ElasticSearch's mapping definition, here is a short definition of what this schema does. First, I didn't want to store NetFlow records in both index and in "_source" field as a JSON document, so it is disabled. For drilling down and analytics purposes, complete document, NetFlow record in this case, is rarely needed. Also, since NetFlow records are well defined and structured, I only search/filter using field names ie : protocol = TCP or destination port = 80. "_all" field is used for searching multiple fields at the same time, when no field name provided. Eliminating "_all" field also saves unnecessary I/O to disk and storage. I chose 'not_analyzed' for string fields, as there is no need to tokenize or stem any of the strings stored. IP information along with protocol fields can be stored and indexed as a whole. Please duplicate above mapping for each of the NetFlow collectors after changing "fnf1x" to the appropriate name you choose.

Step 3) Import CSV files into ElasticSearch

After this map is PUT on ElasticSearch, we can have our Python script to import CSV file created on step 1. Python script code is c/p here, as it is less than 30 lines :
1:  #!/usr/bin/python  
2: import csv, sys, time, json, elasticsearch
3: from elasticsearch import helpers
4: es = elasticsearch.Elasticsearch()
5: source = 'fnf1'
6: csvfile = source + '.csv'
7: jdata = dict()
8: actions = list()
9: i = 0
10: proto = {'0': '0', '1': 'ICMP', '2': 'IGMP', '6': 'TCP', '17': 'UDP', '112': 'VRRP', '3': 'GGP', '50': 'ESP'}
11: with open(csvfile, 'rb') as file :
12: line = csv.reader(file, delimiter = ',', skipinitialspace = True)
13: for row in line :
14: ptime = time.strptime(row[0][0:19], "%Y-%m-%d %H:%M:%S")
15: ctime = time.strftime('%Y-%m-%d %H:%M:%S', ptime)
16: jdata = { 'ts': ctime, 'byte': int(row[5]), 'sa': row[1], 'sp': int(float(row[2])), 'da': row[3], 'dp': int(float(row[4])), 'pkt': int(row[6]), 'out': int(row[7]), 'pr': proto[row[8].strip()] }
17: action = { '_index': 'netflowlab', '_type': 'fnf1x', '_source': json.dumps(jdata, separators=(',', ':'))}
18: actions.append(action)
19: i += 1
20: if i % 100000 == 0:
21: elasticsearch.helpers.bulk(es, actions)
22: print "Indexed %d, working on next 100000" %(i)
23: actions = list()
24: elasticsearch.helpers.bulk(es, actions)
25: print "Indexed %d, finishing." %(i)

Please change 'source' and '_type' fields above to reflect file names and "_type" in ElasticSearch index. Uploading total of 47 million rows, in 3 different CSF files took about 2 hours on my i7-3770 (QC 3.4GHz CPU). Most of the time spent was on Python parsing CSV file and converting into JSON format, I didn't have time to optimize code or profile it for performance tuning. I've used SSD drive to store ElasticSearch data files, which makes upload and analytics faster than traditional drives. Also after inserting 47 million rows, into 3 different types, there will be a lot of segments in ElasticSearch index directories. I suggest optimizing them by reducing number of segments to 1 using (netflowlab is the name of index I've used for this blog post) :

This resulted in 3.8GB of Index directory under /var/lib/elasticsearch/

Step 4) Using Kibana to analyze NetFlow data

After everything is ready, use enclosed Kibana dashboard schema file. Once you have uploaded dashboard schema, you'll have something similar to the image on the right. With my ElasticSearch Index, dashboard shows high level information about 46 million flows, accounting for 5.8TB of transferred data in 24 hours. In the next image, we see histograms showing both byte and PPS values :

Histogram shows some anomaly started happening around 14:30 and 20:00, especially around 14:30 and 15:30

Once we started zooming into the timeframe when the anomaly occured, we can see all other graphs updated according to the window we selected. In the mid section of the dashboard I have source IP, source address, destination IP and destination address pie charts, showing flow itself. In destination port pie chart, I immediately noticed that port 12201 is accounting for roughly 20% of the trafffic/flows happened at that time, which is way above normal characteristic of the traffic :

When I click 12201 on the destination port pie chart, Kibana re filters and re graphs data according to the selection I made. I immediately can see that, TCP traffic nearly diminished, and only UDP traffic is hitting port 12201, which happened to be the GrayLog server's default port listening for logs send by the various app servers.

When I changed histogram properties to show data with 1s resolution, I can see that PPS values went up to 250K alone for FNF1x collector, congesting network switch with both PPS and Throughput (MBit/sec). If I want, I can also drill down to the Interfaces and see how much traffic passed through each interface on the switch. This information showed us that the root cause of the issue we were investigating, was actually app servers pumping huge amounts of logs towards GrayLog server. Whole issue was triggered by another issue, but it was start of chain reaction, causing apps to go crazy with logs, making log pumping the root cause, and trigger itself a contributing factor.

Storage Perspective

I've also captured some information on how much data is required for NetFlow storage when different file formats are in use. JSON, obviously, being one of the least storage efficient file formats requires most storage when it comes to NetFlow data with 46 millon flows. After disabling "_all" and "_source" fields in ElasticSearch, its storage requirements also went down. 900MB of gzip compressed Nfdump data consumes about 3.8 GB of Index space on ElasticSearch. I should add that, I didn't store all Netflow fields in this test scenario, only included ones that are relevant to my use case. To make comparison a little more accurate, I've also added uncompressed Nfdump storage requirements below. Once I compress ElasticSearch's Index directory with .tar.gz (default gzip compression level), same 3.8GB becomes 2.7GB. This tells me that I can also store same Index files on ZFS with LZ compression turned on to save some space without sacrificing too much performance.

RowsNfdump (bz2)NfdumpNfdump (-j)CSVJsonES Index SizeTar.gz

I'll continue experimenting with ElasticSearch and post my notes about using ElasticSearch for Netflow analytic purposes. Please send in your questions and comments.
All files required to set up this proof of concept environment are located here : https://github.com/bulutsal
Please contribute back your changes to this location.

11 Mart 2014

Posted In: ElasticSearch, Gezegen, NetFlow

Twitter Auto Publish Powered By : XYZScripts.com