File: | home/bhubbard/working/src/ceph/src/spdk/dpdk/lib/librte_eal/common/eal_common_proc.c |
Warning: | line 1106, column 4 Value stored to 'dummy_used' is never read |
[?] Use j/k keys for keyboard navigation
1 | /* SPDX-License-Identifier: BSD-3-Clause |
2 | * Copyright(c) 2016-2018 Intel Corporation |
3 | */ |
4 | |
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"
35 | |
36 | static int mp_fd = -1; |
37 | static char mp_filter[PATH_MAX4096]; /* Filter for secondary process sockets */ |
38 | static char mp_dir_path[PATH_MAX4096]; /* The directory path for all mp sockets */ |
39 | static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER{ { 0, 0, 0, 0, 0, 0, 0, { 0, 0 } } }; |
40 | static char peer_name[PATH_MAX4096]; |
41 | |
42 | struct action_entry { |
43 | TAILQ_ENTRY(action_entry)struct { struct action_entry *tqe_next; struct action_entry * *tqe_prev; } next; |
44 | char action_name[RTE_MP_MAX_NAME_LEN64]; |
45 | rte_mp_t action; |
46 | }; |
47 | |
48 | /** Double linked list of actions. */ |
49 | TAILQ_HEAD(action_entry_list, action_entry)struct action_entry_list { struct action_entry *tqh_first; struct action_entry * *tqh_last; }; |
50 | |
51 | static struct action_entry_list action_entry_list = |
52 | TAILQ_HEAD_INITIALIZER(action_entry_list){ ((void*)0), &(action_entry_list).tqh_first }; |
53 | |
54 | enum mp_type { |
55 | MP_MSG, /* Share message with peers, will not block */ |
56 | MP_REQ, /* Request for information, Will block for a reply */ |
57 | MP_REP, /* Response to previously-received request */ |
58 | MP_IGN, /* Response telling requester to ignore this response */ |
59 | }; |
60 | |
61 | struct mp_msg_internal { |
62 | int type; |
63 | struct rte_mp_msg msg; |
64 | }; |
65 | |
66 | struct async_request_param { |
67 | rte_mp_async_reply_t clb; |
68 | struct rte_mp_reply user_reply; |
69 | struct timespec end; |
70 | int n_responses_processed; |
71 | }; |
72 | |
73 | struct pending_request { |
74 | TAILQ_ENTRY(pending_request)struct { struct pending_request *tqe_next; struct pending_request * *tqe_prev; } next; |
75 | enum { |
76 | REQUEST_TYPE_SYNC, |
77 | REQUEST_TYPE_ASYNC |
78 | } type; |
79 | char dst[PATH_MAX4096]; |
80 | struct rte_mp_msg *request; |
81 | struct rte_mp_msg *reply; |
82 | int reply_received; |
83 | RTE_STD_C11 |
84 | union { |
85 | struct { |
86 | struct async_request_param *param; |
87 | } async; |
88 | struct { |
89 | pthread_cond_t cond; |
90 | } sync; |
91 | }; |
92 | }; |
93 | |
94 | TAILQ_HEAD(pending_request_list, pending_request)struct pending_request_list { struct pending_request *tqh_first ; struct pending_request * *tqh_last; }; |
95 | |
96 | static struct { |
97 | struct pending_request_list requests; |
98 | pthread_mutex_t lock; |
99 | } pending_requests = { |
100 | .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests){ ((void*)0), &(pending_requests.requests).tqh_first }, |
101 | .lock = PTHREAD_MUTEX_INITIALIZER{ { 0, 0, 0, 0, 0, 0, 0, { 0, 0 } } }, |
102 | /**< used in async requests only */ |
103 | }; |
104 | |
/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);
119 | |
120 | static struct pending_request * |
121 | find_pending_request(const char *dst, const char *act_name) |
122 | { |
123 | struct pending_request *r; |
124 | |
125 | TAILQ_FOREACH(r, &pending_requests.requests, next)for ((r) = ((&pending_requests.requests)->tqh_first); ( r); (r) = ((r)->next.tqe_next)) { |
126 | if (!strcmp(r->dst, dst)__extension__ ({ size_t __s1_len, __s2_len; (__builtin_constant_p (r->dst) && __builtin_constant_p (dst) && (__s1_len = __builtin_strlen (r->dst), __s2_len = __builtin_strlen (dst), (!((size_t)(const void *)((r->dst) + 1) - (size_t) (const void *)(r->dst) == 1) || __s1_len >= 4) && (!((size_t)(const void *)((dst) + 1) - (size_t)(const void * )(dst) == 1) || __s2_len >= 4)) ? __builtin_strcmp (r-> dst, dst) : (__builtin_constant_p (r->dst) && ((size_t )(const void *)((r->dst) + 1) - (size_t)(const void *)(r-> dst) == 1) && (__s1_len = __builtin_strlen (r->dst ), __s1_len < 4) ? (__builtin_constant_p (dst) && ( (size_t)(const void *)((dst) + 1) - (size_t)(const void *)(dst ) == 1) ? __builtin_strcmp (r->dst, dst) : (__extension__ ( { const unsigned char *__s2 = (const unsigned char *) (const char *) (dst); int __result = (((const unsigned char *) (const char *) (r->dst))[0] - __s2[0]); if (__s1_len > 0 && __result == 0) { __result = (((const unsigned char *) (const char *) (r->dst))[1] - __s2[1]); if (__s1_len > 1 && __result == 0) { __result = (((const unsigned char *) (const char *) (r->dst))[2] - __s2[2]); if (__s1_len > 2 && __result == 0) __result = (((const unsigned char *) (const char *) (r->dst))[3] - __s2[3]); } } __result; }))) : (__builtin_constant_p (dst) && ((size_t)(const void *)((dst) + 1) - (size_t )(const void *)(dst) == 1) && (__s2_len = __builtin_strlen (dst), __s2_len < 4) ? (__builtin_constant_p (r->dst) && ((size_t)(const void *)((r->dst) + 1) - (size_t)(const void *)(r->dst) == 1) ? 
__builtin_strcmp (r->dst, dst) : (- (__extension__ ({ const unsigned char *__s2 = (const unsigned char *) (const char *) (r->dst); int __result = (((const unsigned char *) (const char *) (dst))[0] - __s2[0]); if (__s2_len > 0 && __result == 0) { __result = (((const unsigned char *) (const char *) (dst))[1] - __s2[1]); if (__s2_len > 1 && __result == 0) { __result = (((const unsigned char *) (const char *) (dst))[2] - __s2[2]); if (__s2_len > 2 && __result == 0) __result = (((const unsigned char *) (const char *) (dst))[3] - __s2[3]); } } __result; })))) : __builtin_strcmp (r->dst, dst)))); }) && |
127 | !strcmp(r->request->name, act_name)__extension__ ({ size_t __s1_len, __s2_len; (__builtin_constant_p (r->request->name) && __builtin_constant_p (act_name ) && (__s1_len = __builtin_strlen (r->request-> name), __s2_len = __builtin_strlen (act_name), (!((size_t)(const void *)((r->request->name) + 1) - (size_t)(const void * )(r->request->name) == 1) || __s1_len >= 4) && (!((size_t)(const void *)((act_name) + 1) - (size_t)(const void *)(act_name) == 1) || __s2_len >= 4)) ? __builtin_strcmp ( r->request->name, act_name) : (__builtin_constant_p (r-> request->name) && ((size_t)(const void *)((r->request ->name) + 1) - (size_t)(const void *)(r->request->name ) == 1) && (__s1_len = __builtin_strlen (r->request ->name), __s1_len < 4) ? (__builtin_constant_p (act_name ) && ((size_t)(const void *)((act_name) + 1) - (size_t )(const void *)(act_name) == 1) ? __builtin_strcmp (r->request ->name, act_name) : (__extension__ ({ const unsigned char * __s2 = (const unsigned char *) (const char *) (act_name); int __result = (((const unsigned char *) (const char *) (r->request ->name))[0] - __s2[0]); if (__s1_len > 0 && __result == 0) { __result = (((const unsigned char *) (const char *) ( r->request->name))[1] - __s2[1]); if (__s1_len > 1 && __result == 0) { __result = (((const unsigned char *) (const char *) (r->request->name))[2] - __s2[2]); if (__s1_len > 2 && __result == 0) __result = (((const unsigned char *) (const char *) (r->request->name))[3] - __s2[3 ]); } } __result; }))) : (__builtin_constant_p (act_name) && ((size_t)(const void *)((act_name) + 1) - (size_t)(const void *)(act_name) == 1) && (__s2_len = __builtin_strlen ( act_name), __s2_len < 4) ? (__builtin_constant_p (r->request ->name) && ((size_t)(const void *)((r->request-> name) + 1) - (size_t)(const void *)(r->request->name) == 1) ? 
__builtin_strcmp (r->request->name, act_name) : ( - (__extension__ ({ const unsigned char *__s2 = (const unsigned char *) (const char *) (r->request->name); int __result = (((const unsigned char *) (const char *) (act_name))[0] - __s2 [0]); if (__s2_len > 0 && __result == 0) { __result = (((const unsigned char *) (const char *) (act_name))[1] - __s2 [1]); if (__s2_len > 1 && __result == 0) { __result = (((const unsigned char *) (const char *) (act_name))[2] - __s2 [2]); if (__s2_len > 2 && __result == 0) __result = (((const unsigned char *) (const char *) (act_name))[3] - __s2 [3]); } } __result; })))) : __builtin_strcmp (r->request-> name, act_name)))); })) |
128 | break; |
129 | } |
130 | |
131 | return r; |
132 | } |
133 | |
/* Build the socket path for peer 'name' into buf (capacity len).
 * An empty name yields the bare prefix (the primary process's socket).
 */
static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}
144 | |
145 | int |
146 | rte_eal_primary_proc_alive(const char *config_file_path) |
147 | { |
148 | int config_fd; |
149 | |
150 | if (config_file_path) |
151 | config_fd = open(config_file_path, O_RDONLY00); |
152 | else { |
153 | const char *path; |
154 | |
155 | path = eal_runtime_config_path(); |
156 | config_fd = open(path, O_RDONLY00); |
157 | } |
158 | if (config_fd < 0) |
159 | return 0; |
160 | |
161 | int ret = lockf(config_fd, F_TEST3, 0); |
162 | close(config_fd); |
163 | |
164 | return !!ret; |
165 | } |
166 | |
167 | static struct action_entry * |
168 | find_action_entry_by_name(const char *name) |
169 | { |
170 | struct action_entry *entry; |
171 | |
172 | TAILQ_FOREACH(entry, &action_entry_list, next)for ((entry) = ((&action_entry_list)->tqh_first); (entry ); (entry) = ((entry)->next.tqe_next)) { |
173 | if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN)(__extension__ (__builtin_constant_p (64) && ((__builtin_constant_p (entry->action_name) && strlen (entry->action_name ) < ((size_t) (64))) || (__builtin_constant_p (name) && strlen (name) < ((size_t) (64)))) ? __extension__ ({ size_t __s1_len, __s2_len; (__builtin_constant_p (entry->action_name ) && __builtin_constant_p (name) && (__s1_len = __builtin_strlen (entry->action_name), __s2_len = __builtin_strlen (name), (!((size_t)(const void *)((entry->action_name) + 1 ) - (size_t)(const void *)(entry->action_name) == 1) || __s1_len >= 4) && (!((size_t)(const void *)((name) + 1) - ( size_t)(const void *)(name) == 1) || __s2_len >= 4)) ? __builtin_strcmp (entry->action_name, name) : (__builtin_constant_p (entry ->action_name) && ((size_t)(const void *)((entry-> action_name) + 1) - (size_t)(const void *)(entry->action_name ) == 1) && (__s1_len = __builtin_strlen (entry->action_name ), __s1_len < 4) ? (__builtin_constant_p (name) && ((size_t)(const void *)((name) + 1) - (size_t)(const void *) (name) == 1) ? __builtin_strcmp (entry->action_name, name) : (__extension__ ({ const unsigned char *__s2 = (const unsigned char *) (const char *) (name); int __result = (((const unsigned char *) (const char *) (entry->action_name))[0] - __s2[0] ); if (__s1_len > 0 && __result == 0) { __result = (((const unsigned char *) (const char *) (entry->action_name ))[1] - __s2[1]); if (__s1_len > 1 && __result == 0 ) { __result = (((const unsigned char *) (const char *) (entry ->action_name))[2] - __s2[2]); if (__s1_len > 2 && __result == 0) __result = (((const unsigned char *) (const char *) (entry->action_name))[3] - __s2[3]); } } __result; })) ) : (__builtin_constant_p (name) && ((size_t)(const void *)((name) + 1) - (size_t)(const void *)(name) == 1) && (__s2_len = __builtin_strlen (name), __s2_len < 4) ? 
(__builtin_constant_p (entry->action_name) && ((size_t)(const void *)(( entry->action_name) + 1) - (size_t)(const void *)(entry-> action_name) == 1) ? __builtin_strcmp (entry->action_name, name) : (- (__extension__ ({ const unsigned char *__s2 = (const unsigned char *) (const char *) (entry->action_name); int __result = (((const unsigned char *) (const char *) (name))[ 0] - __s2[0]); if (__s2_len > 0 && __result == 0) { __result = (((const unsigned char *) (const char *) (name))[ 1] - __s2[1]); if (__s2_len > 1 && __result == 0) { __result = (((const unsigned char *) (const char *) (name))[ 2] - __s2[2]); if (__s2_len > 2 && __result == 0) __result = (((const unsigned char *) (const char *) (name))[3] - __s2 [3]); } } __result; })))) : __builtin_strcmp (entry->action_name , name)))); }) : strncmp (entry->action_name, name, 64))) == 0) |
174 | break; |
175 | } |
176 | |
177 | return entry; |
178 | } |
179 | |
180 | static int |
181 | validate_action_name(const char *name) |
182 | { |
183 | if (name == NULL((void*)0)) { |
184 | RTE_LOG(ERR, EAL, "Action name cannot be NULL\n")rte_log(4U, 0, "EAL" ": " "Action name cannot be NULL\n"); |
185 | rte_errno(per_lcore__rte_errno) = EINVAL22; |
186 | return -1; |
187 | } |
188 | if (strnlen(name, RTE_MP_MAX_NAME_LEN64) == 0) { |
189 | RTE_LOG(ERR, EAL, "Length of action name is zero\n")rte_log(4U, 0, "EAL" ": " "Length of action name is zero\n"); |
190 | rte_errno(per_lcore__rte_errno) = EINVAL22; |
191 | return -1; |
192 | } |
193 | if (strnlen(name, RTE_MP_MAX_NAME_LEN64) == RTE_MP_MAX_NAME_LEN64) { |
194 | rte_errno(per_lcore__rte_errno) = E2BIG7; |
195 | return -1; |
196 | } |
197 | return 0; |
198 | } |
199 | |
200 | int __rte_experimental__attribute__((section(".text.experimental"))) |
201 | rte_mp_action_register(const char *name, rte_mp_t action) |
202 | { |
203 | struct action_entry *entry; |
204 | |
205 | if (validate_action_name(name) != 0) |
206 | return -1; |
207 | |
208 | entry = malloc(sizeof(struct action_entry)); |
209 | if (entry == NULL((void*)0)) { |
210 | rte_errno(per_lcore__rte_errno) = ENOMEM12; |
211 | return -1; |
212 | } |
213 | strlcpy(entry->action_name, name, sizeof(entry->action_name))rte_strlcpy(entry->action_name, name, sizeof(entry->action_name )); |
214 | entry->action = action; |
215 | |
216 | pthread_mutex_lock(&mp_mutex_action); |
217 | if (find_action_entry_by_name(name) != NULL((void*)0)) { |
218 | pthread_mutex_unlock(&mp_mutex_action); |
219 | rte_errno(per_lcore__rte_errno) = EEXIST17; |
220 | free(entry); |
221 | return -1; |
222 | } |
223 | TAILQ_INSERT_TAIL(&action_entry_list, entry, next)do { (entry)->next.tqe_next = ((void*)0); (entry)->next .tqe_prev = (&action_entry_list)->tqh_last; *(&action_entry_list )->tqh_last = (entry); (&action_entry_list)->tqh_last = &(entry)->next.tqe_next; } while ( 0); |
224 | pthread_mutex_unlock(&mp_mutex_action); |
225 | return 0; |
226 | } |
227 | |
228 | void __rte_experimental__attribute__((section(".text.experimental"))) |
229 | rte_mp_action_unregister(const char *name) |
230 | { |
231 | struct action_entry *entry; |
232 | |
233 | if (validate_action_name(name) != 0) |
234 | return; |
235 | |
236 | pthread_mutex_lock(&mp_mutex_action); |
237 | entry = find_action_entry_by_name(name); |
238 | if (entry == NULL((void*)0)) { |
239 | pthread_mutex_unlock(&mp_mutex_action); |
240 | return; |
241 | } |
242 | TAILQ_REMOVE(&action_entry_list, entry, next)do { if (((entry)->next.tqe_next) != ((void*)0)) (entry)-> next.tqe_next->next.tqe_prev = (entry)->next.tqe_prev; else (&action_entry_list)->tqh_last = (entry)->next.tqe_prev ; *(entry)->next.tqe_prev = (entry)->next.tqe_next; } while ( 0); |
243 | pthread_mutex_unlock(&mp_mutex_action); |
244 | free(entry); |
245 | } |
246 | |
247 | static int |
248 | read_msg(struct mp_msg_internal *m, struct sockaddr_un *s) |
249 | { |
250 | int msglen; |
251 | struct iovec iov; |
252 | struct msghdr msgh; |
253 | char control[CMSG_SPACE(sizeof(m->msg.fds))((((sizeof(m->msg.fds)) + sizeof (size_t) - 1) & (size_t ) ~(sizeof (size_t) - 1)) + (((sizeof (struct cmsghdr)) + sizeof (size_t) - 1) & (size_t) ~(sizeof (size_t) - 1)))]; |
254 | struct cmsghdr *cmsg; |
255 | int buflen = sizeof(*m) - sizeof(m->msg.fds); |
256 | |
257 | memset(&msgh, 0, sizeof(msgh)); |
258 | iov.iov_base = m; |
259 | iov.iov_len = buflen; |
260 | |
261 | msgh.msg_name = s; |
262 | msgh.msg_namelen = sizeof(*s); |
263 | msgh.msg_iov = &iov; |
264 | msgh.msg_iovlen = 1; |
265 | msgh.msg_control = control; |
266 | msgh.msg_controllen = sizeof(control); |
267 | |
268 | msglen = recvmsg(mp_fd, &msgh, 0); |
269 | if (msglen < 0) { |
270 | RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno))rte_log(4U, 0, "EAL" ": " "recvmsg failed, %s\n", strerror((* __errno_location ()))); |
271 | return -1; |
272 | } |
273 | |
274 | if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNCMSG_TRUNC | MSG_CTRUNCMSG_CTRUNC))) { |
275 | RTE_LOG(ERR, EAL, "truncted msg\n")rte_log(4U, 0, "EAL" ": " "truncted msg\n"); |
276 | return -1; |
277 | } |
278 | |
279 | /* read auxiliary FDs if any */ |
280 | for (cmsg = CMSG_FIRSTHDR(&msgh)((size_t) (&msgh)->msg_controllen >= sizeof (struct cmsghdr) ? (struct cmsghdr *) (&msgh)->msg_control : ( struct cmsghdr *) 0); cmsg != NULL((void*)0); |
281 | cmsg = CMSG_NXTHDR(&msgh, cmsg)__cmsg_nxthdr (&msgh, cmsg)) { |
282 | if ((cmsg->cmsg_level == SOL_SOCKET1) && |
283 | (cmsg->cmsg_type == SCM_RIGHTSSCM_RIGHTS)) { |
284 | memcpy(m->msg.fds, CMSG_DATA(cmsg)((cmsg)->__cmsg_data), sizeof(m->msg.fds)); |
285 | break; |
286 | } |
287 | } |
288 | /* sanity-check the response */ |
289 | if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM8) { |
290 | RTE_LOG(ERR, EAL, "invalid number of fd's received\n")rte_log(4U, 0, "EAL" ": " "invalid number of fd's received\n" ); |
291 | return -1; |
292 | } |
293 | if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN256) { |
294 | RTE_LOG(ERR, EAL, "invalid received data length\n")rte_log(4U, 0, "EAL" ": " "invalid received data length\n"); |
295 | return -1; |
296 | } |
297 | return 0; |
298 | } |
299 | |
300 | static void |
301 | process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) |
302 | { |
303 | struct pending_request *pending_req; |
304 | struct action_entry *entry; |
305 | struct rte_mp_msg *msg = &m->msg; |
306 | rte_mp_t action = NULL((void*)0); |
307 | |
308 | RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name)rte_log(8U, 0, "EAL" ": " "msg: %s\n", msg->name); |
309 | |
310 | if (m->type == MP_REP || m->type == MP_IGN) { |
311 | struct pending_request *req = NULL((void*)0); |
312 | |
313 | pthread_mutex_lock(&pending_requests.lock); |
314 | pending_req = find_pending_request(s->sun_path, msg->name); |
315 | if (pending_req) { |
316 | memcpy(pending_req->reply, msg, sizeof(*msg)); |
317 | /* -1 indicates that we've been asked to ignore */ |
318 | pending_req->reply_received = |
319 | m->type == MP_REP ? 1 : -1; |
320 | |
321 | if (pending_req->type == REQUEST_TYPE_SYNC) |
322 | pthread_cond_signal(&pending_req->sync.cond); |
323 | else if (pending_req->type == REQUEST_TYPE_ASYNC) |
324 | req = async_reply_handle_thread_unsafe( |
325 | pending_req); |
326 | } else |
327 | RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name)rte_log(4U, 0, "EAL" ": " "Drop mp reply: %s\n", msg->name ); |
328 | pthread_mutex_unlock(&pending_requests.lock); |
329 | |
330 | if (req != NULL((void*)0)) |
331 | trigger_async_action(req); |
332 | return; |
333 | } |
334 | |
335 | pthread_mutex_lock(&mp_mutex_action); |
336 | entry = find_action_entry_by_name(msg->name); |
337 | if (entry != NULL((void*)0)) |
338 | action = entry->action; |
339 | pthread_mutex_unlock(&mp_mutex_action); |
340 | |
341 | if (!action) { |
342 | if (m->type == MP_REQ && !internal_config.init_complete) { |
343 | /* if this is a request, and init is not yet complete, |
344 | * and callback wasn't registered, we should tell the |
345 | * requester to ignore our existence because we're not |
346 | * yet ready to process this request. |
347 | */ |
348 | struct rte_mp_msg dummy; |
349 | |
350 | memset(&dummy, 0, sizeof(dummy)); |
351 | strlcpy(dummy.name, msg->name, sizeof(dummy.name))rte_strlcpy(dummy.name, msg->name, sizeof(dummy.name)); |
352 | mp_send(&dummy, s->sun_path, MP_IGN); |
353 | } else { |
354 | RTE_LOG(ERR, EAL, "Cannot find action: %s\n",rte_log(4U, 0, "EAL" ": " "Cannot find action: %s\n", msg-> name) |
355 | msg->name)rte_log(4U, 0, "EAL" ": " "Cannot find action: %s\n", msg-> name); |
356 | } |
357 | } else if (action(msg, s->sun_path) < 0) { |
358 | RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name)rte_log(4U, 0, "EAL" ": " "Fail to handle message: %s\n", msg ->name); |
359 | } |
360 | } |
361 | |
362 | static void * |
363 | mp_handle(void *arg __rte_unused__attribute__((__unused__))) |
364 | { |
365 | struct mp_msg_internal msg; |
366 | struct sockaddr_un sa; |
367 | |
368 | while (1) { |
369 | if (read_msg(&msg, &sa) == 0) |
370 | process_msg(&msg, &sa); |
371 | } |
372 | |
373 | return NULL((void*)0); |
374 | } |
375 | |
/* Three-way compare of two timespecs: -1 if a < b, 1 if a > b, 0 if equal.
 * Seconds are compared first, nanoseconds break ties.
 */
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}
389 | |
390 | enum async_action { |
391 | ACTION_FREE, /**< free the action entry, but don't trigger callback */ |
392 | ACTION_TRIGGER /**< trigger callback, then free action entry */ |
393 | }; |
394 | |
395 | static enum async_action |
396 | process_async_request(struct pending_request *sr, const struct timespec *now) |
397 | { |
398 | struct async_request_param *param; |
399 | struct rte_mp_reply *reply; |
400 | bool_Bool timeout, last_msg; |
401 | |
402 | param = sr->async.param; |
403 | reply = ¶m->user_reply; |
404 | |
405 | /* did we timeout? */ |
406 | timeout = timespec_cmp(¶m->end, now) <= 0; |
407 | |
408 | /* if we received a response, adjust relevant data and copy mesasge. */ |
409 | if (sr->reply_received == 1 && sr->reply) { |
410 | struct rte_mp_msg *msg, *user_msgs, *tmp; |
411 | |
412 | msg = sr->reply; |
413 | user_msgs = reply->msgs; |
414 | |
415 | tmp = realloc(user_msgs, sizeof(*msg) * |
416 | (reply->nb_received + 1)); |
417 | if (!tmp) { |
418 | RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",rte_log(4U, 0, "EAL" ": " "Fail to alloc reply for request %s:%s\n" , sr->dst, sr->request->name) |
419 | sr->dst, sr->request->name)rte_log(4U, 0, "EAL" ": " "Fail to alloc reply for request %s:%s\n" , sr->dst, sr->request->name); |
420 | /* this entry is going to be removed and its message |
421 | * dropped, but we don't want to leak memory, so |
422 | * continue. |
423 | */ |
424 | } else { |
425 | user_msgs = tmp; |
426 | reply->msgs = user_msgs; |
427 | memcpy(&user_msgs[reply->nb_received], |
428 | msg, sizeof(*msg)); |
429 | reply->nb_received++; |
430 | } |
431 | |
432 | /* mark this request as processed */ |
433 | param->n_responses_processed++; |
434 | } else if (sr->reply_received == -1) { |
435 | /* we were asked to ignore this process */ |
436 | reply->nb_sent--; |
437 | } else if (timeout) { |
438 | /* count it as processed response, but don't increment |
439 | * nb_received. |
440 | */ |
441 | param->n_responses_processed++; |
442 | } |
443 | |
444 | free(sr->reply); |
445 | |
446 | last_msg = param->n_responses_processed == reply->nb_sent; |
447 | |
448 | return last_msg ? ACTION_TRIGGER : ACTION_FREE; |
449 | } |
450 | |
451 | static void |
452 | trigger_async_action(struct pending_request *sr) |
453 | { |
454 | struct async_request_param *param; |
455 | struct rte_mp_reply *reply; |
456 | |
457 | param = sr->async.param; |
458 | reply = ¶m->user_reply; |
459 | |
460 | param->clb(sr->request, reply); |
461 | |
462 | /* clean up */ |
463 | free(sr->async.param->user_reply.msgs); |
464 | free(sr->async.param); |
465 | free(sr->request); |
466 | free(sr); |
467 | } |
468 | |
469 | static struct pending_request * |
470 | async_reply_handle_thread_unsafe(void *arg) |
471 | { |
472 | struct pending_request *req = (struct pending_request *)arg; |
473 | enum async_action action; |
474 | struct timespec ts_now; |
475 | struct timeval now; |
476 | |
477 | if (gettimeofday(&now, NULL((void*)0)) < 0) { |
478 | RTE_LOG(ERR, EAL, "Cannot get current time\n")rte_log(4U, 0, "EAL" ": " "Cannot get current time\n"); |
479 | goto no_trigger; |
480 | } |
481 | ts_now.tv_nsec = now.tv_usec * 1000; |
482 | ts_now.tv_sec = now.tv_sec; |
483 | |
484 | action = process_async_request(req, &ts_now); |
485 | |
486 | TAILQ_REMOVE(&pending_requests.requests, req, next)do { if (((req)->next.tqe_next) != ((void*)0)) (req)->next .tqe_next->next.tqe_prev = (req)->next.tqe_prev; else ( &pending_requests.requests)->tqh_last = (req)->next .tqe_prev; *(req)->next.tqe_prev = (req)->next.tqe_next ; } while ( 0); |
487 | |
488 | if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) { |
489 | /* if we failed to cancel the alarm because it's already in |
490 | * progress, don't proceed because otherwise we will end up |
491 | * handling the same message twice. |
492 | */ |
493 | if (rte_errno(per_lcore__rte_errno) == EINPROGRESS115) { |
494 | RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n")rte_log(8U, 0, "EAL" ": " "Request handling is already in progress\n" ); |
495 | goto no_trigger; |
496 | } |
497 | RTE_LOG(ERR, EAL, "Failed to cancel alarm\n")rte_log(4U, 0, "EAL" ": " "Failed to cancel alarm\n"); |
498 | } |
499 | |
500 | if (action == ACTION_TRIGGER) |
501 | return req; |
502 | no_trigger: |
503 | free(req); |
504 | return NULL((void*)0); |
505 | } |
506 | |
507 | static void |
508 | async_reply_handle(void *arg) |
509 | { |
510 | struct pending_request *req; |
511 | |
512 | pthread_mutex_lock(&pending_requests.lock); |
513 | req = async_reply_handle_thread_unsafe(arg); |
514 | pthread_mutex_unlock(&pending_requests.lock); |
515 | |
516 | if (req != NULL((void*)0)) |
517 | trigger_async_action(req); |
518 | } |
519 | |
520 | static int |
521 | open_socket_fd(void) |
522 | { |
523 | struct sockaddr_un un; |
524 | |
525 | peer_name[0] = '\0'; |
526 | if (rte_eal_process_type() == RTE_PROC_SECONDARY) |
527 | snprintf(peer_name, sizeof(peer_name), |
528 | "%d_%"PRIx64"l" "x", getpid(), rte_rdtsc()); |
529 | |
530 | mp_fd = socket(AF_UNIX1, SOCK_DGRAMSOCK_DGRAM, 0); |
531 | if (mp_fd < 0) { |
532 | RTE_LOG(ERR, EAL, "failed to create unix socket\n")rte_log(4U, 0, "EAL" ": " "failed to create unix socket\n"); |
533 | return -1; |
534 | } |
535 | |
536 | memset(&un, 0, sizeof(un)); |
537 | un.sun_family = AF_UNIX1; |
538 | |
539 | create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path)); |
540 | |
541 | unlink(un.sun_path); /* May still exist since last run */ |
542 | |
543 | if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) { |
544 | RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",rte_log(4U, 0, "EAL" ": " "failed to bind %s: %s\n", un.sun_path , strerror((*__errno_location ()))) |
545 | un.sun_path, strerror(errno))rte_log(4U, 0, "EAL" ": " "failed to bind %s: %s\n", un.sun_path , strerror((*__errno_location ()))); |
546 | close(mp_fd); |
547 | return -1; |
548 | } |
549 | |
550 | RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path)rte_log(7U, 0, "EAL" ": " "Multi-process socket %s\n", un.sun_path ); |
551 | return mp_fd; |
552 | } |
553 | |
554 | static void |
555 | close_socket_fd(void) |
556 | { |
557 | char path[PATH_MAX4096]; |
558 | |
559 | if (mp_fd < 0) |
560 | return; |
561 | |
562 | close(mp_fd); |
563 | create_socket_path(peer_name, path, sizeof(path)); |
564 | unlink(path); |
565 | } |
566 | |
567 | int |
568 | rte_mp_channel_init(void) |
569 | { |
570 | char path[PATH_MAX4096]; |
571 | int dir_fd; |
572 | pthread_t mp_handle_tid; |
573 | |
574 | /* in no shared files mode, we do not have secondary processes support, |
575 | * so no need to initialize IPC. |
576 | */ |
577 | if (internal_config.no_shconf) { |
578 | RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n")rte_log(8U, 0, "EAL" ": " "No shared files mode enabled, IPC will be disabled\n" ); |
579 | return 0; |
580 | } |
581 | |
582 | /* create filter path */ |
583 | create_socket_path("*", path, sizeof(path)); |
584 | strlcpy(mp_filter, basename(path), sizeof(mp_filter))rte_strlcpy(mp_filter, __xpg_basename(path), sizeof(mp_filter )); |
585 | |
586 | /* path may have been modified, so recreate it */ |
587 | create_socket_path("*", path, sizeof(path)); |
588 | strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path))rte_strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path)); |
589 | |
590 | /* lock the directory */ |
591 | dir_fd = open(mp_dir_path, O_RDONLY00); |
592 | if (dir_fd < 0) { |
593 | RTE_LOG(ERR, EAL, "failed to open %s: %s\n",rte_log(4U, 0, "EAL" ": " "failed to open %s: %s\n", mp_dir_path , strerror((*__errno_location ()))) |
594 | mp_dir_path, strerror(errno))rte_log(4U, 0, "EAL" ": " "failed to open %s: %s\n", mp_dir_path , strerror((*__errno_location ()))); |
595 | return -1; |
596 | } |
597 | |
598 | if (flock(dir_fd, LOCK_EX2)) { |
599 | RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",rte_log(4U, 0, "EAL" ": " "failed to lock %s: %s\n", mp_dir_path , strerror((*__errno_location ()))) |
600 | mp_dir_path, strerror(errno))rte_log(4U, 0, "EAL" ": " "failed to lock %s: %s\n", mp_dir_path , strerror((*__errno_location ()))); |
601 | close(dir_fd); |
602 | return -1; |
603 | } |
604 | |
605 | if (open_socket_fd() < 0) { |
606 | close(dir_fd); |
607 | return -1; |
608 | } |
609 | |
610 | if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle", |
611 | NULL((void*)0), mp_handle, NULL((void*)0)) < 0) { |
612 | RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",rte_log(4U, 0, "EAL" ": " "failed to create mp thead: %s\n", strerror ((*__errno_location ()))) |
613 | strerror(errno))rte_log(4U, 0, "EAL" ": " "failed to create mp thead: %s\n", strerror ((*__errno_location ()))); |
614 | close(mp_fd); |
615 | close(dir_fd); |
616 | mp_fd = -1; |
617 | return -1; |
618 | } |
619 | |
620 | /* unlock the directory */ |
621 | flock(dir_fd, LOCK_UN8); |
622 | close(dir_fd); |
623 | |
624 | return 0; |
625 | } |
626 | |
/* Tear down the IPC channel created by rte_mp_channel_init(). */
void
rte_mp_channel_cleanup(void)
{
	close_socket_fd();
}
632 | |
633 | /** |
634 | * Return -1, as fail to send message and it's caused by the local side. |
635 | * Return 0, as fail to send message and it's caused by the remote side. |
636 | * Return 1, as succeed to send message. |
637 | * |
638 | */ |
639 | static int |
640 | send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) |
641 | { |
642 | int snd; |
643 | struct iovec iov; |
644 | struct msghdr msgh; |
645 | struct cmsghdr *cmsg; |
646 | struct sockaddr_un dst; |
647 | struct mp_msg_internal m; |
648 | int fd_size = msg->num_fds * sizeof(int); |
649 | char control[CMSG_SPACE(fd_size)((((fd_size) + sizeof (size_t) - 1) & (size_t) ~(sizeof ( size_t) - 1)) + (((sizeof (struct cmsghdr)) + sizeof (size_t) - 1) & (size_t) ~(sizeof (size_t) - 1)))]; |
650 | |
651 | m.type = type; |
652 | memcpy(&m.msg, msg, sizeof(*msg)); |
653 | |
654 | memset(&dst, 0, sizeof(dst)); |
655 | dst.sun_family = AF_UNIX1; |
656 | strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path))rte_strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path)); |
657 | |
658 | memset(&msgh, 0, sizeof(msgh)); |
659 | memset(control, 0, sizeof(control)); |
660 | |
661 | iov.iov_base = &m; |
662 | iov.iov_len = sizeof(m) - sizeof(msg->fds); |
663 | |
664 | msgh.msg_name = &dst; |
665 | msgh.msg_namelen = sizeof(dst); |
666 | msgh.msg_iov = &iov; |
667 | msgh.msg_iovlen = 1; |
668 | msgh.msg_control = control; |
669 | msgh.msg_controllen = sizeof(control); |
670 | |
671 | cmsg = CMSG_FIRSTHDR(&msgh)((size_t) (&msgh)->msg_controllen >= sizeof (struct cmsghdr) ? (struct cmsghdr *) (&msgh)->msg_control : ( struct cmsghdr *) 0); |
672 | cmsg->cmsg_len = CMSG_LEN(fd_size)((((sizeof (struct cmsghdr)) + sizeof (size_t) - 1) & (size_t ) ~(sizeof (size_t) - 1)) + (fd_size)); |
673 | cmsg->cmsg_level = SOL_SOCKET1; |
674 | cmsg->cmsg_type = SCM_RIGHTSSCM_RIGHTS; |
675 | memcpy(CMSG_DATA(cmsg)((cmsg)->__cmsg_data), msg->fds, fd_size); |
676 | |
677 | do { |
678 | snd = sendmsg(mp_fd, &msgh, 0); |
679 | } while (snd < 0 && errno(*__errno_location ()) == EINTR4); |
680 | |
681 | if (snd < 0) { |
682 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
683 | /* Check if it caused by peer process exits */ |
684 | if (errno(*__errno_location ()) == ECONNREFUSED111 && |
685 | rte_eal_process_type() == RTE_PROC_PRIMARY) { |
686 | unlink(dst_path); |
687 | return 0; |
688 | } |
689 | RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",rte_log(4U, 0, "EAL" ": " "failed to send to (%s) due to %s\n" , dst_path, strerror((*__errno_location ()))) |
690 | dst_path, strerror(errno))rte_log(4U, 0, "EAL" ": " "failed to send to (%s) due to %s\n" , dst_path, strerror((*__errno_location ()))); |
691 | return -1; |
692 | } |
693 | |
694 | return 1; |
695 | } |
696 | |
697 | static int |
698 | mp_send(struct rte_mp_msg *msg, const char *peer, int type) |
699 | { |
700 | int dir_fd, ret = 0; |
701 | DIR *mp_dir; |
702 | struct dirent *ent; |
703 | |
704 | if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY)) |
705 | peer = eal_mp_socket_path(); |
706 | |
707 | if (peer) { |
708 | if (send_msg(peer, msg, type) < 0) |
709 | return -1; |
710 | else |
711 | return 0; |
712 | } |
713 | |
714 | /* broadcast to all secondary processes */ |
715 | mp_dir = opendir(mp_dir_path); |
716 | if (!mp_dir) { |
717 | RTE_LOG(ERR, EAL, "Unable to open directory %s\n",rte_log(4U, 0, "EAL" ": " "Unable to open directory %s\n", mp_dir_path ) |
718 | mp_dir_path)rte_log(4U, 0, "EAL" ": " "Unable to open directory %s\n", mp_dir_path ); |
719 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
720 | return -1; |
721 | } |
722 | |
723 | dir_fd = dirfd(mp_dir); |
724 | /* lock the directory to prevent processes spinning up while we send */ |
725 | if (flock(dir_fd, LOCK_SH1)) { |
726 | RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",rte_log(4U, 0, "EAL" ": " "Unable to lock directory %s\n", mp_dir_path ) |
727 | mp_dir_path)rte_log(4U, 0, "EAL" ": " "Unable to lock directory %s\n", mp_dir_path ); |
728 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
729 | closedir(mp_dir); |
730 | return -1; |
731 | } |
732 | |
733 | while ((ent = readdir(mp_dir))) { |
734 | char path[PATH_MAX4096]; |
735 | |
736 | if (fnmatch(mp_filter, ent->d_name, 0) != 0) |
737 | continue; |
738 | |
739 | snprintf(path, sizeof(path), "%s/%s", mp_dir_path, |
740 | ent->d_name); |
741 | if (send_msg(path, msg, type) < 0) |
742 | ret = -1; |
743 | } |
744 | /* unlock the dir */ |
745 | flock(dir_fd, LOCK_UN8); |
746 | |
747 | /* dir_fd automatically closed on closedir */ |
748 | closedir(mp_dir); |
749 | return ret; |
750 | } |
751 | |
752 | static int |
753 | check_input(const struct rte_mp_msg *msg) |
754 | { |
755 | if (msg == NULL((void*)0)) { |
756 | RTE_LOG(ERR, EAL, "Msg cannot be NULL\n")rte_log(4U, 0, "EAL" ": " "Msg cannot be NULL\n"); |
757 | rte_errno(per_lcore__rte_errno) = EINVAL22; |
758 | return -1; |
759 | } |
760 | |
761 | if (validate_action_name(msg->name) != 0) |
762 | return -1; |
763 | |
764 | if (msg->len_param < 0) { |
765 | RTE_LOG(ERR, EAL, "Message data length is negative\n")rte_log(4U, 0, "EAL" ": " "Message data length is negative\n" ); |
766 | rte_errno(per_lcore__rte_errno) = EINVAL22; |
767 | return -1; |
768 | } |
769 | |
770 | if (msg->num_fds < 0) { |
771 | RTE_LOG(ERR, EAL, "Number of fd's is negative\n")rte_log(4U, 0, "EAL" ": " "Number of fd's is negative\n"); |
772 | rte_errno(per_lcore__rte_errno) = EINVAL22; |
773 | return -1; |
774 | } |
775 | |
776 | if (msg->len_param > RTE_MP_MAX_PARAM_LEN256) { |
777 | RTE_LOG(ERR, EAL, "Message data is too long\n")rte_log(4U, 0, "EAL" ": " "Message data is too long\n"); |
778 | rte_errno(per_lcore__rte_errno) = E2BIG7; |
779 | return -1; |
780 | } |
781 | |
782 | if (msg->num_fds > RTE_MP_MAX_FD_NUM8) { |
783 | RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",rte_log(4U, 0, "EAL" ": " "Cannot send more than %d FDs\n", 8 ) |
784 | RTE_MP_MAX_FD_NUM)rte_log(4U, 0, "EAL" ": " "Cannot send more than %d FDs\n", 8 ); |
785 | rte_errno(per_lcore__rte_errno) = E2BIG7; |
786 | return -1; |
787 | } |
788 | |
789 | return 0; |
790 | } |
791 | |
792 | int __rte_experimental__attribute__((section(".text.experimental"))) |
793 | rte_mp_sendmsg(struct rte_mp_msg *msg) |
794 | { |
795 | if (check_input(msg) != 0) |
796 | return -1; |
797 | |
798 | RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name)rte_log(8U, 0, "EAL" ": " "sendmsg: %s\n", msg->name); |
799 | return mp_send(msg, NULL((void*)0), MP_MSG); |
800 | } |
801 | |
802 | static int |
803 | mp_request_async(const char *dst, struct rte_mp_msg *req, |
804 | struct async_request_param *param, const struct timespec *ts) |
805 | { |
806 | struct rte_mp_msg *reply_msg; |
807 | struct pending_request *pending_req, *exist; |
808 | int ret = -1; |
809 | |
810 | pending_req = calloc(1, sizeof(*pending_req)); |
811 | reply_msg = calloc(1, sizeof(*reply_msg)); |
812 | if (pending_req == NULL((void*)0) || reply_msg == NULL((void*)0)) { |
813 | RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n")rte_log(4U, 0, "EAL" ": " "Could not allocate space for sync request\n" ); |
814 | rte_errno(per_lcore__rte_errno) = ENOMEM12; |
815 | ret = -1; |
816 | goto fail; |
817 | } |
818 | |
819 | pending_req->type = REQUEST_TYPE_ASYNC; |
820 | strlcpy(pending_req->dst, dst, sizeof(pending_req->dst))rte_strlcpy(pending_req->dst, dst, sizeof(pending_req-> dst)); |
821 | pending_req->request = req; |
822 | pending_req->reply = reply_msg; |
823 | pending_req->async.param = param; |
824 | |
825 | /* queue already locked by caller */ |
826 | |
827 | exist = find_pending_request(dst, req->name); |
828 | if (exist) { |
829 | RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name)rte_log(4U, 0, "EAL" ": " "A pending request %s:%s\n", dst, req ->name); |
830 | rte_errno(per_lcore__rte_errno) = EEXIST17; |
831 | ret = -1; |
832 | goto fail; |
833 | } |
834 | |
835 | ret = send_msg(dst, req, MP_REQ); |
836 | if (ret < 0) { |
837 | RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",rte_log(4U, 0, "EAL" ": " "Fail to send request %s:%s\n", dst , req->name) |
838 | dst, req->name)rte_log(4U, 0, "EAL" ": " "Fail to send request %s:%s\n", dst , req->name); |
839 | ret = -1; |
840 | goto fail; |
841 | } else if (ret == 0) { |
842 | ret = 0; |
843 | goto fail; |
844 | } |
845 | param->user_reply.nb_sent++; |
846 | |
847 | /* if alarm set fails, we simply ignore the reply */ |
848 | if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000, |
849 | async_reply_handle, pending_req) < 0) { |
850 | RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",rte_log(4U, 0, "EAL" ": " "Fail to set alarm for request %s:%s\n" , dst, req->name) |
851 | dst, req->name)rte_log(4U, 0, "EAL" ": " "Fail to set alarm for request %s:%s\n" , dst, req->name); |
852 | ret = -1; |
853 | goto fail; |
854 | } |
855 | TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next)do { (pending_req)->next.tqe_next = ((void*)0); (pending_req )->next.tqe_prev = (&pending_requests.requests)->tqh_last ; *(&pending_requests.requests)->tqh_last = (pending_req ); (&pending_requests.requests)->tqh_last = &(pending_req )->next.tqe_next; } while ( 0); |
856 | |
857 | return 0; |
858 | fail: |
859 | free(pending_req); |
860 | free(reply_msg); |
861 | return ret; |
862 | } |
863 | |
864 | static int |
865 | mp_request_sync(const char *dst, struct rte_mp_msg *req, |
866 | struct rte_mp_reply *reply, const struct timespec *ts) |
867 | { |
868 | int ret; |
869 | struct rte_mp_msg msg, *tmp; |
870 | struct pending_request pending_req, *exist; |
871 | |
872 | pending_req.type = REQUEST_TYPE_SYNC; |
873 | pending_req.reply_received = 0; |
874 | strlcpy(pending_req.dst, dst, sizeof(pending_req.dst))rte_strlcpy(pending_req.dst, dst, sizeof(pending_req.dst)); |
875 | pending_req.request = req; |
876 | pending_req.reply = &msg; |
877 | pthread_cond_init(&pending_req.sync.cond, NULL((void*)0)); |
878 | |
879 | exist = find_pending_request(dst, req->name); |
880 | if (exist) { |
881 | RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name)rte_log(4U, 0, "EAL" ": " "A pending request %s:%s\n", dst, req ->name); |
882 | rte_errno(per_lcore__rte_errno) = EEXIST17; |
883 | return -1; |
884 | } |
885 | |
886 | ret = send_msg(dst, req, MP_REQ); |
887 | if (ret < 0) { |
888 | RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",rte_log(4U, 0, "EAL" ": " "Fail to send request %s:%s\n", dst , req->name) |
889 | dst, req->name)rte_log(4U, 0, "EAL" ": " "Fail to send request %s:%s\n", dst , req->name); |
890 | return -1; |
891 | } else if (ret == 0) |
892 | return 0; |
893 | |
894 | TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next)do { (&pending_req)->next.tqe_next = ((void*)0); (& pending_req)->next.tqe_prev = (&pending_requests.requests )->tqh_last; *(&pending_requests.requests)->tqh_last = (&pending_req); (&pending_requests.requests)->tqh_last = &(&pending_req)->next.tqe_next; } while ( 0); |
895 | |
896 | reply->nb_sent++; |
897 | |
898 | do { |
899 | ret = pthread_cond_timedwait(&pending_req.sync.cond, |
900 | &pending_requests.lock, ts); |
901 | } while (ret != 0 && ret != ETIMEDOUT110); |
902 | |
903 | TAILQ_REMOVE(&pending_requests.requests, &pending_req, next)do { if (((&pending_req)->next.tqe_next) != ((void*)0) ) (&pending_req)->next.tqe_next->next.tqe_prev = (& pending_req)->next.tqe_prev; else (&pending_requests.requests )->tqh_last = (&pending_req)->next.tqe_prev; *(& pending_req)->next.tqe_prev = (&pending_req)->next. tqe_next; } while ( 0); |
904 | |
905 | if (pending_req.reply_received == 0) { |
906 | RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",rte_log(4U, 0, "EAL" ": " "Fail to recv reply for request %s:%s\n" , dst, req->name) |
907 | dst, req->name)rte_log(4U, 0, "EAL" ": " "Fail to recv reply for request %s:%s\n" , dst, req->name); |
908 | rte_errno(per_lcore__rte_errno) = ETIMEDOUT110; |
909 | return -1; |
910 | } |
911 | if (pending_req.reply_received == -1) { |
912 | RTE_LOG(DEBUG, EAL, "Asked to ignore response\n")rte_log(8U, 0, "EAL" ": " "Asked to ignore response\n"); |
913 | /* not receiving this message is not an error, so decrement |
914 | * number of sent messages |
915 | */ |
916 | reply->nb_sent--; |
917 | return 0; |
918 | } |
919 | |
920 | tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1)); |
921 | if (!tmp) { |
922 | RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",rte_log(4U, 0, "EAL" ": " "Fail to alloc reply for request %s:%s\n" , dst, req->name) |
923 | dst, req->name)rte_log(4U, 0, "EAL" ": " "Fail to alloc reply for request %s:%s\n" , dst, req->name); |
924 | rte_errno(per_lcore__rte_errno) = ENOMEM12; |
925 | return -1; |
926 | } |
927 | memcpy(&tmp[reply->nb_received], &msg, sizeof(msg)); |
928 | reply->msgs = tmp; |
929 | reply->nb_received++; |
930 | return 0; |
931 | } |
932 | |
933 | int __rte_experimental__attribute__((section(".text.experimental"))) |
934 | rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply, |
935 | const struct timespec *ts) |
936 | { |
937 | int dir_fd, ret = -1; |
938 | DIR *mp_dir; |
939 | struct dirent *ent; |
940 | struct timeval now; |
941 | struct timespec end; |
942 | |
943 | RTE_LOG(DEBUG, EAL, "request: %s\n", req->name)rte_log(8U, 0, "EAL" ": " "request: %s\n", req->name); |
944 | |
945 | reply->nb_sent = 0; |
946 | reply->nb_received = 0; |
947 | reply->msgs = NULL((void*)0); |
948 | |
949 | if (check_input(req) != 0) |
950 | goto end; |
951 | |
952 | if (internal_config.no_shconf) { |
953 | RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n")rte_log(8U, 0, "EAL" ": " "No shared files mode enabled, IPC is disabled\n" ); |
954 | return 0; |
955 | } |
956 | |
957 | if (gettimeofday(&now, NULL((void*)0)) < 0) { |
958 | RTE_LOG(ERR, EAL, "Failed to get current time\n")rte_log(4U, 0, "EAL" ": " "Failed to get current time\n"); |
959 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
960 | goto end; |
961 | } |
962 | |
963 | end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000; |
964 | end.tv_sec = now.tv_sec + ts->tv_sec + |
965 | (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000; |
966 | |
967 | /* for secondary process, send request to the primary process only */ |
968 | if (rte_eal_process_type() == RTE_PROC_SECONDARY) { |
969 | pthread_mutex_lock(&pending_requests.lock); |
970 | ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end); |
971 | pthread_mutex_unlock(&pending_requests.lock); |
972 | goto end; |
973 | } |
974 | |
975 | /* for primary process, broadcast request, and collect reply 1 by 1 */ |
976 | mp_dir = opendir(mp_dir_path); |
977 | if (!mp_dir) { |
978 | RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path)rte_log(4U, 0, "EAL" ": " "Unable to open directory %s\n", mp_dir_path ); |
979 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
980 | goto end; |
981 | } |
982 | |
983 | dir_fd = dirfd(mp_dir); |
984 | /* lock the directory to prevent processes spinning up while we send */ |
985 | if (flock(dir_fd, LOCK_SH1)) { |
986 | RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",rte_log(4U, 0, "EAL" ": " "Unable to lock directory %s\n", mp_dir_path ) |
987 | mp_dir_path)rte_log(4U, 0, "EAL" ": " "Unable to lock directory %s\n", mp_dir_path ); |
988 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
989 | goto close_end; |
990 | } |
991 | |
992 | pthread_mutex_lock(&pending_requests.lock); |
993 | while ((ent = readdir(mp_dir))) { |
994 | char path[PATH_MAX4096]; |
995 | |
996 | if (fnmatch(mp_filter, ent->d_name, 0) != 0) |
997 | continue; |
998 | |
999 | snprintf(path, sizeof(path), "%s/%s", mp_dir_path, |
1000 | ent->d_name); |
1001 | |
1002 | /* unlocks the mutex while waiting for response, |
1003 | * locks on receive |
1004 | */ |
1005 | if (mp_request_sync(path, req, reply, &end)) |
1006 | goto unlock_end; |
1007 | } |
1008 | ret = 0; |
1009 | |
1010 | unlock_end: |
1011 | pthread_mutex_unlock(&pending_requests.lock); |
1012 | /* unlock the directory */ |
1013 | flock(dir_fd, LOCK_UN8); |
1014 | |
1015 | close_end: |
1016 | /* dir_fd automatically closed on closedir */ |
1017 | closedir(mp_dir); |
1018 | |
1019 | end: |
1020 | if (ret) { |
1021 | free(reply->msgs); |
1022 | reply->nb_received = 0; |
1023 | reply->msgs = NULL((void*)0); |
1024 | } |
1025 | return ret; |
1026 | } |
1027 | |
1028 | int __rte_experimental__attribute__((section(".text.experimental"))) |
1029 | rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts, |
1030 | rte_mp_async_reply_t clb) |
1031 | { |
1032 | struct rte_mp_msg *copy; |
1033 | struct pending_request *dummy; |
1034 | struct async_request_param *param; |
1035 | struct rte_mp_reply *reply; |
1036 | int dir_fd, ret = 0; |
1037 | DIR *mp_dir; |
1038 | struct dirent *ent; |
1039 | struct timeval now; |
1040 | struct timespec *end; |
1041 | bool_Bool dummy_used = false0; |
1042 | |
1043 | RTE_LOG(DEBUG, EAL, "request: %s\n", req->name)rte_log(8U, 0, "EAL" ": " "request: %s\n", req->name); |
1044 | |
1045 | if (check_input(req) != 0) |
1046 | return -1; |
1047 | |
1048 | if (internal_config.no_shconf) { |
1049 | RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n")rte_log(8U, 0, "EAL" ": " "No shared files mode enabled, IPC is disabled\n" ); |
1050 | return 0; |
1051 | } |
1052 | |
1053 | if (gettimeofday(&now, NULL((void*)0)) < 0) { |
1054 | RTE_LOG(ERR, EAL, "Faile to get current time\n")rte_log(4U, 0, "EAL" ": " "Faile to get current time\n"); |
1055 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
1056 | return -1; |
1057 | } |
1058 | copy = calloc(1, sizeof(*copy)); |
1059 | dummy = calloc(1, sizeof(*dummy)); |
1060 | param = calloc(1, sizeof(*param)); |
1061 | if (copy == NULL((void*)0) || dummy == NULL((void*)0) || param == NULL((void*)0)) { |
1062 | RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n")rte_log(4U, 0, "EAL" ": " "Failed to allocate memory for async reply\n" ); |
1063 | rte_errno(per_lcore__rte_errno) = ENOMEM12; |
1064 | goto fail; |
1065 | } |
1066 | |
1067 | /* copy message */ |
1068 | memcpy(copy, req, sizeof(*copy)); |
1069 | |
1070 | param->n_responses_processed = 0; |
1071 | param->clb = clb; |
1072 | end = ¶m->end; |
1073 | reply = ¶m->user_reply; |
1074 | |
1075 | end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000; |
1076 | end->tv_sec = now.tv_sec + ts->tv_sec + |
1077 | (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000; |
1078 | reply->nb_sent = 0; |
1079 | reply->nb_received = 0; |
1080 | reply->msgs = NULL((void*)0); |
1081 | |
1082 | /* we have to lock the request queue here, as we will be adding a bunch |
1083 | * of requests to the queue at once, and some of the replies may arrive |
1084 | * before we add all of the requests to the queue. |
1085 | */ |
1086 | pthread_mutex_lock(&pending_requests.lock); |
1087 | |
1088 | /* we have to ensure that callback gets triggered even if we don't send |
1089 | * anything, therefore earlier we have allocated a dummy request. fill |
1090 | * it, and put it on the queue if we don't send any requests. |
1091 | */ |
1092 | dummy->type = REQUEST_TYPE_ASYNC; |
1093 | dummy->request = copy; |
1094 | dummy->reply = NULL((void*)0); |
1095 | dummy->async.param = param; |
1096 | dummy->reply_received = 1; /* short-circuit the timeout */ |
1097 | |
1098 | /* for secondary process, send request to the primary process only */ |
1099 | if (rte_eal_process_type() == RTE_PROC_SECONDARY) { |
1100 | ret = mp_request_async(eal_mp_socket_path(), copy, param, ts); |
1101 | |
1102 | /* if we didn't send anything, put dummy request on the queue */ |
1103 | if (ret == 0 && reply->nb_sent == 0) { |
1104 | TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,do { (dummy)->next.tqe_next = ((void*)0); (dummy)->next .tqe_prev = (&pending_requests.requests)->tqh_last; *( &pending_requests.requests)->tqh_last = (dummy); (& pending_requests.requests)->tqh_last = &(dummy)->next .tqe_next; } while ( 0) |
1105 | next)do { (dummy)->next.tqe_next = ((void*)0); (dummy)->next .tqe_prev = (&pending_requests.requests)->tqh_last; *( &pending_requests.requests)->tqh_last = (dummy); (& pending_requests.requests)->tqh_last = &(dummy)->next .tqe_next; } while ( 0); |
1106 | dummy_used = true1; |
Value stored to 'dummy_used' is never read | |
1107 | } |
1108 | |
1109 | pthread_mutex_unlock(&pending_requests.lock); |
1110 | |
1111 | /* if we couldn't send anything, clean up */ |
1112 | if (ret != 0) |
1113 | goto fail; |
1114 | return 0; |
1115 | } |
1116 | |
1117 | /* for primary process, broadcast request */ |
1118 | mp_dir = opendir(mp_dir_path); |
1119 | if (!mp_dir) { |
1120 | RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path)rte_log(4U, 0, "EAL" ": " "Unable to open directory %s\n", mp_dir_path ); |
1121 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
1122 | goto unlock_fail; |
1123 | } |
1124 | dir_fd = dirfd(mp_dir); |
1125 | |
1126 | /* lock the directory to prevent processes spinning up while we send */ |
1127 | if (flock(dir_fd, LOCK_SH1)) { |
1128 | RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",rte_log(4U, 0, "EAL" ": " "Unable to lock directory %s\n", mp_dir_path ) |
1129 | mp_dir_path)rte_log(4U, 0, "EAL" ": " "Unable to lock directory %s\n", mp_dir_path ); |
1130 | rte_errno(per_lcore__rte_errno) = errno(*__errno_location ()); |
1131 | goto closedir_fail; |
1132 | } |
1133 | |
1134 | while ((ent = readdir(mp_dir))) { |
1135 | char path[PATH_MAX4096]; |
1136 | |
1137 | if (fnmatch(mp_filter, ent->d_name, 0) != 0) |
1138 | continue; |
1139 | |
1140 | snprintf(path, sizeof(path), "%s/%s", mp_dir_path, |
1141 | ent->d_name); |
1142 | |
1143 | if (mp_request_async(path, copy, param, ts)) |
1144 | ret = -1; |
1145 | } |
1146 | /* if we didn't send anything, put dummy request on the queue */ |
1147 | if (ret == 0 && reply->nb_sent == 0) { |
1148 | TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next)do { if (((dummy)->next.tqe_next = (&pending_requests. requests)->tqh_first) != ((void*)0)) (&pending_requests .requests)->tqh_first->next.tqe_prev = &(dummy)-> next.tqe_next; else (&pending_requests.requests)->tqh_last = &(dummy)->next.tqe_next; (&pending_requests.requests )->tqh_first = (dummy); (dummy)->next.tqe_prev = &( &pending_requests.requests)->tqh_first; } while ( 0); |
1149 | dummy_used = true1; |
1150 | } |
1151 | |
1152 | /* finally, unlock the queue */ |
1153 | pthread_mutex_unlock(&pending_requests.lock); |
1154 | |
1155 | /* unlock the directory */ |
1156 | flock(dir_fd, LOCK_UN8); |
1157 | |
1158 | /* dir_fd automatically closed on closedir */ |
1159 | closedir(mp_dir); |
1160 | |
1161 | /* if dummy was unused, free it */ |
1162 | if (!dummy_used) |
1163 | free(dummy); |
1164 | |
1165 | return ret; |
1166 | closedir_fail: |
1167 | closedir(mp_dir); |
1168 | unlock_fail: |
1169 | pthread_mutex_unlock(&pending_requests.lock); |
1170 | fail: |
1171 | free(dummy); |
1172 | free(param); |
1173 | free(copy); |
1174 | return -1; |
1175 | } |
1176 | |
1177 | int __rte_experimental__attribute__((section(".text.experimental"))) |
1178 | rte_mp_reply(struct rte_mp_msg *msg, const char *peer) |
1179 | { |
1180 | RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name)rte_log(8U, 0, "EAL" ": " "reply: %s\n", msg->name); |
1181 | |
1182 | if (check_input(msg) != 0) |
1183 | return -1; |
1184 | |
1185 | if (peer == NULL((void*)0)) { |
1186 | RTE_LOG(ERR, EAL, "peer is not specified\n")rte_log(4U, 0, "EAL" ": " "peer is not specified\n"); |
1187 | rte_errno(per_lcore__rte_errno) = EINVAL22; |
1188 | return -1; |
1189 | } |
1190 | |
1191 | if (internal_config.no_shconf) { |
1192 | RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n")rte_log(8U, 0, "EAL" ": " "No shared files mode enabled, IPC is disabled\n" ); |
1193 | return 0; |
1194 | } |
1195 | |
1196 | return mp_send(msg, peer, MP_REP); |
1197 | } |