
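I am using an Azure queue to perform a bulk import, and I use WebJobs to run the process in the background. The queue is dequeued very frequently. How can I create a delay between two message reads from the queue?

This is how I add a message to the queue: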

public async Task<bool> Handle(CreateFileUploadCommand message) 
{ 
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue); 

    var brokeredMessage = new BrokeredMessage(JsonConvert.SerializeObject(new ProcessFileUploadMessage 
    { 
     TenantId = message.TenantId, 
     FileExtension = message.FileExtension, 
     FileName = message.Name, 
     DeviceId = message.DeviceId, 
     SessionId = message.SessionId, 
     UserId = message.UserId, 
     OutletId = message.OutletId, 
     CorrelationId = message.CorrelationId, 

    })) 
    { 
     ContentType = "application/json", 
    }; 

    await queueClient.SendAsync(brokeredMessage); 

    return true; 
} 

And below is the WebJobs function:

public class Functions 
{ 
    private readonly IValueProvider _valueProvider; 
    public Functions(IValueProvider valueProvider) 
    { 
     _valueProvider = valueProvider; 
    } 

    public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message, 
    TextWriter logger) 
    { 

     var queueMessage = message.GetBody<string>(); 

     using (var client = new HttpClient()) 
     { 
      client.BaseAddress = new Uri(_valueProvider.Get("ServiceBaseUri")); 

      var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json"); 

      var result = await client.PostAsync(RestfulUrls.ImportMenu.ProcessUrl, stringContent); 

      if (result.IsSuccessStatusCode) 
      { 
       await message.CompleteAsync(); 
      } 
      else 
      { 
       await message.AbandonAsync(); 
      } 
     } 
    } 
} 

Any update? If you found my answer useful/helpful, please mark it as the answer so that others can benefit from it.

Answer


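As far as I know, the Azure WebJobs SDK enables concurrent processing on a single instance (the default is 16).

When you run the WebJob, it reads up to 16 queue messages in peek-lock mode and runs up to 16 invocations of the triggered function concurrently. For each message it calls Complete if the function finishes successfully, or Abandon otherwise. That is why the queue appears to be dequeued very frequently.

If you want to disable parallel processing on a single instance, I suggest you set MessageOptions.MaxConcurrentCalls on the ServiceBusConfiguration to 1.

For more details, you can refer to the code below.

In Program.cs: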

JobHostConfiguration config = new JobHostConfiguration(); 
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration(); 
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1; 
config.UseServiceBus(serviceBusConfig); 

JobHost host = new JobHost(config); 
host.RunAndBlock(); 

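If you want to create a delay between two message reads, I suggest you create a custom ServiceBusConfiguration.MessagingProvider.

The MessageProcessor created by the provider has a CompleteProcessingMessageAsync method, which completes processing of the specified message after the job function has been invoked.

I suggest you add a Thread.Sleep call in CompleteProcessingMessageAsync to delay the next read.

For more details, you can refer to the code sample below.

CustomMessagingProvider.cs:

Note: I override the CompleteProcessingMessageAsync method.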

public class CustomMessagingProvider : MessagingProvider 
    { 
     private readonly ServiceBusConfiguration _config; 

     public CustomMessagingProvider(ServiceBusConfiguration config) 
      : base(config) 
     { 
      _config = config; 
     } 

     public override NamespaceManager CreateNamespaceManager(string connectionStringName = null) 
     { 
      // you could return your own NamespaceManager here, which would be used 
      // globally 
      return base.CreateNamespaceManager(connectionStringName); 
     } 

     public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null) 
     { 
      // you could return a customized (or new) MessagingFactory here per entity 
      return base.CreateMessagingFactory(entityPath, connectionStringName); 
     } 

     public override MessageProcessor CreateMessageProcessor(string entityPath) 
     { 
      // demonstrates how to plug in a custom MessageProcessor 
      // you could use the global MessageOptions, or use different 
      // options per entity 
      return new CustomMessageProcessor(_config.MessageOptions); 
     } 

     private class CustomMessageProcessor : MessageProcessor 
     { 
      public CustomMessageProcessor(OnMessageOptions messageOptions) 
       : base(messageOptions) 
      { 
      } 

      public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken) 
      { 
       // intercept messages before the job function is invoked 
       return base.BeginProcessingMessageAsync(message, cancellationToken); 
      } 

      public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken) 
      { 
       if (result.Succeeded) 
       { 
        if (!MessageOptions.AutoComplete) 
        { 
         // AutoComplete is true by default, but if set to false 
         // we need to complete the message 
         cancellationToken.ThrowIfCancellationRequested(); 


         await message.CompleteAsync(); 

         Console.WriteLine("Begin sleep"); 
         //Sleep 5 seconds 
         Thread.Sleep(5000); 
         Console.WriteLine("Sleep 5 seconds"); 

        } 
       } 
       else 
       { 
        cancellationToken.ThrowIfCancellationRequested(); 
        await message.AbandonAsync(); 
       } 
      } 
     } 
    } 

The Main method in Program.cs:

static void Main() 
     { 
      var config = new JobHostConfiguration(); 

      if (config.IsDevelopment) 
      { 
       config.UseDevelopmentSettings(); 
      } 

      var sbConfig = new ServiceBusConfiguration 
      { 
       MessageOptions = new OnMessageOptions 
       { 
        AutoComplete = false, 
        MaxConcurrentCalls = 1 
       } 
      }; 
      sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig); 
      config.UseServiceBus(sbConfig); 
      var host = new JobHost(config); 

      // The following code ensures that the WebJob will be running continuously 
      host.RunAndBlock(); 
     } 



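Because this is an 'async' method, you should replace 'Thread.Sleep()' with 'await Task.Delay()'. That way the underlying thread is not completely blocked and can handle other tasks in the meantime. – Oliver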
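
A minimal sketch of what that suggestion could look like, assuming the same WebJobs SDK 2.x Service Bus extension used in the answer. The class name DelayingMessageProcessor and its delay parameter are hypothetical, introduced here only for illustration. The sketch hands the complete/abandon step to the base class, on the assumption that it behaves like the code in the answer, and then waits without blocking a thread:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical name: a MessageProcessor that pauses after every message.
public class DelayingMessageProcessor : MessageProcessor
{
    private readonly TimeSpan _delay; // hypothetical parameter, e.g. 5 seconds

    public DelayingMessageProcessor(OnMessageOptions messageOptions, TimeSpan delay)
        : base(messageOptions)
    {
        _delay = delay;
    }

    public override async Task CompleteProcessingMessageAsync(
        BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
    {
        // Let the base class complete or abandon the message as usual.
        await base.CompleteProcessingMessageAsync(message, result, cancellationToken);

        // Asynchronous wait: the thread goes back to the pool during the delay.
        // If the host shuts down, the token cancels the wait with an exception.
        await Task.Delay(_delay, cancellationToken);
    }
}
```

To try it, CreateMessageProcessor in the custom MessagingProvider could return new DelayingMessageProcessor(_config.MessageOptions, TimeSpan.FromSeconds(5)). With MaxConcurrentCalls set to 1, the delay should hold the only processing slot, so the next message is not handed to the function until the wait finishes.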