Commit 3ceed35a authored by 彭勇升 pengys's avatar 彭勇升 pengys Committed by 吴晟
Browse files

Fixed the inventory register lock invalid bug. (#2184)

* #2183

Merge commit into master.

* Make the h2 register lock same as mysql.
parent e3bcf15a
Loading
Loading
Loading
Loading
+9 −3
Original line number Diff line number Diff line
@@ -93,10 +93,16 @@ public class NetworkAddressInventory extends RegisterSource {
        return inventory;
    }

    @Override public void combine(RegisterSource registerSource) {
        super.combine(registerSource);
    @Override public boolean combine(RegisterSource registerSource) {
        boolean isCombine = super.combine(registerSource);
        NetworkAddressInventory inventory = (NetworkAddressInventory)registerSource;

        if (nodeType != inventory.nodeType) {
            setNodeType(inventory.nodeType);
            return true;
        } else {
            return isCombine;
        }
    }

    @Override public RemoteData.Builder serialize() {
+4 −1
Original line number Diff line number Diff line
@@ -36,9 +36,12 @@ public abstract class RegisterSource extends StreamData implements StorageData {
    @Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime;
    @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long heartbeatTime;

    public void combine(RegisterSource registerSource) {
    public boolean combine(RegisterSource registerSource) {
        if (heartbeatTime < registerSource.getHeartbeatTime()) {
            heartbeatTime = registerSource.getHeartbeatTime();
            return true;
        } else {
            return false;
        }
    }
}
+4 −1
Original line number Diff line number Diff line
@@ -181,15 +181,18 @@ public class ServiceInventory extends RegisterSource {
        return 0;
    }

    @Override public void combine(RegisterSource registerSource) {
    @Override public boolean combine(RegisterSource registerSource) {
        super.combine(registerSource);
        ServiceInventory serviceInventory = (ServiceInventory)registerSource;

        nodeType = serviceInventory.nodeType;
        setProp(serviceInventory.getProp());
        if (Const.NONE != serviceInventory.getMappingServiceId() && serviceInventory.getMappingLastUpdateTime() >= this.getMappingLastUpdateTime()) {
            this.mappingServiceId = serviceInventory.getMappingServiceId();
            this.mappingLastUpdateTime = serviceInventory.getMappingLastUpdateTime();
        }

        return true;
    }

    public static class Builder implements StorageBuilder<ServiceInventory> {
+4 −5
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.slf4j.*;

@@ -43,7 +43,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
        this.nextWorker = nextWorker;
        this.sources = new HashMap<>();
        this.dataCarrier = new DataCarrier<>(1, 10000);
        this.dataCarrier.consume(new AggregatorConsumer(this), 1);
        this.dataCarrier.consume(new AggregatorConsumer(this), 1, 200);
    }

    @Override public final void in(RegisterSource source) {
@@ -61,9 +61,8 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
        }

        if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
            sources.values().forEach(source1 -> {
                nextWorker.in(source1);
            });
            sources.values().forEach(nextWorker::in);
            sources.clear();
            messageNum = 0;
        }
    }
+22 −22
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
@@ -52,7 +53,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
        this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
        this.scope = scope;
        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
        this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1);
        this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1, 200);
    }

    @Override public final void in(RegisterSource registerSource) {
@@ -67,31 +68,30 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
            sources.get(registerSource).combine(registerSource);
        }

        if (registerSource.getEndOfBatchContext().isEndOfBatch()) {

            if (registerLockDAO.tryLock(scope)) {
                try {
        if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) {
            sources.values().forEach(source -> {
                int sequence;
                if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
                    try {
                        RegisterSource dbSource = registerDAO.get(modelName, source.id());
                        if (Objects.nonNull(dbSource)) {
                                dbSource.combine(source);
                            if (dbSource.combine(source)) {
                                registerDAO.forceUpdate(modelName, dbSource);
                            }
                        } else {
                                int sequence = registerDAO.registerId(modelName, source);
                            source.setSequence(sequence);
                            registerDAO.forceInsert(modelName, source);
                        }
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                        }
                    });
                    } finally {
                        registerLockDAO.releaseLock(scope);
                    }
                } else {
                logger.info("Inventory register try lock failure.");
                    logger.info("{} inventory register try lock and increment sequence failure.", scope.name());
                }
            });
            sources.clear();
        }
    }

Loading