An OpenIOC Graph: A Different Kind of Rule Scheme

Despite that I think that implementing a full-fledged XML-editor is too complex for an operational scenario, I believe the OpenIOC-format, which has been in the works at Mandiant for a couple of years now, is quite good. They also have the IOC Writer which was launched at last summers Black Hat. OpenIOC can export to other expression languages, such as Yara, as well.

I have been thinking of a way to combine graph knowledge with exactly that for a while, an expressive detection language based on a graph. If combining two things you love, I have learned that it simply can't end badly, it must end with something amazing. Let's give it a try!

So I went about it, starting off by importing a sample Maltego-graph to Titan on HBase. I basically set out with five connected nodes in Maltego Tungsten. Nothing malicious, just a national newspaper.

Running that through my Rexster migration script results in something like this:

It's nice considering if you'd like to put it in a larger context with millions or billions of vertices you would like to trigger on. That is out of bounds for Maltego, or your desktop system in general.

The OpenIOC Part

If looking at the graphs above, you will probably agree that it isn't especially describing of certain incidents or other contextual data. But what if we could combine the graph with something like OpenIOC? Turns out that it's conceptually similar. The weakness of OpenIOC is that it doesn't scale when firing up an OpenIOC editor - like the one Mandiant have created. On the other hand, if you could traverse a graph with OpenIOC designed around the OpenIOC format..

Let's create a basic writer as a demonstration, which operates on the root level (no nesting of rules in this example).

from ioc_writer import ioc_api
from lxml import etree as et

class IOC:
    def __init__(self):
        self.IOC = ioc_api.IOC(name='Test', description='An IOC generated from a Python script', author='Someone')

        self.IOC.set_created_date()
        self.IOC.set_published_date()
        self.IOC.set_lastmodified_date()
        self.IOC.update_name('test_rexster')
        self.IOC.update_description('A Test')
        self.id = self.IOC.iocid

    def addNode(self,label,text,type,indicator,condition='is'):
    IndicatorItem_node = ioc_api.make_IndicatorItem_node(condition, label, text, type, indicator)
        current_guid = IndicatorItem_node.attrib['id']
        print current_guid
        self.IOC.top_level_indicator.append(IndicatorItem_node)

    def __str__(self):
        self.xml = et.tostring(self.IOC.root, encoding='utf-8', xml_declaration=True, pretty_print=True)
        return self.xml

This enables us to do something like this:

ioc = IOC()
ioc.addNode('test','Just a test','domain','vg.no')
print ioc

Which will again return the XML of the IOC.

<?xml version='1.0' encoding='utf-8'?>
<OpenIOC xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://openioc.org/schemas/OpenIOC_1.1" id="06fd70db-992c-4678-83e6-8f1b150e8bcf" last-modified="2014-01-28T07:15:09" published-date="2014-01-28T07:15:09">
  <metadata>
    <short_description>test</short_description>
    <description>A Test</description>
    <keywords/>
    <authored_by>Someone</authored_by>
    <authored_date>2014-01-28T07:15:09</authored_date>
    <links/>
  </metadata>
  <criteria>
    <Indicator id="fbbb2883-473a-4a1c-92c4-692e199adb61" operator="OR">
      <IndicatorItem id="14a42d26-b056-4b2e-a327-7d6edb25457e" condition="is" preserve-case="false" negate="false">
        <Context document="test" search="Just a test" type="mir"/>
        <Content type="domain">vg.no</Content>
        <IndicatorItem id="dff6e0c5-613b-4bea-8bad-bb7a36b3ccdf" condition="is" preserve-case="false" negate="false">
          <Context document="test" search="Just a test" type="mir"/>
          <Content type="ip">195.88.55.16</Content>
        </IndicatorItem>
      </IndicatorItem>
    </Indicator>
  </criteria>
  <parameters/>
</OpenIOC>

Reviewing the XML above you might notice that the scheme is pretty transferrable to a graph, perhaps even simplifying of the IOC XML. Be especially aware on the following tags and attributes:

  • Content
  • The IndicatorItem condition
  • The content type

A nested IOC might look like this (relevant excerpt):

    <Indicator id="b12f8c27-d168-49b5-bc75-cec86bf21d3f" operator="OR">
      <IndicatorItem id="af4323dc-a967-4fe3-b62f-b461b90a3550" condition="is" preserve-case="false" negate="false">
        <Context document="test" search="Just a test" type="mir"/>
        <Content type="domain">vg.no</Content>
        <IndicatorItem id="2ff639ca-dcec-4967-ac06-f54989bf3dc4" condition="is" preserve-case="false" negate="false">
          <Context document="test" search="Just a test" type="mir"/>
          <Content type="ip">195.88.55.16</Content>
        </IndicatorItem>
      </IndicatorItem>
    </Indicator>

The above implies that the domain vg.no needs to be accompanied with the IP-address 195.88.55.16.

Merging the Best of Two Worlds

So now that we have had a look at the power in the structure of a graph and the power of expression in the OpenIOC XML-indicators, you might see why this is the best of two worlds.

In the challenge of combining them both I perhaps oversimplified the nesting and used the two previously mentioned attributes in the graph, adding the content as the value of the node and the condition. We will also have to add the type attribute since that tells us what type of OpenIOC entry we have when reversing the process later on. We will have a small collision between Maltego and OpenIOC, since for instance an IP-address type will differ. So for now you will need two type attributes, one for Maltego and one for OpenIOC (if you plan to go both ways). This is left as an exersise for the reader.

Creating an OpenIOC-compatible graph is a breeze:

from rexpro import RexProConnection

class Graph:
    def __init__(self):
        self.graph = RexProConnection('localhost',8184,'titan')

    def addVertice(self,content,content_type,condition):
        vertice_id = self.graph.execute("""
            def v1 = g.addVertex([content:content,content_type:content_type,condition:condition])
            return v1""", 
            {'content':content, 'content_type':content_type, 'condition':condition})
        return vertice_id

    def addEdge(self,vid1,vid2,label):
        edge = self.graph.execute("""
            def v1 = g.v(vid1)
            def v2 = g.v(vid2)
            g.addEdge(v1, v2, label)
            g.commit()""",{'vid1':vid1['_id'], 'vid2':vid2['_id'], 'label':label})

graph=Graph()
v1=graph.addVertice('vg.no','domain','is')
v2=graph.addVertice('195.88.55.16','ip','is')
graph.addEdge(v1,v2,'and')

If you'd like to go the other way again in order to talk to other organisations perhaps, you will want to run the process in reverse:

from rexpro import RexProConnection

class RexsterIOC:
    def __init__(self):
        self.graph = RexProConnection('localhost',8184,'titan')

        self.IOC = ioc_api.IOC(name='Test', description='A test IOC generated from Rexster', author='Someone')

        self.IOC.set_created_date()
        self.IOC.set_published_date()
        self.IOC.set_lastmodified_date()
        #IOC.add_link('help', self.baseurl + url)
        self.IOC.update_name('test')
        self.IOC.update_description('A Test')
        self.id = self.IOC.iocid
        self.lastId=None

    def     addNode(self,label,text,type,indicator,condition='is',addToLast=False):
        IndicatorItem_node = ioc_api.make_IndicatorItem_node(condition, label, text, type, indicator)

        if addToLast and self.last:
            self.last.append(IndicatorItem_node)
        else:
            self.IOC.top_level_indicator.append(IndicatorItem_node)

        current_guid = IndicatorItem_node.attrib['id']
        self.last  = IndicatorItem_node

    def traverse(self,rootNodeId):
        root=self.graph.execute("""return g.v(80284)""",{'vid':str(rootNodeId)})
        self.addNode('test','Just a test',
            root['_properties']['content_type'],
            root['_properties']['content'],
            root['_properties']['condition'])

        one_level_out=self.graph.execute("""return g.v(vid).out""",{'vid':str(rootNodeId)})
        for vertex in one_level_out:
            self.addNode('test','Just a test',
                vertex['_properties']['content_type'],
                vertex['_properties']['content'],
                vertex['_properties']['condition'],addToLast=True)      

    def __str__(self):
        self.xml = et.tostring(self.IOC.root, encoding='utf-8', xml_declaration=True, pretty_print=True)
        return self.xml

ioc = RexsterIOC()
ioc.traverse(80284) # the root node
print ioc

One thing that you can now do is to store the indicators with the rest of your network data. This again will imply that the edges are created automatically without any need to actually run jobs to combine data for detecting stuff.

That's my small concept demonstration. I think it's pretty cool!

I've put the scripts out there for you if you'd like to give it a try.

Tommy

Tommy is an analyst and incident handler with more than seven years of experience from the government and private industry. He holds an M.Sc. in Digital Forensics and a B.Tech. in information security