
The PCAP Format in a Hadoop Ingestion Perspective

A cluster is a really powerful tool for handling all of your data. If you think along the lines of using Hadoop for network monitoring, for instance, you would probably start by looking at the existing tools out there and how they fit into a big data context. I'll tell you right now that MapReduce will probably require a completely different way of thinking - but that doesn't make it a bad approach.

If you follow this article you should already have set up your cluster (easy provisioning with Ambari) and installed flume-ng. You can do that through the HDP 2.0.6 (at the time of writing) stack. This article was written with Red Hat Server 6.4 in mind. It isn't really a complete post about how you can use Hadoop, but rather an introduction to the format you can use to ingest packet data from the wire, let us say from a network tap (nTAP). I will try to exemplify where necessary.

TCPDUMP, which is built on libpcap (also check out the Hakin9 article, PDF), basically provides you with a stream of packets structured from the link layer up to the application layer. This is better known as the TCP/IP stack.
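A quick way to see those layers side by side is to let tcpdump print a single packet with its link-level header included (the interface name here is just an example):

# -e prints the Ethernet (link-layer) header, -nn disables name/port resolution,
# -c 1 stops after one packet
tcpdump -e -nn -c 1 -i eth0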

Flume is the preferred tool for ingesting data into a Hadoop cluster. It consists of a set of input (source) and output (sink) plugins, as well as a transport (channel) between them. In this case the PCAP format would probably be an almost ideal choice. For streaming the packets you should note that the global header is only written at the beginning of the stream and that it must be reconstructed when reading the data back into tools expecting the PCAP format.

The Ingestion Format - Packets

Hadoop basically works with two file types: sequence files and text files. In our use case we'll use sequence files, since we are dealing with arbitrary binary data. Let us have a closer look at the input format (I found most of it here, recommended blog).

PCAP-format

Remember that the PCAP headers in this file are little endian. This means that e.g. c2 ba cd 4f is read as 4f cd ba c2. The byte order is given by the magic number at the start of the file: a file that starts with d4 c3 b2 a1 (the magic 0xa1b2c3d4 stored byte-swapped) is little endian, while one starting with a1 b2 c3 d4 is big endian.

Global Header (24b)

01-04b:  The first 4 bytes d4 c3 b2 a1 are the magic number

05-08b:  The next 4 bytes 02 00 04 00 are the major version (2 bytes) and minor version (2 bytes), in our case 2.4

09-16b:  Next are the time zone offset (thiszone) and the timestamp accuracy (sigfigs), 4 bytes each. These are set to 0 practically all the time, which gives us the  00 00 00 00 00 00 00 00

17-20b:  Next is the snapshot length field (4 bytes), which indicates the maximum length in bytes of each captured packet. In our file it is set to ff ff 00 00, which equals 65535 (0xffff), the default value for tcpdump and Wireshark.

21-24b:  The last 4 bytes of the global header specify the link-layer header type. Our file has the value 0x1 (01 00 00 00), which means that the link-layer protocol is Ethernet.
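If you want to check these 24 bytes against one of your own captures, od will dump just the global header (the file name below is only a placeholder):

# dump the 24-byte global header as hex, with decimal byte offsets
od -A d -t x1 -N 24 capture.pcap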

Packet Header 1 (16b)

c2 ba cd 4f b6 35 0f 00  36 00 00 00 36 00 00 00

Translates to:

01-04b:  Timestamp in seconds since the epoch, 0x4fcdbac2.
$ calc 0x4fcdbac2 #-> 1338882754
$ date --date='1970-01-01 1338882754 sec GMT'

05-08b:  Microseconds part of the timestamp
09-12b:  Captured length: the number of packet bytes actually stored in the file, here 0x36 = 54
13-16b:  Original length: the length of the packet as it appeared on the wire (also 54b here). It is the same as 09-12b unless the packet was longer than the snapshot length (max packet length) and got truncated.
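If you'd rather pull this record header straight out of the file, od can skip past the global header for you (again, the file name is only a placeholder):

# skip the 24-byte global header and dump the 16-byte header of the first packet record
od -A d -t x1 -j 24 -N 16 capture.pcap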

After the packet header comes the data! Starting from the lowest layer we see the Ethernet destination address 00:12:cf:e5:54:a0 followed by the source address 00:1f:3c:23:db:d3. The packet data amounts to the 54 bytes counted above.

d4 c3 b2 a1 02 00 04 00  00 00 00 00 00 00 00 00 
ff ff 00 00 01 00 00 00  c2 ba cd 4f b6 35 0f 00 
36 00 00 00 36 00 00 00  00 12 cf e5 54 a0 00 1f 
3c 23 db d3 08 00 45 00  00 28 4a a6 40 00 40 06 
58 eb c0 a8 0a e2 c0 a8  0b 0c 4c fb 00 17 e7 ca 
f8 58 26 13 45 de 50 11  40 c7 3e a6 00 00 c3 ba 
cd 4f 60 04 00 00 3c 00  00 00 3c 00 00 00 00 1f 
3c 23 db d3 00 12 cf e5  54 a0 08 00 45 00 00 28 
8a f7 00 00 40 06 58 9a  c0 a8 0b 0c c0 a8 0a e2 
00 17 4c fb 26 13 45 de  e7 ca f8 59 50 10 01 df 
7d 8e 00 00 00 00 00 00  00 00 c3 ba cd 4f 70 2f 
00 00 3c 00 00 00 3c 00  00 00 00 1f 3c 23 db d3 
00 12 cf e5 54 a0 08 00  45 00 00 28 26 f9 00 00 
40 06 bc 98 c0 a8 0b 0c  c0 a8 0a e2 00 17 4c fb 
26 13 45 de e7 ca f8 59  50 11 01 df 7d 8d 00 00 
00 00 00 00 00 00 c3 ba  cd 4f db 2f 00 00 36 00 
00 00 36 00 00 00 00 12  cf e5 54 a0 00 1f 3c 23 
db d3 08 00 45 00 00 28  4a a7 40 00 40 06 58 ea 
c0 a8 0a e2 c0 a8 0b 0c  4c fb 00 17 e7 ca f8 59 
26 13 45 df 50 10 40 c7  3e a5 00 00

Which is good news. The format is applicable - not directly, but applicable - to being streamed into Flume. So the challenge would probably be to reconstruct the global header when reading the data back into the traditional tools.
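As a minimal sketch of that reconstruction - assuming the same parameters as in the dump above (little endian, version 2.4, snaplen 65535, Ethernet) and a hypothetical file packets.raw holding the header-less records - you can simply prepend the 24 bytes again:

# rebuild the 24-byte global header and glue it onto the raw packet records
printf '\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00' \
  | cat - packets.raw > reconstructed.pcap
# the traditional tools should now accept the file again
tcpdump -nn -r reconstructed.pcap | head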

The Ingestion Process through Flume

Flume basic concepts

There are a couple of libraries out there that you should have already reviewed: RIPE's Packet Capture Library for Hadoop and Packetloop's PacketPig. The problem with these in regard to the ingestion process is that they are really meant for reading PCAP files; in other words, they are not meant for the ingestion itself. At this point you should try to let go of what you've read previously. There's a really powerful Apache project taking care of the ingestion process for you. Open the Flume User Guide in a new page to keep as a further reference.

There are a couple of approaches to this: using the exec input source, or Flume-plugin-pcap. In this case we'll simply use TCPDUMP, so the exec source is the correct selection.

# SOURCE (TCPDUMP)
a1.sources = r1
a1.sinks =  k1 k2
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = /usr/sbin/tcpdump -e -s0 -nni eth0 -w -
#                                             ^snaplength=65535
# SINK (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.codeC = Snappy
a1.sinks.k1.hdfs.filePrefix = pcap
a1.sinks.k1.hdfs.fileSuffix = .snappy
a1.sinks.k1.hdfs.fileType = SequenceFile 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.rollSize = 5000000000
a1.sinks.k1.hdfs.rollInterval = 0
# if running standalone setup enable line below
#a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.writeFormat = Writable
a1.sinks.k1.hdfs.path = /user/ingest/pcap/%y/%m/%d/%H

# INTERCEPTORS (TIMESTAMP FOR HDFS PATH)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# SINK (LOGGER)
# for debugging purposes
a1.sinks.k2.type = logger
# CHANNEL (MEM)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

## bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

We've also added compression to the data sink going to HDFS in the configuration above. There are some performance tradeoffs involved, but you would actually save about 40% of the disk space and increase disk I/O by 25% [source] this way (10GB of uncompressed data becomes roughly 6GB of compressed data). In this configuration we use the Google Snappy algorithm [Google Code Page]. Actual performance all depends on your data of course, so it will need to be tested. Additionally you would like a splittable compression algorithm - that is pretty important.
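Since Snappy relies on a native library, it's worth verifying that it is actually available on the node running the Flume agent before depending on it. On reasonably recent Hadoop 2.x versions the checknative command will tell you (on older versions, just look for libsnappy in the Hadoop native library directory):

# confirm that native Snappy support is present on this node
hadoop checknative -a | grep -i snappy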

When you stream data from a network interface you should be aware that TCPDUMP requires elevated privileges. What you are going to do is to create a Flume user named "ingest" locally on the client, and a directory in HDFS for it to store the network data in.

su - hdfs -c "hdfs dfs -mkdir /user/ingest"
su - hdfs -c "hdfs dfs -chown ingest /user/ingest"

Then restrict the tcpdump binary to a dedicated group and grant it the capabilities it needs, so that the ingest user can run it on the client without full root privileges.

groupadd tcpdump
chown root.tcpdump /usr/sbin/tcpdump
chmod 750 /usr/sbin/tcpdump
setcap "CAP_NET_RAW+eip" /usr/sbin/tcpdump
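With the group in place, the local ingest user needs to exist and be a member of it. A short sketch, assuming the user name used throughout this article:

useradd -m ingest            # skip this if the user already exists
usermod -a -G tcpdump ingest # lets the ingest user execute the restricted tcpdump binary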

Now you might recall that tcpdump writes a global header to the data stream. That header is 24 bytes long and you'll want to strip it from the stream. This isn't a particularly well-optimized solution, but you can use cut to remove those first bytes (the pipe will continuously cost you some CPU, since cut is CPU-bound rather than network-bound):

tcpdump -e -nni eth0 -s 0 -w - |cut -b 25-

Remember that TCPDUMP buffers its output when piped, so use stdbuf if you'd like it to stream the data right away. You should also verify the timestamp in the first packet header that comes through (it's little endian, so use od). Finally, put the pipeline into a shell script and change the command of the first Flume source to run that, as sketched below.

a1.sources.r1.command = /usr/local/sbin/tcpdump_streamer.sh
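The script itself could look something like this - a minimal sketch assuming the interface and paths used earlier, adjust them to your environment:

#!/bin/bash
# /usr/local/sbin/tcpdump_streamer.sh
# Capture with full snap length, strip the 24-byte pcap global header with cut,
# and unbuffer both stages so packets reach Flume right away.
stdbuf -o0 /usr/sbin/tcpdump -e -s0 -nni eth0 -w - \
  | stdbuf -o0 cut -b 25-

You can sanity-check the script by piping it into od for a moment and verifying that the stream starts with a little endian record timestamp rather than the d4 c3 b2 a1 magic.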

Finally, let us start it and see how it does. You should start seeing logger events straight away. When you put it into production, remember to comment out the k2 sink.

flume-ng agent -n a1 -c conf -f /home/ingest/configs/tcpdump_stream.conf

Now, go to http://<sandbox-IP>:50070/dfshealth.jsp and point the file browser to the /user/ingest directory. That should give you something like this (download some large files first to generate a bit of traffic):
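You can do the same check from the command line, and at the same time convince yourself that Flume really is writing sequence files - the first three bytes of a Hadoop sequence file are the literal magic SEQ. Assuming the path from the configuration above:

# how much packet data has landed so far
su - ingest -c "hdfs dfs -du /user/ingest/pcap"
# the first three bytes of any of the ingested files should print: SEQ
su - ingest -c "hdfs dfs -cat '/user/ingest/pcap/*/*/*/*/pcap.*' | head -c 3"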

Example of HDFS directory

Summarized

So, in short there are a couple of things that you should take away from this:

  • Traditional stream-based tools should be reviewed, but will probably need to be adapted for MapReduce
  • There are a couple of existing libraries for PCAP, but try to keep it simple by using the standard ingestion tools for Hadoop (normally that would be Flume)
  • Read up on the plugins of Flume
  • Read up on the structure of your input format
  • Use compression

Enjoy.