diff --git a/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/HttpContext.java b/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/HttpContext.java index 5053f74a87..41dc4c360e 100644 --- a/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/HttpContext.java +++ b/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/HttpContext.java @@ -15,13 +15,9 @@ */ package io.vertx.ext.web.client.impl; -import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.http.HttpHeaders; -import io.vertx.core.http.RequestOptions; +import io.vertx.core.http.*; import io.vertx.core.internal.http.HttpClientInternal; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; @@ -433,19 +429,42 @@ private void handlePrepareRequest() { if (body instanceof Pipe) { // } else if (body instanceof MultipartForm) { - MultipartFormUpload multipartForm; + ClientForm form; try { boolean multipart = "multipart/form-data".equals(contentType); - HttpPostRequestEncoder.EncoderMode encoderMode = this.request.multipartMixed() ? HttpPostRequestEncoder.EncoderMode.RFC1738 : HttpPostRequestEncoder.EncoderMode.HTML5; - multipartForm = new MultipartFormUpload(context, (MultipartForm) this.body, multipart, encoderMode); - this.body = multipartForm.pipe(); + if (multipart) { + ClientMultipartForm multipartForm = ClientMultipartForm.multipartForm(); + multipartForm.mixed(request.multipartMixed()); + form = multipartForm; + } else { + form = ClientForm.form(); + } + form.charset(((MultipartForm)body).getCharset()); + ((MultipartForm)body).forEach(part -> { + if (part.isAttribute()) { + form.attribute(part.name(), part.value()); + } else { + ClientMultipartForm multipartForm = (ClientMultipartForm) form; + if (part.isText()) { + if (part.pathname() != null) { + multipartForm.textFileUpload(part.name(), part.filename(), part.mediaType(), part.pathname()); + } else { + multipartForm.textFileUpload(part.name(), part.filename(), part.mediaType(), part.content()); + } + } else { + if (part.pathname() != null) { + multipartForm.binaryFileUpload(part.name(), part.filename(), part.mediaType(), part.pathname()); + } else { + multipartForm.binaryFileUpload(part.name(), part.filename(), part.mediaType(), part.content()); + } + } + } + }); + this.body = form; } catch (Exception e) { fail(e); return; } - for (Map.Entry header : multipartForm.headers()) { - requestOptions.putHeader(header.getKey(), header.getValue()); - } } else if (body == null && "application/json".equals(contentType)) { body = Buffer.buffer("null"); } else if (body instanceof JsonObject) { @@ -530,6 +549,8 @@ private void doSendRequest(HttpClientRequest request) { request.reset(0L, ar2.cause()); } }); + } else if (bodyToSend instanceof ClientForm) { + request.send((ClientForm) bodyToSend); } else { Buffer buffer = (Buffer) bodyToSend; request.send(buffer); diff --git a/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java b/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java deleted file mode 100644 index d4f564ec65..0000000000 --- a/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright 2014 Red Hat, Inc. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ -package io.vertx.ext.web.client.impl; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.handler.codec.http.*; -import io.netty.handler.codec.http.multipart.*; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.buffer.BufferInternal; -import io.vertx.core.internal.concurrent.InboundMessageQueue; -import io.vertx.core.internal.http.HttpHeadersInternal; -import io.vertx.core.streams.Pipe; -import io.vertx.core.streams.ReadStream; -import io.vertx.core.streams.WriteStream; -import io.vertx.ext.web.multipart.FormDataPart; -import io.vertx.ext.web.multipart.MultipartForm; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; - -/** - * A stream that sends a multipart form. - * - * @author Julien Viet - */ -public class MultipartFormUpload implements ReadStream { - - private static final Object END_SENTINEL = new Object(); - - private static final UnpooledByteBufAllocator ALLOC = new UnpooledByteBufAllocator(false); - - private DefaultFullHttpRequest request; - private HttpPostRequestEncoder encoder; - private Handler exceptionHandler; - private Handler dataHandler; - private Handler endHandler; - private final InboundMessageQueue pending; - private boolean writable; - private boolean ended; - private final ContextInternal context; - - public MultipartFormUpload(ContextInternal context, - MultipartForm parts, - boolean multipart, - HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception { - this.context = context; - this.writable = true; - this.pending = new InboundMessageQueue<>(context.executor(), context.executor()) { - @Override - protected void handleResume() { - writable = true; - pump(); - } - @Override - protected void handlePause() { - writable = false; - } - @Override - protected void handleMessage(Object msg) { - handleChunk(msg); - } - }; - this.request = new DefaultFullHttpRequest( - HttpVersion.HTTP_1_1, - io.netty.handler.codec.http.HttpMethod.POST, - "/"); - parts.getCharset(); - Charset charset = parts.getCharset() != null ? parts.getCharset() : HttpConstants.DEFAULT_CHARSET; - this.encoder = new HttpPostRequestEncoder( - new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE, charset) { - @Override - public Attribute createAttribute(HttpRequest request, String name, String value) { - try { - return new MemoryAttribute(name, value, charset); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public FileUpload createFileUpload(HttpRequest request, String name, String filename, String contentType, String contentTransferEncoding, Charset _charset, long size) { - if (_charset == null) { - _charset = charset; - } - return super.createFileUpload(request, name, filename, contentType, contentTransferEncoding, _charset, size); - } - }, - request, - multipart, - charset, - encoderMode); - for (FormDataPart formDataPart : parts) { - if (formDataPart.isAttribute()) { - encoder.addBodyAttribute(formDataPart.name(), formDataPart.value()); - } else { - String pathname = formDataPart.pathname(); - if (pathname != null) { - encoder.addBodyFileUpload(formDataPart.name(), - formDataPart.filename(), new File(formDataPart.pathname()), - formDataPart.mediaType(), formDataPart.isText()); - } else { - String contentType = formDataPart.mediaType(); - if (formDataPart.mediaType() == null) { - if (formDataPart.isText()) { - contentType = "text/plain"; - } else { - contentType = "application/octet-stream"; - } - } - String transferEncoding = formDataPart.isText() ? null : "binary"; - MemoryFileUpload fileUpload = new MemoryFileUpload( - formDataPart.name(), - formDataPart.filename(), - contentType, transferEncoding, null, formDataPart.content().length()); - fileUpload.setContent(((BufferInternal)formDataPart.content()).getByteBuf()); - encoder.addBodyHttpData(fileUpload); - } - } - } - encoder.finalizeRequest(); - } - - private void handleChunk(Object item) { - Handler handler; - synchronized (MultipartFormUpload.this) { - if (item instanceof Buffer) { - handler = dataHandler; - } else if (item instanceof Throwable) { - handler = exceptionHandler; - } else if (item == END_SENTINEL) { - handler = endHandler; - item = null; - } else { - return; - } - } - handler.handle(item); - } - - public void pump() { - if (!context.inThread()) { - throw new IllegalArgumentException(); - } - while (!ended) { - if (encoder.isChunked()) { - try { - HttpContent chunk = encoder.readChunk(ALLOC); - ByteBuf content = chunk.content(); - Buffer buff = BufferInternal.buffer(content); - pending.write(buff); - if (encoder.isEndOfInput()) { - ended = true; - request = null; - encoder = null; - pending.write(END_SENTINEL); - } else if (!writable) { - break; - } - } catch (Exception e) { - ended = true; - request = null; - encoder = null; - pending.write(e); - break; - } - } else { - ByteBuf content = request.content(); - Buffer buffer = BufferInternal.buffer(content); - request = null; - encoder = null; - pending.write(buffer); - ended = true; - pending.write(END_SENTINEL); - } - } - } - - public MultiMap headers() { - return HttpHeadersInternal.headers(request.headers()); - } - - @Override - public synchronized MultipartFormUpload exceptionHandler(Handler handler) { - exceptionHandler = handler; - return this; - } - - @Override - public synchronized MultipartFormUpload handler(Handler handler) { - dataHandler = handler; - return this; - } - - @Override - public synchronized MultipartFormUpload pause() { - pending.pause(); - return this; - } - - @Override - public ReadStream fetch(long amount) { - pending.fetch(amount); - return this; - } - - @Override - public synchronized MultipartFormUpload resume() { - pending.fetch(Long.MAX_VALUE); - return this; - } - - @Override - public synchronized MultipartFormUpload endHandler(Handler handler) { - endHandler = handler; - return this; - } - - @Override - public Pipe pipe() { - Pipe pipe = ReadStream.super.pipe(); - return new Pipe<>() { - @Override - public Pipe endOnFailure(boolean end) { - pipe.endOnFailure(end); - return this; - } - @Override - public Pipe endOnSuccess(boolean end) { - pipe.endOnSuccess(end); - return this; - } - @Override - public Pipe endOnComplete(boolean end) { - pipe.endOnComplete(end); - return this; - } - @Override - public Future to(WriteStream dst) { - Future f = pipe.to(dst); - pump(); - return f; - } - @Override - public void close() { - pipe.close(); - } - }; - } -} diff --git a/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java b/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java deleted file mode 100644 index 7be10548d6..0000000000 --- a/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2014 Red Hat, Inc. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ -package io.vertx.ext.web.client.tests; - -import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.VertxInternal; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.ext.web.client.impl.MultipartFormUpload; -import io.vertx.ext.web.multipart.MultipartForm; -import io.vertx.test.core.TestUtils; -import org.junit.*; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; - -import java.io.File; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assume.assumeTrue; - -@RunWith(VertxUnitRunner.class) -public class MultipartFormUploadTest { - - @ClassRule - public static TemporaryFolder testFolder = new TemporaryFolder(); - - private VertxInternal vertx; - - @Before - public void setUp() throws Exception { - vertx = (VertxInternal) Vertx.vertx(); - } - - @After - public void tearDown(TestContext ctx) { - vertx.close().onComplete(ctx.asyncAssertSuccess()); - } - - @Test - public void testSimpleAttribute(TestContext ctx) throws Exception { - Async async = ctx.async(); - Buffer result = Buffer.buffer(); - ContextInternal context = vertx.getOrCreateContext(); - MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().attribute("foo", "bar"), false, HttpPostRequestEncoder.EncoderMode.RFC1738); - upload.endHandler(v -> { - assertEquals("foo=bar", result.toString()); - async.complete(); - }); - upload.handler(result::appendBuffer); - upload.resume(); - context.runOnContext(v -> upload.pump()); - } - - @Test - public void testFileUploadEventLoopContext(TestContext ctx) throws Exception { - testFileUpload(ctx, vertx.createEventLoopContext(), false); - } - - @Test - public void testFileUploadWorkerContext(TestContext ctx) throws Exception { - testFileUpload(ctx, vertx.createWorkerContext(), false); - } - - @Test - public void testFileUploadVirtualThreadContext(TestContext ctx) throws Exception { - assumeTrue(vertx.isVirtualThreadAvailable()); - testFileUpload(ctx, vertx.createVirtualThreadContext(), false); - } - - @Test - public void testFileUploadPausedEventLoopContext(TestContext ctx) throws Exception { - testFileUpload(ctx, vertx.createEventLoopContext(), true); - } - - @Test - public void testFileUploadPausedWorkerContext(TestContext ctx) throws Exception { - testFileUpload(ctx, vertx.createWorkerContext(), true); - } - - @Test - public void testFileUploadPausedVirtualThreadContext(TestContext ctx) throws Exception { - assumeTrue(vertx.isVirtualThreadAvailable()); - testFileUpload(ctx, vertx.createVirtualThreadContext(), true); - } - - private void testFileUpload(TestContext testContext, ContextInternal context, boolean paused) throws Exception { - File file = testFolder.newFile(); - Files.write(file.toPath(), TestUtils.randomByteArray(32 * 1024)); - - String filename = file.getName(); - String pathname = file.getAbsolutePath(); - - Async async = testContext.async(); - context.runOnContext(v1 -> { - try { - MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().textFileUpload( - "the-file", - filename, - pathname, - "text/plain"), true, HttpPostRequestEncoder.EncoderMode.RFC1738); - List buffers = Collections.synchronizedList(new ArrayList<>()); - AtomicInteger end = new AtomicInteger(); - upload.endHandler(v2 -> { - assertEquals(0, end.getAndIncrement()); - testContext.assertFalse(buffers.isEmpty()); - async.complete(); - }); - upload.handler(buffer -> { - assertEquals(0, end.get()); - buffers.add(buffer); - }); - if (!paused) { - upload.resume(); - } - upload.pump(); - if (paused) { - context.runOnContext(v3 -> upload.resume()); - } - } catch (Exception e) { - testContext.fail(e); - throw new AssertionError(e); - } - }); - } -} diff --git a/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/WebClientTest.java b/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/WebClientTest.java index a90c9ca48e..6cdf7d01c2 100644 --- a/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/WebClientTest.java +++ b/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/WebClientTest.java @@ -1,6 +1,5 @@ package io.vertx.ext.web.client.tests; -import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; @@ -36,10 +35,10 @@ import io.vertx.test.fakeresolver.FakeAddress; import io.vertx.test.fakeresolver.FakeEndpointResolver; import io.vertx.test.tls.Cert; +import org.junit.Ignore; import org.junit.Test; import java.io.File; -import java.io.FileNotFoundException; import java.net.ConnectException; import java.net.MalformedURLException; import java.net.URLDecoder; @@ -1498,8 +1497,6 @@ public void testFileUploadWhenFileDoesNotExist() { .textFileUpload("file", "nonexistentFilename", "nonexistentPathname", "text/plain"); builder.sendMultipartForm(form).onComplete(onFailure(err -> { - assertEquals(err.getClass(), HttpPostRequestEncoder.ErrorDataEncoderException.class); - assertEquals(err.getCause().getClass(), FileNotFoundException.class); complete(); })); await(); @@ -1515,7 +1512,6 @@ public void testFileUploads() throws Exception { MultipartForm form = MultipartForm.create() .textFileUpload("file", "nonexistentFilename", "nonexistentPathname", "text/plain"); builder.sendMultipartForm(form).onComplete(onFailure(err -> { - assertEquals(err.getClass(), HttpPostRequestEncoder.ErrorDataEncoderException.class); complete(); })); await();