2012-10-12 78 views
0

我有一个系统,当找到某种类型的文件时,我下载,编码并将它们上载到一个单独的线程中。多线程从sftp服务器下载相同的文件

while(true) { 
    for(SftpClient c : clients) { 
     try { 
      filenames = c.list("*.wav", "_rdy_"); 
     } catch (SftpException e) { 
      e.printStackTrace(); 
     } 
     if(filenames.size() > 0) { 
      //AudioThread run() method handles the download, encode, and upload 
      AudioThread at = new AudioThread(filenames); 
      at.setNode(c.getNode()); 
      Thread t = new Thread(at); 
      t.start(); 
     } 
    } 
    try { 
     Thread.sleep(3000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

从AudioThread

public void run() { 
    System.out.println("Running..."); 
    this.buildAsteriskMapping(); 
    this.connectToSFTP(); 
    ac = new AudioConvert(); 
    this.connectToS3(); 

    String downloadDir = "_rough/" + getNode() + "/" + Time.getYYYYMMDDDate() + "/"; 
    String encodeDir = "_completed" + getNode() + "/" + Time.getYYYYMMDDDate() + "/"; 
    String uploadDir = getNode() + "/" + Time.getYYYYMMDDDate() + "/"; 

    System.out.println("Downloading..."); 
    try { 
     sftp.get(filenames, downloadDir); 
    } catch (SftpException e) { 
     //download failed 
     System.out.println("DL Failed..."); 
     e.printStackTrace(); 
    } 

    System.out.println("Encoding..."); 
    try { 
     ac.encodeWavToMP3(filenames, downloadDir, encodeDir); 
    } catch (IllegalArgumentException | EncoderException e) { 
     System.out.println("En Failed..."); 
     e.printStackTrace(); 
    } 

    System.out.println("Uploading..."); 
    try { 
     s3.upload(filenames, encodeDir, uploadDir); 
    } catch (AmazonClientException e) { 
     System.out.println("Up Failed..."); 
     e.printStackTrace(); 
    } 

} 

下载方法run方法:

public void get(ArrayList<String> src, String dest) throws SftpException { 
    for(String file : src) { 
     System.out.println(dest + file); 
     channel.get(file, dest + file); 
    } 
} 

的编码方法:

public void encodeWavToMP3(ArrayList<String> filenames, String downloadDir, String encodeDir) throws IllegalArgumentException, EncoderException { 
    for(String f : filenames) { 
     File wav = new File(downloadDir + f); 
     File mp3 = new File(encodeDir + wav.getName().replace(".wav", ".mp3")); 
     encoder.encode(wav, mp3, attrs); 
    } 
} 

上传方法:

public void upload(ArrayList<String> filenames, String encodeDir, String uploadDir) throws AmazonClientException, AmazonServiceException { 
    for(String f : filenames) { 
     s3.putObject(new PutObjectRequest(bucketName, uploadDir, new File(encodeDir + f))); 
    } 
} 

问题是我一直在为每个线程下载相同的文件(或大约相同的文件)。我想为包含正在下载的文件的每个客户端添加一个变量,但我不知道如何从该变量中删除列表/文件名。什么是解决方案?我的老板也只想让x线程运行。

回答

4

这是一种很难看到的问题,因为实际执行下载缺少代码:P

不过,我会用某种ExecutorService代替。

基本上,我会将每个下载请求添加到服务中(包装在“DownloadTask”中,参考要下载的文件以及可能需要获取文件的任何其他相关信息),并让服务保持小心剩下的。

可以对下载任务进行编码,以考虑您认为合适的现有文件。

根据您的要求,这可能是单线程或多线程服务。它也可以让你把上传任务也放在里面。

退房的Executors线索更多信息

总的想法是用一种生产者/消费者模式。你将有(至少)一个线程来查找所有要下载的文件,并为每个文件添加它到执行程序服务。文件下载后,我会排队并将请求上传到同一服务中。

这样,你避免所有的混乱与同步和线程管理:d

你可以用同样的想法与扫描任务,为每一个客户,你可以一个任务到一个单独的服务

+1

是,+1。这个问题接近生产者/消费者原则。声明一个队列(http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html),一个线程添加文件上传到队列,以及由ExecutorService管理的多个线程,请参阅http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29消耗队列的文件路径 – Aubin

1

代码中存在一个问题,您在while循环中实例化AudioThread。

请注意,创建线程并执行t.start()后,所有下载,编码和上传都将异步发生。因此,在启动线程后,循环会继续执行另一个对c.list(...)的调用,而您创建的第一个线程仍在处理第一组文件。由于您在调用中指定了文件模式,并且没有代码标记哪些文件当前正在处理,因此很可能在后续的c.list()调用中返回相同的一组文件。

我的建议:

  • 使用Executors.newFixedThreadPool(INT来确定nthreads)在以前的文章中提到。并将线程数指定为机器中处理器的数量。在你的while循环之前执行此操作。
  • 对于你从FTP s.list()检索到的每个文件名,创建一个Callable类并调用ExecutorService.invokeAll(收藏<可赎回<牛逼> >任务)。您将创建的Callable中的代码是您的AudioThread代码。修改AudioThread代码,以便只处理一个文件(如果可能的话),这样您就可以为每个文件并行地进行下载,上传和编码。
  • 添加标记哪些文件已被处理的代码。我会建议添加一个代码,将您已处理的文件重命名为不同的名称,以避免在下一个c.list()调用中返回。
  • 在您的while循环块后呼叫ExecutorService.shutdown(...)