47 #include <sys/types.h>
48 #include <sys/socket.h>
55 #include <qb/qbdefs.h>
56 #include <qb/qbipcc.h>
69 #define MAP_ANONYMOUS MAP_ANON
76 #define MAX_RETRIES 100
81 #define CPG_MEMORY_MAP_UMASK 077
93 qb_ipcc_connection_t *
c;
104 static void cpg_inst_free (
void *inst);
123 coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
136 list_del (&cpg_iteration_instance->
list);
140 static void cpg_inst_free (
void *inst)
143 qb_ipcc_disconnect(cpg_inst->
c);
155 iter_next = iter->
next;
159 cpg_iteration_instance_finalize (cpg_iteration_instance);
161 hdb_handle_destroy (&cpg_handle_t_db, handle);
194 struct cpg_inst *cpg_inst;
198 goto error_no_destroy;
201 error =
hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db,
sizeof (
struct cpg_inst), handle));
202 if (error !=
CS_OK) {
203 goto error_no_destroy;
206 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (
void *)&cpg_inst));
207 if (error !=
CS_OK) {
212 if (cpg_inst->
c == NULL) {
214 goto error_put_destroy;
217 if (model_data != NULL) {
239 hdb_handle_put (&cpg_handle_t_db, *handle);
244 hdb_handle_put (&cpg_handle_t_db, *handle);
246 hdb_handle_destroy (&cpg_handle_t_db, *handle);
254 struct cpg_inst *cpg_inst;
260 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
261 if (error !=
CS_OK) {
269 hdb_handle_put (&cpg_handle_t_db, handle);
278 req_lib_cpg_finalize.header.size =
sizeof (
struct req_lib_cpg_finalize);
281 iov.iov_base = (
void *)&req_lib_cpg_finalize;
282 iov.iov_len =
sizeof (
struct req_lib_cpg_finalize);
284 error = coroipcc_msg_send_reply_receive (cpg_inst->
c,
287 &res_lib_cpg_finalize,
288 sizeof (
struct res_lib_cpg_finalize));
290 cpg_inst_finalize (cpg_inst, handle);
291 hdb_handle_put (&cpg_handle_t_db, handle);
301 struct cpg_inst *cpg_inst;
303 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
304 if (error !=
CS_OK) {
310 hdb_handle_put (&cpg_handle_t_db, handle);
320 struct cpg_inst *cpg_inst;
322 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
323 if (error !=
CS_OK) {
329 hdb_handle_put (&cpg_handle_t_db, handle);
339 struct cpg_inst *cpg_inst;
341 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
342 if (error !=
CS_OK) {
348 hdb_handle_put (&cpg_handle_t_db, handle);
358 struct cpg_inst *cpg_inst;
360 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
361 if (error !=
CS_OK) {
367 hdb_handle_put (&cpg_handle_t_db, handle);
379 struct cpg_inst *cpg_inst;
384 struct cpg_inst cpg_inst_copy;
385 struct qb_ipc_response_header *dispatch_data;
400 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
401 if (error !=
CS_OK) {
413 dispatch_data = (
struct qb_ipc_response_header *)dispatch_buf;
415 errno_res = qb_ipcc_event_recv (
439 if (error !=
CS_OK) {
448 memcpy (&cpg_inst_copy, cpg_inst,
sizeof (
struct cpg_inst));
454 switch (dispatch_data->id) {
462 marshall_from_mar_cpg_name_t (
464 &res_cpg_deliver_callback->group_name);
468 res_cpg_deliver_callback->nodeid,
469 res_cpg_deliver_callback->pid,
470 &res_cpg_deliver_callback->message,
471 res_cpg_deliver_callback->msglen);
477 marshall_from_mar_cpg_name_t (
479 &res_cpg_partial_deliver_callback->group_name);
484 assembly_data = NULL;
487 if (current_assembly_data->
nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->
pid == res_cpg_partial_deliver_callback->pid) {
488 assembly_data = current_assembly_data;
501 list_del (&assembly_data->
list);
504 assembly_data = NULL;
508 if (!assembly_data) {
513 assembly_data->
nodeid = res_cpg_partial_deliver_callback->nodeid;
514 assembly_data->
pid = res_cpg_partial_deliver_callback->pid;
515 assembly_data->
assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
522 list_init (&assembly_data->
list);
528 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
529 assembly_data->
assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
534 res_cpg_partial_deliver_callback->nodeid,
535 res_cpg_partial_deliver_callback->pid,
537 res_cpg_partial_deliver_callback->msglen);
539 list_del (&assembly_data->
list);
553 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
554 marshall_from_mar_cpg_address_t (&member_list[i],
557 left_list_start = res_cpg_confchg_callback->
member_list +
558 res_cpg_confchg_callback->member_list_entries;
559 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
560 marshall_from_mar_cpg_address_t (&left_list[i],
561 &left_list_start[i]);
563 joined_list_start = res_cpg_confchg_callback->
member_list +
564 res_cpg_confchg_callback->member_list_entries +
565 res_cpg_confchg_callback->left_list_entries;
566 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
567 marshall_from_mar_cpg_address_t (&joined_list[i],
568 &joined_list_start[i]);
570 marshall_from_mar_cpg_name_t (
572 &res_cpg_confchg_callback->group_name);
577 res_cpg_confchg_callback->member_list_entries,
579 res_cpg_confchg_callback->left_list_entries,
581 res_cpg_confchg_callback->joined_list_entries);
586 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
590 tmp_iter = iter->
next;
592 if (current_assembly_data->
nodeid != left_list[i].
nodeid || current_assembly_data->
pid != left_list[i].
pid)
595 list_del (¤t_assembly_data->
list);
597 free(current_assembly_data);
609 marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
610 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
611 totem_member_list[i] = res_cpg_totem_confchg_callback->
member_list[i];
616 res_cpg_totem_confchg_callback->member_list_entries,
645 hdb_handle_put (&cpg_handle_t_db, handle);
654 struct cpg_inst *cpg_inst;
663 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
664 if (error !=
CS_OK) {
669 req_lib_cpg_join.header.size =
sizeof (
struct req_lib_cpg_join);
671 req_lib_cpg_join.pid = getpid();
672 req_lib_cpg_join.flags = 0;
680 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
683 iov[0].iov_base = (
void *)&req_lib_cpg_join;
684 iov[0].iov_len =
sizeof (
struct req_lib_cpg_join);
687 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, iov, 1,
690 if (error !=
CS_OK) {
695 error = response.header.error;
698 hdb_handle_put (&cpg_handle_t_db, handle);
708 struct cpg_inst *cpg_inst;
717 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
718 if (error !=
CS_OK) {
722 req_lib_cpg_leave.header.size =
sizeof (
struct req_lib_cpg_leave);
724 req_lib_cpg_leave.pid = getpid();
725 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
728 iov[0].iov_base = (
void *)&req_lib_cpg_leave;
729 iov[0].iov_len =
sizeof (
struct req_lib_cpg_leave);
732 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, iov, 1,
733 &res_lib_cpg_leave, sizeof (
struct res_lib_cpg_leave));
735 if (error !=
CS_OK) {
738 }
while (res_lib_cpg_leave.header.error ==
CS_ERR_BUSY);
740 error = res_lib_cpg_leave.header.error;
743 hdb_handle_put (&cpg_handle_t_db, handle);
752 int *member_list_entries)
755 struct cpg_inst *cpg_inst;
764 if (member_list == NULL) {
767 if (member_list_entries == NULL) {
771 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
772 if (error !=
CS_OK) {
776 req_lib_cpg_membership_get.header.size =
sizeof (
struct req_lib_cpg_membership_get);
779 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
782 iov.iov_base = (
void *)&req_lib_cpg_membership_get;
783 iov.iov_len =
sizeof (
struct req_lib_cpg_membership_get);
785 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, &iov, 1,
786 &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
788 if (error !=
CS_OK) {
792 error = res_lib_cpg_membership_get.header.error;
797 *member_list_entries = res_lib_cpg_membership_get.member_count;
799 for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
800 marshall_from_mar_cpg_address_t (&member_list[i],
806 hdb_handle_put (&cpg_handle_t_db, handle);
813 unsigned int *local_nodeid)
816 struct cpg_inst *cpg_inst;
821 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
822 if (error !=
CS_OK) {
826 req_lib_cpg_local_get.header.size =
sizeof (
struct qb_ipc_request_header);
829 iov.iov_base = (
void *)&req_lib_cpg_local_get;
830 iov.iov_len =
sizeof (
struct req_lib_cpg_local_get);
832 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, &iov, 1,
833 &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
835 if (error !=
CS_OK) {
839 error = res_lib_cpg_local_get.header.error;
841 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
844 hdb_handle_put (&cpg_handle_t_db, handle);
854 struct cpg_inst *cpg_inst;
856 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
857 if (error !=
CS_OK) {
863 hdb_handle_put (&cpg_handle_t_db, handle);
869 memory_map (
char *path,
const char *file,
void **buf,
size_t bytes)
878 long int sysconf_page_size;
881 snprintf (path, PATH_MAX,
"/dev/shm/%s", file);
885 (void)umask(old_umask);
890 (void)umask(old_umask);
896 res = ftruncate (fd, bytes);
898 goto error_close_unlink;
900 sysconf_page_size = sysconf(_SC_PAGESIZE);
901 if (sysconf_page_size <= 0) {
902 goto error_close_unlink;
904 page_size = sysconf_page_size;
905 buffer = malloc (page_size);
906 if (buffer == NULL) {
907 goto error_close_unlink;
909 memset (buffer, 0, page_size);
910 for (i = 0; i < (bytes / page_size); i++) {
912 written = write (fd, buffer, page_size);
913 if (written == -1 && errno == EINTR) {
916 if (written != page_size) {
918 goto error_close_unlink;
923 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
926 if (addr == MAP_FAILED) {
927 goto error_close_unlink;
930 madvise(addr, bytes, MADV_NOSYNC);
957 struct qb_ipc_response_header res_coroipcs_zc_alloc;
962 struct cpg_inst *cpg_inst;
964 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
965 if (error !=
CS_OK) {
969 map_size = size +
sizeof (
struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
970 assert(memory_map (path,
"corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
974 munmap (buf, map_size);
980 req_coroipcc_zc_alloc.map_size = map_size;
981 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
983 iovec.iov_base = (
void *)&req_coroipcc_zc_alloc;
986 error = coroipcc_msg_send_reply_receive (
990 &res_coroipcs_zc_alloc,
991 sizeof (
struct qb_ipc_response_header));
993 if (error !=
CS_OK) {
997 hdr = (
struct coroipcs_zc_header *)buf;
999 *buffer = ((
char *)buf) +
sizeof (
struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
1002 hdb_handle_put (&cpg_handle_t_db, handle);
1012 struct cpg_inst *cpg_inst;
1014 struct qb_ipc_response_header res_coroipcs_zc_free;
1018 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
1019 if (error !=
CS_OK) {
1025 req_coroipcc_zc_free.map_size = header->
map_size;
1028 iovec.iov_base = (
void *)&req_coroipcc_zc_free;
1031 error = coroipcc_msg_send_reply_receive (
1035 &res_coroipcs_zc_free,
1036 sizeof (
struct qb_ipc_response_header));
1038 if (error !=
CS_OK) {
1042 res = munmap ((
void *)header, header->
map_size);
1050 hdb_handle_put (&cpg_handle_t_db, handle);
1062 struct cpg_inst *cpg_inst;
1069 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
1070 if (error !=
CS_OK) {
1079 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)(((
char *)msg) -
sizeof (
struct req_lib_cpg_mcast));
1080 req_lib_cpg_mcast->header.size =
sizeof (
struct req_lib_cpg_mcast) +
1084 req_lib_cpg_mcast->guarantee =
guarantee;
1085 req_lib_cpg_mcast->msglen = msg_len;
1093 iovec.iov_base = (
void *)&req_coroipcc_zc_execute;
1096 error = coroipcc_msg_send_reply_receive (
1101 sizeof(res_lib_cpg_mcast));
1103 if (error !=
CS_OK) {
1107 error = res_lib_cpg_mcast.header.error;
1110 hdb_handle_put (&cpg_handle_t_db, handle);
1116 struct cpg_inst *cpg_inst,
1119 const struct iovec *iovec,
1120 unsigned int iov_len)
1124 struct iovec iov[2];
1128 size_t iov_sent = 0;
1140 qb_ipcc_fc_enable_max_set(cpg_inst->
c, 2);
1142 while (error ==
CS_OK && sent < msg_len) {
1145 if ( (iovec[i].iov_len - iov_sent) > cpg_inst->
max_msg_size) {
1149 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1155 else if ((sent + iov[1].iov_len) == msg_len) {
1164 iov[1].iov_base = (
char *)iovec[i].iov_base + iov_sent;
1167 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, iov, 2,
1172 fprintf(stderr,
"sleep. counter=%d\n", retry_count);
1180 iov_sent += iov[1].iov_len;
1181 sent += iov[1].iov_len;
1184 if (iov_sent >= iovec[i].iov_len) {
1191 qb_ipcc_fc_enable_max_set(cpg_inst->
c, 1);
1200 const struct iovec *iovec,
1201 unsigned int iov_len)
1205 struct cpg_inst *cpg_inst;
1206 struct iovec iov[64];
1210 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
1211 if (error !=
CS_OK) {
1215 for (i = 0; i < iov_len; i++ ) {
1216 msg_len += iovec[i].iov_len;
1220 error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1224 req_lib_cpg_mcast.header.size =
sizeof (
struct req_lib_cpg_mcast) +
1228 req_lib_cpg_mcast.guarantee =
guarantee;
1229 req_lib_cpg_mcast.msglen = msg_len;
1231 iov[0].iov_base = (
void *)&req_lib_cpg_mcast;
1232 iov[0].iov_len =
sizeof (
struct req_lib_cpg_mcast);
1233 memcpy (&iov[1], iovec, iov_len *
sizeof (
struct iovec));
1235 qb_ipcc_fc_enable_max_set(cpg_inst->
c, 2);
1237 qb_ipcc_fc_enable_max_set(cpg_inst->
c, 1);
1240 hdb_handle_put (&cpg_handle_t_db, handle);
1253 struct cpg_inst *cpg_inst;
1261 if (cpg_iteration_handle == NULL) {
1276 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
1277 if (error !=
CS_OK) {
1281 error =
hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1283 if (error !=
CS_OK) {
1284 goto error_put_cpg_db;
1287 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1288 (
void *)&cpg_iteration_instance));
1289 if (error !=
CS_OK) {
1293 cpg_iteration_instance->
conn = cpg_inst->
c;
1295 list_init (&cpg_iteration_instance->
list);
1297 req_lib_cpg_iterationinitialize.header.size =
sizeof (
struct req_lib_cpg_iterationinitialize);
1299 req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1301 marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1304 iov.iov_base = (
void *)&req_lib_cpg_iterationinitialize;
1305 iov.iov_len =
sizeof (
struct req_lib_cpg_iterationinitialize);
1307 error = coroipcc_msg_send_reply_receive (cpg_inst->
c,
1310 &res_lib_cpg_iterationinitialize,
1311 sizeof (
struct res_lib_cpg_iterationinitialize));
1313 if (error !=
CS_OK) {
1314 goto error_put_destroy;
1318 res_lib_cpg_iterationinitialize.iteration_handle;
1323 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1324 hdb_handle_put (&cpg_handle_t_db, handle);
1326 return (res_lib_cpg_iterationinitialize.header.error);
1329 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1331 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1333 hdb_handle_put (&cpg_handle_t_db, handle);
1347 if (description == NULL) {
1351 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1352 (
void *)&cpg_iteration_instance));
1353 if (error !=
CS_OK) {
1357 req_lib_cpg_iterationnext.header.size =
sizeof (
struct req_lib_cpg_iterationnext);
1362 &req_lib_cpg_iterationnext,
1363 req_lib_cpg_iterationnext.header.size));
1364 if (error !=
CS_OK) {
1369 &res_lib_cpg_iterationnext,
1370 sizeof(
struct res_lib_cpg_iterationnext), -1));
1371 if (error !=
CS_OK) {
1375 marshall_from_mar_cpg_iteration_description_t(
1377 &res_lib_cpg_iterationnext.description);
1379 error = res_lib_cpg_iterationnext.header.error;
1382 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1397 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1398 (
void *)&cpg_iteration_instance));
1399 if (error !=
CS_OK) {
1403 req_lib_cpg_iterationfinalize.header.size =
sizeof (
struct req_lib_cpg_iterationfinalize);
1407 iov.iov_base = (
void *)&req_lib_cpg_iterationfinalize;
1408 iov.iov_len =
sizeof (
struct req_lib_cpg_iterationfinalize);
1410 error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->
conn,
1413 &res_lib_cpg_iterationfinalize,
1414 sizeof (
struct req_lib_cpg_iterationfinalize));
1416 if (error !=
CS_OK) {
1420 cpg_iteration_instance_finalize (cpg_iteration_instance);
1423 return (res_lib_cpg_iterationfinalize.header.error);
1426 hdb_handle_put (&cpg_iteration_handle_t_db, handle);