我有一个系统,当找到某种类型的文件时,我下载,编码并将它们上载到一个单独的线程中。多线程从sftp服务器下载相同的文件
while(true) {
for(SftpClient c : clients) {
try {
filenames = c.list("*.wav", "_rdy_");
} catch (SftpException e) {
e.printStackTrace();
}
if(filenames.size() > 0) {
//AudioThread run() method handles the download, encode, and upload
AudioThread at = new AudioThread(filenames);
at.setNode(c.getNode());
Thread t = new Thread(at);
t.start();
}
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
从AudioThread
public void run() {
System.out.println("Running...");
this.buildAsteriskMapping();
this.connectToSFTP();
ac = new AudioConvert();
this.connectToS3();
String downloadDir = "_rough/" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
String encodeDir = "_completed" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
String uploadDir = getNode() + "/" + Time.getYYYYMMDDDate() + "/";
System.out.println("Downloading...");
try {
sftp.get(filenames, downloadDir);
} catch (SftpException e) {
//download failed
System.out.println("DL Failed...");
e.printStackTrace();
}
System.out.println("Encoding...");
try {
ac.encodeWavToMP3(filenames, downloadDir, encodeDir);
} catch (IllegalArgumentException | EncoderException e) {
System.out.println("En Failed...");
e.printStackTrace();
}
System.out.println("Uploading...");
try {
s3.upload(filenames, encodeDir, uploadDir);
} catch (AmazonClientException e) {
System.out.println("Up Failed...");
e.printStackTrace();
}
}
下载方法run方法:
public void get(ArrayList<String> src, String dest) throws SftpException {
for(String file : src) {
System.out.println(dest + file);
channel.get(file, dest + file);
}
}
的编码方法:
public void encodeWavToMP3(ArrayList<String> filenames, String downloadDir, String encodeDir) throws IllegalArgumentException, EncoderException {
for(String f : filenames) {
File wav = new File(downloadDir + f);
File mp3 = new File(encodeDir + wav.getName().replace(".wav", ".mp3"));
encoder.encode(wav, mp3, attrs);
}
}
上传方法:
public void upload(ArrayList<String> filenames, String encodeDir, String uploadDir) throws AmazonClientException, AmazonServiceException {
for(String f : filenames) {
s3.putObject(new PutObjectRequest(bucketName, uploadDir, new File(encodeDir + f)));
}
}
问题是我一直在为每个线程下载相同的文件(或大约相同的文件)。我想为包含正在下载的文件的每个客户端添加一个变量,但我不知道如何从该变量中删除列表/文件名。什么是解决方案?我的老板也只想让x线程运行。
是,+1。这个问题接近生产者/消费者原则。声明一个队列(http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html),一个线程添加文件上传到队列,以及由ExecutorService管理的多个线程,请参阅http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29消耗队列的文件路径 – Aubin