summaryrefslogtreecommitdiff
path: root/MonkeyTest/monkeytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'MonkeyTest/monkeytest.py')
-rw-r--r--MonkeyTest/monkeytest.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/MonkeyTest/monkeytest.py b/MonkeyTest/monkeytest.py
new file mode 100644
index 0000000..830e43e
--- /dev/null
+++ b/MonkeyTest/monkeytest.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+'''
+MonkeyTest -- test your hard drive read-write speed in Python
+A simplistic script to show that such system programming
+tasks are possible and convenient to be solved in Python
+
+The file is being created, then written with random data, randomly read
+and deleted, so the script doesn't waste your drive
+
+(!) Be sure, that the file you point to is not something
+ you need, cause it'll be overwritten during test
+
+Runs on both Python3 and 2, despite that I prefer 3
+Has been tested on 3.5 and 2.7 under ArchLinux
+Has been tested on 3.5.2 under Ubuntu Xenial
+'''
+from __future__ import division, print_function # for compatability with py2
+
+import os, sys
+from random import shuffle
+import argparse
+import json
+
+ASCIIART = r'''Brought to you by coding monkeys.
+Eat bananas, drink coffee & enjoy!
+ _
+ ,//)
+ ) /
+ / /
+ _,^^,/ /
+ (G,66<_/
+ _/\_,_) _
+ / _ \ ,' )
+ / /"\ \/ ,_\
+ __(,/ > e ) / (_\.oO
+ \_ / ( -,_/ \_/
+ U \_, _)
+ ( /
+ >/
+ (.oO
+'''
+# ASCII-art: used part of text-image @ http://www.ascii-art.de/ascii/mno/monkey.txt
+# it seems that its original author is Mic Barendsz (mic aka miK)
+# text-image is a bit old (1999) so I couldn't find a way to communicate with author
+# if You're reading this and You're an author -- feel free to write me
+
+try: # if Python >= 3.3 use new high-res counter
+ from time import perf_counter as time
+except ImportError: # else select highest available resolution counter
+ if sys.platform[:3] == 'win':
+ from time import clock as time
+ else:
+ from time import time
+
+
+def get_args():
+ parser = argparse.ArgumentParser(description='Arguments', formatter_class = argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('-f', '--file',
+ required=False,
+ action='store',
+ default='/tmp/monkeytest',
+ help='The file to read/write to')
+ parser.add_argument('-s', '--size',
+ required=False,
+ action='store',
+ type=int,
+ default=128,
+ help='Total MB to write')
+ parser.add_argument('-w', '--write-block-size',
+ required=False,
+ action='store',
+ type=int,
+ default=1024,
+ help='The block size for writing in bytes')
+ parser.add_argument('-r', '--read-block-size',
+ required=False,
+ action='store',
+ type=int,
+ default=512,
+ help='The block size for reading in bytes')
+ parser.add_argument('-j', '--json',
+ required=False,
+ action='store',
+ help='Output to json file')
+ args = parser.parse_args()
+ return args
+
+
+class Benchmark:
+
+ def __init__(self, file,write_mb, write_block_kb, read_block_b):
+ self.file = file
+ self.write_mb = write_mb
+ self.write_block_kb = write_block_kb
+ self.read_block_b = read_block_b
+ wr_blocks = int(self.write_mb * 1024 / self.write_block_kb)
+ rd_blocks = int(self.write_mb * 1024 * 1024 / self.read_block_b)
+ self.write_results = self.write_test( 1024 * self.write_block_kb, wr_blocks)
+ self.read_results = self.read_test(self.read_block_b, rd_blocks)
+
+ def write_test(self, block_size, blocks_count, show_progress=True):
+ '''
+ Tests write speed by writing random blocks, at total quantity
+ of blocks_count, each at size of block_size bytes to disk.
+ Function returns a list of write times in sec of each block.
+ '''
+ f = os.open(self.file, os.O_CREAT | os.O_WRONLY, 0o777) # low-level I/O
+
+ took = []
+ for i in range(blocks_count):
+ if show_progress:
+ # dirty trick to actually print progress on each iteration
+ sys.stdout.write('\rWriting: {:.2f} %'.format(
+ (i + 1) * 100 / blocks_count))
+ sys.stdout.flush()
+ buff = os.urandom(block_size)
+ start = time()
+ os.write(f, buff)
+ os.fsync(f) # force write to disk
+ t = time() - start
+ took.append(t)
+
+ os.close(f)
+ return took
+
+ def read_test(self, block_size, blocks_count, show_progress=True):
+ '''
+ Performs read speed test by reading random offset blocks from
+ file, at maximum of blocks_count, each at size of block_size
+ bytes until the End Of File reached.
+ Returns a list of read times in sec of each block.
+ '''
+ f = os.open(self.file, os.O_RDONLY, 0o777) # low-level I/O
+ # generate random read positions
+ offsets = list(range(0, blocks_count * block_size, block_size))
+ shuffle(offsets)
+
+ took = []
+ for i, offset in enumerate(offsets, 1):
+ if show_progress and i % int(self.write_block_kb * 1024 / self.read_block_b) == 0:
+ # read is faster than write, so try to equalize print period
+ sys.stdout.write('\rReading: {:.2f} %'.format(
+ (i + 1) * 100 / blocks_count))
+ sys.stdout.flush()
+ start = time()
+ os.lseek(f, offset, os.SEEK_SET) # set position
+ buff = os.read(f, block_size) # read from position
+ t = time() - start
+ if not buff: break # if EOF reached
+ took.append(t)
+
+ os.close(f)
+ return took
+
+ def print_result(self):
+ result = ('\n\nWritten {} MB in {:.4f} s\nWrite speed is {:.2f} MB/s'
+ '\n max: {max:.2f}, min: {min:.2f}\n'.format(
+ self.write_mb, sum(self.write_results), self.write_mb / sum(self.write_results),
+ max=self.write_block_kb / (1024 * min(self.write_results)),
+ min=self.write_block_kb / (1024 * max(self.write_results))))
+ result += ('\nRead {} x {} B blocks in {:.4f} s\nRead speed is {:.2f} MB/s'
+ '\n max: {max:.2f}, min: {min:.2f}\n'.format(
+ len(self.read_results), self.read_block_b,
+ sum(self.read_results), self.write_mb / sum(self.read_results),
+ max=self.read_block_b / (1024 * 1024 * min(self.read_results)),
+ min=self.read_block_b / (1024 * 1024 * max(self.read_results))))
+ print(result)
+ print(ASCIIART)
+
+
+ def get_json_result(self,output_file):
+ results_json = {}
+ results_json["Written MB"] = self.write_mb
+ results_json["Write time (sec)"] = round(sum(self.write_results),2)
+ results_json["Write speed in MB/s"] = round(self.write_mb / sum(self.write_results),2)
+ results_json["Read blocks"] = len(self.read_results)
+ results_json["Read time (sec)"] = round(sum(self.read_results),2)
+ results_json["Read speed in MB/s"] = round(self.write_mb / sum(self.read_results),2)
+ with open(output_file,'w') as f:
+ json.dump(results_json,f)
+
+
+def main():
+ args = get_args()
+ benchmark = Benchmark(args.file, args.size, args.write_block_size, args.read_block_size)
+ if args.json is not None:
+ benchmark.get_json_result(args.json)
+ else:
+ benchmark.print_result()
+ os.remove(args.file)
+
+
+if __name__ == "__main__":
+ main()