2017-06-08 70 views
0

我有一个多读写器线程模型的写入器线程。 ThreadMultipleDateReceiver类旨在从多个线程读取。使用PipedOutputStream和PipedInputStream的单线程读取器的多线程写入器

public class ThreadMultipleDateReceiver extends Thread { 

    private static final int MAX_CLIENT_THREADS = 4; 
    private byte[] incomingBytes; 
    private volatile boolean isRunning; 
    private volatile List<ThreadStreamDateWriter> lThrdDate; 

    private static PipedInputStream pipedInputStream; 

    public ThreadMultipleDateReceiver() { 
    lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS)); 
    pipedInputStream = new PipedInputStream(); 
    System.out.println("ThreadMultipleDateReceiver Created"); 
    } 

    @Override public void run() { 
    isRunning = true; 
    while (isRunning) { 
     if (!lThrdDate.isEmpty()) { 
     System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size()); 
     for (int i = lThrdDate.size(); i > 0; i--) { 
      if (lThrdDate.get(i - 1).getState() == Thread.State.TERMINATED) { 
      lThrdDate.remove(i - 1); 
      } else { 
      System.out.println("I ThreadMultipleDateReceiver have:" + lThrdDate.get(i - 1).getNameDateWriter()); 
      } 
     } 
     incomingBytes = new byte[1024]; 
     try { 
      String str = ""; 
      int iRd; 
      System.out.println("ThreadMultipleDateReceiver waiting:" + str); 
      while ((iRd = pipedInputStream.read(incomingBytes)) != -1) { 
      if (iRd > 0) { 
       str += new String(incomingBytes); 
      } 
      } 
      System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + str); 
     } catch (IOException e) { } 
     } else { 
     System.out.println("ThreadMultipleDateReceiver Empty"); 
     } 
    } 
    emptyDateWriters(); 
    } 

    public void addDateWriter(ThreadStreamDateWriter threadDateWriter) { 
    if (lThrdDate.size() < MAX_CLIENT_THREADS) { 
     lThrdDate.add(threadDateWriter); 
    } 
    } 

    private void emptyDateWriters() { 
    if (!lThrdDate.isEmpty()) { 
     for (int i = lThrdDate.size(); i > 0; i--) { 
     ThreadStreamDateWriter threadDateWriter = lThrdDate.get(i - 1); 
     threadDateWriter.stopThread(); 
     lThrdDate.remove(i - 1); 
     } 
    } 
    } 

    public PipedInputStream getPipedInputStream() { 
    return pipedInputStream; 
    } 

    public void stopThread() { 
    isRunning = false; 
    } 

} 

和单写线程

public class ThreadStreamDateWriter extends Thread { 
    String Self; 
    private byte[] outgoingBytes; 
    private volatile boolean isRunning; 
    private static PipedOutputStream pipedOutputStream; 

    ThreadStreamDateWriter(String name, PipedInputStream snk) { 
    Self = name; 
    pipedOutputStream = new PipedOutputStream(); 
    try { 
     pipedOutputStream.connect(snk); 
    } catch (IOException e) { } 
    } 

    @Override public void run() { 
    isRunning = true; 
    while (isRunning) { 
     try { 
     outgoingBytes = getInfo().getBytes(); 
     System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + new String(outgoingBytes)); 
     pipedOutputStream.write(outgoingBytes); 
     System.out.println("ThreadStreamDateWriter -> wrote:" + new String(outgoingBytes)); 
     try { Thread.sleep(4000); } catch (InterruptedException ex) { } 
     } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) { 
     isRunning = false; 
     } 
    } 
    } 

    String getInfo() { 
     String sDtTm = new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime()); 
     return Self + " -> " + sDtTm; 
    } 

    public void stopThread() { 
    isRunning = false; 
    } 

    public String getNameDateWriter() { 
    return Self; 
    } 
} 

如何启动(我使用NetBeans)?

ThreadMultipleDateReceiver thrdMDateReceiver = null; 
ThreadStreamDateWriter thrdSDateWriter0 = null; 
ThreadStreamDateWriter thrdSDateWriter1 = null; 
    private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) { 
    if (jtbDateExchanger.isSelected()) { 
     if (thrdMDateReceiver == null) { 
     thrdMDateReceiver = new ThreadMultipleDateReceiver(); 
     thrdMDateReceiver.start(); 
     } 
     if (thrdSDateWriter0 == null) { 
     thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedInputStream()); 
     thrdSDateWriter0.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter0); 
     } 
     if (thrdSDateWriter1 == null) { 
     thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedInputStream()); 
     thrdSDateWriter1.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter1); 
     } 
    } else { 
     if (thrdMDateReceiver != null) { 
     thrdMDateReceiver.stopThread(); 
     } 
    } 
    }             

OUTPUT

run: 
ThreadMultipleDateReceiver Created 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
..... 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver has:1 
I ThreadMultipleDateReceiver have:-0- 
ThreadMultipleDateReceiver waiting: 
ThreadStreamDateWriter -> write to pipedOutputStream:-0- -> 20170608-090003 
ThreadStreamDateWriter -> write to pipedOutputStream:-1- -> 20170608-090003 
BUILD SUCCESSFUL (total time: 1 minute 3 seconds) 

的ThreadMultipleDateReceiver被阻止,并且不打印:

ThreadMultipleDateReceiver Received: 
    -1- -> 20170608-090003 

ThreadMultipleDateReceiver Received: 
    -0- -> 20170608-090003 

如何解决?

回答

1

测试此代码...

public class ThreadMultipleDateReceiver extends Thread { 

    private static final int MAX_CLIENT_THREADS = 4; 
    private byte[] incomingBytes; 
    private volatile boolean isRunning; 
    private volatile List<ThreadStreamDateWriter> lThrdDate; 

    private PipedInputStream pipedInputStream; 
    private PipedOutputStream pipedOutputStream; 

    public ThreadMultipleDateReceiver() { 
     lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS)); 
     pipedInputStream = new PipedInputStream(); 
     pipedOutputStream = new PipedOutputStream(); 
     pipedInputStream.connect(pipedOutputStream); 
     System.out.println("ThreadMultipleDateReceiver Created"); 
    } 

    @Override public void run() { 
     isRunning = true; 
     while (isRunning) { 
      if (!lThrdDate.isEmpty()) { 
       System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size()); 
       for (int i = lThrdDate.size(); i > 0; i--) { 
        if (lThrdDate.get(i - 1).getState() == Thread.State.TERMINATED) { 
         lThrdDate.remove(i - 1); 
        } else { 
         System.out.println("ThreadMultipleDateReceiver have:" + lThrdDate.get(i - 1).getNameDateWriter()); 
        } 
       } 
       incomingBytes = new byte[1024]; 
       try { 
        String str = ""; 
        int iRd; 
        System.out.println("ThreadMultipleDateReceiver waiting:" + str); 
        while ((iRd = pipedInputStream.read(incomingBytes)) != -1) { 
         String r = new String(Arrays.copyOf(incomingBytes, iRd)); 
//      if (iRd > 0) { 
//       str += r; 
//      } 
         System.out.println("ThreadMultipleDateReceiver Received:\t" + r); 
        } 
//     System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + str); 
       } catch (IOException e) { } 
      } else { 
       System.out.println("ThreadMultipleDateReceiver Empty"); 
      } 
     } 
     emptyDateWriters(); 
    } 

public void addDateWriter(ThreadStreamDateWriter threadDateWriter) { 
    if (lThrdDate.size() < MAX_CLIENT_THREADS) { 
     lThrdDate.add(threadDateWriter); 
    } 
} 

    private void emptyDateWriters() { 
     if (!lThrdDate.isEmpty()) { 
      for (int i = lThrdDate.size(); i > 0; i--) { 
       ThreadStreamDateWriter threadDateWriter = lThrdDate.get(i - 1); 
       threadDateWriter.stopThread(); 
       lThrdDate.remove(i - 1); 
      } 
     } 
    } 

    public PipedOutputStream getPipedOutputStream() { 
     return pipedOutputStream; 
    } 

    public void stopThread() { 
     isRunning = false; 
    } 

} 

ThreadStreamDateWriter

public class ThreadStreamDateWriter extends Thread { 
    String Self; 
    private byte[] outgoingBytes; 
    private volatile boolean isRunning; 
    private final PipedOutputStream pipedOutputStream; 


    ThreadStreamDateWriter(String name, PipedOutputStream src) { 
     Self = name; 
     pipedOutputStream = src; 
    } 

    @Override public void run() { 
     isRunning = true; 
     while (isRunning) { 
      try { 
       outgoingBytes = getInfo().getBytes(); 
       System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + new String(outgoingBytes)); 
       pipedOutputStream.write(outgoingBytes); 
       System.out.println("ThreadStreamDateWriter -> wrote:" + new String(outgoingBytes)); 
       try { Thread.sleep(4000); } catch (InterruptedException ex) { } 
      } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) { 
       isRunning = false; 
      } 
     } 
    } 

    String getInfo() { 
     String sDtTm = new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime()); 
     return Self + " -> " + sDtTm; 
    } 

    public void stopThread() { 
     isRunning = false; 
    } 

    public String getNameDateWriter() { 
     return Self; 
    } 
} 

使用...

ThreadMultipleDateReceiver thrdMDateReceiver = null; 
    ThreadStreamDateWriter thrdSDateWriter0 = null; 
    ThreadStreamDateWriter thrdSDateWriter1 = null; 
    private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) { 
    if (jtbDateExchanger.isSelected()) { 
     if (thrdMDateReceiver == null) { 
     thrdMDateReceiver = new ThreadMultipleDateReceiver(); 
     thrdMDateReceiver.start(); 
     } 
     if (thrdSDateWriter0 == null) { 
     thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedOutputStream()); 
     thrdSDateWriter0.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter0); 
     } 
     if (thrdSDateWriter1 == null) { 
     thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedOutputStream()); 
     thrdSDateWriter1.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter1); 
     } 
    } else { 
     if (thrdMDateReceiver != null) { 
     thrdMDateReceiver.stopThread(); 
     } 
    } 
    } 
0

看起来您的管道输出流是静态的,所以每次构建一个ThreadStreamDateWriter时,您都会踩入管道输出流的旧值。

尝试将其作为实例变量并将其传递给构造函数。所以你只有其中一个。

编辑1:我做了管道实例变量并添加了一些打印输出。似乎现在运行更长(见下文):

编辑2:you second pipedOutputStream.connect(snk);正在投掷。你一次只能连接一件东西。

import java.io.IOException; 
import java.io.PipedInputStream; 
import java.io.PipedOutputStream; 
import java.text.SimpleDateFormat; 
import java.util.ArrayList; 
import java.util.Calendar; 
import java.util.Collections; 
import java.util.List; 
public class So44438086 { 
    public static class ThreadMultipleDateReceiver extends Thread { 
     private static final int MAX_CLIENT_THREADS=4; 
     private byte[] incomingBytes; 
     private volatile boolean isRunning; 
     private volatile List<ThreadStreamDateWriter> lThrdDate; 
     private /*static*/ PipedInputStream pipedInputStream; 
     public ThreadMultipleDateReceiver() { 
      lThrdDate=Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS)); 
      pipedInputStream=new PipedInputStream(); 
      System.out.println("ctor setting pipedInputStream to: "+pipedInputStream); 
      System.out.println("ThreadMultipleDateReceiver Created"); 
     } 
     @Override public void run() { 
      isRunning=true; 
      while(isRunning) { 
       if(!lThrdDate.isEmpty()) { 
        System.out.println("ThreadMultipleDateReceiver has:"+lThrdDate.size()); 
        for(int i=lThrdDate.size();i>0;i--) { 
         if(lThrdDate.get(i-1).getState()==Thread.State.TERMINATED) { 
          lThrdDate.remove(i-1); 
         } else { 
          System.out.println("I ThreadMultipleDateReceiver have:"+lThrdDate.get(i-1).getNameDateWriter()); 
         } 
        } 
        incomingBytes=new byte[1024]; 
        try { 
         String str=""; 
         int iRd; 
         System.out.println("ThreadMultipleDateReceiver waiting:"+str); 
         System.out.println("reading: "+pipedInputStream); 
         while((iRd=pipedInputStream.read(incomingBytes))!=-1) { 
          if(iRd>0) { 
           str+=new String(incomingBytes); 
          } 
         } 
         System.out.println("ThreadMultipleDateReceiver Received:\n\t:"+str); 
        } catch(IOException e) {} 
       } else { 
        System.out.println("ThreadMultipleDateReceiver Empty"); 
       } 
      } 
      emptyDateWriters(); 
     } 
     public void addDateWriter(ThreadStreamDateWriter threadDateWriter) { 
      if(lThrdDate.size()<MAX_CLIENT_THREADS) { 
       lThrdDate.add(threadDateWriter); 
      } 
     } 
     private void emptyDateWriters() { 
      if(!lThrdDate.isEmpty()) { 
       for(int i=lThrdDate.size();i>0;i--) { 
        ThreadStreamDateWriter threadDateWriter=lThrdDate.get(i-1); 
        threadDateWriter.stopThread(); 
        lThrdDate.remove(i-1); 
       } 
      } 
     } 
     public PipedInputStream getPipedInputStream() { 
      return pipedInputStream; 
     } 
     public void stopThread() { 
      isRunning=false; 
     } 
    } 
    public static class ThreadStreamDateWriter extends Thread { 
     String Self; 
     private byte[] outgoingBytes; 
     private volatile boolean isRunning; 
     private /*static*/ PipedOutputStream pipedOutputStream; 
     ThreadStreamDateWriter(String name,PipedInputStream snk) { 
      Self=name; 
      pipedOutputStream=new PipedOutputStream(); 
      System.out.println("ctor setting pipedOutputStream to: "+pipedOutputStream); 
      try { 
       pipedOutputStream.connect(snk); 
       System.out.println(pipedOutputStream+" connectd to: "+snk); 
      } catch(IOException e) {} 
     } 
     @Override public void run() { 
      isRunning=true; 
      while(isRunning) { 
       try { 
        outgoingBytes=getInfo().getBytes(); 
        System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:"+new String(outgoingBytes)); 
        System.out.println("writing to: "+pipedOutputStream); 
        pipedOutputStream.write(outgoingBytes); 
        System.out.println("ThreadStreamDateWriter -> wrote:"+new String(outgoingBytes)); 
        try { 
         Thread.sleep(4000); 
        } catch(InterruptedException ex) {} 
       } catch(IOException|NegativeArraySizeException|IndexOutOfBoundsException e) { 
        isRunning=false; 
       } 
      } 
     } 
     String getInfo() { 
      String sDtTm=new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime()); 
      return Self+" -> "+sDtTm; 
     } 
     public void stopThread() { 
      isRunning=false; 
     } 
     public String getNameDateWriter() { 
      return Self; 
     } 
    } 
    private void foo() { 
     if(thrdMDateReceiver==null) { 
      thrdMDateReceiver=new ThreadMultipleDateReceiver(); 
      thrdMDateReceiver.start(); 
     } 
     if(thrdSDateWriter0==null) { 
      thrdSDateWriter0=new ThreadStreamDateWriter("-0-",thrdMDateReceiver.getPipedInputStream()); 
      thrdSDateWriter0.start(); 
      thrdMDateReceiver.addDateWriter(thrdSDateWriter0); 
     } 
     if(thrdSDateWriter1==null) { 
      thrdSDateWriter1=new ThreadStreamDateWriter("-1-",thrdMDateReceiver.getPipedInputStream()); 
      thrdSDateWriter1.start(); 
      thrdMDateReceiver.addDateWriter(thrdSDateWriter1); 
     } 
    } 
    void run() throws InterruptedException { 
     System.out.println(("running")); 
     foo(); 
     System.out.println(("sleeping")); 
     Thread.sleep(10000); 
     System.out.println(("stopping")); 
     if(thrdMDateReceiver!=null) { 
      thrdMDateReceiver.stopThread(); 
     } 
    } 
    public static void main(String[] args) throws InterruptedException { 
     new So44438086().run(); 
    } 
    ThreadMultipleDateReceiver thrdMDateReceiver=null; 
    ThreadStreamDateWriter thrdSDateWriter0=null; 
    ThreadStreamDateWriter thrdSDateWriter1=null; 
} 
+0

是没可能只用一个PipedOutpudStream for m多重PipedInpudStream? –

+0

你可以使用其中一个,但是如果我正确读取你的代码,你正在制作3并踩上前两个。所以你的东西可能会去3个不同的管道。 –

+0

我知道它的对象概念错误...也许PipedXxxputStream它不是apropiated对象,但是我需要多个Writer对象一个Reader对象。我想避免阻塞... –

相关问题