package brooklyn.entity.messaging.kafka;

import brooklyn.config.ConfigKey;
import brooklyn.enricher.basic.SensorPropagatingEnricher;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.proxying.EntitySpecs;
import brooklyn.entity.trait.Startable;
import brooklyn.entity.zookeeper.Zookeeper;
import brooklyn.event.Sensor;
import brooklyn.event.feed.ConfigToAttributes;
import brooklyn.location.Location;
import brooklyn.util.MutableList;
import brooklyn.util.MutableMap;
import brooklyn.util.exceptions.CompoundRuntimeException;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:brooklyn/entity/messaging/kafka/KafkaClusterImpl.class */
/**
 * Implementation of {@link KafkaCluster}: an entity that owns a {@link Zookeeper} node and a
 * {@link DynamicCluster} of brokers, and delegates group/resize operations to that cluster.
 *
 * <p>The ZooKeeper node is either supplied through the {@code ZOOKEEPER} key or created as a
 * child from {@code ZOOKEEPER_SPEC}; only a ZooKeeper node parented by this entity is started
 * and stopped together with it.
 */
public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
    public static final Logger log = LoggerFactory.getLogger(KafkaClusterImpl.class);

    /** Creates the entity with no flags and no parent. */
    public KafkaClusterImpl() {
        this(MutableMap.of(), null);
    }

    /**
     * Creates the entity with the given flags and no parent.
     *
     * @param map construction flags passed to the superclass
     */
    public KafkaClusterImpl(Map<?, ?> map) {
        this(map, null);
    }

    /**
     * Creates the entity with no flags under the given parent.
     *
     * @param entity the parent entity passed to the superclass
     */
    public KafkaClusterImpl(Entity entity) {
        this(MutableMap.of(), entity);
    }

    /**
     * Creates the entity and marks it as not yet up.
     *
     * @param map construction flags passed to the superclass
     * @param entity the parent entity passed to the superclass, may be {@code null}
     */
    public KafkaClusterImpl(Map<?, ?> map, Entity entity) {
        super(map, entity);
        setAttribute(SERVICE_UP, false);
    }

    /**
     * Copies the spec/zookeeper configuration into attributes and creates the child entities:
     * a ZooKeeper node (unless one was supplied) and the broker {@link DynamicCluster}.
     * Defaults are {@code KafkaZookeeper} and {@code KafkaBroker} specs when none are configured.
     */
    @Override // brooklyn.entity.basic.AbstractEntity
    // Raw EntitySpec is retained on purpose: the generic parameters of the spec attributes are
    // declared outside this file, so the raw form is the only one known to fit them here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void init() {
        ConfigToAttributes.apply(this, BROKER_SPEC);
        ConfigToAttributes.apply(this, ZOOKEEPER);
        ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);

        log.debug("creating zookeeper child for {}", this);
        Zookeeper zookeeper = (Zookeeper) getAttribute(ZOOKEEPER);
        if (zookeeper == null) {
            EntitySpec zookeeperSpec = (EntitySpec) getAttribute(ZOOKEEPER_SPEC);
            if (zookeeperSpec == null) {
                log.debug("creating zookeeper using default spec for {}", this);
                zookeeperSpec = EntitySpecs.spec(KafkaZookeeper.class);
                setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
            } else {
                log.debug("creating zookeeper using custom spec for {}", this);
            }
            // NOTE(review): m23addChild looks like a decompiler-assigned name for the
            // spec-based addChild overload; kept as-is to match the rest of this tree.
            zookeeper = (Zookeeper) m23addChild(zookeeperSpec);
            if (Entities.isManaged(this)) {
                Entities.manage(zookeeper);
            }
            setAttribute(ZOOKEEPER, zookeeper);
        }

        log.debug("creating cluster child for {}", this);
        EntitySpec brokerSpec = (EntitySpec) getAttribute(BROKER_SPEC);
        if (brokerSpec == null) {
            log.debug("creating default broker spec for {}", this);
            brokerSpec = EntitySpecs.spec(KafkaBroker.class);
            setAttribute(BROKER_SPEC, brokerSpec);
        }

        // Every broker is configured with the shared ZooKeeper entity. The entity is passed
        // as the config *value*; it must not be cast to ConfigKey, which would fail at runtime.
        EntitySpec clusterSpec = (EntitySpec) EntitySpecs.spec(DynamicCluster.class)
                .configure("memberSpec", EntitySpecs.wrapSpec(brokerSpec)
                        .configure(KafkaBroker.ZOOKEEPER, zookeeper));
        DynamicCluster cluster = (DynamicCluster) m23addChild(clusterSpec);
        if (Entities.isManaged(this)) {
            Entities.manage(cluster);
        }
        setAttribute(CLUSTER, cluster);
    }

    /** @return the ZooKeeper entity stored in the {@code ZOOKEEPER} attribute, or {@code null} if unset */
    @Override // brooklyn.entity.messaging.kafka.KafkaCluster
    public Zookeeper getZookeeper() {
        return (Zookeeper) getAttribute(ZOOKEEPER);
    }

    /** @return the broker cluster stored in the {@code CLUSTER} attribute, or {@code null} if unset */
    @Override // brooklyn.entity.messaging.kafka.KafkaCluster
    public DynamicCluster getCluster() {
        return (DynamicCluster) getAttribute(CLUSTER);
    }

    /**
     * Starts the broker cluster and, when it is a child of this entity, the ZooKeeper node.
     *
     * @param collection the locations to start in; when {@code null} or empty the locations
     *        already held by this entity are used. Exactly one location must result.
     */
    @Override // brooklyn.entity.trait.Startable
    public void start(Collection<? extends Location> collection) {
        if (isLegacyConstruction()) {
            init();
        }
        if (collection == null || collection.isEmpty()) {
            collection = getLocations();
        }
        // Validation only: getOnlyElement throws unless there is exactly one location.
        Iterables.getOnlyElement(collection);
        addLocations(collection);

        Zookeeper zookeeper = getZookeeper();
        Collection<Entity> startables = MutableList.<Entity>of(getCluster());
        // Adopt an orphaned ZooKeeper node so that it is started along with the cluster.
        if (zookeeper.getParent() == null) {
            addChild(zookeeper);
        }
        // A ZooKeeper node parented elsewhere is left alone.
        if (Objects.equal(this, zookeeper.getParent())) {
            startables.add(zookeeper);
        }

        Entities.invokeEffectorList(this, startables, Startable.START, ImmutableMap.of("locations", collection))
                .getUnchecked();
        connectSensors();
    }

    /**
     * Stops the ZooKeeper node (only if it is a child of this entity) and then the broker
     * cluster (only if it exists and is non-empty), clears the locations and marks the
     * service as down.
     *
     * @throws CompoundRuntimeException carrying the collected failures if any child failed to
     *         stop; the location and {@code SERVICE_UP} cleanup has been performed by then
     */
    @Override // brooklyn.entity.trait.Startable
    public void stop() {
        List<Exception> errors = new ArrayList<Exception>();

        Zookeeper zookeeper = getZookeeper();
        if (zookeeper != null && Objects.equal(this, zookeeper.getParent())) {
            try {
                zookeeper.stop();
            } catch (Exception e) {
                // Collected rather than rethrown so the cluster is still stopped below.
                errors.add(e);
            }
        }

        // getCurrentSize() dereferences the cluster, so guard against stop() before init().
        DynamicCluster cluster = getCluster();
        if (cluster != null && getCurrentSize() > 0) {
            try {
                cluster.stop();
            } catch (Exception e) {
                errors.add(e);
            }
        }

        getLocations().clear();
        setAttribute(SERVICE_UP, false);

        if (!errors.isEmpty()) {
            throw new CompoundRuntimeException("Error stopping Kafka cluster", errors);
        }
    }

    /** Stops and then starts the entity again in the locations it held before stopping. */
    @Override // brooklyn.entity.trait.Startable
    public void restart() {
        // Snapshot first: stop() clears the entity's locations.
        List<Location> previousLocations = Lists.newArrayList(getLocations());
        stop();
        start(previousLocations);
    }

    /**
     * Adds enrichers that propagate every cluster sensor except {@code SERVICE_UP} to this
     * entity, and propagate {@code SERVICE_UP} from the ZooKeeper node.
     */
    void connectSensors() {
        SensorPropagatingEnricher.newInstanceListeningToAllSensorsBut(getCluster(), SERVICE_UP)
                .addToEntityAndEmitAll(this);
        SensorPropagatingEnricher.newInstanceListeningTo(getZookeeper(), new Sensor<?>[] {SERVICE_UP})
                .addToEntityAndEmitAll(this);
    }

    /** @return the members of the underlying broker cluster */
    @Override // brooklyn.entity.Group
    public Collection<Entity> getMembers() {
        return getCluster().getMembers();
    }

    /**
     * @param entity the entity to look for
     * @return whether the underlying broker cluster reports {@code entity} as a member
     */
    @Override // brooklyn.entity.Group
    public boolean hasMember(Entity entity) {
        return getCluster().hasMember(entity);
    }

    /**
     * Delegates to the underlying broker cluster.
     *
     * @param entity the entity to add
     * @return the result reported by the cluster
     */
    @Override // brooklyn.entity.Group
    public boolean addMember(Entity entity) {
        return getCluster().addMember(entity);
    }

    /**
     * Delegates to the underlying broker cluster.
     *
     * @param entity the entity to remove
     * @return the result reported by the cluster
     */
    @Override // brooklyn.entity.Group
    public boolean removeMember(Entity entity) {
        return getCluster().removeMember(entity);
    }

    /** @return the current size reported by the underlying broker cluster */
    @Override // brooklyn.entity.trait.Resizable, brooklyn.entity.Group
    public Integer getCurrentSize() {
        return getCluster().getCurrentSize();
    }

    /**
     * Delegates the resize request to the underlying broker cluster.
     *
     * @param num the desired size
     * @return the size reported by the cluster after resizing
     */
    @Override // brooklyn.entity.trait.Resizable
    public Integer resize(Integer num) {
        return getCluster().resize(num);
    }
}
