diff options
Diffstat (limited to 'MonkeyTest/monkeytest.py')
| -rw-r--r-- | MonkeyTest/monkeytest.py | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/MonkeyTest/monkeytest.py b/MonkeyTest/monkeytest.py new file mode 100644 index 0000000..830e43e --- /dev/null +++ b/MonkeyTest/monkeytest.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +''' +MonkeyTest -- test your hard drive read-write speed in Python +A simplistic script to show that such system programming +tasks are possible and convenient to be solved in Python + +The file is being created, then written with random data, randomly read +and deleted, so the script doesn't waste your drive + +(!) Be sure, that the file you point to is not something + you need, cause it'll be overwritten during test + +Runs on both Python3 and 2, despite that I prefer 3 +Has been tested on 3.5 and 2.7 under ArchLinux +Has been tested on 3.5.2 under Ubuntu Xenial +''' +from __future__ import division, print_function # for compatability with py2 + +import os, sys +from random import shuffle +import argparse +import json + +ASCIIART = r'''Brought to you by coding monkeys. +Eat bananas, drink coffee & enjoy! + _ + ,//) + ) / + / / + _,^^,/ / + (G,66<_/ + _/\_,_) _ + / _ \ ,' ) + / /"\ \/ ,_\ + __(,/ > e ) / (_\.oO + \_ / ( -,_/ \_/ + U \_, _) + ( / + >/ + (.oO +''' +# ASCII-art: used part of text-image @ http://www.ascii-art.de/ascii/mno/monkey.txt +# it seems that its original author is Mic Barendsz (mic aka miK) +# text-image is a bit old (1999) so I couldn't find a way to communicate with author +# if You're reading this and You're an author -- feel free to write me + +try: # if Python >= 3.3 use new high-res counter + from time import perf_counter as time +except ImportError: # else select highest available resolution counter + if sys.platform[:3] == 'win': + from time import clock as time + else: + from time import time + + +def get_args(): + parser = argparse.ArgumentParser(description='Arguments', formatter_class = argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('-f', '--file', + required=False, + action='store', + default='/tmp/monkeytest', + help='The file to read/write to') + parser.add_argument('-s', '--size', + required=False, + action='store', + type=int, + default=128, + help='Total MB to write') + parser.add_argument('-w', '--write-block-size', + required=False, + action='store', + type=int, + default=1024, + help='The block size for writing in bytes') + parser.add_argument('-r', '--read-block-size', + required=False, + action='store', + type=int, + default=512, + help='The block size for reading in bytes') + parser.add_argument('-j', '--json', + required=False, + action='store', + help='Output to json file') + args = parser.parse_args() + return args + + +class Benchmark: + + def __init__(self, file,write_mb, write_block_kb, read_block_b): + self.file = file + self.write_mb = write_mb + self.write_block_kb = write_block_kb + self.read_block_b = read_block_b + wr_blocks = int(self.write_mb * 1024 / self.write_block_kb) + rd_blocks = int(self.write_mb * 1024 * 1024 / self.read_block_b) + self.write_results = self.write_test( 1024 * self.write_block_kb, wr_blocks) + self.read_results = self.read_test(self.read_block_b, rd_blocks) + + def write_test(self, block_size, blocks_count, show_progress=True): + ''' + Tests write speed by writing random blocks, at total quantity + of blocks_count, each at size of block_size bytes to disk. + Function returns a list of write times in sec of each block. + ''' + f = os.open(self.file, os.O_CREAT | os.O_WRONLY, 0o777) # low-level I/O + + took = [] + for i in range(blocks_count): + if show_progress: + # dirty trick to actually print progress on each iteration + sys.stdout.write('\rWriting: {:.2f} %'.format( + (i + 1) * 100 / blocks_count)) + sys.stdout.flush() + buff = os.urandom(block_size) + start = time() + os.write(f, buff) + os.fsync(f) # force write to disk + t = time() - start + took.append(t) + + os.close(f) + return took + + def read_test(self, block_size, blocks_count, show_progress=True): + ''' + Performs read speed test by reading random offset blocks from + file, at maximum of blocks_count, each at size of block_size + bytes until the End Of File reached. + Returns a list of read times in sec of each block. + ''' + f = os.open(self.file, os.O_RDONLY, 0o777) # low-level I/O + # generate random read positions + offsets = list(range(0, blocks_count * block_size, block_size)) + shuffle(offsets) + + took = [] + for i, offset in enumerate(offsets, 1): + if show_progress and i % int(self.write_block_kb * 1024 / self.read_block_b) == 0: + # read is faster than write, so try to equalize print period + sys.stdout.write('\rReading: {:.2f} %'.format( + (i + 1) * 100 / blocks_count)) + sys.stdout.flush() + start = time() + os.lseek(f, offset, os.SEEK_SET) # set position + buff = os.read(f, block_size) # read from position + t = time() - start + if not buff: break # if EOF reached + took.append(t) + + os.close(f) + return took + + def print_result(self): + result = ('\n\nWritten {} MB in {:.4f} s\nWrite speed is {:.2f} MB/s' + '\n max: {max:.2f}, min: {min:.2f}\n'.format( + self.write_mb, sum(self.write_results), self.write_mb / sum(self.write_results), + max=self.write_block_kb / (1024 * min(self.write_results)), + min=self.write_block_kb / (1024 * max(self.write_results)))) + result += ('\nRead {} x {} B blocks in {:.4f} s\nRead speed is {:.2f} MB/s' + '\n max: {max:.2f}, min: {min:.2f}\n'.format( + len(self.read_results), self.read_block_b, + sum(self.read_results), self.write_mb / sum(self.read_results), + max=self.read_block_b / (1024 * 1024 * min(self.read_results)), + min=self.read_block_b / (1024 * 1024 * max(self.read_results)))) + print(result) + print(ASCIIART) + + + def get_json_result(self,output_file): + results_json = {} + results_json["Written MB"] = self.write_mb + results_json["Write time (sec)"] = round(sum(self.write_results),2) + results_json["Write speed in MB/s"] = round(self.write_mb / sum(self.write_results),2) + results_json["Read blocks"] = len(self.read_results) + results_json["Read time (sec)"] = round(sum(self.read_results),2) + results_json["Read speed in MB/s"] = round(self.write_mb / sum(self.read_results),2) + with open(output_file,'w') as f: + json.dump(results_json,f) + + +def main(): + args = get_args() + benchmark = Benchmark(args.file, args.size, args.write_block_size, args.read_block_size) + if args.json is not None: + benchmark.get_json_result(args.json) + else: + benchmark.print_result() + os.remove(args.file) + + +if __name__ == "__main__": + main() |
