57 #include <sys/types.h>
59 #include <sys/socket.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
86 #define LOGSYS_UTILS_ONLY 1
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384
97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000
102 #define LEAVE_DUMMY_NODEID 0
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
137 #define ENDIAN_LOCAL 0xff22
436 const char *
function,
439 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
452 unsigned int msg_len,
453 int endian_conversion_required);
457 const unsigned int *member_list,
size_t member_list_entries,
458 const unsigned int *left_list,
size_t left_list_entries,
459 const unsigned int *joined_list,
size_t joined_list_entries,
534 int endian_conversion_needed);
579 static int message_handler_orf_token (
583 int endian_conversion_needed);
585 static int message_handler_mcast (
589 int endian_conversion_needed);
591 static int message_handler_memb_merge_detect (
595 int endian_conversion_needed);
597 static int message_handler_memb_join (
601 int endian_conversion_needed);
603 static int message_handler_memb_commit_token (
607 int endian_conversion_needed);
609 static int message_handler_token_hold_cancel (
613 int endian_conversion_needed);
617 static unsigned int main_msgs_missing (
void);
619 static void main_token_seqid_get (
622 unsigned int *token_is);
624 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src);
626 static void srp_addr_to_nodeid (
627 unsigned int *nodeid_out,
629 unsigned int entries);
631 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
637 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
639 int fcc_mcasts_allowed);
640 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
644 static void target_set_completed (
void *context);
646 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
653 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
654 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
655 static void memb_merge_detect_endian_convert (
658 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (
void *data);
660 static void timer_function_pause_timeout (
void *data);
661 static void timer_function_heartbeat_timeout (
void *data);
662 static void timer_function_token_retransmit_timeout (
void *data);
663 static void timer_function_token_hold_retransmit_timeout (
void *data);
664 static void timer_function_merge_detect_timeout (
void *data);
666 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
672 unsigned int msg_len);
677 unsigned int iface_no);
682 message_handler_orf_token,
683 message_handler_mcast,
684 message_handler_memb_merge_detect,
685 message_handler_memb_join,
686 message_handler_memb_commit_token,
687 message_handler_token_hold_cancel
691 #define log_printf(level, format, args...) \
693 instance->totemsrp_log_printf ( \
694 level, instance->totemsrp_subsys_id, \
695 __FUNCTION__, __FILE__, __LINE__, \
698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
700 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
701 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
702 instance->totemsrp_log_printf ( \
703 level, instance->totemsrp_subsys_id, \
704 __FUNCTION__, __FILE__, __LINE__, \
705 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
711 return gather_state_from_desc[gsfrom];
751 static void main_token_seqid_get (
754 unsigned int *token_is)
766 static unsigned int main_msgs_missing (
void)
775 uint64_t timestamp_msec;
778 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
783 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
798 unsigned long long nano_secs = qb_util_nano_current_get ();
800 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
833 qb_loop_t *poll_handle,
841 unsigned int msg_len,
842 int endian_conversion_required),
846 const unsigned int *member_list,
size_t member_list_entries,
847 const unsigned int *left_list,
size_t left_list_entries,
848 const unsigned int *joined_list,
size_t joined_list_entries,
850 void (*waiting_trans_ack_cb_fn) (
857 if (instance == NULL) {
861 totemsrp_instance_initialize (instance);
899 "Token Timeout (%d ms) retransmit timeout (%d ms)",
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
912 "downcheck (%d ms) fail to recv const (%d msgs)",
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
922 "missed count const (%d messages)",
926 "send threads (%d threads)", totem_config->
threads);
928 "RRP token expired timeout (%d ms)",
931 "RRP token problem counter (%d ms)",
934 "RRP threshold (%d problem count)",
937 "RRP multicast threshold (%d problem count)",
940 "RRP automatic recovery check timeout (%d ms)",
967 timer_function_pause_timeout (instance);
971 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
982 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
986 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
988 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
1005 main_token_seqid_get,
1007 target_set_completed);
1027 token_event_stats_collector,
1033 token_event_stats_collector,
1035 *srp_context = instance;
1048 memb_leave_message_send (instance);
1068 unsigned int nodeid,
1070 unsigned int interfaces_size,
1072 unsigned int *iface_count)
1076 unsigned int found = 0;
1089 if (interfaces_size >= *iface_count) {
1109 if (interfaces_size >= *iface_count) {
1126 const char *cipher_type,
1127 const char *hash_type)
1176 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1181 for (i = 0; i < 1; i++) {
1190 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
1201 static void srp_addr_to_nodeid (
1202 unsigned int *nodeid_out,
1204 unsigned int entries)
1208 for (i = 0; i < entries; i++) {
1209 nodeid_out[i] = srp_addr_in[i].
addr[0].
nodeid;
1213 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
1227 static void memb_set_subtract (
1228 struct srp_addr *out_list,
int *out_list_entries,
1229 struct srp_addr *one_list,
int one_list_entries,
1230 struct srp_addr *two_list,
int two_list_entries)
1236 *out_list_entries = 0;
1238 for (i = 0; i < one_list_entries; i++) {
1239 for (j = 0; j < two_list_entries; j++) {
1240 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1246 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1247 *out_list_entries = *out_list_entries + 1;
1256 static void memb_consensus_set (
1283 static int memb_consensus_isset (
1300 static int memb_consensus_agreed (
1304 int token_memb_entries = 0;
1308 memb_set_subtract (token_memb, &token_memb_entries,
1312 for (i = 0; i < token_memb_entries; i++) {
1313 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1328 assert (token_memb_entries >= 1);
1333 static void memb_consensus_notset (
1335 struct srp_addr *no_consensus_list,
1336 int *no_consensus_list_entries,
1338 int comparison_list_entries)
1342 *no_consensus_list_entries = 0;
1345 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1346 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1347 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1355 static int memb_set_equal (
1356 struct srp_addr *set1,
int set1_entries,
1357 struct srp_addr *set2,
int set2_entries)
1364 if (set1_entries != set2_entries) {
1367 for (i = 0; i < set2_entries; i++) {
1368 for (j = 0; j < set1_entries; j++) {
1369 if (srp_addr_equal (&set1[j], &set2[i])) {
1385 static int memb_set_subset (
1386 const struct srp_addr *subset,
int subset_entries,
1387 const struct srp_addr *fullset,
int fullset_entries)
1393 if (subset_entries > fullset_entries) {
1396 for (i = 0; i < subset_entries; i++) {
1397 for (j = 0; j < fullset_entries; j++) {
1398 if (srp_addr_equal (&subset[i], &fullset[j])) {
1412 static void memb_set_merge (
1413 const struct srp_addr *subset,
int subset_entries,
1414 struct srp_addr *fullset,
int *fullset_entries)
1420 for (i = 0; i < subset_entries; i++) {
1421 for (j = 0; j < *fullset_entries; j++) {
1422 if (srp_addr_equal (&fullset[j], &subset[i])) {
1428 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1429 *fullset_entries = *fullset_entries + 1;
1436 static void memb_set_and_with_ring_id (
1452 for (i = 0; i < set2_entries; i++) {
1453 for (j = 0; j < set1_entries; j++) {
1454 if (srp_addr_equal (&set1[j], &set2[i])) {
1455 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1462 srp_addr_copy (&and[*and_entries], &set1[j]);
1463 *and_entries = *and_entries + 1;
1470 #ifdef CODE_COVERAGE
1471 static void memb_set_print (
1478 printf (
"List '%s' contains %d entries:\n",
string, list_entries);
1480 for (i = 0; i < list_entries; i++) {
1481 printf (
"Address %d with %d rings\n", i, list[i].
no_addrs);
1482 for (j = 0; j < list[i].
no_addrs; j++) {
1483 printf (
"\tiface %d %s\n", j,
totemip_print (&list[i].addr[j]));
1484 printf (
"\tfamily %d\n", list[i].addr[j].
family);
1489 static void my_leave_memb_clear(
1496 static unsigned int my_leave_memb_match(
1498 unsigned int nodeid)
1501 unsigned int ret = 0;
1512 static void my_leave_memb_set(
1514 unsigned int nodeid)
1531 "Cannot set LEAVE nodeid=%d", nodeid);
1538 assert (instance != NULL);
1542 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1544 assert (instance != NULL);
1558 timer_function_token_retransmit_timeout,
1575 timer_function_merge_detect_timeout,
1604 "Saving state aru %x high seq received %x",
1614 "Restoring instance->my_aru %x my high seq received %x",
1621 "Resetting old ring state");
1634 timer_function_pause_timeout,
1649 timer_function_orf_token_timeout,
1664 timer_function_heartbeat_timeout,
1680 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1685 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1693 timer_function_token_hold_retransmit_timeout,
1700 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1706 static void memb_state_consensus_timeout_expired (
1710 int no_consensus_list_entries;
1713 if (memb_consensus_agreed (instance)) {
1714 memb_consensus_reset (instance);
1716 memb_consensus_set (instance, &instance->
my_id);
1718 reset_token_timeout (instance);
1720 memb_consensus_notset (
1723 &no_consensus_list_entries,
1727 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1740 static void timer_function_pause_timeout (
void *data)
1745 reset_pause_timeout (instance);
1750 old_ring_state_restore (instance);
1755 static void timer_function_orf_token_timeout (
void *data)
1762 "The token was lost in the OPERATIONAL state.");
1764 "A processor failed, forming new configuration.");
1772 "The consensus timeout expired.");
1773 memb_state_consensus_timeout_expired (instance);
1780 "The token was lost in the COMMIT state.");
1787 "The token was lost in the RECOVERY state.");
1788 memb_recovery_state_token_loss (instance);
1794 static void timer_function_heartbeat_timeout (
void *data)
1798 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1799 timer_function_orf_token_timeout(data);
1802 static void memb_timer_function_state_gather (
void *data)
1814 memb_join_message_send (instance);
1825 memb_timer_function_state_gather,
1835 static void memb_timer_function_gather_consensus_timeout (
void *data)
1838 memb_state_consensus_timeout_expired (instance);
1841 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1846 unsigned int range = 0;
1859 for (i = 1; i <= range; i++) {
1865 recovery_message_item = ptr;
1870 mcast = recovery_message_item->
mcast;
1876 regular_message_item.mcast =
1877 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1878 regular_message_item.msg_len =
1879 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1880 mcast = regular_message_item.mcast;
1889 "comparing if ring id is for this processors old ring seqno %d",
1903 ®ular_message_item, mcast->
seq);
1910 "-not adding msg with seq no %x", mcast->
seq);
1921 int joined_list_entries = 0;
1922 unsigned int aru_save;
1929 char left_node_msg[1024];
1930 char joined_node_msg[1024];
1931 char failed_node_msg[1024];
1935 memb_consensus_reset (instance);
1937 old_ring_state_reset (instance);
1939 deliver_messages_from_recovery_to_regular (instance);
1942 "Delivering to app %x to %x",
1945 aru_save = instance->
my_aru;
1958 memb_set_subtract (joined_list, &joined_list_entries,
1986 srp_addr_to_nodeid (trans_memb_list_totemip,
1999 instance->
my_aru = aru_save;
2009 joined_list, joined_list_entries,
2014 srp_addr_to_nodeid (new_memb_list_totemip,
2016 srp_addr_to_nodeid (joined_list_totemip, joined_list,
2017 joined_list_entries);
2021 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2083 regular_message = ptr;
2084 free (regular_message->
mcast);
2090 if (joined_list_entries) {
2092 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2093 for (i=0; i< joined_list_entries; i++) {
2094 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %u", joined_list_totemip[i]);
2098 joined_node_msg[0] =
'\0';
2104 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2106 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %u", left_list[i]);
2109 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2111 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2113 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(left_node_msg)-sptr2,
" %u", left_list[i]);
2117 failed_node_msg[0] =
'\0';
2121 left_node_msg[0] =
'\0';
2122 failed_node_msg[0] =
'\0';
2125 my_leave_memb_clear(instance);
2128 "entering OPERATIONAL state.");
2130 "A new membership (%s:%lld) was formed. Members%s%s",
2136 if (strlen(failed_node_msg)) {
2138 "Failed to receive the leave message.%s",
2149 reset_pause_timeout (instance);
2162 static void memb_state_gather_enter (
2173 &instance->
my_id, 1,
2176 memb_join_message_send (instance);
2187 memb_timer_function_state_gather,
2203 memb_timer_function_gather_consensus_timeout,
2212 cancel_token_retransmit_timeout (instance);
2213 cancel_token_timeout (instance);
2214 cancel_merge_detect_timeout (instance);
2216 memb_consensus_reset (instance);
2218 memb_consensus_set (instance, &instance->
my_id);
2221 "entering GATHER state from %d(%s).",
2222 gather_from, gsfrom_to_msg(gather_from));
2237 static void timer_function_token_retransmit_timeout (
void *data);
2239 static void target_set_completed (
2244 memb_state_commit_token_send (instance);
2248 static void memb_state_commit_enter (
2251 old_ring_state_save (instance);
2253 memb_state_commit_token_update (instance);
2255 memb_state_commit_token_target_set (instance);
2271 "entering COMMIT state.");
2274 reset_token_retransmit_timeout (instance);
2275 reset_token_timeout (instance);
2291 static void memb_state_recovery_enter (
2296 int local_received_flg = 1;
2297 unsigned int low_ring_aru;
2298 unsigned int range = 0;
2299 unsigned int messages_originated = 0;
2308 "entering RECOVERY state.");
2319 memb_state_commit_token_send_recovery (instance, commit_token);
2334 memcpy (&my_new_memb_ring_id_list[i],
2335 &memb_list[i].ring_id,
2338 memb_set_and_with_ring_id (
2340 my_new_memb_ring_id_list,
2354 "position [%d] member %s:", i,
totemip_print (&addr[i].addr[0]));
2356 "previous ring seq %llx rep %s",
2361 "aru %x high delivered %x received flag %d",
2379 local_received_flg = 0;
2383 if (local_received_flg == 1) {
2399 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2401 low_ring_aru = memb_list[i].
aru;
2422 "copying all old ring messages from %x-%x.",
2425 for (i = 1; i <= range; i++) {
2432 low_ring_aru + i, &ptr);
2436 sort_queue_item = ptr;
2437 messages_originated++;
2452 sort_queue_item->
mcast,
2457 "Originated %d messages in RECOVERY.", messages_originated);
2462 "Did not need to originate any messages in recovery.");
2472 reset_token_timeout (instance);
2473 reset_token_retransmit_timeout (instance);
2486 token_hold_cancel_send (instance);
2493 struct iovec *iovec,
2494 unsigned int iov_len,
2501 unsigned int addr_idx;
2510 if (cs_queue_is_full (queue_use)) {
2515 memset (&message_item, 0,
sizeof (
struct message_item));
2520 message_item.
mcast = totemsrp_buffer_alloc (instance);
2521 if (message_item.
mcast == 0) {
2528 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2538 addr = (
char *)message_item.
mcast;
2539 addr_idx = sizeof (
struct mcast);
2540 for (i = 0; i < iov_len; i++) {
2541 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2542 addr_idx += iovec[i].iov_len;
2545 message_item.
msg_len = addr_idx;
2549 cs_queue_item_add (queue_use, &message_item);
2571 cs_queue_avail (queue_use, &avail);
2582 static int orf_token_remcast (
2586 struct sort_queue_item *sort_queue_item;
2590 struct sq *sort_queue;
2598 res = sq_in_range (sort_queue, seq);
2607 res = sq_item_get (sort_queue, seq, &ptr);
2612 sort_queue_item = ptr;
2616 sort_queue_item->
mcast,
2626 static void messages_free (
2628 unsigned int token_aru)
2630 struct sort_queue_item *regular_message;
2633 int log_release = 0;
2634 unsigned int release_to;
2635 unsigned int range = 0;
2637 release_to = token_aru;
2638 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2658 for (i = 1; i <= range; i++) {
2664 regular_message = ptr;
2665 totemsrp_buffer_release (instance, regular_message->
mcast);
2676 "releasing messages up to and including %x", release_to);
2680 static void update_aru (
2685 struct sq *sort_queue;
2687 unsigned int my_aru_saved = 0;
2697 my_aru_saved = instance->
my_aru;
2698 for (i = 1; i <= range; i++) {
2702 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2710 instance->
my_aru += i - 1;
2716 static int orf_token_mcast (
2719 int fcc_mcasts_allowed)
2723 struct sq *sort_queue;
2724 struct sort_queue_item sort_queue_item;
2725 struct mcast *mcast;
2726 unsigned int fcc_mcast_current;
2731 reset_token_retransmit_timeout (instance);
2742 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2743 if (cs_queue_is_empty (mcast_queue)) {
2746 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
2754 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2758 mcast = sort_queue_item.
mcast;
2765 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2769 message_item->
mcast,
2775 cs_queue_item_remove (mcast_queue);
2783 update_aru (instance);
2788 return (fcc_mcast_current);
2795 static int orf_token_rtr (
2798 unsigned int *fcc_allowed)
2803 struct sq *sort_queue;
2805 unsigned int range = 0;
2806 char retransmit_msg[1024];
2815 rtr_list = &orf_token->
rtr_list[0];
2817 strcpy (retransmit_msg,
"Retransmit List: ");
2822 sprintf (value,
"%x ", rtr_list[i].seq);
2823 strcat (retransmit_msg, value);
2825 strcat (retransmit_msg,
"");
2827 "%s", retransmit_msg);
2840 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2847 res = orf_token_remcast (instance, rtr_list[i].seq);
2854 memmove (&rtr_list[i], &rtr_list[i + 1],
2870 range = orf_token->
seq - instance->
my_aru;
2874 (i <= range); i++) {
2879 res = sq_in_range (sort_queue, instance->
my_aru + i);
2887 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2898 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2908 if (instance->
my_aru + i == rtr_list[j].
seq) {
2938 static void timer_function_token_retransmit_timeout (
void *data)
2948 token_retransmit (instance);
2949 reset_token_retransmit_timeout (instance);
2954 static void timer_function_token_hold_retransmit_timeout (
void *data)
2965 token_retransmit (instance);
2970 static void timer_function_merge_detect_timeout(
void *data)
2979 memb_merge_detect_transmit (instance);
2992 static int token_send (
2994 struct orf_token *orf_token,
2998 unsigned int orf_token_size;
3000 orf_token_size =
sizeof (
struct orf_token) +
3001 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
3008 if (forward_token == 0) {
3052 struct orf_token orf_token;
3084 res = token_send (instance, &orf_token, 1);
3089 static void memb_state_commit_token_update (
3094 unsigned int high_aru;
3130 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3131 high_aru = memb_list[i].
aru;
3141 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3156 static void memb_state_commit_token_target_set (
3173 static int memb_state_commit_token_send_recovery (
3177 unsigned int commit_token_size;
3199 reset_token_retransmit_timeout (instance);
3203 static int memb_state_commit_token_send (
3206 unsigned int commit_token_size;
3228 reset_token_retransmit_timeout (instance);
3236 int token_memb_entries = 0;
3240 memb_set_subtract (token_memb, &token_memb_entries,
3247 assert(token_memb_entries > 0);
3249 lowest_addr = &token_memb[0].
addr[0];
3250 for (i = 1; i < token_memb_entries; i++) {
3258 static int srp_addr_compare (
const void *a,
const void *b)
3266 static void memb_state_commit_token_create (
3272 int token_memb_entries = 0;
3275 "Creating commit token because I am the rep.");
3277 memb_set_subtract (token_memb, &token_memb_entries,
3296 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3305 memcpy (addr, token_memb,
3306 token_memb_entries *
sizeof (
struct srp_addr));
3307 memset (memb_list, 0,
3313 char memb_join_data[40000];
3316 unsigned int addr_idx;
3325 msg_len =
sizeof(
struct memb_join) +
3326 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3328 if (msg_len >
sizeof(memb_join_data)) {
3330 "memb_join_message too long. Ignoring message.");
3344 addr = (
char *)memb_join;
3345 addr_idx =
sizeof (
struct memb_join);
3346 memcpy (&addr[addr_idx],
3353 memcpy (&addr[addr_idx],
3375 char memb_join_data[40000];
3376 struct memb_join *memb_join = (
struct memb_join *)memb_join_data;
3378 unsigned int addr_idx;
3379 int active_memb_entries;
3384 "sending join/leave message");
3391 &instance->
my_id, 1,
3394 memb_set_subtract (active_memb, &active_memb_entries,
3396 &instance->
my_id, 1);
3398 msg_len =
sizeof(
struct memb_join) +
3399 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3401 if (msg_len >
sizeof(memb_join_data)) {
3403 "memb_leave message too long. Ignoring message.");
3424 addr = (
char *)memb_join;
3425 addr_idx =
sizeof (
struct memb_join);
3426 memcpy (&addr[addr_idx],
3428 active_memb_entries *
3431 active_memb_entries *
3433 memcpy (&addr[addr_idx],
3472 static void memb_ring_id_set (
3491 token_hold_cancel_send (instance);
3494 if (callback_handle == 0) {
3497 *handle_out = (
void *)callback_handle;
3498 list_init (&callback_handle->
list);
3500 callback_handle->
data = (
void *) data;
3502 callback_handle->
delete =
delete;
3521 list_del (&h->
list);
3528 static void token_callbacks_execute (
3534 struct list_head *callback_listhead = 0;
3550 for (list = callback_listhead->
next; list != callback_listhead;
3553 token_callback_instance =
list_entry (list,
struct token_callback_instance, list);
3555 list_next = list->
next;
3556 del = token_callback_instance->
delete;
3563 token_callback_instance->
data);
3567 if (res == -1 && del == 1) {
3568 list_add (list, callback_listhead);
3570 free (token_callback_instance);
3594 if (queue_use != NULL) {
3595 backlog = cs_queue_used (queue_use);
3602 static int fcc_calculate (
3604 struct orf_token *token)
3606 unsigned int transmits_allowed;
3607 unsigned int backlog_calc;
3615 instance->
my_cbl = backlog_get (instance);
3624 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3625 transmits_allowed = backlog_calc;
3629 return (transmits_allowed);
3635 static void fcc_rtr_limit (
3637 struct orf_token *token,
3638 unsigned int *transmits_allowed)
3642 assert (check >= 0);
3649 *transmits_allowed = 0;
3653 static void fcc_token_update (
3655 struct orf_token *token,
3656 unsigned int msgs_transmitted)
3658 token->
fcc += msgs_transmitted - instance->
my_trc;
3660 instance->
my_trc = msgs_transmitted;
3667 static int check_totemip_sanity(
3670 int endian_conversion_needed)
3675 if (endian_conversion_needed) {
3679 if (family != AF_INET && family != AF_INET6) {
3681 "Received message corrupted... ignoring.");
3689 static int check_srpaddr_sanity(
3692 int endian_conversion_needed)
3700 for (i = 0; i < addr->
no_addrs; i++) {
3702 if (check_totemip_sanity(instance, &addr->
addr[i], endian_conversion_needed) == -1) {
3711 static int check_orf_token_sanity(
3715 int endian_conversion_needed)
3718 const struct orf_token *token = (
const struct orf_token *)msg;
3719 size_t required_len;
3722 if (msg_len <
sizeof(
struct orf_token)) {
3724 "Received orf_token message is too short... ignoring.");
3729 if (check_totemip_sanity(instance, &token->
ring_id.
rep, endian_conversion_needed) == -1) {
3733 if (endian_conversion_needed) {
3739 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3740 if (msg_len < required_len) {
3742 "Received orf_token message is too short... ignoring.");
3747 for (i = 0; i < rtr_entries; i++) {
3749 endian_conversion_needed) == -1) {
3757 static int check_mcast_sanity(
3761 int endian_conversion_needed)
3763 const struct mcast *mcast_msg = (
const struct mcast *)msg;
3765 if (msg_len <
sizeof(
struct mcast)) {
3767 "Received mcast message is too short... ignoring.");
3772 if ((check_totemip_sanity(instance, &mcast_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3773 (check_srpaddr_sanity(instance, &mcast_msg->
system_from, endian_conversion_needed) == -1)) {
3780 static int check_memb_merge_detect_sanity(
3784 int endian_conversion_needed)
3790 "Received memb_merge_detect message is too short... ignoring.");
3795 if ((check_totemip_sanity(instance, &mmd_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3796 (check_srpaddr_sanity(instance, &mmd_msg->
system_from, endian_conversion_needed) == -1)) {
3803 static int check_memb_join_sanity(
3807 int endian_conversion_needed)
3809 const struct memb_join *mj_msg = (
const struct memb_join *)msg;
3812 size_t required_len;
3814 const struct srp_addr *failed_list;
3817 if (msg_len <
sizeof(
struct memb_join)) {
3819 "Received memb_join message is too short... ignoring.");
3824 if (check_srpaddr_sanity(instance, &mj_msg->
system_from, endian_conversion_needed) == -1) {
3831 if (endian_conversion_needed) {
3832 proc_list_entries =
swab32(proc_list_entries);
3833 failed_list_entries =
swab32(failed_list_entries);
3836 required_len =
sizeof(
struct memb_join) + ((proc_list_entries + failed_list_entries) *
sizeof(
struct srp_addr));
3837 if (msg_len < required_len) {
3839 "Received memb_join message is too short... ignoring.");
3845 failed_list = proc_list + proc_list_entries;
3848 if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
3854 if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
3862 static int check_memb_commit_token_sanity(
3866 int endian_conversion_needed)
3872 size_t required_len;
3877 "Received memb_commit_token message is too short... ignoring.");
3882 if (check_totemip_sanity(instance, &mct_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3887 if (endian_conversion_needed) {
3888 addr_entries =
swab32(addr_entries);
3893 if (msg_len < required_len) {
3895 "Received memb_commit_token message is too short... ignoring.");
3904 if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3908 if (memb_list[i].ring_id.
rep.
family != 0) {
3909 if (check_totemip_sanity(instance, &memb_list[i].ring_id.
rep,
3910 endian_conversion_needed) == -1) {
3919 static int check_token_hold_cancel_sanity(
3923 int endian_conversion_needed)
3929 "Received token_hold_cancel message is too short... ignoring.");
3934 if (check_totemip_sanity(instance, &thc_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3949 static int message_handler_orf_token (
3953 int endian_conversion_needed)
3955 char token_storage[1500];
3956 char token_convert[1500];
3957 struct orf_token *token = NULL;
3959 unsigned int transmits_allowed;
3960 unsigned int mcasted_retransmit;
3961 unsigned int mcasted_regular;
3962 unsigned int last_aru;
3965 unsigned long long tv_current;
3966 unsigned long long tv_diff;
3968 tv_current = qb_util_nano_current_get ();
3969 tv_diff = tv_current -
tv_old;
3970 tv_old = tv_current;
3973 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3976 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3983 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3984 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3989 if (endian_conversion_needed) {
3990 orf_token_endian_convert ((
struct orf_token *)msg,
3991 (
struct orf_token *)token_convert);
3992 msg = (
struct orf_token *)token_convert;
3999 token = (
struct orf_token *)token_storage;
4000 memcpy (token, msg,
sizeof (
struct orf_token));
4001 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
4009 start_merge_detect_timeout (instance);
4012 cancel_merge_detect_timeout (instance);
4013 cancel_token_hold_retransmit_timeout (instance);
4019 #ifdef TEST_RECOVERY_MSG_COUNT
4060 messages_free (instance, token->
aru);
4079 reset_heartbeat_timeout(instance);
4082 cancel_heartbeat_timeout(instance);
4097 transmits_allowed = fcc_calculate (instance, token);
4098 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4106 fcc_rtr_limit (instance, token, &transmits_allowed);
4107 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4114 fcc_token_update (instance, token, mcasted_retransmit +
4117 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
4122 if (token->
aru == token->
seq) {
4128 if (token->
aru == last_aru && token->
aru_addr != 0) {
4143 "FAILED TO RECEIVE");
4147 memb_set_merge (&instance->
my_id, 1,
4174 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4187 "install seq %x aru %x high seq received %x",
4205 "retrans flag count %x token aru %x install seq %x aru %x %x",
4209 memb_state_operational_enter (instance);
4216 token_send (instance, token, forward_token);
4219 tv_current = qb_util_nano_current_get ();
4220 tv_diff = tv_current -
tv_old;
4221 tv_old = tv_current;
4224 ((
float)tv_diff) / 1000000.0);
4227 messages_deliver_to_app (instance, 0,
4235 reset_token_timeout (instance);
4236 reset_token_retransmit_timeout (instance);
4240 start_token_hold_retransmit_timeout (instance);
4250 reset_heartbeat_timeout(instance);
4253 cancel_heartbeat_timeout(instance);
4259 static void messages_deliver_to_app (
4262 unsigned int end_point)
4264 struct sort_queue_item *sort_queue_item_p;
4267 struct mcast *mcast_in;
4268 struct mcast mcast_header;
4269 unsigned int range = 0;
4270 int endian_conversion_required;
4271 unsigned int my_high_delivered_stored = 0;
4287 for (i = 1; i <= range; i++) {
4295 my_high_delivered_stored + i);
4301 my_high_delivered_stored + i, &ptr);
4305 if (res != 0 && skip == 0) {
4316 sort_queue_item_p = ptr;
4318 mcast_in = sort_queue_item_p->
mcast;
4319 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4321 endian_conversion_required = 0;
4323 endian_conversion_required = 1;
4324 mcast_endian_convert (mcast_in, &mcast_header);
4326 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4333 memb_set_subset (&mcast_header.system_from,
4347 "Delivering MCAST message with seq %x to pending delivery queue",
4354 mcast_header.header.nodeid,
4355 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4356 sort_queue_item_p->
msg_len - sizeof (
struct mcast),
4357 endian_conversion_required);
4364 static int message_handler_mcast (
4368 int endian_conversion_needed)
4370 struct sort_queue_item sort_queue_item;
4371 struct sq *sort_queue;
4372 struct mcast mcast_header;
4374 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4378 if (endian_conversion_needed) {
4379 mcast_endian_convert (msg, &mcast_header);
4381 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4392 #ifdef TEST_DROP_MCAST_PERCENTAGE
4393 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4401 if (memcmp (&instance->
my_ring_id, &mcast_header.ring_id,
4407 &mcast_header.system_from, 1,
4413 if (!memb_set_subset (
4414 &mcast_header.system_from,
4419 memb_set_merge (&mcast_header.system_from, 1,
4440 "Received ringid(%s:%lld) seq %x",
4442 mcast_header.ring_id.seq,
4450 sq_in_range (sort_queue, mcast_header.seq) &&
4451 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4457 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4458 if (sort_queue_item.
mcast == NULL) {
4461 memcpy (sort_queue_item.
mcast, msg, msg_len);
4462 sort_queue_item.
msg_len = msg_len;
4465 mcast_header.seq)) {
4469 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4472 update_aru (instance);
4481 static int message_handler_memb_merge_detect (
4485 int endian_conversion_needed)
4489 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4493 if (endian_conversion_needed) {
4520 if (!memb_set_subset (
4544 static void memb_join_process (
4546 const struct memb_join *memb_join)
4550 int gather_entered = 0;
4551 int fail_minus_memb_entries = 0;
4568 "Discarding LEAVE message during flush, nodeid=%u",
4575 "Discarding JOIN message during flush, nodeid=%d", memb_join->
header.
nodeid);
4590 if (memb_set_equal (proc_list,
4595 memb_set_equal (failed_list,
4600 memb_consensus_set (instance, &memb_join->
system_from);
4602 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4609 memb_state_commit_token_create (instance);
4611 memb_state_commit_enter (instance);
4614 if (memb_consensus_agreed (instance) &&
4615 memb_lowest_in_config (instance)) {
4617 memb_state_commit_token_create (instance);
4619 memb_state_commit_enter (instance);
4624 if (memb_set_subset (proc_list,
4629 memb_set_subset (failed_list,
4641 memb_set_merge (proc_list,
4645 if (memb_set_subset (
4646 &instance->
my_id, 1,
4653 if (memb_set_subset (
4658 if (memb_set_subset (
4663 memb_set_merge (failed_list,
4667 memb_set_subtract (fail_minus_memb,
4668 &fail_minus_memb_entries,
4674 memb_set_merge (fail_minus_memb,
4675 fail_minus_memb_entries,
4686 if (gather_entered == 0 &&
4693 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4715 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4718 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4743 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4748 if (in_memb_list[i].ring_id.
rep.
family != 0) {
4754 out_memb_list[i].
aru =
swab32 (in_memb_list[i].aru);
4761 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4785 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4801 static void memb_merge_detect_endian_convert (
4813 static int ignore_join_under_operational (
4815 const struct memb_join *memb_join)
4825 if (memb_set_subset (&instance->
my_id, 1,
4843 static int message_handler_memb_join (
4847 int endian_conversion_needed)
4849 const struct memb_join *memb_join;
4850 struct memb_join *memb_join_convert = alloca (msg_len);
4852 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4856 if (endian_conversion_needed) {
4857 memb_join = memb_join_convert;
4858 memb_join_endian_convert (msg, memb_join_convert);
4868 if (pause_flush (instance)) {
4877 if (!ignore_join_under_operational (instance, memb_join)) {
4878 memb_join_process (instance, memb_join);
4883 memb_join_process (instance, memb_join);
4894 memb_join_process (instance, memb_join);
4907 memb_join_process (instance, memb_join);
4908 memb_recovery_state_token_loss (instance);
4916 static int message_handler_memb_commit_token (
4920 int endian_conversion_needed)
4930 "got commit token");
4932 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4936 if (endian_conversion_needed) {
4937 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4939 memcpy (memb_commit_token_convert, msg, msg_len);
4941 memb_commit_token = memb_commit_token_convert;
4944 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4945 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4955 memb_set_subtract (sub, &sub_entries,
4959 if (memb_set_equal (addr,
4965 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4966 memb_state_commit_enter (instance);
4980 memb_state_recovery_enter (instance, memb_commit_token);
4995 "Sending initial ORF token");
4998 orf_token_send_initial (instance);
4999 reset_token_timeout (instance);
5000 reset_token_retransmit_timeout (instance);
5007 static int message_handler_token_hold_cancel (
5011 int endian_conversion_needed)
5015 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
5024 timer_function_token_retransmit_timeout (instance);
5033 unsigned int msg_len)
5038 if (msg_len <
sizeof (
struct message_header)) {
5040 "Received message is too short... ignoring %u.",
5041 (
unsigned int)msg_len);
5046 switch (message_header->
type) {
5067 printf (
"wrong message type\n");
5084 unsigned int iface_no)
5100 "Created or loaded sequence id %llx.%s for this ring.",
5122 totem_config->
net_mtu -=
sizeof (
struct mcast);
5127 void (*totem_service_ready) (
void))