Multipart. Continuous streaming - Async POC.

This commit is contained in:
lganzzzo 2020-08-02 11:40:19 +03:00
parent 6c3195a84d
commit 678219180c
12 changed files with 119 additions and 33 deletions

View File

@ -38,6 +38,25 @@ oatpp::String Multipart::getBoundary() {
return m_boundary;
}
std::shared_ptr<Part> Multipart::readNextPartSimple() {
async::Action action;
auto result = readNextPart(action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::web::mime::multipart::Multipart::readNextPartSimple()]. Error."
"Async method is called for non-async API.");
}
return result;
}
void Multipart::writeNextPartSimple(const std::shared_ptr<Part>& part) {
async::Action action;
writeNextPart(part, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::web::mime::multipart::Multipart::writeNextPartSimple()]. Error."
"Async method is called for non-async API.");
}
}
oatpp::String Multipart::generateRandomBoundary(v_int32 boundarySize) {
std::unique_ptr<v_char8[]> buffer(new v_char8[boundarySize]);
utils::random::Random::randomBytes(buffer.get(), boundarySize);

View File

@ -65,13 +65,27 @@ public:
* Read part-by-part from Multipart.
* @return
*/
virtual std::shared_ptr<Part> readNextPart() = 0;
virtual std::shared_ptr<Part> readNextPart(async::Action& action) = 0;
/**
* Write part-by-part to Multipart.
* @param part
*/
virtual void writeNextPart(const std::shared_ptr<Part>& part) = 0;
virtual void writeNextPart(const std::shared_ptr<Part>& part, async::Action& action) = 0;
/**
* Read part-by-part from Multipart. <br>
* Call writeNextPart(...) and throw if `action.isNone() == false`.
* @return
*/
std::shared_ptr<Part> readNextPartSimple();
/**
* Write part-by-part to Multipart.
* Call writeNextPartSimple(...) and throw if `action.isNone() == false`.
* @param part
*/
void writeNextPartSimple(const std::shared_ptr<Part>& part);
public:

View File

@ -45,7 +45,7 @@ std::shared_ptr<PartList> PartList::createSharedWithRandomBoundary(v_int32 bound
return std::make_shared<PartList>(boundary);
}
std::shared_ptr<Part> PartList::readNextPart() {
std::shared_ptr<Part> PartList::readNextPart(async::Action& action) {
if(!m_readIteratorInitialized) {
m_readIteratorInitialized = true;
m_iterator = m_parts.begin();
@ -56,7 +56,7 @@ std::shared_ptr<Part> PartList::readNextPart() {
return *m_iterator ++;
}
void PartList::writeNextPart(const std::shared_ptr<Part>& part) {
void PartList::writeNextPart(const std::shared_ptr<Part>& part, async::Action& action) {
if(part->getName()) {
auto it = m_namedParts.find(part->getName());

View File

@ -64,13 +64,13 @@ public:
* Read part-by-part from Multipart.
* @return
*/
std::shared_ptr<Part> readNextPart() override;
std::shared_ptr<Part> readNextPart(async::Action& action) override;
/**
* Write part-by-part to Multipart.
* @param part
*/
void writeNextPart(const std::shared_ptr<Part>& part) override;
void writeNextPart(const std::shared_ptr<Part>& part, async::Action& action) override;
/**
* Get part by name <br>

View File

@ -63,10 +63,10 @@ void PartsParser::onPartData(p_char8 data, v_buff_size size) {
m_currReader->onPartData(m_currPart, data, size);
}
} else {
m_multipart->writeNextPart(m_currPart);
if(m_currReader) {
m_currReader->onPartData(m_currPart, data, size);
m_currReader->onPartData(m_currPart, nullptr, 0);
}
m_multipart->writeNextPartSimple(m_currPart);
}
}
@ -111,16 +111,49 @@ async::CoroutineStarter AsyncPartsParser::onPartHeadersAsync(const Headers& part
}
async::CoroutineStarter AsyncPartsParser::onPartDone(const std::shared_ptr<Part>& part) {
class PutPartCoroutine : public async::Coroutine<PutPartCoroutine> {
private:
Multipart* m_multipart;
std::shared_ptr<AsyncPartReader> m_currReader;
std::shared_ptr<Part> m_part;
public:
PutPartCoroutine(Multipart* multipart,
const std::shared_ptr<AsyncPartReader>& currReader,
const std::shared_ptr<Part>& part)
: m_multipart(multipart)
, m_currReader(currReader)
, m_part(part)
{}
Action act() override {
return m_currReader->onPartDataAsync(m_part, nullptr, 0).next(yieldTo(&PutPartCoroutine::putPart));
}
Action putPart() {
async::Action action;
m_multipart->writeNextPart(m_part, action);
if(action.isNone()) {
return action;
}
return finish();
}
};
return PutPartCoroutine::start(m_multipart, m_currReader, part);
}
async::CoroutineStarter AsyncPartsParser::onPartDataAsync(p_char8 data, v_buff_size size) {
if(size > 0) {
if(m_currReader) {
return m_currReader->onPartDataAsync(m_currPart, data, size);
}
} else {
m_multipart->writeNextPart(m_currPart);
if(m_currReader) {
return m_currReader->onPartDataAsync(m_currPart, data, size);
}
return onPartDone(m_currPart);
}
return nullptr;
}

View File

@ -142,6 +142,8 @@ typedef std::unordered_map<oatpp::String, std::shared_ptr<AsyncPartReader>> Asyn
*/
class AsyncPartsParser : public StatefulParser::AsyncListener {
friend AsyncReader;
private:
async::CoroutineStarter onPartDone(const std::shared_ptr<Part>& part);
private:
AsyncPartReadersMap m_readers;
std::shared_ptr<AsyncPartReader> m_defaultReader;

View File

@ -32,12 +32,14 @@ v_io_size MultipartBody::readBody(void *buffer, v_buff_size count, async::Action
const auto& stream = part->getInputStream();
if(!stream) {
OATPP_LOGW("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::readBody()]", "Warning. Part has no input stream", m_state);
m_iterator.inc();
m_iterator.inc(action);
return 0;
}
auto res = stream->read(buffer, count, action);
if(res == 0) {
m_iterator.inc();
if(action.isNone()) {
m_iterator.inc(action);
}
}
return res;
}
@ -48,6 +50,11 @@ v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& ac
return 0;
}
m_iterator.init(action);
if(!action.isNone()) {
return 0;
}
p_char8 currBufferPtr = (p_char8) buffer;
v_io_size bytesLeft = count;
@ -170,9 +177,6 @@ MultipartBody::MultipartBody(const std::shared_ptr<Multipart>& multipart, const
{}
void MultipartBody::declareHeaders(Headers& headers) {
if(m_iterator.finished()) {
return;
}
headers.put_LockFree(oatpp::web::protocol::http::Header::CONTENT_TYPE, m_contentType + "; boundary=" + m_multipart->getBoundary());
}

View File

@ -66,17 +66,28 @@ private:
std::shared_ptr<Multipart> m_multipart;
std::shared_ptr<Part> m_part;
bool m_isFirst;
bool m_initialized;
public:
PartIterator(const std::shared_ptr<Multipart>& multipart)
: m_multipart(multipart)
, m_part(m_multipart->readNextPart())
, m_part(nullptr)
, m_isFirst(true)
, m_initialized(false)
{}
void inc() {
m_part = m_multipart->readNextPart();
m_isFirst = false;
void init(async::Action& action) {
if(!m_initialized) {
m_part = m_multipart->readNextPart(action);
m_initialized = true;
}
}
void inc(async::Action& action) {
m_part = m_multipart->readNextPart(action);
if(m_part) {
m_isFirst = false;
}
}
bool finished() {

View File

@ -74,7 +74,7 @@ void runTests() {
OATPP_LOGD("aaa", "coroutine size=%d", sizeof(oatpp::async::AbstractCoroutine));
OATPP_LOGD("aaa", "action size=%d", sizeof(oatpp::async::Action));
/*
OATPP_RUN_TEST(oatpp::test::base::CommandLineArgumentsTest);
OATPP_RUN_TEST(oatpp::test::base::LoggerTest);
@ -156,11 +156,11 @@ void runTests() {
test_port.run();
}
*/
{
oatpp::test::web::FullTest test_virtual(0, 1000);
test_virtual.run();
// oatpp::test::web::FullTest test_virtual(0, 1000);
// test_virtual.run();
oatpp::test::web::FullTest test_port(8000, 10);
test_port.run();

View File

@ -125,7 +125,7 @@ std::shared_ptr<PartList> createMultipart(const std::unordered_map<oatpp::String
oatpp::web::mime::multipart::Headers partHeaders;
auto part = std::make_shared<oatpp::web::mime::multipart::Part>(partHeaders);
multipart->writeNextPart(part);
multipart->writeNextPartSimple(part);
part->putHeader("Content-Disposition", "form-data; name=\"" + pair.first + "\"");
part->setDataInfo(std::make_shared<oatpp::data::stream::BufferInputStream>(pair.second));

View File

@ -122,7 +122,7 @@ std::shared_ptr<PartList> createMultipart(const std::unordered_map<oatpp::String
oatpp::web::mime::multipart::Headers partHeaders;
auto part = std::make_shared<oatpp::web::mime::multipart::Part>(partHeaders);
multipart->writeNextPart(part);
multipart->writeNextPartSimple(part);
part->putHeader("Content-Disposition", "form-data; name=\"" + pair.first + "\"");
part->setDataInfo(std::make_shared<oatpp::data::stream::BufferInputStream>(pair.second));

View File

@ -298,21 +298,24 @@ public:
: oatpp::web::mime::multipart::Multipart(generateRandomBoundary())
{}
std::shared_ptr<Part> readNextPart() override {
std::shared_ptr<Part> readNextPart(async::Action& action) override {
if(counter == 5) {
return nullptr;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
auto part = std::make_shared<Part>();
part->putHeader(Header::CONTENT_TYPE, "text/html");
// oatpp::String frameData;
//
oatpp::String frameData;
// if(counter % 2 == 0) {
// frameData = "<html><body>0</body></html>";
// } else {
// frameData = "<html><body>1</body></html>";
// }
//
// part->setDataInfo(std::make_shared<oatpp::data::stream::BufferInputStream>(frameData));
if(counter % 2 == 0) {
@ -329,7 +332,7 @@ public:
}
void writeNextPart(const std::shared_ptr<Part>& part) override {
void writeNextPart(const std::shared_ptr<Part>& part, async::Action& action) override {
throw std::runtime_error("No writes here!!!");
}
@ -340,7 +343,7 @@ public:
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::MultipartBody>(
multipart,
"multipart/x-mixed-replace",
false /* flush frames immediately */
true /* flush frames immediately */
);
return OutgoingResponse::createShared(Status::CODE_200, body);
}