2012-02-29 40 views
29

我想在Linux平台中探索协议缓冲区(PB),我的编码语言是C++。我在协议缓冲区在线文档中找到了一些示例,但没有具体给套接字发送和接收(或者我完全错过了:))。所以我决定在实际消息之前添加消息长度并通过套接字发送。如果有人能够提出比我打算做的更好的解决方案,我会很感激,而且还有什么东西可以在PB中制作出来,用于创建这样的数据包。C++中的套接字协议缓冲区

但我仍然在服务器端出现问题,我必须解码数据包。假设客户端发送一个10字节的数据包,其中前4个字节是数据包的长度;但在解码数据包之前不可能知道长度。所以,即使我读了前4个字节,我如何使用协议缓冲区推断出半读数据包的值。

回答

25

最后我可以得到它的工作。我在这里发布代码,以便人们可以查看和评论它,以及如果有人想用C++实现它,这段代码可以提供帮助。这是一个破旧的代码,我的意图是让Protobuf以前缀方式工作。我从一些我不记得的网站获取客户端服务器的代码,并且修改了它以适应protobuf。在这里,服务器首先窥探套接字并获取总数据包的长度,然后实际读取套接字以读取整个数据包。可以有万亿种方法来做到这一点,但为了快速解决问题,我以这种方式做到了。但我需要找到一个更好的方法来避免每个数据包2 recv,但在我的情况下,所有的消息都是不同的大小,所以这是我猜测的唯一方法。

原型文件

message log_packet { 
    required fixed64 log_time =1; 
    required fixed32 log_micro_sec =2; 
    required fixed32 sequence_no =3; 
    required fixed32 shm_app_id =4; 
    required string packet_id =5; 
    required string log_level=6; 
    required string log_msg=7; 
    } 

协议缓冲客户端代码

#include <unistd.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/message.h> 
#include <google/protobuf/descriptor.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> 


using namespace google::protobuf::io; 

using namespace std; 
int main(int argv, char** argc){ 

/* Coded output stram */ 

log_packet payload ; 

payload.set_log_time(10); 
payload.set_log_micro_sec(10); 
payload.set_sequence_no(1); 
payload.set_shm_app_id(101); 
payload.set_packet_id("TST"); 
payload.set_log_level("DEBUG"); 
payload.set_log_msg("What shall we say then"); 

cout<<"size after serilizing is "<<payload.ByteSize()<<endl; 
int siz = payload.ByteSize()+4; 
char *pkt = new char [siz]; 
google::protobuf::io::ArrayOutputStream aos(pkt,siz); 
CodedOutputStream *coded_output = new CodedOutputStream(&aos); 
coded_output->WriteVarint32(payload.ByteSize()); 
payload.SerializeToCodedStream(coded_output); 

     int host_port= 1101; 
     char* host_name="127.0.0.1"; 

     struct sockaddr_in my_addr; 

     char buffer[1024]; 
     int bytecount; 
     int buffer_len=0; 

     int hsock; 
     int * p_int; 
     int err; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n",errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n",errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = inet_addr(host_name); 
     if(connect(hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       if((err = errno) != EINPROGRESS){ 
         fprintf(stderr, "Error connecting socket %d\n", errno); 
         goto FINISH; 
       } 
     } 




     for (int i =0;i<10000;i++){ 
      for (int j = 0 ;j<10;j++) { 

       if((bytecount=send(hsock, (void *) pkt,siz,0))== -1) { 
         fprintf(stderr, "Error sending data %d\n", errno); 
         goto FINISH; 
       } 
       printf("Sent bytes %d\n", bytecount); 
       usleep(1); 
     } 
     } 
     delete pkt; 

FINISH: 
     close(hsock); 

} 

协议缓冲服务器代码

#include <fcntl.h> 
#include <string.h> 
#include <stdlib.h> 
#include <errno.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <resolv.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 

using namespace std; 
using namespace google::protobuf::io; 



void* SocketHandler(void*); 

int main(int argv, char** argc){ 

     int host_port= 1101; 

     struct sockaddr_in my_addr; 

     int hsock; 
     int * p_int ; 
     int err; 

     socklen_t addr_size = 0; 
     int* csock; 
     sockaddr_in sadr; 
     pthread_t thread_id=0; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n", errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n", errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = INADDR_ANY ; 

     if(bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); 
       goto FINISH; 
     } 
     if(listen(hsock, 10) == -1){ 
       fprintf(stderr, "Error listening %d\n",errno); 
       goto FINISH; 
     } 

     //Now lets do the server stuff 

     addr_size = sizeof(sockaddr_in); 

     while(true){ 
       printf("waiting for a connection\n"); 
       csock = (int*)malloc(sizeof(int)); 
       if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1){ 
         printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); 
         pthread_create(&thread_id,0,&SocketHandler, (void*)csock); 
         pthread_detach(thread_id); 
       } 
       else{ 
         fprintf(stderr, "Error accepting %d\n", errno); 
       } 
     } 

FINISH: 
;//oops 
} 

google::protobuf::uint32 readHdr(char *buf) 
{ 
    google::protobuf::uint32 size; 
    google::protobuf::io::ArrayInputStream ais(buf,4); 
    CodedInputStream coded_input(&ais); 
    coded_input.ReadVarint32(&size);//Decode the HDR and get the size 
    cout<<"size of payload is "<<size<<endl; 
    return size; 
} 

void readBody(int csock,google::protobuf::uint32 siz) 
{ 
    int bytecount; 
    log_packet payload; 
    char buffer [siz+4];//size of the payload and hdr 
    //Read the entire buffer including the hdr 
    if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     } 
    cout<<"Second read byte count is "<<bytecount<<endl; 
    //Assign ArrayInputStream with enough memory 
    google::protobuf::io::ArrayInputStream ais(buffer,siz+4); 
    CodedInputStream coded_input(&ais); 
    //Read an unsigned integer with Varint encoding, truncating to 32 bits. 
    coded_input.ReadVarint32(&siz); 
    //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
    //from reading beyond that length.Limits are used when parsing length-delimited 
    //embedded messages 
    google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); 
    //De-Serialize 
    payload.ParseFromCodedStream(&coded_input); 
    //Once the embedded message has been parsed, PopLimit() is called to undo the limit 
    coded_input.PopLimit(msgLimit); 
    //Print the message 
    cout<<"Message is "<<payload.DebugString(); 

} 

void* SocketHandler(void* lp){ 
    int *csock = (int*)lp; 

     char buffer[4]; 
     int bytecount=0; 
     string output,pl; 
     log_packet logp; 

     memset(buffer, '\0', 4); 

     while (1) { 
     //Peek into the socket and get the packet size 
     if((bytecount = recv(*csock, 
         buffer, 
           4, MSG_PEEK))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     }else if (bytecount == 0) 
       break; 
     cout<<"First read byte count is "<<bytecount<<endl; 
     readBody(*csock,readHdr(buffer)); 
     } 

FINISH: 
     free(csock); 
    return 0; 
} 
+0

非常感谢好友。 – 2013-11-14 10:49:21

+0

对于问题+1和对答案+1,这是有帮助的。 – 2013-11-14 11:08:23

+1

谢谢!这对我遇到的问题非常有帮助 – 2013-11-18 20:20:37

10

不幸的是,protobuf的不适用于“包装”(定界)你的消息提供了一种方式:

如果你想多封邮件写到一个文件或流,它 是你保持跟踪一条消息结束的位置,并且下一个 开始。协议缓冲区连线格式不是自行分隔的,所以 协议缓冲区分析程序无法确定消息在其所属的 上的结束位置。解决此问题的最简单方法是在写入消息之前写入每条消息的大小 。

(从他们documentation

所以,他们基本上建议您得出了相同的解决方案。

+0

谢谢塔马斯,所以我会用我的方法去 – punith 2012-02-29 09:40:04