3

我开始使用pubsub模拟器来测试我的基本实现,并在尝试创建新主题时遇到了问题。在pubsub模拟器上创建主题

我的模拟器监听在localhost:8085,如果我通过API创建主题

PUT http://localhost:8085/v1/projects/testproject/topics/test 

一切工作正常和话题被创建。 但是,如果我运行下面的代码片段没有按预期工作,没有话题被创建:

TopicName topicName = TopicName.create("testproject", "test"); 
    ChannelProvider channelProvider = 
      TopicAdminSettings.defaultChannelProviderBuilder() 
       .setEndpoint("localhost:8085") 
       .setCredentialsProvider(
         FixedCredentialsProvider.create(NoCredentials.getInstance())) 
       .build(); 
    TopicAdminClient topicClient = TopicAdminClient.create(
      TopicAdminSettings.defaultBuilder().setChannelProvider(channelProvider).build()); 
     topicClient.createTopic(topicName); 

而这个运行模拟器日志

[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete 
[pubsub] INFORMATION: Adding handler(s) to newly registered Channel. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead 
[pubsub] INFORMATION: Detected non-HTTP/2 connection. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.NotFoundHandler handleRequest 
[pubsub] INFORMATION: Unknown request URI: /bad-request 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete 
[pubsub] INFORMATION: Adding handler(s) to newly registered Channel. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead 
[pubsub] INFORMATION: Detected non-HTTP/2 connection. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.NotFoundHandler handleRequest 
[pubsub] INFORMATION: Unknown request URI: /bad-request 

...  

[pubsub] Apr 27, 2017 1:10:49 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete 
[pubsub] INFORMATION: Adding handler(s) to newly registered Channel. 
[pubsub] Apr 27, 2017 1:10:49 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead 
[pubsub] INFORMATION: Detected non-HTTP/2 connection. 

我失去了我的ChannelProvider的东西吗?或者我没有正确配置我的TopicAdminClient?我没有看到什么错误,因为我用 this as reference

也许有人可以帮我解决这个问题。

回答

2

用于与模拟器通信的通道需要将negotiationType属性设置为NegotiationType.PLAINTEXT。这意味着您需要创建一个自定义ChannelProvider。像下面这样的应该工作:

public class PlainTextChannelProvider implements ChannelProvider { 
    @Override 
    public boolean shouldAutoClose() { 
    return false; 
    } 

    @Override 
    public boolean needsExecutor() { 
    return false; 
    } 

    @Override 
    public ManagedChannel getChannel() throws IOException { 
    return NettyChannelBuilder.forAddress("localhost", 8085) 
     .negotiationType(NegotiationType.PLAINTEXT) 
     .build(); 
    } 

    @Override 
    public ManagedChannel getChannel(Executor executor) throws IOException { 
    return getChannel(); 
    } 
} 
+0

谢谢你,创建主题和发布消息工作。我是否必须为Subscriber.Listener实现类似的东西?我为Subscriber设置了相同的channelProvider,但是在调用subscriber.stopAsync()时,它总是抛出一个java.util.concurrent.RejectedExecutionException,并且(它似乎是随机的)不会拉取消息,我在这里丢失了什么? –

+0

您也需要为订阅服务器使用相同类型的ChannelProvider。很难说是什么导致了RejectedExecutionException。我认为这不是特定于订阅者或模拟器的问题,但可能与您的代码有关的订阅,您使用的任何执行程序或应用程序中对象的生存期有关。 –

+0

我打开另一个问题,所以我可以给更详细的描述[链接](http://stackoverflow.com/questions/43786716/subscriber-stopasync-results-in-rejectedexecutionexception) –