1/*- 2 * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions, and the following disclaimer, 10 * without modification. 11 * 2. Redistributions in binary form must reproduce at minimum a disclaimer 12 * substantially similar to the "NO WARRANTY" disclaimer below 13 * ("Disclaimer") and any redistribution must be conditioned upon 14 * including a substantially similar Disclaimer requirement for further 15 * binary redistribution. 16 * 17 * NO WARRANTY 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 26 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 27 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGES. 29 * 30 * Authors: Justin T. Gibbs (Spectra Logic Corporation) 31 */ 32 33/** 34 * \file consumer.cc 35 */ 36 37#include <sys/cdefs.h> 38#include <sys/poll.h> 39#include <sys/socket.h> 40#include <sys/un.h> 41 42#include <err.h> 43#include <errno.h> 44#include <fcntl.h> 45#include <syslog.h> 46#include <unistd.h> 47 48#include <cstdarg> 49#include <cstring> 50#include <list> 51#include <map> 52#include <string> 53 54#include "guid.h" 55#include "event.h" 56#include "event_factory.h" 57#include "exception.h" 58 59#include "consumer.h" 60/*================================== Macros ==================================*/ 61#define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*x)) 62 63/*============================ Namespace Control =============================*/ 64using std::string; 65namespace DevdCtl 66{ 67 68/*============================= Class Definitions ============================*/ 69/*----------------------------- DevdCtl::Consumer ----------------------------*/ 70//- Consumer Static Private Data ----------------------------------------------- 71const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe"; 72 73//- Consumer Public Methods ---------------------------------------------------- 74Consumer::Consumer(Event::BuildMethod *defBuilder, 75 EventFactory::Record *regEntries, 76 size_t numEntries) 77 : m_devdSockFD(-1), 78 m_eventFactory(defBuilder), 79 m_replayingEvents(false) 80{ 81 m_eventFactory.UpdateRegistry(regEntries, numEntries); 82} 83 84Consumer::~Consumer() 85{ 86 DisconnectFromDevd(); 87} 88 89bool 90Consumer::ConnectToDevd() 91{ 92 struct sockaddr_un devdAddr; 93 int sLen; 94 int result; 95 96 if (m_devdSockFD != -1) { 97 /* Already connected. */ 98 syslog(LOG_DEBUG, "%s: Already connected.", __func__); 99 return (true); 100 } 101 syslog(LOG_INFO, "%s: Connecting to devd.", __func__); 102 103 memset(&devdAddr, 0, sizeof(devdAddr)); 104 devdAddr.sun_family= AF_UNIX; 105 strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path)); 106 sLen = SUN_LEN(&devdAddr); 107 108 m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0); 109 if (m_devdSockFD == -1) 110 err(1, "Unable to create socket"); 111 result = connect(m_devdSockFD, 112 reinterpret_cast<sockaddr *>(&devdAddr), 113 sLen); 114 if (result == -1) { 115 syslog(LOG_INFO, "Unable to connect to devd"); 116 DisconnectFromDevd(); 117 return (false); 118 } 119 120 syslog(LOG_INFO, "Connection to devd successful"); 121 return (true); 122} 123 124void 125Consumer::DisconnectFromDevd() 126{ 127 if (m_devdSockFD != -1) { 128 syslog(LOG_INFO, "Disconnecting from devd."); 129 close(m_devdSockFD); 130 } 131 m_devdSockFD = -1; 132} 133 134std::string 135Consumer::ReadEvent() 136{ 137 char buf[MAX_EVENT_SIZE + 1]; 138 ssize_t len; 139 140 len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL); 141 if (len == -1) 142 return (std::string("")); 143 else { 144 /* NULL-terminate the result */ 145 buf[len] = '\0'; 146 return (std::string(buf)); 147 } 148} 149 150void 151Consumer::ReplayUnconsumedEvents(bool discardUnconsumed) 152{ 153 EventList::iterator event(m_unconsumedEvents.begin()); 154 bool replayed_any = (event != m_unconsumedEvents.end()); 155 156 m_replayingEvents = true; 157 if (replayed_any) 158 syslog(LOG_INFO, "Started replaying unconsumed events"); 159 while (event != m_unconsumedEvents.end()) { 160 bool consumed((*event)->Process()); 161 if (consumed || discardUnconsumed) { 162 delete *event; 163 event = m_unconsumedEvents.erase(event); 164 } else { 165 event++; 166 } 167 } 168 if (replayed_any) 169 syslog(LOG_INFO, "Finished replaying unconsumed events"); 170 m_replayingEvents = false; 171} 172 173bool 174Consumer::SaveEvent(const Event &event) 175{ 176 if (m_replayingEvents) 177 return (false); 178 m_unconsumedEvents.push_back(event.DeepCopy()); 179 return (true); 180} 181 182Event * 183Consumer::NextEvent() 184{ 185 if (!Connected()) 186 return(NULL); 187 188 Event *event(NULL); 189 try { 190 string evString; 191 192 evString = ReadEvent(); 193 if (! evString.empty()) { 194 Event::TimestampEventString(evString); 195 event = Event::CreateEvent(m_eventFactory, evString); 196 } 197 } catch (const Exception &exp) { 198 exp.Log(); 199 DisconnectFromDevd(); 200 } 201 return (event); 202} 203 204/* Capture and process buffered events. */ 205void 206Consumer::ProcessEvents() 207{ 208 Event *event; 209 while ((event = NextEvent()) != NULL) { 210 if (event->Process()) 211 SaveEvent(*event); 212 delete event; 213 } 214} 215 216void 217Consumer::FlushEvents() 218{ 219 std::string s; 220 221 do 222 s = ReadEvent(); 223 while (! s.empty()) ; 224} 225 226bool 227Consumer::EventsPending() 228{ 229 struct pollfd fds[1]; 230 int result; 231 232 do { 233 fds->fd = m_devdSockFD; 234 fds->events = POLLIN; 235 fds->revents = 0; 236 result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0); 237 } while (result == -1 && errno == EINTR); 238 239 if (result == -1) 240 err(1, "Polling for devd events failed"); 241 242 if ((fds->revents & POLLERR) != 0) 243 throw Exception("Consumer::EventsPending(): " 244 "POLLERR detected on devd socket."); 245 246 if ((fds->revents & POLLHUP) != 0) 247 throw Exception("Consumer::EventsPending(): " 248 "POLLHUP detected on devd socket."); 249 250 return ((fds->revents & POLLIN) != 0); 251} 252 253} // namespace DevdCtl 254