Faster API Calls

I have been building out APIs to automate digital forensics and incident response analysis, so artifacts are quickly ready for review during an investigation. I was using the ‘requests’ library in Python but wanted to speed up the process. I found that the ‘asyncio’ library gave me improved performance for reading a line-delimited file, making a web request for each line, and writing the results to disk. It was super useful, so I wanted to share a code snippet in case it helps others!

asyncio

FYI: I had to use ‘aiohttp.TCPConnector(ssl=False)’ on my MacBook to work around a certificate verification error; it was not needed on my Ubuntu system.

import aiofiles
import aiohttp
import asyncio
import async_timeout

inputfile = 'input.txt'
outputfile = 'output.txt'

key = '<key>'

async def fetch(session, url):
    # Give up on any request that takes longer than ten seconds.
    async with async_timeout.timeout(10):
        async with session.get(url, headers={'x-api-key': key}) as response:
            return await response.text()

async def main(loop):
    # ssl=False works around the certificate verification error on macOS.
    connection = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(loop=loop, connector=connection) as session:
        async with aiofiles.open(inputfile, 'r') as f:
            async with aiofiles.open(outputfile, 'w') as w:
                async for line in f:
                    # Strip the trailing newline before building the URL.
                    url = 'https://sha256.tundralabs.net/' + line.rstrip('\n')
                    jsonoutput = await fetch(session, url)
                    await w.write(jsonoutput + '\n')

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
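
One caveat: the loop above still awaits each response before requesting the next hash, so the requests run one at a time. Below is a minimal sketch of how the same work could be fanned out concurrently with ‘asyncio.gather’; the semaphore and its limit of ten are assumptions of mine to avoid flooding the API.

import asyncio

import aiofiles
import aiohttp

key = '<key>'

async def fetch(session, semaphore, url):
    # The semaphore caps the number of requests in flight at once.
    async with semaphore:
        async with session.get(url, headers={'x-api-key': key}) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(10)  # assumed concurrency limit
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with aiofiles.open('input.txt', 'r') as f:
            urls = ['https://sha256.tundralabs.net/' + line.rstrip('\n')
                    async for line in f]
        # Start every request, then collect the responses in input order.
        results = await asyncio.gather(
            *(fetch(session, semaphore, url) for url in urls))
        async with aiofiles.open('output.txt', 'w') as w:
            await w.write('\n'.join(results) + '\n')

asyncio.get_event_loop().run_until_complete(main())

Note that this version reads the whole input file up front, trading the streaming behavior of the original for parallelism.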

Happy Coding!
John Lukach

MatchMeta.Info v2

I have been using full paths from gold builds to match meta information since the Fall of 2014. Files and directories that are hidden in plain sight through character substitutions, misspellings, etc. become more visible to the investigator with this analysis. The project started as a Python script doing lookups against a stand-alone SQLite database of Microsoft Windows installs from TechNet. In the Spring of 2016, I switched to an API written in Twisted Python that compared filenames from the NIST NSRL.

Recently, cloud computing has allowed me to reduce API infrastructure management and expand MatchMeta.Info to additional operating systems: Amazon Linux, CentOS, Debian, Red Hat Enterprise, SUSE, Ubuntu, and Windows.

The full path of a file or directory needs to be normalized before the SHA256 hash is generated. First, I test whether the full path comes from a Unix or Windows operating system. If it is from a Microsoft system, the drive letter is forced to ‘C’ as the default. Next, I test for home directories so the username can be standardized. Finally, a SHA256 hash is generated from the normalized full path and submitted to the API for comparison.

import hashlib
import requests

### USERS ###
# admin = DEB8 DEB9
# Administrator = WIN2K3 WIN2K8 WIN2K12 WIN2K16
# centos = CENTOS6 CENTOS7
# ec2-user = AMZN1 AMZN2 RHEL7 SUSE12 SUSE15
# ubuntu = UBUNTU14 UBUNTU16 UBUNTU18

### PATHS ###
# DIR  = C:\Users\Administrator
# DIR  = /home/ubuntu
# FILE = C:\Users\Administrator\NTUSER.DAT
# FILE = /home/ubuntu/.ssh/authorized_keys

unix = 'ubuntu'
path = r'D:\Users\John\NTUSER.DAT'

if path[:1] == '/':  ### UNIX
    # Standardize the username in home directory paths.
    out = path.split('/')
    if out[1] == 'home':
        out[2] = unix
        path = '/'.join(out)
elif path[1] == ':':  ### WINDOWS
    # Force the drive letter to the default of 'C'.
    new = list(path)
    new[0] = 'C'
    path = ''.join(new)
    # Standardize the username in profile paths.
    out = path.split('\\')
    if out[1] == 'Users' or out[1] == 'Documents and Settings':
        out[2] = 'Administrator'
        path = '\\'.join(out)

# Hash the normalized full path for the API lookup.
hash_object = hashlib.sha256(path.encode())
hash_value = hash_object.hexdigest()

r = requests.get('https://api.matchmeta.info/' + hash_value.upper(),
                 headers={'x-api-key': '<key>'})

print(r.text)
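
For reuse across scripts, the same normalization logic can be wrapped in a function; ‘normalize_path’ is a name of my own choosing rather than part of MatchMeta.Info.

import hashlib

def normalize_path(path, unix_user='ubuntu'):
    # Hypothetical helper wrapping the normalization steps above.
    if path[:1] == '/':  # Unix: standardize the home directory username.
        parts = path.split('/')
        if len(parts) > 2 and parts[1] == 'home':
            parts[2] = unix_user
        path = '/'.join(parts)
    elif len(path) > 1 and path[1] == ':':  # Windows: force drive letter to C.
        path = 'C' + path[1:]
        parts = path.split('\\')
        if len(parts) > 2 and parts[1] in ('Users', 'Documents and Settings'):
            parts[2] = 'Administrator'
        path = '\\'.join(parts)
    return hashlib.sha256(path.encode()).hexdigest().upper()

# 'D:\Users\John\NTUSER.DAT' normalizes to 'C:\Users\Administrator\NTUSER.DAT'
print(normalize_path(r'D:\Users\John\NTUSER.DAT'))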

In the future, I hope to manage the API keys through the AWS Marketplace to allow others access to MatchMeta.Info.

Happy Coding!
John Lukach

AWS Pseudo Pipeline

I have been running my Forensic Artifact API on Ubuntu with an Nginx, Flask (Python), and MariaDB stack. I wanted to get out of the infrastructure administration business by moving to the AWS Cloud, so I decided to start with the migration of my SHA256 hash library. My goals were to improve availability, allow collaboration, and keep costs down. I wound up having an expensive learning experience while importing the data into DynamoDB!

Forensic Artifact API Diagram

I decided to use the Amazon Web Services (AWS) Boto3 SDK for Python so that an EC2 instance could read from an S3 bucket and insert into a DynamoDB table. Reading the line-delimited text file of SHA256 hashes as a stream minimizes the amount of memory required on the EC2 instance. DynamoDB batch writes accept a maximum of twenty-five items at a time, so I set the batch size with ‘range’ in the for loop. That value needs to match the minimum provisioned write capacity at auto-scaling startup; if global tables replicate the DynamoDB table across regions, they also need to match the ‘range’ value until the first auto-scale completes.

import boto3

def import_hash(hashlist, hashtype, hashsrc, hashdesc):

    client = boto3.client('s3')
    obj = client.get_object(Bucket='bucketname', Key=hashlist)

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('sha256')

    # Stream the file line by line so the whole hash list never has
    # to fit in memory on the EC2 instance.
    lines = obj['Body'].iter_lines()

    done = False
    while not done:
        # Write in batches of twenty-five, the DynamoDB maximum, which
        # also matches the minimum provisioned capacity for auto-scaling.
        with table.batch_writer() as batch:
            for i in range(25):
                item = next(lines, b'').decode('utf-8').strip()
                if not item:
                    done = True
                    break
                batch.put_item(Item={'sha256': item.upper(),
                                     'type': hashtype,
                                     'source': hashsrc,
                                     'desc': hashdesc})

import_hash('Folder/File.txt', 'Known', 'HashSets.com', 'Windows')
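
The minimum provisioned capacity mentioned above is configured through the Application Auto Scaling service. As a point of reference, here is a sketch of how that might be wired up with Boto3; the table name matches the code above, but the capacity values and policy name are placeholders of mine.

import boto3

autoscaling = boto3.client('application-autoscaling')

# Register the table's write capacity as a scalable target. The
# MinCapacity of 25 matches the batch size used by import_hash above;
# MaxCapacity is a placeholder.
autoscaling.register_scalable_target(
    ServiceNamespace='dynamodb',
    ResourceId='table/sha256',
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    MinCapacity=25,
    MaxCapacity=1000)

# Scale toward 70% utilization of the provisioned write capacity.
autoscaling.put_scaling_policy(
    PolicyName='sha256-write-scaling',
    ServiceNamespace='dynamodb',
    ResourceId='table/sha256',
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 70.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'DynamoDBWriteCapacityUtilization'}})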

DynamoDB auto-scaling has an issue where, if reads/writes drop to zero, it will not scale back down to the minimum provisioned capacity. I needed a time-based CloudWatch event that executes a Lambda function to generate regular database activity.

import boto3

dynamodb = boto3.resource('dynamodb')

def lambda_handler(event, context):

    table = dynamodb.Table('sha256')
    # Generate one read and one write so table activity never drops
    # to zero, keeping auto-scaling able to adjust capacity.
    table.get_item(Key={'sha256': '0000000000000000000000000000000000000000000000000000000000000000',
                        'type': 'TEST'})
    table.put_item(Item={'sha256': 'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF',
                         'type': 'TEST',
                         'source': 'JOHN',
                         'desc': 'PING'})

    return
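
The time-based trigger itself lives in CloudWatch Events. Here is a sketch of how the schedule could be created with Boto3, assuming a five-minute rate; the rule name, statement ID, and function ARN are placeholders of mine.

import boto3

events = boto3.client('events')
awslambda = boto3.client('lambda')

# Placeholder ARN for the keep-alive Lambda function above.
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:dynamodb-keepalive'

# Fire the rule every five minutes to keep table activity above zero.
rule = events.put_rule(
    Name='dynamodb-keepalive',
    ScheduleExpression='rate(5 minutes)',
    State='ENABLED')

# Allow CloudWatch Events to invoke the function, then wire it up.
awslambda.add_permission(
    FunctionName='dynamodb-keepalive',
    StatementId='cloudwatch-keepalive',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'])

events.put_targets(
    Rule='dynamodb-keepalive',
    Targets=[{'Id': '1', 'Arn': function_arn}])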

Happy Coding!

John Lukach
@jblukach