41 #include <sys/types.h>
42 #include <sys/socket.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
54 #include <arpa/inet.h>
59 #include <qb/qbipc_common.h>
69 #define MAP_ANONYMOUS MAP_ANON
76 #define GROUP_HASH_SIZE 32
142 static struct list_head joinlist_messages_head;
171 static unsigned int my_member_list_entries;
175 static unsigned int my_old_member_list_entries = 0;
201 static int cpg_lib_init_fn (
void *conn);
203 static int cpg_lib_exit_fn (
void *conn);
205 static void message_handler_req_exec_cpg_procjoin (
209 static void message_handler_req_exec_cpg_procleave (
213 static void message_handler_req_exec_cpg_joinlist (
217 static void message_handler_req_exec_cpg_mcast (
221 static void message_handler_req_exec_cpg_partial_mcast (
225 static void message_handler_req_exec_cpg_downlist_old (
229 static void message_handler_req_exec_cpg_downlist (
233 static void exec_cpg_procjoin_endian_convert (
void *msg);
235 static void exec_cpg_joinlist_endian_convert (
void *msg);
237 static void exec_cpg_mcast_endian_convert (
void *msg);
239 static void exec_cpg_partial_mcast_endian_convert (
void *msg);
241 static void exec_cpg_downlist_endian_convert_old (
void *msg);
243 static void exec_cpg_downlist_endian_convert (
void *msg);
245 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
247 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
249 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
251 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
253 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message);
255 static void message_handler_req_lib_cpg_membership (
void *conn,
256 const void *message);
258 static void message_handler_req_lib_cpg_local_get (
void *conn,
259 const void *message);
261 static void message_handler_req_lib_cpg_iteration_initialize (
263 const void *message);
265 static void message_handler_req_lib_cpg_iteration_next (
267 const void *message);
269 static void message_handler_req_lib_cpg_iteration_finalize (
271 const void *message);
273 static void message_handler_req_lib_cpg_zc_alloc (
275 const void *message);
277 static void message_handler_req_lib_cpg_zc_free (
279 const void *message);
281 static void message_handler_req_lib_cpg_zc_execute (
283 const void *message);
285 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
287 static int cpg_exec_send_downlist(
void);
289 static int cpg_exec_send_joinlist(
void);
291 static void downlist_inform_clients (
void);
293 static void joinlist_inform_clients (
void);
295 static void joinlist_messages_delete (
void);
297 static void cpg_sync_init (
298 const unsigned int *trans_list,
299 size_t trans_list_entries,
300 const unsigned int *member_list,
301 size_t member_list_entries,
304 static int cpg_sync_process (
void);
306 static void cpg_sync_activate (
void);
308 static void cpg_sync_abort (
void);
310 static void do_proc_join(
316 static void do_proc_leave(
322 static int notify_lib_totem_membership (
324 int member_list_entries,
325 const unsigned int *member_list);
327 static inline int zcb_all_free (
330 static char *cpg_print_group_name (
343 .lib_handler_fn = message_handler_req_lib_cpg_leave,
347 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
351 .lib_handler_fn = message_handler_req_lib_cpg_membership,
355 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
359 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
363 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
367 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
371 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
375 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
379 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
383 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
387 .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
397 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
400 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
401 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
404 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
405 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
408 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
409 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
412 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
413 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
416 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
417 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
420 .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
421 .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
426 .
name =
"corosync cluster closed process group service v1.01",
429 .private_data_size =
sizeof (
struct cpg_pd),
432 .lib_init_fn = cpg_lib_init_fn,
433 .lib_exit_fn = cpg_lib_exit_fn,
434 .lib_engine = cpg_lib_engine,
436 .exec_init_fn = cpg_exec_init_fn,
437 .exec_dump_fn = NULL,
438 .exec_engine = cpg_exec_engine,
440 .sync_init = cpg_sync_init,
441 .sync_process = cpg_sync_process,
442 .sync_activate = cpg_sync_activate,
443 .sync_abort = cpg_sync_abort
448 return (&cpg_service_engine);
452 struct qb_ipc_request_header header __attribute__((aligned(8)));
459 struct qb_ipc_request_header header __attribute__((aligned(8)));
468 struct qb_ipc_request_header header __attribute__((aligned(8)));
479 struct qb_ipc_request_header header __attribute__((aligned(8)));
485 struct qb_ipc_request_header header __attribute__((aligned(8)));
512 for (i = 0; i < group->length; i++) {
515 if (c >=
' ' && c < 0x7f && c !=
'\\') {
519 res[dest_pos++] =
'\\';
520 res[dest_pos++] =
'\\';
522 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
532 static void cpg_sync_init (
533 const unsigned int *trans_list,
534 size_t trans_list_entries,
535 const unsigned int *member_list,
536 size_t member_list_entries,
545 memcpy (my_member_list, member_list, member_list_entries *
546 sizeof (
unsigned int));
547 my_member_list_entries = member_list_entries;
549 last_sync_ring_id.nodeid = ring_id->
rep.
nodeid;
550 last_sync_ring_id.seq = ring_id->
seq;
556 for (i = 0; i < my_old_member_list_entries; i++) {
558 for (j = 0; j < trans_list_entries; j++) {
559 if (my_old_member_list[i] == trans_list[j]) {
565 g_req_exec_cpg_downlist.nodeids[entries++] =
566 my_old_member_list[i];
569 g_req_exec_cpg_downlist.left_nodes = entries;
572 static int cpg_sync_process (
void)
577 res = cpg_exec_send_downlist();
584 res = cpg_exec_send_joinlist();
589 static void cpg_sync_activate (
void)
591 memcpy (my_old_member_list, my_member_list,
592 my_member_list_entries *
sizeof (
unsigned int));
593 my_old_member_list_entries = my_member_list_entries;
595 downlist_inform_clients ();
597 joinlist_inform_clients ();
599 joinlist_messages_delete ();
601 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
604 static void cpg_sync_abort (
void)
607 joinlist_messages_delete ();
610 static int notify_lib_totem_membership (
612 int member_list_entries,
613 const unsigned int *member_list)
627 res->member_list_entries = member_list_entries;
628 res->header.size = size;
630 res->header.error =
CS_OK;
636 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
647 static int notify_lib_joinlist(
650 int joined_list_entries,
652 int left_list_entries,
665 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
667 if (mar_name_compare (&pi->
group, group_name) == 0) {
671 for (i = 0; i < left_list_entries; i++) {
672 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
689 res->joined_list_entries = joined_list_entries;
690 res->left_list_entries = left_list_entries;
691 res->member_list_entries = count;
693 res->header.size = size;
695 res->header.error =
CS_OK;
698 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
701 if (mar_name_compare (&pi->
group, group_name) == 0) {
705 for (i = 0;i < left_list_entries; i++) {
706 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
712 retgi->nodeid = pi->
nodeid;
713 retgi->pid = pi->
pid;
720 if (left_list_entries) {
722 retgi += left_list_entries;
725 if (joined_list_entries) {
727 retgi += joined_list_entries;
733 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
735 if (mar_name_compare (&cpd->
group_name, group_name) == 0) {
736 assert (joined_list_entries <= 1);
737 if (joined_list_entries) {
738 if (joined_list[0].
pid == cpd->
pid &&
749 if (left_list_entries) {
750 if (left_list[0].
pid == cpd->
pid &&
767 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
773 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
783 "%s: members(old:%d left:%d)",
789 static void downlist_inform_clients (
void)
799 int left_list_entries;
802 qb_map_iter_t *miter;
805 downlist_log(
"my downlist", &g_req_exec_cpg_downlist);
807 group_map = qb_skiplist_create();
814 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
819 for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
821 if (pi->
nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
828 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
829 cpg_group.value[cpg_group.length] = 0;
831 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
833 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
834 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
835 qb_map_put(group_map, pcd->cpg_group.value, pcd);
837 size = pcd->left_list_entries;
838 pcd->left_list[size].nodeid = left_pi->
nodeid;
839 pcd->left_list[size].pid = left_pi->
pid;
841 pcd->left_list_entries++;
842 list_del (&left_pi->
list);
848 miter = qb_map_iter_create(group_map);
849 while (qb_map_iter_next(miter, (
void **)&pcd)) {
850 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
852 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
853 for (i=0; i<pcd->left_list_entries; i++) {
854 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
855 i, cpg_print_group_name(&group),
857 pcd->left_list[i].pid);
861 notify_lib_joinlist(&group, NULL,
863 pcd->left_list_entries,
869 qb_map_iter_free(miter);
870 qb_map_destroy(group_map);
876 static void joinlist_remove_zombie_pi_entries (
void)
884 for (pi_iter = process_info_list_head.
next; pi_iter != &process_info_list_head; ) {
886 pi_iter = pi_iter->
next;
899 for (jl_iter = joinlist_messages_head.
next;
900 jl_iter != &joinlist_messages_head;
901 jl_iter = jl_iter->
next) {
910 pi->
pid == stored_msg->
pid &&
923 static void joinlist_inform_clients (
void)
930 for (iter = joinlist_messages_head.
next;
931 iter != &joinlist_messages_head;
936 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
937 i++, cpg_print_group_name(&stored_msg->
group_name),
950 joinlist_remove_zombie_pi_entries ();
953 static void joinlist_messages_delete (
void)
958 for (iter = joinlist_messages_head.
next;
959 iter != &joinlist_messages_head;
962 iter_next = iter->
next;
965 list_del (&stored_msg->
list);
968 list_init (&joinlist_messages_head);
973 list_init (&joinlist_messages_head);
987 iter_next = iter->
next;
990 list_del (&pi->
list);
994 list_del (&cpg_iteration_instance->
list);
995 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
998 static void cpg_pd_finalize (
struct cpg_pd *cpd)
1001 struct cpg_iteration_instance *cpii;
1008 iter_next = iter->
next;
1012 cpg_iteration_instance_finalize (cpii);
1015 list_del (&cpd->
list);
1018 static int cpg_lib_exit_fn (
void *conn)
1029 cpg_pd_finalize (cpd);
1035 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason)
1038 struct iovec req_exec_cpg_iovec;
1057 static void exec_cpg_procjoin_endian_convert (
void *msg)
1061 req_exec_cpg_procjoin->pid =
swab32(req_exec_cpg_procjoin->pid);
1062 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1063 req_exec_cpg_procjoin->reason =
swab32(req_exec_cpg_procjoin->reason);
1066 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1069 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1072 swab_mar_int32_t (&res->size);
1074 while ((
const char*)jle < msg + res->size) {
1081 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1085 static void exec_cpg_downlist_endian_convert (
void *msg)
1090 req_exec_cpg_downlist->left_nodes =
swab32(req_exec_cpg_downlist->left_nodes);
1091 req_exec_cpg_downlist->old_members =
swab32(req_exec_cpg_downlist->old_members);
1093 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1094 req_exec_cpg_downlist->nodeids[i] =
swab32(req_exec_cpg_downlist->nodeids[i]);
1099 static void exec_cpg_mcast_endian_convert (
void *msg)
1103 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1104 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1105 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1106 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1107 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1110 static void exec_cpg_partial_mcast_endian_convert (
void *msg)
1114 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1115 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1116 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1117 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1118 req_exec_cpg_mcast->fraglen =
swab32(req_exec_cpg_mcast->fraglen);
1119 req_exec_cpg_mcast->type =
swab32(req_exec_cpg_mcast->type);
1120 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1126 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1130 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1131 mar_name_compare (&pi->
group, group_name) == 0) {
1139 static void do_proc_join(
1142 unsigned int nodeid,
1151 if (process_info_find (name, pid, nodeid) != NULL) {
1161 memcpy(&pi->
group, name,
sizeof(*name));
1162 list_init(&pi->
list);
1167 list_to_add = &process_info_list_head;
1168 for (list = process_info_list_head.
next; list != &process_info_list_head; list = list->
next) {
1178 list_add (&pi->
list, list_to_add);
1180 notify_info.pid = pi->
pid;
1181 notify_info.nodeid =
nodeid;
1182 notify_info.reason = reason;
1184 notify_lib_joinlist(&pi->
group, NULL,
1190 static void do_proc_leave(
1193 unsigned int nodeid,
1200 notify_info.pid = pid;
1201 notify_info.nodeid =
nodeid;
1202 notify_info.reason = reason;
1204 notify_lib_joinlist(name, NULL,
1209 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1213 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1214 mar_name_compare (&pi->
group, name)==0) {
1215 list_del (&pi->
list);
1221 static void message_handler_req_exec_cpg_downlist_old (
1222 const void *message,
1223 unsigned int nodeid)
1229 static void message_handler_req_exec_cpg_downlist(
1230 const void *message,
1231 unsigned int nodeid)
1233 const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1236 req_exec_cpg_downlist->left_nodes);
1240 static void message_handler_req_exec_cpg_procjoin (
1241 const void *message,
1242 unsigned int nodeid)
1249 (
unsigned int)req_exec_cpg_procjoin->pid);
1251 do_proc_join (&req_exec_cpg_procjoin->group_name,
1252 req_exec_cpg_procjoin->pid, nodeid,
1256 static void message_handler_req_exec_cpg_procleave (
1257 const void *message,
1258 unsigned int nodeid)
1265 (
unsigned int)req_exec_cpg_procjoin->pid);
1267 do_proc_leave (&req_exec_cpg_procjoin->group_name,
1268 req_exec_cpg_procjoin->pid, nodeid,
1269 req_exec_cpg_procjoin->reason);
1274 static void message_handler_req_exec_cpg_joinlist (
1275 const void *message_v,
1276 unsigned int nodeid)
1278 const char *message = message_v;
1279 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1286 while ((
const char*)jle < message + res->size) {
1290 stored_msg->
pid = jle->
pid;
1292 list_init (&stored_msg->
list);
1293 list_add (&stored_msg->
list, &joinlist_messages_head);
1298 static void message_handler_req_exec_cpg_mcast (
1299 const void *message,
1300 unsigned int nodeid)
1304 int msglen = req_exec_cpg_mcast->msglen;
1307 struct iovec iovec[2];
1321 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1322 iovec[1].iov_len = msglen;
1324 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1329 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1333 for (pi_iter = process_info_list_head.
next;
1334 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1338 if (pi->
nodeid == nodeid &&
1339 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1356 static void message_handler_req_exec_cpg_partial_mcast (
1357 const void *message,
1358 unsigned int nodeid)
1362 int msglen = req_exec_cpg_mcast->fraglen;
1365 struct iovec iovec[2];
1383 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1384 iovec[1].iov_len = msglen;
1386 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1391 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1395 for (pi_iter = process_info_list_head.
next;
1396 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1400 if (pi->
nodeid == nodeid &&
1401 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1419 static int cpg_exec_send_downlist(
void)
1424 g_req_exec_cpg_downlist.header.size =
sizeof(
struct req_exec_cpg_downlist);
1426 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1428 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1429 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1434 static int cpg_exec_send_joinlist(
void)
1438 struct qb_ipc_response_header *res;
1441 struct iovec req_exec_cpg_iovec;
1443 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1455 buf = alloca(
sizeof(
struct qb_ipc_response_header) +
sizeof(
struct join_list_entry) * count);
1461 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1462 res = (
struct qb_ipc_response_header *)buf;
1464 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1475 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1477 req_exec_cpg_iovec.iov_base = buf;
1478 req_exec_cpg_iovec.iov_len = res->size;
1483 static int cpg_lib_init_fn (
void *conn)
1486 memset (cpd, 0,
sizeof(
struct cpg_pd));
1488 list_add (&cpd->
list, &cpg_pd_list_head);
1499 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message)
1508 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
1511 if (cpd_item->
pid == req_lib_cpg_join->pid &&
1512 mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->
group_name) == 0) {
1524 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1528 mar_name_compare(&req_lib_cpg_join->group_name, &pi->
group) == 0) {
1544 cpd->
pid = req_lib_cpg_join->pid;
1545 cpd->
flags = req_lib_cpg_join->flags;
1546 memcpy (&cpd->
group_name, &req_lib_cpg_join->group_name,
1549 cpg_node_joinleave_send (req_lib_cpg_join->pid,
1550 &req_lib_cpg_join->group_name,
1572 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1594 cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1595 &req_lib_cpg_leave->group_name,
1609 static void message_handler_req_lib_cpg_finalize (
1611 const void *message)
1623 list_del (&cpd->
list);
1624 list_init (&cpd->
list);
1644 fd = open (path, O_RDWR, 0600);
1652 res = ftruncate (fd, bytes);
1654 goto error_close_unlink;
1657 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1660 if (addr == MAP_FAILED) {
1661 goto error_close_unlink;
1664 madvise(addr, bytes, MADV_NOSYNC);
1669 munmap (addr, bytes);
1681 static inline int zcb_alloc (
1683 const char *path_to_file,
1690 zcb_mapped = malloc (
sizeof (
struct zcb_mapped));
1691 if (zcb_mapped == NULL) {
1704 list_init (&zcb_mapped->
list);
1712 static inline int zcb_free (
struct zcb_mapped *zcb_mapped)
1716 res = munmap (zcb_mapped->
addr, zcb_mapped->
size);
1717 list_del (&zcb_mapped->
list);
1722 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *addr)
1725 struct zcb_mapped *zcb_mapped;
1726 unsigned int res = 0;
1731 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1733 if (zcb_mapped->
addr == addr) {
1734 res = zcb_free (zcb_mapped);
1742 static inline int zcb_all_free (
1746 struct zcb_mapped *zcb_mapped;
1751 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1755 zcb_free (zcb_mapped);
1765 static uint64_t void2serveraddr (
void *server_ptr)
1773 static void *serveraddr2void (uint64_t
server_addr)
1781 static void message_handler_req_lib_cpg_zc_alloc (
1783 const void *message)
1786 struct qb_ipc_response_header res_header;
1794 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1801 res_header.size =
sizeof (
struct qb_ipc_response_header);
1808 static void message_handler_req_lib_cpg_zc_free (
1810 const void *message)
1813 struct qb_ipc_response_header res_header;
1819 addr = serveraddr2void (hdr->server_address);
1821 zcb_by_addr_free (cpd, addr);
1823 res_header.size =
sizeof (
struct qb_ipc_response_header);
1831 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message)
1837 struct iovec req_exec_cpg_iovec[2];
1840 int msglen = req_lib_cpg_mcast->fraglen;
1872 if (error ==
CS_OK) {
1873 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1876 req_exec_cpg_mcast.pid = cpd->
pid;
1877 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1878 req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
1879 req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
1881 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1884 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1885 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1886 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1887 req_exec_cpg_iovec[1].iov_len = msglen;
1890 assert(result == 0);
1893 conn, group_name.value, cpd->
cpd_state, error);
1902 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
1904 const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
1908 struct iovec req_exec_cpg_iovec[2];
1909 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1910 int msglen = req_lib_cpg_mcast->msglen;
1931 if (error ==
CS_OK) {
1932 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1935 req_exec_cpg_mcast.pid = cpd->
pid;
1936 req_exec_cpg_mcast.msglen = msglen;
1938 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1941 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1942 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1943 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1944 req_exec_cpg_iovec[1].iov_len = msglen;
1947 assert(result == 0);
1950 conn, group_name.value, cpd->
cpd_state, error);
1954 static void message_handler_req_lib_cpg_zc_execute (
1956 const void *message)
1959 struct qb_ipc_request_header *
header;
1962 struct iovec req_exec_cpg_iovec[2];
1963 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1964 struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1970 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
1971 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)header;
1990 if (error ==
CS_OK) {
1991 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
1994 req_exec_cpg_mcast.pid = cpd->
pid;
1995 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1997 memcpy(&req_exec_cpg_mcast.group_name, &cpd->
group_name,
2000 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2001 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2002 req_exec_cpg_iovec[1].iov_base = (
char *)header +
sizeof(
struct req_lib_cpg_mcast);
2003 req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2020 static void message_handler_req_lib_cpg_membership (
void *conn,
2021 const void *message)
2024 (
struct req_lib_cpg_membership_get *)message;
2027 int member_count = 0;
2034 for (iter = process_info_list_head.
next;
2035 iter != &process_info_list_head; iter = iter->
next) {
2038 if (mar_name_compare (&pi->
group, &req_lib_cpg_membership_get->group_name) == 0) {
2050 static void message_handler_req_lib_cpg_local_get (
void *conn,
2051 const void *message)
2064 static void message_handler_req_lib_cpg_iteration_initialize (
2066 const void *message)
2073 struct cpg_iteration_instance *cpg_iteration_instance;
2086 res = hdb_handle_create (&cpg_iteration_handle_t_db,
sizeof (
struct cpg_iteration_instance),
2087 &cpg_iteration_handle);
2094 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&cpg_iteration_instance);
2102 cpg_iteration_instance->
handle = cpg_iteration_handle;
2107 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
2119 iter2 = iter2->
next) {
2122 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2138 if (mar_name_compare (&pi->
group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2151 goto error_put_destroy;
2155 list_init (&new_pi->
list);
2169 iter2 = iter2->
next) {
2172 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2177 list_add (&new_pi->
list, iter2);
2187 list_init (&cpg_iteration_instance->
list);
2193 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2195 if (error !=
CS_OK) {
2196 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2209 static void message_handler_req_lib_cpg_iteration_next (
2211 const void *message)
2215 struct cpg_iteration_instance *cpg_iteration_instance;
2222 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2223 req_lib_cpg_iterationnext->iteration_handle,
2224 (
void *)&cpg_iteration_instance);
2231 assert (cpg_iteration_instance);
2252 hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2262 static void message_handler_req_lib_cpg_iteration_finalize (
2264 const void *message)
2268 struct cpg_iteration_instance *cpg_iteration_instance;
2274 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2275 req_lib_cpg_iterationfinalize->iteration_handle,
2276 (
void *)&cpg_iteration_instance);
2283 assert (cpg_iteration_instance);
2285 cpg_iteration_instance_finalize (cpg_iteration_instance);
2286 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);