No title Revision 353765306266 (Mon Oct 03 2011 at 03:19) - Diff Link to this snippet: https://friendpaste.com/rhBaKupuywEZT1iHtR60Y Embed: manni perldoc borland colorful default murphy trac fruity autumn bw emacs pastie friendly Show line numbers Wrap lines 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erlindex c35bb91..0a99845 100644--- a/src/couchdb/couch_db.erl+++ b/src/couchdb/couch_db.erl@@ -24,7 +24,7 @@ -export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -+-export([write_streamed_attachment/4]). -include("couch_db.hrl"). 
@@ -400,8 +400,14 @@ doc_flush_binaries(Doc, Fd) ->
                 % written to a different file
                 SizeAcc + Len;
             {_Key, {_Type, Bin}} when is_binary(Bin) ->
+                % we have a new binary to write
                 SizeAcc + size(Bin);
+            {_Key, {_Type, {Fun, undefined}}} when is_function(Fun) ->
+                % streaming function whose total length is not known up front
+                % its size cannot be counted here, so it adds nothing to SizeAcc
+                SizeAcc;
             {_Key, {_Type, {Fun, Len}}} when is_function(Fun) ->
+                % function that yields binary data with a known length (Len)
                 SizeAcc + Len
             end
         end,
@@ -436,6 +442,16 @@ doc_flush_binaries(Doc, Fd) ->
             Bin when is_binary(Bin) ->
                 {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
                 {Fd, StreamPointer, size(Bin)};
+            {StreamFun, undefined} when is_function(StreamFun) ->
+                % StreamFun(MaxChunkSize, WriterFun, InitState) calls our
+                % WriterFun once for each chunk of the attachment and is
+                % expected to return {ok, {TotalLength, StreamPointer}}.
+                WriterFun = make_writer_fun(OutputStream),
+                MaxChunkSize = 4294967296, % 4 GiB, hard-coded; TODO config
+                % e.g. couch_config:get("couchdb", "max_document_size", "4294967296")
+                {ok, {TotalLength, NewStreamPointer}} =
+                    StreamFun(MaxChunkSize, WriterFun, {0, nil}),
+                {Fd, NewStreamPointer, TotalLength};
             {Fun, Len} when is_function(Fun) ->
                 {ok, StreamPointer} =
                         write_streamed_attachment(OutputStream, Fun, Len, nil),
@@ -445,8 +461,26 @@ doc_flush_binaries(Doc, Fd) ->
         end, Bins),
 
     {ok, _FinalPos} = couch_stream:close(OutputStream),
-
     Doc#doc{attachments = NewBins}.
+
+% make_writer_fun(Stream) returns WriterFun(Chunk, {TotalLen, StreamPointer}).
+% WriterFun({Length, Binary}, State) writes Binary to Stream, returns NewState.
+% WriterFun({0, _Footers}, State) is the final call (Length == 0): it writes
+% nothing and returns {ok, State}, i.e. {ok, {TotalLen, StreamPointer}}.
+make_writer_fun(Stream) ->
+    fun
+        ({0, _Footers}, {FinalLen, SpFin}) ->
+            % last block: nothing to write, return the final tuple
+            {ok, {FinalLen, SpFin}};
+        ({Length, Bin}, {Total, nil}) ->
+            % no pointer saved yet (nil): keep the one this write returns
+            {ok, StreamPointer} = couch_stream:write(Stream, Bin),
+            {Total+Length, StreamPointer};
+        ({Length, Bin}, {Total, SpAcc}) ->
+            % write the Bin and keep the pointer that was already saved
+            {ok, _Sp} = couch_stream:write(Stream, Bin),
+            {Total+Length, SpAcc}
+    end.
 
write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
     {ok, SpAcc};
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 6b9079c..3447e22 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -16,7 +16,7 @@
 -export([start_link/0, stop/0, handle_request/3]).
 
 -export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]).
--export([verify_is_server_admin/1,unquote/1,quote/1,recv/2]).
+-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]).
 -export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]).
 -export([primary_header_value/2,partition/1,serve_file/3]).
 -export([start_chunked_response/3,send_chunk/2]).
@@ -260,6 +260,12 @@ parse_form(#httpd{mochi_req=MochiReq}) ->
 recv(#httpd{mochi_req=MochiReq}, Len) ->
     MochiReq:recv(Len).
 
+recv_chunked(#httpd{mochi_req=MochiReq}, MaxChunkSize, ChunkFun, InitState) ->
+    % ChunkFun is called once with each chunk:
+    % ChunkFun({Length, Binary}, State), starting from InitState;
+    % it is called with Length == 0 on the last time.
+    MochiReq:recv_body(MaxChunkSize, ChunkFun, InitState).
+
 body(#httpd{mochi_req=MochiReq}) ->
     % Maximum size of document PUT request body (4GB)
     MaxSize = list_to_integer(
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index ae92512..df88fe9 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -591,10 +591,24 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
                 [];
             _ ->
                 [{FileName, {
-                    list_to_binary(couch_httpd:header_value(Req,"Content-Type")),
+                    case couch_httpd:header_value(Req,"Content-Type") of
+                        undefined ->
+                            % no Content-Type header: we could throw an error
+                            % or guess by the name; for now use a default
+                            <<"application/octet-stream">>;
+                        CType ->
+                            list_to_binary(CType)
+                    end,
                     case couch_httpd:header_value(Req,"Content-Length") of
                     undefined ->
-                        throw({bad_request, "Attachment uploads must be fixed length"});
+                        {fun(MaxChunkSize, ChunkFun, InitState) ->
+                            % no Content-Length header, so the length is
+                            % reported as undefined; ChunkFun({Length, Binary},
+                            % State) is called once with each chunk and returns
+                            % NewState; it is called with Length == 0 last.
+                            couch_httpd:recv_chunked(Req, MaxChunkSize,
+                                ChunkFun, InitState)
+                        end, undefined};
                     Length ->
                         {fun() -> couch_httpd:recv(Req, 0) end, list_to_integer(Length)}
                     end
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index d957268..d6f7269 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -136,6 +136,7 @@ handle_call(get_state, _From, Stream) ->
     {reply, {Pos, BytesRemaining}, Stream};
 handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
     {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
+% set next_alloc if we need more room
 handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
     #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
     case BytesRemainingInCurrentBuffer < BufferSizeRequested of
diff --git a/src/mochiweb/mochiweb_request.erl b/src/mochiweb/mochiweb_request.erl
index 311ed50..095b251 100644
--- a/src/mochiweb/mochiweb_request.erl
+++ b/src/mochiweb/mochiweb_request.erl
@@ -12,7 +12,7 @@
 -define(READ_SIZE, 8192).
 
 -export([get_header_value/1, get_primary_header_value/1, get/1, dump/0]).
--export([send/1, recv/1, recv/2, recv_body/0, recv_body/1]).
+-export([send/1, recv/1, recv/2, recv_body/0, recv_body/1, recv_body/3]).
 -export([start_response/1, start_response_length/1, start_raw_response/1]).
 -export([respond/1, ok/1]).
 -export([not_found/0, not_found/1]).
@@ -171,6 +171,9 @@ recv_body() ->
 %% @doc Receive the body of the HTTP request (defined by Content-Length).
 %% Will receive up to MaxBody bytes.
recv_body(MaxBody) ->
+    recv_body(MaxBody, nil, nil).
+
+recv_body(MaxBody, ChunkFun, ChunkAcc) ->
     case get_header_value("expect") of
         "100-continue" ->
             start_raw_response({100, gb_trees:empty()});
@@ -183,7 +186,15 @@ recv_body(MaxBody) ->
                {unknown_transfer_encoding, Unknown} ->
                    exit({unknown_transfer_encoding, Unknown});
                chunked ->
-                   read_chunked_body(MaxBody, []);
+                   case ChunkFun of
+                       nil ->
+                           read_chunked_body(MaxBody);
+                       _StreamFun ->
+                           % In this case the MaxBody is actually used to
+                           % determine the maximum allowed size of a single
+                           % chunk.
+                           stream_chunked_body(MaxBody, ChunkFun, ChunkAcc)
+                   end;
                0 ->
                    <<>>;
                Length when is_integer(Length), Length =< MaxBody ->
@@ -408,15 +419,34 @@ parse_post() ->
             Cached
     end.
 
-read_chunked_body(Max, Acc) ->
+%% Reads a whole chunked request body and returns it as one binary.
+%% MaxBufferSize caps the total body size, as read_chunked_body/2 did:
+%% the remaining budget travels in the fold state next to the chunk list,
+%% and exceeding it exits with {body_too_large, chunked}.
+read_chunked_body(MaxBufferSize) ->
+    stream_chunked_body(MaxBufferSize, fun
+        ({0, _Footers}, {_Remaining, Acc}) ->
+            iolist_to_binary(lists:reverse(Acc));
+        ({Length, _Bin}, {Remaining, _Acc}) when Length > Remaining ->
+            exit({body_too_large, chunked});
+        ({Length, Bin}, {Remaining, Acc}) ->
+            {Remaining - Length, [Bin | Acc]}
+    end, {MaxBufferSize, []}).
+
+%% Streams a chunked request body through Fun, one call per chunk.
+%% Fun({Length, Chunk}, State) must return the next State; the final call
+%% is Fun({0, read_chunk(0)}, State) and its result is returned as-is.
+%% A chunk longer than MaxChunkSize exits with {body_too_large, chunked}
+%% before it is read. Used by read_chunked_body/1 and recv_body/3.
+stream_chunked_body(MaxChunkSize, Fun, FunState) ->
     case read_chunk_length() of
         0 ->
-            read_chunk(0),
-            iolist_to_binary(lists:reverse(Acc));
-        Length when Length > Max ->
+            Fun({0, read_chunk(0)}, FunState);
+        Length when Length > MaxChunkSize ->
             exit({body_too_large, chunked});
         Length ->
-            read_chunked_body(Max - Length, [read_chunk(Length) | Acc])
+            NewState = Fun({Length, read_chunk(Length)}, FunState),
+            stream_chunked_body(MaxChunkSize, Fun, NewState)
     end.
 
 %% @spec read_chunk_length() -> integer()