/* * Copyright 2022 Haiku Inc. All rights reserved. * Distributed under the terms of the MIT License. * * Authors: * Niels Sascha Reedijk, niels.reedijk@gmail.com */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "HttpBuffer.h" #include "HttpParser.h" #include "HttpResultPrivate.h" #include "HttpSerializer.h" #include "NetServicesPrivate.h" using namespace std::literals; using namespace BPrivate::Network; /*! \brief Maximum size of the HTTP Header lines of the message. In the RFC there is no maximum, but we need to prevent the situation where we keep growing the internal buffer waiting for the end of line ('\r\n\') characters to occur. */ static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024; struct CounterDeleter { void operator()(int32* counter) const noexcept { atomic_add(counter, -1); } }; class BHttpSession::Request { public: Request(BHttpRequest&& request, BBorrow target, BMessenger observer); Request(Request& original, const Redirect& redirect); // States enum RequestState { InitialState, Connected, RequestSent, ContentReceived }; RequestState State() const noexcept { return fRequestStatus; } // Result Helpers std::shared_ptr Result() { return fResult; } void SetError(std::exception_ptr e); // Helpers for maintaining the connection count std::pair GetHost() const; void SetCounter(int32* counter) noexcept; // Operational methods void ResolveHostName(); void OpenConnection(); void TransferRequest(); bool ReceiveResult(); void Disconnect() noexcept; // Object information int Socket() const noexcept { return fSocket->Socket(); } int32 Id() const noexcept { return fResult->id; } bool CanCancel() const noexcept { return fResult->CanCancel(); } // Message helper void SendMessage(uint32 what, std::function dataFunc = nullptr) const; private: BHttpRequest fRequest; // Request state/events RequestState fRequestStatus = InitialState; // Communication BMessenger fObserver; std::shared_ptr fResult; // Connection BNetworkAddress fRemoteAddress; std::unique_ptr fSocket; // Sending and receiving HttpBuffer fBuffer; HttpSerializer fSerializer; HttpParser fParser; // Receive state BHttpStatus fStatus; BHttpFields fFields; // Redirection bool fMightRedirect = false; int8 fRemainingRedirects; // Connection counter std::unique_ptr fConnectionCounter; }; class BHttpSession::Impl { public: Impl(); ~Impl() noexcept; BHttpResult Execute(BHttpRequest&& request, BBorrow target, BMessenger observer); void Cancel(int32 identifier); void SetMaxConnectionsPerHost(size_t maxConnections); void SetMaxHosts(size_t maxConnections); private: // Thread functions static status_t ControlThreadFunc(void* arg); static status_t DataThreadFunc(void* arg); // Helper functions std::vector GetRequestsForControlThread(); private: // constants (can be accessed unlocked) const sem_id fControlQueueSem; const sem_id fDataQueueSem; const thread_id fControlThread; const thread_id fDataThread; // locking mechanism BLocker fLock; std::atomic fQuitting = false; // queues & shared data std::list fControlQueue; std::deque fDataQueue; std::vector fCancelList; // data owned by the controlThread using Host = std::pair; std::map fConnectionCount; // data that can only be accessed atomically std::atomic fMaxConnectionsPerHost = 2; std::atomic fMaxHosts = 10; // data owned by the dataThread std::map connectionMap; std::vector objectList; }; struct BHttpSession::Redirect { BUrl url; bool redirectToGet; }; // #pragma mark -- BHttpSession::Impl BHttpSession::Impl::Impl() : fControlQueueSem(create_sem(0, "http:control")), fDataQueueSem(create_sem(0, "http:data")), fControlThread(spawn_thread(ControlThreadFunc, "http:control", B_NORMAL_PRIORITY, this)), fDataThread(spawn_thread(DataThreadFunc, "http:data", B_NORMAL_PRIORITY, this)) { // check initialization of semaphores if (fControlQueueSem < 0) throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control queue semaphore"); if (fDataQueueSem < 0) throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data queue semaphore"); // set up internal threads if (fControlThread < 0) throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control thread"); if (resume_thread(fControlThread) != B_OK) throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume control thread"); if (fDataThread < 0) throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data thread"); if (resume_thread(fDataThread) != B_OK) throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume data thread"); } BHttpSession::Impl::~Impl() noexcept { fQuitting.store(true); delete_sem(fControlQueueSem); delete_sem(fDataQueueSem); status_t threadResult; wait_for_thread(fControlThread, &threadResult); // The control thread waits for the data thread } BHttpResult BHttpSession::Impl::Execute(BHttpRequest&& request, BBorrow target, BMessenger observer) { auto wRequest = Request(std::move(request), std::move(target), observer); auto retval = BHttpResult(wRequest.Result()); auto lock = AutoLocker(fLock); fControlQueue.push_back(std::move(wRequest)); release_sem(fControlQueueSem); return retval; } void BHttpSession::Impl::Cancel(int32 identifier) { auto lock = AutoLocker(fLock); // Check if the item is on the control queue fControlQueue.remove_if([&identifier](auto& request) { if (request.Id() == identifier) { try { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); } catch (...) { request.SetError(std::current_exception()); } return true; } return false; }); // Get it on the list for deletion in the data queue fCancelList.push_back(identifier); release_sem(fDataQueueSem); } void BHttpSession::Impl::SetMaxConnectionsPerHost(size_t maxConnections) { if (maxConnections <= 0 || maxConnections >= INT32_MAX) { throw BRuntimeError( __PRETTY_FUNCTION__, "MaxConnectionsPerHost must be between 1 and INT32_MAX"); } fMaxConnectionsPerHost.store(maxConnections, std::memory_order_relaxed); } void BHttpSession::Impl::SetMaxHosts(size_t maxConnections) { if (maxConnections <= 0) throw BRuntimeError(__PRETTY_FUNCTION__, "MaxHosts must be 1 or more"); fMaxHosts.store(maxConnections, std::memory_order_relaxed); } /*static*/ status_t BHttpSession::Impl::ControlThreadFunc(void* arg) { BHttpSession::Impl* impl = static_cast(arg); // Outer loop to use the fControlQueueSem when new items have entered the queue while (true) { if (auto status = acquire_sem(impl->fControlQueueSem); status == B_INTERRUPTED) continue; else if (status != B_OK) { // Most likely B_BAD_SEM_ID indicating that the sem was deleted; go to cleanup break; } // Check if we have woken up because we are quitting if (impl->fQuitting.load()) break; // Get items to process (locking done by the helper) auto requests = impl->GetRequestsForControlThread(); if (requests.size() == 0) continue; for (auto& request: requests) { bool hasError = false; try { request.ResolveHostName(); request.OpenConnection(); } catch (...) { request.SetError(std::current_exception()); hasError = true; } if (hasError) { // Do not add the request back to the queue; release the sem to do another round // in case there is another item waiting because the limits of concurrent requests // were reached release_sem(impl->fControlQueueSem); continue; } impl->fLock.Lock(); impl->fDataQueue.push_back(std::move(request)); impl->fLock.Unlock(); release_sem(impl->fDataQueueSem); } } // Clean up and make sure we are quitting if (impl->fQuitting.load()) { // First wait for the data thread to complete status_t threadResult; wait_for_thread(impl->fDataThread, &threadResult); // Cancel all requests for (auto& request: impl->fControlQueue) { try { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); } catch (...) { request.SetError(std::current_exception()); } } } else { throw BRuntimeError( __PRETTY_FUNCTION__, "Unknown reason that the controlQueueSem is deleted"); } // Cleanup: wait for data thread return B_OK; } static constexpr uint16 EVENT_CANCELLED = 0x4000; /*static*/ status_t BHttpSession::Impl::DataThreadFunc(void* arg) { BHttpSession::Impl* data = static_cast(arg); // initial initialization of wait list data->objectList.push_back( object_wait_info{data->fDataQueueSem, B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE}); while (true) { if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size()); status == B_INTERRUPTED) continue; else if (status < 0) { // Something went inexplicably wrong throw BSystemError("wait_for_objects()", status); } // First check if the change is in acquiring the sem, meaning that // there are new requests to be scheduled if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) { if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED) continue; else if (status != B_OK) { // Most likely B_BAD_SEM_ID indicating that the sem was deleted break; } // Process the cancelList and dataQueue. Note that there might // be a situation where a request is cancelled and added in the // same iteration, but that is taken care by this algorithm. data->fLock.Lock(); while (!data->fDataQueue.empty()) { auto request = std::move(data->fDataQueue.front()); data->fDataQueue.pop_front(); auto socket = request.Socket(); data->connectionMap.insert(std::make_pair(socket, std::move(request))); // Add to objectList data->objectList.push_back( object_wait_info{socket, B_OBJECT_TYPE_FD, B_EVENT_WRITE}); } for (auto id: data->fCancelList) { // To cancel, we set a special event status on the // object_wait_info list so that we can handle it below. // Also: the first item in the waitlist is always the semaphore // so the fun starts at offset 1. size_t offset = 0; for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) { offset++; if (it->second.Id() == id) { data->objectList[offset].events = EVENT_CANCELLED; break; } } } data->fCancelList.clear(); data->fLock.Unlock(); } else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) { // The semaphore has been deleted. Start the cleanup break; } // Process all objects that are ready bool resizeObjectList = false; for (auto& item: data->objectList) { if (item.type != B_OBJECT_TYPE_FD) continue; if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) { auto& request = data->connectionMap.find(item.object)->second; auto error = false; try { request.TransferRequest(); } catch (...) { request.SetError(std::current_exception()); error = true; } // End failed writes if (error) { request.Disconnect(); data->connectionMap.erase(item.object); release_sem(data->fControlQueueSem); // wake up control thread; there may queued requests unblocked. resizeObjectList = true; } } else if ((item.events & B_EVENT_READ) == B_EVENT_READ) { auto& request = data->connectionMap.find(item.object)->second; auto finished = false; try { if (request.CanCancel()) finished = true; else finished = request.ReceiveResult(); } catch (const Redirect& r) { // Request is redirected, send back to the controlThread // Move existing request into a new request and hand over to the control queue auto lock = AutoLocker(data->fLock); data->fControlQueue.emplace_back(request, r); release_sem(data->fControlQueueSem); finished = true; } catch (...) { request.SetError(std::current_exception()); finished = true; } if (finished) { // Clean up finished requests; including redirected requests request.Disconnect(); data->connectionMap.erase(item.object); release_sem(data->fControlQueueSem); // wake up control thread; there may queued requests unblocked. resizeObjectList = true; } } else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) { auto& request = data->connectionMap.find(item.object)->second; try { throw BNetworkRequestError( __PRETTY_FUNCTION__, BNetworkRequestError::NetworkError); } catch (...) { request.SetError(std::current_exception()); } data->connectionMap.erase(item.object); resizeObjectList = true; } else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) { auto& request = data->connectionMap.find(item.object)->second; request.Disconnect(); try { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); } catch (...) { request.SetError(std::current_exception()); } data->connectionMap.erase(item.object); release_sem(data->fControlQueueSem); // wake up control thread; there may queued requests unblocked. resizeObjectList = true; } else if (item.events == 0) { // No events for this item, skip continue; } else { // Likely to be B_EVENT_INVALID. This should not happen auto& request = data->connectionMap.find(item.object)->second; request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) { msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError); msg.AddString(UrlEventData::DebugMessage, "Unexpected event; socket deleted?"); }); throw BRuntimeError( __PRETTY_FUNCTION__, "Socket was deleted at an unexpected time"); } } // Reset objectList data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE; if (resizeObjectList) data->objectList.resize(data->connectionMap.size() + 1); auto i = 1; for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) { data->objectList[i].object = it->first; if (it->second.State() == Request::InitialState) throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request"); else if (it->second.State() == Request::Connected) data->objectList[i].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED; else data->objectList[i].events = B_EVENT_READ | B_EVENT_DISCONNECTED; i++; } } // Clean up and make sure we are quitting if (data->fQuitting.load()) { // Cancel all requests for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) { try { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); } catch (...) { it->second.SetError(std::current_exception()); } } } else { throw BRuntimeError(__PRETTY_FUNCTION__, "Unknown reason that the dataQueueSem is deleted"); } return B_OK; } /*! \brief Internal helper that filters the lists of requests to guard against the concurrent requests limit. This method will do the locking of the internal structure. */ std::vector BHttpSession::Impl::GetRequestsForControlThread() { std::vector requests; // Clean up connection list if it is at the max number of hosts if (fConnectionCount.size() >= fMaxHosts.load()) { for (auto it = fConnectionCount.begin(); it != fConnectionCount.end();) { if (atomic_get(std::addressof(it->second)) == 0) { it = fConnectionCount.erase(it); } else { it++; } } } // Process the list of pending requests and review if they can be started. auto lock = AutoLocker(fLock); fControlQueue.remove_if([this, &requests](auto& request) { auto host = request.GetHost(); auto it = fConnectionCount.find(host); if (it != fConnectionCount.end()) { if (static_cast(atomic_get(std::addressof(it->second))) >= fMaxConnectionsPerHost.load(std::memory_order_relaxed)) { request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) { msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning); msg.AddString(UrlEventData::DebugMessage, "Request is queued: too many active connections for host"); }); return false; } else { atomic_add(std::addressof(it->second), 1); request.SetCounter(std::addressof(it->second)); } } else { if (fConnectionCount.size() == fMaxHosts.load()) { request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) { msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning); msg.AddString(UrlEventData::DebugMessage, "Request is queued: maximum number of concurrent hosts"); }); return false; } auto [newIt, success] = fConnectionCount.insert({host, 1}); if (!success) { throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot insert into fConnectionCount"); } request.SetCounter(std::addressof(newIt->second)); } requests.emplace_back(std::move(request)); return true; }); return requests; } // #pragma mark -- BHttpSession (public interface) BHttpSession::BHttpSession() { fImpl = std::make_shared(); } BHttpSession::~BHttpSession() = default; BHttpSession::BHttpSession(const BHttpSession&) noexcept = default; BHttpSession& BHttpSession::operator=(const BHttpSession&) noexcept = default; BHttpResult BHttpSession::Execute(BHttpRequest&& request, BBorrow target, BMessenger observer) { return fImpl->Execute(std::move(request), std::move(target), observer); } void BHttpSession::Cancel(int32 identifier) { fImpl->Cancel(identifier); } void BHttpSession::Cancel(const BHttpResult& request) { fImpl->Cancel(request.Identity()); } void BHttpSession::SetMaxConnectionsPerHost(size_t maxConnections) { fImpl->SetMaxConnectionsPerHost(maxConnections); } void BHttpSession::SetMaxHosts(size_t maxConnections) { fImpl->SetMaxHosts(maxConnections); } // #pragma mark -- BHttpSession::Request (helpers) BHttpSession::Request::Request(BHttpRequest&& request, BBorrow target, BMessenger observer) : fRequest(std::move(request)), fObserver(observer) { auto identifier = get_netservices_request_identifier(); // interpret the remaining redirects fRemainingRedirects = fRequest.MaxRedirections(); // create shared data fResult = std::make_shared(identifier); // check if there is a target if (target.HasValue()) fResult->bodyTarget = std::move(target); // inform the parser when we do a HEAD request, so not to expect content if (fRequest.Method() == BHttpMethod::Head) fParser.SetNoContent(); } BHttpSession::Request::Request(Request& original, const BHttpSession::Redirect& redirect) : fRequest(std::move(original.fRequest)), fObserver(original.fObserver), fResult(original.fResult) { // update the original request with the new location fRequest.SetUrl(redirect.url); if (redirect.redirectToGet && (fRequest.Method() != BHttpMethod::Head && fRequest.Method() != BHttpMethod::Get)) { fRequest.SetMethod(BHttpMethod::Get); fRequest.ClearRequestBody(); } fRemainingRedirects = original.fRemainingRedirects--; // inform the parser when we do a HEAD request, so not to expect content if (fRequest.Method() == BHttpMethod::Head) fParser.SetNoContent(); } /*! \brief Helper that sets the error in the result to \a e and notifies the listeners. */ void BHttpSession::Request::SetError(std::exception_ptr e) { fResult->SetError(e); SendMessage(UrlEvent::DebugMessage, [&e](BMessage& msg) { msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError); try { std::rethrow_exception(e); } catch (BError& error) { msg.AddString(UrlEventData::DebugMessage, error.DebugMessage()); } catch (std::exception& error) { msg.AddString(UrlEventData::DebugMessage, error.what()); } catch (...) { msg.AddString(UrlEventData::DebugMessage, "Unknown exception"); } }); SendMessage(UrlEvent::RequestCompleted, [](BMessage& msg) { msg.AddBool(UrlEventData::Success, false); }); } std::pair BHttpSession::Request::GetHost() const { return {fRequest.Url().Host(), fRequest.Url().Port()}; } void BHttpSession::Request::SetCounter(int32* counter) noexcept { fConnectionCounter = std::unique_ptr(counter); } /*! \brief Resolve the hostname for a request */ void BHttpSession::Request::ResolveHostName() { int port; if (fRequest.Url().HasPort()) port = fRequest.Url().Port(); else if (fRequest.Url().Protocol() == "https") port = 443; else port = 80; // TODO: proxy if (auto status = fRemoteAddress.SetTo(fRequest.Url().Host(), port); status != B_OK) { throw BNetworkRequestError( "BNetworkAddress::SetTo()", BNetworkRequestError::HostnameError, status); } SendMessage(UrlEvent::HostNameResolved, [this](BMessage& msg) { msg.AddString(UrlEventData::HostName, fRequest.Url().Host()); }); } /*! \brief Open the connection and make the socket non-blocking after opening it */ void BHttpSession::Request::OpenConnection() { // Set up the socket if (fRequest.Url().Protocol() == "https") { // To do: secure socket with callbacks to check certificates fSocket = std::make_unique(); } else { fSocket = std::make_unique(); } // Set timeout fSocket->SetTimeout(fRequest.Timeout()); // Open connection if (auto status = fSocket->Connect(fRemoteAddress); status != B_OK) { // TODO: inform listeners that the connection failed throw BNetworkRequestError( "BSocket::Connect()", BNetworkRequestError::NetworkError, status); } // Make the rest of the interaction non-blocking auto flags = fcntl(fSocket->Socket(), F_GETFL, 0); if (flags == -1) throw BRuntimeError("fcntl()", "Error getting socket flags"); if (fcntl(fSocket->Socket(), F_SETFL, flags | O_NONBLOCK) != 0) throw BRuntimeError("fcntl()", "Error setting non-blocking flag on socket"); SendMessage(UrlEvent::ConnectionOpened); fRequestStatus = Connected; } /*! \brief Transfer data from the request to the socket. \returns \c true if the request is complete, or false if there is more. */ void BHttpSession::Request::TransferRequest() { // Assert that we are in the right state if (fRequestStatus != Connected) throw BRuntimeError( __PRETTY_FUNCTION__, "Write request for object that is not in the Connected state"); if (!fSerializer.IsInitialized()) fSerializer.SetTo(fBuffer, fRequest); auto currentBytesWritten = fSerializer.Serialize(fBuffer, fSocket.get()); if (currentBytesWritten > 0) { SendMessage(UrlEvent::UploadProgress, [this](BMessage& msg) { msg.AddInt64(UrlEventData::NumBytes, fSerializer.BodyBytesTransferred()); if (auto totalSize = fSerializer.BodyBytesTotal()) msg.AddInt64(UrlEventData::TotalBytes, totalSize.value()); }); } if (fSerializer.Complete()) fRequestStatus = RequestSent; } /*! \brief Transfer data from the socket and parse the result. \returns \c true if the request is complete, or false if there is more. */ bool BHttpSession::Request::ReceiveResult() { // First: stream data from the socket auto bytesRead = fBuffer.ReadFrom(fSocket.get()); if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED) return false; auto readEnd = bytesRead == 0; // Parse the content in the buffer switch (fParser.State()) { case HttpInputStreamState::StatusLine: { if (fBuffer.RemainingBytes() == static_cast(bytesRead)) { // In the initial run, the bytes in the buffer will match the bytes read to indicate // the response has started. SendMessage(UrlEvent::ResponseStarted); } if (fParser.ParseStatus(fBuffer, fStatus)) { // the status headers are now received, decide what to do next // Determine if we can handle redirects; else notify of receiving status if (fRemainingRedirects > 0) { switch (fStatus.StatusCode()) { case BHttpStatusCode::MovedPermanently: case BHttpStatusCode::TemporaryRedirect: case BHttpStatusCode::PermanentRedirect: // These redirects require the request body to be sent again. It this is // possible, BHttpRequest::RewindBody() will return true in which case // we can handle the redirect. if (!fRequest.RewindBody()) break; [[fallthrough]]; case BHttpStatusCode::Found: case BHttpStatusCode::SeeOther: // These redirects redirect to GET, so we don't care if we can rewind // the body; in this case redirect fMightRedirect = true; break; default: break; } } if ((fStatus.StatusClass() == BHttpStatusClass::ClientError || fStatus.StatusClass() == BHttpStatusClass::ServerError) && fRequest.StopOnError()) { fRequestStatus = ContentReceived; fResult->SetStatus(std::move(fStatus)); fResult->SetFields(BHttpFields()); fResult->SetBody(); SendMessage(UrlEvent::RequestCompleted, [](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); }); return true; } if (!fMightRedirect) { // we are not redirecting and there is no error, so inform listeners SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) { msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code); }); fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)}); } } else { // We do not have enough data for the status line yet if (readEnd) { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError, "Response did not include a complete status line"); } return false; } [[fallthrough]]; } case HttpInputStreamState::Fields: { if (!fParser.ParseFields(fBuffer, fFields)) { // there may be more headers to receive, throw an error if there will be no more if (readEnd) { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError, "Response did not include a complete header section"); } break; } // The headers have been received, now set up the rest of the response handling // Handle redirects if (fMightRedirect) { auto redirectToGet = false; switch (fStatus.StatusCode()) { case BHttpStatusCode::Found: case BHttpStatusCode::SeeOther: // 302 and 303 redirections convert all requests to GET request, except for // HEAD redirectToGet = true; [[fallthrough]]; case BHttpStatusCode::MovedPermanently: case BHttpStatusCode::TemporaryRedirect: case BHttpStatusCode::PermanentRedirect: { auto locationField = fFields.FindField("Location"); if (locationField == fFields.end()) { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError, "Redirect; the Location field must be present and cannot be found"); } auto locationString = BString( (*locationField).Value().data(), (*locationField).Value().size()); auto redirect = BHttpSession::Redirect{ BUrl(fRequest.Url(), locationString), redirectToGet}; if (!redirect.url.IsValid()) { throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError, "Redirect; invalid URL in the Location field"); } // Notify of redirect SendMessage(UrlEvent::HttpRedirect, [&locationString](BMessage& msg) { msg.AddString(UrlEventData::HttpRedirectUrl, locationString); }); throw redirect; } default: // ignore other status codes and continue regular processing SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) { msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code); }); fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)}); break; } } // TODO: Parse received cookies // Move headers to the result and inform listener fResult->SetFields(std::move(fFields)); SendMessage(UrlEvent::HttpFields); if (!fParser.HasContent()) { // Any requests with not content are finished fResult->SetBody(); SendMessage(UrlEvent::RequestCompleted, [](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); }); fRequestStatus = ContentReceived; return true; } [[fallthrough]]; } case HttpInputStreamState::Body: { size_t bytesWrittenToBody; // The bytesWrittenToBody may differ from the bytes parsed from the buffer when // there is compression on the incoming stream. bytesRead = fParser.ParseBody( fBuffer, [this, &bytesWrittenToBody](const std::byte* buffer, size_t size) { bytesWrittenToBody = fResult->WriteToBody(buffer, size); return bytesWrittenToBody; }, readEnd); SendMessage(UrlEvent::DownloadProgress, [this, bytesRead](BMessage& msg) { msg.AddInt64(UrlEventData::NumBytes, bytesRead); if (fParser.BodyBytesTotal()) msg.AddInt64(UrlEventData::TotalBytes, fParser.BodyBytesTotal().value()); }); if (bytesWrittenToBody > 0) { SendMessage(UrlEvent::BytesWritten, [bytesWrittenToBody](BMessage& msg) { msg.AddInt64(UrlEventData::NumBytes, bytesWrittenToBody); }); } if (fParser.Complete()) { fResult->SetBody(); SendMessage(UrlEvent::RequestCompleted, [](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); }); fRequestStatus = ContentReceived; return true; } else if (readEnd) { // the parsing of the body is not complete but we are at the end of the data throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError, "Unexpected end of data: more data was expected"); } break; } default: throw BRuntimeError(__PRETTY_FUNCTION__, "Not reachable"); } // There is more to receive return false; } /*! \brief Disconnect the socket. Does not validate if it actually succeeded. */ void BHttpSession::Request::Disconnect() noexcept { fSocket->Disconnect(); } /*! \brief Send a message to the observer, if one is present \param what The code of the message to be sent \param dataFunc Optional function that adds additional data to the message. */ void BHttpSession::Request::SendMessage(uint32 what, std::function dataFunc) const { if (fObserver.IsValid()) { BMessage msg(what); msg.AddInt32(UrlEventData::Id, fResult->id); if (dataFunc) dataFunc(msg); fObserver.SendMessage(&msg); } } // #pragma mark -- Message constants namespace BPrivate::Network::UrlEventData { const char* HttpStatusCode = "url:httpstatuscode"; const char* SSLCertificate = "url:sslcertificate"; const char* SSLMessage = "url:sslmessage"; const char* HttpRedirectUrl = "url:httpredirecturl"; } // namespace BPrivate::Network::UrlEventData