2016-04-15

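Kurento: strange log "disconnectAll()" occurring continuously

I have been working with Kurento for the past few months, and today I suddenly started getting the following log: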

[0x00007f40b9f6d700] debug KurentoMediaElementImpl MediaElementImpl.cpp:660 disconnectAll() <kmswebrtcendpoint3> Retry disconnect all <kmshubport3> 

This log is produced continuously, causing 100% CPU usage and the creation of multiple log files of 100+ MB each.

I also see this in another log file:

2016-04-15 20:05:47,914497 23751 [0x00007fd903de1700] error KurentoWorkerPool   WorkerPool.cpp:41 workerThreadLoop() Unexpected error while running the server: boost::filesystem::last_write_time: No such file or directory: "/var/log/kurento-media-server/media-server_2016-04-15_20-05-18.00049.pid23751.log" 
2016-04-15 20:05:47,914513 23751 [0x00007fd90f7fe700] debug KurentoMediaSet   MediaSet.cpp:470 async_delete() Destroying HubPort -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/528783df-ac14-47bc-a7ad-8044721563b9_kurento.HubPort 
2016-04-15 20:05:47,914593 23751 [0x00007fd903de1700] debug KurentoWorkerPool   WorkerPool.cpp:37 workerThreadLoop() Working thread starting 
2016-04-15 20:05:47,917583 23751 [0x00007fd903de1700] debug KurentoMediaSet   MediaSet.cpp:470 async_delete() Destroying Composite -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/63c84bc4-0d42-4299-ae22-219c0a603837_kurento.Composite 
2016-04-15 20:06:10,067442 23751 [0x00007fd9cbfff700] debug KurentoMediaSet   MediaSet.cpp:131 doGarbageCollection() Running garbage collector 
2016-04-15 20:10:10,067626 23751 [0x00007fd9cbfff700] debug KurentoMediaSet   MediaSet.cpp:131 doGarbageCollection() Running garbage collector 
2016-04-15 20:14:10,067789 23751 [0x00007fd9cbfff700] debug KurentoMediaSet   MediaSet.cpp:131 doGarbageCollection() Running garbage collector 

Note:

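I am using a customized version of the group call Java example, and I wrote a KMS filter. It uses the vadfilter filter to get non-silent audio buffers, copies them into a gst_buffer object until silence is detected, and then sends the result to a web service using curl_easy_perform.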

Also, when I run sudo netstat -np | grep "CLOSE_WAIT"

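I see entries in the CLOSE_WAIT state for the IP addresses of the machine I am using for the group call conference and of the web service, but I don't know whether they are in CLOSE_WAIT because of a crash in my plugin or because of something else.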

P.S.: I use gst_buffer_copy_deep(), gst_buffer_get_memory and gst_buffer_append_memory to copy the subsequent audio buffers until silence is detected.

If anyone can point out anything related to the errors I am facing, it would be very helpful.

There is more... you can refer to this link (enter link description here) to see the errors I am getting. Thanks.

Edit 1:

namespace kurento 
{ 
    namespace module 
    { 
    namespace vadcustomfilter 
    { 
     void VADCustomFilterImpl::newBufferHandler (GstBuffer * buffer) 
     { 
     try 
     { 
      //instead of doing audio_buffer=null we set a flag audio_buffer_null to TRUE when we want to make a new copy of audio_buffer 
      if (!audio_buffer_null) 
      { 
       // dest, source, flag, offset, size 
       //gst_buffer_copy_into (audio_buffer, buffer, GST_BUFFER_COPY_MEMORY, 0, -1); 
       GstMemory *inbuf_memory = gst_buffer_get_memory(buffer,0); 
       gst_buffer_append_memory(audio_buffer,inbuf_memory); 
      } 
      else 
      { 
       audio_buffer = gst_buffer_new(); 
       audio_buffer = gst_buffer_copy_deep (buffer); 
       audio_buffer_null = FALSE; 
      } 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR ("EXCEPTION: new buffer"); 
     } 
     } 


     void VADCustomFilterImpl::busMessage (GstMessage * message) 
     { 
     try 
     { 
      switch (message->type) 
      { 
       case GST_MESSAGE_EOS: 
       { 
        GST_WARNING ("---------------> GST_MESSAGE_EOS"); 
       } 
       break; 

       case GST_MESSAGE_ELEMENT: 
       { 
        const GstStructure *structure = gst_message_get_structure (message); 
        if (structure == NULL) return; 

        const gchar *name = gst_structure_get_name (structure); 
        if (name == NULL || strlen(name) == 0) return; 


        if (strcmp (name, "vadfilter") == 0) 
        { 
         //messages are voice , silence, max_voice_reached 
         const GValue *msg_value = NULL; 
         gint64 silence_buffer = 0; 

         gst_structure_get_int64(structure, "silence", &silence_buffer); 
         silence_buffer /= sizeof(gint16); 

         //silence_buffer is the size of buffer which was emitted in chunks before silence was detected 
         //use to check for min voice size 

         if (silence_buffer > 0) 
         { 
          //min_voice_buffer_size set from java server to set a limit of min buffer size which can be sent to the web service 
          if (audio_buffer != NULL && (silence_buffer > (gint64)min_voice_buffer_size)) 
          { 
           audio_buffer_copy = NULL; 
           audio_buffer_copy = gst_buffer_copy_deep(audio_buffer); 

           push_buffer_in_queue(); 
          } 
          return; 
         } 

         gint64 max_voice = 0; 
         gst_structure_get_int64 (structure, "max_voice_reached", &max_voice); 

         if (audio_buffer != NULL && (max_voice > 0)) 
          { 
          audio_buffer_copy = NULL; 
          audio_buffer_copy = gst_buffer_copy_deep(audio_buffer); 
          push_buffer_in_queue(); 
          } 
         } /* vadfilter */ 
        } /* message_element */ 
        break; 

       default: 
        break; 

       } /* switch */ 
      } 
      catch (std::bad_weak_ptr & e) 
      { 
       GST_ERROR("EXCEPTION: bus message"); 
      } 
     } /* bus message */ 


     void VADCustomFilterImpl::push_buffer_in_queue() 
     { 
     try 
     { 
      GstMapInfo map;   
      if (audio_buffer_copy != NULL) 
      { 

       if (gst_buffer_map (audio_buffer_copy, &map, GST_MAP_READ)) 
       { 
        AUDIO_STREAM *pcm = new AUDIO_STREAM; 
        pcm->size = (unsigned int) map.size; 
        pcm->data = new unsigned char[pcm->size + 1]; 
        std::memcpy (pcm->data, (unsigned char *) map.data, pcm->size); 

        gst_buffer_unmap (audio_buffer_copy, &map); 

        //set the flag for audio_buffer to create a new copy 
        audio_buffer_null = TRUE; 
        fetch_response(pcm); 
        if (pcm->data != NULL) 
        {    
         pcm->size = 0; 
         pcm->data = NULL; 
        } 

        if (pcm != NULL) 
        {   
         delete pcm; 
         pcm = NULL; 
        }    
       } 
       else 
        GST_ERROR("push_buffer_in_queue: Invalid buffer"); 


      } 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR ("EXCEPTION::push_buffer_in_queue "); 
     } 
     } 

     VADCustomFilterImpl::VADCustomFilterImpl (const boost::property_tree::ptree &conf, 
               std::shared_ptr<MediaPipeline> mediaPipeline, 
               int minVoiceBufferSize, 
               int maxVoiceBufferSize, 
               int64_t minSilenceTime, 
               const std::string &url, 
               const std::string &language):FilterImpl 
               (conf, 
               std::dynamic_pointer_cast <MediaObjectImpl> (mediaPipeline)) 
     { 

     try 
     { 
      min_voice_buffer_size = (gsize) minVoiceBufferSize; 
      curl_url = url ; 
      accept_language = (!language.empty()) ? ("Accept-Language: " + language) : ("Accept-Language: en-US"); 

      g_object_set (element, "filter-factory", "vadfilter", NULL); 

      g_object_get(G_OBJECT(element), "filter", &vadfilter, NULL); 

      if (vadfilter == NULL) 
       { 
       throw KurentoException (MEDIA_OBJECT_NOT_AVAILABLE, "Media Object not available"); 
       } 


      g_object_set(G_OBJECT(vadfilter), "minimum-silence-time", (guint64)minSilenceTime, NULL); //2000000000 
      g_object_set(G_OBJECT(vadfilter), "max-voice-buffer-size", (guint)maxVoiceBufferSize, NULL); 

      g_object_unref(vadfilter); 

      g_async_queue_create(); 

      curl_init(); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: constructor... "); 
     } 
     } 

     void VADCustomFilterImpl::postConstructor() 
     { 
     try 
     { 
      GstBus *bus; 

      std::shared_ptr <MediaPipelineImpl> pipe; 

      FilterImpl::postConstructor(); 

      pipe = std::dynamic_pointer_cast <MediaPipelineImpl>(getMediaPipeline()); 

      bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline())); 

      /* register bus message handler */ 
      bus_handler_id = register_signal_handler (G_OBJECT (bus), 
            "message", 
            std::function < 
            void (GstElement *, 
            GstMessage *) > 
            (std:: 
            bind 
            (&VADCustomFilterImpl:: 
            busMessage, this, 
            std::placeholders::_2)), 
            std::dynamic_pointer_cast < 
            VADCustomFilterImpl > 
            (shared_from_this())); 

      /* register new_buffer handler */ 
      remove_silence_id = register_signal_handler(G_OBJECT(vadfilter), 
            "buffer", 
            std::function < 
            void (GstElement *, 
             GstBuffer *) > 
            (std:: 
             bind 
             (&VADCustomFilterImpl:: 
             newBufferHandler, this, 
             std::placeholders:: 
             _2)), 
            std:: 
            dynamic_pointer_cast < 
            VADCustomFilterImpl > 
            (shared_from_this())); 

      if (bus) 
       g_object_unref (bus); 
     } 
      catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: post constructor... "); 
     } 
     } 

     VADCustomFilterImpl::~VADCustomFilterImpl() 
     { 
     std::shared_ptr <MediaPipelineImpl> pipe; 

     try 
     {   
      if (bus_handler_id > 0) 
      { 
      pipe = std::dynamic_pointer_cast <MediaPipelineImpl> (getMediaPipeline()); 
      GstBus *bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline())); 
      if (bus) 
      { 
       unregister_signal_handler (bus, bus_handler_id); 
       g_object_unref (bus); 
      } 

      } 

      /*if (remove_silence_id > 0) 
      { 
       unregister_signal_handler (element, remove_silence_id); 
      } 
      */ 

      /* curl cleanup */ 
      curl_cleanup(); 

      /* buffer queue cleanup */ 
      if (g_async_buffer_queue) 
      { 
       g_async_queue_unref (g_async_buffer_queue); 
       g_async_buffer_queue = NULL; 
      } 


      /* audio_buffer cleanup */ 
      if (audio_buffer_copy != NULL) 
      { 
       audio_buffer_copy = NULL; 
      } 

      if (audio_buffer != NULL) 
      { 
       audio_buffer = NULL; 
       audio_buffer_null = TRUE; 
      } 

     } 
     catch (std::bad_weak_ptr & e) 
     { 
      remove_silence_id = -1; 
      GST_ERROR("EXCEPTION: destructor "); 
     } 
     } 


     size_t VADCustomFilterImpl::curl_callback (char *contents, size_t size, 
         size_t nmemb, void *userp) 
     { 
     size_t realsize = 0; 
     try 
     { 
      realsize = size * nmemb; /* calculate buffer size */ 
      ((std::string *) userp)->append ((char *) contents, realsize); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: curl_callback"); 
     } 
     return realsize; 
     } 

     int VADCustomFilterImpl::curl_init() 
     { 
     /* init curl handle */ 
     if ((curl_handle = curl_easy_init()) == NULL) 
      return 1; 


     /* set content type */ 
     headers = curl_slist_append (headers, "Accept: application/xml"); 

     headers = curl_slist_append (headers, "Transfer-Encoding: chunked"); 
     headers = curl_slist_append (headers, accept_language.c_str());  

     /* set curl options */ 
     curl_easy_setopt (curl_handle, CURLOPT_VERBOSE, 0); 
     curl_easy_setopt (curl_handle, CURLOPT_POST, 1); 
     curl_easy_setopt (curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); 
     curl_easy_setopt (curl_handle, CURLOPT_HTTPHEADER, headers); 
     curl_easy_setopt (curl_handle, CURLOPT_TIMEOUT, 10);   
     curl_easy_setopt (curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); /* set default user agent */     
     curl_easy_setopt (curl_handle, CURLOPT_URL, curl_url.c_str()); /* set url to fetch */  

     return 0; 
     } 


     int VADCustomFilterImpl::curl_fetch (AUDIO_STREAM * pcm) 
     { 
     curl_op_in_progress = TRUE; 

     if (curl_handle == NULL) 
      return 1; 

     CURLcode rcode; 

     try 
     { 
      response.clear(); 
      curl_easy_setopt (curl_handle, CURLOPT_WRITEFUNCTION, curl_callback); /* set callback function */   
      curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &response);/* pass fetch struct pointer */ 
      curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDSIZE, pcm->size); 
      curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDS, pcm->data); 

      rcode = curl_easy_perform(curl_handle); 

      handle_curl_response(rcode); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: curl_fetch "); 
     } 


     return 1; 
     } 

    void VADCustomFilterImpl::handle_curl_response(CURLcode rcode) 
    { 
     curl_op_in_progress = FALSE; 
     switch (rcode) 
     { 
      case CURLE_OK: 
      { 

       if (!response.empty()) 
       { 
        SendresponseReceivedEvent(response); 
        response.clear(); 
       } 
      } 
       break; 

      case CURLE_OPERATION_TIMEDOUT: 
      case CURLE_HTTP_RETURNED_ERROR: /* HTTP Error, >= 400*/ 
      { 
       GST_ERROR ("ERROR: handle_curl_response: (%s)", curl_easy_strerror (rcode)); 

       goto pop_pcm_from_queue; 
      } 
      break; 

      default: 
       GST_ERROR ("ERROR: Failed to fetch url (%s) - curl said: %s\n", curl_url.c_str(), curl_easy_strerror (rcode)); 
       break; 
     } /* switch */ 



     pop_pcm_from_queue: 
      if (g_async_queue_length (g_async_buffer_queue) > 0) 
       { 
       AUDIO_STREAM *pcm_popped = reinterpret_cast < AUDIO_STREAM * >(g_async_queue_pop (g_async_buffer_queue)); 
       curl_fetch (pcm_popped); 

       } 
      else 
       GST_ERROR ("handle_curl_response: queue is empty, no pcm to pop out"); 
    } 

    void VADCustomFilterImpl::curl_cleanup() 
    { 
     if (curl_handle) 
      curl_easy_cleanup (curl_handle); 

     /* free headers */ 
     if (headers) 
      curl_slist_free_all (headers); 

     curl_url.clear(); 
    } 

    /* fetch the response from web service, if it is already fetching then push the current audio buffer in queue */ 
    void VADCustomFilterImpl::fetch_response(AUDIO_STREAM * pcm) 
    { 
    //check queue size if > 0 then push else hit curl 
    if (!curl_op_in_progress) 
     { 
     curl_fetch (pcm); 
     } 
    else 
     { 
     try 
     {   
      if (g_async_buffer_queue == NULL) 
       g_async_queue_create(); 

      g_async_queue_push (g_async_buffer_queue, pcm); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: curl_fetch "); 
     } 

     } 
    } 

     void VADCustomFilterImpl::g_async_queue_create() 
     { 
     g_async_buffer_queue = g_async_queue_new(); 
     } 
     }   /* vadcustomfilter */ 
    }  /* module */ 
}  /* kurento */ 

Answer


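Given the information you provide here and in this other related question, I am fairly sure the problem is that your filter is blocking the media stream (probably because it sends HTTP requests). In addition, all the errors related to buffers and mini objects suggest that something is going wrong inside your filter while it processes media.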
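If the HTTP request is indeed what stalls the stream, one common way around it is to hand the audio to a separate thread so that the callback receiving buffers returns immediately. curl_easy_perform() blocks until the transfer finishes, so calling it from a buffer or bus handler holds up whichever thread invoked that handler. The following is a minimal sketch under assumptions, not the plugin's actual code: `UploadWorker`, `AudioChunk` and the `send` callback are hypothetical names, and the callback stands in for the blocking request.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one chunk of voiced PCM audio.
using AudioChunk = std::vector<std::uint8_t>;

// Runs a blocking "send" function on its own thread, so the caller
// (for example a streaming-thread callback) only pays for a queue push.
class UploadWorker {
public:
  using SendFn = std::function<void(const AudioChunk &)>;

  explicit UploadWorker(SendFn send) : send_(std::move(send)) {
    assert(send_ && "a send function is required");
    thread_ = std::thread([this] { run(); });
  }

  // Signals the worker to stop, then waits for it to drain the queue.
  ~UploadWorker() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;
    }
    cv_.notify_one();
    thread_.join();
  }

  UploadWorker(const UploadWorker &) = delete;
  UploadWorker &operator=(const UploadWorker &) = delete;

  // Takes ownership of the chunk and returns without waiting for the send.
  void submit(AudioChunk chunk) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push(std::move(chunk));
    }
    cv_.notify_one();
  }

private:
  void run() {
    for (;;) {
      AudioChunk chunk;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return stopping_ || !pending_.empty(); });
        if (pending_.empty()) return;  // stopping and nothing left to send
        chunk = std::move(pending_.front());
        pending_.pop();
      }
      send_(chunk);  // the slow call runs outside the lock
    }
  }

  SendFn send_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<AudioChunk> pending_;
  bool stopping_ = false;
  std::thread thread_;
};
```

In a filter like the one posted, the handler would call `submit()` with a copy of the accumulated bytes, and the function passed to the constructor would be the only place that performs the request. Chunks are sent in the order they were submitted, and the destructor waits for any remaining chunks to be sent.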

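You should review your code, and possibly consult an expert, to find the root of the problem. With only the information you provided and no access to the source code, it may be hard to find. Even with the source code, it can be difficult to determine what is causing the deadlock.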
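As one possible starting point for that review: in the code posted in the question, push_buffer_in_queue() passes pcm to fetch_response(), which may push that pointer onto the async queue, and then deletes pcm unconditionally; it also sets pcm->data to NULL without a matching delete[]. If both paths can run, the queue would hold a dangling pointer. Below is a minimal sketch of making ownership explicit with std::unique_ptr. The `AudioStream`, `PendingQueue` and `make_chunk` names are hypothetical and not part of the plugin, and the queue here is not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical replacement for AUDIO_STREAM: the vector owns the bytes,
// so there is no separate new[]/delete[] pair to keep in sync.
struct AudioStream {
  std::vector<std::uint8_t> data;
};

// Hypothetical holder for chunks waiting to be sent (single-threaded sketch).
class PendingQueue {
public:
  // Taking unique_ptr by value makes the ownership transfer explicit:
  // after push() the caller's pointer is empty and cannot be freed twice.
  void push(std::unique_ptr<AudioStream> pcm) {
    assert(pcm && "refusing to queue a null chunk");
    items_.push_back(std::move(pcm));
  }

  // Returns the oldest chunk, or nullptr if the queue is empty.
  std::unique_ptr<AudioStream> pop() {
    if (items_.empty()) return nullptr;
    std::unique_ptr<AudioStream> front = std::move(items_.front());
    items_.pop_front();
    return front;
  }

  std::size_t size() const { return items_.size(); }

private:
  std::deque<std::unique_ptr<AudioStream>> items_;
};

// Copies raw bytes (for example from a mapped buffer) into an owned chunk.
std::unique_ptr<AudioStream> make_chunk(const std::uint8_t *bytes, std::size_t n) {
  auto pcm = std::make_unique<AudioStream>();
  pcm->data.assign(bytes, bytes + n);
  return pcm;
}
```

With this shape, whichever side holds the unique_ptr is the one that frees the chunk, so "queued" and "deleted" cannot both happen to the same allocation.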


Hi, I have added the source code; please check the edit.