1/*	$NetBSD: isns_thread.c,v 1.2 2019/07/03 18:40:33 dholland Exp $	*/
2
3/*-
4 * Copyright (c) 2004,2009 The NetBSD Foundation, Inc.
5 * All rights reserved.
6 *
7 * This code is derived from software contributed to The NetBSD Foundation
8 * by Wasabi Systems, Inc.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 * 1. Redistributions of source code must retain the above copyright
14 *    notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 *    notice, this list of conditions and the following disclaimer in the
17 *    documentation and/or other materials provided with the distribution.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include <sys/cdefs.h>
33__RCSID("$NetBSD: isns_thread.c,v 1.2 2019/07/03 18:40:33 dholland Exp $");
34
35
36/*
37 * isns_thread.c
38 */
39
40#include <sys/types.h>
41
42#include <unistd.h>
43
44#include "isns.h"
45#include "isns_config.h"
46#include "isns_defs.h"
47
48static struct iovec read_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) +
49    ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)];
50
51static struct isns_task_s *isns_get_next_task(struct isns_config_s *);
52
53/*
54 * isns_control_thread()
55 */
56void *
57isns_control_thread(void *arg)
58{
59	struct isns_config_s *cfg_p = (struct isns_config_s *)arg;
60	struct kevent evt_chgs[5], *evt_p;
61
62	int n, nevents;
63	isns_kevent_handler *evt_handler_p;
64	int run_thread;
65
66	run_thread = 1;
67
68	while (run_thread) {
69		/* if no task outstanding, check queue here and send PDU */
70		while ((cfg_p->curtask_p == NULL)
71		    && ((cfg_p->curtask_p = isns_get_next_task(cfg_p)) != NULL))
72			isns_run_task(cfg_p->curtask_p);
73
74		nevents = kevent(cfg_p->kq, NULL, 0, evt_chgs,
75		    ARRAY_ELEMS(evt_chgs), NULL);
76
77		DBG("isns_control_thread: kevent() nevents=%d\n", nevents);
78
79		for (n = 0, evt_p = evt_chgs; n < nevents; n++, evt_p++) {
80			DBG("event[%d] - data=%d\n", n, (int)evt_p->data);
81			evt_handler_p = (void *)evt_p->udata;
82			run_thread = (evt_handler_p(evt_p, cfg_p) == 0);
83		}
84	}
85
86	return 0;
87}
88
89/*
90 * isns_get_next_task()
91 */
92static struct isns_task_s *
93isns_get_next_task(struct isns_config_s *cfg_p)
94{
95	struct isns_task_s *task_p = NULL;
96
97
98	DBG("isns_get_next_task: entered\n");
99
100	task_p = isns_taskq_remove(cfg_p);
101
102	if (cfg_p->sd_connected)
103		return task_p;
104	else {
105		if (task_p == NULL)
106			return NULL;
107		else {
108			if (task_p->task_type != ISNS_TASK_INIT_SOCKET_IO) {
109				isns_taskq_insert_head(cfg_p, task_p);
110
111				task_p = isns_new_task(cfg_p,
112				    ISNS_TASK_RECONNECT_SERVER, 0);
113				task_p->var.reconnect_server.ai_p = cfg_p->ai_p;
114			}
115
116			return task_p;
117		}
118	}
119}
120
121/*
122 * isns_kevent_pipe()
123 */
124int
125isns_kevent_pipe(struct kevent* evt_p, struct isns_config_s *cfg_p)
126{
127	uint8_t cmd_type;
128	int force_isns_stop;
129	uint16_t trans_id;
130	ssize_t rbytes;
131	int pipe_nbytes;
132
133	force_isns_stop = 0;
134	pipe_nbytes = (int)evt_p->data;
135
136	while (pipe_nbytes > 0) {
137		rbytes = read(cfg_p->pipe_fds[0], &cmd_type,
138		    sizeof(cmd_type));
139		if (rbytes < 0) {
140			DBG("isns_kevent_pipe: error on wepe_sys_read\n");
141			/*?? should we break here? */
142			continue;
143		}
144
145		pipe_nbytes -= (int)rbytes;
146		switch (cmd_type) {
147		case ISNS_CMD_PROCESS_TASKQ:
148			DBG("isns_kevent_pipe: ISNS_CMD_PROCESS_TASKQ\n");
149			break;
150
151		case ISNS_CMD_ABORT_TRANS:
152			DBG("isns_kevent_pipe: ISNS_CMD_ABORT_TRANS\n");
153			rbytes = read(cfg_p->pipe_fds[0], &trans_id,
154			    sizeof(trans_id));
155			if (rbytes < 0)
156				DBG("isns_kevent_pipe: "
157				    "error reading trans id\n");
158			else if (rbytes != sizeof(trans_id))
159				DBG("isns_kevent_pipe: "
160				    "short read reading trans id\n");
161			else {
162				isns_abort_trans(cfg_p, trans_id);
163				pipe_nbytes -= (int)rbytes;
164			}
165			break;
166
167		case ISNS_CMD_STOP:
168			DBG("isns_kevent_pipe: ISNS_CMD_STOP\n");
169			force_isns_stop = 1;
170			pipe_nbytes = 0;
171			break;
172
173		default:
174			DBG("isns_kevent_pipe: unknown command (cmd=%d)\n",
175			    cmd_type);
176			break;
177		}
178	}
179
180	return (force_isns_stop ? 1 : 0);
181}
182
183/*
184 * isns_is_trans_complete()
185 */
186static int
187isns_is_trans_complete(struct isns_trans_s *trans_p)
188{
189	struct isns_pdu_s *pdu_p;
190	uint16_t count;
191
192	pdu_p = trans_p->pdu_rsp_list;
193	count = 0;
194	while (pdu_p->next != NULL) {
195		if (pdu_p->hdr.seq_id != count++) return 0;
196		pdu_p = pdu_p->next;
197	}
198	if ((pdu_p->hdr.seq_id != count) ||
199	    !(pdu_p->hdr.flags & ISNS_FLAG_LAST_PDU))
200		return 0;
201
202	return 1;
203}
204
205/*
206 * isns_is_valid_resp()
207 */
208static int
209isns_is_valid_resp(struct isns_trans_s *trans_p, struct isns_pdu_s *pdu_p)
210{
211	struct isns_pdu_s *curpdu_p;
212
213	if (pdu_p->hdr.trans_id != trans_p->id)
214		return 0;
215	if (pdu_p->hdr.func_id != (trans_p->func_id | 0x8000))
216		return 0;
217	curpdu_p = trans_p->pdu_rsp_list;
218	while (curpdu_p != NULL) {
219		if (curpdu_p->hdr.seq_id == pdu_p->hdr.seq_id) return 0;
220		curpdu_p = curpdu_p->next;
221	}
222
223	return 1;
224}
225
226/*
227 * isns_process_in_pdu()
228 */
229static void
230isns_process_in_pdu(struct isns_config_s *cfg_p)
231{
232	struct isns_task_s *curtask_p;
233	struct isns_trans_s *trans_p;
234
235	DBG("isns_process_in_pdu: entered\n");
236
237	if ((curtask_p = cfg_p->curtask_p) == NULL)
238		isns_free_pdu(cfg_p->pdu_in_p);
239	else if ((trans_p = curtask_p->var.send_pdu.trans_p) == NULL)
240		isns_free_pdu(cfg_p->pdu_in_p);
241	else if (!isns_is_valid_resp(trans_p, cfg_p->pdu_in_p))
242		isns_free_pdu(cfg_p->pdu_in_p);
243	else {
244		isns_add_pdu_response(trans_p, cfg_p->pdu_in_p);
245
246		if (isns_is_trans_complete(trans_p)) {
247			isns_complete_trans(trans_p);
248			isns_end_task(curtask_p);
249		}
250	}
251
252	cfg_p->pdu_in_p = NULL;
253}
254
255/*
256 * isns_kevent_socket()
257 */
258int
259isns_kevent_socket(struct kevent *evt_p, struct isns_config_s *cfg_p)
260{
261	struct iovec *iovp;
262	struct isns_buffer_s *curbuf_p, *newbuf_p;
263	struct isns_pdu_s *pdu_p;
264	int64_t bavail; /* bytes available in socket buffer */
265	uint32_t cur_len, buf_len, unread_len, rd_len, b_len;
266	ssize_t rv;
267	uint16_t payload_len;
268	int iovcnt, more, transport_evt;
269
270
271	DBG("isns_kevent_socket: entered\n");
272
273	transport_evt = 0;
274	bavail = evt_p->data;
275	iovp = read_buf;
276
277	more = (bavail > 0);
278	while (more) {
279		if (cfg_p->pdu_in_p == NULL) {
280			/*
281 	 		 * Try to form a valid pdu by starting with the hdr.
282			 * If there isn't enough data in the socket buffer
283			 * to form a full hdr, just return.
284 	 		 *
285 	 		 * Once we have read in our hdr, allocate all buffers
286			 * needed.
287 	 		 */
288
289			if (bavail < (int64_t)sizeof(struct isns_pdu_hdr_s))
290				return 0;
291
292			/* Form a placeholder pdu */
293			pdu_p = isns_new_pdu(cfg_p, 0, 0, 0);
294
295			/* Read the header into our placeholder pdu */
296			read_buf[0].iov_base = &(pdu_p->hdr);
297			read_buf[0].iov_len = sizeof(struct isns_pdu_hdr_s);
298			iovcnt = 1;
299
300			iovp = read_buf;
301			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
302			if ((rv == 0) || (rv == -1)) {
303				DBG("isns_kevent_socket: isns_socket_readv(1) "
304				    "returned %d\n", rv);
305				transport_evt = 1;
306				break;
307			}
308
309			bavail -= sizeof(struct isns_pdu_hdr_s);
310			/*
311			 * ToDo: read until sizeof(struct isns_pdu_hdr_s) has
312			 *       been read in. This statement should be
313			 *
314			 *       bavail -= rv;
315			 */
316
317			/* adjust byte order */
318			pdu_p->hdr.isnsp_version = isns_ntohs(pdu_p->hdr.
319			    isnsp_version);
320			pdu_p->hdr.func_id = isns_ntohs(pdu_p->hdr.func_id);
321			pdu_p->hdr.payload_len = isns_ntohs(pdu_p->hdr.
322			    payload_len);
323			pdu_p->hdr.flags = isns_ntohs(pdu_p->hdr.flags);
324			pdu_p->hdr.trans_id = isns_ntohs(pdu_p->hdr.trans_id);
325			pdu_p->hdr.seq_id = isns_ntohs(pdu_p->hdr.seq_id);
326			pdu_p->byteorder_host = 1;
327
328			/* Try to sense early whether we might have garbage */
329			if (pdu_p->hdr.isnsp_version != ISNSP_VERSION) {
330				DBG("isns_kevent_socket: pdu_p->hdr."
331				    "isnsp_version != ISNSP_VERSION\n");
332				isns_free_pdu(pdu_p);
333
334				transport_evt = 1;
335				break;
336			}
337
338			/* Allocate all the necessary payload buffers */
339			payload_len = pdu_p->hdr.payload_len;
340			curbuf_p = pdu_p->payload_p;
341			buf_len = 0;
342			while (buf_len + curbuf_p->alloc_len < payload_len) {
343				buf_len += curbuf_p->alloc_len;
344				newbuf_p = isns_new_buffer(0);
345				curbuf_p->next = newbuf_p;
346				curbuf_p = newbuf_p;
347			}
348			curbuf_p->next = NULL;
349
350			/* Hold on to our placeholder pdu */
351			cfg_p->pdu_in_p = pdu_p;
352			more = (bavail > 0) ? 1 : 0;
353		} else if (bavail > 0) {
354			/*
355 	 		 * Fill in the pdu payload data.
356			 *
357 	 		 * If we can fill it all in now
358	 		 *     -AND- it corresponds to the active transaction
359			 *           then add the pdu to the transaction's
360			 *           pdu_rsp_list
361	 		 *     -AND- it does not correspond to the active
362			 *           transaction (or there is no active
363			 *           transaction) then drop it on the floor.
364			 * We may not be able to fill it all in now.
365	 		 *     -EITHER WAY- fill in as much payload data now
366			 *                  as we can.
367 	 		 */
368
369			/* Refer to our placeholder pdu */
370			pdu_p = cfg_p->pdu_in_p;
371
372			/* How much payload data has been filled in? */
373			cur_len = 0;
374			curbuf_p = pdu_p->payload_p;
375			while (curbuf_p->cur_len == curbuf_p->alloc_len) {
376				cur_len += curbuf_p->cur_len;
377				curbuf_p = curbuf_p->next;
378			}
379			cur_len += curbuf_p->cur_len;
380
381			/* How much payload data is left to be filled in? */
382			unread_len = pdu_p->hdr.payload_len - cur_len;
383
384			/* Read as much remaining payload data as possible */
385			iovcnt = 0;
386			while (curbuf_p->next != NULL) {
387				read_buf[iovcnt].iov_base = isns_buffer_data(
388			    	    curbuf_p, curbuf_p->cur_len);
389				read_buf[iovcnt].iov_len = curbuf_p->alloc_len -
390			    	    curbuf_p->cur_len;
391				iovcnt++;
392
393				curbuf_p = curbuf_p->next;
394			}
395			read_buf[iovcnt].iov_base = isns_buffer_data(curbuf_p,
396		    	    curbuf_p->cur_len);
397			read_buf[iovcnt].iov_len = unread_len;
398			iovcnt++;
399
400			rv = isns_socket_readv(cfg_p->sd, iovp, iovcnt);
401			if ((rv == 0) || (rv == -1)) {
402				DBG("isns_kevent_socket: isns_socket_readv(2) "
403			    	    "returned %d\n",rv);
404				isns_free_pdu(cfg_p->pdu_in_p);
405				cfg_p->pdu_in_p = NULL;
406
407				transport_evt = 1;
408				break;
409			}
410
411			/* Update cur_len in buffers that newly have data */
412			curbuf_p = pdu_p->payload_p;
413			while (curbuf_p->cur_len == curbuf_p->alloc_len)
414				curbuf_p = curbuf_p->next;
415
416			rd_len = (uint32_t)rv;
417			do {
418				b_len = curbuf_p->alloc_len - curbuf_p->cur_len;
419				if (rd_len > b_len) {
420					curbuf_p->cur_len = curbuf_p->alloc_len;
421					rd_len -= b_len;
422				} else {
423					curbuf_p->cur_len += rd_len;
424					break;
425				}
426
427				curbuf_p = curbuf_p->next;
428			} while (curbuf_p != NULL);
429
430			bavail -= rv;
431
432			if (rv == (int)unread_len)
433				isns_process_in_pdu(cfg_p);
434
435			more = (bavail > (int64_t)sizeof(struct isns_pdu_hdr_s)) ? 1 : 0;
436		}
437	}
438
439	transport_evt |= (evt_p->flags & EV_EOF);
440	if (transport_evt) {
441		DBG("isns_kevent_socket: processing transport event\n");
442
443		isns_socket_close(cfg_p->sd);
444		cfg_p->sd_connected = 0;
445
446		if (cfg_p->curtask_p != NULL)
447			isns_process_connection_loss(cfg_p);
448
449		if (cfg_p->pdu_in_p != NULL) {
450			isns_free_pdu(cfg_p->pdu_in_p);
451			cfg_p->pdu_in_p = NULL;
452		}
453	}
454
455	return 0;
456}
457
458/* ARGSUSED */
459/*
460 * isns_kevent_timer_recon()
461 */
462int
463isns_kevent_timer_recon(struct kevent *evt_p, struct isns_config_s *cfg_p)
464{
465	int rv;
466
467
468	DBG("isns_kevent_timer_recon: entered\n");
469
470	rv = isns_socket_create(&(cfg_p->sd), cfg_p->ai_p->ai_family,
471		cfg_p->ai_p->ai_socktype);
472	if (rv != 0)
473		return 0;
474
475	rv = isns_socket_connect(cfg_p->sd, cfg_p->ai_p->ai_addr,
476	    cfg_p->ai_p->ai_addrlen);
477	if (rv == 0) {
478		/* Remove ISNS_EVT_TIMER_RECON from kqueue */
479		rv = isns_change_kevent_list(cfg_p,
480		    (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_DELETE,
481		    (int64_t)0, (intptr_t)0);
482		if (rv == -1)
483			DBG("isns_kevent_timer_recon: error on "
484			    "isns_change_kevent_list(1)\n");
485
486		cfg_p->sd_connected = 1;
487
488		/* Add cfg_p->sd to kqueue */
489		rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd,
490		    EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0,
491		    (intptr_t)isns_kevent_socket);
492		if (rv == -1)
493			DBG("isns_kevent_timer_recon: error on "
494			    "isns_change_kevent_list(2)\n");
495
496		isns_end_task(cfg_p->curtask_p);
497	}
498
499	return 0;
500}
501
502
503/* ARGSUSED */
504/*
505 * isns_kevent_timer_refresh
506 */
507int
508isns_kevent_timer_refresh(struct kevent* evt_p, struct isns_config_s *cfg_p)
509{
510	struct isns_refresh_s *ref_p;
511	ISNS_TRANS trans;
512	uint32_t status;
513	int rval;
514
515	DBG("isns_kevent_timer_refresh: entered\n");
516
517	/* If refresh info pointer NULL, or no name assigned, just return. */
518	ref_p = cfg_p->refresh_p;
519	if ((ref_p == NULL) || (ref_p->node[0] == '\0'))
520	    	return 0;
521
522	if (ref_p->trans_p != NULL) {
523		/* If the previous refresh trans is not complete, return. */
524		rval = isns_get_pdu_response_status(ref_p->trans_p, &status);
525		if (rval == EPERM) {
526			DBG("isns_kevent_timer_refresh: "
527			    "prev refresh trans not complete\n");
528			return 0;
529		}
530		/* Free previous refresh trans. */
531		isns_free_trans(ref_p->trans_p);
532		ref_p->trans_p = NULL;
533	}
534
535	/* Build new refresh transaction and send it. */
536	trans = isns_new_trans((ISNS_HANDLE)cfg_p, isnsp_DevAttrQry, 0);
537	if (trans == ISNS_INVALID_TRANS) {
538		DBG("isns_kevent_timer_refresh: error on isns_new_trans()\n");
539		return 0;
540	}
541
542	ref_p->trans_p = (struct isns_trans_s *)trans;
543	/* First we add our source attribute */
544	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
545	/* Now add our message attribute */
546	isns_add_string(trans, isnst_iSCSIName, ref_p->node);
547	isns_add_tlv(trans, isnst_Delimiter, 0, NULL);
548	/* and finally the operating attributes */
549	isns_add_tlv(trans, isnst_EID, 0, NULL);
550	isns_send_trans(trans, NULL, NULL);
551
552	return 0;
553}
554