corosync  2.4.5
lib/cpg.c
Go to the documentation of this file.
1 /*
2  * vi: set autoindent tabstop=4 shiftwidth=4 :
3  *
4  * Copyright (c) 2006-2015 Red Hat, Inc.
5  *
6  * All rights reserved.
7  *
8  * Author: Christine Caulfield (ccaulfi@redhat.com)
9  * Author: Jan Friesse (jfriesse@redhat.com)
10  *
11  * This software licensed under BSD license, the text of which follows:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *
16  * - Redistributions of source code must retain the above copyright notice,
17  * this list of conditions and the following disclaimer.
18  * - Redistributions in binary form must reproduce the above copyright notice,
19  * this list of conditions and the following disclaimer in the documentation
20  * and/or other materials provided with the distribution.
21  * - Neither the name of the MontaVista Software, Inc. nor the names of its
22  * contributors may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35  * THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 /*
38  * Provides a closed process group API using the coroipcc executive
39  */
40 
41 #include <config.h>
42 
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #include <sys/mman.h>
50 #include <sys/uio.h>
51 #include <sys/stat.h>
52 #include <errno.h>
53 #include <limits.h>
54 
55 #include <qb/qbdefs.h>
56 #include <qb/qbipcc.h>
57 #include <qb/qblog.h>
58 
59 #include <corosync/hdb.h>
60 #include <corosync/list.h>
61 #include <corosync/corotypes.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/cpg.h>
64 #include <corosync/ipc_cpg.h>
65 
66 #include "util.h"
67 
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71 
72 /*
73  * Maximum number of times to retry a send when transmitting
74  * a large message fragment
75  */
76 #define MAX_RETRIES 100
77 
78 /*
79  * ZCB files have following umask (umask is same as used in libqb)
80  */
81 #define CPG_MEMORY_MAP_UMASK 077
82 
84 {
85  struct list_head list;
86  uint32_t nodeid;
87  uint32_t pid;
88  char *assembly_buf;
89  uint32_t assembly_buf_ptr;
90 };
91 
92 struct cpg_inst {
93  qb_ipcc_connection_t *c;
94  int finalize;
95  void *context;
96  union {
99  };
101  uint32_t max_msg_size;
103 };
104 static void cpg_inst_free (void *inst);
105 
106 DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107 
110  qb_ipcc_connection_t *conn;
112  struct list_head list;
113 };
114 
115 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116 
117 
118 /*
119  * Internal (not visible by API) functions
120  */
121 
122 static cs_error_t
123 coroipcc_msg_send_reply_receive (
124  qb_ipcc_connection_t *c,
125  const struct iovec *iov,
126  unsigned int iov_len,
127  void *res_msg,
128  size_t res_len)
129 {
130  return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
132 }
133 
134 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135 {
136  list_del (&cpg_iteration_instance->list);
137  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138 }
139 
140 static void cpg_inst_free (void *inst)
141 {
142  struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143  qb_ipcc_disconnect(cpg_inst->c);
144 }
145 
146 static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147 {
148  struct list_head *iter, *iter_next;
150 
151  /*
152  * Traverse thru iteration instances and delete them
153  */
154  for (iter = cpg_inst->iteration_list_head.next; iter != &cpg_inst->iteration_list_head;iter = iter_next) {
155  iter_next = iter->next;
156 
157  cpg_iteration_instance = list_entry (iter, struct cpg_iteration_instance_t, list);
158 
159  cpg_iteration_instance_finalize (cpg_iteration_instance);
160  }
161  hdb_handle_destroy (&cpg_handle_t_db, handle);
162 }
163 
172  cpg_handle_t *handle,
173  cpg_callbacks_t *callbacks)
174 {
175  cpg_model_v1_data_t model_v1_data;
176 
177  memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
178 
179  if (callbacks) {
180  model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
181  model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
182  }
183 
184  return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
185 }
186 
188  cpg_handle_t *handle,
189  cpg_model_t model,
190  cpg_model_data_t *model_data,
191  void *context)
192 {
193  cs_error_t error;
194  struct cpg_inst *cpg_inst;
195 
196  if (model != CPG_MODEL_V1) {
197  error = CS_ERR_INVALID_PARAM;
198  goto error_no_destroy;
199  }
200 
201  error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
202  if (error != CS_OK) {
203  goto error_no_destroy;
204  }
205 
206  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
207  if (error != CS_OK) {
208  goto error_destroy;
209  }
210 
211  cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
212  if (cpg_inst->c == NULL) {
213  error = qb_to_cs_error(-errno);
214  goto error_put_destroy;
215  }
216 
217  if (model_data != NULL) {
218  switch (model) {
219  case CPG_MODEL_V1:
220  memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
221  if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
222  error = CS_ERR_INVALID_PARAM;
223 
224  goto error_destroy;
225  }
226  break;
227  }
228  }
229 
230  /* Allow space for corosync internal headers */
231  cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
232  cpg_inst->model_data.model = model;
233  cpg_inst->context = context;
234 
235  list_init(&cpg_inst->iteration_list_head);
236 
237  list_init(&cpg_inst->assembly_list_head);
238 
239  hdb_handle_put (&cpg_handle_t_db, *handle);
240 
241  return (CS_OK);
242 
243 error_put_destroy:
244  hdb_handle_put (&cpg_handle_t_db, *handle);
245 error_destroy:
246  hdb_handle_destroy (&cpg_handle_t_db, *handle);
247 error_no_destroy:
248  return (error);
249 }
250 
252  cpg_handle_t handle)
253 {
254  struct cpg_inst *cpg_inst;
255  struct iovec iov;
256  struct req_lib_cpg_finalize req_lib_cpg_finalize;
257  struct res_lib_cpg_finalize res_lib_cpg_finalize;
258  cs_error_t error;
259 
260  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
261  if (error != CS_OK) {
262  return (error);
263  }
264 
265  /*
266  * Another thread has already started finalizing
267  */
268  if (cpg_inst->finalize) {
269  hdb_handle_put (&cpg_handle_t_db, handle);
270  return (CS_ERR_BAD_HANDLE);
271  }
272 
273  cpg_inst->finalize = 1;
274 
275  /*
276  * Send service request
277  */
278  req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
279  req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
280 
281  iov.iov_base = (void *)&req_lib_cpg_finalize;
282  iov.iov_len = sizeof (struct req_lib_cpg_finalize);
283 
284  error = coroipcc_msg_send_reply_receive (cpg_inst->c,
285  &iov,
286  1,
287  &res_lib_cpg_finalize,
288  sizeof (struct res_lib_cpg_finalize));
289 
290  cpg_inst_finalize (cpg_inst, handle);
291  hdb_handle_put (&cpg_handle_t_db, handle);
292 
293  return (error);
294 }
295 
297  cpg_handle_t handle,
298  int *fd)
299 {
300  cs_error_t error;
301  struct cpg_inst *cpg_inst;
302 
303  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
304  if (error != CS_OK) {
305  return (error);
306  }
307 
308  error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
309 
310  hdb_handle_put (&cpg_handle_t_db, handle);
311 
312  return (error);
313 }
314 
316  cpg_handle_t handle,
317  uint32_t *size)
318 {
319  cs_error_t error;
320  struct cpg_inst *cpg_inst;
321 
322  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
323  if (error != CS_OK) {
324  return (error);
325  }
326 
327  *size = cpg_inst->max_msg_size;
328 
329  hdb_handle_put (&cpg_handle_t_db, handle);
330 
331  return (error);
332 }
333 
335  cpg_handle_t handle,
336  void **context)
337 {
338  cs_error_t error;
339  struct cpg_inst *cpg_inst;
340 
341  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
342  if (error != CS_OK) {
343  return (error);
344  }
345 
346  *context = cpg_inst->context;
347 
348  hdb_handle_put (&cpg_handle_t_db, handle);
349 
350  return (CS_OK);
351 }
352 
354  cpg_handle_t handle,
355  void *context)
356 {
357  cs_error_t error;
358  struct cpg_inst *cpg_inst;
359 
360  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
361  if (error != CS_OK) {
362  return (error);
363  }
364 
365  cpg_inst->context = context;
366 
367  hdb_handle_put (&cpg_handle_t_db, handle);
368 
369  return (CS_OK);
370 }
371 
373  cpg_handle_t handle,
374  cs_dispatch_flags_t dispatch_types)
375 {
376  int timeout = -1;
377  cs_error_t error;
378  int cont = 1; /* always continue do loop except when set to 0 */
379  struct cpg_inst *cpg_inst;
380  struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
381  struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
382  struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
383  struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
384  struct cpg_inst cpg_inst_copy;
385  struct qb_ipc_response_header *dispatch_data;
386  struct cpg_address member_list[CPG_MEMBERS_MAX];
387  struct cpg_address left_list[CPG_MEMBERS_MAX];
388  struct cpg_address joined_list[CPG_MEMBERS_MAX];
389  struct cpg_name group_name;
390  struct cpg_assembly_data *assembly_data;
391  struct list_head *iter, *tmp_iter;
392  mar_cpg_address_t *left_list_start;
393  mar_cpg_address_t *joined_list_start;
394  unsigned int i;
395  struct cpg_ring_id ring_id;
396  uint32_t totem_member_list[CPG_MEMBERS_MAX];
397  int32_t errno_res;
398  char dispatch_buf[IPC_DISPATCH_SIZE];
399 
400  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
401  if (error != CS_OK) {
402  return (error);
403  }
404 
405  /*
406  * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
407  * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
408  */
409  if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
410  timeout = 0;
411  }
412 
413  dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
414  do {
415  errno_res = qb_ipcc_event_recv (
416  cpg_inst->c,
417  dispatch_buf,
419  timeout);
420  error = qb_to_cs_error (errno_res);
421  if (error == CS_ERR_BAD_HANDLE) {
422  error = CS_OK;
423  goto error_put;
424  }
425  if (error == CS_ERR_TRY_AGAIN) {
426  if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
427  /*
428  * Don't mask error
429  */
430  goto error_put;
431  }
432  error = CS_OK;
433  if (dispatch_types == CS_DISPATCH_ALL) {
434  break; /* exit do while cont is 1 loop */
435  } else {
436  continue; /* next poll */
437  }
438  }
439  if (error != CS_OK) {
440  goto error_put;
441  }
442 
443  /*
444  * Make copy of callbacks, message data, unlock instance, and call callback
445  * A risk of this dispatch method is that the callback routines may
446  * operate at the same time that cpgFinalize has been called.
447  */
448  memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
449  switch (cpg_inst_copy.model_data.model) {
450  case CPG_MODEL_V1:
451  /*
452  * Dispatch incoming message
453  */
454  switch (dispatch_data->id) {
456  if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
457  break;
458  }
459 
460  res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
461 
462  marshall_from_mar_cpg_name_t (
463  &group_name,
464  &res_cpg_deliver_callback->group_name);
465 
466  cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
467  &group_name,
468  res_cpg_deliver_callback->nodeid,
469  res_cpg_deliver_callback->pid,
470  &res_cpg_deliver_callback->message,
471  res_cpg_deliver_callback->msglen);
472  break;
473 
475  res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
476 
477  marshall_from_mar_cpg_name_t (
478  &group_name,
479  &res_cpg_partial_deliver_callback->group_name);
480 
481  /*
482  * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
483  */
484  assembly_data = NULL;
485  for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head; iter = iter->next) {
486  struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
487  if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
488  assembly_data = current_assembly_data;
489  break;
490  }
491  }
492 
493  if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
494 
495  /*
496  * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
497  * Otherwise the sending of packet must have been interrupted and error should have
498  * been reported to sending client. Therefore here last assembly will be dropped.
499  */
500  if (assembly_data) {
501  list_del (&assembly_data->list);
502  free(assembly_data->assembly_buf);
503  free(assembly_data);
504  assembly_data = NULL;
505  }
506 
507  assembly_data = malloc(sizeof(struct cpg_assembly_data));
508  if (!assembly_data) {
509  error = CS_ERR_NO_MEMORY;
510  goto error_put;
511  }
512 
513  assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
514  assembly_data->pid = res_cpg_partial_deliver_callback->pid;
515  assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
516  if (!assembly_data->assembly_buf) {
517  free(assembly_data);
518  error = CS_ERR_NO_MEMORY;
519  goto error_put;
520  }
521  assembly_data->assembly_buf_ptr = 0;
522  list_init (&assembly_data->list);
523 
524  list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
525  }
526  if (assembly_data) {
527  memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
528  res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
529  assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
530 
531  if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
532  cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533  &group_name,
534  res_cpg_partial_deliver_callback->nodeid,
535  res_cpg_partial_deliver_callback->pid,
536  assembly_data->assembly_buf,
537  res_cpg_partial_deliver_callback->msglen);
538 
539  list_del (&assembly_data->list);
540  free(assembly_data->assembly_buf);
541  free(assembly_data);
542  }
543  }
544  break;
545 
547  if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
548  break;
549  }
550 
551  res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
552 
553  for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
554  marshall_from_mar_cpg_address_t (&member_list[i],
555  &res_cpg_confchg_callback->member_list[i]);
556  }
557  left_list_start = res_cpg_confchg_callback->member_list +
558  res_cpg_confchg_callback->member_list_entries;
559  for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
560  marshall_from_mar_cpg_address_t (&left_list[i],
561  &left_list_start[i]);
562  }
563  joined_list_start = res_cpg_confchg_callback->member_list +
564  res_cpg_confchg_callback->member_list_entries +
565  res_cpg_confchg_callback->left_list_entries;
566  for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
567  marshall_from_mar_cpg_address_t (&joined_list[i],
568  &joined_list_start[i]);
569  }
570  marshall_from_mar_cpg_name_t (
571  &group_name,
572  &res_cpg_confchg_callback->group_name);
573 
574  cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
575  &group_name,
576  member_list,
577  res_cpg_confchg_callback->member_list_entries,
578  left_list,
579  res_cpg_confchg_callback->left_list_entries,
580  joined_list,
581  res_cpg_confchg_callback->joined_list_entries);
582 
583  /*
584  * If member left while his partial packet was being assembled, assembly data must be removed from list
585  */
586  for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
587  for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head;iter = tmp_iter) {
588  struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
589 
590  tmp_iter = iter->next;
591 
592  if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
593  continue;
594 
595  list_del (&current_assembly_data->list);
596  free(current_assembly_data->assembly_buf);
597  free(current_assembly_data);
598  }
599  }
600 
601  break;
603  if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
604  break;
605  }
606 
607  res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
608 
609  marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
610  for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
611  totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
612  }
613 
614  cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
615  ring_id,
616  res_cpg_totem_confchg_callback->member_list_entries,
617  totem_member_list);
618  break;
619  default:
620  error = CS_ERR_LIBRARY;
621  goto error_put;
622  break;
623  } /* - switch (dispatch_data->id) */
624  break; /* case CPG_MODEL_V1 */
625  } /* - switch (cpg_inst_copy.model_data.model) */
626 
627  if (cpg_inst_copy.finalize || cpg_inst->finalize) {
628  /*
629  * If the finalize has been called then get out of the dispatch.
630  */
631  cpg_inst->finalize = 1;
632  error = CS_ERR_BAD_HANDLE;
633  goto error_put;
634  }
635 
636  /*
637  * Determine if more messages should be processed
638  */
639  if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
640  cont = 0;
641  }
642  } while (cont);
643 
644 error_put:
645  hdb_handle_put (&cpg_handle_t_db, handle);
646  return (error);
647 }
648 
650  cpg_handle_t handle,
651  const struct cpg_name *group)
652 {
653  cs_error_t error;
654  struct cpg_inst *cpg_inst;
655  struct iovec iov[2];
656  struct req_lib_cpg_join req_lib_cpg_join;
657  struct res_lib_cpg_join response;
658 
659  if (group->length > CPG_MAX_NAME_LENGTH) {
660  return (CS_ERR_NAME_TOO_LONG);
661  }
662 
663  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
664  if (error != CS_OK) {
665  return (error);
666  }
667 
668  /* Now join */
669  req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
670  req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
671  req_lib_cpg_join.pid = getpid();
672  req_lib_cpg_join.flags = 0;
673 
674  switch (cpg_inst->model_data.model) {
675  case CPG_MODEL_V1:
676  req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
677  break;
678  }
679 
680  marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
681  group);
682 
683  iov[0].iov_base = (void *)&req_lib_cpg_join;
684  iov[0].iov_len = sizeof (struct req_lib_cpg_join);
685 
686  do {
687  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
688  &response, sizeof (struct res_lib_cpg_join));
689 
690  if (error != CS_OK) {
691  goto error_exit;
692  }
693  } while (response.header.error == CS_ERR_BUSY);
694 
695  error = response.header.error;
696 
697 error_exit:
698  hdb_handle_put (&cpg_handle_t_db, handle);
699 
700  return (error);
701 }
702 
704  cpg_handle_t handle,
705  const struct cpg_name *group)
706 {
707  cs_error_t error;
708  struct cpg_inst *cpg_inst;
709  struct iovec iov[2];
710  struct req_lib_cpg_leave req_lib_cpg_leave;
711  struct res_lib_cpg_leave res_lib_cpg_leave;
712 
713  if (group->length > CPG_MAX_NAME_LENGTH) {
714  return (CS_ERR_NAME_TOO_LONG);
715  }
716 
717  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
718  if (error != CS_OK) {
719  return (error);
720  }
721 
722  req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
723  req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
724  req_lib_cpg_leave.pid = getpid();
725  marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
726  group);
727 
728  iov[0].iov_base = (void *)&req_lib_cpg_leave;
729  iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
730 
731  do {
732  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
733  &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
734 
735  if (error != CS_OK) {
736  goto error_exit;
737  }
738  } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
739 
740  error = res_lib_cpg_leave.header.error;
741 
742 error_exit:
743  hdb_handle_put (&cpg_handle_t_db, handle);
744 
745  return (error);
746 }
747 
749  cpg_handle_t handle,
750  struct cpg_name *group_name,
751  struct cpg_address *member_list,
752  int *member_list_entries)
753 {
754  cs_error_t error;
755  struct cpg_inst *cpg_inst;
756  struct iovec iov;
757  struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
758  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
759  unsigned int i;
760 
761  if (group_name->length > CPG_MAX_NAME_LENGTH) {
762  return (CS_ERR_NAME_TOO_LONG);
763  }
764  if (member_list == NULL) {
765  return (CS_ERR_INVALID_PARAM);
766  }
767  if (member_list_entries == NULL) {
768  return (CS_ERR_INVALID_PARAM);
769  }
770 
771  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
772  if (error != CS_OK) {
773  return (error);
774  }
775 
776  req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
777  req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
778 
779  marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
780  group_name);
781 
782  iov.iov_base = (void *)&req_lib_cpg_membership_get;
783  iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
784 
785  error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
786  &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
787 
788  if (error != CS_OK) {
789  goto error_exit;
790  }
791 
792  error = res_lib_cpg_membership_get.header.error;
793 
794  /*
795  * Copy results to caller
796  */
797  *member_list_entries = res_lib_cpg_membership_get.member_count;
798  if (member_list) {
799  for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
800  marshall_from_mar_cpg_address_t (&member_list[i],
801  &res_lib_cpg_membership_get.member_list[i]);
802  }
803  }
804 
805 error_exit:
806  hdb_handle_put (&cpg_handle_t_db, handle);
807 
808  return (error);
809 }
810 
812  cpg_handle_t handle,
813  unsigned int *local_nodeid)
814 {
815  cs_error_t error;
816  struct cpg_inst *cpg_inst;
817  struct iovec iov;
818  struct req_lib_cpg_local_get req_lib_cpg_local_get;
819  struct res_lib_cpg_local_get res_lib_cpg_local_get;
820 
821  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
822  if (error != CS_OK) {
823  return (error);
824  }
825 
826  req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
827  req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
828 
829  iov.iov_base = (void *)&req_lib_cpg_local_get;
830  iov.iov_len = sizeof (struct req_lib_cpg_local_get);
831 
832  error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
833  &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
834 
835  if (error != CS_OK) {
836  goto error_exit;
837  }
838 
839  error = res_lib_cpg_local_get.header.error;
840 
841  *local_nodeid = res_lib_cpg_local_get.local_nodeid;
842 
843 error_exit:
844  hdb_handle_put (&cpg_handle_t_db, handle);
845 
846  return (error);
847 }
848 
850  cpg_handle_t handle,
851  cpg_flow_control_state_t *flow_control_state)
852 {
853  cs_error_t error;
854  struct cpg_inst *cpg_inst;
855 
856  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
857  if (error != CS_OK) {
858  return (error);
859  }
860  *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
861  error = CS_OK;
862 
863  hdb_handle_put (&cpg_handle_t_db, handle);
864 
865  return (error);
866 }
867 
868 static int
869 memory_map (char *path, const char *file, void **buf, size_t bytes)
870 {
871  int32_t fd;
872  void *addr;
873  int32_t res;
874  char *buffer;
875  int32_t i;
876  size_t written;
877  size_t page_size;
878  long int sysconf_page_size;
879  mode_t old_umask;
880 
881  snprintf (path, PATH_MAX, "/dev/shm/%s", file);
882 
883  old_umask = umask(CPG_MEMORY_MAP_UMASK);
884  fd = mkstemp (path);
885  (void)umask(old_umask);
886  if (fd == -1) {
887  snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
888  old_umask = umask(CPG_MEMORY_MAP_UMASK);
889  fd = mkstemp (path);
890  (void)umask(old_umask);
891  if (fd == -1) {
892  return (-1);
893  }
894  }
895 
896  res = ftruncate (fd, bytes);
897  if (res == -1) {
898  goto error_close_unlink;
899  }
900  sysconf_page_size = sysconf(_SC_PAGESIZE);
901  if (sysconf_page_size <= 0) {
902  goto error_close_unlink;
903  }
904  page_size = sysconf_page_size;
905  buffer = malloc (page_size);
906  if (buffer == NULL) {
907  goto error_close_unlink;
908  }
909  memset (buffer, 0, page_size);
910  for (i = 0; i < (bytes / page_size); i++) {
911 retry_write:
912  written = write (fd, buffer, page_size);
913  if (written == -1 && errno == EINTR) {
914  goto retry_write;
915  }
916  if (written != page_size) {
917  free (buffer);
918  goto error_close_unlink;
919  }
920  }
921  free (buffer);
922 
923  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
924  MAP_SHARED, fd, 0);
925 
926  if (addr == MAP_FAILED) {
927  goto error_close_unlink;
928  }
929 #ifdef MADV_NOSYNC
930  madvise(addr, bytes, MADV_NOSYNC);
931 #endif
932 
933  res = close (fd);
934  if (res) {
935  munmap(addr, bytes);
936 
937  return (-1);
938  }
939  *buf = addr;
940 
941  return 0;
942 
943 error_close_unlink:
944  close (fd);
945  unlink(path);
946  return -1;
947 }
948 
950  cpg_handle_t handle,
951  size_t size,
952  void **buffer)
953 {
954  void *buf = NULL;
955  char path[PATH_MAX];
956  mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
957  struct qb_ipc_response_header res_coroipcs_zc_alloc;
958  size_t map_size;
959  struct iovec iovec;
960  struct coroipcs_zc_header *hdr;
961  cs_error_t error;
962  struct cpg_inst *cpg_inst;
963 
964  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
965  if (error != CS_OK) {
966  return (error);
967  }
968 
969  map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
970  assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
971 
972  if (strlen(path) >= CPG_ZC_PATH_LEN) {
973  unlink(path);
974  munmap (buf, map_size);
975  return (CS_ERR_NAME_TOO_LONG);
976  }
977 
978  req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
979  req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
980  req_coroipcc_zc_alloc.map_size = map_size;
981  strcpy (req_coroipcc_zc_alloc.path_to_file, path);
982 
983  iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
984  iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
985 
986  error = coroipcc_msg_send_reply_receive (
987  cpg_inst->c,
988  &iovec,
989  1,
990  &res_coroipcs_zc_alloc,
991  sizeof (struct qb_ipc_response_header));
992 
993  if (error != CS_OK) {
994  goto error_exit;
995  }
996 
997  hdr = (struct coroipcs_zc_header *)buf;
998  hdr->map_size = map_size;
999  *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
1000 
1001 error_exit:
1002  hdb_handle_put (&cpg_handle_t_db, handle);
1003  return (error);
1004 }
1005 
1007  cpg_handle_t handle,
1008  void *buffer)
1009 {
1010  cs_error_t error;
1011  unsigned int res;
1012  struct cpg_inst *cpg_inst;
1013  mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1014  struct qb_ipc_response_header res_coroipcs_zc_free;
1015  struct iovec iovec;
1016  struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1017 
1018  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1019  if (error != CS_OK) {
1020  return (error);
1021  }
1022 
1023  req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1024  req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1025  req_coroipcc_zc_free.map_size = header->map_size;
1026  req_coroipcc_zc_free.server_address = header->server_address;
1027 
1028  iovec.iov_base = (void *)&req_coroipcc_zc_free;
1029  iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1030 
1031  error = coroipcc_msg_send_reply_receive (
1032  cpg_inst->c,
1033  &iovec,
1034  1,
1035  &res_coroipcs_zc_free,
1036  sizeof (struct qb_ipc_response_header));
1037 
1038  if (error != CS_OK) {
1039  goto error_exit;
1040  }
1041 
1042  res = munmap ((void *)header, header->map_size);
1043  if (res == -1) {
1044  error = qb_to_cs_error(-errno);
1045 
1046  goto error_exit;
1047  }
1048 
1049 error_exit:
1050  hdb_handle_put (&cpg_handle_t_db, handle);
1051 
1052  return (error);
1053 }
1054 
1056  cpg_handle_t handle,
1058  void *msg,
1059  size_t msg_len)
1060 {
1061  cs_error_t error;
1062  struct cpg_inst *cpg_inst;
1064  struct res_lib_cpg_mcast res_lib_cpg_mcast;
1065  mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1066  struct coroipcs_zc_header *hdr;
1067  struct iovec iovec;
1068 
1069  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1070  if (error != CS_OK) {
1071  return (error);
1072  }
1073 
1074  if (msg_len > IPC_REQUEST_SIZE) {
1075  error = CS_ERR_TOO_BIG;
1076  goto error_exit;
1077  }
1078 
1079  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1080  req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1081  msg_len;
1082 
1083  req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1084  req_lib_cpg_mcast->guarantee = guarantee;
1085  req_lib_cpg_mcast->msglen = msg_len;
1086 
1087  hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1088 
1089  req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1090  req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1091  req_coroipcc_zc_execute.server_address = hdr->server_address;
1092 
1093  iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1094  iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1095 
1096  error = coroipcc_msg_send_reply_receive (
1097  cpg_inst->c,
1098  &iovec,
1099  1,
1100  &res_lib_cpg_mcast,
1101  sizeof(res_lib_cpg_mcast));
1102 
1103  if (error != CS_OK) {
1104  goto error_exit;
1105  }
1106 
1107  error = res_lib_cpg_mcast.header.error;
1108 
1109 error_exit:
1110  hdb_handle_put (&cpg_handle_t_db, handle);
1111 
1112  return (error);
1113 }
1114 
1115 static cs_error_t send_fragments (
1116  struct cpg_inst *cpg_inst,
1118  size_t msg_len,
1119  const struct iovec *iovec,
1120  unsigned int iov_len)
1121 {
1122  int i;
1123  cs_error_t error = CS_OK;
1124  struct iovec iov[2];
1127  size_t sent = 0;
1128  size_t iov_sent = 0;
1129  int retry_count;
1130 
1132  req_lib_cpg_mcast.guarantee = guarantee;
1133  req_lib_cpg_mcast.msglen = msg_len;
1134 
1135  iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1136  iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1137 
1138  i=0;
1139  iov_sent = 0 ;
1140  qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1141 
1142  while (error == CS_OK && sent < msg_len) {
1143 
1144  retry_count = 0;
1145  if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1146  iov[1].iov_len = cpg_inst->max_msg_size;
1147  }
1148  else {
1149  iov[1].iov_len = iovec[i].iov_len - iov_sent;
1150  }
1151 
1152  if (sent == 0) {
1154  }
1155  else if ((sent + iov[1].iov_len) == msg_len) {
1157  }
1158  else {
1160  }
1161 
1162  req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1163  req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1164  iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1165 
1166  resend:
1167  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1169  sizeof (res_lib_cpg_partial_send));
1170 
1171  if (error == CS_ERR_TRY_AGAIN) {
1172  fprintf(stderr, "sleep. counter=%d\n", retry_count);
1173  if (++retry_count > MAX_RETRIES) {
1174  goto error_exit;
1175  }
1176  usleep(10000);
1177  goto resend;
1178  }
1179 
1180  iov_sent += iov[1].iov_len;
1181  sent += iov[1].iov_len;
1182 
1183  /* Next iovec */
1184  if (iov_sent >= iovec[i].iov_len) {
1185  i++;
1186  iov_sent = 0;
1187  }
1188  error = res_lib_cpg_partial_send.header.error;
1189  }
1190 error_exit:
1191  qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1192 
1193  return error;
1194 }
1195 
1196 
1198  cpg_handle_t handle,
1199  cpg_guarantee_t guarantee,
1200  const struct iovec *iovec,
1201  unsigned int iov_len)
1202 {
1203  int i;
1204  cs_error_t error;
1205  struct cpg_inst *cpg_inst;
1206  struct iovec iov[64];
1207  struct req_lib_cpg_mcast req_lib_cpg_mcast;
1208  size_t msg_len = 0;
1209 
1210  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1211  if (error != CS_OK) {
1212  return (error);
1213  }
1214 
1215  for (i = 0; i < iov_len; i++ ) {
1216  msg_len += iovec[i].iov_len;
1217  }
1218 
1219  if (msg_len > cpg_inst->max_msg_size) {
1220  error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1221  goto error_exit;
1222  }
1223 
1224  req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1225  msg_len;
1226 
1227  req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1228  req_lib_cpg_mcast.guarantee = guarantee;
1229  req_lib_cpg_mcast.msglen = msg_len;
1230 
1231  iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1232  iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1233  memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1234 
1235  qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1236  error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1237  qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1238 
1239 error_exit:
1240  hdb_handle_put (&cpg_handle_t_db, handle);
1241 
1242  return (error);
1243 }
1244 
1246  cpg_handle_t handle,
1247  cpg_iteration_type_t iteration_type,
1248  const struct cpg_name *group,
1249  cpg_iteration_handle_t *cpg_iteration_handle)
1250 {
1251  cs_error_t error;
1252  struct iovec iov;
1253  struct cpg_inst *cpg_inst;
1254  struct cpg_iteration_instance_t *cpg_iteration_instance;
1255  struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1256  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1257 
1258  if (group && group->length > CPG_MAX_NAME_LENGTH) {
1259  return (CS_ERR_NAME_TOO_LONG);
1260  }
1261  if (cpg_iteration_handle == NULL) {
1262  return (CS_ERR_INVALID_PARAM);
1263  }
1264 
1265  if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1266  (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1267  return (CS_ERR_INVALID_PARAM);
1268  }
1269 
1270  if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1271  iteration_type != CPG_ITERATION_ALL) {
1272 
1273  return (CS_ERR_INVALID_PARAM);
1274  }
1275 
1276  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1277  if (error != CS_OK) {
1278  return (error);
1279  }
1280 
1281  error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1282  sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1283  if (error != CS_OK) {
1284  goto error_put_cpg_db;
1285  }
1286 
1287  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1288  (void *)&cpg_iteration_instance));
1289  if (error != CS_OK) {
1290  goto error_destroy;
1291  }
1292 
1293  cpg_iteration_instance->conn = cpg_inst->c;
1294 
1295  list_init (&cpg_iteration_instance->list);
1296 
1297  req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1298  req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1299  req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1300  if (group) {
1301  marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1302  }
1303 
1304  iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1305  iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1306 
1307  error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1308  &iov,
1309  1,
1310  &res_lib_cpg_iterationinitialize,
1311  sizeof (struct res_lib_cpg_iterationinitialize));
1312 
1313  if (error != CS_OK) {
1314  goto error_put_destroy;
1315  }
1316 
1317  cpg_iteration_instance->executive_iteration_handle =
1318  res_lib_cpg_iterationinitialize.iteration_handle;
1319  cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1320 
1321  list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1322 
1323  hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1324  hdb_handle_put (&cpg_handle_t_db, handle);
1325 
1326  return (res_lib_cpg_iterationinitialize.header.error);
1327 
1328 error_put_destroy:
1329  hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1330 error_destroy:
1331  hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1332 error_put_cpg_db:
1333  hdb_handle_put (&cpg_handle_t_db, handle);
1334 
1335  return (error);
1336 }
1337 
1339  cpg_iteration_handle_t handle,
1340  struct cpg_iteration_description_t *description)
1341 {
1342  cs_error_t error;
1343  struct cpg_iteration_instance_t *cpg_iteration_instance;
1344  struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1345  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1346 
1347  if (description == NULL) {
1348  return CS_ERR_INVALID_PARAM;
1349  }
1350 
1351  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1352  (void *)&cpg_iteration_instance));
1353  if (error != CS_OK) {
1354  goto error_exit;
1355  }
1356 
1357  req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1358  req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1359  req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1360 
1361  error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1362  &req_lib_cpg_iterationnext,
1363  req_lib_cpg_iterationnext.header.size));
1364  if (error != CS_OK) {
1365  goto error_put;
1366  }
1367 
1368  error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1369  &res_lib_cpg_iterationnext,
1370  sizeof(struct res_lib_cpg_iterationnext), -1));
1371  if (error != CS_OK) {
1372  goto error_put;
1373  }
1374 
1375  marshall_from_mar_cpg_iteration_description_t(
1376  description,
1377  &res_lib_cpg_iterationnext.description);
1378 
1379  error = res_lib_cpg_iterationnext.header.error;
1380 
1381 error_put:
1382  hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1383 
1384 error_exit:
1385  return (error);
1386 }
1387 
1389  cpg_iteration_handle_t handle)
1390 {
1391  cs_error_t error;
1392  struct iovec iov;
1393  struct cpg_iteration_instance_t *cpg_iteration_instance;
1394  struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1395  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1396 
1397  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1398  (void *)&cpg_iteration_instance));
1399  if (error != CS_OK) {
1400  goto error_exit;
1401  }
1402 
1403  req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1404  req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1405  req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1406 
1407  iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1408  iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1409 
1410  error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1411  &iov,
1412  1,
1413  &res_lib_cpg_iterationfinalize,
1414  sizeof (struct req_lib_cpg_iterationfinalize));
1415 
1416  if (error != CS_OK) {
1417  goto error_put;
1418  }
1419 
1420  cpg_iteration_instance_finalize (cpg_iteration_instance);
1421  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1422 
1423  return (res_lib_cpg_iterationfinalize.header.error);
1424 
1425 error_put:
1426  hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1427 error_exit:
1428  return (error);
1429 }
1430