Commit 01a08cae authored by 吴晟's avatar 吴晟
Browse files

Merge branch 'feature/collector' into feature/3.0

* feature/collector:
  compile error
  refactor the way how to create worker instance
  refactor cluster module, modify the way use to tell cluster worker
  add recevier moudle

# Conflicts:
#	skywalking-collector/pom.xml
#	skywalking-collector/skywalking-collector-cluster/pom.xml
#	skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java
#	skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java
#	skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java
#	skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkerRef.java
#	skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java
#	skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java
#	skywalking-collector/skywalking-collector-worker/pom.xml
#	skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java
parents 0d0d9840 934aa6bb
Loading
Loading
Loading
Loading
+6 −5
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@
    <modules>
        <module>skywalking-collector-cluster</module>
        <module>skywalking-collector-worker</module>
        <module>skywalking-collector-role</module>
    </modules>
    <parent>
        <artifactId>skywalking</artifactId>
@@ -24,16 +25,16 @@
            <artifactId>akka-cluster_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.a.eye</groupId>
            <artifactId>skywalking-logging-api</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-testkit_2.11</artifactId>
            <version>${akka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.a.eye</groupId>
            <artifactId>skywalking-logging-api</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>
</project>
+5 −5
Original line number Diff line number Diff line
@@ -16,15 +16,15 @@
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.a.eye</groupId>
            <artifactId>skywalking-util</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>com.a.eye</groupId>
            <artifactId>skywalking-util</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>
</project>
+76 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector;

import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.UsedRoleNameException;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.ServiceLoader;

/**
 * @author pengys5
 */
public class CollectorSystem {
    private ILog logger = LogManager.getLogger(CollectorSystem.class);
    private ClusterWorkerContext clusterContext;

    public ClusterWorkerContext getClusterContext() {
        return clusterContext;
    }

    public void boot() throws Exception {
        createAkkaSystem();
        createListener();
        loadLocalProviders();

        createClusterWorker();
    }

    public void terminate() {
        clusterContext.getAkkaSystem().terminate();
    }

    private void createAkkaSystem() {
        ClusterConfigInitializer.initialize("collector.config");

        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
                withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
                withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
                withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)).
                withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
                withFallback(ConfigFactory.load("application.conf"));
        ActorSystem akkaSystem = ActorSystem.create("ClusterSystem", config);

        clusterContext = new ClusterWorkerContext(akkaSystem);
    }

    private void createListener() {
        clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WorkName);
    }

    private void createClusterWorker() throws Exception {
        ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
        for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
            logger.info("create {%s} worker using java service loader", provider.workerNum());
            for (int i = 1; i <= provider.workerNum(); i++) {
                provider.create(clusterContext, new LocalWorkerContext());
            }
        }
    }

    private void loadLocalProviders() throws UsedRoleNameException {
        ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
        for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
            clusterContext.putProvider(provider);
        }
    }
}
+0 −37
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;

/**
 * @author pengys5
 */
public abstract class AbstractAsyncMember extends AbstractMember {

    private RingBuffer<MessageHolder> ringBuffer;

    public AbstractAsyncMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
        super(actorRef);
        this.ringBuffer = ringBuffer;
    }

    public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
        Object message = event.getMessage();
        event.reset();
        receive(message);
        if (endOfBatch) {
            receive(new EndOfBatchCommand());
        }
    }

    public void beTold(Object message) throws Exception {
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setMessage(message);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
+80 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;

/**
 * @author pengys5
 */
public abstract class AbstractClusterWorker extends AbstractWorker {

    public AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    static class WorkerWithAkka extends UntypedActor {
        private static ILog logger = LogManager.getLogger(WorkerWithAkka.class);
        private Cluster cluster;
        private final AbstractClusterWorker ownerWorker;

        public WorkerWithAkka(AbstractClusterWorker ownerWorker) {
            this.ownerWorker = ownerWorker;
            cluster = Cluster.get(getContext().system());
        }

        @Override
        public void preStart() throws Exception {
            cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
        }

        @Override
        public void postStop() throws Exception {
            cluster.unsubscribe(getSelf());
        }

        /**
         * Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
         * cluster event, when event send from the member of {@link WorkersListener} then tell
         * the sender to register self.
         */
        @Override
        public void onReceive(Object message) throws Throwable {
            if (message instanceof ClusterEvent.CurrentClusterState) {
                ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
                for (Member member : state.getMembers()) {
                    if (member.status().equals(MemberStatus.up())) {
                        register(member);
                    }
                }
            } else if (message instanceof ClusterEvent.MemberUp) {
                ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
                logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
                register(memberUp.member());
            } else {
                logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
                ownerWorker.work(message);
            }
        }

        /**
         * When member role is {@link WorkersListener#WorkName} then Select actor from context
         * and send register message to {@link WorkersListener}
         *
         * @param member is the new created or restart worker
         */
        void register(Member member) {
            if (member.hasRole(WorkersListener.WorkName)) {
                WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
                logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
                getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
            }
        }
    }
}
Loading