February 7, 2015
A cluster is a really powerful tool for handling all of your data. If you are thinking about using Hadoop for network monitoring, for instance, you will probably be looking at the existing tools out there and how they fit into a big data context. I’ll tell you right now that MapReduce will probably require a completely different way of thinking - but that doesn’t make it a bad approach.
If you follow this article you should have already set up your cluster (easy provisioning with Ambari) and installed flume-ng. You can do that through the HDP stack (2.0.6 at the time of writing). This article was written with Red Hat Server 6.4 in mind. It isn’t really a complete post about how you can use Hadoop, but rather an introduction to the format that you can use to ingest packet data from the wire, let us say from a network tap (nTAP). I will give examples where necessary.
TCPDUMP, which is built on libpcap (also check out Hakin9-article, PDF), basically provides you with a stream of packets, structured from the link layer up to the application layer. This is better known as the TCP/IP stack.
Flume is the preferred tool for ingesting data into Hadoop. It consists of a set of input (source) and output (sink) plugins, as well as a transport (channel) between them. In this case the PCAP format would probably be an almost ideal choice. When streaming the packets you should note that the global header is only written at the beginning of the stream, and that it must be reconstructed when reading the data back into tools expecting the PCAP format.
The Ingestion Format - Packets
Hadoop recognizes two file types: sequence files and text files. In our use case we’ll use sequence files since we are storing arbitrary binary data. Let us have a closer look at the input format (found most of it here, recommended blog).
Remember that everything in our PCAP file is little endian. This means that e.g. c2 ba cd 4f turns into 4f cd ba c2. The byte order is given by the magic number at the start of the file, which is 0xa1b2c3d4: a file that starts with the bytes a1 b2 c3 d4 is big endian, while a file that starts with d4 c3 b2 a1 (like ours) is little endian.
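To make the byte order concrete, here is a small Python sketch that reads the same four bytes both ways and checks the magic number of our file. It is only an illustration; the variable names are made up for the example.

```python
# Illustration only: interpret the same four bytes in both byte orders.
RAW_TIMESTAMP = bytes.fromhex("c2bacd4f")  # as stored in the capture file
FILE_MAGIC = bytes.fromhex("d4c3b2a1")     # first four bytes of our file

as_little = int.from_bytes(RAW_TIMESTAMP, "little")
as_big = int.from_bytes(RAW_TIMESTAMP, "big")
print(hex(as_little))  # 0x4fcdbac2
print(hex(as_big))     # 0xc2bacd4f

# The PCAP magic number is 0xa1b2c3d4; the byte order that yields it
# is the byte order used for the rest of the file.
is_little_endian = int.from_bytes(FILE_MAGIC, "little") == 0xA1B2C3D4
print(is_little_endian)  # True
```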
Global Header (24b)
01-04b: The first 4 bytes d4 c3 b2 a1 are the magic number.
05-08b: The next 4 bytes 02 00 04 00 are the major version (2 bytes) and minor version (2 bytes), in our case 2.4.
09-16b: Next are the time zone offset (4 bytes) and the timestamp accuracy (4 bytes). These are set to 0 most of the time, which gives us 00 00 00 00 00 00 00 00.
17-20b: Next is the Snapshot Length field (4 bytes), which indicates the maximum length of the captured packets (dataX) in bytes. In our file it is set to ff ff 00 00, which equals 65535 (0xffff), the default value for tcpdump and wireshark.
21-24b: The last 4 bytes in the global header specify the Link-Layer Header Type. Our file has the value 0x1 (01 00 00 00), which says that the link-layer protocol is Ethernet (literal cite).
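The field layout above can be sketched in Python with the standard struct module. This is a minimal sketch of my own, not taken from any PCAP library; parse_global_header and the dictionary keys are names I picked for the example.

```python
import struct

# The 24-byte global header of the example file.
GLOBAL_HEADER = bytes.fromhex(
    "d4c3b2a1 02000400 00000000 00000000 ffff0000 01000000"
)

# Map the magic bytes as they appear in the file to a struct byte-order prefix.
BYTE_ORDERS = {b"\xd4\xc3\xb2\xa1": "<", b"\xa1\xb2\xc3\xd4": ">"}


def parse_global_header(raw):
    """Decode the 24-byte PCAP global header into a dict."""
    if len(raw) < 24:
        raise ValueError("global header needs 24 bytes")
    order = BYTE_ORDERS.get(raw[:4])
    if order is None:
        raise ValueError("unknown magic number")
    major, minor, tz_offset, accuracy, snaplen, linktype = struct.unpack(
        order + "HHiIII", raw[4:24]
    )
    return {
        "little_endian": order == "<",
        "version": (major, minor),
        "tz_offset": tz_offset,
        "accuracy": accuracy,
        "snaplen": snaplen,
        "linktype": linktype,  # 1 means Ethernet
    }


header = parse_global_header(GLOBAL_HEADER)
print(header["version"], header["snaplen"], header["linktype"])
```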
c2 ba cd 4f b6 35 0f 00 36 00 00 00 36 00 00 00
01-04b: Timestamp in seconds, 0x4fcdbac2.
$ calc 0x4fcdbac2 #-> 1338882754
$ date --date='1970-01-01 1338882754 sec GMT'
05-08b: Microseconds of the timestamp.
09-12b: Size of the packet data saved in the file (54b).
13-16b: Length of the packet as it was on the wire (54b). This is usually the same as 09-12b, but it is larger when the packet was longer than the snapshot length (max packet length) and got truncated.
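The same decoding can be done in a few lines of Python. This is a sketch under the assumption that the stream is little endian, as in our file; parse_record_header is a name I made up for the example.

```python
import struct
from datetime import datetime, timezone

# The 16-byte header of the first packet in the example file.
RECORD_HEADER = bytes.fromhex("c2bacd4f b6350f00 36000000 36000000")


def parse_record_header(raw, order="<"):
    """Return (seconds, microseconds, captured length, length on the wire)."""
    return struct.unpack(order + "IIII", raw[:16])


ts_sec, ts_usec, incl_len, orig_len = parse_record_header(RECORD_HEADER)
captured_at = datetime.fromtimestamp(ts_sec, tz=timezone.utc).replace(
    microsecond=ts_usec
)
print(captured_at.isoformat())  # 2012-06-05T07:52:34.996790+00:00
print(incl_len, orig_len)       # 54 54
```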
After the packet header comes the data! Starting from the lowest layer we see the Ethernet destination address 00:12:cf:e5:54:a0 followed by the source address 00:1f:3c:23:db:d3. The complete capture, starting with the global header, looks like this:
d4 c3 b2 a1 02 00 04 00 00 00 00 00 00 00 00 00 ff ff 00 00 01 00 00 00 c2 ba cd 4f b6 35 0f 00 36 00 00 00 36 00 00 00 00 12 cf e5 54 a0 00 1f 3c 23 db d3 08 00 45 00 00 28 4a a6 40 00 40 06 58 eb c0 a8 0a e2 c0 a8 0b 0c 4c fb 00 17 e7 ca f8 58 26 13 45 de 50 11 40 c7 3e a6 00 00 c3 ba cd 4f 60 04 00 00 3c 00 00 00 3c 00 00 00 00 1f 3c 23 db d3 00 12 cf e5 54 a0 08 00 45 00 00 28 8a f7 00 00 40 06 58 9a c0 a8 0b 0c c0 a8 0a e2 00 17 4c fb 26 13 45 de e7 ca f8 59 50 10 01 df 7d 8e 00 00 00 00 00 00 00 00 c3 ba cd 4f 70 2f 00 00 3c 00 00 00 3c 00 00 00 00 1f 3c 23 db d3 00 12 cf e5 54 a0 08 00 45 00 00 28 26 f9 00 00 40 06 bc 98 c0 a8 0b 0c c0 a8 0a e2 00 17 4c fb 26 13 45 de e7 ca f8 59 50 11 01 df 7d 8d 00 00 00 00 00 00 00 00 c3 ba cd 4f db 2f 00 00 36 00 00 00 36 00 00 00 00 12 cf e5 54 a0 00 1f 3c 23 db d3 08 00 45 00 00 28 4a a7 40 00 40 06 58 ea c0 a8 0a e2 c0 a8 0b 0c 4c fb 00 17 e7 ca f8 59 26 13 45 df 50 10 40 c7 3e a5 00 00
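Walking through such a dump is mostly a matter of reading a 16-byte record header, then skipping as many bytes as it announces. Here is a minimal Python sketch using the global header and the first two records of the dump; iter_packets and mac are helper names I invented for the illustration, and little-endian byte order is assumed.

```python
import struct

# Global header plus the first two records of the dump.
CAPTURE = bytes.fromhex(
    "d4c3b2a1 02000400 00000000 00000000 ffff0000 01000000"  # global header
    "c2bacd4f b6350f00 36000000 36000000"                    # record 1 header
    "0012cfe554a0 001f3c23dbd3 0800"                         # Ethernet
    "45000028 4aa64000 400658eb c0a80ae2 c0a80b0c"           # IPv4
    "4cfb0017 e7caf858 261345de 501140c7 3ea60000"           # TCP
    "c3bacd4f 60040000 3c000000 3c000000"                    # record 2 header
    "001f3c23dbd3 0012cfe554a0 0800"                         # Ethernet
    "45000028 8af70000 4006589a c0a80b0c c0a80ae2"           # IPv4
    "00174cfb 261345de e7caf859 501001df 7d8e0000"           # TCP
    "000000000000"                                           # padding
)


def iter_packets(capture):
    """Yield (seconds, microseconds, data) for every record after the global header."""
    offset = 24  # skip the global header
    while offset + 16 <= len(capture):
        ts_sec, ts_usec, incl_len, _orig_len = struct.unpack_from(
            "<IIII", capture, offset
        )
        offset += 16
        yield ts_sec, ts_usec, capture[offset:offset + incl_len]
        offset += incl_len


def mac(raw):
    """Format six bytes as a colon-separated MAC address."""
    return ":".join(f"{b:02x}" for b in raw)


for ts_sec, ts_usec, data in iter_packets(CAPTURE):
    print(ts_sec, mac(data[6:12]), "->", mac(data[0:6]), len(data), "bytes")
```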
This is good news. The format can be streamed into Flume - not directly, but with a little adaptation. The remaining challenge is to reconstruct the global header when reading the data back into the traditional tools.
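Reconstructing the global header is not hard, since all of its fields are known up front. The following Python sketch assumes the records were captured little endian, with a snapshot length of 65535 on Ethernet, as in this article; build_global_header and restore_pcap are hypothetical helper names, and getting the raw records back out of HDFS is left out.

```python
import struct


def build_global_header(snaplen=65535, linktype=1):
    """Build a 24-byte little-endian PCAP global header (version 2.4)."""
    return struct.pack(
        "<IHHiIII",
        0xA1B2C3D4,  # magic number
        2, 4,        # major and minor version
        0, 0,        # time zone offset and timestamp accuracy
        snaplen,
        linktype,    # 1 = Ethernet
    )


def restore_pcap(records):
    """Prepend a global header to a headerless stream of packet records."""
    return build_global_header() + records


# First record header of the example file, followed by 54 placeholder bytes
# standing in for the real packet data.
FIRST_RECORD_HEADER = bytes.fromhex("c2bacd4f b6350f00 36000000 36000000")
restored = restore_pcap(FIRST_RECORD_HEADER + bytes(54))
print(restored[:24].hex())
```

Writing the restored bytes to a file should give you something the traditional tools can open, provided the assumed snapshot length and link type match the original capture.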
The Ingestion Process through Flume
There are a couple of libraries out there that you should have already reviewed: RIPE’s Packet Capture Library for Hadoop and Packetloop’s PacketPig. The problem with these, in regard to the ingestion process, is that they are meant to read PCAP files; in other words, they are not meant for ingestion. At this point you should try to let go of what you’ve read previously. There’s a really powerful Apache project that takes care of the ingestion process for you. Open the Flume user manual in a new tab to keep as a reference.
# SOURCE (TCPDUMP)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
# -s0 sets snaplength=65535
a1.sources.r1.command = /usr/sbin/tcpdump -e -s0 -nni eth0 -w -

# SINK (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.codeC = Snappy
a1.sinks.k1.hdfs.filePrefix = pcap
a1.sinks.k1.hdfs.fileSuffix = .snappy
a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 5000000000
a1.sinks.k1.hdfs.rollInterval = 0
# if running standalone setup enable line below
#a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.writeFormat = Writable
a1.sinks.k1.hdfs.path = /user/ingest/pcap/%y/%m/%d/%H

# INTERCEPTORS (TIMESTAMP FOR HDFS PATH)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# SINK (LOGGER)
# for debugging purposes
a1.sinks.k2.type = logger

# CHANNEL (MEM)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

## bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
We’ve also added compression to the data sink going to HDFS in the configuration above. There are some performance tradeoffs involved in that, but you would save about 40% of the disk space and improve disk I/O performance by 25% [source] this way (10GB of uncompressed data becomes 6GB of compressed data). In this configuration we use the Google Snappy algorithm [Google Code Page]. Actual performance depends on your data, of course, so it will need to be tested. Additionally, you want the compressed output to stay splittable, which is pretty important. Snappy on its own is not splittable, but sequence files are compressed per record or block and can still be split.
When you stream data from a network interface you should be aware that TCPDUMP requires privileges in order to run. What you are going to do is to create a Flume user named “ingest” locally on the client, and a directory in HDFS for it to store the network data.
su - hdfs -c "hdfs dfs -mkdir /user/ingest"
su - hdfs -c "hdfs dfs -chown ingest /user/ingest"
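Reduce the privileges required to use tcpdump, so that the ingest user can run it on the client.
groupadd tcpdump
chown root:tcpdump /usr/sbin/tcpdump
chmod 750 /usr/sbin/tcpdump
setcap "CAP_NET_RAW+eip" /usr/sbin/tcpdump
# assumes the ingest user already exists; it must be in the group to execute tcpdump
usermod -a -G tcpdump ingest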
Now you might recall that tcpdump writes a global header to the data stream. That one is 24 bytes long and you’d want to strip it from the stream. This isn’t a well-optimized solution, but you can use tail to skip the first 24 bytes (the extra pipe will continuously cost you some CPU resources, since every packet has to pass through it):
tcpdump -e -nni eth0 -s 0 -w - | tail -c +25
Remember that TCPDUMP buffers its output when piped, so use stdbuf (or tcpdump’s -U option, which flushes the output after each packet) if you’d like it to stream the data right away. You should verify the timestamp in the first packet header (it’s little endian, so use od). Also, create a shell script that wraps the pipeline, and change the command of the Flume source to run it.
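If you would rather not read od output by eye, the check can be sketched in Python as well. This is only an illustration: check_stream_start is a made-up helper, and it assumes a little-endian stream with a snapshot length of 65535. It refuses a stream that still starts with the global header and otherwise returns the first timestamp.

```python
import struct

# Magic number as it appears on disk, little endian and big endian.
PCAP_MAGICS = (b"\xd4\xc3\xb2\xa1", b"\xa1\xb2\xc3\xd4")


def check_stream_start(raw, snaplen=65535):
    """Return the first packet's epoch seconds, or raise if the start looks wrong."""
    if len(raw) < 16:
        raise ValueError("need at least 16 bytes")
    if raw[:4] in PCAP_MAGICS:
        raise ValueError("global header is still present")
    ts_sec, ts_usec, incl_len, orig_len = struct.unpack("<IIII", raw[:16])
    if ts_usec >= 1_000_000 or incl_len > orig_len or incl_len > snaplen:
        raise ValueError("implausible packet header")
    return ts_sec


# The first packet header of the example file.
first = check_stream_start(bytes.fromhex("c2bacd4f b6350f00 36000000 36000000"))
print(first)  # 1338882754
```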
a1.sources.r1.command = /usr/local/sbin/tcpdump_streamer.sh
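Finally, let us start it and see how it does. You should start seeing logger events straight away. When you put it into production, remember to comment out the k2 sink: sinks attached to the same channel compete for events, so whatever the logger takes never reaches HDFS.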
flume-ng agent -n a1 -c conf -f /home/ingest/configs/tcpdump_stream.conf
Then open http://<sandbox-IP>:50070/dfshealth.jsp and point the file browser to the /user/ingest directory. You should see the ingested files showing up there (download some large files to generate traffic).
So, in short there are a couple of things that you should take away from this:
- Traditional stream-based tools should be reviewed, but will probably need to be adapted for MapReduce
- There are a couple of existing libraries for PCAP, but try to keep it simple by using the ingestion tools for Hadoop (normally that would be Flume)
- Read up on the plugins of Flume
- Read up on the structure of your input format
- Use compression