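Akka cluster setup with Play Framework

2016-08-17

I'm currently trying to implement a clustered Play + Akka application that uses an auto-discovery service. However, I seem to be running into problems with the Guice DI loader that ships with Play. An excerpt from their documentation states: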

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

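While we recommend you use the built in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

Register a stop hook to shut the actor system down when Play shuts down

Pass in the correct classloader from the Play Environment otherwise Akka won't be able to find your applications classes

Ensure that either you change the location that Play reads its akka configuration from using play.akka.config, or that you don't read your akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports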

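I have done the configuration they recommend above. However, I can't seem to get around the fact that Play still binds its internal ActorSystemProvider from the BuiltinModule: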

class BuiltinModule extends Module { 
def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = 

    { 
     def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = { 
      factories.flatMap(_(env, configuration)) 
     } 

     Seq(
      bind[Environment] to env, 
      bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)), 
      bind[Configuration].toProvider[ConfigurationProvider], 
      bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider], 

      // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it 
      // to shut it down. 
      bind[DefaultApplicationLifecycle].toSelf, 
      bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]), 

      bind[Application].to[DefaultApplication], 
      bind[play.Application].to[play.DefaultApplication], 

      bind[Router].toProvider[RoutesProvider], 
      bind[play.routing.Router].to[JavaRouterAdapter], 
      bind[ActorSystem].toProvider[ActorSystemProvider], 
      bind[Materializer].toProvider[MaterializerProvider], 
      bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider], 
      bind[ExecutionContext].to[ExecutionContextExecutor], 
      bind[Executor].to[ExecutionContextExecutor], 
      bind[HttpExecutionContext].toSelf, 

      bind[CryptoConfig].toProvider[CryptoConfigParser], 
      bind[CookieSigner].toProvider[CookieSignerProvider], 
      bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider], 
      bind[AESCrypter].toProvider[AESCrypterProvider], 
      bind[play.api.libs.Crypto].toSelf, 
      bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator] 
     ) ++ dynamicBindings(
      HttpErrorHandler.bindingsFromConfiguration, 
      HttpFilters.bindingsFromConfiguration, 
      HttpRequestHandler.bindingsFromConfiguration, 
      ActionCreator.bindingsFromConfiguration 
     ) 
     } 
    } 

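I have tried creating my own GuiceApplicationBuilder to get around this. However, that only moves the duplicate binding exception, which now comes from the BuiltinModule instead.

Here is what I'm trying: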

AkkaConfigModule:

package module.akka 

import com.google.inject.{AbstractModule, Inject, Provider, Singleton} 
import com.typesafe.config.Config 
import module.akka.AkkaConfigModule.AkkaConfigProvider 
import net.codingwell.scalaguice.ScalaModule 
import play.api.Application 

/** 
    * Created by dmcquill on 8/15/16. 
    */ 
object AkkaConfigModule { 
    @Singleton 
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] { 
     override def get() = { 
      val classLoader = application.classloader 
      NodeConfigurator.loadConfig(classLoader) 
     } 
    } 
} 

/** 
    * Binds the application configuration to the [[Config]] interface. 
    * 
    * The config is bound as an eager singleton so that errors in the config are detected 
    * as early as possible. 
    */ 
class AkkaConfigModule extends AbstractModule with ScalaModule { 

    override def configure() { 
     bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton() 
    } 

} 

ActorSystemModule:

package module.akka 


import actor.cluster.ClusterMonitor 
import akka.actor.ActorSystem 
import com.google.inject._ 
import com.typesafe.config.Config 
import net.codingwell.scalaguice.ScalaModule 
import play.api.inject.ApplicationLifecycle 

import scala.collection.JavaConversions._ 

/** 
    * Created by dmcquill on 7/27/16. 
    */ 
object ActorSystemModule { 
    @Singleton 
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] { 
     override def get() = { 
      val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp")) 

      // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector 
      GuiceAkkaExtension(system).initialize(injector) 

      system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", ")) 
      system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name)) 

      lifecycle.addStopHook {() => 
       system.terminate() 
      } 

      system 
     } 
    } 
} 

/** 
    * A module providing an Akka ActorSystem. 
    */ 
class ActorSystemModule extends AbstractModule with ScalaModule { 
    import module.akka.ActorSystemModule.ActorSystemProvider 

    override def configure() { 
     bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton() 
    } 
} 

Application loader:

class CustomApplicationLoader extends GuiceApplicationLoader { 

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = { 
     initialBuilder 
      .overrides(overrides(context): _*) 
      .bindings(new AkkaConfigModule, new ActorSystemModule) 
    } 

} 

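The main thing I need to accomplish is configuring the ActorSystem so that I can load the seed nodes for the Akka cluster programmatically.

Is the approach above the right one, or is there a better way to accomplish this? If it is the right approach, is there something I fundamentally misunderstand about the DI setup in Play/Guice?

Update

For this architecture, Play and Akka live on the same node.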

Answer

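In the end, I had been trying to do something more complicated than necessary. Instead of the flow above, I simply extended the initial configuration programmatically, so that the necessary network information can be retrieved in code.

The end result basically consists of a few classes:

NodeConfigurator: this class contains the utility methods that read properties from application.conf and then build a configuration programmatically, for use together with a Kubernetes discovery service.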

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

object NodeConfigurator {

    /** 
     * This method given a class loader will return the configuration object for an ActorSystem 
     * in a clustered environment 
     * 
     * @param classLoader the configured classloader of the application 
     * @return Config 
     */ 
    def loadConfig(classLoader: ClassLoader) = { 
     val config = ConfigFactory.load(classLoader) 

     val clusterName = config.getString(CLUSTER_NAME_PROP) 
     val seedPort = config.getString(SEED_PORT_CONF_PROP) 

     val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") { 
      getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS) 
     } else { 
      config.getString(HOST_CONF_PROP) 
     } 

     ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host)) 
      .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host)) 
      .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host)) 
      .withFallback(config) 
      .resolve() 
    } 

    /** 
     * Get the local ip address which defaults to localhost if not 
     * found on the eth0 adapter 
     * 
     * @return Option[String] 
     */ 
    def getLocalHostAddress: Option[String] = { 
     import java.net.NetworkInterface 

     import scala.collection.JavaConversions._ 

     NetworkInterface.getNetworkInterfaces 
      .find(_.getName equals "eth0") 
      .flatMap { interface => 
       interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress) 
      } 
    } 

    /** 
     * Retrieves a set of seed nodes that are currently running in our cluster 
     * 
     * @param config akka configuration object 
     * @return Array[String] 
     */ 
    def getSeedNodes(config: Config) = { 
     if(config.hasPath(SEED_NODES_CONF_PROP)) { 
      config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim) 
     } else { 
      Array.empty[String] 
     } 
    } 

    /** 
     * formats the seed node addresses in the proper format 
     * 
     * @param clusterName name of akka cluster 
     * @param seedNodeAddresses listing of current seed nodes 
     * @param seedNodePort configured seed node port 
     * @param defaultSeedNodeAddress default seed node address 
     * @return 
     */ 
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = { 
     if(seedNodeAddresses.isEmpty) {
      s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
     } else {
      seedNodeAddresses.map { address =>
       s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
      }.mkString("\n")
     }
    } 

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name" 
    val HOST_CONF_PROP = "fitnessAkka.host" 
    val PORT_CONF_PROP = "fitnessAkka.port" 
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes" 
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port" 

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1" 
} 
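
To make the seed-node formatting concrete, here is a minimal sketch of the HOCON it is meant to produce and how Typesafe Config resolves it. It assumes the usual Akka address form akka.tcp://<system>@<host>:<port>. SeedNodeSketch, seedNodeLines and resolvedSeedNodes are hypothetical names used only for illustration, and the addresses are example values.

```scala
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

object SeedNodeSketch {

  // Hypothetical stand-in for the formatting step: one `+=` line per known
  // seed node, or a single-element list pointing at this host when none are known.
  def seedNodeLines(clusterName: String, addresses: Seq[String], port: String, fallbackHost: String): String =
    if (addresses.isEmpty)
      s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://${clusterName}@${fallbackHost}:${port}" ]"""
    else
      addresses
        .map(a => s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://${clusterName}@${a}:${port}"""")
        .mkString("\n")

  // Parses the generated lines; resolve() turns the `+=` appends into one list.
  def resolvedSeedNodes(lines: String): List[String] =
    ConfigFactory.parseString(lines)
      .resolve()
      .getStringList("fitnessApp.akka.cluster.seed-nodes")
      .asScala
      .toList

  def main(args: Array[String]): Unit = {
    val lines = seedNodeLines("fitnessApp", Seq("172.17.0.3", "172.17.0.4"), "2551", "127.0.0.1")
    println(lines)
    println(resolvedSeedNodes(lines))
  }
}
```

In HOCON, `+=` is shorthand for appending to a possibly missing array, which is why the parsed config has to be resolved before the list is read.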

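CustomApplicationLoader: simply overrides Play's application loader to take the configuration produced by NodeConfigurator and extend initialConfiguration with it. Play picks up a custom loader through the play.application.loader setting in application.conf.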

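import play.api.{ApplicationLoader, Configuration}
import play.api.inject.guice.{GuiceApplicationBuilder, GuiceApplicationLoader}

class CustomApplicationLoader extends GuiceApplicationLoader {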

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = { 
     val classLoader = context.environment.classLoader 
     val configuration = Configuration(NodeConfigurator.loadConfig(classLoader)) 

     initialBuilder 
       .in(context.environment) 
       .loadConfig(context.initialConfiguration ++ configuration) 
       .overrides(overrides(context): _*) 
    } 

} 
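
The loader relies on the right-hand side of `++` taking precedence over the initial configuration. To my understanding, Play 2.5 implements this on top of Typesafe Config's withFallback, so the layering can be sketched with Typesafe Config alone. ConfigOverlaySketch and overlay are hypothetical names, and the values are illustrative only.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigOverlaySketch {

  // Values defined in `generated` win; anything it does not define
  // falls through to `base`.
  def overlay(base: Config, generated: Config): Config =
    generated.withFallback(base)

  def main(args: Array[String]): Unit = {
    // Stands in for what application.conf provides.
    val base = ConfigFactory.parseString(
      """
        |fitnessAkka.host = "eth0-address-or-localhost"
        |fitnessAkka.seed-port = "2551"
        |""".stripMargin)

    // Stands in for the values computed at startup.
    val generated = ConfigFactory.parseString(
      """
        |fitnessAkka.host = "172.17.0.3"
        |""".stripMargin)

    val merged = overlay(base, generated)
    println(merged.getString("fitnessAkka.host"))      // taken from `generated`
    println(merged.getString("fitnessAkka.seed-port")) // taken from `base`
  }
}
```

Because only the computed keys are overridden, everything else in application.conf stays in effect.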

AkkaActorModule: provides a dependency-injected actor ref, used by an API that displays the cluster members.

import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
    def configure = { 
     bindActor[ClusterMonitor]("cluster-monitor") 
    } 
} 

ClusterMonitor: an actor that simply listens for cluster events and, in addition, replies to a message with the current cluster state.

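import javax.inject.Inject
import akka.actor.{Actor, ActorLogging, Address}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ClusterMonitor @Inject() extends Actor with ActorLogging {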
    import actor.cluster.ClusterMonitor.GetClusterState 

    val cluster = Cluster(context.system) 
    private var nodes = Set.empty[Address] 

    override def preStart(): Unit = { 
     cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) 
    } 

    override def postStop(): Unit = cluster.unsubscribe(self) 

    override def receive = { 
     case MemberUp(member) => { 
      nodes += member.address 
      log.info(s"Cluster member up: ${member.address}") 
     } 
     case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}") 
     case MemberRemoved(member, previousStatus) => { 
      nodes -= member.address 
      log.info(s"Cluster member removed: ${member.address}") 
     } 
     case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}") 
     case GetClusterState => sender() ! nodes 
     case _: MemberEvent => 
    } 

} 

object ClusterMonitor { 
    // A case object, so that sending and matching on GetClusterState refer to the same value
    case object GetClusterState
} 

Application: simply a test controller that outputs the nodes that have joined the cluster.

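import java.util.concurrent.TimeUnit
import javax.inject.{Inject, Named}

import actor.cluster.ClusterMonitor.GetClusterState
import akka.actor.{ActorRef, Address}
import akka.pattern.ask
import akka.util.Timeout
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.json.{Json, Writes}
import play.api.mvc.{Action, Controller}

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {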

    implicit val addressWrites = new Writes[Address] { 
     def writes(address: Address) = Json.obj(
      "host" -> address.host, 
      "port" -> address.port, 
      "protocol" -> address.protocol, 
      "system" -> address.system 
     ) 
    } 

    implicit val timeout = Timeout(5, TimeUnit.SECONDS) 

    def listClusterNodes = Action.async { 
     (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses => 
      Ok(Json.toJson(addresses)) 
     } 
    } 

} 

Calling the controller above produces a list of nodes similar to the following output:

$ http GET 192.168.99.100:30760/cluster/nodes 

HTTP/1.1 200 OK 
Content-Length: 235 
Content-Type: application/json 
Date: Thu, 18 Aug 2016 02:50:30 GMT 

[ 
    { 
     "host": "172.17.0.3", 
     "port": 2551, 
     "protocol": "akka.tcp", 
     "system": "fitnessApp" 
    }, 
    { 
     "host": "172.17.0.4", 
     "port": 2551, 
     "protocol": "akka.tcp", 
     "system": "fitnessApp" 
    }, 
    { 
     "host": "172.17.0.5", 
     "port": 2551, 
     "protocol": "akka.tcp", 
     "system": "fitnessApp" 
    } 
]