2016-11-24 24 views
3

我有一段代码用于监视添加文件的目录。无论何时将新文件添加到目录中,文件的内容都会被选取并在kafka上发布,然后文件被删除。java - 进程无法访问该文件,因为它正在被另一个进程使用

这是有效的,当我提出一个请求,但只要我的代码从jMeter的5或10用户请求,内容成功发布在kafka上,但代码无法删除文件。我得到一个FileSystemException消息,The process cannot access the file because it is being used by another process.

我想有一些并发问题,我无法看到。请帮忙 !

public void monitor() throws IOException, InterruptedException { 
    Path faxFolder = Paths.get(TEMP_FILE_LOCATION); 
    WatchService watchService = FileSystems.getDefault().newWatchService(); 
    faxFolder.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); 
    boolean valid = true; 
    do { 
     WatchKey watchKey = watchService.take(); 
     for (WatchEvent<?> event : watchKey.pollEvents()) { 
      if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { 
       String fileName = event.context().toString(); 
       publishToKafka(new File(TEMP_FILE_LOCATION + fileName).toPath(), "topic"); 
      } 
     } 
     valid = watchKey.reset(); 
    } while (valid); 
} 

private void publishToKafka(Path path, String topic) { 
    try (BufferedReader reader = Files.newBufferedReader(path)) { 
     String input = null; 
     while ((input = reader.readLine()) != null) { 
      kafkaProducer.publishMessageOnTopic(input, topic); 
     } 
    } catch (IOException e) { 
     LOG.error("Could not read buffered file to send message on kafka.", e); 
    } finally { 
     try { 
      Files.deleteIfExists(path); // This is where I get the exception 
     } catch (IOException e) { 
      LOG.error("Problem in deleting the buffered file {}.", path.getFileName(), e); 
     } 
    } 
} 

异常日志:

java.nio.file.FileSystemException: D:\upload\notif-1479974962595.csv: The process cannot access the file because it is being used by another process. 

    at sun.nio.fs.WindowsException.translateToIOException(Unknown Source) 
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) 
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) 
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source) 
    at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(Unknown Source) 
    at java.nio.file.Files.deleteIfExists(Unknown Source) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.publishToKafka(MonitorDirectory.java:193) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.sendData(MonitorDirectory.java:125) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.run(MonitorDirectory.java:113) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
+1

在尝试读取并删除文件之前,创建文件的过程是否尚未完成创建(尚未关闭)? –

+0

顺便说一下,WatchService是可关闭的,所以把'newWatchService()'放在try-with-resources中。另外,因为你在其他地方使用nio,为了一致性而不是'new File(...).toPath()'做'faxFolder.resolve(filename)'。 –

+3

@Klitos Kyriacou:由于'WatchEvent'的上下文在观看文件系统时已经是'Path',所以即使转换为字符串已经过时,并且'faxFolder.resolve((Path)event.context())'就足够了。我认为,你是对的,立即对创作事件做出反应并不会让创作者有足够的时间来关闭文件。有趣的是,在我的系统上,这有相反的效果:读取文件失败,因为它仍然被写入,但是删除作品,因为系统推迟实际删除的点,创建者关闭文件。 – Holger

回答

0

你必须关闭所有被删除前访问该文件的连接。

+1

我猜'try-with-resources'在执行'finally'代码之前就是这样做的。 –

0

看着你的代码看起来,当一个文件被线程挑选出来再次发布时,另一个线程正在为发布选择它。这就是为什么没有人能够删除它。 它只能是并发问题。您应该根据标准重新设计代码:可以同时运行的步骤和不能运行的步骤。 在整个过程中所以的步骤是:

  1. 拿起文件(主线程应该这样做)
  2. 发布文件(调用其他线程做)
  3. 删除文件(称为线程应该删除)
  4. 检查,如果存在的任何文件(一次主线程可以做到这一点)

而且选择了一个文件的时候,你可以将它读入缓冲区,删除它,然后继续发布。这将确保主线程不会将此文件分配给其他线程。

+1

我在OP的帖子中看不到任何线程的使用。 –

0

我有类似的问题,以及在下面的线程:Multithreading on Queue 当试图上传一个动态创建的文件到队列服务,它需要2天的时间来解决。由于Holger谁给了上面的答案,从而锁定发生可能是由于创建没有完全完成后,读另一个线程,它节省了我很多时间。

我最初的解决方案,发现了很多来自互联网,就是:

WatchEvent<Path> ev = cast(event); 
Path name = ev.context(); 
Path child = dir.resolve(name); 
//queueUploadFile(child); 
if (kind == ENTRY_CREATE) { 
    uploadToQueue(this.queueId, child); 
} 

我把它改为:

WatchEvent<Path> ev = cast(event); 
Path name = ev.context(); 
Path child = dir.resolve(name); 
//queueUploadFile(child); 
if (kind == ENTRY_MODIFY) { 
    uploadToQueue(this.queueId, child); 
} 

所有的一切都完美。为了处理“不知何故”多次ENTRY_MODIFY事件触发(上传重复文件),我在uploadToQueue()方法上传文件后执行删除操作。

我希望基于上述贡献采取的方法也能帮助其他类似问题的人。

相关问题