1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import scalaris
16 from datetime import datetime
17 from threading import Thread
18 import time, threading
19 import random, string
20 import os, sys, traceback
21
# Size of a single data item that is sent to Scalaris.
_BENCH_DATA_SIZE = 1000
# Used to create different erlang keys for each run.
_benchTime = 0
# Percentage cut off of both ends of the result list (5 means 5%).
_PERCENT_TO_REMOVE = 5
# Number of test runs (accumulates results over all test runs).
_TESTRUNS = 1
30
# JSON endpoints of the Scalaris nodes to benchmark against, taken from the
# whitespace-separated SCALARIS_JSON_URLS environment variable.
DEFAULT_URLS = os.environ.get('SCALARIS_JSON_URLS', '').split()
if not DEFAULT_URLS:
    # Unset, empty or whitespace-only variable: an empty URL list would lead
    # to zero parallel runs, so fall back to the library's default node.
    DEFAULT_URLS = [scalaris.DEFAULT_URL]
35
36 -def minibench(operations, threads_per_node, benchmarks):
37 """
38 Default minimal benchmark.
39
40 Tests some strategies for writing key/value pairs to scalaris:
41 1) writing binary objects (random data, size = _BENCH_DATA_SIZE)
42 2) writing string objects (random data, size = _BENCH_DATA_SIZE)
43 each with the given number of consecutive operations and parallel
44 threads per Scalaris node,
45 * first using a new Transaction or TransactionSingleOp for each test,
46 * then using a new Transaction or TransactionSingleOp but re-using a single connection,
47 * and finally re-using a single Transaction or TransactionSingleOp object.
48 """
49
50
51 _benchTime = _getCurrentMillis()
52
53 parallel_runs = len(DEFAULT_URLS) * threads_per_node;
54 print 'Number of available nodes: ' + str(len(DEFAULT_URLS))
55 print '-> Using ' + str(parallel_runs) + ' parallel instances per test run...'
56 sys.stdout.flush()
57
58 print 'Benchmark of scalaris.TransactionSingleOp:'
59 sys.stdout.flush()
60 test_types = ['binary', 'string']
61 test_types_str = ['B', 'S']
62 columns = ['TransactionSingleOp.write(string, bytearray)',
63 'TransactionSingleOp.write(string, string)']
64 test_bench = [TransSingleOpBench1, TransSingleOpBench2, TransSingleOpBench3]
65 rows = ['separate connection', 're-use connection', 're-use object']
66 test_group = 'transsinglebench';
67 results = _getResultArray(rows, columns)
68 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types,
69 test_types_str, test_bench, test_group, 1, operations, parallel_runs)
70
71 print '-----'
72 print 'Benchmark of scalaris.Transaction:'
73 sys.stdout.flush()
74 test_types = ['binary', 'string']
75 test_types_str = ['B', 'S']
76 columns = ['Transaction.write(string, bytearray)',
77 'Transaction.write(string, string)']
78 test_bench = [TransBench1, TransBench2, TransBench3]
79 rows = ['separate connection', 're-use connection', 're-use object']
80 test_group = 'transbench';
81 results = _getResultArray(rows, columns)
82 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types,
83 test_types_str, test_bench, test_group, 1, operations, parallel_runs)
84
85 print '-----'
86 print 'Benchmark incrementing an integer key (read+write):'
87 sys.stdout.flush()
88 test_types = ['int']
89 test_types_str = ['I']
90 columns = ['Transaction.add_add_on_nr(string, int)']
91 test_bench = [TransIncrementBench1, TransIncrementBench2, TransIncrementBench3]
92 rows = ['separate connection', 're-use connection', 're-use object']
93 test_group = 'transbench_inc';
94 results = _getResultArray(rows, columns)
95 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types,
96 test_types_str, test_bench, test_group, 7, operations, parallel_runs)
97
98 print '-----'
99 print 'Benchmark read 5 + write 5:'
100 sys.stdout.flush()
101 test_types = ['binary', 'string']
102 test_types_str = ['B', 'S']
103 columns = ['Transaction.read(string) + Transaction.write(string, binary)',
104 'Transaction.read(string) + Transaction.write(string, string)']
105 test_bench = [TransRead5Write5Bench1, TransRead5Write5Bench2, TransRead5Write5Bench3]
106 rows = ['separate connection', 're-use connection', 're-use object']
107 test_group = 'transbench_r5w5';
108 results = _getResultArray(rows, columns)
109 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types,
110 test_types_str, test_bench, test_group, 10, operations, parallel_runs)
111
112 print '-----'
113 print 'Benchmark appending to a String list (read+write):'
114 sys.stdout.flush()
115 test_types = ['string']
116 test_types_str = ['S']
117 columns = ['Transaction.add_add_del_on_list(string, [string], [])']
118 test_bench = [TransAppendToListBench1, TransAppendToListBench2, TransAppendToListBench3]
119 rows = ['separate connection', 're-use connection', 're-use object']
120 test_group = 'transbench_append';
121 results = _getResultArray(rows, columns)
122 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types,
123 test_types_str, test_bench, test_group, 16, operations, parallel_runs)
124
126 """
127 Abstract base class of a test run that is to be run in a thread.
128 """
129
130 - def __init__(self, key, value, operations):
131 """
132 Create a new runnable.
133 """
134 Thread.__init__(self)
135 self._key = key
136 self._value = value
137 self._operations = operations
138
139 self._shouldStop = False
140 self._timeAtStart = 0
141 self._speed = -1
142
144 """
145 Call this method when a benchmark is started.
146 Sets the time the benchmark was started.
147 """
148 self._timeAtStart = _getCurrentMillis()
149
151 """
152 Call this method when a benchmark is finished.
153 Calculates the time the benchmark took and the number of transactions
154 performed during this time.
155 """
156 timeTaken = _getCurrentMillis() - self._timeAtStart
157 speed = (testRuns * 1000) / timeTaken
158 return speed
159
161 """
162 Will be called before the benchmark starts with all possible
163 variations of "j" in the operation() call.
164 "j" with None is the overall initialisation run at first.
165 """
166 pass
167
169 """
170 Will be called at the start of the benchmark.
171 """
172 pass
173
175 """
176 Will be called before the end of the benchmark.
177 """
178 pass
179
181 """
182 The operation to execute during the benchmark.
183 """
184 pass
185
187 threading.currentThread().name = "BenchRunnable-" + self._key
188 retry = 0
189 while (retry < 3) and (not self._shouldStop):
190 try:
191 self.pre_init()
192 for j in xrange(self._operations):
193 self.pre_init(j)
194 self._testBegin()
195 self.init()
196 for j in xrange(self._operations):
197 self.operation(j)
198 self.cleanup()
199 self._speed = self._testEnd(self._operations)
200 break
201 except:
202
203 pass
204 retry += 1
205
208
210 self._shouldStop = True
211
213 - def __init__(self, key, value, operations):
218
220 """
221 Will be called at the start of the benchmark.
222 """
223 self._connection = _getConnection()
224
226 """
227 Will be called before the end of the benchmark.
228 """
229 self._connection.close()
230
232 """
233 Performs a benchmark writing objects using a new TransactionSingleOp object
234 for each test.
235 """
236 - def __init__(self, key, value, operations):
238
243
245 """
246 Performs a benchmark writing objects using a new TransactionSingleOp but
247 re-using a single connection for each test.
248 """
249 - def __init__(self, key, value, operations):
251
255
257 """
258 Performs a benchmark writing objects using a single TransactionSingleOp
259 object for all tests.
260 """
261 - def __init__(self, key, value, operations):
263
266
269
271 self._tx.write(self._key + '_' + str(j), self._value)
272
274 """
275 Performs a benchmark writing objects using a new Transaction for each test.
276 """
277 - def __init__(self, key, value, operations):
279
285
287 """
288 Performs a benchmark writing objects using a new Transaction but re-using a
289 single connection for each test.
290 """
291 - def __init__(self, key, value, operations):
293
298
300 """
301 Performs a benchmark writing objects using a single Transaction object
302 for all tests.
303 """
304 - def __init__(self, key, value, operations):
306
309
312
314 self._tx.write(self._key + '_' + str(j), self._value)
315 self._tx.commit()
316
318 """
319 Performs a benchmark writing integer numbers on a single key and
320 increasing them.
321 Provides convenience methods for the full increment benchmark
322 implementations.
323 """
324 - def __init__(self, key, value, operations):
326
332
341
343 """
344 Performs a benchmark writing integer numbers on a single key and
345 increasing them using a new Transaction for each test.
346 """
347 - def __init__(self, key, value, operations):
349
354
356 """
357 Performs a benchmark writing integer numbers on a single key and
358 increasing them using a new Transaction but re-using a single
359 connection for each test.
360 """
361 - def __init__(self, key, value, operations):
363
366
368 self._connection.close()
369
373
375 """
376 Performs a benchmark writing objects using a single Transaction
377 object for all tests.
378 """
379 - def __init__(self, key, value, operations):
381
384
387
390
392 """
393 Performs a benchmark reading X values and overwriting them afterwards
394 inside a transaction.
395 Provides convenience methods for the full read-x, write-x benchmark
396 implementations.
397 """
398 - def __init__(self, key, value, nr_keys, operations):
399 BenchRunnable.__init__(self, key, value, operations)
400 self._keys = []
401 self._value_write = []
402 for i in xrange(nr_keys):
403 self._keys.append(key + "_" + str(i))
404 self._value_write.append(_getRandom(_BENCH_DATA_SIZE, type(value).__name__))
405
420
440
442 """
443 Performs a benchmark reading 5 values and overwriting them afterwards
444 inside a transaction using a new Transaction for each test.
445 """
446 - def __init__(self, key, value, operations):
448
453
455 """
456 Performs a benchmark reading 5 values and overwriting them afterwards
457 inside a transaction using a new Transaction but re-using a single
458 connection for each test.
459 """
460 - def __init__(self, key, value, operations):
462
465
467 self._connection.close()
468
472
474 """
475 Performs a benchmark reading 5 values and overwriting them afterwards
476 inside a transaction using a single Transaction object for all tests.
477 """
478 - def __init__(self, key, value, operations):
480
483
486
489
491 """
492 Performs a benchmark adding values to a list inside a transaction.
493 Provides convenience methods for the full append-to-list benchmark
494 implementations.
495 """
496 - def __init__(self, key, value, nr_keys, operations):
501
511
524
526 """
527 Performs a benchmark adding values to a list inside a transaction
528 using a new Transaction for each test.
529 """
530 - def __init__(self, key, value, operations):
532
537
539 """
540 Performs a benchmark adding values to a list inside a transaction using a
541 new Transaction but re-using a single connection for each test.
542 """
543 - def __init__(self, key, value, operations):
545
548
550 self._connection.close()
551
555
557 """
558 Performs a benchmark adding values to a list inside a transaction using a
559 single Transaction object for all tests.
560 """
561 - def __init__(self, key, value, operations):
563
566
569
572
574 """
575 Gets the number of milliseconds since epoch.
576 """
577 now = datetime.now()
578 return int(time.mktime(now.timetuple())) * 1000 + (now.microsecond // 1000)
579
581 """
582 Call this method when a benchmark is started.
583 Sets the time the benchmark was started.
584 """
585 global _timeAtStart
586 _timeAtStart = _getCurrentMillis()
587
589 """
590 Call this method when a benchmark is finished.
591 Calculates the time the benchmark took and the number of transactions
592 performed during this time.
593 Returns the number of achieved transactions per second.
594 """
595 global _timeAtStart
596 timeTaken = _getCurrentMillis() - _timeAtStart
597 speed = (testruns * 1000) // timeTaken
598 return speed
599
603
605 """
606 Returns a pre-initialized results array with values <tt>-1</tt>.
607 """
608 results = {}
609 for row in rows:
610 results[row] = {}
611 for column in columns:
612 results[row][column] = -1
613 return results
614
616 """
617 Creates a random string or binary object from <size> random characters/bytes.
618 """
619 if mytype == 'int':
620 return random.randint(0, 2147483647)
621 elif mytype == 'string' or mytype == 'str':
622 return ''.join(random.choice(string.printable) for _x in xrange(size))
623 elif mytype == 'binary':
624 return bytearray(random.randrange(0, 256) for _x in xrange(size))
625
627 """
628 Integrates the workers' results into the result array.
629 """
630 try:
631 for bench_thread in worker:
632 if failed >= 3:
633 bench_thread.shouldStop()
634 try:
635 while(bench_thread.isAlive()):
636 bench_thread.join(1)
637 except RuntimeError:
638 pass
639 else:
640 try:
641 while(bench_thread.isAlive()):
642 bench_thread.join(1)
643 speed = bench_thread.getSpeed()
644 except RuntimeError:
645 speed = -1
646
647 if speed < 0:
648 failed += 1
649 else:
650 results[i] += speed
651 return failed
652 except KeyboardInterrupt:
653 print 'CTRL-C received, aborting...'
654 for bench_thread in worker:
655 bench_thread.shouldStop()
656 sys.exit(1)
657
659 """
660 Calculates the average number of transactions per second from the results
661 of executing 10 transactions per test run. Will remove the top and bottom
662 _PERCENT_TO_REMOVE percent of the sorted results array.
663 Returns the average number of transactions per second.
664 """
665 results.sort()
666 toRemove = int((len(results) * _PERCENT_TO_REMOVE) // 100);
667 avgSpeed = 0;
668 for i in xrange(toRemove, (len(results) - toRemove)):
669 avgSpeed += results[i]
670
671 avgSpeed //= len(results) - 2 * toRemove
672 return avgSpeed
673
def _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types,
                             test_types_str, test_bench, test_group,
                             first_bench_id, operations, parallel_runs):
    """
    Executes the selected benchmarks of one group and prints its result table.

    The cells of the table are visited column by column; the cell in row i
    and column j has the benchmark id first_bench_id + j * <row count> + i.
    A cell whose id is not contained in benchmarks is set to -2, otherwise
    it receives the value returned by _runBench(). If handling a cell raises
    an Exception, the exception is printed and the cell keeps its old value.
    """
    row_count = len(results)
    column_count = len(results[list(results.keys())[0]])

    for col_idx in xrange(column_count):
        for row_idx in xrange(row_count):
            try:
                bench_id = col_idx * row_count + row_idx + first_bench_id
                if bench_id not in benchmarks:
                    # benchmark was not requested by the caller
                    results[rows[row_idx]][columns[col_idx]] = -2
                    continue
                value = _getRandom(_BENCH_DATA_SIZE, test_types[col_idx])
                name = test_group + "_" + test_types_str[col_idx] + "_" + str(row_idx + 1)
                speed = _runBench(operations, value, name,
                                  test_bench[row_idx], parallel_runs)
                results[rows[row_idx]][columns[col_idx]] = speed
                # pause for a second before the next benchmark is started
                time.sleep(1)
            except Exception:
                _printException()

    _printResults(columns, rows, results, operations, parallel_runs)
697
def _runBench(operations, value, name, clazz, parallel_runs):
    """
    Runs the given benchmark.

    For each of the _TESTRUNS test runs, parallel_runs instances of clazz
    are created and started, each with its own key derived from _benchTime,
    name, the number of the test run and the number of the thread.

    Returns -1 as soon as a test run reports three or more failed workers,
    otherwise the value _getAvgSpeed() computes from the per-run sums of the
    workers' speeds. A test run without any successful worker contributes -1.
    """
    key = str(_benchTime) + name
    results = [-1] * _TESTRUNS

    for run in xrange(_TESTRUNS):
        workers = []
        for thread_nr in xrange(parallel_runs):
            new_worker = clazz(key + '_' + str(run) + '_' + str(thread_nr), value, operations)
            workers.append(new_worker)
            new_worker.start()

        # _integrateResults() adds every successful worker's speed onto the
        # given slot. Summing onto the -1 marker directly would make each
        # reported sum one too small, so accumulate from 0 in a scratch list.
        run_total = [0]
        failed = _integrateResults(run_total, 0, workers, 0)
        if failed >= 3:
            return -1
        # With fewer than three failures every worker has been evaluated, so
        # len(workers) - failed of them succeeded; keep the -1 marker if none did.
        if failed < len(workers):
            results[run] = run_total[0]

    return _getAvgSpeed(results)
717
718 -def _printResults(columns, rows, results, operations, parallel_runs):
719 """
720 Prints a result table.
721 """
722 print 'Concurrent threads: ' + str(parallel_runs) + ', each using ' + str(operations) + ' transactions'
723 colLen = 25
724 emptyFirstColumn = ''.join([' ']*colLen)
725 print emptyFirstColumn + '\tspeed (transactions / second)'
726 print emptyFirstColumn,
727 i = 1
728 for column in columns:
729 print '\t(' + str(i) + ')',
730 i += 1
731 print ''
732 for row in rows:
733 print row + ''.join([' ']*(colLen - len(row))),
734 for column in columns:
735 value = results[row][column]
736 if (value == -2):
737 print '\tn/a',
738 elif (value == -1):
739 print '\tfailed',
740 else:
741 print '\t' + str(int(value)),
742 print ''
743
744 i = 1
745 for column in columns:
746 print '(' + str(i) + ') ' + column
747 i += 1
748 sys.stdout.flush()
749
751 mytype, message, trace = sys.exc_info()
752 print str(mytype) + str(message)
753 traceback.print_tb(trace)
754
756 nr_operations = 500
757 threads_per_node = 10
758 allBenchs = False
759 if (len(argv) == 1):
760 allBenchs = True
761 elif (len(argv) == 2):
762 allBenchs = True
763 nr_operations = int(argv[1])
764 elif (len(argv) == 3):
765 allBenchs = True
766 nr_operations = int(argv[1])
767 threads_per_node = int(argv[2])
768 elif (len(argv) >= 4):
769 nr_operations = int(argv[1])
770 threads_per_node = int(argv[2])
771 benchmarks = []
772 for i in xrange(3, len(argv)):
773 if argv[i] == 'all':
774 allBenchs = True
775 else:
776 benchmarks.append(int(argv[i]))
777 if allBenchs:
778 benchmarks = xrange(1, 19, 1)
779 minibench(nr_operations, threads_per_node, benchmarks)
780
781 if __name__ == "__main__":
782 run_from_cmd(sys.argv)
783