Module gossip_load

Gossip based aggregation of load information.

Copyright © 2008-2015 Zuse Institute Berlin

Version: $Id$

Behaviours: gossip_beh.

Authors: Jens V. Fischer (jensvfischer@gmail.com).

Description

Gossip based aggregation of load information. This module implements the symmetric push-sum protocol. The algorithm is used to compute aggregates of the load information, which is measured as the count of items currently in a node's key range.
The aggregation of load information is used in Scalaris for two purposes: First, for passive load balancing. When a node joins, the gossiped load information is used to decide where to place the new node. The node will be placed so that the standard deviation of the load is reduced the most. Second, the gossiping is used for system monitoring. The local estimates of the gossiping can be viewed for example in the Web Interface of every Scalaris node.
Several metrics are computed on the load information, including the average, the standard deviation, the minimum, the maximum and a histogram of the load (see load_data() and load_info_other()). The module is initialised during the startup of the gossiping framework and continuously aggregates load information in the background. Additionally, it is possible to start instances of the module for the purpose of computing histograms of different sizes, see request_histogram/2.
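
The core idea of symmetric push-sum can be illustrated with a minimal sketch. It assumes that every aggregate is held as a {Value, Weight} pair (as in the avg() type below), that an exchange replaces both pairs by their component-wise mean, and that the local estimate is Value / Weight. The module name push_sum_sketch and its functions are hypothetical and are not part of gossip_load.

```erlang
-module(push_sum_sketch).
-export([merge/2, estimate/1, demo/0]).

%% Hypothetical helper: merge two {Value, Weight} pairs by averaging
%% each component, so the total mass held by both nodes is conserved.
merge({V1, W1}, {V2, W2}) ->
    {(V1 + V2) / 2, (W1 + W2) / 2}.

%% The local estimate is the ratio Value / Weight; it is undefined
%% as long as the weight is zero.
estimate({_V, W}) when W == 0 -> unknown;
estimate({V, W}) -> V / W.

%% Two nodes with loads 10 and 30 exchange once and then agree
%% on the average load.
demo() ->
    NodeA = {10.0, 1.0},
    NodeB = {30.0, 1.0},
    estimate(merge(NodeA, NodeB)).
```

Repeating such pairwise exchanges between randomly chosen peers drives all local estimates towards the global average.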

Data Types

avg()

avg() = {Value :: float(), Weight :: float()}

avg_kr()

avg_kr() = {Value :: number(), Weight :: float()}

bucket()

bucket() = 
    {Interval :: intervals:interval(), Avg :: avg() | unknown}

data()

data() = {load_data_list(), ring_data()}

full_state()

full_state() = 
    {PrevState :: unknown | state(), CurrentState :: state()}

histogram()

histogram() = [bucket()]
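
Because all histograms share the same bucket intervals, two histograms can be merged bucket by bucket. The following is a sketch only: the module histogram_merge_sketch is hypothetical, intervals are represented by plain atoms instead of intervals:interval(), and the handling of unknown buckets (keeping the known side) is an assumption rather than the documented behaviour of gossip_load.

```erlang
-module(histogram_merge_sketch).
-export([merge/2]).

%% Merge two histograms with identical bucket intervals.
%% lists:zipwith/3 fails if the lists differ in length.
merge(Histo1, Histo2) ->
    lists:zipwith(fun merge_bucket/2, Histo1, Histo2).

%% Assumption: an unknown bucket contributes nothing, so the known
%% side is kept; two known buckets are averaged component-wise.
merge_bucket({I, unknown}, {I, unknown}) -> {I, unknown};
merge_bucket({I, unknown}, {I, Avg}) -> {I, Avg};
merge_bucket({I, Avg}, {I, unknown}) -> {I, Avg};
merge_bucket({I, {V1, W1}}, {I, {V2, W2}}) ->
    {I, {(V1 + V2) / 2, (W1 + W2) / 2}}.
```

Matching on the same variable I in both arguments makes the merge crash early if the bucket intervals do not line up.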

histogram_size()

histogram_size() = pos_integer()

instance()

instance() = 
    {Module :: gossip_load, Id :: atom() | uid:global_uid()}

load_data()

load_data() = 
    {load_data,
     Name :: atom(),
     Avg :: avg(),
     Avg2 :: avg(),
     Min :: non_neg_integer(),
     Max :: non_neg_integer(),
     Histogram :: histogram()}
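
A standard deviation such as the stddev field of load_info_other() can be derived from two averages. The sketch below assumes that Avg aggregates the load and that Avg2 aggregates the squared load; this reading of Avg2 is an assumption based on the field name. The module stddev_sketch is hypothetical.

```erlang
-module(stddev_sketch).
-export([stddev/2]).

%% stddev = sqrt(E[x^2] - E[x]^2), computed from two {Value, Weight}
%% pairs. Returns unknown while either weight is still zero.
stddev({V, W}, {V2, W2}) when W /= 0, W2 /= 0 ->
    Mean = V / W,
    MeanOfSquares = V2 / W2,
    %% Clamp at 0.0 to guard against small negative rounding errors.
    math:sqrt(max(0.0, MeanOfSquares - Mean * Mean));
stddev(_, _) ->
    unknown.
```

For two nodes with loads 2 and 4, the mean is 3 and the mean of squares is 10, giving a standard deviation of 1.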

load_data_list()

load_data_list() = [load_data() | load_data_skipped(), ...]

load_data_list2()

load_data_list2() = 
    [load_data_uninit() | load_data_skipped() | load_data(), ...]

load_data_skipped()

load_data_skipped() = {load_data, atom(), skip}

load_data_uninit()

load_data_uninit() = 
    #load_data{name = atom(),
               avg = unknown | avg(),
               avg2 = unknown | avg(),
               min = unknown | min(),
               max = unknown | max(),
               histo = unknown | histogram()}

load_info()

abstract datatype: load_info()

load_info_other()

load_info_other() = 
    #load_info_other{name = atom(),
                     avg = unknown | float(),
                     stddev = unknown | float(),
                     min = unknown | min(),
                     max = unknown | max()}

max()

max() = non_neg_integer()

merged()

merged() = non_neg_integer()

min()

min() = non_neg_integer()

ring_data()

ring_data() = {ring_data, SizeInv :: avg(), AvgKr :: avg_kr()}
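
As the field name SizeInv suggests, the ring size is presumably estimated through its inverse. The sketch below assumes that the Value / Weight estimate of SizeInv converges to 1 / N for a ring of N nodes; this is an assumption, and ring_size_sketch is a hypothetical module.

```erlang
-module(ring_size_sketch).
-export([size_from_inverse/1]).

%% Invert the aggregated estimate to obtain the ring size.
%% Returns unknown while no usable estimate is available.
size_from_inverse({V, W}) when W /= 0, V /= 0 ->
    1.0 / (V / W);
size_from_inverse(_) ->
    unknown.
```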

ring_data2()

ring_data2() = ring_data_uninit() | ring_data()

ring_data_uninit()

ring_data_uninit() = 
    #ring_data{size_inv = unknown | avg(),
               avg_kr = unknown | avg_kr()}

round()

round() = non_neg_integer()

state()

state() = 
    #state{status = status(),
           instance = instance(),
           load_data_list = load_data_list2(),
           ring_data = ring_data2(),
           leader = unknown | boolean(),
           range = unknown | intervals:interval(),
           request = boolean(),
           requestor = none | comm:mypid(),
           no_of_buckets = non_neg_integer(),
           round = round(),
           merged = non_neg_integer(),
           convergence_count = non_neg_integer()}

status()

status() = init | uninit

Function Index

check_config/0
fanout/0: The fanout (number of peers contacted per cycle).
get_values_best/1: Sends a (local) message to the gossip module of the requesting process' group asking for the best aggregation results.
get_values_best_feeder/1
handle_msg/2: Handle get_node_details_response messages from the dht_node.
init/1: Initiate the gossip_load module.
init_feeder/1
integrate_data/3: Integrate the reply data.
is_histogram/1: Checks if a given list is a valid histogram.
load_info_get/2: Gets the value for the given key from the given load_info record.
load_info_other_get/3: Gets values from a load_info_other record which can be found in the load_info record.
max_cycles_per_round/0: The maximum number of cycles per round.
min_cycles_per_round/0: The minimum number of cycles per round.
notify_change/3: Notifies the gossip_load module about changes.
request_histogram/2: Request a histogram with Size number of Buckets.
round_has_converged/1: Checks if the current round has converged yet.
select_data/1: Select and prepare the load information to be sent to the peer.
select_node/1: Returns false, i.e. peer selection is done by the gossip module.
select_reply_data/4: Process the data from the requestor and select reply data.
shutdown/1: Shuts down the gossip_load module.
tester_create_histogram/1: Creates a histogram() within the specifications of this module, i.e. in particular that all histograms need to have the same keys (keyrange/no_of_buckets).
tester_create_histogram_size/1
tester_create_load_data_list/1: Creates a fixed-size list of load_data every time.
tester_create_round/1: Creates round values with greatly reduced variance, so that more rounds are valid rounds (i.e. rounds from messages and from the state match).
tester_create_state/11: Creates a state record with greatly reduced variety in the round numbers to reduce warnings.
trigger_interval/0: The time interval in ms after which a new cycle is triggered by the behaviour module.
web_debug_info/1: Returns a key-value list of debug infos for the Web Interface.

Function Details

trigger_interval/0

trigger_interval() -> pos_integer()

The time interval in ms after which a new cycle is triggered by the behaviour module.

fanout/0

fanout() -> pos_integer()

The fanout (number of peers contacted per cycle).

min_cycles_per_round/0

min_cycles_per_round() -> non_neg_integer()

The minimum number of cycles per round. Only full cycles (i.e. received replies) are counted (ignored triggers do not count as cycles). Only relevant for the leader; all other nodes enter rounds when told to do so.

max_cycles_per_round/0

max_cycles_per_round() -> pos_integer()

The maximum number of cycles per round. Only full cycles (i.e. received replies) are counted (ignored triggers do not count as cycles). Only relevant for the leader; all other nodes enter rounds when told to do so.
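
One way the two bounds could interact on the leader is sketched below. It assumes that a round may end once the minimum number of full cycles is reached and the round has converged, and that it must end once the maximum is reached. This decision rule is an assumption for illustration; round_bounds_sketch and may_start_new_round/4 are hypothetical names.

```erlang
-module(round_bounds_sketch).
-export([may_start_new_round/4]).

%% Cycles: number of full cycles completed in the current round.
%% Converged: result of a convergence check for the current round.
may_start_new_round(Cycles, _Converged, _Min, Max) when Cycles >= Max ->
    true;   % upper bound reached, start a new round regardless
may_start_new_round(Cycles, true, Min, _Max) when Cycles >= Min ->
    true;   % converged and lower bound satisfied
may_start_new_round(_Cycles, _Converged, _Min, _Max) ->
    false.
```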

check_config/0

check_config() -> boolean()

get_values_best/1

get_values_best(Options :: [proplists:property()]) -> ok

Sends a (local) message to the gossip module of the requesting process' group asking for the best aggregation results. The response in the form {gossip_get_values_best_response, BestValues} will be sent (locally) to the requesting process. BestValues is the aggregation result from either the current or the previous round, depending on convergence_count_best_values.
If the option {instance, Instance} is present, the Instance is used. Otherwise {gossip_load, default} is used as instance.
If the option {source_pid, SourcePid} is present, the response is sent to SourcePid instead of the pid of the requesting process.
If the option {delay, Seconds} is present, the request is delayed for (at least) Seconds seconds.
If the option {send_after, Milliseconds} is present, the request is delayed for exactly Milliseconds milliseconds.
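
The request and response pattern described above could be used roughly as follows. This is a sketch under assumptions: the module best_values_sketch and the parameter names are hypothetical, a plain receive is assumed to be sufficient for picking up the local response, and BestValues is assumed to be a load_info() that can be passed to load_info_get/2.

```erlang
-module(best_values_sketch).
-export([options/1, await_best_values/1]).

%% Option list built only from options named in the description above.
%% DelayMs is a hypothetical parameter name.
options(DelayMs) ->
    [{instance, {gossip_load, default}}, {send_after, DelayMs}].

%% Wait in the calling process for the documented response message.
await_best_values(TimeoutMs) ->
    receive
        {gossip_get_values_best_response, BestValues} ->
            {ok, BestValues}
    after TimeoutMs ->
        timeout
    end.

%% Inside a running Scalaris node the two would presumably be combined as:
%%   ok = gossip_load:get_values_best(best_values_sketch:options(500)),
%%   {ok, Best} = best_values_sketch:await_best_values(5000),
%%   AvgLoad = gossip_load:load_info_get(avgLoad, Best).
```

The timeout keeps the caller from blocking forever if the request is delayed or no response arrives.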

request_histogram/2

request_histogram(Size :: histogram_size(),
                  SourcePid :: comm:mypid()) ->
                     ok

Request a histogram with Size number of Buckets.
The resulting histogram will be sent to SourcePid when all values have properly converged.

init/1

init(Args :: [proplists:property()]) -> {ok, full_state()}

Initiate the gossip_load module.
Instance (mandatory) makes the module aware of its own instance id, which is saved in the state of the module. NoOfBuckets (optional) defines the size of the histogram calculated. Requestor (optional) defines to whom the calculated histogram will be sent (used for request_histogram/2, called through the gossip module).

select_node/1

select_node(FullState :: full_state()) ->
               {boolean(), full_state()}

Returns false, i.e. peer selection is done by the gossip module.
State: the state of the gossip_load module

select_data/1

select_data(FullState :: full_state()) -> {ok, full_state()}

Select and prepare the load information to be sent to the peer.
Called by the gossip module at the beginning of every cycle.
The selected exchange data is sent back to the gossip module as a message of the form {selected_data, Instance, ExchangeData}.
State: the state of the gossip_load module

select_reply_data/4

select_reply_data(PData :: data(),
                  Ref :: pos_integer(),
                  Round :: round(),
                  FullState :: full_state()) ->
                     {discard_msg | ok | retry | send_back,
                      full_state()}

Process the data from the requestor and select reply data.
Called by the behaviour module upon a p2p_exch message.
PData: exchange data from the p2p_exch request
Ref: used by the gossip module to identify the request
RoundStatus / Round: round information used for special handling of messages from previous rounds
State: the state of the gossip_load module

integrate_data/3

integrate_data(QData :: data(),
               Round :: round(),
               FullState :: full_state()) ->
                  {discard_msg | ok | retry | send_back,
                   full_state()}

Integrate the reply data.
Called by the behaviour module upon a p2p_exch_reply message.
QData: the reply data from the peer
RoundStatus / Round: round information used for special handling of messages from previous rounds
State: the state of the gossip_load module
Upon finishing the processing of the data, a message of the form {integrated_data, Instance, RoundStatus} is to be sent to the gossip module.
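
How select_data, select_reply_data and integrate_data fit together in one exchange can be sketched with toy stand-ins. The sketch assumes that each node's state is a single {Value, Weight} pair, that the peer replies with its pre-merge data, and that both sides then average the two pairs. The functions below only mirror the callback names; they do not have the signatures of the real callbacks, and exchange_flow_sketch is a hypothetical module.

```erlang
-module(exchange_flow_sketch).
-export([exchange/2]).

%% Requesting node P: pick the data to send in the p2p_exch request.
select_data(StateP) -> StateP.

%% Peer Q: reply with its own (old) data and merge the received data.
select_reply_data(PData, StateQ) ->
    {StateQ, merge(PData, StateQ)}.

%% Requesting node P: merge the reply from the p2p_exch_reply message.
integrate_data(QData, StateP) -> merge(QData, StateP).

merge({V1, W1}, {V2, W2}) -> {(V1 + V2) / 2, (W1 + W2) / 2}.

%% Run one full exchange and return the new states of P and Q.
exchange(StateP, StateQ) ->
    PData = select_data(StateP),
    {QData, NewStateQ} = select_reply_data(PData, StateQ),
    NewStateP = integrate_data(QData, StateP),
    {NewStateP, NewStateQ}.
```

After the exchange both nodes hold the same pair, which is what makes the protocol symmetric.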

handle_msg/2

handle_msg(Message ::
               {get_node_details_response,
                node_details:node_details()},
           FullState :: full_state()) ->
              {ok, full_state()}

Handle get_node_details_response messages from the dht_node.
The received load information is stored and the status is set to init, allowing the start of a new gossip round.
State: the state of the gossip_load module

round_has_converged/1

round_has_converged(FullState :: full_state()) ->
                       {boolean(), full_state()}

Checks if the current round has converged yet.
Returns true if the round has converged, false otherwise.

notify_change/3

notify_change(Keyword :: new_round,
              NewRound :: round(),
              FullState :: full_state()) ->
                 {ok, full_state()}

Notifies the gossip_load module about changes.
Changes can be one of the following:

  1. new_round
    Notifies the callback module about the beginning of a new round.
  2. leader
    Notifies the callback module about a change in the key range of the node. MsgTag indicates whether the node is a leader or not; NewRange is the new key range of the node.
  3. exch_failure
    Notifies the callback module about a failed message delivery, including the exchange data and round from the original message.

web_debug_info/1

web_debug_info(FullState :: full_state()) ->
                  {KeyValueList ::
                       [{Key :: string(), Value :: any()}, ...],
                   full_state()}

Returns a key-value list of debug infos for the Web Interface.
Called by the gossip module upon {web_debug_info} messages.
State: the state of the gossip_load module

shutdown/1

shutdown(State :: full_state()) -> {ok, shutdown}

Shuts down the gossip_load module.
Called by the gossip module upon stop_gossip_task(CBModule).

load_info_get/2

load_info_get(Key :: avgLoad, LoadInfoRecord :: load_info()) ->
                 unknown | float()

Gets the value for the given key from the given load_info record.

load_info_other_get/3

load_info_other_get(Key :: avgLoad,
                    Module :: atom(),
                    LoadInfoRecord :: load_info()) ->
                       unknown | float()

Gets values from a load_info_other record which can be found in the load_info record.

tester_create_state/11

tester_create_state(Status :: status(),
                    Instance :: instance(),
                    LoadDataList :: load_data_list(),
                    RingData :: ring_data(),
                    Leader :: boolean(),
                    Range :: intervals:non_empty_interval(),
                    Request :: boolean(),
                    NoOfBuckets :: histogram_size(),
                    Round :: round(),
                    Merged :: non_neg_integer(),
                    ConvergenceCount :: non_neg_integer()) ->
                       state()

Creates a state record with greatly reduced variety in the round numbers to reduce warnings. Used as value_creator in tester.erl (property testing).

tester_create_round/1

tester_create_round(Round :: 0..10) -> round()

Creates round values with greatly reduced variance, so that more rounds are valid rounds (i.e. rounds from messages and from the state match).

tester_create_histogram/1

tester_create_histogram(ListOfAvgs :: [avg() | unknown]) ->
                           histogram()

Creates a histogram() within the specifications of this module, i.e. in particular that all histograms need to have the same keys (keyrange/no_of_buckets).

tester_create_histogram_size/1

tester_create_histogram_size(Size :: 1..50) -> histogram_size()

tester_create_load_data_list/1

tester_create_load_data_list(LoadDataTuple ::
                                 {load_data(),
                                  load_data(),
                                  load_data(),
                                  load_data()}) ->
                                load_data_list()

Creates a fixed-size list of load_data every time. The size must not change once created. The tester fails with a massive memory leak here when the input type is load_data_list().

is_histogram/1

is_histogram(Histogram :: [{intervals:interval(), avg()}]) ->
                boolean()

Checks if a given list is a valid histogram. Used as type_checker in tester.erl (property testing).

init_feeder/1

init_feeder(X1 ::
                {NoOfBuckets :: non_neg_integer(),
                 Requestor :: comm:mypid(),
                 Random1 :: boolean(),
                 Random2 :: boolean()}) ->
               {[proplists:property()]}

get_values_best_feeder/1

get_values_best_feeder(X1 ::
                           {Options :: [proplists:property()],
                            Secs :: 0..1,
                            Millisecs :: 0..1000}) ->
                          {[proplists:property()]}


Generated by EDoc, Aug 2 2016, 13:44:18.