Unverified Commit 279c81e9 authored by Gao Hongtao's avatar Gao Hongtao Committed by GitHub
Browse files

Fix k8s api connection leak (#1848)

* Update submodule skywalking-ui

* Fix k8s api bugs

* Fix api connection leak
parent f2828e77
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -92,6 +92,9 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery

    private void generateRemoteNodes() {
        for (Event event : watch) {
            if (event == null) {
                break;
            }
            logger.debug("Received event {} {}-{}", event.getType(), event.getUid(), event.getHost());
            switch (event.getType()) {
                case "ADDED":
+29 −12
Original line number Diff line number Diff line
@@ -18,7 +18,6 @@

package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;

import com.google.common.collect.Iterators;
import com.google.common.reflect.TypeToken;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
@@ -28,9 +27,10 @@ import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
import org.slf4j.Logger;
@@ -83,16 +83,33 @@ public class NamespacedPodListWatch implements ReusableWatch<Event> {
    }

    @Override public Iterator<Event> iterator() {
        try {
            return Iterators.transform(watch.iterator(), response -> {
                if (response == null) {
                    throw new NullPointerException("Original event is null");
        final Iterator<Watch.Response<V1Pod>> watchItr = watch.iterator();
        return new Iterator<Event>() {
            @Override public boolean hasNext() {
                return wrap(watchItr::hasNext, false);
            }

            @Override public Event next() {
                return wrap(() -> {
                    final Watch.Response<V1Pod> response = watchItr.next();
                    return new Event(response.type, response.object.getMetadata().getUid(), response.object.getStatus().getPodIP());
            });
        } catch (final RuntimeException exp) {
            logger.trace("Runtime exception", exp);
                }, null);
            }

            private <R> R wrap(final Supplier<R> action, final R defaultValue) {
                Objects.requireNonNull(action);
                try {
                    return action.get();
                } catch (final Throwable t) {
                    logger.trace("Throwable", t);
                    try {
                        watch.close();
                    } catch (IOException e) {
                        logger.error("Close watch error", e);
                    }
                }
                return defaultValue;
            }
        return Collections.emptyIterator();
        };
    }
}