%%%------------------------------------------------------------------- %% Reproduces CLOSE-WAIT conditions in CouchBeam. %% Observe in a terminal %% > watch -n 1 "ss -p -o state close-wait '( dport = 5984 )' | wc -l " %% %% Run couchbed:run() to reproduce the CLOSE-WAIT condition. %% The condition appears after about 60 secondes, one CLOSE-WAIT socket %% per killed couchbeam_view_stream process. %%%------------------------------------------------------------------- -module(couchbed). -export([run/0, start_client/0, start/0, get/0, generate_and_store_docs/2, create_view/1, query_view/2]). -define(DB_NAME, "close_wait"). -define(VIEW, {"design", "view"}). -define(COUCH_URL, "http://localhost:5984"). -define(OPTIONS, [{basic_auth, {"admin", "admin"}}]). -define(MAX, 50). -spec run() -> ok. run() -> start(), Db = ?MODULE:get(), io:format("~p~n", [Db]), %generate_and_store_docs(Db, 50), Pid = start_client(), create_view(Db), query_view(Db, Pid), io:format("~p~n", [hackney_pool:get_stats(default)]), ok. -spec query_view(couchbeam:db(), pid()) -> ok. query_view(Db, Pid) -> lists:foreach(fun(_N) -> {ok, _Ref } = couchbeam_view:stream(Db, ?VIEW, [{stream_to, Pid}]), ok end, lists:seq(1, ?MAX)), kill_view_stream_process(), ok. -spec start_client() -> pid(). start_client() -> Pid = spawn(fun() -> loop() end), Pid. -spec loop() -> no_return(). loop() -> receive _Message -> case rand:uniform(400) of 1 -> io:format("~p~n", [hackney_pool:get_stats(default)]), loop(); _ -> loop() end end. -spec get() -> couchbeam:db(). get() -> S = couchbeam:server_connection(?COUCH_URL, ?OPTIONS), DbName = uri_string:quote(?DB_NAME), {ok, Db} = case couchbeam:db_exists(S,DbName) of true -> couchbeam:open_db(S, DbName, []); false -> couchbeam:create_db(S,DbName), couchbeam:open_db(S, DbName, []) end, Db. -spec kill_view_stream_process() -> ok. kill_view_stream_process() -> killall(ets:tab2list(couchbeam_view_streams)). -spec killall(list()) -> ok. killall([]) -> ok; killall([{_Ref, Pid} | Pids]) -> % kill 10% of case rand:uniform(10) of 1 -> io:format("killing ~p~n", [Pid]), exit(Pid, kill); _ -> ok end, killall(Pids). -spec generate_and_store_docs(couchbeam:db(), non_neg_integer()) -> ok. generate_and_store_docs(Db, N) -> io:format("creating ~p docs~n", [N]), lists:foreach(fun(_) -> Doc = {[{<<"type">>,<<"random_doc">>}, {<<"value">>, rand:uniform(1000)}]}, io:format("."), {ok, _Doc} = couchbeam:save_doc(Db, Doc) end, lists:seq(1, N)), io:format("~n"), ok. -spec create_view(couchbeam:db()) -> ok. create_view(Db) -> {Design, View} = ?VIEW, create_view(Db, list_to_binary(Design), list_to_binary(View)). -spec create_view(couchbeam:db(), binary(), binary()) -> couchbeam:document(). create_view(Db, Design, View) -> DesignDoc = {[ {<<"_id">>, <<"_design/",Design/binary>>}, {<<"language">>,<<"javascript">>}, {<<"views">>, {[{View, {[{<<"map">>, <<"function (doc) {\n emit(doc._id, doc); \n}">> }]} }]} } ]}, case couchbeam:save_doc(Db, DesignDoc) of {ok, DesignDoc1} -> io:format("Created view"), DesignDoc1; {error, conflict} -> io:format("View already exists"), ok; {error, Reason} -> io:format("Error creating view: ~p~n", [Reason]), error end. -spec start() -> ok. start() -> application:start(inets), application:ensure_all_started(couchbeam), io:format("Everything started~n").