No title Revision 88875eba5126 (Tue Dec 16 2008 at 17:22) - Diff Link to this snippet: https://friendpaste.com/7b3OcX1RcrOHKlMefYxdJh Embed: manni perldoc borland colorful default murphy trac fruity autumn bw emacs pastie friendly Show line numbers Wrap lines 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491Index: src/couchdb/couch_httpd_view.erl===================================================================--- src/couchdb/couch_httpd_view.erl (revision 724788)+++ src/couchdb/couch_httpd_view.erl (working copy)@@ -22,18 +22,19 @@ start_json_response/2,send_chunk/2,end_json_response/1]). design_doc_view(Req, Db, Id, ViewName, Keys) ->+ #view_query_args{+ update = Update,+ reduce = Reduce+ } = QueryArgs = parse_view_query(Req, Keys), case couch_view:get_map_view({couch_db:name(Db), - <<"_design/", Id/binary>>, ViewName}) of+ <<"_design/", Id/binary>>, ViewName, Update}) of {ok, View} -> - QueryArgs = parse_view_query(Req, Keys), output_map_view(Req, View, Db, QueryArgs, Keys); {not_found, Reason} -> case couch_view:get_reduce_view({couch_db:name(Db), <<"_design/", Id/binary>>, ViewName}) of {ok, View} ->- #view_query_args{- reduce = Reduce- } = QueryArgs = parse_view_query(Req, Keys, true),+ parse_view_query(Req, Keys, true), % just for validation case Reduce of false -> {reduce, _N, _Lang, MapView} = View,Index: src/couchdb/couch_view_group.erl===================================================================--- src/couchdb/couch_view_group.erl (revision 0)+++ src/couchdb/couch_view_group.erl (revision 0)@@ -0,0 +1,192 @@+% Licensed under the Apache License, Version 2.0 (the "License"); you may not+% use this file except in compliance with the License. You may obtain a copy of+% the License at+%+% http://www.apache.org/licenses/LICENSE-2.0+%+% Unless required by applicable law or agreed to in writing, software+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the+% License for the specific language governing permissions and limitations under+% the License.++-module(couch_view_group).+-behaviour(gen_server).++%% API+-export([start_link/1, request_group/2]).+% -export([design_doc_to_view_group/1]).++%% gen_server callbacks+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,+ terminate/2, code_change/3]).++-include("couch_db.hrl").+ +-record(group_state, {+ spawn_fun,+ target_seq=0,+ group_seq=0,+ group=nil,+ updater_pid=nil,+ waiting_list=[]+}).++% api methods+request_group(Pid, Seq) ->+ ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),+ case gen_server:call(Pid, {request_group, Seq}, infinity) of+ {ok, Group} ->+ ?LOG_DEBUG("get_updated_group replied with group", []),+ {ok, Group};+ Else ->+ ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),+ Else+ end.+++% from template+start_link(InitArgs) ->+ gen_server:start_link(couch_view_group, InitArgs, []).++% init differentiates between temp and design_doc views. It creates a closure+% which spawns the appropriate view_updater. (It might also spawn the first+% view_updater run.)+init(InitArgs) ->+ SpawnFun = fun() -> spawn_updater(InitArgs) end,+ process_flag(trap_exit, true),+ {ok, #group_state{spawn_fun=SpawnFun}}.++% There are two sources of messages: couch_view, which requests an up to date+% view group, and the couch_view_updater, which when spawned, updates the+% group and sends it back here. We employ a caching mechanism, so that between+% database writes, we don't have to spawn a couch_view_updater with every view+% request. This should give us more control, and the ability to request view+% statuses eventually.++% The caching mechanism: each request is submitted with a seq_id for the+% database at the time it was read. We guarantee to return a view from that+% sequence or newer.++% If the request sequence is higher than our current high_target seq, we set+% that as the highest seqence. If the updater is not running, we launch it.++handle_call({request_group, RequestSeq}, From, + #group_state{+ target_seq=TargetSeq, + spawn_fun=SpawnFun,+ updater_pid=Up,+ waiting_list=WaitList+ }=State) when RequestSeq > TargetSeq, Up == nil -> + UpdaterPid = SpawnFun(),+ {noreply, State#group_state{+ target_seq=RequestSeq, + updater_pid=UpdaterPid,+ waiting_list=[{From,RequestSeq}|WaitList]+ }, infinity};++handle_call({request_group, RequestSeq}, From, + #group_state{+ target_seq=TargetSeq,+ waiting_list=WaitList+ }=State) when RequestSeq > TargetSeq ->+ {noreply, State#group_state{+ target_seq=RequestSeq, + waiting_list=[{From,RequestSeq}|WaitList]+ }, infinity};+ ++% If the request seqence is less than or equal to the seq_id of a known Group,+% we respond with that Group.+handle_call({request_group, RequestSeq}, _From, + State=#group_state{+ group_seq=GroupSeq,+ group=Group + }) when RequestSeq =< GroupSeq ->+ {reply, {ok, Group}, State};++% Otherwise: TargetSeq => RequestSeq > GroupSeq+% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq+handle_call({request_group, RequestSeq}, From,+ #group_state{+ waiting_list=WaitList+ }=State) ->+ {noreply, State#group_state{+ waiting_list=[{From, RequestSeq}|WaitList]+ }, infinity}.+++% When the updater finishes, it will return a group with a seq_id, we should+% store that group and seq_id in our state. If our high_target is higher than+% the returned group, start a new updater.++handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}}, + State=#group_state{+ target_seq=TargetSeq, + waiting_list=WaitList,+ spawn_fun=SpawnFun}) when TargetSeq > NewGroupSeq ->+ StillWaiting = reply_with_group(Group, WaitList, []),+ UpdaterPid = SpawnFun(),+ {noreply, State#group_state{ + updater_pid=UpdaterPid,+ waiting_list=StillWaiting,+ group_seq=NewGroupSeq,+ group=Group}};+ +handle_cast({new_group, Group=#group{current_seq=NewGroupSeq}},+ State=#group_state{waiting_list=WaitList}) ->+ StillWaiting = reply_with_group(Group, WaitList, []),+ {noreply, State#group_state{+ updater_pid=nil,+ waiting_list=StillWaiting,+ group_seq=NewGroupSeq,+ group=Group}}.+ +handle_info({'EXIT', _FromPid, normal}, State) ->+ {noreply, State};+ +handle_info({'EXIT', FromPid, Reason}, State) ->+ ?LOG_DEBUG("Exit from updater: ~p", [{FromPid, Reason}]),+ {stop, Reason, State};+ +handle_info(_Info, State) ->+ {noreply, State}.++terminate(Reason, _State=#group_state{waiting_list=WaitList}) ->+ lists:foreach(fun({Waiter, _}) -> gen_server:reply(Waiter, {error, Reason}) end, WaitList), + ok.++code_change(_OldVsn, State, _Extra) ->+ {ok, State}.++% error handling? the updater could die on us, we can save ourselves here.+% but we shouldn't, we could be dead for a reason, like the view got changed, or something.+++%% Local Functions++% reply_with_group/3+% for each item in the WaitingList {Pid, Seq}+% if the Seq is =< GroupSeq, reply+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->+ gen_server:reply(Pid, {ok, Group}),+ reply_with_group(Group, WaitList, StillWaiting);++% else+% put it in the continuing waiting list +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->+ reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);++% return the still waiting list+reply_with_group(_Group, [], StillWaiting) ->+ StillWaiting.++spawn_updater({RootDir, DbName, GroupId}) -> + spawn_link(couch_view_updater, update,+ [RootDir, DbName, GroupId, self()]);++spawn_updater({DbName, Fd, Lang, MapSrc, RedSrc}) ->+ spawn_link(couch_view_updater, temp_update,+ [DbName, Fd, Lang, MapSrc, RedSrc, self()]).+ +Index: src/couchdb/couch_db_updater.erl===================================================================--- src/couchdb/couch_db_updater.erl (revision 724788)+++ src/couchdb/couch_db_updater.erl (working copy)@@ -129,7 +129,7 @@ Db#db{ fulldocinfo_by_id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2,- update_seq = NewSeq,+ update_seq = NewSeq + 1, header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}), ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),Index: src/couchdb/couch_httpd.erl===================================================================--- src/couchdb/couch_httpd.erl (revision 724788)+++ src/couchdb/couch_httpd.erl (working copy)@@ -261,7 +261,7 @@ [User, Pass] -> {User, Pass}; [User] ->- {User, <<"">>};+ {User, ""}; _ -> nil end;Index: src/couchdb/couch_db.erl===================================================================--- src/couchdb/couch_db.erl (revision 724788)+++ src/couchdb/couch_db.erl (working copy)@@ -17,7 +17,7 @@ -export([open_ref_counted/3,num_refs/1,monitor/1]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).--export([get_missing_revs/2,name/1,doc_to_tree/1]).+-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).@@ -145,6 +145,9 @@ gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +get_update_seq(#db{header=#db_header{update_seq=Seq}})->+ Seq.+ get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> PurgeSeq. Index: src/couchdb/couch_db.hrl===================================================================--- src/couchdb/couch_db.hrl (revision 724788)+++ src/couchdb/couch_db.hrl (working copy)@@ -88,7 +88,7 @@ -record(user_ctx,- {name=nil,+ {name=null, roles=[] }). @@ -152,6 +152,41 @@ include_docs = false }). +-record(group,+ {sig=nil,+ db=nil,+ fd=nil,+ name,+ def_lang,+ views,+ id_btree=nil,+ current_seq=0,+ purge_seq=0,+ query_server=nil+ }). +-record(view,+ {id_num,+ map_names=[],+ def,+ btree=nil,+ reduce_funs=[]+ }).++-record(server,{+ root_dir = [],+ dbname_regexp,+ max_dbs_open=100,+ current_dbs_open=0,+ start_time=""+ }).++-record(index_header,+ {seq=0,+ purge_seq=0,+ id_btree_state=nil,+ view_states=nil+ }).+ % small value used in revision trees to indicate the revision isn't stored -define(REV_MISSING, []).Index: src/couchdb/couch_view_updater.erl===================================================================--- src/couchdb/couch_view_updater.erl (revision 0)+++ src/couchdb/couch_view_updater.erl (revision 0)@@ -0,0 +1,385 @@+% Licensed under the Apache License, Version 2.0 (the "License"); you may not+% use this file except in compliance with the License. You may obtain a copy of+% the License at+%+% http://www.apache.org/licenses/LICENSE-2.0+%+% Unless required by applicable law or agreed to in writing, software+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the+% License for the specific language governing permissions and limitations under+% the License.++-module(couch_view_updater).++-export([update/4, temp_update/6]).++-include("couch_db.hrl").++++update(RootDir, DbName, GroupId, NotifyPid) ->+ {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId),+ {ok, Db} = couch_db:open(DbName, []),+ Result = update_group(Group#group{db=Db}),+ ?LOG_DEBUG("update {Result} DONE ~p", [{Result}]), + couch_db:close(Db),+ case Result of+ {same, Group2} ->+ gen_server:cast(NotifyPid, {new_group, Group2});+ {updated, Group2} ->+ HeaderData = {Sig, get_index_header_data(Group2)},+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),+ gen_server:cast(NotifyPid, {new_group, Group2})+ end,+ garbage_collect().+ +temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) ->+ case couch_db:open(DbName, []) of+ {ok, Db} ->+ View = #view{map_names=["_temp"],+ id_num=0,+ btree=nil,+ def=MapSrc,+ reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},+ Group = #group{name="_temp",+ db=Db,+ views=[View],+ current_seq=0,+ def_lang=Lang,+ id_btree=nil},+ Group2 = init_group(Db, Fd, Group,nil),+ couch_db:monitor(Db),+ {_Updated, Group3} = update_group(Group2#group{db=Db}),+ couch_db:close(Db),+ gen_server:cast(NotifyPid, {new_group, Group3}),+ garbage_collect();+ Else ->+ exit(Else)+ end.+++update_group(#group{db=Db,current_seq=CurrentSeq}=Group) ->+ ViewEmptyKVs = [{View, []} || View <- Group#group.views],+ % compute on all docs modified since we last computed.+ {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}+ = couch_db:enum_docs_since(+ Db,+ CurrentSeq,+ fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,+ {[], Group, ViewEmptyKVs, []}+ ),+ {Group4, Results} = view_compute(Group3, UncomputedDocs),+ {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),+ couch_query_servers:stop_doc_map(Group4#group.query_server),+ NewSeq = couch_db:get_update_seq(Db),+ if CurrentSeq /= NewSeq ->+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),+ {updated, Group5#group{query_server=nil}};+ true ->+ {same, Group4#group{query_server=nil}}+ end.+++get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, + id_btree=IdBtree,views=Views}) ->+ ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],+ #index_header{seq=Seq,+ purge_seq=PurgeSeq,+ id_btree_state=couch_btree:get_state(IdBtree),+ view_states=ViewStates}.++++purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->+ {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),+ Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],+ {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),++ % now populate the dictionary with all the keys to delete+ ViewKeysToRemoveDict = lists:foldl(+ fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->+ lists:foldl(+ fun({ViewNum, RowKey}, ViewDictAcc2) ->+ dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)+ end, ViewDictAcc, ViewNumRowKeys);+ ({not_found, _}, ViewDictAcc) ->+ ViewDictAcc+ end, dict:new(), Lookups),++ % Now remove the values from the btrees+ Views2 = lists:map(+ fun(#view{id_num=Num,btree=Btree}=View) ->+ case dict:find(Num, ViewKeysToRemoveDict) of+ {ok, RemoveKeys} ->+ {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),+ View#view{btree=Btree2};+ error -> % no keys to remove in this view+ View+ end+ end, Views),+ Group#group{id_btree=IdBtree2,+ views=Views2,+ purge_seq=couch_db:get_purge_seq(Db)}.++process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) ->+ % This fun computes once for each document + #doc_info{id=DocId, deleted=Deleted} = DocInfo,+ case DocId of+ GroupId ->+ % uh oh. this is the design doc with our definitions. See if+ % anything in the definition changed.+ case couch_db:open_doc(Db, DocInfo) of+ {ok, Doc} ->+ case design_doc_to_view_group(Doc) of+ #group{sig=Sig} ->+ % The same md5 signature, keep on computing+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};+ _ ->+ ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]),+ throw(restart)+ end;+ {not_found, deleted} ->+ ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]),+ throw(restart)+ end;+ <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};+ _ ->+ {Docs2, DocIdViewIdKeys2} =+ if Deleted ->+ {Docs, [{DocId, []} | DocIdViewIdKeys]};+ true ->+ {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),+ {[Doc | Docs], DocIdViewIdKeys}+ end,+ + case couch_util:should_flush() of+ true ->+ {Group1, Results} = view_compute(Group, Docs2),+ {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),+ {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,+ DocInfo#doc_info.update_seq),+ garbage_collect(),+ ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],+ {ok, {[], Group2, ViewEmptyKeyValues, []}};+ false ->+ {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}}+ end+ end.++view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->+ {ViewKVs, DocIdViewIdKeysAcc};+view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->+ {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),+ NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],+ view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).+++view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->+ {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};+view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->+ % Take any identical keys and combine the values+ ResultKVs2 = lists:foldl(+ fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->+ case Key == PrevKey of+ true ->+ case PrevVal of+ {dups, Dups} ->+ [{PrevKey, {dups, [Value|Dups]}} | AccRest];+ _ ->+ [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]+ end;+ false ->+ [{Key,Value},{PrevKey,PrevVal}|AccRest]+ end;+ (KV, []) ->+ [KV] + end, [], lists:sort(ResultKVs)),+ NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],+ NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],+ NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],+ NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,+ view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).++view_compute(Group, []) ->+ {Group, []};+view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->+ {ok, QueryServer} =+ case QueryServerIn of+ nil -> % doc map not started+ Definitions = [View#view.def || View <- Group#group.views],+ couch_query_servers:start_doc_map(DefLang, Definitions);+ _ ->+ {ok, QueryServerIn}+ end,+ {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),+ {Group#group{query_server=QueryServer}, Results}.++++write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->+ #group{id_btree=IdBtree} = Group,++ AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],+ RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],+ LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],+ {ok, LookupResults, IdBtree2}+ = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),+ KeysToRemoveByView = lists:foldl(+ fun(LookupResult, KeysToRemoveByViewAcc) ->+ case LookupResult of+ {ok, {DocId, ViewIdKeys}} ->+ lists:foldl(+ fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->+ dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)+ end,+ KeysToRemoveByViewAcc, ViewIdKeys);+ {not_found, _} ->+ KeysToRemoveByViewAcc+ end+ end,+ dict:new(), LookupResults),++ Views2 = [+ begin+ KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),+ {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),+ View#view{btree = ViewBtree2}+ end+ ||+ {View, AddKeyValues} <- ViewKeyValuesToAdd+ ],+ Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},+ {ok, Group2}.+ +prepare_group(RootDir, DbName, GroupId) ->+ {Db, Group} = case (catch couch_db:open(DbName, [])) of+ {ok, Db0} ->+ case (catch couch_db:open_doc(Db0, GroupId)) of+ {ok, Doc} ->+ {Db0, design_doc_to_view_group(Doc)};+ Else ->+ delete_index_file(RootDir, DbName, GroupId),+ ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]), + exit(Else)+ end;+ Else ->+ delete_index_file(RootDir, DbName, GroupId),+ exit(Else)+ end,+ FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ + binary_to_list(GroupId) ++".view",+ Group2 =+ case couch_file:open(FileName) of+ {ok, Fd} ->+ Sig = Group#group.sig,+ case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of+ {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->+ % sigs match!+ DbPurgeSeq = couch_db:get_purge_seq(Db),+ % We can only use index with the same, or next purge seq as the db.+ if DbPurgeSeq == PurgeSeq ->+ init_group(Db, Fd, Group, HeaderInfo);+ DbPurgeSeq == PurgeSeq + 1 ->+ ?LOG_DEBUG("Purging entries from view index.",[]),+ purge_index(init_group(Db, Fd, Group, HeaderInfo));+ true ->+ ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]),+ reset_file(Db, Fd, DbName, Group)+ end;+ _ ->+ reset_file(Db, Fd, DbName, Group)+ end;+ {error, enoent} ->+ case couch_file:open(FileName, [create]) of+ {ok, Fd} -> reset_file(Db, Fd, DbName, Group);+ Error -> throw(Error)+ end+ end,++ couch_db:monitor(Db),+ couch_db:close(Db),+ {ok, Group2}.++% maybe move to another module+design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->+ Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),+ {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),++ % add the views to a dictionary object, with the map source as the key+ DictBySrc =+ lists:foldl(+ fun({Name, {MRFuns}}, DictBySrcAcc) ->+ MapSrc = proplists:get_value(<<"map">>, MRFuns),+ RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),+ View =+ case dict:find(MapSrc, DictBySrcAcc) of+ {ok, View0} -> View0;+ error -> #view{def=MapSrc} % create new view object+ end,+ View2 =+ if RedSrc == null ->+ View#view{map_names=[Name|View#view.map_names]};+ true ->+ View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}+ end,+ dict:store(MapSrc, View2, DictBySrcAcc)+ end, dict:new(), RawViews),+ % number the views+ {Views, _N} = lists:mapfoldl(+ fun({_Src, View}, N) ->+ {View#view{id_num=N},N+1}+ end, 0, dict:to_list(DictBySrc)),++ Group = #group{name=Id, views=Views, def_lang=Language},+ Group#group{sig=erlang:md5(term_to_binary(Group))}.++reset_group(#group{views=Views}=Group) ->+ Views2 = [View#view{btree=nil} || View <- Views],+ Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,+ id_btree=nil,views=Views2}.++reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->+ ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),+ ok = couch_file:truncate(Fd, 0),+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),+ init_group(Db, Fd, reset_group(Group), nil).++delete_index_file(RootDir, DbName, GroupId) ->+ file:delete(RootDir ++ "/." ++ binary_to_list(DbName)+ ++ binary_to_list(GroupId) ++ ".view").++init_group(Db, Fd, #group{views=Views}=Group, nil) ->+ init_group(Db, Fd, Group,+ #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),+ id_btree_state=nil, view_states=[nil || _ <- Views]});+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->+ #index_header{seq=Seq, purge_seq=PurgeSeq,+ id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,+ {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),+ Views2 = lists:zipwith(+ fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->+ FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],+ ReduceFun = + fun(reduce, KVs) ->+ KVs2 = couch_view:expand_dups(KVs,[]),+ KVs3 = couch_view:detuple_kvs(KVs2,[]),+ {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, + KVs3),+ {length(KVs3), Reduced};+ (rereduce, Reds) ->+ Count = lists:sum([Count0 || {Count0, _} <- Reds]),+ UserReds = [UserRedsList || {_, UserRedsList} <- Reds],+ {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,+ UserReds),+ {Count, Reduced}+ end,+ {ok, Btree} = couch_btree:open(BtreeState, Fd,+ [{less, fun couch_view:less_json_keys/2},+ {reduce, ReduceFun}]),+ View#view{btree=Btree}+ end,+ ViewStates, Views),+ Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,+ id_btree=IdBtree, views=Views2}.\ No newline at end of fileIndex: src/couchdb/couch_server.erl===================================================================--- src/couchdb/couch_server.erl (revision 724788)+++ src/couchdb/couch_server.erl (working copy)@@ -22,14 +22,6 @@ -include("couch_db.hrl"). --record(server,{- root_dir = [],- dbname_regexp,- max_dbs_open=100,- current_dbs_open=0,- start_time=""- }).- start() -> start(["default.ini"]). Index: src/couchdb/Makefile.am===================================================================--- src/couchdb/Makefile.am (revision 724788)+++ src/couchdb/Makefile.am (working copy)@@ -63,6 +63,8 @@ couch_stream.erl \ couch_util.erl \ couch_view.erl \+ couch_view_updater.erl \+ couch_view_group.erl \ couch_db_updater.erl EXTRA_DIST = $(source_files) couch_db.hrl@@ -92,6 +94,8 @@ couch_stream.beam \ couch_util.beam \ couch_view.beam \+ couch_view_updater.beam \+ couch_view_group.beam \ couch_db_updater.beam # doc_base = \Index: src/couchdb/couch_view.erl===================================================================--- src/couchdb/couch_view.erl (revision 724788)+++ src/couchdb/couch_view.erl (working copy)@@ -13,45 +13,12 @@ -module(couch_view). -behaviour(gen_server). --export([start_link/0,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]).+-export([start_link/0,fold/4,fold/5,less_json/2,less_json_keys/2,expand_dups/2,detuple_kvs/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). -export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, fold_reduce/7]). -include("couch_db.hrl").---record(group,- {sig=nil,- db=nil,- fd=nil,- name,- def_lang,- views,- id_btree=nil,- current_seq=0,- purge_seq=0,- query_server=nil- }).---record(view,- {id_num,- map_names=[],- def,- btree=nil,- reduce_funs=[]- }).---record(server,- {root_dir- }).---record(index_header,- {seq=0,- purge_seq=0,- id_btree_state=nil,- view_states=nil- }). - start_link() -> gen_server:start_link({local, couch_view}, couch_view, [], []). @@ -59,41 +26,30 @@ {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}), Pid. -get_updater(DbName, GroupId) ->- {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}),+get_group_server(DbName, GroupId) ->+ {ok, Pid} = gen_server:call(couch_view, {start_group_server, DbName, GroupId}), Pid. -get_updated_group(Pid) ->- Mref = erlang:monitor(process, Pid),- receive- {'DOWN', Mref, _, _, Reason} ->- throw(Reason)- after 0 ->- Pid ! {self(), get_updated},- receive- {Pid, Response} ->- erlang:demonitor(Mref),- receive- {'DOWN', Mref, _, _, _} -> ok- after 0 -> ok- end,- Response;- {'DOWN', Mref, _, _, Reason} ->- throw(Reason)- end- end.+get_updated_group(DbName, GroupId, Update) ->+ couch_view_group:request_group(get_group_server(DbName, GroupId), seq_for_update(DbName, Update)). +get_updated_group(temp, DbName, Type, MapSrc, RedSrc, Update) ->+ couch_view_group:request_group(get_temp_updater(DbName, Type, MapSrc, RedSrc), seq_for_update(DbName, Update)).+ get_row_count(#view{btree=Bt}) -> {ok, {Count, _Reds}} = couch_btree:full_reduce(Bt), {ok, Count}. get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->- {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)),+ {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, MapSrc, RedSrc, true), {ok, {temp_reduce, View}}; get_reduce_view({DbName, GroupId, Name}) ->- {ok, #group{views=Views,def_lang=Lang}} =- get_updated_group(get_updater(DbName, GroupId)),- get_reduce_view0(Name, Lang, Views).+ case get_updated_group(DbName, GroupId, true) of+ {error, Reason} ->+ Reason;+ {ok, #group{views=Views,def_lang=Lang}} ->+ get_reduce_view0(Name, Lang, Views)+ end. get_reduce_view0(_Name, _Lang, []) -> {not_found, missing_named_view};@@ -153,13 +109,26 @@ N + 1; get_key_pos(Key, [_|Rest], N) -> get_key_pos(Key, Rest, N+1).+ +seq_for_update(DbName, Update) ->+ case Update of+ true ->+ {ok, #db{update_seq=CurrentSeq}} = couch_db:open(DbName, []),+ CurrentSeq;+ _Else ->+ 0+ end. get_map_view({temp, DbName, Type, Src}) ->- {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])),+ {ok, #group{views=[View]}} = get_updated_group(temp, DbName, Type, Src, [], true), {ok, View};-get_map_view({DbName, GroupId, Name}) ->- {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)),- get_map_view0(Name, Views).+get_map_view({DbName, GroupId, Name, Update}) ->+ case get_updated_group(DbName, GroupId, Update) of+ {error, Reason} ->+ Reason;+ {ok, #group{views=Views}} ->+ get_map_view0(Name, Views)+ end. get_map_view0(_Name, []) -> {not_found, missing_named_view};@@ -183,38 +152,7 @@ Count. -design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->- Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),- {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),- - % add the views to a dictionary object, with the map source as the key- DictBySrc =- lists:foldl(- fun({Name, {MRFuns}}, DictBySrcAcc) ->- MapSrc = proplists:get_value(<<"map">>, MRFuns),- RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),- View =- case dict:find(MapSrc, DictBySrcAcc) of- {ok, View0} -> View0;- error -> #view{def=MapSrc} % create new view object- end,- View2 =- if RedSrc == null ->- View#view{map_names=[Name|View#view.map_names]};- true ->- View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}- end,- dict:store(MapSrc, View2, DictBySrcAcc)- end, dict:new(), RawViews),- % number the views- {Views, _N} = lists:mapfoldl(- fun({_Src, View}, N) ->- {View#view{id_num=N},N+1}- end, 0, dict:to_list(DictBySrc)), - Group = #group{name=Id, views=Views, def_lang=Language},- Group#group{sig=erlang:md5(term_to_binary(Group))}.- fold_fun(_Fun, [], _, Acc) -> {ok, Acc}; fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->@@ -253,10 +191,10 @@ (_Else) -> ok end),- ets:new(couch_views_by_db, [bag, private, named_table]),- ets:new(couch_views_by_name, [set, protected, named_table]),- ets:new(couch_views_by_updater, [set, private, named_table]),- ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]),+ ets:new(couch_groups_by_db, [bag, private, named_table]),+ ets:new(group_servers_by_name, [set, protected, named_table]),+ ets:new(couch_groups_by_updater, [set, private, named_table]),+ ets:new(couch_temp_group_fd_by_db, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir}}. @@ -268,9 +206,9 @@ <<SigInt:128/integer>> = erlang:md5(term_to_binary({Lang, MapSrc, RedSrc})), Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), Pid = - case ets:lookup(couch_views_by_name, {DbName, Name}) of+ case ets:lookup(group_servers_by_name, {DbName, Name}) of [] ->- case ets:lookup(couch_views_temp_fd_by_db, DbName) of+ case ets:lookup(couch_temp_group_fd_by_db, DbName) of [] -> FileName = Root ++ "/." ++ binary_to_list(DbName) ++ "_temp", {ok, Fd} = couch_file:open(FileName, [create, overwrite]),@@ -279,21 +217,20 @@ ok end, ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),- NewPid = spawn_link(couch_view, start_temp_update_loop,- [DbName, Fd, Lang, MapSrc, RedSrc]),- true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),+ {ok, NewPid} = couch_view_group:start_link({DbName, Fd, Lang, MapSrc, RedSrc}),+ true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count + 1}), add_to_ets(NewPid, DbName, Name), NewPid; [{_, ExistingPid0}] -> ExistingPid0 end, {reply, {ok, Pid}, Server};-handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->+handle_call({start_group_server, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> Pid = - case ets:lookup(couch_views_by_name, {DbName, GroupId}) of+ case ets:lookup(group_servers_by_name, {DbName, GroupId}) of [] ->- ?LOG_DEBUG("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]),- NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]),+ ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", [GroupId, DbName]),+ {ok, NewPid} = couch_view_group:start_link({Root, DbName, GroupId}), add_to_ets(NewPid, DbName, GroupId), NewPid; [{_, ExistingPid0}] ->@@ -303,11 +240,11 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> % shutdown all the updaters- Names = ets:lookup(couch_views_by_db, DbName),+ Names = ets:lookup(couch_groups_by_db, DbName), lists:foreach( fun({_DbName, GroupId}) -> ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]),- [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}),+ [{_, Pid}] = ets:lookup(group_servers_by_name, {DbName, GroupId}), exit(Pid, kill), receive {'EXIT', Pid, _} -> delete_from_ets(Pid, DbName, GroupId)@@ -318,22 +255,23 @@ {noreply, Server}. handle_info({'EXIT', _FromPid, normal}, Server) ->- {noreply, Server};+ {noreply, Server}; handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->- case ets:lookup(couch_views_by_updater, FromPid) of+ ?LOG_DEBUG("Exit from process: ~p", [{FromPid, Reason}]),+ case ets:lookup(couch_groups_by_updater, FromPid) of [] -> % non-updater linked process must have died, we propagate the error ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]), exit(Reason); [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId),- [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName),+ [{_, Fd, Count}] = ets:lookup(couch_temp_group_fd_by_db, DbName), case Count of 1 -> % Last ref couch_file:close(Fd), file:delete(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_temp"),- true = ets:delete(couch_views_temp_fd_by_db, DbName);+ true = ets:delete(couch_temp_group_fd_by_db, DbName); _ ->- true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1})+ true = ets:insert(couch_temp_group_fd_by_db, {DbName, Fd, Count - 1}) end; [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId)@@ -344,225 +282,21 @@ exit({error, Msg}). add_to_ets(Pid, DbName, GroupId) ->- true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}),- true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}),- true = ets:insert(couch_views_by_db, {DbName, GroupId}).+ true = ets:insert(couch_groups_by_updater, {Pid, {DbName, GroupId}}),+ true = ets:insert(group_servers_by_name, {{DbName, GroupId}, Pid}),+ true = ets:insert(couch_groups_by_db, {DbName, GroupId}). delete_from_ets(Pid, DbName, GroupId) ->- true = ets:delete(couch_views_by_updater, Pid),- true = ets:delete(couch_views_by_name, {DbName, GroupId}),- true = ets:delete_object(couch_views_by_db, {DbName, GroupId}).+ true = ets:delete(couch_groups_by_updater, Pid),+ true = ets:delete(group_servers_by_name, {DbName, GroupId}),+ true = ets:delete_object(couch_groups_by_db, {DbName, GroupId}). code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->- NotifyPids = get_notify_pids(1000),- case couch_db:open(DbName, []) of- {ok, Db} ->- View = #view{map_names=["_temp"],- id_num=0,- btree=nil,- def=MapSrc,- reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},- Group = #group{name="_temp",- db=Db,- views=[View],- current_seq=0,- def_lang=Lang,- id_btree=nil},- Group2 = init_group(Db, Fd, Group,nil),- couch_db:monitor(Db),- couch_db:close(Db),- temp_update_loop(DbName, Group2, NotifyPids);- Else ->- exit(Else)- end. -temp_update_loop(DbName, Group, NotifyPids) ->- {ok, Db} = couch_db:open(DbName, []),- {_Updated, Group2} = update_group(Group#group{db=Db}),- couch_db:close(Db),- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],- garbage_collect(),- temp_update_loop(DbName, Group2, get_notify_pids(10000)). --reset_group(#group{views=Views}=Group) ->- Views2 = [View#view{btree=nil} || View <- Views],- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,- id_btree=nil,views=Views2}.--start_update_loop(RootDir, DbName, GroupId) ->- % wait for a notify request before doing anything. This way, we can just- % exit and any exits will be noticed by the callers.- start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).- -start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->- {Db, Group} =- case (catch couch_db:open(DbName, [])) of- {ok, Db0} ->- case (catch couch_db:open_doc(Db0, GroupId)) of- {ok, Doc} ->- {Db0, design_doc_to_view_group(Doc)};- Else ->- delete_index_file(RootDir, DbName, GroupId),- exit(Else)- end;- Else ->- delete_index_file(RootDir, DbName, GroupId),- exit(Else)- end,- FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++ - binary_to_list(GroupId) ++".view",- Group2 =- case couch_file:open(FileName) of- {ok, Fd} ->- Sig = Group#group.sig,- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of- {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->- % sigs match!- DbPurgeSeq = couch_db:get_purge_seq(Db),- case (PurgeSeq == DbPurgeSeq) or ((PurgeSeq + 1) == DbPurgeSeq) of- true ->- % We can only use index with the same, or next purge seq as the- % db.- init_group(Db, Fd, Group, HeaderInfo);- false ->- reset_file(Db, Fd, DbName, Group)- end;- _ ->- reset_file(Db, Fd, DbName, Group)- end;- {error, enoent} ->- case couch_file:open(FileName, [create]) of- {ok, Fd} -> reset_file(Db, Fd, DbName, Group);- Error -> throw(Error)- end- end,- - couch_db:monitor(Db),- couch_db:close(Db),- update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).--reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->- ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),- ok = couch_file:truncate(Fd, 0),- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),- init_group(Db, Fd, reset_group(Group), nil).--update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->- {ok, Db}= couch_db:open(DbName, []),- Result =- try- update_group(Group#group{db=Db})- catch- throw: restart -> restart- after- couch_db:close(Db)- end,- case Result of- {same, Group2} ->- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],- update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));- {updated, Group2} ->- HeaderData = {Sig, get_index_header_data(Group2)},- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],- garbage_collect(),- update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));- restart ->- couch_file:close(Group#group.fd),- start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())- end.--% wait for the first request to come in.-get_notify_pids(Wait) ->- receive- {Pid, get_updated} ->- [Pid | get_notify_pids()];- {'DOWN', _MonitorRef, _Type, _Pid, _Info} ->- ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []),- exit(db_shutdown);- Else ->- ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]),- exit({error, Else})- after Wait ->- exit(wait_timeout)- end.-% then keep getting all available and return.-get_notify_pids() ->- receive- {Pid, get_updated} ->- [Pid | get_notify_pids()]- after 0 ->- []- end.- -purge(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->- {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),- Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],- {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),-- % now populate the dictionary with all the keys to delete- ViewKeysToRemoveDict = lists:foldl(- fun({ok,{DocId,ViewNumRowKeys}}, ViewDictAcc) ->- lists:foldl(- fun({ViewNum, RowKey}, ViewDictAcc2) ->- dict:append(ViewNum, {RowKey, DocId}, ViewDictAcc2)- end, ViewDictAcc, ViewNumRowKeys);- ({not_found, _}, ViewDictAcc) ->- ViewDictAcc- end, dict:new(), Lookups),- - % Now remove the values from the btrees- Views2 = lists:map(- fun(#view{id_num=Num,btree=Btree}=View) ->- case dict:find(Num, ViewKeysToRemoveDict) of- {ok, RemoveKeys} ->- {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys),- View#view{btree=Btree2};- error -> % no keys to remove in this view- View- end- end, Views),- Group#group{id_btree=IdBtree2,- views=Views2,- purge_seq=couch_db:get_purge_seq(Db)}.- - -update_group(#group{db=Db,current_seq=CurrentSeq,- purge_seq=GroupPurgeSeq}=Group) ->- ViewEmptyKVs = [{View, []} || View <- Group#group.views],- % compute on all docs modified since we last computed.- DbPurgeSeq = couch_db:get_purge_seq(Db),- Group2 =- case DbPurgeSeq of- GroupPurgeSeq -> - Group;- DbPurgeSeq when GroupPurgeSeq + 1 == DbPurgeSeq ->- purge(Group);- _ ->- throw(restart)- end,- {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}}- = couch_db:enum_docs_since(- Db,- CurrentSeq,- fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,- {[], Group2, ViewEmptyKVs, [], CurrentSeq}- ),- {Group4, Results} = view_compute(Group3, UncomputedDocs),- {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),- couch_query_servers:stop_doc_map(Group4#group.query_server),- if CurrentSeq /= NewSeq ->- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),- {updated, Group5#group{query_server=nil}};- true ->- {same, Group4#group{query_server=nil}}- end.- delete_index_dir(RootDir, DbName) -> nuke_dir(RootDir ++ "/." ++ binary_to_list(DbName) ++ "_design"). @@ -583,50 +317,6 @@ ok = file:del_dir(Dir) end. -delete_index_file(RootDir, DbName, GroupId) ->- file:delete(RootDir ++ "/." ++ binary_to_list(DbName)- ++ binary_to_list(GroupId) ++ ".view").--init_group(Db, Fd, #group{views=Views}=Group, nil) ->- init_group(Db, Fd, Group,- #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),- id_btree_state=nil, view_states=[nil || _ <- Views]});-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->- #index_header{seq=Seq, purge_seq=PurgeSeq,- id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,- {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),- Views2 = lists:zipwith(- fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->- FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],- ReduceFun = - fun(reduce, KVs) ->- KVs2 = expand_dups(KVs,[]),- KVs3 = detuple_kvs(KVs2,[]),- {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs3),- {length(KVs3), Reduced};- (rereduce, Reds) ->- Count = lists:sum([Count0 || {Count0, _} <- Reds]),- UserReds = [UserRedsList || {_, UserRedsList} <- Reds],- {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds),- {Count, Reduced}- end,- {ok, Btree} = couch_btree:open(BtreeState, Fd,- [{less, fun less_json_keys/2},{reduce, ReduceFun}]),- View#view{btree=Btree}- end,- ViewStates, Views),- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,- id_btree=IdBtree, views=Views2}.---get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree,views=Views}) ->- ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],- #index_header{seq=Seq,- purge_seq=PurgeSeq,- id_btree_state=couch_btree:get_state(IdBtree),- view_states=ViewStates}.- % keys come back in the language of btree - tuples. less_json_keys(A, B) -> less_json(tuple_to_list(A), tuple_to_list(B)).@@ -703,129 +393,4 @@ end end. -process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->- % This fun computes once for each document- #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo,- case DocId of- GroupId ->- % uh oh. this is the design doc with our definitions. See if- % anything in the definition changed.- case couch_db:open_doc(Db, DocInfo) of- {ok, Doc} ->- case design_doc_to_view_group(Doc) of- #group{sig=Sig} ->- % The same md5 signature, keep on computing- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};- _ ->- throw(restart)- end;- {not_found, deleted} ->- throw(restart)- end;- <<?DESIGN_DOC_PREFIX, _>> -> % we skip design docs- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};- _ ->- {Docs2, DocIdViewIdKeys2} =- if Deleted ->- {Docs, [{DocId, []} | DocIdViewIdKeys]};- true ->- {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),- {[Doc | Docs], DocIdViewIdKeys}- end,- case couch_util:should_flush() of- true ->- {Group1, Results} = view_compute(Group, Docs2),- {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),- {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq),- garbage_collect(),- ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],- {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}};- false ->- {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}}- end- end. -view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->- {ViewKVs, DocIdViewIdKeysAcc};-view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->- {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),- NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],- view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).---view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->- {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};-view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->- % Take any identical keys and combine the values- ResultKVs2 = lists:foldl(- fun({Key,Value}, [{PrevKey,PrevVal}|AccRest]) ->- case Key == PrevKey of- true ->- case PrevVal of- {dups, Dups} ->- [{PrevKey, {dups, [Value|Dups]}} | AccRest];- _ ->- [{PrevKey, {dups, [Value,PrevVal]}} | AccRest]- end;- false ->- [{Key,Value},{PrevKey,PrevVal}|AccRest]- end;- (KV, []) ->- [KV] - end, [], lists:sort(ResultKVs)),- NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs2],- NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],- NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs2],- NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,- view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).--view_compute(Group, []) ->- {Group, []};-view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->- {ok, QueryServer} =- case QueryServerIn of- nil -> % doc map not started- Definitions = [View#view.def || View <- Group#group.views],- couch_query_servers:start_doc_map(DefLang, Definitions);- _ ->- {ok, QueryServerIn}- end,- {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),- {Group#group{query_server=QueryServer}, Results}.----write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->- #group{id_btree=IdBtree} = Group,-- AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],- RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],- LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],- {ok, LookupResults, IdBtree2}- = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),- KeysToRemoveByView = lists:foldl(- fun(LookupResult, KeysToRemoveByViewAcc) ->- case LookupResult of- {ok, {DocId, ViewIdKeys}} ->- lists:foldl(- fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->- dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)- end,- KeysToRemoveByViewAcc, ViewIdKeys);- {not_found, _} ->- KeysToRemoveByViewAcc- end- end,- dict:new(), LookupResults),-- Views2 = [- begin- KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),- {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),- View#view{btree = ViewBtree2}- end- ||- {View, AddKeyValues} <- ViewKeyValuesToAdd- ],- Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},- {ok, Group2}.Index: share/www/script/couch_tests.js===================================================================--- share/www/script/couch_tests.js (revision 724788)+++ share/www/script/couch_tests.js (working copy)@@ -1905,6 +1905,7 @@ } T(db.view("test/single_doc").total_rows == 1); + var info = db.info(); var doc1 = db.open("1"); var doc2 = db.open("2"); @@ -1913,7 +1914,13 @@ body: JSON.stringify({"1":[doc1._rev], "2":[doc2._rev]}), }); T(xhr.status == 200);- ++ var newInfo = db.info();+ // purging increments the update sequence+ T(info.update_seq+1 == newInfo.update_seq);+ // and it increments the purge_seq+ T(info.purge_seq+1 == newInfo.purge_seq);+ var result = JSON.parse(xhr.responseText); T(result.purged["1"][0] == doc1._rev); T(result.purged["2"][0] == doc2._rev);Index: etc/couchdb/local_dev.ini===================================================================--- etc/couchdb/local_dev.ini (revision 724788)+++ etc/couchdb/local_dev.ini (working copy)@@ -48,3 +48,9 @@ [test] foo = bar++[test]+foo = bar++[test]+foo = bar