--- Revision None +++ Revision 353765306266 @@ -0,0 +1,216 @@ +diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl +index c35bb91..0a99845 100644 +--- a/src/couchdb/couch_db.erl ++++ b/src/couchdb/couch_db.erl +@@ -24,7 +24,7 @@ + -export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]). + -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). + +- ++-export([write_streamed_attachment/4]). + -include("couch_db.hrl"). + + +@@ -400,8 +400,14 @@ doc_flush_binaries(Doc, Fd) -> + % written to a different file + SizeAcc + Len; + {_Key, {_Type, Bin}} when is_binary(Bin) -> ++ % we have a new binary to write + SizeAcc + size(Bin); ++ {_Key, {_Type, {Fun, undefined}}} when is_function(Fun) -> ++ % function without a known length ++ % we'll have to alloc as we go with this one, for now, nothing ++ SizeAcc; + {_Key, {_Type, {Fun, Len}}} when is_function(Fun) -> ++ % function to yield binary data with known length + SizeAcc + Len + end + end, +@@ -436,6 +442,16 @@ doc_flush_binaries(Doc, Fd) -> + Bin when is_binary(Bin) -> + {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), + {Fd, StreamPointer, size(Bin)}; ++ {StreamFun, undefined} when is_function(StreamFun) -> ++ % StreamFun(MaxChunkSize, WriterFun) ++ % will call our WriterFun ++ % once for each chunk of the attachment. ++ WriterFun = make_writer_fun(OutputStream), ++ MaxChunkSize = 4294967296, % TODO config ++ % couch_config:get("couchdb", "max_document_size", "4294967296") ++ {ok, {TotalLength, NewStreamPointer}} = ++ StreamFun(MaxChunkSize, WriterFun, {0, nil}), ++ {Fd, NewStreamPointer, TotalLength}; + {Fun, Len} when is_function(Fun) -> + {ok, StreamPointer} = + write_streamed_attachment(OutputStream, Fun, Len, nil), +@@ -445,8 +461,26 @@ doc_flush_binaries(Doc, Fd) -> + end, Bins), + + {ok, _FinalPos} = couch_stream:close(OutputStream), +- + Doc#doc{attachments = NewBins}. 
++
++% WriterFun({Length, Binary}, State)
++% WriterFun({0, _Footers}, State)
++% Called with Length == 0 on the last time.
++% WriterFun returns NewState.
++make_writer_fun(Stream) ->
++    fun
++        ({0, _Footers}, {FinalLen, SpFin}) ->
++            % last block, return the final tuple
++            {ok, {FinalLen, SpFin}};
++        ({Length, Bin}, {Total, nil}) ->
++            % save StreamPointer
++            {ok, StreamPointer} = couch_stream:write(Stream, Bin),
++            {Total+Length, StreamPointer};
++        ({Length, Bin}, {Total, SpAcc}) ->
++            % write the Bin to disk
++            {ok, _Sp} = couch_stream:write(Stream, Bin),
++            {Total+Length, SpAcc}
++    end.
+ 
+ write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
+     {ok, SpAcc};
+diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
+index 6b9079c..3447e22 100644
+--- a/src/couchdb/couch_httpd.erl
++++ b/src/couchdb/couch_httpd.erl
+@@ -16,7 +16,7 @@
+ -export([start_link/0, stop/0, handle_request/3]).
+ 
+ -export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]).
+--export([verify_is_server_admin/1,unquote/1,quote/1,recv/2]).
++-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]).
+ -export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]).
+ -export([primary_header_value/2,partition/1,serve_file/3]).
+ -export([start_chunked_response/3,send_chunk/2]).
+@@ -260,6 +260,12 @@ parse_form(#httpd{mochi_req=MochiReq}) ->
+ recv(#httpd{mochi_req=MochiReq}, Len) ->
+     MochiReq:recv(Len).
+ 
++recv_chunked(#httpd{mochi_req=MochiReq}, MaxChunkSize, ChunkFun, InitState) ->
++    % ChunkFun is called once with each chunk
++    % ChunkFun({Length, Binary}, State) -> NewState
++    % called with Length == 0 on the last time.
++    MochiReq:recv_body(MaxChunkSize, ChunkFun, InitState). 
++ + body(#httpd{mochi_req=MochiReq}) -> + % Maximum size of document PUT request body (4GB) + MaxSize = list_to_integer( +diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl +index ae92512..df88fe9 100644 +--- a/src/couchdb/couch_httpd_db.erl ++++ b/src/couchdb/couch_httpd_db.erl +@@ -591,10 +591,24 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) + []; + _ -> + [{FileName, { +- list_to_binary(couch_httpd:header_value(Req,"Content-Type")), ++ case couch_httpd:header_value(Req,"Content-Type") of ++ undefined -> ++ % we could throw an error here or guess by the name ++ % currently, just giving it a default ++ <<"application/octet-stream">>; ++ CType -> ++ list_to_binary(CType) ++ end, + case couch_httpd:header_value(Req,"Content-Length") of + undefined -> +- throw({bad_request, "Attachment uploads must be fixed length"}); ++ {fun(MaxChunkSize, ChunkFun, InitState) -> ++ % ChunkFun is called once with each chunk ++ % ChunkFun({Length, Binary}, State) ++ % called with Length == 0 on the last time. 
++ % returns NewState ++ couch_httpd:recv_chunked(Req, MaxChunkSize, ++ ChunkFun, InitState) ++ end, undefined}; + Length -> + {fun() -> couch_httpd:recv(Req, 0) end, list_to_integer(Length)} + end +diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl +index d957268..d6f7269 100644 +--- a/src/couchdb/couch_stream.erl ++++ b/src/couchdb/couch_stream.erl +@@ -136,6 +136,7 @@ handle_call(get_state, _From, Stream) -> + {reply, {Pos, BytesRemaining}, Stream}; + handle_call({set_min_buffer, MinBuffer}, _From, Stream) -> + {reply, ok, Stream#write_stream{min_alloc = MinBuffer}}; ++% set next_alloc if we need more room + handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) -> + #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream, + case BytesRemainingInCurrentBuffer < BufferSizeRequested of +diff --git a/src/mochiweb/mochiweb_request.erl b/src/mochiweb/mochiweb_request.erl +index 311ed50..095b251 100644 +--- a/src/mochiweb/mochiweb_request.erl ++++ b/src/mochiweb/mochiweb_request.erl +@@ -12,7 +12,7 @@ + -define(READ_SIZE, 8192). + + -export([get_header_value/1, get_primary_header_value/1, get/1, dump/0]). +--export([send/1, recv/1, recv/2, recv_body/0, recv_body/1]). ++-export([send/1, recv/1, recv/2, recv_body/0, recv_body/1, recv_body/3]). + -export([start_response/1, start_response_length/1, start_raw_response/1]). + -export([respond/1, ok/1]). + -export([not_found/0, not_found/1]). +@@ -171,6 +171,9 @@ recv_body() -> + %% @doc Receive the body of the HTTP request (defined by Content-Length). + %% Will receive up to MaxBody bytes. + recv_body(MaxBody) -> ++ recv_body(MaxBody, nil, nil). 
++
++recv_body(MaxBody, ChunkFun, ChunkAcc) ->
+     case get_header_value("expect") of
+         "100-continue" ->
+             start_raw_response({100, gb_trees:empty()});
+@@ -183,7 +186,15 @@
+         {unknown_transfer_encoding, Unknown} ->
+             exit({unknown_transfer_encoding, Unknown});
+         chunked ->
+-            read_chunked_body(MaxBody, []);
++            case ChunkFun of
++            nil ->
++                read_chunked_body(MaxBody);
++            _StreamFun ->
++                % In this case the MaxBody is actually used to
++                % determine the maximum allowed size of a single
++                % chunk.
++                stream_chunked_body(MaxBody, ChunkFun, ChunkAcc)
++            end;
+         0 ->
+             <<>>;
+         Length when is_integer(Length), Length =< MaxBody ->
+@@ -408,15 +419,27 @@ parse_post() ->
+         Cached
+     end.
+ 
+-read_chunked_body(Max, Acc) ->
++read_chunked_body(MaxBufferSize) ->
++    stream_chunked_body(MaxBufferSize, fun
++        ({0, _}, Acc) ->
++            iolist_to_binary(lists:reverse(Acc));
++        ({Length, Bin}, Acc) ->
++            [Bin | Acc]
++    end, []).
++
++% takes a function and the max amount to bite off in each call.
++% the function gets called with each chunk.
++% used internally by read_chunked_body/1
++stream_chunked_body(MaxChunkSize, Fun, FunState) ->
++    io:format("stream_chunked_body~n",[]),
+     case read_chunk_length() of
+         0 ->
+-            read_chunk(0),
+-            iolist_to_binary(lists:reverse(Acc));
+-        Length when Length > Max ->
++            Result = Fun({0, read_chunk(0)}, FunState);
++        Length when Length > MaxChunkSize ->
+             exit({body_too_large, chunked});
+         Length ->
+-            read_chunked_body(Max - Length, [read_chunk(Length) | Acc])
++            NewState = Fun({Length, read_chunk(Length)}, FunState),
++            stream_chunked_body(MaxChunkSize, Fun, NewState)
+     end.
+ 
+ %% @spec read_chunk_length() -> integer()
+